Skip to content

Commit 797234e

Browse files
authored
fix: component states are out of sync (#2431)
Signed-off-by: 张启航 <[email protected]>
1 parent 8e7b4be commit 797234e

File tree

1 file changed

+246
-18
lines changed

1 file changed

+246
-18
lines changed

worker/appm/store/store.go

Lines changed: 246 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"strconv"
3333
"strings"
3434
"sync"
35+
"sync/atomic"
3536
"time"
3637

3738
"github.com/goodrain/rainbond/api/util/bcode"
@@ -155,6 +156,7 @@ type appRuntimeStore struct {
155156
volumeTypeListeners map[string]chan<- *model.TenantServiceVolumeType
156157
volumeTypeListenerLock sync.Mutex
157158
resourceCache *ResourceCache
159+
initLocks sync.Map // map[serviceID]*sync.Mutex for AppService initialization
158160
}
159161

160162
// NewStore new app runtime store
@@ -403,13 +405,45 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) {
403405
createrID := deployment.Labels["creater_id"]
404406
migrator := deployment.Labels["migrator"]
405407
appID := deployment.Labels["app_id"]
408+
409+
// 目标service_id用于调试
410+
targetServiceID := "f2aa2032719d4b82bc3d0cf6d44d4488"
411+
406412
if serviceID != "" && version != "" && createrID != "" {
413+
replicas := int32(0)
414+
if deployment.Spec.Replicas != nil {
415+
replicas = *deployment.Spec.Replicas
416+
}
417+
418+
if serviceID == targetServiceID {
419+
logrus.Errorf("[目标组件调试] ========== Deployment OnAdd 事件触发 ==========")
420+
logrus.Errorf("[目标组件调试] Deployment名称: %s", deployment.Name)
421+
logrus.Errorf("[目标组件调试] 命名空间: %s", deployment.Namespace)
422+
logrus.Errorf("[目标组件调试] version标签: %s", version)
423+
logrus.Errorf("[目标组件调试] ReadyReplicas: %d", deployment.Status.ReadyReplicas)
424+
logrus.Errorf("[目标组件调试] 期望副本数: %d", replicas)
425+
}
426+
427+
logrus.Infof("[Deployment新增] 检测到 %s 的 Deployment: %s, service_id=%s, version=%s, ReadyReplicas=%d, DesiredReplicas=%d",
428+
deployment.Namespace, deployment.Name, serviceID, version,
429+
deployment.Status.ReadyReplicas, replicas)
430+
407431
appservice, err := a.getAppService(serviceID, version, createrID, true)
408432
if err == conversion.ErrServiceNotFound {
409433
a.k8sClient.Clientset.AppsV1().Deployments(deployment.Namespace).Delete(context.Background(), deployment.Name, metav1.DeleteOptions{})
410434
}
411435
if appservice != nil {
412436
appservice.SetDeployment(deployment)
437+
438+
if serviceID == targetServiceID {
439+
logrus.Errorf("[目标组件调试] SetDeployment 执行成功")
440+
logrus.Errorf("[目标组件调试] AppService.DeployVersion: %s", appservice.DeployVersion)
441+
logrus.Errorf("[目标组件调试] AppService.Replicas: %d", appservice.Replicas)
442+
logrus.Errorf("[目标组件调试] 当前Pod数量: %d", len(appservice.GetPods(false)))
443+
}
444+
445+
logrus.Infof("[Deployment新增] %s Deployment %s: 成功设置到 AppService (serviceID=%s, AppService.Replicas=%d, CurrentPodCount=%d)",
446+
deployment.Namespace, deployment.Name, serviceID, appservice.Replicas, len(appservice.GetPods(false)))
413447
if migrator == "rainbond" {
414448
label := "service_id=" + serviceID
415449
pods, _ := a.k8sClient.Clientset.CoreV1().Pods(deployment.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: label})
@@ -758,17 +792,51 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) {
758792

759793
// getAppService if creator is true, will create new app service where not found in store
760794
func (a *appRuntimeStore) getAppService(serviceID, version, createrID string, creator bool) (*v1.AppService, error) {
761-
var appservice *v1.AppService
762-
appservice = a.GetAppService(serviceID)
763-
if appservice == nil && creator {
764-
var err error
765-
appservice, err = conversion.InitCacheAppService(a.dbmanager, serviceID, createrID)
766-
if err != nil {
767-
logrus.Infof("init cache app service %s failure:%s ", serviceID, err.Error())
768-
return nil, err
769-
}
770-
a.RegistAppService(appservice)
795+
key := v1.GetCacheKeyOnlyServiceID(serviceID)
796+
797+
// 第一次检查(无锁,快速路径)
798+
if app, ok := a.appServices.Load(key); ok {
799+
return app.(*v1.AppService), nil
800+
}
801+
802+
// 如果不需要创建,直接返回nil
803+
if !creator {
804+
return nil, nil
805+
}
806+
807+
// 获取该 serviceID 的专属锁
808+
// 使用 LoadOrStore 确保每个 serviceID 只有一个锁实例
809+
lockInterface, _ := a.initLocks.LoadOrStore(serviceID, &sync.Mutex{})
810+
mu := lockInterface.(*sync.Mutex)
811+
812+
// 加锁,确保同一 serviceID 的初始化串行执行
813+
mu.Lock()
814+
defer mu.Unlock()
815+
816+
// 第二次检查(持有锁)
817+
// 可能在等待锁期间,其他协程已经完成了初始化
818+
if app, ok := a.appServices.Load(key); ok {
819+
logrus.Debugf("[getAppService] 并发场景:其他协程已完成初始化 AppService (serviceID=%s)", serviceID)
820+
return app.(*v1.AppService), nil
821+
}
822+
823+
// 当前协程获得初始化权,开始真正的初始化
824+
logrus.Infof("[getAppService] 内存中未找到 AppService (serviceID=%s),开始从数据库初始化", serviceID)
825+
826+
appservice, err := conversion.InitCacheAppService(a.dbmanager, serviceID, createrID)
827+
if err != nil {
828+
logrus.Warnf("[getAppService] 从数据库初始化 AppService 失败 (serviceID=%s, createrID=%s): %s",
829+
serviceID, createrID, err.Error())
830+
return nil, err
771831
}
832+
833+
// 存储到缓存
834+
a.appServices.Store(key, appservice)
835+
atomic.AddInt32(&a.appCount, 1)
836+
837+
logrus.Infof("[getAppService] 成功从数据库初始化 AppService (serviceID=%s, TenantID=%s, ServiceAlias=%s)",
838+
serviceID, appservice.TenantID, appservice.ServiceAlias)
839+
772840
return appservice, nil
773841
}
774842

@@ -783,6 +851,23 @@ func (a *appRuntimeStore) getOperatorManaged(appID string) *v1.OperatorManaged {
783851
}
784852

785853
func (a *appRuntimeStore) OnUpdate(oldObj, newObj interface{}) {
854+
// Log deployment updates to debug status sync issues
855+
if deployment, ok := newObj.(*appsv1.Deployment); ok {
856+
oldDeployment := oldObj.(*appsv1.Deployment)
857+
serviceID := deployment.Labels["service_id"]
858+
if serviceID != "" && oldDeployment.Status.ReadyReplicas != deployment.Status.ReadyReplicas {
859+
logrus.Infof("[Deployment更新] %s Deployment %s: ReadyReplicas变化 %d -> %d, DesiredReplicas=%d",
860+
deployment.Namespace, deployment.Name,
861+
oldDeployment.Status.ReadyReplicas, deployment.Status.ReadyReplicas,
862+
func() int32 {
863+
if deployment.Spec.Replicas != nil {
864+
return *deployment.Spec.Replicas
865+
}
866+
return 0
867+
}())
868+
}
869+
}
870+
786871
// ingress update maybe change owner component
787872
if ingress, ok := newObj.(*networkingv1.Ingress); ok {
788873
oldIngress := oldObj.(*networkingv1.Ingress)
@@ -1052,8 +1137,8 @@ func (a *appRuntimeStore) OnDeletes(objs ...interface{}) {
10521137
// RegistAppService regist a app model to store.
10531138
func (a *appRuntimeStore) RegistAppService(app *v1.AppService) {
10541139
a.appServices.Store(v1.GetCacheKeyOnlyServiceID(app.ServiceID), app)
1055-
a.appCount++
1056-
logrus.Debugf("current have %d app after add \n", a.appCount)
1140+
newCount := atomic.AddInt32(&a.appCount, 1)
1141+
logrus.Debugf("current have %d app after add \n", newCount)
10571142
}
10581143

10591144
func (a *appRuntimeStore) RegistOperatorManaged(app *v1.OperatorManaged) {
@@ -1074,8 +1159,8 @@ func (a *appRuntimeStore) DeleteAppService(app *v1.AppService) {
10741159
// DeleteAppServiceByKey delete cache app service
10751160
func (a *appRuntimeStore) DeleteAppServiceByKey(key v1.CacheKey) {
10761161
a.appServices.Delete(key)
1077-
a.appCount--
1078-
logrus.Debugf("current have %d app after delete \n", a.appCount)
1162+
newCount := atomic.AddInt32(&a.appCount, -1)
1163+
logrus.Debugf("current have %d app after delete \n", newCount)
10791164
}
10801165

10811166
func (a *appRuntimeStore) GetAppService(serviceID string) *v1.AppService {
@@ -1262,13 +1347,63 @@ func (a *appRuntimeStore) GetAppServiceStatus(serviceID string) string {
12621347
func (a *appRuntimeStore) GetAppServiceStatuses(serviceIDs []string) map[string]string {
12631348
statusMap := make(map[string]string, len(serviceIDs))
12641349
var queryComponentIDs []string
1350+
1351+
// 目标service_id用于调试
1352+
targetServiceID := "f2aa2032719d4b82bc3d0cf6d44d4488"
1353+
12651354
for _, serviceID := range serviceIDs {
12661355
app := a.GetAppService(serviceID)
12671356
if app == nil {
1357+
if serviceID == targetServiceID {
1358+
logrus.Errorf("[目标组件调试] ========== GetAppServiceStatuses 被调用 ==========")
1359+
logrus.Errorf("[目标组件调试] AppService 在内存中不存在!将检查数据库和K8s")
1360+
}
12681361
queryComponentIDs = append(queryComponentIDs, serviceID)
12691362
continue
12701363
}
1364+
1365+
// 详细日志
1366+
if serviceID == targetServiceID {
1367+
deployment := app.GetDeployment()
1368+
statefulset := app.GetStatefulSet()
1369+
pods := app.GetPods(false)
1370+
1371+
logrus.Errorf("[目标组件调试] ========== GetAppServiceStatuses 被调用 ==========")
1372+
logrus.Errorf("[目标组件调试] AppService 在内存中找到")
1373+
logrus.Errorf("[目标组件调试] ServiceAlias: %s", app.ServiceAlias)
1374+
logrus.Errorf("[目标组件调试] TenantID: %s", app.TenantID)
1375+
logrus.Errorf("[目标组件调试] DeployVersion: %s", app.DeployVersion)
1376+
logrus.Errorf("[目标组件调试] 期望副本数: %d", app.Replicas)
1377+
logrus.Errorf("[目标组件调试] Deployment存在: %v", deployment != nil)
1378+
if deployment != nil {
1379+
logrus.Errorf("[目标组件调试] Deployment.ReadyReplicas: %d", deployment.Status.ReadyReplicas)
1380+
logrus.Errorf("[目标组件调试] Deployment.Replicas: %d", *deployment.Spec.Replicas)
1381+
}
1382+
logrus.Errorf("[目标组件调试] StatefulSet存在: %v", statefulset != nil)
1383+
logrus.Errorf("[目标组件调试] Pod总数: %d", len(pods))
1384+
for i, pod := range pods {
1385+
podReady := false
1386+
if pod.Status.Phase == corev1.PodRunning {
1387+
for _, cond := range pod.Status.Conditions {
1388+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
1389+
podReady = true
1390+
break
1391+
}
1392+
}
1393+
}
1394+
logrus.Errorf("[目标组件调试] Pod[%d] 名称: %s", i, pod.Name)
1395+
logrus.Errorf("[目标组件调试] Pod[%d] version: %s", i, pod.Labels["version"])
1396+
logrus.Errorf("[目标组件调试] Pod[%d] 状态: %s", i, pod.Status.Phase)
1397+
logrus.Errorf("[目标组件调试] Pod[%d] Ready: %v", i, podReady)
1398+
}
1399+
}
1400+
12711401
status := app.GetServiceStatus()
1402+
1403+
if serviceID == targetServiceID {
1404+
logrus.Errorf("[目标组件调试] 最终返回状态: %s", status)
1405+
}
1406+
12721407
if status == v1.UNKNOW {
12731408
app := a.UpdateGetAppService(serviceID)
12741409
if app == nil {
@@ -1652,17 +1787,80 @@ func (a *appRuntimeStore) podEventHandler() cache.ResourceEventHandlerFuncs {
16521787
AddFunc: func(obj interface{}) {
16531788
pod := obj.(*corev1.Pod)
16541789
a.resourceCache.SetPodResource(pod)
1655-
_, serviceID, version, createrID := k8sutil.ExtractLabels(pod.GetLabels())
1790+
tenantID, serviceID, version, createrID := k8sutil.ExtractLabels(pod.GetLabels())
1791+
1792+
// 目标service_id用于调试
1793+
targetServiceID := "f2aa2032719d4b82bc3d0cf6d44d4488"
1794+
1795+
// rbd-prd 命名空间的详细日志
1796+
if pod.Namespace == "rbd-prd" {
1797+
logrus.Infof("[Pod新增] 检测到 rbd-prd 的 Pod: %s, tenant_id=%s, service_id=%s, version=%s, creater_id=%s",
1798+
pod.Name, tenantID, serviceID, version, createrID)
1799+
}
1800+
1801+
if serviceID == targetServiceID {
1802+
podReady := false
1803+
if pod.Status.Phase == corev1.PodRunning {
1804+
for _, cond := range pod.Status.Conditions {
1805+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
1806+
podReady = true
1807+
break
1808+
}
1809+
}
1810+
}
1811+
logrus.Errorf("[目标组件调试] ========== Pod OnAdd 事件触发 ==========")
1812+
logrus.Errorf("[目标组件调试] Pod名称: %s", pod.Name)
1813+
logrus.Errorf("[目标组件调试] 命名空间: %s", pod.Namespace)
1814+
logrus.Errorf("[目标组件调试] version标签: %s", version)
1815+
logrus.Errorf("[目标组件调试] Pod状态: %s", pod.Status.Phase)
1816+
logrus.Errorf("[目标组件调试] Pod是否Ready: %v", podReady)
1817+
}
1818+
1819+
// 检查必需标签是否存在
1820+
if serviceID == "" || version == "" || createrID == "" {
1821+
if pod.Namespace == "rbd-prd" {
1822+
logrus.Warnf("[Pod新增] rbd-prd Pod %s 缺少必需标签,将被忽略 - service_id=%q, version=%q, creater_id=%q, 所有标签: %v",
1823+
pod.Name, serviceID, version, createrID, pod.GetLabels())
1824+
}
1825+
return
1826+
}
1827+
16561828
if serviceID != "" && version != "" && createrID != "" {
16571829
appservice, err := a.getAppService(serviceID, version, createrID, true)
1830+
1831+
if serviceID == targetServiceID {
1832+
if appservice != nil {
1833+
logrus.Errorf("[目标组件调试] 找到AppService,准备添加Pod")
1834+
logrus.Errorf("[目标组件调试] AppService.DeployVersion: %s", appservice.DeployVersion)
1835+
} else {
1836+
logrus.Errorf("[目标组件调试] AppService为nil, 错误: %v", err)
1837+
}
1838+
}
1839+
1840+
if pod.Namespace == "rbd-prd" {
1841+
if err != nil {
1842+
logrus.Warnf("[Pod新增] rbd-prd Pod %s: 获取 AppService 失败 - %v (serviceID=%s, version=%s, createrID=%s)",
1843+
pod.Name, err, serviceID, version, createrID)
1844+
} else if appservice == nil {
1845+
logrus.Warnf("[Pod新增] rbd-prd Pod %s: AppService 为空,无法添加 (serviceID=%s, version=%s, createrID=%s)",
1846+
pod.Name, serviceID, version, createrID)
1847+
} else {
1848+
logrus.Infof("[Pod新增] rbd-prd Pod %s: 成功添加到 AppService (serviceID=%s, 当前Pod数量=%d)",
1849+
pod.Name, serviceID, len(appservice.GetPods(false))+1)
1850+
}
1851+
}
1852+
16581853
if err == conversion.ErrServiceNotFound {
1854+
if pod.Namespace == "rbd-prd" {
1855+
logrus.Warnf("[Pod新增] rbd-prd Pod %s: 数据库中未找到服务,将删除该 Pod (serviceID=%s)", pod.Name, serviceID)
1856+
}
16591857
a.k8sClient.Clientset.CoreV1().Pods(pod.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
16601858
}
16611859
if appservice != nil {
16621860
if vmName, t := pod.Labels["kubevirt.io/domain"]; t {
16631861
vm, err := a.k8sClient.KubevirtCli.VirtualMachine(pod.Namespace).Get(context.Background(), vmName, &metav1.GetOptions{})
16641862
if err != nil {
1665-
logrus.Errorf("get vm failure: %v", err)
1863+
logrus.Errorf("获取虚拟机失败: %v", err)
16661864
}
16671865
appservice.SetVirtualMachine(vm)
16681866
}
@@ -1688,17 +1886,47 @@ func (a *appRuntimeStore) podEventHandler() cache.ResourceEventHandlerFuncs {
16881886
UpdateFunc: func(old, cur interface{}) {
16891887
pod := cur.(*corev1.Pod)
16901888
a.resourceCache.SetPodResource(pod)
1691-
_, serviceID, version, createrID := k8sutil.ExtractLabels(pod.GetLabels())
1889+
tenantID, serviceID, version, createrID := k8sutil.ExtractLabels(pod.GetLabels())
1890+
1891+
// rbd-prd 命名空间的详细日志
1892+
if pod.Namespace == "rbd-prd" {
1893+
logrus.Infof("[Pod更新] 检测到 rbd-prd 的 Pod: %s, tenant_id=%s, service_id=%s, version=%s, creater_id=%s, Phase=%s",
1894+
pod.Name, tenantID, serviceID, version, createrID, pod.Status.Phase)
1895+
}
1896+
1897+
// 检查必需标签是否存在
1898+
if serviceID == "" || version == "" || createrID == "" {
1899+
if pod.Namespace == "rbd-prd" {
1900+
logrus.Warnf("[Pod更新] rbd-prd Pod %s 缺少必需标签,将被忽略 - service_id=%q, version=%q, creater_id=%q",
1901+
pod.Name, serviceID, version, createrID)
1902+
}
1903+
return
1904+
}
1905+
16921906
if serviceID != "" && version != "" && createrID != "" {
16931907
appservice, err := a.getAppService(serviceID, version, createrID, true)
1908+
1909+
if pod.Namespace == "rbd-prd" {
1910+
if err != nil {
1911+
logrus.Warnf("[Pod更新] rbd-prd Pod %s: 获取 AppService 失败 - %v", pod.Name, err)
1912+
} else if appservice == nil {
1913+
logrus.Warnf("[Pod更新] rbd-prd Pod %s: AppService 为空,无法更新", pod.Name)
1914+
} else {
1915+
logrus.Infof("[Pod更新] rbd-prd Pod %s: 成功更新到 AppService (serviceID=%s)", pod.Name, serviceID)
1916+
}
1917+
}
1918+
16941919
if err == conversion.ErrServiceNotFound {
1920+
if pod.Namespace == "rbd-prd" {
1921+
logrus.Warnf("[Pod更新] rbd-prd Pod %s: 数据库中未找到服务,将删除该 Pod", pod.Name)
1922+
}
16951923
a.k8sClient.Clientset.CoreV1().Pods(pod.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
16961924
}
16971925
if appservice != nil {
16981926
if vmName, t := pod.Labels["kubevirt.io/domain"]; t {
16991927
vm, err := a.k8sClient.KubevirtCli.VirtualMachine(pod.Namespace).Get(context.Background(), vmName, &metav1.GetOptions{})
17001928
if err != nil {
1701-
logrus.Errorf("get vm failure: %v", err)
1929+
logrus.Errorf("获取虚拟机失败: %v", err)
17021930
}
17031931
appservice.SetVirtualMachine(vm)
17041932
}

0 commit comments

Comments
 (0)