Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions dbm-services/mysql/db-simulation/app/service/simulation_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,12 @@ func init() {
})
go func() {
// 等待一个周期的原因,避免重载执行正常的任务
time.Sleep(time.Duration(HeartbeatInterval) * time.Second)
reloadRunningTaskFromdb()
// 因为http svr 是graceful shutdown, 可能旧的服务会接受请求
for i := 0; i < 5; i++ {
logger.Info("the %d times reload running task", i)
time.Sleep(time.Duration(HeartbeatInterval) * time.Second)
reloadRunningTaskFromdb(i + HeartbeatInterval)
}
rdb.Close()
}()
}
Expand All @@ -185,7 +189,8 @@ type ReloadParam struct {
}

// reloadRunningTaskFromdb 重载重启服务前运行的任务 加锁是避免多个服务同时启动,避免相同任务被重载
func reloadRunningTaskFromdb() {
// nolint
func reloadRunningTaskFromdb(heartbeatInterval int) {
key := "simulation:reload:lock"
locker := redislock.New(rdb)
ctx := context.Background()
Expand All @@ -200,9 +205,8 @@ func reloadRunningTaskFromdb() {
}()
var tks []model.TbSimulationTask
if err := model.DB.Model(model.TbSimulationTask{}).Where(
//nolint
"phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 2 HOUR) and time_to_sec(timediff(now(),heartbeat_time)) > ? ",
[]string{model.PhaseDone, model.PhaseReloading}, HeartbeatInterval).Scan(&tks).Error; err != nil {
"phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 2 HOUR) and time_to_sec(timediff(now(),heartbeat_time)) > ?",
[]string{model.PhaseDone, model.PhaseReloading}, heartbeatInterval).Scan(&tks).Error; err != nil {
logger.Error("get running task failed %s", err.Error())
return
}
Expand Down
8 changes: 4 additions & 4 deletions dbm-services/mysql/db-simulation/app/syntax/syntax.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (tf *TmysqlParseFile) Do(dbtype string, versions []string) (result map[stri
}

func (tf *TmysqlParseFile) doSingleVersion(dbtype string, mysqlVersion string) (err error) {
errChan := make(chan error, len(tf.Param.FileNames))
errChan := make(chan error)
alreadExecutedSqlfileChan := make(chan string, len(tf.Param.FileNames))
signalChan := make(chan struct{})

Expand Down Expand Up @@ -257,7 +257,7 @@ func (t *TmysqlParse) delTempDir() {
// Downloadfile download sqlfile
func (tf *TmysqlParseFile) Downloadfile() (err error) {
wg := &sync.WaitGroup{}
errCh := make(chan error, 10)
errCh := make(chan error)
c := make(chan struct{}, 5)
for _, fileName := range tf.Param.FileNames {
wg.Add(1)
Expand Down Expand Up @@ -335,7 +335,7 @@ func (tf *TmysqlParseFile) Execute(alreadExecutedSqlfileCh chan string, version
var wg sync.WaitGroup
var errs []error
c := make(chan struct{}, 10) // Semaphore to limit concurrent goroutines
errChan := make(chan error, len(tf.Param.FileNames))
errChan := make(chan error)

// Iterate through all SQL files
for _, fileName := range tf.Param.FileNames {
Expand Down Expand Up @@ -383,7 +383,7 @@ func (t *TmysqlParse) AnalyzeParseResult(alreadExecutedSqlfileCh chan string, my
dbtype string) (err error) {
var errs []error
c := make(chan struct{}, 10)
errChan := make(chan error, 5)
errChan := make(chan error)
wg := &sync.WaitGroup{}

for sqlfile := range alreadExecutedSqlfileCh {
Expand Down
4 changes: 2 additions & 2 deletions dbm-services/mysql/db-simulation/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package main TODO
/*
* TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
* Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Expand All @@ -7,7 +8,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

// main
package main

import (
Expand All @@ -26,7 +27,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/samber/lo"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"

_ "go.uber.org/automaxprocs"

"dbm-services/common/go-pubpkg/apm/metric"
Expand Down
2 changes: 1 addition & 1 deletion dbm-services/mysql/db-simulation/pkg/util/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func ParseGetShardKeyForSpider(tableComment string) (string, error) {
if end-pos <= 0 {
return "", errors.New("parse error")
}

//nolint
len := uint(end - pos)
keyBuf := make([]byte, len)
copy(keyBuf, tableComment[pos:end])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (c *ImportSchemaFromBackendComp) migrateUseMysqlDump() (err error) {
Password: c.GeneralParam.RuntimeAccountParam.AdminPwd,
WorkDir: c.tmpDumpDir,
}
errChan := make(chan error, len(c.dumpDbs))
errChan := make(chan error)
wg := sync.WaitGroup{}
ctrChan := make(chan struct{}, c.maxThreads)
for _, db := range c.dumpDbs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (m MySQLDumper) Dump() (err error) {
var errs []error
concurrencyControl := make(chan struct{}, m.maxConcurrency)
dumpMap := m.GetDumpFileInfo()
errChan := make(chan error, len(dumpMap))
errChan := make(chan error)
for db, outputFileName := range dumpMap {
dumper := m
dumper.DbNames = []string{db}
Expand Down
Loading