Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbm-services/common/reverse-api/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cmd
cmd/*
23 changes: 23 additions & 0 deletions dbm-services/common/reverse-api/apis/common/list_nginx_addrs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package common

import (
"dbm-services/common/reverse-api/config"
"dbm-services/common/reverse-api/internal"
"encoding/json"

"github.com/pkg/errors"
)

func ListNginxAddrs(bkCloudId int) ([]string, error) {
data, err := internal.ReverseCall(config.ReverseApiCommonListNginxAddrs, bkCloudId)
if err != nil {
return nil, errors.Wrap(err, "failed to call ListNginxAddrs")
}

var addrs []string
if err := json.Unmarshal(data, &addrs); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal ListNginxAddrs")
}

return addrs, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package mysql

import (
"dbm-services/common/reverse-api/config"
"dbm-services/common/reverse-api/internal"
"encoding/json"

"github.com/pkg/errors"
)

const (
AccessLayerStorage string = "storage"
AccessLayerProxy string = "proxy"
)

type instanceAddr struct {
Ip string `json:"ip"`
Port int `json:"port"`
}

type commonInstanceInfo struct {
instanceAddr
ImmuteDomain string `json:"immute_domain"`
Phase string `json:"phase"`
Status string `json:"status"`
AccessLayer string `json:"access_layer"`
MachineType string `json:"machine_type"`
}

type StorageInstanceInfo struct {
commonInstanceInfo
IsStandBy bool `json:"is_stand_by"`
InstanceRole string `json:"instance_role"`
InstanceInnerRole string `json:"instance_inner_role"`
Receivers []instanceAddr `json:"receivers"`
Ejectors []instanceAddr `json:"ejectors"`
}

type ProxyInstanceInfo struct {
commonInstanceInfo
StorageInstanceList []instanceAddr `json:"storage_instance_list"`
}

func ListInstanceInfo(bkCloudId int, ports ...int) ([]byte, string, error) {
data, err := internal.ReverseCall(config.ReverseApiMySQLListInstanceInfo, bkCloudId, ports...)
if err != nil {
return nil, "", errors.Wrap(err, "failed to call ListInstanceInfo")
}
var r []commonInstanceInfo
err = json.Unmarshal(data, &r)
if err != nil {
return nil, "", errors.Wrap(err, "failed to unmarshal ListInstanceInfo")
}

return data, r[0].AccessLayer, nil
}
6 changes: 6 additions & 0 deletions dbm-services/common/reverse-api/config/apis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package config

const (
ReverseApiCommonListNginxAddrs = "common/list_nginx_addrs"
ReverseApiMySQLListInstanceInfo = "mysql/list_instance_info"
)
11 changes: 11 additions & 0 deletions dbm-services/common/reverse-api/config/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package config

const CommonConfigDir = "/home/mysql/common_config"
const NginxProxyAddrsFileName = "nginx_proxy.list"
const ReverseApiBase = "apis/proxypass/reverse_api"

type ReverseApiName string

func (c ReverseApiName) String() string {
return string(c)
}
3 changes: 3 additions & 0 deletions dbm-services/common/reverse-api/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module dbm-services/common/reverse-api

go 1.21.11
1 change: 1 addition & 0 deletions dbm-services/common/reverse-api/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package reverse_api
11 changes: 11 additions & 0 deletions dbm-services/common/reverse-api/internal/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package internal

import "encoding/json"

type apiResponse struct {
Result bool `json:"result"`
Code int `json:"code"`
Message string `json:"message"`
Errors string `json:"errors"`
Data json.RawMessage `json:"data"`
}
103 changes: 103 additions & 0 deletions dbm-services/common/reverse-api/internal/reverse_call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package internal

import (
"bufio"
"dbm-services/common/reverse-api/config"
"encoding/json"
errs "errors"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"

"github.com/pkg/errors"
)

func ReverseCall(api config.ReverseApiName, bkCloudId int, ports ...int) (data []byte, err error) {
addrs, err := readNginxProxyAddrs()
if err != nil {
return nil, errors.Wrap(err, "failed to read nginx proxy addresses")
}

var errCollect []error
for _, addr := range addrs {
apiPath, _ := url.JoinPath(config.ReverseApiBase, api.String(), "/")
ep := url.URL{
Scheme: "http",
Host: addr,
Path: apiPath,
}

req, err := http.NewRequest(http.MethodGet, ep.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create request")
}

q := req.URL.Query()
q.Add("bk_cloud_id", strconv.Itoa(bkCloudId))
for _, port := range ports {
q.Add("port", strconv.Itoa(port))
}
req.URL.RawQuery = q.Encode()

data, err = do(req)
if err == nil {
return data, nil
}
errCollect = append(errCollect, err)
}

return nil, errs.Join(errCollect...)
}

func do(request *http.Request) (data []byte, err error) {
resp, err := http.DefaultClient.Do(request)
if err != nil {
return nil, errors.Wrap(err, "failed to send request")
}
defer func() {
_ = resp.Body.Close()
}()

b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "failed to read response body")
}

if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(b))
}

var r apiResponse
err = json.Unmarshal(b, &r)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal response body")
}

if !r.Result {
return nil, errors.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, r.Errors)
}

return r.Data, nil
}

func readNginxProxyAddrs() (addrs []string, err error) {
f, err := os.Open(filepath.Join(config.CommonConfigDir, config.NginxProxyAddrsFileName))
if err != nil {
return nil, errors.Wrap(err, "failed to open nginx proxy addrs")
}
defer func() {
_ = f.Close()
}()

scanner := bufio.NewScanner(f)
for scanner.Scan() {
addrs = append(addrs, scanner.Text())
}
if err := scanner.Err(); err != nil {
return nil, errors.Wrap(err, "failed to read nginx proxy addrs")
}
return addrs, nil
}
1 change: 1 addition & 0 deletions dbm-services/go.work
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use (
mongodb/db-tools/mongo-toolkit-go
mongodb/db-tools/dbactuator
common/db-dns/dns-api/pkg
common/reverse-api
)

replace github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql v1.7.1
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"encoding/json"
"log/slog"

"github.com/pkg/errors"
)
Expand All @@ -15,6 +16,7 @@ type JobDefine struct {
Creator string `json:"creator"`
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
Overlap bool `json:"overlap"`
}

// CreateOrReplace TODO
Expand All @@ -26,6 +28,8 @@ func (m *Manager) CreateOrReplace(job JobDefine, permanent bool) (int, error) {
Job: job,
Permanent: permanent,
}
slog.Info("CreateOrReplace", slog.Any("job", job))

resp, err := m.do("/create_or_replace", "POST", body)
if err != nil {
return 0, errors.Wrap(err, "manager call /create_or_replace")
Expand Down
45 changes: 30 additions & 15 deletions dbm-services/mysql/db-tools/mysql-crond/pkg/config/job_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ExternalJob struct {
Schedule string `yaml:"schedule" json:"schedule" binding:"required" validate:"required"`
Creator string `yaml:"creator" json:"creator" binding:"required" validate:"required"`
WorkDir string `yaml:"work_dir" json:"work_dir"`
Overlap bool `yaml:"overlap" json:"overlap"` // 是否允许作业重叠执行, 默认 false
// JobID 这个 id 主要用于追溯哪个 cron job (如果有) 调起本 external job
JobID cron.EntryID `yaml:"-" json:"-"`
ch chan struct{}
Expand Down Expand Up @@ -87,23 +88,35 @@ func (j *ExternalJob) run() {

// Run TODO
func (j *ExternalJob) Run() {
select {
case v := <-j.ch:
slog.Info(
"run job",
slog.String("name", j.Name),
slog.Bool("overlap", j.Overlap),
)

if j.Overlap {
j.run()
j.ch <- v
default:
slog.Warn("skip job", slog.String("name", j.Name))
err := SendEvent(
mysqlCrondEventName,
fmt.Sprintf("%s skipt for last round use too much time", j.Name),
map[string]interface{}{
"job_name": j.Name,
},
)
if err != nil {
slog.Error("send event", slog.String("error", err.Error()))
} else {
select {
case v := <-j.ch:
j.run()
j.ch <- v
default:
slog.Warn("skip job", slog.String("name", j.Name))
err := SendEvent(
mysqlCrondEventName,
fmt.Sprintf("%s skipt for last round use too much time", j.Name),
map[string]interface{}{
"job_name": j.Name,
},
)
if err != nil {
slog.Error("send event", slog.String("error", err.Error()))
}
}
}

slog.Info("finish job", slog.String("name", j.Name))
}

// SetupChannel TODO
Expand Down Expand Up @@ -165,7 +178,9 @@ func InitJobsConfig() error {
panic(err)
}

j.SetupChannel()
if !j.Overlap {
j.SetupChannel()
}
}
return nil
}
6 changes: 6 additions & 0 deletions dbm-services/mysql/db-tools/mysql-crond/pkg/crond/crond.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package crond

import (
"dbm-services/mysql/db-tools/mysql-crond/pkg/third_party"
"log/slog"
"sync"

Expand Down Expand Up @@ -91,6 +92,11 @@ func Start() error {
slog.Info("add heart beat job", slog.Int("entry id", int(entryID)))
}

// 第三方
for _, rg := range third_party.ThirdPartyRegisters {
rg(cronJob)
}

cronJob.Start()
slog.Info("crond start")
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,9 @@ func CreateOrReplace(j *config.ExternalJob, permanent bool) (int, error) {
)
return 0, err
}
slog.Info(
"create or replace job",
slog.Any("job", j),
)
return entryID, nil
}
Loading
Loading