From f7f6cbb72c583a1d3e564d9600417bb75cc15544 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Tue, 4 Nov 2025 13:47:03 -0500 Subject: [PATCH 1/7] Added BlueGreen ingress that switches between active Svc + resolve path conflict on Blue and Green deployment ingresses --- .../spec/FlinkBlueGreenDeploymentSpec.java | 2 + .../bluegreen/BlueGreenDeploymentService.java | 54 +++++++++ .../operator/utils/IngressUtils.java | 104 ++++++++++++++---- .../utils/bluegreen/BlueGreenUtils.java | 5 + ...linkBlueGreenDeploymentControllerTest.java | 2 +- ...uegreendeployments.flink.apache.org-v1.yml | 26 +++++ 6 files changed, 172 insertions(+), 21 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java index 704d354152..cec2654023 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java @@ -38,5 +38,7 @@ public class FlinkBlueGreenDeploymentSpec { @JsonProperty("configuration") private Map configuration; + private IngressSpec ingress; + private FlinkDeploymentTemplateSpec template; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java index 85de365b97..fcb93d37cd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.controller.bluegreen; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType; @@ -29,6 +30,7 @@ import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; +import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils; import org.apache.flink.util.Preconditions; @@ -605,9 +607,61 @@ public UpdateControl finalizeBlueGreenDeployment( context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0)); context.getDeploymentStatus().setSavepointTriggerId(null); + reconcileBlueGreenIngress(context, nextState); return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null); } + /** + * Reconciles the ingress for Blue/Green deployment, pointing to the active deployment. + * + * @param context the Blue/Green context + * @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is currently active + */ + public void reconcileBlueGreenIngress( + BlueGreenContext context, + FlinkBlueGreenDeploymentState nextState) { + + var bgDeployment = context.getBgDeployment(); + var bgSpec = bgDeployment.getSpec(); + + if (bgSpec.getIngress() == null) { + // No ingress configured, nothing to do + return; + } + + FlinkDeployment activeDeployment; + String serviceName; + switch (nextState) { + case ACTIVE_BLUE: + activeDeployment = context.getBlueDeployment(); + serviceName = context.getBgDeployment().getMetadata().getName() + "-blue"; + break; + case ACTIVE_GREEN: + activeDeployment = context.getGreenDeployment(); + serviceName = context.getBgDeployment().getMetadata().getName() + "-green"; + break; + default: + LOG.debug("Skipping ingress reconciliation for non-active state: {}", nextState); + return; + } + + + // Create a FlinkResourceContext for the active deployment to get proper config + FlinkResourceContext ctx = + context.getCtxFactory() + .getResourceContext(activeDeployment, context.getJosdkContext()); + // Get the deployment configuration (includes REST port and all Flink settings) + Configuration deployConfig = ctx.getDeployConfig(activeDeployment.getSpec()); + + // Call IngressUtils to reconcile the ingress pointing to the active service + IngressUtils.reconcileBlueGreenIngress( + context, + serviceName, + deployConfig, + context.getJosdkContext()); + } + + // ==================== Common Utility Methods ==================== public static UpdateControl patchStatusUpdateControl( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java index 722270a1f6..28f929e35e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java @@ -17,10 +17,16 @@ package org.apache.flink.kubernetes.operator.utils; +import io.javaoperatorsdk.operator.api.reconciler.Context; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; +import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.spec.IngressSpec; +import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.util.Preconditions; @@ -95,7 +101,7 @@ public static void reconcileIngress( } var objectMeta = ctx.getResource().getMetadata(); if (spec.getIngress() != null) { - HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client); + HasMetadata ingress = getIngress(objectMeta, spec.getIngress(), effectiveConfig, client); setOwnerReference(ctx.getResource(), Collections.singletonList(ingress)); LOG.info("Updating ingress rules {}", ingress); client.resource(ingress) @@ -116,33 +122,89 @@ public static void reconcileIngress( } } + public static void reconcileBlueGreenIngress( + BlueGreenContext context, + String serviceName, + Configuration effectiveConfig, + Context client + ){ + // todo see if I need to find way to cover this using this blueGreen method + // todo see if eventRecorder is strictly required (would be nice to have) +// if (!ctx.getOperatorConfig().isManageIngress()) { +// if (spec.getIngress() != null) { +// eventRecorder.triggerEvent( +// ctx.getResource(), +// EventRecorder.Type.Warning, +// INGRESS_MANAGEMENT, +// INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET, +// EventRecorder.Component.Operator, +// client); +// } +// +// return; +// } + var spec = context.getBgDeployment().getSpec(); + var objectMeta = context.getBgDeployment().getMetadata(); + if (spec.getIngress() != null) { + HasMetadata ingress = getIngress(objectMeta, spec.getIngress(), effectiveConfig, client.getClient(), serviceName); + setOwnerReference(context.getBgDeployment(), Collections.singletonList(ingress)); + LOG.info("BLUE GREEN Updating ingress rules {}", ingress); + client.getClient().resource(ingress) + .inNamespace(objectMeta.getNamespace()) + .createOr(NonDeletingOperation::update); + } else { + LOG.info("BLUE GREEN NOTNOTNOT Updating ingress rules "); + Optional ingress; + if (ingressInNetworkingV1(client.getClient())) { + ingress = + client + .getSecondaryResource( + io.fabric8.kubernetes.api.model.networking.v1.Ingress + .class); + } else { + ingress = client.getSecondaryResource(Ingress.class); + } + ingress.ifPresent(i ->client.getClient().resource(i).delete()); + } + } + private static HasMetadata getIngress( ObjectMeta objectMeta, - FlinkDeploymentSpec spec, + IngressSpec spec, Configuration effectiveConfig, - KubernetesClient client) { + KubernetesClient client + ) { + return getIngress(objectMeta,spec,effectiveConfig,client, objectMeta.getName() + REST_SVC_NAME_SUFFIX); + } + + private static HasMetadata getIngress( + ObjectMeta objectMeta, + IngressSpec spec, + Configuration effectiveConfig, + KubernetesClient client, + String serviceName) { Map labels = - spec.getIngress().getLabels() == null + spec.getLabels() == null ? new HashMap<>() - : new HashMap<>(spec.getIngress().getLabels()); + : new HashMap<>(spec.getLabels()); labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS); if (ingressInNetworkingV1(client)) { return new IngressBuilder() .withNewMetadata() .withLabels(labels) - .withAnnotations(spec.getIngress().getAnnotations()) + .withAnnotations(spec.getAnnotations()) .withName(objectMeta.getName()) .withNamespace(objectMeta.getNamespace()) .endMetadata() .withNewSpec() - .withIngressClassName(spec.getIngress().getClassName()) - .withTls(spec.getIngress().getTls()) - .withRules(getIngressRule(objectMeta, spec, effectiveConfig)) + .withIngressClassName(spec.getClassName()) + .withTls(spec.getTls()) + .withRules(getIngressRule(objectMeta, spec, effectiveConfig, serviceName)) .endSpec() .build(); } else { List ingressTLS = - Optional.ofNullable(spec.getIngress().getTls()) + Optional.ofNullable(spec.getTls()) .map( list -> list.stream() @@ -160,28 +222,29 @@ private static HasMetadata getIngress( .orElse(Collections.emptyList()); return new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder() .withNewMetadata() - .withAnnotations(spec.getIngress().getAnnotations()) + .withAnnotations(spec.getAnnotations()) .withLabels(labels) .withName(objectMeta.getName()) .withNamespace(objectMeta.getNamespace()) .endMetadata() .withNewSpec() - .withIngressClassName(spec.getIngress().getClassName()) + .withIngressClassName(spec.getClassName()) .withTls(ingressTLS) - .withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig)) + .withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig, serviceName)) .endSpec() .build(); } } + // Todo remove, creatse new svc with every ingress call private static IngressRule getIngressRule( - ObjectMeta objectMeta, FlinkDeploymentSpec spec, Configuration effectiveConfig) { + ObjectMeta objectMeta, IngressSpec spec, Configuration effectiveConfig, String serviceName) { final String clusterId = objectMeta.getName(); final int restPort = effectiveConfig.getInteger(RestOptions.PORT); URL ingressUrl = getIngressUrl( - spec.getIngress().getTemplate(), + spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace()); @@ -192,7 +255,7 @@ private static IngressRule getIngressRule( .withPathType("ImplementationSpecific") .withNewBackend() .withNewService() - .withName(clusterId + REST_SVC_NAME_SUFFIX) + .withName(serviceName) .withNewPort() .withNumber(restPort) .endPort() @@ -219,14 +282,15 @@ private static IngressRule getIngressRule( private static io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRule getIngressRuleForV1beta1( ObjectMeta objectMeta, - FlinkDeploymentSpec spec, - Configuration effectiveConfig) { + IngressSpec spec, + Configuration effectiveConfig, + String serviceName) { final String clusterId = objectMeta.getName(); final int restPort = effectiveConfig.getInteger(RestOptions.PORT); URL ingressUrl = getIngressUrl( - spec.getIngress().getTemplate(), + spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace()); @@ -236,7 +300,7 @@ private static IngressRule getIngressRule( new io.fabric8.kubernetes.api.model.networking.v1beta1.HTTPIngressRuleValueBuilder() .addNewPath() .withNewBackend() - .withServiceName(clusterId + REST_SVC_NAME_SUFFIX) + .withServiceName(serviceName) .withServicePort(new IntOrString(restPort)) .endBackend() .endPath() diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java index 98344a9ba8..a451afbce3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java @@ -350,6 +350,11 @@ public static FlinkDeployment prepareFlinkDeployment( flinkDeployment.setSpec(spec.getTemplate().getSpec()); + // Update Ingress template if exists to prevent path collision between Blue and Green + if (flinkDeployment.getSpec().getIngress() != null) { + flinkDeployment.getSpec().getIngress().setTemplate(blueGreenDeploymentType.toString().toLowerCase() + "-" + flinkDeployment.getSpec().getIngress().getTemplate()); + } + // Deployment metadata ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(context.getBgDeployment()); flinkDeploymentMeta.setName(childDeploymentName); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java index 229406bbc9..de66681159 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java @@ -1224,6 +1224,6 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers var flinkDeploymentTemplateSpec = FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build(); - return new FlinkBlueGreenDeploymentSpec(configuration, flinkDeploymentTemplateSpec); + return new FlinkBlueGreenDeploymentSpec(configuration, null,flinkDeploymentTemplateSpec); } } diff --git a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml index d31d14aa24..0f8ef636fb 100644 --- a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml @@ -24,6 +24,32 @@ spec: properties: spec: properties: + ingress: + properties: + annotations: + additionalProperties: + type: string + type: object + className: + type: string + labels: + additionalProperties: + type: string + type: object + template: + type: string + tls: + items: + properties: + hosts: + items: + type: string + type: array + secretName: + type: string + type: object + type: array + type: object configuration: additionalProperties: type: "string" From c6dba76d47a637360f4cf4f6b9b0c6620eafb8da Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Thu, 6 Nov 2025 11:00:55 -0500 Subject: [PATCH 2/7] Spotless style fix --- .../bluegreen/BlueGreenDeploymentService.java | 10 +-- .../operator/utils/IngressUtils.java | 88 ++++++++++--------- .../utils/bluegreen/BlueGreenUtils.java | 8 +- ...linkBlueGreenDeploymentControllerTest.java | 2 +- 4 files changed, 56 insertions(+), 52 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java index fcb93d37cd..3b61c0df4f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java @@ -618,8 +618,7 @@ public UpdateControl finalizeBlueGreenDeployment( * @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is currently active */ public void reconcileBlueGreenIngress( - BlueGreenContext context, - FlinkBlueGreenDeploymentState nextState) { + BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) { var bgDeployment = context.getBgDeployment(); var bgSpec = bgDeployment.getSpec(); @@ -645,7 +644,6 @@ public void reconcileBlueGreenIngress( return; } - // Create a FlinkResourceContext for the active deployment to get proper config FlinkResourceContext ctx = context.getCtxFactory() @@ -655,13 +653,9 @@ public void reconcileBlueGreenIngress( // Call IngressUtils to reconcile the ingress pointing to the active service IngressUtils.reconcileBlueGreenIngress( - context, - serviceName, - deployConfig, - context.getJosdkContext()); + context, serviceName, deployConfig, context.getJosdkContext()); } - // ==================== Common Utility Methods ==================== public static UpdateControl patchStatusUpdateControl( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java index 28f929e35e..7a073e1fce 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java @@ -17,14 +17,11 @@ package org.apache.flink.kubernetes.operator.utils; -import io.javaoperatorsdk.operator.api.reconciler.Context; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; -import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.IngressSpec; -import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; @@ -44,6 +41,7 @@ import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.NonDeletingOperation; +import io.javaoperatorsdk.operator.api.reconciler.Context; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -101,7 +99,8 @@ public static void reconcileIngress( } var objectMeta = ctx.getResource().getMetadata(); if (spec.getIngress() != null) { - HasMetadata ingress = getIngress(objectMeta, spec.getIngress(), effectiveConfig, client); + HasMetadata ingress = + getIngress(objectMeta, spec.getIngress(), effectiveConfig, client); setOwnerReference(ctx.getResource(), Collections.singletonList(ingress)); LOG.info("Updating ingress rules {}", ingress); client.resource(ingress) @@ -126,30 +125,36 @@ public static void reconcileBlueGreenIngress( BlueGreenContext context, String serviceName, Configuration effectiveConfig, - Context client - ){ + Context client) { // todo see if I need to find way to cover this using this blueGreen method // todo see if eventRecorder is strictly required (would be nice to have) -// if (!ctx.getOperatorConfig().isManageIngress()) { -// if (spec.getIngress() != null) { -// eventRecorder.triggerEvent( -// ctx.getResource(), -// EventRecorder.Type.Warning, -// INGRESS_MANAGEMENT, -// INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET, -// EventRecorder.Component.Operator, -// client); -// } -// -// return; -// } + // if (!ctx.getOperatorConfig().isManageIngress()) { + // if (spec.getIngress() != null) { + // eventRecorder.triggerEvent( + // ctx.getResource(), + // EventRecorder.Type.Warning, + // INGRESS_MANAGEMENT, + // INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET, + // EventRecorder.Component.Operator, + // client); + // } + // + // return; + // } var spec = context.getBgDeployment().getSpec(); var objectMeta = context.getBgDeployment().getMetadata(); if (spec.getIngress() != null) { - HasMetadata ingress = getIngress(objectMeta, spec.getIngress(), effectiveConfig, client.getClient(), serviceName); + HasMetadata ingress = + getIngress( + objectMeta, + spec.getIngress(), + effectiveConfig, + client.getClient(), + serviceName); setOwnerReference(context.getBgDeployment(), Collections.singletonList(ingress)); LOG.info("BLUE GREEN Updating ingress rules {}", ingress); - client.getClient().resource(ingress) + client.getClient() + .resource(ingress) .inNamespace(objectMeta.getNamespace()) .createOr(NonDeletingOperation::update); } else { @@ -157,14 +162,12 @@ public static void reconcileBlueGreenIngress( Optional ingress; if (ingressInNetworkingV1(client.getClient())) { ingress = - client - .getSecondaryResource( - io.fabric8.kubernetes.api.model.networking.v1.Ingress - .class); + client.getSecondaryResource( + io.fabric8.kubernetes.api.model.networking.v1.Ingress.class); } else { ingress = client.getSecondaryResource(Ingress.class); } - ingress.ifPresent(i ->client.getClient().resource(i).delete()); + ingress.ifPresent(i -> client.getClient().resource(i).delete()); } } @@ -172,9 +175,13 @@ private static HasMetadata getIngress( ObjectMeta objectMeta, IngressSpec spec, Configuration effectiveConfig, - KubernetesClient client - ) { - return getIngress(objectMeta,spec,effectiveConfig,client, objectMeta.getName() + REST_SVC_NAME_SUFFIX); + KubernetesClient client) { + return getIngress( + objectMeta, + spec, + effectiveConfig, + client, + objectMeta.getName() + REST_SVC_NAME_SUFFIX); } private static HasMetadata getIngress( @@ -184,9 +191,7 @@ private static HasMetadata getIngress( KubernetesClient client, String serviceName) { Map labels = - spec.getLabels() == null - ? new HashMap<>() - : new HashMap<>(spec.getLabels()); + spec.getLabels() == null ? new HashMap<>() : new HashMap<>(spec.getLabels()); labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS); if (ingressInNetworkingV1(client)) { return new IngressBuilder() @@ -230,7 +235,9 @@ private static HasMetadata getIngress( .withNewSpec() .withIngressClassName(spec.getClassName()) .withTls(ingressTLS) - .withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig, serviceName)) + .withRules( + getIngressRuleForV1beta1( + objectMeta, spec, effectiveConfig, serviceName)) .endSpec() .build(); } @@ -238,15 +245,15 @@ private static HasMetadata getIngress( // Todo remove, creatse new svc with every ingress call private static IngressRule getIngressRule( - ObjectMeta objectMeta, IngressSpec spec, Configuration effectiveConfig, String serviceName) { + ObjectMeta objectMeta, + IngressSpec spec, + Configuration effectiveConfig, + String serviceName) { final String clusterId = objectMeta.getName(); final int restPort = effectiveConfig.getInteger(RestOptions.PORT); URL ingressUrl = - getIngressUrl( - spec.getTemplate(), - objectMeta.getName(), - objectMeta.getNamespace()); + getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace()); IngressRuleBuilder ingressRuleBuilder = new IngressRuleBuilder(); ingressRuleBuilder.withHttp( @@ -289,10 +296,7 @@ private static IngressRule getIngressRule( final int restPort = effectiveConfig.getInteger(RestOptions.PORT); URL ingressUrl = - getIngressUrl( - spec.getTemplate(), - objectMeta.getName(), - objectMeta.getNamespace()); + getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace()); io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder ingressRuleBuilder = new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java index a451afbce3..cf9797e66c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java @@ -352,7 +352,13 @@ public static FlinkDeployment prepareFlinkDeployment( // Update Ingress template if exists to prevent path collision between Blue and Green if (flinkDeployment.getSpec().getIngress() != null) { - flinkDeployment.getSpec().getIngress().setTemplate(blueGreenDeploymentType.toString().toLowerCase() + "-" + flinkDeployment.getSpec().getIngress().getTemplate()); + flinkDeployment + .getSpec() + .getIngress() + .setTemplate( + blueGreenDeploymentType.toString().toLowerCase() + + "-" + + flinkDeployment.getSpec().getIngress().getTemplate()); } // Deployment metadata diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java index de66681159..b9dbdd4449 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java @@ -1224,6 +1224,6 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers var flinkDeploymentTemplateSpec = FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build(); - return new FlinkBlueGreenDeploymentSpec(configuration, null,flinkDeploymentTemplateSpec); + return new FlinkBlueGreenDeploymentSpec(configuration, null, flinkDeploymentTemplateSpec); } } From 36261698fdd0dc6537766f1f019e7574985022a7 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Thu, 6 Nov 2025 11:11:55 -0500 Subject: [PATCH 3/7] test fix --- .../kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java index 859d19f5d3..c5df792308 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java @@ -102,6 +102,7 @@ private static FlinkBlueGreenDeployment buildBlueGreenDeployment( var bgDeploymentSpec = new FlinkBlueGreenDeploymentSpec( new HashMap<>(), + null, FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build()); deployment.setSpec(bgDeploymentSpec); From 840ac59642b07c30016588cc368fd9236bbb1f3b Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Thu, 6 Nov 2025 12:23:14 -0500 Subject: [PATCH 4/7] changed field to nullable --- .../operator/api/spec/FlinkBlueGreenDeploymentSpec.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java index cec2654023..3caf8d260b 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java @@ -25,6 +25,8 @@ import lombok.Data; import lombok.NoArgsConstructor; +import javax.annotation.Nullable; + import java.util.Map; /** Spec that describes a Flink application with blue/green deployment capabilities. */ @@ -38,6 +40,7 @@ public class FlinkBlueGreenDeploymentSpec { @JsonProperty("configuration") private Map configuration; + @Nullable private IngressSpec ingress; private FlinkDeploymentTemplateSpec template; From 211fd13881daf77deafada42c1c8210ac11e21c1 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Thu, 6 Nov 2025 12:30:18 -0500 Subject: [PATCH 5/7] spotless apply 2 --- .../operator/api/spec/FlinkBlueGreenDeploymentSpec.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java index 3caf8d260b..a515b0a91b 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java @@ -40,8 +40,7 @@ public class FlinkBlueGreenDeploymentSpec { @JsonProperty("configuration") private Map configuration; - @Nullable - private IngressSpec ingress; + @Nullable private IngressSpec ingress; private FlinkDeploymentTemplateSpec template; } From ac3e25bfff11f24717a4aab498d97de2a422ea82 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Thu, 6 Nov 2025 21:54:58 -0500 Subject: [PATCH 6/7] updated docs for ingress addition --- docs/content/docs/custom-resource/reference.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 003b6495b0..98d18e7d65 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -89,6 +89,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | Parameter | Type | Docs | | ----------| ---- | ---- | | configuration | java.util.Map | | +| ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec | | | template | org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec | | ### FlinkDeploymentSpec From d61ca2ad7788e5f3607c39db990633058dca9802 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Fri, 7 Nov 2025 15:34:23 -0500 Subject: [PATCH 7/7] fixed svc ref and refactor of strings that are passed --- .../bluegreen/BlueGreenDeploymentService.java | 13 +++++++------ .../kubernetes/operator/utils/IngressUtils.java | 10 +++++----- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java index 3b61c0df4f..e302a18887 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java @@ -607,17 +607,17 @@ public UpdateControl finalizeBlueGreenDeployment( context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0)); context.getDeploymentStatus().setSavepointTriggerId(null); - reconcileBlueGreenIngress(context, nextState); + updateBlueGreenIngress(context, nextState); return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null); } /** - * Reconciles the ingress for Blue/Green deployment, pointing to the active deployment. + * Updates the ingress for Blue/Green deployment, pointing to the active deployment. * * @param context the Blue/Green context * @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is currently active */ - public void reconcileBlueGreenIngress( + public void updateBlueGreenIngress( BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) { var bgDeployment = context.getBgDeployment(); @@ -633,11 +633,9 @@ public void reconcileBlueGreenIngress( switch (nextState) { case ACTIVE_BLUE: activeDeployment = context.getBlueDeployment(); - serviceName = context.getBgDeployment().getMetadata().getName() + "-blue"; break; case ACTIVE_GREEN: activeDeployment = context.getGreenDeployment(); - serviceName = context.getBgDeployment().getMetadata().getName() + "-green"; break; default: LOG.debug("Skipping ingress reconciliation for non-active state: {}", nextState); @@ -653,7 +651,10 @@ public void reconcileBlueGreenIngress( // Call IngressUtils to reconcile the ingress pointing to the active service IngressUtils.reconcileBlueGreenIngress( - context, serviceName, deployConfig, context.getJosdkContext()); + context, + activeDeployment.getMetadata().getName(), + deployConfig, + context.getJosdkContext()); } // ==================== Common Utility Methods ==================== diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java index 7a073e1fce..a84491b386 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java @@ -123,7 +123,7 @@ public static void reconcileIngress( public static void reconcileBlueGreenIngress( BlueGreenContext context, - String serviceName, + String targetDeploymentName, Configuration effectiveConfig, Context client) { // todo see if I need to find way to cover this using this blueGreen method @@ -141,16 +141,16 @@ public static void reconcileBlueGreenIngress( // // return; // } - var spec = context.getBgDeployment().getSpec(); + var flinkBlueGreenDeploymentSpec = context.getBgDeployment().getSpec(); var objectMeta = context.getBgDeployment().getMetadata(); - if (spec.getIngress() != null) { + if (flinkBlueGreenDeploymentSpec.getIngress() != null) { HasMetadata ingress = getIngress( objectMeta, - spec.getIngress(), + flinkBlueGreenDeploymentSpec.getIngress(), effectiveConfig, client.getClient(), - serviceName); + targetDeploymentName + REST_SVC_NAME_SUFFIX); setOwnerReference(context.getBgDeployment(), Collections.singletonList(ingress)); LOG.info("BLUE GREEN Updating ingress rules {}", ingress); client.getClient()