Skip to content

Commit 0529b41

Browse files
authored
fix: some change (#2430)
Signed-off-by: 张启航 <[email protected]>
1 parent 1982dd8 commit 0529b41

File tree

1 file changed

+176
-23
lines changed

1 file changed

+176
-23
lines changed

worker/appm/store/store.go

Lines changed: 176 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"strconv"
3333
"strings"
3434
"sync"
35+
"sync/atomic"
3536
"time"
3637

3738
"github.com/goodrain/rainbond/api/util/bcode"
@@ -155,6 +156,7 @@ type appRuntimeStore struct {
155156
volumeTypeListeners map[string]chan<- *model.TenantServiceVolumeType
156157
volumeTypeListenerLock sync.Mutex
157158
resourceCache *ResourceCache
159+
initLocks sync.Map // map[serviceID]*sync.Mutex for AppService initialization
158160
}
159161

160162
// NewStore new app runtime store
@@ -403,13 +405,45 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) {
403405
createrID := deployment.Labels["creater_id"]
404406
migrator := deployment.Labels["migrator"]
405407
appID := deployment.Labels["app_id"]
408+
409+
// 目标service_id用于调试
410+
targetServiceID := "f2aa2032719d4b82bc3d0cf6d44d4488"
411+
406412
if serviceID != "" && version != "" && createrID != "" {
413+
replicas := int32(0)
414+
if deployment.Spec.Replicas != nil {
415+
replicas = *deployment.Spec.Replicas
416+
}
417+
418+
if serviceID == targetServiceID {
419+
logrus.Errorf("[目标组件调试] ========== Deployment OnAdd 事件触发 ==========")
420+
logrus.Errorf("[目标组件调试] Deployment名称: %s", deployment.Name)
421+
logrus.Errorf("[目标组件调试] 命名空间: %s", deployment.Namespace)
422+
logrus.Errorf("[目标组件调试] version标签: %s", version)
423+
logrus.Errorf("[目标组件调试] ReadyReplicas: %d", deployment.Status.ReadyReplicas)
424+
logrus.Errorf("[目标组件调试] 期望副本数: %d", replicas)
425+
}
426+
427+
logrus.Infof("[Deployment新增] 检测到 %s 的 Deployment: %s, service_id=%s, version=%s, ReadyReplicas=%d, DesiredReplicas=%d",
428+
deployment.Namespace, deployment.Name, serviceID, version,
429+
deployment.Status.ReadyReplicas, replicas)
430+
407431
appservice, err := a.getAppService(serviceID, version, createrID, true)
408432
if err == conversion.ErrServiceNotFound {
409433
a.k8sClient.Clientset.AppsV1().Deployments(deployment.Namespace).Delete(context.Background(), deployment.Name, metav1.DeleteOptions{})
410434
}
411435
if appservice != nil {
412436
appservice.SetDeployment(deployment)
437+
438+
if serviceID == targetServiceID {
439+
logrus.Errorf("[目标组件调试] SetDeployment 执行成功")
440+
logrus.Errorf("[目标组件调试] AppService.DeployVersion: %s", appservice.DeployVersion)
441+
logrus.Errorf("[目标组件调试] AppService.Replicas: %d", appservice.Replicas)
442+
logrus.Errorf("[目标组件调试] 当前Pod数量: %d", len(appservice.GetPods(false)))
443+
}
444+
445+
logrus.Infof("[Deployment新增] %s Deployment %s: 成功设置到 AppService (serviceID=%s, AppService.Replicas=%d, CurrentPodCount=%d)",
446+
deployment.Namespace, deployment.Name, serviceID, appservice.Replicas, len(appservice.GetPods(false)))
413447
if migrator == "rainbond" {
414448
label := "service_id=" + serviceID
415449
pods, _ := a.k8sClient.Clientset.CoreV1().Pods(deployment.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: label})
@@ -758,29 +792,51 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) {
758792

759793
// getAppService returns the AppService cached under serviceID. When it is not
// cached and creator is true, the AppService is built with
// conversion.InitCacheAppService, stored in a.appServices and returned.
// When it is not cached and creator is false, the added lines return (nil, nil).
// The version parameter is not referenced by any line shown in this block.
760794
func (a *appRuntimeStore) getAppService(serviceID, version, createrID string, creator bool) (*v1.AppService, error) {
761-
var appservice *v1.AppService
762-
appservice = a.GetAppService(serviceID)
795+
key := v1.GetCacheKeyOnlyServiceID(serviceID)
763796

764-
// Debug note: check whether the AppService was obtained from memory.
765-
if appservice != nil {
766-
// Already present in memory.
767-
return appservice, nil
797+
// First check (no lock held, fast path).
798+
if app, ok := a.appServices.Load(key); ok {
799+
return app.(*v1.AppService), nil
768800
}
769801

770-
// Not present in memory; try to initialize it from the database.
771-
if appservice == nil && creator {
772-
logrus.Infof("[getAppService] 内存中未找到 AppService (serviceID=%s),尝试从数据库初始化", serviceID)
773-
var err error
774-
appservice, err = conversion.InitCacheAppService(a.dbmanager, serviceID, createrID)
775-
if err != nil {
776-
logrus.Warnf("[getAppService] 从数据库初始化 AppService 失败 (serviceID=%s, createrID=%s): %s",
777-
serviceID, createrID, err.Error())
778-
return nil, err
779-
}
780-
logrus.Infof("[getAppService] 成功从数据库初始化 AppService (serviceID=%s, TenantID=%s, ServiceAlias=%s)",
781-
serviceID, appservice.TenantID, appservice.ServiceAlias)
782-
a.RegistAppService(appservice)
802+
// If creation is not requested, return nil directly.
803+
if !creator {
804+
return nil, nil
783805
}
806+
807+
// Obtain the lock dedicated to this serviceID.
808+
// LoadOrStore ensures a single lock instance exists per serviceID.
// NOTE(review): nothing in this block removes entries from initLocks, so the
// map appears to grow with every serviceID seen — confirm whether cleanup
// happens elsewhere.
809+
lockInterface, _ := a.initLocks.LoadOrStore(serviceID, &sync.Mutex{})
810+
mu := lockInterface.(*sync.Mutex)
811+
812+
// Take the lock so initialization for the same serviceID runs serially.
813+
mu.Lock()
814+
defer mu.Unlock()
815+
816+
// Second check (lock held).
817+
// Another goroutine may have finished initialization while this one waited for the lock.
818+
if app, ok := a.appServices.Load(key); ok {
819+
logrus.Debugf("[getAppService] 并发场景:其他协程已完成初始化 AppService (serviceID=%s)", serviceID)
820+
return app.(*v1.AppService), nil
821+
}
822+
823+
// This goroutine now owns the initialization; perform the real initialization.
824+
logrus.Infof("[getAppService] 内存中未找到 AppService (serviceID=%s),开始从数据库初始化", serviceID)
825+
826+
appservice, err := conversion.InitCacheAppService(a.dbmanager, serviceID, createrID)
827+
if err != nil {
828+
logrus.Warnf("[getAppService] 从数据库初始化 AppService 失败 (serviceID=%s, createrID=%s): %s",
829+
serviceID, createrID, err.Error())
830+
return nil, err
831+
}
832+
833+
// Store in the cache.
// NOTE(review): the removed lines called a.RegistAppService here; the added
// lines store and count directly instead — confirm the two paths are meant
// to stay equivalent.
834+
a.appServices.Store(key, appservice)
835+
atomic.AddInt32(&a.appCount, 1)
836+
837+
logrus.Infof("[getAppService] 成功从数据库初始化 AppService (serviceID=%s, TenantID=%s, ServiceAlias=%s)",
838+
serviceID, appservice.TenantID, appservice.ServiceAlias)
839+
784840
return appservice, nil
785841
}
786842

@@ -795,6 +851,23 @@ func (a *appRuntimeStore) getOperatorManaged(appID string) *v1.OperatorManaged {
795851
}
796852

797853
func (a *appRuntimeStore) OnUpdate(oldObj, newObj interface{}) {
854+
// Log deployment updates to debug status sync issues
855+
if deployment, ok := newObj.(*appsv1.Deployment); ok {
856+
oldDeployment := oldObj.(*appsv1.Deployment)
857+
serviceID := deployment.Labels["service_id"]
858+
if serviceID != "" && oldDeployment.Status.ReadyReplicas != deployment.Status.ReadyReplicas {
859+
logrus.Infof("[Deployment更新] %s Deployment %s: ReadyReplicas变化 %d -> %d, DesiredReplicas=%d",
860+
deployment.Namespace, deployment.Name,
861+
oldDeployment.Status.ReadyReplicas, deployment.Status.ReadyReplicas,
862+
func() int32 {
863+
if deployment.Spec.Replicas != nil {
864+
return *deployment.Spec.Replicas
865+
}
866+
return 0
867+
}())
868+
}
869+
}
870+
798871
// ingress update maybe change owner component
799872
if ingress, ok := newObj.(*networkingv1.Ingress); ok {
800873
oldIngress := oldObj.(*networkingv1.Ingress)
@@ -1064,8 +1137,8 @@ func (a *appRuntimeStore) OnDeletes(objs ...interface{}) {
10641137
// RegistAppService registers an app model in the store under the cache key
// derived from app.ServiceID and bumps the app counter.
// NOTE(review): the counter is incremented on every call, including when the
// key already exists in a.appServices — confirm callers never re-register.
10651138
func (a *appRuntimeStore) RegistAppService(app *v1.AppService) {
10661139
a.appServices.Store(v1.GetCacheKeyOnlyServiceID(app.ServiceID), app)
1067-
a.appCount++
1068-
logrus.Debugf("current have %d app after add \n", a.appCount)
1140+
// Increment atomically and log the value returned by the increment itself.
newCount := atomic.AddInt32(&a.appCount, 1)
1141+
logrus.Debugf("current have %d app after add \n", newCount)
10691142
}
10701143

10711144
func (a *appRuntimeStore) RegistOperatorManaged(app *v1.OperatorManaged) {
@@ -1086,8 +1159,8 @@ func (a *appRuntimeStore) DeleteAppService(app *v1.AppService) {
10861159
// DeleteAppServiceByKey deletes the cached app service stored under key and
// decrements the app counter.
// NOTE(review): the counter is decremented even when key was not present in
// a.appServices — confirm callers only pass keys that exist.
10871160
func (a *appRuntimeStore) DeleteAppServiceByKey(key v1.CacheKey) {
10881161
a.appServices.Delete(key)
1089-
a.appCount--
1090-
logrus.Debugf("current have %d app after delete \n", a.appCount)
1162+
// Decrement atomically and log the value returned by the decrement itself.
newCount := atomic.AddInt32(&a.appCount, -1)
1163+
logrus.Debugf("current have %d app after delete \n", newCount)
10911164
}
10921165

10931166
func (a *appRuntimeStore) GetAppService(serviceID string) *v1.AppService {
@@ -1274,13 +1347,63 @@ func (a *appRuntimeStore) GetAppServiceStatus(serviceID string) string {
12741347
func (a *appRuntimeStore) GetAppServiceStatuses(serviceIDs []string) map[string]string {
12751348
statusMap := make(map[string]string, len(serviceIDs))
12761349
var queryComponentIDs []string
1350+
1351+
// 目标service_id用于调试
1352+
targetServiceID := "f2aa2032719d4b82bc3d0cf6d44d4488"
1353+
12771354
for _, serviceID := range serviceIDs {
12781355
app := a.GetAppService(serviceID)
12791356
if app == nil {
1357+
if serviceID == targetServiceID {
1358+
logrus.Errorf("[目标组件调试] ========== GetAppServiceStatuses 被调用 ==========")
1359+
logrus.Errorf("[目标组件调试] AppService 在内存中不存在!将检查数据库和K8s")
1360+
}
12801361
queryComponentIDs = append(queryComponentIDs, serviceID)
12811362
continue
12821363
}
1364+
1365+
// 详细日志
1366+
if serviceID == targetServiceID {
1367+
deployment := app.GetDeployment()
1368+
statefulset := app.GetStatefulSet()
1369+
pods := app.GetPods(false)
1370+
1371+
logrus.Errorf("[目标组件调试] ========== GetAppServiceStatuses 被调用 ==========")
1372+
logrus.Errorf("[目标组件调试] AppService 在内存中找到")
1373+
logrus.Errorf("[目标组件调试] ServiceAlias: %s", app.ServiceAlias)
1374+
logrus.Errorf("[目标组件调试] TenantID: %s", app.TenantID)
1375+
logrus.Errorf("[目标组件调试] DeployVersion: %s", app.DeployVersion)
1376+
logrus.Errorf("[目标组件调试] 期望副本数: %d", app.Replicas)
1377+
logrus.Errorf("[目标组件调试] Deployment存在: %v", deployment != nil)
1378+
if deployment != nil {
1379+
logrus.Errorf("[目标组件调试] Deployment.ReadyReplicas: %d", deployment.Status.ReadyReplicas)
1380+
logrus.Errorf("[目标组件调试] Deployment.Replicas: %d", *deployment.Spec.Replicas)
1381+
}
1382+
logrus.Errorf("[目标组件调试] StatefulSet存在: %v", statefulset != nil)
1383+
logrus.Errorf("[目标组件调试] Pod总数: %d", len(pods))
1384+
for i, pod := range pods {
1385+
podReady := false
1386+
if pod.Status.Phase == corev1.PodRunning {
1387+
for _, cond := range pod.Status.Conditions {
1388+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
1389+
podReady = true
1390+
break
1391+
}
1392+
}
1393+
}
1394+
logrus.Errorf("[目标组件调试] Pod[%d] 名称: %s", i, pod.Name)
1395+
logrus.Errorf("[目标组件调试] Pod[%d] version: %s", i, pod.Labels["version"])
1396+
logrus.Errorf("[目标组件调试] Pod[%d] 状态: %s", i, pod.Status.Phase)
1397+
logrus.Errorf("[目标组件调试] Pod[%d] Ready: %v", i, podReady)
1398+
}
1399+
}
1400+
12831401
status := app.GetServiceStatus()
1402+
1403+
if serviceID == targetServiceID {
1404+
logrus.Errorf("[目标组件调试] 最终返回状态: %s", status)
1405+
}
1406+
12841407
if status == v1.UNKNOW {
12851408
app := a.UpdateGetAppService(serviceID)
12861409
if app == nil {
@@ -1666,12 +1789,33 @@ func (a *appRuntimeStore) podEventHandler() cache.ResourceEventHandlerFuncs {
16661789
a.resourceCache.SetPodResource(pod)
16671790
tenantID, serviceID, version, createrID := k8sutil.ExtractLabels(pod.GetLabels())
16681791

1792+
// 目标service_id用于调试
1793+
targetServiceID := "f2aa2032719d4b82bc3d0cf6d44d4488"
1794+
16691795
// rbd-prd 命名空间的详细日志
16701796
if pod.Namespace == "rbd-prd" {
16711797
logrus.Infof("[Pod新增] 检测到 rbd-prd 的 Pod: %s, tenant_id=%s, service_id=%s, version=%s, creater_id=%s",
16721798
pod.Name, tenantID, serviceID, version, createrID)
16731799
}
16741800

1801+
if serviceID == targetServiceID {
1802+
podReady := false
1803+
if pod.Status.Phase == corev1.PodRunning {
1804+
for _, cond := range pod.Status.Conditions {
1805+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
1806+
podReady = true
1807+
break
1808+
}
1809+
}
1810+
}
1811+
logrus.Errorf("[目标组件调试] ========== Pod OnAdd 事件触发 ==========")
1812+
logrus.Errorf("[目标组件调试] Pod名称: %s", pod.Name)
1813+
logrus.Errorf("[目标组件调试] 命名空间: %s", pod.Namespace)
1814+
logrus.Errorf("[目标组件调试] version标签: %s", version)
1815+
logrus.Errorf("[目标组件调试] Pod状态: %s", pod.Status.Phase)
1816+
logrus.Errorf("[目标组件调试] Pod是否Ready: %v", podReady)
1817+
}
1818+
16751819
// 检查必需标签是否存在
16761820
if serviceID == "" || version == "" || createrID == "" {
16771821
if pod.Namespace == "rbd-prd" {
@@ -1684,6 +1828,15 @@ func (a *appRuntimeStore) podEventHandler() cache.ResourceEventHandlerFuncs {
16841828
if serviceID != "" && version != "" && createrID != "" {
16851829
appservice, err := a.getAppService(serviceID, version, createrID, true)
16861830

1831+
if serviceID == targetServiceID {
1832+
if appservice != nil {
1833+
logrus.Errorf("[目标组件调试] 找到AppService,准备添加Pod")
1834+
logrus.Errorf("[目标组件调试] AppService.DeployVersion: %s", appservice.DeployVersion)
1835+
} else {
1836+
logrus.Errorf("[目标组件调试] AppService为nil, 错误: %v", err)
1837+
}
1838+
}
1839+
16871840
if pod.Namespace == "rbd-prd" {
16881841
if err != nil {
16891842
logrus.Warnf("[Pod新增] rbd-prd Pod %s: 获取 AppService 失败 - %v (serviceID=%s, version=%s, createrID=%s)",

0 commit comments

Comments
 (0)