Skip to content

Commit 39e1d8c

Browse files
feat: implementing KubeBlocks integration with Rainbond (#2305)
* feat: implementing KubeBlocks integration with Rainbond Forward the request to Block Mechanica: - GET /v2/cluster/kubeblocks/supported-databases -> /v1/addons - GET /v2/cluster/kubeblocks/storage-classes -> /v1/storageclasses - GET /v2/cluster/kubeblocks/backup-repos -> /v1/backuprepos - POST /v2/cluster/kubeblocks/clusters -> /v1/clusters - GET /v2/cluster/kubeblocks/clusters/connect-infos -> /v1/clusters/connect-infos - GET /v2/cluster/kubeblocks/component/{service_id}/infos -> /v1/kubeblocks-component/{service-id} - GET /v2/cluster/kubeblocks/clusters/{service_id} -> /v1/clusters/{service-id} - PUT /v2/cluster/kubeblocks/clusters/{service_id} -> /v1/clusters/{service-id} - PUT /v2/cluster/kubeblocks/clusters/{service_id}/backup-schedules -> /v1/clusters/{service-id}/backup-schedules - POST /v2/cluster/kubeblocks/clusters/{service_id}/backups -> /v1/clusters/{service-id}/backups - GET /v2/cluster/kubeblocks/clusters/{service_id}/backups -> /v1/clusters/{service-id}/backups - DELETE /v2/cluster/kubeblocks/clusters -> /v1/clusters - DELETE /v2/cluster/kubeblocks/clusters/{service_id}/backups -> /v1/clusters/{service-id}/backups Signed-off-by: kurea <[email protected]> * feat: add new Rainbond Service Type "kubeblock_component" to integrate KubeBlocks with Rainbond Signed-off-by: kurea <[email protected]> * feat: support custom database configuration parameter Signed-off-by: kurea <[email protected]> * feat: add support restore cluster from backup API Signed-off-by: kurea <[email protected]> * feat: add selector support for RabbitMQ Signed-off-by: kurea <[email protected]> * refactor: remove deprecated API Signed-off-by: kurea <[email protected]> * feat: support outer service for kubeblcoks component Signed-off-by: kurea <[email protected]> * fix: generate correct selector for kubeblocks_component Signed-off-by: kurea <[email protected]> * refactor: change blockMechanicaBaseURL for new service name Signed-off-by: kurea <[email protected]> --------- Signed-off-by: kurea <[email protected]>
1 parent d752e66 commit 39e1d8c

File tree

25 files changed

+594
-2
lines changed

25 files changed

+594
-2
lines changed

api/api/api_interface.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,3 +363,25 @@ type GatewayLoadBalancerInterface interface {
363363
UpdateLoadBalancer(w http.ResponseWriter, r *http.Request)
364364
DeleteLoadBalancer(w http.ResponseWriter, r *http.Request)
365365
}
366+
367+
// KubeBlocksInterface KubeBlocks related interface
368+
type KubeBlocksInterface interface {
369+
GetSupportedDatabases(w http.ResponseWriter, r *http.Request)
370+
GetStorageClasses(w http.ResponseWriter, r *http.Request)
371+
GetBackupRepos(w http.ResponseWriter, r *http.Request)
372+
CreateCluster(w http.ResponseWriter, r *http.Request)
373+
GetClusterConnectInfos(w http.ResponseWriter, r *http.Request)
374+
GetClusterByID(w http.ResponseWriter, r *http.Request)
375+
ExpansionCluster(w http.ResponseWriter, r *http.Request)
376+
UpdateClusterBackupSchedules(w http.ResponseWriter, r *http.Request)
377+
CreateClusterBackup(w http.ResponseWriter, r *http.Request)
378+
GetClusterBackups(w http.ResponseWriter, r *http.Request)
379+
DeleteClusters(w http.ResponseWriter, r *http.Request)
380+
DeleteClusterBackup(w http.ResponseWriter, r *http.Request)
381+
ManageCluster(w http.ResponseWriter, r *http.Request)
382+
GetClusterPodDetail(w http.ResponseWriter, r *http.Request)
383+
GetClusterEvents(w http.ResponseWriter, r *http.Request)
384+
GetClusterParameters(w http.ResponseWriter, r *http.Request)
385+
ChangeClusterParameters(w http.ResponseWriter, r *http.Request)
386+
RestoreClusterFromBackup(w http.ResponseWriter, r *http.Request)
387+
}

api/api_routers/version2/v2Routers.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,24 @@ func (v2 *V2) clusterRouter() chi.Router {
414414
r.Put("/langVersion", controller.GetManager().UpdateLangVersion)
415415
r.Delete("/langVersion", controller.GetManager().DeleteLangVersion)
416416
r.Post("/over_score", controller.GetManager().SetOverScore)
417+
r.Get("/kubeblocks/supported-databases", controller.GetManager().GetSupportedDatabases)
418+
r.Get("/kubeblocks/storage-classes", controller.GetManager().GetStorageClasses)
419+
r.Get("/kubeblocks/backup-repos", controller.GetManager().GetBackupRepos)
420+
r.Post("/kubeblocks/clusters", controller.GetManager().CreateCluster)
421+
r.Get("/kubeblocks/clusters/connect-infos", controller.GetManager().GetClusterConnectInfos)
422+
r.Get("/kubeblocks/clusters/{service_id}", controller.GetManager().GetClusterByID)
423+
r.Put("/kubeblocks/clusters/{service_id}", controller.GetManager().ExpansionCluster)
424+
r.Put("/kubeblocks/clusters/{service_id}/backup-schedules", controller.GetManager().UpdateClusterBackupSchedules)
425+
r.Post("/kubeblocks/clusters/{service_id}/backups", controller.GetManager().CreateClusterBackup)
426+
r.Get("/kubeblocks/clusters/{service_id}/backups", controller.GetManager().GetClusterBackups)
427+
r.Delete("/kubeblocks/clusters", controller.GetManager().DeleteClusters)
428+
r.Delete("/kubeblocks/clusters/{service_id}/backups", controller.GetManager().DeleteClusterBackup)
429+
r.Post("/kubeblocks/clusters/actions", controller.GetManager().ManageCluster)
430+
r.Get("/kubeblocks/clusters/{service_id}/pods/{pod_name}/details", controller.GetManager().GetClusterPodDetail)
431+
r.Get("/kubeblocks/clusters/{service_id}/events", controller.GetManager().GetClusterEvents)
432+
r.Get("/kubeblocks/clusters/{service_id}/parameters", controller.GetManager().GetClusterParameters)
433+
r.Post("/kubeblocks/clusters/{service_id}/parameters", controller.GetManager().ChangeClusterParameters)
434+
r.Post("/kubeblocks/clusters/{service_id}/restores", controller.GetManager().RestoreClusterFromBackup)
417435
return r
418436
}
419437

api/controller/apigateway/api_gateway_route.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
v13 "github.com/cert-manager/cert-manager/pkg/apis/acme/v1"
1111
cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
1212
v12 "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
13+
kbutil "github.com/goodrain/rainbond/util/kubeblocks"
1314
"k8s.io/apimachinery/pkg/labels"
1415
"k8s.io/apimachinery/pkg/runtime"
1516
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -464,6 +465,17 @@ func (g Struct) CreateTCPRoute(w http.ResponseWriter, r *http.Request) {
464465
}()
465466
}
466467

