Skip to content

Commit 46d337d

Browse files
zvictorinoiSecloud
authored andcommitted
feat(kafka): kafka 4.x版本支持 close #14179
1 parent 71af08b commit 46d337d

File tree

11 files changed

+1086
-201
lines changed

11 files changed

+1086
-201
lines changed

dbm-services/bigdata/db-tools/dbactuator/pkg/components/kafka/install_kafka.go

Lines changed: 133 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -34,28 +34,32 @@ type InstallKafkaComp struct {
3434

3535
// InstallKafkaParams TODO
3636
type InstallKafkaParams struct {
37-
KafkaConfigs json.RawMessage `json:"kafka_configs" ` // server.properties
38-
Version string `json:"version" ` // 版本号eg
39-
Port int `json:"port" ` // 连接端口
40-
JmxPort int `json:"jmx_port" ` // 连接端口
41-
Retention int `json:"retention" ` // 保存时间
42-
Replication int `json:"replication" ` // 默认副本数
43-
Partition int `json:"partition" ` // 默认分区数
44-
Factor int `json:"factor" ` // __consumer_offsets副本数
45-
ZookeeperIP string `json:"zookeeper_ip" ` // zookeeper ip, eg: ip1,ip2,ip3
46-
ZookeeperConf string `json:"zookeeper_conf" ` // zookeeper ip, eg: ip1,ip2,ip3
47-
MyID int `json:"my_id" ` // 默认副本数
48-
JvmMem string `json:"jvm_mem"` // eg: 10g
49-
Host string `json:"host" `
50-
ClusterName string `json:"cluster_name" ` // 集群名
51-
Username string `json:"username" `
52-
Password string `json:"password" `
53-
BkBizID int `json:"bk_biz_id"`
54-
DbType string `json:"db_type"`
55-
ServiceType string `json:"service_type"`
56-
NoSecurity int `json:"no_security"` // 兼容现有,0:有鉴权,1:无鉴权
57-
RetentionBytes int `json:"retention_bytes"` // log.retention.bytes 默认-1
58-
Rack string `json:"rack"` // 所属机架
37+
KafkaConfigs json.RawMessage `json:"kafka_configs" ` // server.properties
38+
Version string `json:"version" ` // 版本号eg
39+
Port int `json:"port" ` // 连接端口
40+
JmxPort int `json:"jmx_port" ` // 连接端口
41+
Retention int `json:"retention" ` // 保存时间
42+
Replication int `json:"replication" ` // 默认副本数
43+
Partition int `json:"partition" ` // 默认分区数
44+
Factor int `json:"factor" ` // __consumer_offsets副本数
45+
ZookeeperIP string `json:"zookeeper_ip" ` // zookeeper ip, eg: ip1,ip2,ip3
46+
ZookeeperConf string `json:"zookeeper_conf" ` // zookeeper ip, eg: ip1,ip2,ip3
47+
MyID int `json:"my_id" ` // 默认副本数
48+
JvmMem string `json:"jvm_mem"` // eg: 10g
49+
Host string `json:"host" `
50+
ClusterName string `json:"cluster_name" ` // 集群名
51+
Username string `json:"username" `
52+
Password string `json:"password" `
53+
BkBizID int `json:"bk_biz_id"`
54+
DbType string `json:"db_type"`
55+
ServiceType string `json:"service_type"`
56+
NoSecurity int `json:"no_security"` // 兼容现有,0:有鉴权,1:无鉴权
57+
RetentionBytes int `json:"retention_bytes"` // log.retention.bytes 默认-1
58+
Rack string `json:"rack"` // 所属机架
59+
NodeID int `json:"node_id"` // 节点ID
60+
Role string `json:"role"` // 节点角色
61+
ControllerVoters string `json:"controller_voters"` // 控制器选举节点列表
62+
ControllerServers string `json:"controller_servers"` // 控制器节点列表
5963
}
6064

6165
// InitDirs TODO
@@ -72,17 +76,6 @@ type KfConfig struct {
7276
ZookeeperDir string
7377
}
7478

75-
// RenderConfig 需要替换的配置值 Todo
76-
type RenderConfig struct {
77-
ClusterName string
78-
NodeName string
79-
HTTPPort int
80-
CharacterSetServer string
81-
InnodbBufferPoolSize string
82-
Logdir string
83-
ServerID uint64
84-
}
85-
8679
// CmakConfig Kafka manager config
8780
type CmakConfig struct {
8881
ZookeeperIP string
@@ -518,29 +511,36 @@ func (i *InstallKafkaComp) InitKafkaUser() (err error) {
518511
*/
519512
func (i *InstallKafkaComp) InstallBroker() error {
520513
var (
521-
retentionHours = i.Params.Retention
522-
replicationNum = i.Params.Replication
523-
partitionNum = i.Params.Partition
524-
factor = i.Params.Factor
525-
nodeIP = i.Params.Host
526-
port = i.Params.Port
527-
jmxPort = i.Params.JmxPort
528-
listeners = fmt.Sprintf("%s:%d", nodeIP, port)
529-
version = i.Params.Version
530-
processors = runtime.NumCPU()
531-
zookeeperIP = i.Params.ZookeeperIP
532-
kafkaBaseDir = fmt.Sprintf("%s/kafka-%s", cst.DefaultKafkaEnv, version)
533-
username = i.Params.Username
534-
password = i.Params.Password
535-
noSecurity = i.Params.NoSecurity
536-
brokerConfig = i.Params.KafkaConfigs
537-
retentionBytes = i.Params.RetentionBytes
538-
rack = i.Params.Rack
514+
retentionHours = i.Params.Retention
515+
replicationNum = i.Params.Replication
516+
partitionNum = i.Params.Partition
517+
factor = i.Params.Factor
518+
nodeIP = i.Params.Host
519+
port = i.Params.Port
520+
jmxPort = i.Params.JmxPort
521+
version = i.Params.Version
522+
processors = runtime.NumCPU()
523+
zookeeperIP = i.Params.ZookeeperIP
524+
kafkaBaseDir = fmt.Sprintf("%s/kafka-%s", cst.DefaultKafkaEnv, version)
525+
username = i.Params.Username
526+
password = i.Params.Password
527+
noSecurity = i.Params.NoSecurity
528+
brokerConfig = i.Params.KafkaConfigs
529+
retentionBytes = i.Params.RetentionBytes
530+
rack = i.Params.Rack
531+
nodeID = i.Params.NodeID
532+
processRoles = i.Params.Role
533+
controllerServers = i.Params.ControllerServers
539534
)
540535

541536
if retentionBytes == 0 {
542537
retentionBytes = -1
543538
}
539+
// controller指定端口2181
540+
if processRoles == cst.KafkaRoleController {
541+
port = cst.KafkaControllerPort
542+
}
543+
listeners := fmt.Sprintf("%s:%d", nodeIP, port)
544544

545545
// ln -s /data/kafkaenv/kafka-$version /data/kafkaenv/kafka
546546
kafkaLink := fmt.Sprintf("%s/kafka", cst.DefaultKafkaEnv)
@@ -598,21 +598,35 @@ func (i *InstallKafkaComp) InstallBroker() error {
598598
}
599599
} else {
600600
templateData := kafkautil.TemplateData{
601-
NumNetWorkThreads: processors,
602-
LogRetentionHours: retentionHours,
603-
DefaultReplicationFactor: replicationNum,
604-
NumPartitions: partitionNum,
605-
NumIOThreads: processors * 2,
606-
NumReplicaFetchers: processors,
607-
LogDirs: kafkaDataDir,
608-
Listeners: listeners,
609-
ZookeeperConnect: zookeeperConnect,
610-
LogRetentionBytes: retentionBytes,
611-
BrokerRack: rack,
601+
NumNetWorkThreads: processors,
602+
LogRetentionHours: retentionHours,
603+
DefaultReplicationFactor: replicationNum,
604+
NumPartitions: partitionNum,
605+
NumIOThreads: processors * 2,
606+
NumReplicaFetchers: processors,
607+
LogDirs: kafkaDataDir,
608+
Listeners: listeners,
609+
ZookeeperConnect: zookeeperConnect,
610+
LogRetentionBytes: retentionBytes,
611+
BrokerRack: rack,
612+
ControllerQuorumBootstrapServers: controllerServers,
613+
NodeId: nodeID,
614+
ProcessRoles: processRoles,
615+
UpperProcessRoles: strings.ToUpper(processRoles),
612616
}
613617
if err := kafkautil.CreateServerPropertiesFile(brokerConfig, templateData, cst.KafkaTmpConfig); err != nil {
614618
return err
615619
}
620+
if processRoles == cst.KafkaRoleController {
621+
logger.Info("controller mode, 去掉sasl配置")
622+
extraCmd = fmt.Sprintf(`sed -i -e "/sasl.enabled.mechanisms=/d" \
623+
-e "/sasl.mechanism.inter.broker.protocol=/d" \
624+
-e "/inter.broker.listener.name=/d" %s`, cst.KafkaTmpConfig)
625+
if _, err := osutil.ExecShellCommandJ(false, extraCmd); err != nil {
626+
logger.Error("[%s] execute failed, %v", extraCmd, err)
627+
return err
628+
}
629+
}
616630
// copy to server.properties
617631
extraCmd = fmt.Sprintf("cp %s %s", cst.KafkaTmpConfig, cst.KafkaConfigFile)
618632
logger.Info("Exec cmd [%s]", extraCmd)
@@ -635,11 +649,17 @@ func (i *InstallKafkaComp) InstallBroker() error {
635649
}
636650
}
637651

652+
if kafkautil.CompareVersion(version, cst.Kafka400) >= 0 {
653+
if err := i.FormatDir(); err != nil {
654+
return err
655+
}
656+
}
657+
638658
if err := configKafka(username, password, kafkaLink, jmxPort); err != nil {
639659
return err
640660
}
641661

642-
if err := startKafka(i.KafkaEnvDir, noSecurity); err != nil {
662+
if err := startKafka(i.KafkaEnvDir, noSecurity, processRoles); err != nil {
643663
return err
644664
}
645665

@@ -746,7 +766,7 @@ func configJVM(kafkaLink string) (err error) {
746766
return nil
747767
}
748768

749-
func startKafka(kafkaEnvDir string, noSecurity int) (err error) {
769+
func startKafka(kafkaEnvDir string, noSecurity int, role string) (err error) {
750770
logger.Info("生成kafka.ini文件")
751771
kafkaini := esutil.GenKafkaini()
752772
kafkainiFile := fmt.Sprintf("%s/kafka.ini", cst.DefaultKafkaSupervisorConf)
@@ -767,7 +787,7 @@ func startKafka(kafkaEnvDir string, noSecurity int) (err error) {
767787
}
768788

769789
// Security related
770-
if noSecurity == 1 {
790+
if noSecurity == 1 || role == cst.KafkaRoleController {
771791
extraCmd = fmt.Sprintf("sed -i 's/kafka-server-scram-start.sh/kafka-server-start.sh/' %s/kafka.ini",
772792
cst.DefaultKafkaSupervisorConf)
773793
if _, err = osutil.ExecShellCommand(false, extraCmd); err != nil {
@@ -1188,3 +1208,51 @@ func getenvOr(key, def string) string {
11881208
}
11891209
return def
11901210
}
1211+
1212+
// FormatDir TODO
1213+
func (i *InstallKafkaComp) FormatDir() error {
1214+
clusterName := i.Params.ClusterName
1215+
username := i.Params.Username
1216+
password := i.Params.Password
1217+
role := i.Params.Role
1218+
controllerQuorumVoters := i.Params.ControllerVoters
1219+
1220+
// bin/kafka-storage.sh format -t cluster -c config/server.properties --no-initial-controllers \
1221+
// --add-scram "SCRAM-SHA-512=[name=***,password=***]"
1222+
extraCmd := fmt.Sprintf(
1223+
`%s format -t %s -c %s --no-initial-controllers --add-scram "SCRAM-SHA-512=[name=%s,password=%s]"`,
1224+
cst.KafkaStorageBin,
1225+
clusterName, cst.KafkaConfigFile, username, password)
1226+
1227+
if role == cst.KafkaRoleController {
1228+
// bin/kafka-storage.sh format -t cluster -c config/server.properties --initial-controllers "" \
1229+
// --add-scram "SCRAM-SHA-512=[name=***,password=***]"
1230+
extraCmd = fmt.Sprintf(
1231+
`%s format -t %s -c %s --initial-controllers "%s" --add-scram "SCRAM-SHA-512=[name=%s,password=%s]"`,
1232+
cst.KafkaStorageBin,
1233+
clusterName,
1234+
cst.KafkaConfigFile,
1235+
controllerQuorumVoters,
1236+
username,
1237+
password)
1238+
}
1239+
1240+
logger.Info("Format dir: [%s]", extraCmd)
1241+
if output, err := osutil.ExecShellCommandJ(false, extraCmd); err != nil {
1242+
logger.Error("Format dir failed, %s, %s", output, err.Error())
1243+
return err
1244+
}
1245+
dataDir, err := kafkautil.ReadDataDirs(cst.KafkaConfigFile)
1246+
if err != nil {
1247+
logger.Error("Read data dir from config failed, %s", err.Error())
1248+
return err
1249+
}
1250+
for _, dir := range dataDir {
1251+
extraCmd = fmt.Sprintf("chown -R mysql %s", dir)
1252+
if _, err := osutil.ExecShellCommandJ(false, extraCmd); err != nil {
1253+
logger.Error("%s execute failed, %v", extraCmd, err)
1254+
return err
1255+
}
1256+
}
1257+
return nil
1258+
}

0 commit comments

Comments
 (0)