19 changes: 19 additions & 0 deletions deploy/rustfs-operator/crds/tenant.yaml
@@ -311,6 +311,25 @@ spec:
- null
nullable: true
type: string
podDeletionPolicyWhenNodeIsDown:
description: |-
Controls how the operator handles Pods when the node hosting them is down (NotReady/Unknown).

Typical use-case: a StatefulSet Pod gets stuck in Terminating when the node goes down.
Setting this to ForceDelete allows the operator to force delete the Pod object so the
StatefulSet controller can recreate it elsewhere. The Longhorn-compatible values
(DeleteStatefulSetPod, DeleteDeploymentPod, DeleteBothStatefulSetAndDeploymentPod)
force delete only Pods owned by the matching controller kind.

Values: DoNothing | Delete | ForceDelete | DeleteStatefulSetPod | DeleteDeploymentPod | DeleteBothStatefulSetAndDeploymentPod
enum:
- DoNothing
- Delete
- ForceDelete
- DeleteStatefulSetPod
- DeleteDeploymentPod
- DeleteBothStatefulSetAndDeploymentPod
- null
nullable: true
type: string
pools:
items:
description: |-
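For context, a minimal Tenant manifest opting into the new behavior might look like the sketch below. The API group/version, tenant name, and namespace are illustrative assumptions, not taken from this PR; only `podDeletionPolicyWhenNodeIsDown` and its allowed values come from the CRD above.

```yaml
apiVersion: rustfs.com/v1alpha1   # assumed API group; use whatever group your installed CRD declares
kind: Tenant
metadata:
  name: example-tenant            # placeholder
  namespace: rustfs               # placeholder
spec:
  # New in this PR: when a node goes NotReady/Unknown, force delete Pods
  # stuck Terminating on it so their controller can reschedule them.
  podDeletionPolicyWhenNodeIsDown: ForceDelete
```

This automates the usual manual workaround of running `kubectl delete pod <name> --grace-period=0 --force` after a node outage.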
5 changes: 5 additions & 0 deletions deploy/rustfs-operator/templates/clusterrole.yaml
@@ -19,6 +19,11 @@ rules:
resources: ["configmaps", "secrets", "serviceaccounts", "pods", "services"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

# Node status lookup (node down detection)
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]

# RBAC resources created for tenants
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["roles", "rolebindings"]
311 changes: 310 additions & 1 deletion src/reconcile.rs
@@ -15,13 +15,15 @@
use crate::context::Context;
use crate::types::v1alpha1::tenant::Tenant;
use crate::{context, types};
use k8s_openapi::api::core::v1 as corev1;
use kube::ResourceExt;
use kube::api::{DeleteParams, ListParams, PropagationPolicy};
use kube::runtime::controller::Action;
use kube::runtime::events::EventType;
use snafu::Snafu;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error};
use tracing::{debug, error, warn};

#[derive(Snafu, Debug)]
pub enum Error {
@@ -64,6 +66,17 @@ pub async fn reconcile_rustfs(tenant: Arc<Tenant>, ctx: Arc<Context>) -> Result<
return Err(e.into());
}

// 0. Optional: unblock StatefulSet pods stuck terminating when their node is down.
// This is inspired by Longhorn's "Pod Deletion Policy When Node is Down".
if let Some(policy) = latest_tenant
.spec
.pod_deletion_policy_when_node_is_down
.clone()
&& policy != crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing
{
cleanup_stuck_terminating_pods_on_down_nodes(&latest_tenant, &ns, &ctx, policy).await?;
}

// 1. Create RBAC resources (conditionally based on service account settings)
let custom_sa = latest_tenant.spec.service_account_name.is_some();
let create_rbac = latest_tenant
@@ -367,6 +380,167 @@ pub async fn reconcile_rustfs(tenant: Arc<Tenant>, ctx: Arc<Context>) -> Result<
}
}

async fn cleanup_stuck_terminating_pods_on_down_nodes(
tenant: &Tenant,
namespace: &str,
ctx: &Context,
policy: crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown,
) -> Result<(), Error> {
let pods_api: kube::Api<corev1::Pod> = kube::Api::namespaced(ctx.client.clone(), namespace);
let nodes_api: kube::Api<corev1::Node> = kube::Api::all(ctx.client.clone());

let selector = format!("rustfs.tenant={}", tenant.name());
let pods = pods_api
.list(&ListParams::default().labels(&selector))
.await
.map_err(|source| Error::Context {
source: context::Error::Kube { source },
})?;

for pod in pods.items {
// Only act on terminating pods to keep the behavior conservative.
if pod.metadata.deletion_timestamp.is_none() {
continue;
}

// Longhorn behavior: only force delete terminating pods managed by a controller.
// We approximate controller type via ownerReferences:
// - StatefulSet pod: owner kind == "StatefulSet"
// - Deployment pod: owner kind == "ReplicaSet" (Deployment owns ReplicaSet)
if !pod_matches_policy_controller_kind(&pod, &policy) {
continue;
}

let Some(node_name) = pod.spec.as_ref().and_then(|s| s.node_name.clone()) else {
continue;
};

let node_is_down = match nodes_api.get(&node_name).await {
Ok(node) => is_node_down(&node),
Err(kube::Error::Api(ae)) if ae.code == 404 => true,
Err(source) => {
return Err(Error::Context {
source: context::Error::Kube { source },
});
}
};

if !node_is_down {
continue;
}

let pod_name = pod.name_any();
warn!(
"Node {} appears to be down; Pod {} is stuck terminating on it.",
node_name, pod_name
);
let delete_params = match policy {
crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing => continue,
// Legacy option: normal delete.
crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::Delete => {
DeleteParams::default()
}
// Legacy option: explicit force delete.
crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::ForceDelete
// Longhorn-compatible options: always force delete.
| crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteStatefulSetPod
| crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteDeploymentPod
| crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteBothStatefulSetAndDeploymentPod => {
DeleteParams {
grace_period_seconds: Some(0),
propagation_policy: Some(PropagationPolicy::Background),
..DeleteParams::default()
}
}
};

match pods_api.delete(&pod_name, &delete_params).await {
Ok(_) => {
let reason = match policy {
crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::ForceDelete => {
"ForceDeletedPodOnDownNode"
}
crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::Delete => {
"DeletedPodOnDownNode"
}
crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteStatefulSetPod
| crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteDeploymentPod
| crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DeleteBothStatefulSetAndDeploymentPod => {
"LonghornLikeForceDeletedPodOnDownNode"
}
crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing => {
// Unreachable in practice: the caller rejects DoNothing, and the
// delete_params match above `continue`s before any delete happens.
""
}
};
let _ = ctx
.record(
tenant,
EventType::Warning,
reason,
&format!(
"Pod '{}' is terminating on down node '{}'; applied policy {:?}",
pod_name, node_name, policy
),
)
.await;
}
Err(kube::Error::Api(ae)) if ae.code == 404 => {
// Pod already gone.
}
Err(source) => {
return Err(Error::Context {
source: context::Error::Kube { source },
});
}
}
}

Ok(())
}

fn pod_matches_policy_controller_kind(
pod: &corev1::Pod,
policy: &crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown,
) -> bool {
use crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown as P;

match policy {
// Longhorn-compatible modes: only act on controller-owned pods of certain kinds.
P::DeleteStatefulSetPod => pod_has_owner_kind(pod, "StatefulSet"),
P::DeleteDeploymentPod => pod_has_owner_kind(pod, "ReplicaSet"),
P::DeleteBothStatefulSetAndDeploymentPod => {
pod_has_owner_kind(pod, "StatefulSet") || pod_has_owner_kind(pod, "ReplicaSet")
}
// Legacy modes: act on any tenant-owned pod.
_ => true,
}
}

fn pod_has_owner_kind(pod: &corev1::Pod, kind: &str) -> bool {
pod.metadata
.owner_references
.as_ref()
.is_some_and(|refs| refs.iter().any(|r| r.kind == kind))
}

fn is_node_down(node: &corev1::Node) -> bool {
let Some(status) = &node.status else {
return false;
};
let Some(conditions) = &status.conditions else {
return false;
};

for c in conditions {
if c.type_ == "Ready" {
// Ready=False or Ready=Unknown => treat as down
return c.status != "True";
}
}

false
}

pub fn error_policy(_object: Arc<Tenant>, error: &Error, _ctx: Arc<Context>) -> Action {
error!("error_policy: {:?}", error);

@@ -415,6 +589,11 @@ pub fn error_policy(_object: Arc<Tenant>, error: &Error, _ctx: Arc<Context>) ->

#[cfg(test)]
mod tests {
use super::{is_node_down, pod_has_owner_kind, pod_matches_policy_controller_kind};
use k8s_openapi::api::core::v1 as corev1;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;

// Test 10: RBAC creation logic - default behavior
#[test]
fn test_should_create_rbac_default() {
@@ -497,4 +676,134 @@ mod tests {
let sa_name_custom = tenant_custom.service_account_name();
assert_eq!(sa_name_custom, "custom-sa");
}

#[test]
fn test_is_node_down_ready_true() {
let node = corev1::Node {
status: Some(corev1::NodeStatus {
conditions: Some(vec![corev1::NodeCondition {
type_: "Ready".to_string(),
status: "True".to_string(),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
assert!(!is_node_down(&node));
}

#[test]
fn test_is_node_down_ready_false() {
let node = corev1::Node {
status: Some(corev1::NodeStatus {
conditions: Some(vec![corev1::NodeCondition {
type_: "Ready".to_string(),
status: "False".to_string(),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
assert!(is_node_down(&node));
}

#[test]
fn test_is_node_down_ready_unknown() {
let node = corev1::Node {
status: Some(corev1::NodeStatus {
conditions: Some(vec![corev1::NodeCondition {
type_: "Ready".to_string(),
status: "Unknown".to_string(),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
assert!(is_node_down(&node));
}

#[test]
fn test_pod_owner_kind_helpers() {
let pod = corev1::Pod {
metadata: metav1::ObjectMeta {
owner_references: Some(vec![metav1::OwnerReference {
api_version: "apps/v1".to_string(),
kind: "StatefulSet".to_string(),
name: "ss".to_string(),
uid: "uid".to_string(),
..Default::default()
}]),
..Default::default()
},
..Default::default()
};

assert!(pod_has_owner_kind(&pod, "StatefulSet"));
assert!(!pod_has_owner_kind(&pod, "ReplicaSet"));
}

#[test]
fn test_policy_controller_kind_matching_longhorn_like() {
use crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown as P;

let ss_pod = corev1::Pod {
metadata: metav1::ObjectMeta {
deletion_timestamp: Some(metav1::Time(chrono::Utc::now())),
owner_references: Some(vec![metav1::OwnerReference {
api_version: "apps/v1".to_string(),
kind: "StatefulSet".to_string(),
name: "ss".to_string(),
uid: "uid".to_string(),
..Default::default()
}]),
..Default::default()
},
..Default::default()
};

let deploy_pod = corev1::Pod {
metadata: metav1::ObjectMeta {
deletion_timestamp: Some(metav1::Time(chrono::Utc::now())),
owner_references: Some(vec![metav1::OwnerReference {
api_version: "apps/v1".to_string(),
kind: "ReplicaSet".to_string(),
name: "rs".to_string(),
uid: "uid".to_string(),
..Default::default()
}]),
..Default::default()
},
..Default::default()
};

assert!(pod_matches_policy_controller_kind(
&ss_pod,
&P::DeleteStatefulSetPod
));
assert!(!pod_matches_policy_controller_kind(
&deploy_pod,
&P::DeleteStatefulSetPod
));

assert!(pod_matches_policy_controller_kind(
&deploy_pod,
&P::DeleteDeploymentPod
));
assert!(!pod_matches_policy_controller_kind(
&ss_pod,
&P::DeleteDeploymentPod
));

assert!(pod_matches_policy_controller_kind(
&ss_pod,
&P::DeleteBothStatefulSetAndDeploymentPod
));
assert!(pod_matches_policy_controller_kind(
&deploy_pod,
&P::DeleteBothStatefulSetAndDeploymentPod
));
}
}
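Aside: the tests above cover Ready=True/False/Unknown. One further edge case worth pinning down, sketched in the same style (assuming the same test module and imports), is a node whose status carries no Ready condition at all — `is_node_down` falls through to `false`:

```rust
#[test]
fn test_is_node_down_no_ready_condition() {
    // A node with a status but no "Ready" condition is conservatively
    // treated as up, per is_node_down's fall-through `false`.
    let node = corev1::Node {
        status: Some(corev1::NodeStatus {
            conditions: Some(vec![]),
            ..Default::default()
        }),
        ..Default::default()
    };
    assert!(!is_node_down(&node));
}
```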