468+
// kubeblocks_component should use specific selector
469+
rbdService, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
470+
if err != nil {
471+
logrus.Errorf("get service by id %s error: %v", serviceID, err)
472+
httputil.ReturnBcodeError(r, w, bcode.ErrRouteUpdate)
473+
return
474+
}
475+
if rbdService.ExtendMethod == "kubeblocks_component" {
476+
spec.Selector = kbutil.GenerateKubeBlocksSelector(rbdService.K8sComponentName)
477+
}
478+
467479
// Try to get the existing service first
468480
service, err := k.Services(tenant.Namespace).Get(r.Context(), name, v1.GetOptions{})
469481
if err != nil {

api/controller/kubeblocks.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
package controller
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"strings"
9+
10+
"github.com/go-chi/chi"
11+
httputil "github.com/goodrain/rainbond/util/http"
12+
"github.com/sirupsen/logrus"
13+
)
14+
15+
// KubeBlocksController -
16+
type KubeBlocksController struct{}
17+
18+
// blockMechanicaBaseURL base URL for block-mechanica service
19+
const blockMechanicaBaseURL = "http://kb-adapter-rbdplugin.rbd-system.svc:80"
20+
21+
// GetSupportedDatabases get KubeBlocks supported databases list
22+
func (c *KubeBlocksController) GetSupportedDatabases(w http.ResponseWriter, r *http.Request) {
23+
c.forwardRequest(w, r, "/v1/addons", "GET")
24+
}
25+
26+
// GetStorageClasses get KubeBlocks storage classes list
27+
func (c *KubeBlocksController) GetStorageClasses(w http.ResponseWriter, r *http.Request) {
28+
c.forwardRequest(w, r, "/v1/storageclasses", "GET")
29+
}
30+
31+
// GetBackupRepos get KubeBlocks backup repositories list
32+
func (c *KubeBlocksController) GetBackupRepos(w http.ResponseWriter, r *http.Request) {
33+
c.forwardRequest(w, r, "/v1/backuprepos", "GET")
34+
}
35+
36+
// CreateCluster create KubeBlocks database cluster
37+
func (c *KubeBlocksController) CreateCluster(w http.ResponseWriter, r *http.Request) {
38+
c.forwardRequest(w, r, "/v1/clusters", "POST")
39+
}
40+
41+
// GetClusterConnectInfos get KubeBlocks clusters connection infos
42+
func (c *KubeBlocksController) GetClusterConnectInfos(w http.ResponseWriter, r *http.Request) {
43+
c.forwardRequest(w, r, "/v1/clusters/connect-infos", "GET")
44+
}
45+
46+
// GetClusterByID get KubeBlocks cluster by service ID
47+
func (c *KubeBlocksController) GetClusterByID(w http.ResponseWriter, r *http.Request) {
48+
serviceID := chi.URLParam(r, "service_id")
49+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s", serviceID), "GET")
50+
}
51+
52+
// ExpansionCluster update KubeBlocks cluster
53+
func (c *KubeBlocksController) ExpansionCluster(w http.ResponseWriter, r *http.Request) {
54+
serviceID := chi.URLParam(r, "service_id")
55+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s", serviceID), "PUT")
56+
}
57+
58+
// UpdateClusterBackupSchedules update KubeBlocks cluster backup schedules
59+
func (c *KubeBlocksController) UpdateClusterBackupSchedules(w http.ResponseWriter, r *http.Request) {
60+
serviceID := chi.URLParam(r, "service_id")
61+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/backup-schedules", serviceID), "PUT")
62+
}
63+
64+
// CreateClusterBackup create KubeBlocks cluster backup
65+
func (c *KubeBlocksController) CreateClusterBackup(w http.ResponseWriter, r *http.Request) {
66+
serviceID := chi.URLParam(r, "service_id")
67+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/backups", serviceID), "POST")
68+
}
69+
70+
// GetClusterBackups get KubeBlocks cluster backups
71+
func (c *KubeBlocksController) GetClusterBackups(w http.ResponseWriter, r *http.Request) {
72+
serviceID := chi.URLParam(r, "service_id")
73+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/backups", serviceID), "GET")
74+
}
75+
76+
// DeleteClusters delete KubeBlocks clusters
77+
func (c *KubeBlocksController) DeleteClusters(w http.ResponseWriter, r *http.Request) {
78+
c.forwardRequest(w, r, "/v1/clusters", "DELETE")
79+
}
80+
81+
// DeleteClusterBackup delete KubeBlocks cluster backup
82+
func (c *KubeBlocksController) DeleteClusterBackup(w http.ResponseWriter, r *http.Request) {
83+
serviceID := chi.URLParam(r, "service_id")
84+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/backups", serviceID), "DELETE")
85+
}
86+
87+
// ManageCluster forwards to block-mechanica to manage cluster lifecycle
88+
func (c *KubeBlocksController) ManageCluster(w http.ResponseWriter, r *http.Request) {
89+
logrus.Infof("ManageCluster request: %v", r.Body)
90+
c.forwardRequest(w, r, "/v1/clusters/actions", "POST")
91+
}
92+
93+
// GetClusterPodDetail forwards to block-mechanica to get pod details managed by Cluster
94+
func (c *KubeBlocksController) GetClusterPodDetail(w http.ResponseWriter, r *http.Request) {
95+
serviceID := chi.URLParam(r, "service_id")
96+
podName := chi.URLParam(r, "pod_name")
97+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/pods/%s/details", serviceID, podName), "GET")
98+
}
99+
100+
// GetClusterEvents forwards to block-mechanica to get cluster's events
101+
func (c *KubeBlocksController) GetClusterEvents(w http.ResponseWriter, r *http.Request) {
102+
serviceID := chi.URLParam(r, "service_id")
103+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/events", serviceID), "GET")
104+
}
105+
106+
func (c *KubeBlocksController) GetClusterParameters(w http.ResponseWriter, r *http.Request) {
107+
serviceID := chi.URLParam(r, "service_id")
108+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/parameters", serviceID), "GET")
109+
}
110+
111+
func (c *KubeBlocksController) ChangeClusterParameters(w http.ResponseWriter, r *http.Request) {
112+
serviceID := chi.URLParam(r, "service_id")
113+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/parameters", serviceID), "POST")
114+
}
115+
116+
func (c *KubeBlocksController) RestoreClusterFromBackup(w http.ResponseWriter, r *http.Request) {
117+
serviceID := chi.URLParam(r, "service_id")
118+
c.forwardRequest(w, r, fmt.Sprintf("/v1/clusters/%s/restores", serviceID), "POST") // TODO
119+
}
120+
121+
// forwardRequest helper function to forward requests to Block Mechanica
122+
func (c *KubeBlocksController) forwardRequest(w http.ResponseWriter, r *http.Request, api, method string) {
123+
// 构建URL
124+
targetURL := blockMechanicaBaseURL + api
125+
if r.URL.RawQuery != "" {
126+
targetURL += "?" + r.URL.RawQuery
127+
}
128+
logrus.Debugf("request block-mechanica service: %s %s", method, targetURL)
129+
130+
var req *http.Request
131+
var err error
132+
133+
// 读取请求体
134+
var body io.Reader
135+
requestBody, readErr := io.ReadAll(r.Body)
136+
if readErr != nil {
137+
logrus.Errorf("read request body failed: %v", readErr)
138+
httputil.ReturnError(r, w, 400, "read request body failed: "+readErr.Error())
139+
return
140+
}
141+
if len(requestBody) > 0 {
142+
body = strings.NewReader(string(requestBody))
143+
}
144+
req, err = http.NewRequest(method, targetURL, body)
145+
146+
if err != nil {
147+
logrus.Errorf("create request to block-mechanica failed: %v", err)
148+
httputil.ReturnError(r, w, 500, "create request to block-mechanica failed: "+err.Error())
149+
return
150+
}
151+
152+
// Set headers
153+
for key, values := range r.Header {
154+
for _, value := range values {
155+
req.Header.Set(key, value)
156+
}
157+
}
158+
159+
if req.Header.Get("Content-Type") == "" && (method == "POST" || method == "PUT" || method == "DELETE") {
160+
req.Header.Set("Content-Type", "application/json")
161+
}
162+
163+
client := &http.Client{}
164+
resp, err := client.Do(req)
165+
if err != nil {
166+
logrus.Errorf("request block-mechanica service failed: %v", err)
167+
httputil.ReturnError(r, w, 500, "request block-mechanica service failed: "+err.Error())
168+
return
169+
}
170+
defer resp.Body.Close()
171+
172+
if resp.StatusCode != http.StatusOK {
173+
body, _ := io.ReadAll(resp.Body)
174+
logrus.Errorf("block-mechanica service returned error status code %d: %s", resp.StatusCode, string(body))
175+
httputil.ReturnError(r, w, resp.StatusCode, "block-mechanica service returned error: "+string(body))
176+
return
177+
}
178+
179+
var result map[string]interface{}
180+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
181+
logrus.Errorf("parse block-mechanica response failed: %v", err)
182+
httputil.ReturnError(r, w, 500, "parse block-mechanica response failed: "+err.Error())
183+
return
184+
}
185+
186+
if list, exists := result["list"]; exists {
187+
// 尝试提取分页信息,存在分页信息则 ReturnList
188+
if total, pageNum, ok := extractPaginationInfo(result); ok {
189+
httputil.ReturnList(r, w, total, pageNum, list)
190+
return
191+
}
192+
193+
httputil.ReturnSuccess(r, w, list)
194+
} else if field, exists := result["bean"]; exists {
195+
httputil.ReturnSuccess(r, w, field)
196+
} else {
197+
httputil.ReturnSuccess(r, w, result)
198+
}
199+
}
200+
201+
// extractPaginationInfo 从结果中提取分页信息
202+
func extractPaginationInfo(result map[string]interface{}) (total, pageNum int, ok bool) {
203+
number, hasNumber := result["number"]
204+
if !hasNumber {
205+
return 0, 0, false
206+
}
207+
208+
page, hasPage := result["page"]
209+
if !hasPage {
210+
return 0, 0, false
211+
}
212+
213+
total, totalOk := safeToInt(number)
214+
if !totalOk {
215+
return 0, 0, false
216+
}
217+
218+
pageNum, pageOk := safeToInt(page)
219+
if !pageOk {
220+
return 0, 0, false
221+
}
222+
223+
return total, pageNum, true
224+
}
225+
226+
// safeToInt 安全地将 interface{} 转换为 int,支持多种数字类型
227+
func safeToInt(value interface{}) (int, bool) {
228+
switch v := value.(type) {
229+
case int:
230+
return v, true
231+
case int32:
232+
return int(v), true
233+
case int64:
234+
return int(v), true
235+
case float32:
236+
return int(v), true
237+
case float64:
238+
return int(v), true
239+
case json.Number:
240+
if i, err := v.Int64(); err == nil {
241+
return int(i), true
242+
}
243+
}
244+
logrus.Warnf("Failed to convert %T(%v) to int", value, value)
245+
return 0, false
246+
}

