Skip to content

Commit 3ce323b

Browse files
committed
fix(mysql): tendbcluster 清档时序问题 #8863
1 parent 69543a9 commit 3ce323b

File tree

4 files changed

+85
-29
lines changed

4 files changed

+85
-29
lines changed

dbm-services/mysql/db-priv/service/clone_client_priv.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,18 +224,45 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([
224224
"source ip", m.SourceIp)
225225
continue
226226
}
227-
proxyGrants, err := GetProxyPrivilege(address, matchHosts, item.BkCloudId, m.User)
227+
proxyUsers, err := GetProxyPrivilege(address, matchHosts, item.BkCloudId, m.User)
228228
if err != nil {
229229
slog.Error("msg", "GetProxyPrivilege", err)
230230
AddError(&errMsg, address, err)
231231
}
232-
if len(proxyGrants) == 0 {
232+
if len(proxyUsers) == 0 {
233233
slog.Info("no match user@host", "instance", address, "user", m.User)
234234
continue
235235
}
236-
proxyGrants = ReplaceHostInProxyGrants(proxyGrants, m.TargetIp)
237-
clusterGrant.Sqls = append(clusterGrant.Sqls, InstanceGrantSql{address, proxyGrants})
238-
err = ImportProxyPrivileges(proxyGrants, address, item.BkCloudId)
236+
proxyUsers = ReplaceHostInProxyGrants(proxyUsers, m.TargetIp)
237+
238+
var oneBuckUsers []string
239+
for _, u := range proxyUsers {
240+
oneBuckUsers = append(oneBuckUsers, u)
241+
if len(oneBuckUsers) >= 1000 {
242+
refreshSql := fmt.Sprintf(
243+
"refresh_users('%s', '+')",
244+
strings.Join(oneBuckUsers, ","),
245+
)
246+
clusterGrant.Sqls = append(clusterGrant.Sqls,
247+
InstanceGrantSql{address, []string{refreshSql}},
248+
)
249+
err = ImportProxyPrivileges(
250+
[]string{refreshSql},
251+
address, item.BkCloudId)
252+
if err != nil {
253+
AddError(&errMsg, address, err)
254+
}
255+
oneBuckUsers = []string{}
256+
}
257+
}
258+
refreshSql := fmt.Sprintf(
259+
"refresh_users('%s', '+')",
260+
strings.Join(oneBuckUsers, ","),
261+
)
262+
clusterGrant.Sqls = append(clusterGrant.Sqls,
263+
InstanceGrantSql{address, []string{refreshSql}},
264+
)
265+
err = ImportProxyPrivileges([]string{refreshSql}, address, item.BkCloudId)
239266
if err != nil {
240267
AddError(&errMsg, address, err)
241268
}

dbm-services/mysql/db-priv/service/clone_client_priv_base_func.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ func GetProxyPrivilege(address string, hosts []string, bkCloudId int64, specifie
8181
if len(hosts) == 0 {
8282
// 实例间克隆
8383
for _, user := range usersResult {
84-
addUserSQL := fmt.Sprintf("refresh_users('%s','+')", user["user@ip"].(string))
85-
grants = append(grants, addUserSQL)
84+
//addUserSQL := fmt.Sprintf("refresh_users('%s','+')", user["user@ip"].(string))
85+
grants = append(grants, user["user@ip"].(string))
8686
}
8787
} else {
8888
// 客户端克隆
@@ -99,8 +99,9 @@ func GetProxyPrivilege(address string, hosts []string, bkCloudId int64, specifie
9999
for _, user := range usersResult {
100100
tmpUser := user["user@ip"].(string)
101101
if re.MatchString(tmpUser) && !monitorReg.MatchString(tmpUser) {
102-
addUserSQL := fmt.Sprintf("refresh_users('%s','+')", tmpUser)
103-
grants = append(grants, addUserSQL)
102+
//addUserSQL := fmt.Sprintf("refresh_users('%s','+')", tmpUser)
103+
//grants = append(grants, addUserSQL)
104+
grants = append(grants, tmpUser)
104105
}
105106
}
106107
}

dbm-services/mysql/db-priv/service/clone_instance_priv.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package service
22

33
import (
4+
"errors"
45
"fmt"
56
"strings"
67
"time"
@@ -93,15 +94,44 @@ func (m *CloneInstancePrivPara) CloneInstancePriv(jsonPara string, ticket string
9394
if err != nil {
9495
return errno.ClonePrivilegesFail.Add(err.Error())
9596
}
96-
proxyGrants, err := GetProxyPrivilege(m.Source.Address, nil, *m.BkCloudId, "")
97+
proxyUsers, err := GetProxyPrivilege(m.Source.Address, nil, *m.BkCloudId, "")
9798
if err != nil {
9899
return err
99-
} else if len(proxyGrants) == 0 {
100+
} else if len(proxyUsers) == 0 {
100101
return errno.NoPrivilegesNothingToDo
101102
}
102-
err = ImportProxyPrivileges(proxyGrants, m.Target.Address, *m.BkCloudId)
103+
104+
var oneBuckUsers []string
105+
var errCollect error
106+
for _, u := range proxyUsers {
107+
oneBuckUsers = append(oneBuckUsers, u)
108+
if len(oneBuckUsers) >= 1000 {
109+
refreshSql := fmt.Sprintf(
110+
"refresh_users('%s', '+')",
111+
strings.Join(oneBuckUsers, ","),
112+
)
113+
114+
err = ImportProxyPrivileges(
115+
[]string{refreshSql},
116+
m.Target.Address, *m.BkCloudId)
117+
if err != nil {
118+
errCollect = errors.Join(errCollect, err)
119+
}
120+
oneBuckUsers = []string{}
121+
}
122+
}
123+
refreshSql := fmt.Sprintf(
124+
"refresh_users('%s', '+')",
125+
strings.Join(oneBuckUsers, ","),
126+
)
127+
128+
err = ImportProxyPrivileges([]string{refreshSql}, m.Target.Address, *m.BkCloudId)
103129
if err != nil {
104-
return err
130+
errCollect = errors.Join(errCollect, err)
131+
}
132+
133+
if errCollect != nil {
134+
return errCollect
105135
}
106136
}
107137
return nil

dbm-services/mysql/db-tools/dbactuator/pkg/components/truncate/via_ctl.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,21 +125,6 @@ func (c *ViaCtlComponent) CreateStageTables() error {
125125

126126
// Truncate truncate table 不需要任何处理
127127
func (c *ViaCtlComponent) Truncate() error {
128-
/*
129-
v1 版本中, 这一步操作是直接 drop db, 而且写法是 drop db if exists
130-
所以可以不用关心 remote 上这个 db 在不在
131-
v2 版本优化了 dropSourceDBs, 会先定向删除 db 中表, 能减少超时
132-
但是当 remote 的 db 不存在时 (实际确实没了), 会报错
133-
所以需要在这里单独把 tcadmin 设置为 0, 不转发 drop 命令
134-
只能在这个函数内设置, 因为这个函数只在最后的 中控清档 步骤执行
135-
如果在前面设置会影响其他步骤
136-
*/
137-
_, err := c.dbConn.ExecContext(context.Background(), `SET TC_ADMIN=0`)
138-
if err != nil {
139-
logger.Error("truncate on ctl set tc admin failed: ", err.Error())
140-
return err
141-
}
142-
143128
if c.Param.TruncateDataType == "drop_database" {
144129
err := c.dropSourceDBs()
145130
if err != nil {
@@ -192,11 +177,24 @@ func (c *ViaCtlComponent) dropSourceTables() error {
192177
// 得循环起来先一个一个把表 drop 了
193178
func (c *ViaCtlComponent) dropSourceDBs() error {
194179
for db := range c.dbTablesMap {
180+
// 中控连接设置了 tc_admin = 1
181+
// 当清档类型是 drop db 时, remote 的库已经删除了
182+
// 为了能清理 spider, tc_admin 必须保持为 1
183+
// 所以必须在中控把库临时恢复出来
184+
_, err := c.dbConn.ExecContext(
185+
context.Background(),
186+
fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", db),
187+
)
188+
if err != nil {
189+
logger.Error("drop source dbs on ctl recreate source db failed: ", err.Error())
190+
return err
191+
}
192+
195193
stageDBName := generateStageDBName(c.Param.StageDBHeader, c.Param.FlowTimeStr, db)
196194

197195
logger.Info(fmt.Sprintf("drop source dbs %v should drop it's tables", c.dbTablesMap[db]))
198196

199-
err := tpkg.SafeDropSourceTables(c.dbConn, db, stageDBName, c.dbTablesMap[db])
197+
err = tpkg.SafeDropSourceTables(c.dbConn, db, stageDBName, c.dbTablesMap[db])
200198
if err != nil {
201199
logger.Error("drop source tables %v failed: ", c.dbTablesMap[db], err.Error())
202200
return err

0 commit comments

Comments
 (0)