diff --git a/dbm-services/mysql/db-simulation/app/service/simulation_task.go b/dbm-services/mysql/db-simulation/app/service/simulation_task.go index 4b67c03325..9338f4b742 100644 --- a/dbm-services/mysql/db-simulation/app/service/simulation_task.go +++ b/dbm-services/mysql/db-simulation/app/service/simulation_task.go @@ -172,8 +172,12 @@ func init() { }) go func() { // 等待一个周期的原因,避免重载执行正常的任务 - time.Sleep(time.Duration(HeartbeatInterval) * time.Second) - reloadRunningTaskFromdb() + // 因为http svr 是graceful shutdown, 可能旧的服务会接受请求 + for i := 0; i < 5; i++ { + logger.Info("the %d times reload running task", i) + time.Sleep(time.Duration(HeartbeatInterval) * time.Second) + reloadRunningTaskFromdb(i + HeartbeatInterval) + } rdb.Close() }() } @@ -185,7 +189,8 @@ type ReloadParam struct { } // reloadRunningTaskFromdb 重载重启服务前运行的任务 加锁是避免多个服务同时启动,避免相同任务被重载 -func reloadRunningTaskFromdb() { +// nolint +func reloadRunningTaskFromdb(heartbeatInterval int) { key := "simulation:reload:lock" locker := redislock.New(rdb) ctx := context.Background() @@ -200,9 +205,8 @@ func reloadRunningTaskFromdb() { }() var tks []model.TbSimulationTask if err := model.DB.Model(model.TbSimulationTask{}).Where( - //nolint - "phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 2 HOUR) and time_to_sec(timediff(now(),heartbeat_time)) > ? ", - []string{model.PhaseDone, model.PhaseReloading}, HeartbeatInterval).Scan(&tks).Error; err != nil { + "phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 2 HOUR) and time_to_sec(timediff(now(),heartbeat_time)) > ?", + []string{model.PhaseDone, model.PhaseReloading}, heartbeatInterval).Scan(&tks).Error; err != nil { logger.Error("get running task failed %s", err.Error()) return } diff --git a/dbm-services/mysql/db-simulation/app/syntax/syntax.go b/dbm-services/mysql/db-simulation/app/syntax/syntax.go index 45207a0125..d601990f85 100644 --- a/dbm-services/mysql/db-simulation/app/syntax/syntax.go +++ b/dbm-services/mysql/db-simulation/app/syntax/syntax.go @@ -142,7 +142,7 @@ func (tf *TmysqlParseFile) Do(dbtype string, versions []string) (result map[stri } func (tf *TmysqlParseFile) doSingleVersion(dbtype string, mysqlVersion string) (err error) { - errChan := make(chan error, len(tf.Param.FileNames)) + errChan := make(chan error) alreadExecutedSqlfileChan := make(chan string, len(tf.Param.FileNames)) signalChan := make(chan struct{}) @@ -257,7 +257,7 @@ func (t *TmysqlParse) delTempDir() { // Downloadfile download sqlfile func (tf *TmysqlParseFile) Downloadfile() (err error) { wg := &sync.WaitGroup{} - errCh := make(chan error, 10) + errCh := make(chan error) c := make(chan struct{}, 5) for _, fileName := range tf.Param.FileNames { wg.Add(1) @@ -335,7 +335,7 @@ func (tf *TmysqlParseFile) Execute(alreadExecutedSqlfileCh chan string, version var wg sync.WaitGroup var errs []error c := make(chan struct{}, 10) // Semaphore to limit concurrent goroutines - errChan := make(chan error, len(tf.Param.FileNames)) + errChan := make(chan error) // Iterate through all SQL files for _, fileName := range tf.Param.FileNames { @@ -383,7 +383,7 @@ func (t *TmysqlParse) AnalyzeParseResult(alreadExecutedSqlfileCh chan string, my dbtype string) (err error) { var errs []error c := make(chan struct{}, 10) - errChan := make(chan error, 5) + errChan := make(chan error) wg := &sync.WaitGroup{} for sqlfile := range alreadExecutedSqlfileCh { diff --git a/dbm-services/mysql/db-simulation/main.go b/dbm-services/mysql/db-simulation/main.go index 8604c17348..289a032ee5 100644 --- a/dbm-services/mysql/db-simulation/main.go +++ b/dbm-services/mysql/db-simulation/main.go @@ -1,3 +1,4 @@ +// Package main TODO /* * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. @@ -7,7 +8,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ - +// main package main import ( @@ -26,7 +27,6 @@ import ( "github.com/gin-gonic/gin" "github.com/samber/lo" "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" - _ "go.uber.org/automaxprocs" "dbm-services/common/go-pubpkg/apm/metric" diff --git a/dbm-services/mysql/db-simulation/pkg/util/spider.go b/dbm-services/mysql/db-simulation/pkg/util/spider.go index 3d92bfdea5..8c51a81688 100644 --- a/dbm-services/mysql/db-simulation/pkg/util/spider.go +++ b/dbm-services/mysql/db-simulation/pkg/util/spider.go @@ -49,7 +49,7 @@ func ParseGetShardKeyForSpider(tableComment string) (string, error) { if end-pos <= 0 { return "", errors.New("parse error") } - + //nolint len := uint(end - pos) keyBuf := make([]byte, len) copy(keyBuf, tableComment[pos:end]) diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_backend.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_backend.go index 4bdff805fb..4995420780 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_backend.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_backend.go @@ -334,7 +334,7 @@ func (c *ImportSchemaFromBackendComp) migrateUseMysqlDump() (err error) { Password: c.GeneralParam.RuntimeAccountParam.AdminPwd, WorkDir: c.tmpDumpDir, } - errChan := make(chan error, len(c.dumpDbs)) + errChan := make(chan error) wg := sync.WaitGroup{} ctrChan := make(chan struct{}, c.maxThreads) for _, db := range c.dumpDbs { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go index 9118126d1c..0786b2acda 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go @@ -98,7 +98,7 @@ func (m MySQLDumper) Dump() (err error) { var errs []error concurrencyControl := make(chan struct{}, m.maxConcurrency) dumpMap := m.GetDumpFileInfo() - errChan := make(chan error, len(dumpMap)) + errChan := make(chan error) for db, outputFileName := range dumpMap { dumper := m dumper.DbNames = []string{db}