From 9a3a0202f36610c641f13dce96042b624ae9c4eb Mon Sep 17 00:00:00 2001
From: loverustfs
Date: Sat, 20 Dec 2025 10:23:59 +0800
Subject: [PATCH 1/4] feat: add node-down pod deletion policy

---
 deploy/rustfs-operator/crds/tenant.yaml |  19 ++
 .../templates/clusterrole.yaml          |   5 +
 src/reconcile.rs                        | 295 ++++++++++++++++++
 src/types/v1alpha1/k8s.rs               |  38 +++
 src/types/v1alpha1/tenant.rs            |  10 +
 5 files changed, 367 insertions(+)

diff --git a/deploy/rustfs-operator/crds/tenant.yaml b/deploy/rustfs-operator/crds/tenant.yaml
index 7178381..275dd18 100644
--- a/deploy/rustfs-operator/crds/tenant.yaml
+++ b/deploy/rustfs-operator/crds/tenant.yaml
@@ -311,6 +311,25 @@ spec:
             - null
             nullable: true
             type: string
+          podDeletionPolicyWhenNodeIsDown:
+            description: |-
+              Controls how the operator handles Pods when the node hosting them is down (NotReady/Unknown).
+
+              Typical use-case: a StatefulSet Pod gets stuck in Terminating when the node goes down.
+              Setting this to ForceDelete allows the operator to force delete the Pod object so the
+              StatefulSet controller can recreate it elsewhere.
+
+              Values: DoNothing | Delete | ForceDelete | DeleteStatefulSetPod | DeleteDeploymentPod | DeleteBothStatefulSetAndDeploymentPod
+            enum:
+            - DoNothing
+            - Delete
+            - ForceDelete
+            - DeleteStatefulSetPod
+            - DeleteDeploymentPod
+            - DeleteBothStatefulSetAndDeploymentPod
+            - null
+            nullable: true
+            type: string
           pools:
             items:
               description: |-
diff --git a/deploy/rustfs-operator/templates/clusterrole.yaml b/deploy/rustfs-operator/templates/clusterrole.yaml
index 77519c5..ba53196 100644
--- a/deploy/rustfs-operator/templates/clusterrole.yaml
+++ b/deploy/rustfs-operator/templates/clusterrole.yaml
@@ -19,6 +19,11 @@ rules:
     resources: ["configmaps", "secrets", "serviceaccounts", "pods", "services"]
     verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
 
+  # Node status lookup (node down detection)
+  - apiGroups: [""]
+    resources: ["nodes"]
+    verbs: ["get", "list", "watch"]
+
   # RBAC resources created for tenants
   - apiGroups: ["rbac.authorization.k8s.io"]
     resources: ["roles", "rolebindings"]
diff --git a/src/reconcile.rs b/src/reconcile.rs
index d6d5b39..73dfe83 100644
--- a/src/reconcile.rs
+++ b/src/reconcile.rs
@@ -15,6 +15,8 @@
 use crate::context::Context;
 use crate::types::v1alpha1::tenant::Tenant;
 use crate::{context, types};
+use k8s_openapi::api::core::v1 as corev1;
+use kube::api::{DeleteParams, ListParams, PropagationPolicy};
 use kube::ResourceExt;
 use kube::runtime::controller::Action;
 use kube::runtime::events::EventType;
@@ -64,6 +66,19 @@ pub async fn reconcile_rustfs(tenant: Arc<Tenant>, ctx: Arc<Context>) -> Result<
         return Err(e.into());
     }
 
+    // 0. Optional: unblock StatefulSet pods stuck terminating when their node is down.
+    //    This is inspired by Longhorn's "Pod Deletion Policy When Node is Down".
+    if let Some(policy) = latest_tenant
+        .spec
+        .pod_deletion_policy_when_node_is_down
+        .clone()
+    {
+        if policy != crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing {
+            cleanup_stuck_terminating_pods_on_down_nodes(&latest_tenant, &ns, &ctx, policy)
+                .await?;
+        }
+    }
+
     // 1. Create RBAC resources (conditionally based on service account settings)
     let custom_sa = latest_tenant.spec.service_account_name.is_some();
     let create_rbac = latest_tenant
@@ -367,6 +382,163 @@ pub async fn reconcile_rustfs(tenant: Arc<Tenant>, ctx: Arc<Context>) -> Result<
     }
 }
 
+async fn cleanup_stuck_terminating_pods_on_down_nodes(
+    tenant: &Tenant,
+    namespace: &str,
+    ctx: &Context,
+    policy: crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown,
+) -> Result<(), Error> {
+    let pods_api: kube::Api<corev1::Pod> = kube::Api::namespaced(ctx.client.clone(), namespace);
+    let nodes_api: kube::Api<corev1::Node> = kube::Api::all(ctx.client.clone());
+
+    let selector = format!("rustfs.tenant={}", tenant.name());
+    let pods = pods_api
+        .list(&ListParams::default().labels(&selector))
+        .await
+        .map_err(|source| Error::Context {
+            source: context::Error::Kube { source },
+        })?;
+
+    for pod in pods.items {
+        // Only act on terminating pods to keep the behavior conservative.
+        if pod.metadata.deletion_timestamp.is_none() {
+            continue;
+        }
+
+        // Longhorn behavior: only force delete terminating pods managed by a controller.
+        // We approximate controller type via ownerReferences:
+        //   - StatefulSet pod: owner kind == "StatefulSet"
+        //   - Deployment pod: owner kind == "ReplicaSet" (Deployment owns ReplicaSet)
+        if !pod_matches_policy_controller_kind(&pod, &policy) {
+            continue;
+        }
+
+        let Some(node_name) = pod.spec.as_ref().and_then(|s| s.node_name.clone()) else {
+            continue;
+        };
+
+        let node_is_down = match nodes_api.get(&node_name).await {
+            Ok(node) => is_node_down(&node),
+            Err(kube::Error::Api(ae)) if ae.code == 404 => true,
+            Err(source) => {
+                return Err(Error::Context {
+                    source: context::Error::Kube { source },
+                });
+            }
+        };
+
+        if !node_is_down {
+            continue;
+        }
+
+        let pod_name = pod.name_any();
+        let delete_params = match policy {
+            crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing => continue,
+            // Legacy option: normal delete.
+            crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::Delete => {
+                DeleteParams::default()
+            }
+            // Legacy option: explicit force delete.
+            crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::ForceDelete
+            // Longhorn-compatible options: always force delete.
+            | crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteStatefulSetPod
+            | crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteDeploymentPod
+            | crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteBothStatefulSetAndDeploymentPod => {
+                DeleteParams {
+                    grace_period_seconds: Some(0),
+                    propagation_policy: Some(PropagationPolicy::Background),
+                    ..DeleteParams::default()
+                }
+            }
+        };
+
+        match pods_api.delete(&pod_name, &delete_params).await {
+            Ok(_) => {
+                let reason = match policy {
+                    crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::ForceDelete => {
+                        "ForceDeletedPodOnDownNode"
+                    }
+                    crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::Delete => {
+                        "DeletedPodOnDownNode"
+                    }
+                    crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteStatefulSetPod
+                    | crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteDeploymentPod
+                    | crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteBothStatefulSetAndDeploymentPod => {
+                        "LonghornLikeForceDeletedPodOnDownNode"
+                    }
+                    crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing => {
+                        ""
+                    }
+                };
+                let _ = ctx
+                    .record(
+                        tenant,
+                        EventType::Warning,
+                        reason,
+                        &format!(
+                            "Pod '{}' is terminating on down node '{}'; applied policy {:?}",
+                            pod_name, node_name, policy
+                        ),
+                    )
+                    .await;
+            }
+            Err(kube::Error::Api(ae)) if ae.code == 404 => {
+                // Pod already gone.
+            }
+            Err(source) => {
+                return Err(Error::Context {
+                    source: context::Error::Kube { source },
+                });
+            }
+        }
+    }
+
+    Ok(())
+}
+
+fn pod_matches_policy_controller_kind(
+    pod: &corev1::Pod,
+    policy: &crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown,
+) -> bool {
+    use crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown as P;
+
+    match policy {
+        // Longhorn-compatible modes: only act on controller-owned pods of certain kinds.
+        P::DeleteStatefulSetPod => pod_has_owner_kind(pod, "StatefulSet"),
+        P::DeleteDeploymentPod => pod_has_owner_kind(pod, "ReplicaSet"),
+        P::DeleteBothStatefulSetAndDeploymentPod => {
+            pod_has_owner_kind(pod, "StatefulSet") || pod_has_owner_kind(pod, "ReplicaSet")
+        }
+        // Legacy modes: act on any tenant-owned pod.
+        _ => true,
+    }
+}
+
+fn pod_has_owner_kind(pod: &corev1::Pod, kind: &str) -> bool {
+    pod.metadata
+        .owner_references
+        .as_ref()
+        .is_some_and(|refs| refs.iter().any(|r| r.kind == kind))
+}
+
+fn is_node_down(node: &corev1::Node) -> bool {
+    let Some(status) = &node.status else {
+        return false;
+    };
+    let Some(conditions) = &status.conditions else {
+        return false;
+    };
+
+    for c in conditions {
+        if c.type_ == "Ready" {
+            // Ready=False or Ready=Unknown => treat as down
+            return c.status != "True";
+        }
+    }
+
+    false
+}
+
 pub fn error_policy(_object: Arc<Tenant>, error: &Error, _ctx: Arc<Context>) -> Action {
     error!("error_policy: {:?}", error);
 
@@ -415,6 +587,11 @@ pub fn error_policy(_object: Arc<Tenant>, error: &Error, _ctx: Arc<Context>) ->
 
 #[cfg(test)]
 mod tests {
+    use super::is_node_down;
+    use super::{pod_has_owner_kind, pod_matches_policy_controller_kind};
+    use k8s_openapi::api::core::v1 as corev1;
+    use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
+
     // Test 10: RBAC creation logic - default behavior
     #[test]
     fn test_should_create_rbac_default() {
@@ -497,4 +674,122 @@ mod tests {
         let sa_name_custom = tenant_custom.service_account_name();
         assert_eq!(sa_name_custom, "custom-sa");
     }
+
+    #[test]
+    fn test_is_node_down_ready_true() {
+        let node = corev1::Node {
+            status: Some(corev1::NodeStatus {
+                conditions: Some(vec![corev1::NodeCondition {
+                    type_: "Ready".to_string(),
+                    status: "True".to_string(),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            }),
+            ..Default::default()
+        };
+        assert!(!is_node_down(&node));
+    }
+
+    #[test]
+    fn test_is_node_down_ready_false() {
+        let node = corev1::Node {
+            status: Some(corev1::NodeStatus {
+                conditions: Some(vec![corev1::NodeCondition {
+                    type_: "Ready".to_string(),
+                    status: "False".to_string(),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            }),
+            ..Default::default()
+        };
+        assert!(is_node_down(&node));
+    }
+
+    #[test]
+    fn test_is_node_down_ready_unknown() {
+        let node = corev1::Node {
+            status: Some(corev1::NodeStatus {
+                conditions: Some(vec![corev1::NodeCondition {
+                    type_: "Ready".to_string(),
+                    status: "Unknown".to_string(),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            }),
+            ..Default::default()
+        };
+        assert!(is_node_down(&node));
+    }
+
+    #[test]
+    fn test_pod_owner_kind_helpers() {
+        let pod = corev1::Pod {
+            metadata: metav1::ObjectMeta {
+                owner_references: Some(vec![metav1::OwnerReference {
+                    api_version: "apps/v1".to_string(),
+                    kind: "StatefulSet".to_string(),
+                    name: "ss".to_string(),
+                    uid: "uid".to_string(),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        assert!(pod_has_owner_kind(&pod, "StatefulSet"));
+        assert!(!pod_has_owner_kind(&pod, "ReplicaSet"));
+    }
+
+    #[test]
+    fn test_policy_controller_kind_matching_longhorn_like() {
+        use crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown as P;
+
+        let ss_pod = corev1::Pod {
+            metadata: metav1::ObjectMeta {
+                deletion_timestamp: Some(metav1::Time(chrono::Utc::now())),
+                owner_references: Some(vec![metav1::OwnerReference {
+                    api_version: "apps/v1".to_string(),
+                    kind: "StatefulSet".to_string(),
+                    name: "ss".to_string(),
+                    uid: "uid".to_string(),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        let deploy_pod = corev1::Pod {
+            metadata: metav1::ObjectMeta {
+                deletion_timestamp: Some(metav1::Time(chrono::Utc::now())),
+                owner_references: Some(vec![metav1::OwnerReference {
+                    api_version: "apps/v1".to_string(),
+                    kind: "ReplicaSet".to_string(),
+                    name: "rs".to_string(),
+                    uid: "uid".to_string(),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        assert!(pod_matches_policy_controller_kind(&ss_pod, &P::DeleteStatefulSetPod));
+        assert!(!pod_matches_policy_controller_kind(&deploy_pod, &P::DeleteStatefulSetPod));
+
+        assert!(pod_matches_policy_controller_kind(&deploy_pod, &P::DeleteDeploymentPod));
+        assert!(!pod_matches_policy_controller_kind(&ss_pod, &P::DeleteDeploymentPod));
+
+        assert!(pod_matches_policy_controller_kind(
+            &ss_pod,
+            &P::DeleteBothStatefulSetAndDeploymentPod
+        ));
+        assert!(pod_matches_policy_controller_kind(
+            &deploy_pod,
+            &P::DeleteBothStatefulSetAndDeploymentPod
+        ));
+    }
 }
diff --git a/src/types/v1alpha1/k8s.rs b/src/types/v1alpha1/k8s.rs
index 685a0aa..9e9dfda 100644
--- a/src/types/v1alpha1/k8s.rs
+++ b/src/types/v1alpha1/k8s.rs
@@ -56,3 +56,41 @@ pub enum ImagePullPolicy {
     #[default]
     IfNotPresent,
 }
+
+/// Pod deletion policy when the node hosting the Pod is down (NotReady/Unknown).
+///
+/// This is primarily intended to unblock StatefulSet pods stuck in terminating state
+/// when the node becomes unreachable.
+///
+/// WARNING: Force-deleting pods can have data consistency implications depending on
+/// your storage backend and workload semantics.
+#[derive(Default, Deserialize, Serialize, Clone, Debug, JsonSchema, Display, PartialEq, Eq)]
+#[serde(rename_all = "PascalCase")]
+#[schemars(rename_all = "PascalCase")]
+pub enum PodDeletionPolicyWhenNodeIsDown {
+    /// Do not delete pods automatically.
+    #[strum(to_string = "DoNothing")]
+    #[default]
+    DoNothing,
+
+    /// Request a normal delete for the pod.
+    #[strum(to_string = "Delete")]
+    Delete,
+
+    /// Force delete the pod with gracePeriodSeconds=0.
+    #[strum(to_string = "ForceDelete")]
+    ForceDelete,
+
+    /// Longhorn-compatible: force delete StatefulSet terminating pods on down nodes.
+    #[strum(to_string = "DeleteStatefulSetPod")]
+    DeleteStatefulSetPod,
+
+    /// Longhorn-compatible: force delete Deployment terminating pods on down nodes.
+    /// (Deployment pods are owned by ReplicaSet.)
+    #[strum(to_string = "DeleteDeploymentPod")]
+    DeleteDeploymentPod,
+
+    /// Longhorn-compatible: force delete both StatefulSet and Deployment terminating pods on down nodes.
+    #[strum(to_string = "DeleteBothStatefulSetAndDeploymentPod")]
+    DeleteBothStatefulSetAndDeploymentPod,
+}
diff --git a/src/types/v1alpha1/tenant.rs b/src/types/v1alpha1/tenant.rs
index 757094c..541e837 100644
--- a/src/types/v1alpha1/tenant.rs
+++ b/src/types/v1alpha1/tenant.rs
@@ -67,6 +67,16 @@ pub struct TenantSpec {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub pod_management_policy: Option,
 
+    /// Controls how the operator handles Pods when the node hosting them is down (NotReady/Unknown).
+    ///
+    /// Typical use-case: a StatefulSet Pod gets stuck in Terminating when the node goes down.
+    /// Setting this to `ForceDelete` allows the operator to force delete the Pod object so the
+    /// StatefulSet controller can recreate it elsewhere.
+    ///
+    /// Values: DoNothing | Delete | ForceDelete | DeleteStatefulSetPod | DeleteDeploymentPod | DeleteBothStatefulSetAndDeploymentPod
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub pod_deletion_policy_when_node_is_down: Option<PodDeletionPolicyWhenNodeIsDown>,
+
     #[serde(default, skip_serializing_if = "Vec::is_empty")]
     pub env: Vec,
 

From ebb733b1fcbbc36d47358f33d5c0883fc37a8642 Mon Sep 17 00:00:00 2001
From: loverustfs
Date: Sat, 20 Dec 2025 11:24:38 +0800
Subject: [PATCH 2/4] fix: apply rustfmt formatting fixes

- Reformat imports to follow standard order
- Make cleanup_stuck_terminating_pods_on_down_nodes call single-line
- Reformat assert! statements in tests to multi-line for better readability
---
 src/reconcile.rs | 25 ++++++++++++++++++-------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/src/reconcile.rs b/src/reconcile.rs
index 73dfe83..cefeeef 100644
--- a/src/reconcile.rs
+++ b/src/reconcile.rs
@@ -16,8 +16,8 @@ use crate::context::Context;
 use crate::types::v1alpha1::tenant::Tenant;
 use crate::{context, types};
 use k8s_openapi::api::core::v1 as corev1;
-use kube::api::{DeleteParams, ListParams, PropagationPolicy};
 use kube::ResourceExt;
+use kube::api::{DeleteParams, ListParams, PropagationPolicy};
 use kube::runtime::controller::Action;
 use kube::runtime::events::EventType;
 use snafu::Snafu;
@@ -74,8 +74,7 @@ pub async fn reconcile_rustfs(tenant: Arc<Tenant>, ctx: Arc<Context>) -> Result<
         .clone()
     {
         if policy != crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing {
-            cleanup_stuck_terminating_pods_on_down_nodes(&latest_tenant, &ns, &ctx, policy)
-                .await?;
+            cleanup_stuck_terminating_pods_on_down_nodes(&latest_tenant, &ns, &ctx, policy).await?;
         }
     }
 
@@ -777,11 +776,23 @@ mod tests {
             ..Default::default()
         };
 
-        assert!(pod_matches_policy_controller_kind(&ss_pod, &P::DeleteStatefulSetPod));
-        assert!(!pod_matches_policy_controller_kind(&deploy_pod, &P::DeleteStatefulSetPod));
+        assert!(pod_matches_policy_controller_kind(
+            &ss_pod,
+            &P::DeleteStatefulSetPod
+        ));
+        assert!(!pod_matches_policy_controller_kind(
+            &deploy_pod,
+            &P::DeleteStatefulSetPod
+        ));
 
-        assert!(pod_matches_policy_controller_kind(&deploy_pod, &P::DeleteDeploymentPod));
-        assert!(!pod_matches_policy_controller_kind(&ss_pod, &P::DeleteDeploymentPod));
+        assert!(pod_matches_policy_controller_kind(
+            &deploy_pod,
+            &P::DeleteDeploymentPod
+        ));
+        assert!(!pod_matches_policy_controller_kind(
+            &ss_pod,
+            &P::DeleteDeploymentPod
+        ));
 
         assert!(pod_matches_policy_controller_kind(
             &ss_pod,

From 3e736d107c8249092d69447b19935e97a514c6e1 Mon Sep 17 00:00:00 2001
From: loverustfs
Date: Sat, 20 Dec 2025 11:41:37 +0800
Subject: [PATCH 3/4] fix: resolve clippy collapsible-if warning

---
 src/reconcile.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/reconcile.rs b/src/reconcile.rs
index cefeeef..7ddbb7e 100644
--- a/src/reconcile.rs
+++ b/src/reconcile.rs
@@ -72,10 +72,9 @@ pub async fn reconcile_rustfs(tenant: Arc<Tenant>, ctx: Arc<Context>) -> Result<
         .spec
         .pod_deletion_policy_when_node_is_down
         .clone()
+        && policy != crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing
     {
-        if policy != crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing {
-            cleanup_stuck_terminating_pods_on_down_nodes(&latest_tenant, &ns, &ctx, policy).await?;
-        }
+        cleanup_stuck_terminating_pods_on_down_nodes(&latest_tenant, &ns, &ctx, policy).await?;
     }
 
     // 1. Create RBAC resources (conditionally based on service account settings)

From 0fc046f64eb4083d6d3fc8ff693b0714fda9ecc9 Mon Sep 17 00:00:00 2001
From: loverustfs
Date: Sat, 20 Dec 2025 19:53:11 +0800
Subject: [PATCH 4/4] fix: emit warning log when node is detected down

---
 src/reconcile.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/reconcile.rs b/src/reconcile.rs
index 7ddbb7e..77a00e6 100644
--- a/src/reconcile.rs
+++ b/src/reconcile.rs
@@ -23,7 +23,7 @@ use kube::runtime::events::EventType;
 use snafu::Snafu;
 use std::sync::Arc;
 use std::time::Duration;
-use tracing::{debug, error};
+use tracing::{debug, error, warn};
 
 #[derive(Snafu, Debug)]
 pub enum Error {
@@ -430,6 +430,10 @@ async fn cleanup_stuck_terminating_pods_on_down_nodes(
         }
 
         let pod_name = pod.name_any();
+        warn!(
+            "Node {} is detected down. Pod {} is terminating on it.",
+            node_name, pod_name
+        );
         let delete_params = match policy {
             crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing => continue,
             // Legacy option: normal delete.
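Usage note (not part of the patches): a minimal sketch of how the new field could be set on a Tenant once this series is applied. The apiVersion group and the metadata values below are placeholders, since the Tenant CRD's group is not shown in this series; only the podDeletionPolicyWhenNodeIsDown key and its enum values come from the diffs above.

  # Hypothetical Tenant manifest; apiVersion group and names are illustrative only.
  apiVersion: rustfs.example.com/v1alpha1   # placeholder group, substitute the CRD's real group
  kind: Tenant
  metadata:
    name: demo-tenant
    namespace: rustfs
  spec:
    # Accepted values: DoNothing | Delete | ForceDelete | DeleteStatefulSetPod
    # | DeleteDeploymentPod | DeleteBothStatefulSetAndDeploymentPod.
    # DeleteStatefulSetPod force deletes (gracePeriodSeconds=0) Terminating Pods
    # owned by a StatefulSet once their node reports Ready=False/Unknown, so the
    # StatefulSet controller can recreate them on a healthy node.
    podDeletionPolicyWhenNodeIsDown: DeleteStatefulSetPod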