api/controller/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type V2Manager interface {
5252
api.HelmInterface
5353
api.RegistryInterface
5454
api.GatewayInterface
55+
api.KubeBlocksInterface
5556
}
5657

5758
var defaultV2Manager V2Manager

api/controller/resources.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type V2Routes struct {
7070
HelmStruct
7171
Registry
7272
apigateway.Struct
73+
KubeBlocksController
7374
}
7475

7576
// Show test

api/handler/group/group_backup.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,9 @@ func (h *BackupHandle) snapshot(ids []string, sourceDir string, force bool) erro
232232
//TODO: support thirdpart service backup and restore
233233
continue
234234
}
235+
if service.IsKubeBlocksComponent() {
236+
continue
237+
}
235238
data := &RegionServiceSnapshot{
236239
ServiceID: id,
237240
}

api/handler/service.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,6 +1961,10 @@ func (s *ServiceAction) GetStatus(serviceID string) (*apimodel.StatusList, error
19611961
CurStatus: services.CurStatus,
19621962
StatusCN: TransStatus(services.CurStatus),
19631963
}
1964+
// kubeblocks_component 的 workload 由 KubeBlocks 管理,Rainbond 不读取其运行状态
1965+
if services.IsKubeBlocksComponent() {
1966+
return sl, nil
1967+
}
19641968
status := s.statusCli.GetStatus(serviceID)
19651969
if status != "" {
19661970
sl.CurStatus = status
@@ -2120,6 +2124,10 @@ type K8sPodInfo struct {
21202124

21212125
// GetPods get pods
21222126
func (s *ServiceAction) GetPods(serviceID string) (*K8sPodInfos, error) {
2127+
// kubeblocks_component 的 workload 由 KubeBlocks 管理,Rainbond 不查询其 Pods
2128+
if svc, _ := db.GetManager().TenantServiceDao().GetServiceByID(serviceID); svc != nil && svc.IsKubeBlocksComponent() {
2129+
return &K8sPodInfos{NewPods: []*K8sPodInfo{}, OldPods: []*K8sPodInfo{}}, nil
2130+
}
21232131
pods, err := s.statusCli.GetServicePods(serviceID)
21242132
if err != nil && !strings.Contains(err.Error(), server.ErrAppServiceNotFound.Error()) &&
21252133
!strings.Contains(err.Error(), server.ErrPodNotFound.Error()) {

0 commit comments

Comments
 (0)