Skip to content

Commit d15a4df

Browse files
committed
fix(mysql): tendbcluster 清档时序问题 #8863
1 parent 69543a9 commit d15a4df

File tree

3 files changed

+48
-21
lines changed

3 files changed

+48
-21
lines changed

dbm-services/mysql/db-priv/service/clone_client_priv.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,37 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([
233233
slog.Info("no match user@host", "instance", address, "user", m.User)
234234
continue
235235
}
236-
proxyGrants = ReplaceHostInProxyGrants(proxyGrants, m.TargetIp)
237-
clusterGrant.Sqls = append(clusterGrant.Sqls, InstanceGrantSql{address, proxyGrants})
238-
err = ImportProxyPrivileges(proxyGrants, address, item.BkCloudId)
236+
proxyUsers := ReplaceHostInProxyGrants(proxyGrants, m.TargetIp)
237+
238+
var oneBuckUsers []string
239+
for _, u := range proxyUsers {
240+
if len(oneBuckUsers) >= 1000 {
241+
refreshSql := fmt.Sprintf(
242+
"refresh_users('%s', '+')",
243+
strings.Join(oneBuckUsers, ","),
244+
)
245+
clusterGrant.Sqls = append(clusterGrant.Sqls,
246+
InstanceGrantSql{address, []string{refreshSql}},
247+
)
248+
err = ImportProxyPrivileges(
249+
[]string{refreshSql},
250+
address, item.BkCloudId)
251+
if err != nil {
252+
AddError(&errMsg, address, err)
253+
}
254+
oneBuckUsers = []string{}
255+
} else {
256+
oneBuckUsers = append(oneBuckUsers, u)
257+
}
258+
}
259+
refreshSql := fmt.Sprintf(
260+
"refresh_users('%s', '+')",
261+
strings.Join(oneBuckUsers, ","),
262+
)
263+
clusterGrant.Sqls = append(clusterGrant.Sqls,
264+
InstanceGrantSql{address, []string{refreshSql}},
265+
)
266+
err = ImportProxyPrivileges([]string{refreshSql}, address, item.BkCloudId)
239267
if err != nil {
240268
AddError(&errMsg, address, err)
241269
}

dbm-services/mysql/db-priv/service/clone_client_priv_base_func.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ func GetProxyPrivilege(address string, hosts []string, bkCloudId int64, specifie
9999
for _, user := range usersResult {
100100
tmpUser := user["user@ip"].(string)
101101
if re.MatchString(tmpUser) && !monitorReg.MatchString(tmpUser) {
102-
addUserSQL := fmt.Sprintf("refresh_users('%s','+')", tmpUser)
103-
grants = append(grants, addUserSQL)
102+
//addUserSQL := fmt.Sprintf("refresh_users('%s','+')", tmpUser)
103+
//grants = append(grants, addUserSQL)
104+
grants = append(grants, tmpUser)
104105
}
105106
}
106107
}

dbm-services/mysql/db-tools/dbactuator/pkg/components/truncate/via_ctl.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,21 +125,6 @@ func (c *ViaCtlComponent) CreateStageTables() error {
125125

126126
// Truncate truncate table 不需要任何处理
127127
func (c *ViaCtlComponent) Truncate() error {
128-
/*
129-
v1 版本中, 这一步操作是直接 drop db, 而且写法是 drop db if exists
130-
所以可以不用关心 remote 上这个 db 在不在
131-
v2 版本优化了 dropSourceDBs, 会先定向删除 db 中表, 能减少超时
132-
但是当 remote 的 db 不存在时 (实际确实没了), 会报错
133-
所以需要在这里单独把 tcadmin 设置为 0, 不转发 drop 命令
134-
只能在这个函数内设置, 因为这个函数只在最后的 中控清档 步骤执行
135-
如果在前面设置会影响其他步骤
136-
*/
137-
_, err := c.dbConn.ExecContext(context.Background(), `SET TC_ADMIN=0`)
138-
if err != nil {
139-
logger.Error("truncate on ctl set tc admin failed: ", err.Error())
140-
return err
141-
}
142-
143128
if c.Param.TruncateDataType == "drop_database" {
144129
err := c.dropSourceDBs()
145130
if err != nil {
@@ -192,11 +177,24 @@ func (c *ViaCtlComponent) dropSourceTables() error {
192177
// 得循环起来先一个一个把表 drop 了
193178
func (c *ViaCtlComponent) dropSourceDBs() error {
194179
for db := range c.dbTablesMap {
180+
// 中控连接设置了 tc_admin = 1
181+
// 当清档类型是 drop db 时, remote 的库已经删除了
182+
// 为了能清理 spider, tc_admin 必须保持为 1
183+
// 所以必须在中控把库临时恢复出来
184+
_, err := c.dbConn.ExecContext(
185+
context.Background(),
186+
fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", db),
187+
)
188+
if err != nil {
189+
logger.Error("drop source dbs on ctl recreate source db failed: ", err.Error())
190+
return err
191+
}
192+
195193
stageDBName := generateStageDBName(c.Param.StageDBHeader, c.Param.FlowTimeStr, db)
196194

197195
logger.Info(fmt.Sprintf("drop source dbs %v should drop it's tables", c.dbTablesMap[db]))
198196

199-
err := tpkg.SafeDropSourceTables(c.dbConn, db, stageDBName, c.dbTablesMap[db])
197+
err = tpkg.SafeDropSourceTables(c.dbConn, db, stageDBName, c.dbTablesMap[db])
200198
if err != nil {
201199
logger.Error("drop source tables %v failed: ", c.dbTablesMap[db], err.Error())
202200
return err

0 commit comments

Comments
 (0)