diff --git a/apis/apps/v1alpha1/daemonset_types.go b/apis/apps/v1alpha1/daemonset_types.go
index 2cce16500c..12e13cb0f8 100644
--- a/apis/apps/v1alpha1/daemonset_types.go
+++ b/apis/apps/v1alpha1/daemonset_types.go
@@ -163,6 +163,18 @@ type DaemonSetSpec struct {
 	// Currently, we only support pre-delete hook for Advanced DaemonSet.
 	// +optional
 	Lifecycle *appspub.Lifecycle `json:"lifecycle,omitempty"`
+
+	// volumeClaimTemplates is a list of claims that pods are allowed to reference.
+	// The DaemonSet controller is responsible for mapping node identities to
+	// claims in a way that maintains the per-node identity of a pod. Every claim in
+	// this list must have at least one matching (by name) volumeMount in one
+	// container in the template. A claim in this list takes precedence over
+	// any volumes in the template, with the same name.
+	// TODO: Define the behavior if a claim already exists with the same name.
+	// +optional
+	// +kubebuilder:pruning:PreserveUnknownFields
+	// +kubebuilder:validation:Schemaless
+	VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`
 }
 
 // DaemonSetStatus defines the observed state of DaemonSet
diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go
index 53ef3f1814..a5b31e5f91 100644
--- a/apis/apps/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go
@@ -982,6 +982,13 @@ func (in *DaemonSetSpec) DeepCopyInto(out *DaemonSetSpec) {
 		*out = new(pub.Lifecycle)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.VolumeClaimTemplates != nil {
+		in, out := &in.VolumeClaimTemplates, &out.VolumeClaimTemplates
+		*out = make([]corev1.PersistentVolumeClaim, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DaemonSetSpec.
diff --git a/config/crd/bases/apps.kruise.io_daemonsets.yaml b/config/crd/bases/apps.kruise.io_daemonsets.yaml
index 25d967b2af..0077e2bb23 100644
--- a/config/crd/bases/apps.kruise.io_daemonsets.yaml
+++ b/config/crd/bases/apps.kruise.io_daemonsets.yaml
@@ -352,6 +352,15 @@ spec:
                     or "OnDelete". Default is RollingUpdate.
                   type: string
                 type: object
+              volumeClaimTemplates:
+                description: |-
+                  volumeClaimTemplates is a list of claims that pods are allowed to reference.
+                  The DaemonSet controller is responsible for mapping node identities to
+                  claims in a way that maintains the per-node identity of a pod. Every claim in
+                  this list must have at least one matching (by name) volumeMount in one
+                  container in the template. A claim in this list takes precedence over
+                  any volumes in the template, with the same name.
+                x-kubernetes-preserve-unknown-fields: true
             required:
             - selector
             - template
diff --git a/docs/proposals/20250722-ads-volumeclaimtemplate.md b/docs/proposals/20250722-ads-volumeclaimtemplate.md
new file mode 100644
index 0000000000..a9e6c2b013
--- /dev/null
+++ b/docs/proposals/20250722-ads-volumeclaimtemplate.md
@@ -0,0 +1,154 @@
+---
+title: volumeClaimTemplate for Advanced DaemonSet
+authors:
+
+- "@chengjoey"
+
+reviewers:
+
+- "@ChristianCiach"
+
+- "@furykerry"
+
+- "@ABNER-1"
+
+creation-date: 2025-07-22
+last-updated: 2025-07-22
+status: implementable
+---
+
+# volumeClaimTemplate for Advanced DaemonSet
+
+Add volumeClaimTemplates to Advanced DaemonSet.
+
+## Table of Contents
+
+- [volumeClaimTemplate for Advanced DaemonSet](#volumeclaimtemplate-for-advanced-daemonset)
+  - [Table of Contents](#table-of-contents)
+  - [Motivation](#motivation)
+  - [Proposal](#proposal)
+    - [API Definition](#api-definition)
+    - [User Stories](#user-stories)
+      - [Story 1](#story-1)
+    - [Implementation Details/Notes/Constraints](#implementation-detailsnotesconstraints)
+  - [Implementation History](#implementation-history)
+
+## Motivation
+
+Most of the DaemonSet specs we have seen so far use hostPath volumes for storage. We would like to use
+local persistent volumes (LPVs) instead of hostPath volumes. The problem is that you need a PVC to get
+an LPV, and DaemonSets do not allow you to dynamically create a PVC that claims an LPV through a
+dedicated storage class, the way StatefulSets do (using volumeClaimTemplates).
+
+We therefore propose adding volumeClaimTemplates to Advanced DaemonSet, mirroring StatefulSets.
+
+## Proposal
+
+Add volumeClaimTemplates to Advanced DaemonSet, following the StatefulSet API.
+
+### API Definition
+
+```go
+// DaemonSetSpec defines the desired state of DaemonSet
+type DaemonSetSpec struct {
+	// volumeClaimTemplates is a list of claims that pods are allowed to reference.
+	// The DaemonSet controller is responsible for mapping node identities to
+	// claims in a way that maintains the per-node identity of a pod. Every claim in
+	// this list must have at least one matching (by name) volumeMount in one
+	// container in the template. A claim in this list takes precedence over
+	// any volumes in the template, with the same name.
+	// TODO: Define the behavior if a claim already exists with the same name.
+	// +optional
+	// +kubebuilder:pruning:PreserveUnknownFields
+	// +kubebuilder:validation:Schemaless
+	VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`
+}
+```
+
+```yaml
+apiVersion: apps.kruise.io/v1alpha1
+kind: DaemonSet
+metadata:
+  name: app-ds
+spec:
+  selector:
+    matchLabels:
+      app: app-ds
+  template:
+    metadata:
+      labels:
+        app: app-ds
+    spec:
+      containers:
+      - name: nginx
+        image: nginx:latest
+        imagePullPolicy: IfNotPresent
+        volumeMounts:
+        - mountPath: /var/lib/nginx
+          name: nginx-data
+  volumeClaimTemplates:
+  - apiVersion: v1
+    kind: PersistentVolumeClaim
+    metadata:
+      name: nginx-data
+    spec:
+      accessModes:
+      - ReadWriteOnce
+      resources:
+        requests:
+          storage: 5Gi
+      volumeMode: Filesystem
+```
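+
+For the example above, the controller creates one PVC per node that runs a daemon pod, named
+`<claim name>-<DaemonSet name>-<node name>` (see `getPersistentVolumeClaimName` in the implementation).
+For instance, on two hypothetical nodes `node-1` and `node-2`, the resulting claims would be:
+
+```
+nginx-data-app-ds-node-1
+nginx-data-app-ds-node-2
+```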
+
+### User Stories
+
+#### Story 1
+
+Many people are looking for a DaemonSet-like workload type that supports defining a PVC per Pod:
+
+[Feature Request : volumeClaimTemplates available for Daemon Sets](https://github.com/kubernetes/kubernetes/issues/78902)
+
+[feature request: Add volumeClaimTemplate to advanced DaemonSet](https://github.com/openkruise/kruise/issues/2112)
+
+### Implementation Details/Notes/Constraints
+
+We should consider whether to delete the PVCs when a node is deleted, and whether to delete them when
+the Advanced DaemonSet itself is deleted. We can add a `PersistentVolumeClaimRetentionPolicy`, with the
+optional values `Retain` and `Delete`, the default being `Retain`.
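+
+A sketch of what this could look like, modeled on StatefulSet's `persistentVolumeClaimRetentionPolicy`
+(field and value names here are illustrative, not final):
+
+```go
+type PersistentVolumeClaimRetentionPolicyType string
+
+const (
+	// RetainPersistentVolumeClaimRetentionPolicy keeps PVCs when the DaemonSet is
+	// deleted or its node no longer runs a daemon pod. This is the default.
+	RetainPersistentVolumeClaimRetentionPolicy PersistentVolumeClaimRetentionPolicyType = "Retain"
+	// DeletePersistentVolumeClaimRetentionPolicy deletes PVCs in those cases.
+	DeletePersistentVolumeClaimRetentionPolicy PersistentVolumeClaimRetentionPolicyType = "Delete"
+)
+
+// DaemonSetPersistentVolumeClaimRetentionPolicy describes what happens to PVCs
+// created from the DaemonSet volumeClaimTemplates.
+type DaemonSetPersistentVolumeClaimRetentionPolicy struct {
+	// WhenDeleted applies when the DaemonSet is deleted. Defaults to Retain.
+	WhenDeleted PersistentVolumeClaimRetentionPolicyType `json:"whenDeleted,omitempty"`
+	// WhenNodeDeleted applies when the node backing a claim is removed. Defaults to Retain.
+	WhenNodeDeleted PersistentVolumeClaimRetentionPolicyType `json:"whenNodeDeleted,omitempty"`
+}
+```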
+
+## Implementation History
+
+- [ ] 07/22/2025: Proposal submitted; initial implementation of VolumeClaimTemplates creation
diff --git a/pkg/controller/daemonset/daemonset_controller.go b/pkg/controller/daemonset/daemonset_controller.go
index a3cbd27330..be63c7933a 100644
--- a/pkg/controller/daemonset/daemonset_controller.go
+++ b/pkg/controller/daemonset/daemonset_controller.go
@@ -41,6 +41,7 @@ import (
 	appslisters "k8s.io/client-go/listers/apps/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
+	toolscache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/client-go/util/retry"
@@ -171,6 +172,10 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
 	if err != nil {
 		return nil, err
 	}
+	pvcInformer, err := cacher.GetInformerForKind(context.TODO(), corev1.SchemeGroupVersion.WithKind("PersistentVolumeClaim"))
+	if err != nil {
+		return nil, err
+	}
 
 	dsLister := kruiseappslisters.NewDaemonSetLister(dsInformer.(cache.SharedIndexInformer).GetIndexer())
 	historyLister := appslisters.NewControllerRevisionLister(revInformer.(cache.SharedIndexInformer).GetIndexer())
@@ -178,6 +183,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
 	nodeLister := corelisters.NewNodeLister(nodeInformer.(cache.SharedIndexInformer).GetIndexer())
 	failedPodsBackoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute)
 	revisionAdapter := revisionadapter.NewDefaultImpl()
+	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcInformer.(toolscache.SharedIndexInformer).GetIndexer())
 
 	cli := utilclient.NewClientFromManager(mgr, "daemonset-controller")
 	dsc := &ReconcileDaemonSet{
@@ -185,7 +191,11 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
 		kubeClient:    genericClient.KubeClient,
 		kruiseClient:  genericClient.KruiseClient,
 		eventRecorder: recorder,
-		podControl:    kubecontroller.RealPodControl{KubeClient: genericClient.KubeClient, Recorder: recorder},
+		podControl: &dsPodControl{
+			recorder:            recorder,
+			objectMgr:           kruiseutil.NewRealObjectManager(genericClient.KubeClient, podLister, pvcLister, nil),
+			PodControlInterface: kubecontroller.RealPodControl{KubeClient: genericClient.KubeClient, Recorder: recorder},
+		},
 		crControl: kubecontroller.RealControllerRevisionControl{
 			KubeClient: genericClient.KubeClient,
 		},
@@ -270,7 +280,7 @@ type ReconcileDaemonSet struct {
 	kubeClient    clientset.Interface
 	kruiseClient  kruiseclientset.Interface
 	eventRecorder record.EventRecorder
-	podControl    kubecontroller.PodControlInterface
+	podControl    *dsPodControl
 
 	crControl        kubecontroller.ControllerRevisionControlInterface
 	lifecycleControl lifecycle.Interface
@@ -755,7 +765,7 @@ func (dsc *ReconcileDaemonSet) syncNodes(ctx context.Context, ds *appsv1alpha1.D
 			podTemplate.Spec.NodeName = nodesNeedingDaemonPods[ix]
 		}
 
-		err = dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate, ds, metav1.NewControllerRef(ds, controllerKind))
+		err = dsc.podControl.CreatePod(ctx, ds.Namespace, podTemplate, ds, metav1.NewControllerRef(ds, controllerKind), nodesNeedingDaemonPods[ix])
 		if err != nil {
 			if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
@@ -790,6 +800,7 @@ func (dsc *ReconcileDaemonSet) syncNodes(ctx context.Context, ds *appsv1alpha1.D
 	for i := 0; i < deleteDiff; i++ {
 		go func(ix int) {
 			defer deleteWait.Done()
+			// TODO: delete pvc when persistentVolumeClaimRetentionPolicy is set to Delete
 			if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
 				dsc.expectations.DeletionObserved(logger, dsKey)
 				if !errors.IsNotFound(err) {
diff --git a/pkg/controller/daemonset/daemonset_controller_test.go b/pkg/controller/daemonset/daemonset_controller_test.go
index 4f1acb5915..c23bd9f460 100644
--- a/pkg/controller/daemonset/daemonset_controller_test.go
+++ b/pkg/controller/daemonset/daemonset_controller_test.go
@@ -219,7 +219,10 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController,
 	fakeRecorder := record.NewFakeRecorder(100)
 	dsc.eventRecorder = fakeRecorder
 	podControl := newFakePodControl()
-	dsc.podControl = podControl
+	dsControl := &dsPodControl{
+		PodControlInterface: podControl,
+	}
+	dsc.podControl = dsControl
 	podControl.podStore = informerFactory.Core().V1().Pods().Informer().GetStore()
 
 	newDsc := &daemonSetsController{
@@ -253,7 +256,7 @@ func NewDaemonSetController(
 		kubeClient:    kubeClient,
 		kruiseClient:  kruiseClient,
 		eventRecorder: recorder,
-		podControl:    controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
+		podControl:    &dsPodControl{PodControlInterface: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder}},
 		crControl: controller.RealControllerRevisionControl{
 			KubeClient: kubeClient,
 		},
diff --git a/pkg/controller/daemonset/daemonset_util.go b/pkg/controller/daemonset/daemonset_util.go
index 420188f0d8..6c4b515048 100644
--- a/pkg/controller/daemonset/daemonset_util.go
+++ b/pkg/controller/daemonset/daemonset_util.go
@@ -17,8 +17,10 @@ limitations under the License.
 package daemonset
 
 import (
+	"context"
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -31,11 +33,15 @@ import (
 	apps "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	errorutils "k8s.io/apimachinery/pkg/util/errors"
 	intstrutil "k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/client-go/tools/record"
 	v1helper "k8s.io/component-helpers/scheduling/corev1"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	kubecontroller "k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/daemon/util"
 	"k8s.io/utils/integer"
 )
@@ -46,6 +52,12 @@ var (
 	newPodForDSLock sync.Mutex
 )
 
+type dsPodControl struct {
+	recorder  record.EventRecorder
+	objectMgr kruiseutil.ObjectManager
+	kubecontroller.PodControlInterface
+}
+
 type newPodForDS struct {
 	generation int64
 	pod        *corev1.Pod
@@ -371,3 +383,108 @@ func podAvailableWaitingTime(pod *corev1.Pod, minReadySeconds int32, now time.Ti
 	}
 	return minReadySecondsDuration - now.Sub(c.LastTransitionTime.Time)
 }
+
+func getPersistentVolumeClaimName(ds *appsv1alpha1.DaemonSet, claim *corev1.PersistentVolumeClaim, nodeName string) string {
+	return fmt.Sprintf("%s-%s-%s", claim.Name, ds.Name, nodeName)
+}
+
+func getPersistentVolumeClaims(ds *appsv1alpha1.DaemonSet, nodeName string) map[string]corev1.PersistentVolumeClaim {
+	templates := ds.Spec.VolumeClaimTemplates
+	claims := make(map[string]corev1.PersistentVolumeClaim, len(templates))
+	for i := range templates {
+		claim := templates[i].DeepCopy()
+		claim.Name = getPersistentVolumeClaimName(ds, claim, nodeName)
+		claim.Namespace = ds.Namespace
+		if claim.Labels != nil {
+			for key, value := range ds.Spec.Selector.MatchLabels {
+				claim.Labels[key] = value
+			}
+		} else {
+			claim.Labels = ds.Spec.Selector.MatchLabels
+		}
+		claims[templates[i].Name] = *claim
+	}
+	return claims
+}
+
+func updateStorage(ds *appsv1alpha1.DaemonSet, template *corev1.PodTemplateSpec, nodeName string) {
+	currentVolumes := template.Spec.Volumes
+	claims := getPersistentVolumeClaims(ds, nodeName)
+	newVolumes := make([]corev1.Volume, 0, len(claims))
+	for name, claim := range claims {
+		newVolumes = append(newVolumes, corev1.Volume{
+			Name: name,
+			VolumeSource: corev1.VolumeSource{
+				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+					ClaimName: claim.Name,
+					ReadOnly:  false,
+				},
+			},
+		})
+	}
+	for i := range currentVolumes {
+		if _, ok := claims[currentVolumes[i].Name]; !ok {
+			newVolumes = append(newVolumes, currentVolumes[i])
+		}
+	}
+	template.Spec.Volumes = newVolumes
+}
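+
+// Note: volumes generated from the claim templates replace any pod-template volume with
+// the same name, and each one points at the per-node claim name produced by
+// getPersistentVolumeClaimName, e.g. "data-app-ds-node-1" for claim template "data",
+// DaemonSet "app-ds", and a hypothetical node "node-1".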
+
+// recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a DaemonSet. If err is
+// nil the generated event will have a type of v1.EventTypeNormal. If err is not nil the generated event will have a
+// type of v1.EventTypeWarning.
+func (dsc *dsPodControl) recordClaimEvent(verb string, ds *appsv1alpha1.DaemonSet, nodeName string, claim *corev1.PersistentVolumeClaim, err error) {
+	if err == nil {
+		reason := fmt.Sprintf("Successful%s", strings.Title(verb))
+		message := fmt.Sprintf("%s Claim %s for DaemonSet %s on Node %s succeeded",
+			strings.ToLower(verb), claim.Name, ds.Name, nodeName)
+		dsc.recorder.Event(ds, corev1.EventTypeNormal, reason, message)
+	} else {
+		reason := fmt.Sprintf("Failed%s", strings.Title(verb))
+		message := fmt.Sprintf("%s Claim %s for DaemonSet %s on Node %s failed: %s",
+			strings.ToLower(verb), claim.Name, ds.Name, nodeName, err)
+		dsc.recorder.Event(ds, corev1.EventTypeWarning, reason, message)
+	}
+}
+
+func (dsc *dsPodControl) createPersistentVolumeClaims(ds *appsv1alpha1.DaemonSet, nodeName string) error {
+	var errs []error
+	for _, claim := range getPersistentVolumeClaims(ds, nodeName) {
+		pvc, err := dsc.objectMgr.GetClaim(claim.Namespace, claim.Name)
+		switch {
+		case apierrors.IsNotFound(err):
+			err := dsc.objectMgr.CreateClaim(&claim)
+			if err != nil {
+				errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err))
+			}
+			if err == nil || !apierrors.IsAlreadyExists(err) {
+				dsc.recordClaimEvent("Create", ds, nodeName, &claim, err)
+			}
+		case err != nil:
+			errs = append(errs, fmt.Errorf("failed to retrieve PVC %s: %s", claim.Name, err))
+			dsc.recordClaimEvent("Create", ds, nodeName, &claim, err)
+		default:
+			if pvc.DeletionTimestamp != nil {
+				errs = append(errs, fmt.Errorf("pvc %s is to be deleted", claim.Name))
+			}
+		}
+	}
+	return errorutils.NewAggregate(errs)
+}
+
+// CreatePod creates a Pod on the given node from the given template, creating required PVCs first.
+func (dsc *dsPodControl) CreatePod(ctx context.Context, namespace string, template *corev1.PodTemplateSpec,
+	ds *appsv1alpha1.DaemonSet, controllerRef *metav1.OwnerReference, nodeName string) error {
+	tmpl := template.DeepCopy()
+	if err := dsc.createPersistentVolumeClaims(ds, nodeName); err != nil {
+		return err
+	}
+
+	updateStorage(ds, tmpl, nodeName)
+	return dsc.CreatePods(ctx, namespace, tmpl, ds, controllerRef)
+}
diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go
index dbe85f1d77..73cd174c9a 100644
--- a/pkg/controller/statefulset/stateful_pod_control.go
+++ b/pkg/controller/statefulset/stateful_pod_control.go
@@ -26,9 +26,7 @@ import (
 	"golang.org/x/text/language"
 	v1 "k8s.io/api/core/v1"
-	storagev1 "k8s.io/api/storage/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	errorutils "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientset "k8s.io/client-go/kubernetes"
@@ -40,26 +38,14 @@ import (
 	appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
 	"github.com/openkruise/kruise/pkg/features"
+	"github.com/openkruise/kruise/pkg/util"
 	utilfeature "github.com/openkruise/kruise/pkg/util/feature"
 )
 
-// StatefulPodControlObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this
-// with a clientset for writes and listers for reads; for tests we provide stubs.
-type StatefulPodControlObjectManager interface {
-	CreatePod(ctx context.Context, pod *v1.Pod) error
-	GetPod(namespace, podName string) (*v1.Pod, error)
-	UpdatePod(pod *v1.Pod) error
-	DeletePod(pod *v1.Pod) error
-	CreateClaim(claim *v1.PersistentVolumeClaim) error
-	GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error)
-	UpdateClaim(claim *v1.PersistentVolumeClaim) error
-	GetStorageClass(scName string) (*storagev1.StorageClass, error)
-}
-
 // StatefulPodControl defines the interface that StatefulSetController uses to create, update, and delete Pods,
 // Manipulation of objects is provided through objectMgr, which allows the k8s API to be mocked out for testing.
 type StatefulPodControl struct {
-	objectMgr StatefulPodControlObjectManager
+	objectMgr util.ObjectManager
 	recorder  record.EventRecorder
 }
 
@@ -72,58 +58,14 @@ func NewStatefulPodControl(
 	client clientset.Interface,
 	podLister corelisters.PodLister,
 	claimLister corelisters.PersistentVolumeClaimLister,
 	scLister storagelisters.StorageClassLister,
 	recorder record.EventRecorder,
 ) *StatefulPodControl {
-	return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister, scLister}, recorder}
+	return &StatefulPodControl{util.NewRealObjectManager(client, podLister, claimLister, scLister), recorder}
 }
 
-// NewStatefulPodControlFromManager creates a StatefulPodControl using the given StatefulPodControlObjectManager and recorder.
+// NewStatefulPodControlFromManager creates a StatefulPodControl using the given ObjectManager and recorder.
-func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, recorder record.EventRecorder) *StatefulPodControl {
+func NewStatefulPodControlFromManager(om util.ObjectManager, recorder record.EventRecorder) *StatefulPodControl {
 	return &StatefulPodControl{om, recorder}
 }
 
-// realStatefulPodControlObjectManager uses a clientset.Interface and listers.
-type realStatefulPodControlObjectManager struct {
-	client      clientset.Interface
-	podLister   corelisters.PodLister
-	claimLister corelisters.PersistentVolumeClaimLister
-	scLister    storagelisters.StorageClassLister
-}
-
-func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
-	_, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
-	return err
-}
-
-func (om *realStatefulPodControlObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
-	return om.podLister.Pods(namespace).Get(podName)
-}
-
-func (om *realStatefulPodControlObjectManager) UpdatePod(pod *v1.Pod) error {
-	_, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
-	return err
-}
-
-func (om *realStatefulPodControlObjectManager) DeletePod(pod *v1.Pod) error {
-	return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
-}
-
-func (om *realStatefulPodControlObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
-	_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{})
-	return err
-}
-
-func (om *realStatefulPodControlObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
-	return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName)
-}
-
-func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
-	_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
-	return err
-}
-
-func (om *realStatefulPodControlObjectManager) GetStorageClass(scName string) (*storagev1.StorageClass, error) {
-	return om.scLister.Get(scName)
-}
-
 func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *appsv1beta1.StatefulSet, pod *v1.Pod) error {
 	// Create the Pod's PVCs prior to creating the Pod
 	if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go
index 604ec730a9..2cc73066a4 100644
--- a/pkg/controller/statefulset/stateful_set_control_test.go
+++ b/pkg/controller/statefulset/stateful_set_control_test.go
@@ -4090,7 +4090,7 @@ func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet,
 	return om.podsLister.Pods(set.Namespace).List(selector)
 }
 
-var _ StatefulPodControlObjectManager = &fakeObjectManager{}
+var _ util.ObjectManager = &fakeObjectManager{}
 
 type fakeStatefulSetStatusUpdater struct {
 	setsLister kruiseappslisters.StatefulSetLister
diff --git a/pkg/util/object_manager.go b/pkg/util/object_manager.go
new file mode 100644
index 0000000000..6599f3da52
--- /dev/null
+++ b/pkg/util/object_manager.go
@@ -0,0 +1,81 @@
+package util
+
+import (
+	"context"
+
+	v1 "k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	storagelisters "k8s.io/client-go/listers/storage/v1"
+)
+
+// ObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this
+// with a clientset for writes and listers for reads; for tests we provide stubs.
+type ObjectManager interface {
+	CreatePod(ctx context.Context, pod *v1.Pod) error
+	GetPod(namespace, podName string) (*v1.Pod, error)
+	UpdatePod(pod *v1.Pod) error
+	DeletePod(pod *v1.Pod) error
+	CreateClaim(claim *v1.PersistentVolumeClaim) error
+	GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error)
+	UpdateClaim(claim *v1.PersistentVolumeClaim) error
+	GetStorageClass(scName string) (*storagev1.StorageClass, error)
+}
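+
+// Tests can substitute a lightweight stub for ObjectManager; see fakeObjectManager in
+// pkg/controller/statefulset/stateful_set_control_test.go, which satisfies this interface.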
+
+// RealObjectManager uses a clientset.Interface and listers.
+type RealObjectManager struct {
+	client      clientset.Interface
+	podLister   corelisters.PodLister
+	claimLister corelisters.PersistentVolumeClaimLister
+	scLister    storagelisters.StorageClassLister
+}
+
+func NewRealObjectManager(client clientset.Interface, podLister corelisters.PodLister, claimLister corelisters.PersistentVolumeClaimLister, scLister storagelisters.StorageClassLister) *RealObjectManager {
+	return &RealObjectManager{
+		client:      client,
+		podLister:   podLister,
+		claimLister: claimLister,
+		scLister:    scLister,
+	}
+}
+
+func (om *RealObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
+	_, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+	return err
+}
+
+func (om *RealObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
+	return om.podLister.Pods(namespace).Get(podName)
+}
+
+func (om *RealObjectManager) UpdatePod(pod *v1.Pod) error {
+	_, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
+	return err
+}
+
+func (om *RealObjectManager) DeletePod(pod *v1.Pod) error {
+	return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+}
+
+func (om *RealObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
+	_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{})
+	return err
+}
+
+func (om *RealObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
+	return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName)
+}
+
+func (om *RealObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
+	_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
+	return err
+}
+
+func (om *RealObjectManager) GetStorageClass(scName string) (*storagev1.StorageClass, error) {
+	return om.scLister.Get(scName)
+}
diff --git a/pkg/webhook/daemonset/validating/daemonset_validation.go b/pkg/webhook/daemonset/validating/daemonset_validation.go
index 0ceab7ebbc..c0b7c00e31 100644
--- a/pkg/webhook/daemonset/validating/daemonset_validation.go
+++ b/pkg/webhook/daemonset/validating/daemonset_validation.go
@@ -14,6 +14,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/validation"
 	"k8s.io/apimachinery/pkg/util/validation/field"
 	appsvalidation "k8s.io/kubernetes/pkg/apis/apps/validation"
+	"k8s.io/kubernetes/pkg/apis/core"
 	corevalidation "k8s.io/kubernetes/pkg/apis/core/validation"
 
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
@@ -35,6 +36,24 @@ func validateDaemonSet(ds *appsv1alpha1.DaemonSet) field.ErrorList {
 	return allErrs
 }
 
+// volumesToAddForTemplates builds a PVC-backed volume for every volumeClaimTemplate so that
+// ValidatePodTemplateSpec sees each matching volumeMount as satisfied.
+func volumesToAddForTemplates(spec *appsv1alpha1.DaemonSetSpec) map[string]core.Volume {
+	volumes := make(map[string]core.Volume)
+	templates := spec.VolumeClaimTemplates
+	for i := range templates {
+		volumes[templates[i].Name] = core.Volume{
+			Name: templates[i].Name,
+			VolumeSource: core.VolumeSource{
+				PersistentVolumeClaim: &core.PersistentVolumeClaimVolumeSource{
+					ClaimName: templates[i].Name,
+				},
+			},
+		}
+	}
+	return volumes
+}
+
 // ValidateDaemonSetSpec tests if required fields in the DaemonSetSpec are set.
 func validateDaemonSetSpec(spec *appsv1alpha1.DaemonSetSpec, fldPath *field.Path) field.ErrorList {
 	allErrs := field.ErrorList{}
@@ -54,7 +73,22 @@ func validateDaemonSetSpec(spec *appsv1alpha1.DaemonSetSpec, fldPath *field.Path
 		allErrs = append(allErrs, field.Invalid(fldPath.Root(), spec.Template, fmt.Sprintf("Convert_v1_PodTemplateSpec_To_core_PodTemplateSpec failed: %v", err)))
 		return allErrs
 	}
-	allErrs = append(allErrs, corevalidation.ValidatePodTemplateSpec(coreTemplate, fldPath.Child("template"), webhookutil.DefaultPodValidationOptions)...)
+	templateToValidate := coreTemplate
+	if len(spec.VolumeClaimTemplates) > 0 {
+		templateToValidate = templateToValidate.DeepCopy()
+		templateVolumes := volumesToAddForTemplates(spec)
+		newVolumes := make([]core.Volume, 0, len(templateVolumes))
+		for _, v := range templateVolumes {
+			newVolumes = append(newVolumes, v)
+		}
+		for _, v := range templateToValidate.Spec.Volumes {
+			if _, ok := templateVolumes[v.Name]; !ok {
+				newVolumes = append(newVolumes, v)
+			}
+		}
+		templateToValidate.Spec.Volumes = newVolumes
+	}
+	allErrs = append(allErrs, corevalidation.ValidatePodTemplateSpec(templateToValidate, fldPath.Child("template"), webhookutil.DefaultPodValidationOptions)...)
 
 	// RestartPolicy has already been first-order validated as per ValidatePodTemplateSpec().
 	if spec.Template.Spec.RestartPolicy != corev1.RestartPolicyAlways {
diff --git a/pkg/webhook/daemonset/validating/daemonset_validation_test.go b/pkg/webhook/daemonset/validating/daemonset_validation_test.go
index 63d79fed06..09c10942ac 100644
--- a/pkg/webhook/daemonset/validating/daemonset_validation_test.go
+++ b/pkg/webhook/daemonset/validating/daemonset_validation_test.go
@@ -31,7 +31,25 @@ import (
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 )
 
+var podTemplateWithVolumeMount = corev1.PodTemplateSpec{
+	ObjectMeta: metav1.ObjectMeta{
+		Labels: map[string]string{
+			"key1": "value1",
+		},
+	},
+	Spec: corev1.PodSpec{
+		Containers: []corev1.Container{{
+			Name:  "a",
+			Image: "b",
+			VolumeMounts: []corev1.VolumeMount{{
+				Name:      "volume",
+				MountPath: "/data",
+			}},
+		}},
+	},
+}
+
-func newDaemonset(name string) *appsv1alpha1.DaemonSet {
+func newDaemonSet(name string) *appsv1alpha1.DaemonSet {
 	ds := &appsv1alpha1.DaemonSet{}
 	ds.Name = name
 	ds.Namespace = metav1.NamespaceDefault
@@ -48,7 +66,7 @@ func TestValidateDaemonSet(t *testing.T) {
 		{
 			"selector not match",
 			func() *appsv1alpha1.DaemonSet {
-				ds := newDaemonset("ds1")
+				ds := newDaemonSet("ds1")
 				ds.Spec.Selector = &metav1.LabelSelector{
 					MatchLabels: map[string]string{
 						"key1": "value1",
@@ -70,7 +88,7 @@ func TestValidateDaemonSet(t *testing.T) {
 			"selector match",
 			func() *appsv1alpha1.DaemonSet {
 				maxUnavailable := intstr.FromInt(1)
-				ds := newDaemonset("ds1")
+				ds := newDaemonSet("ds1")
 				ds.Spec.Selector = &metav1.LabelSelector{
 					MatchLabels: map[string]string{
 						"key1": "value1",
@@ -96,6 +114,48 @@ func TestValidateDaemonSet(t *testing.T) {
 			}(),
 			true,
 		},
+		{
+			Title: "pod template volumeMounts with volumeClaimTemplates",
+			Ds: func() *appsv1alpha1.DaemonSet {
+				ds := newDaemonSet("ds1")
+				ds.Spec.Selector = &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						"key1": "value1",
+					},
+				}
+				ds.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume",
+						},
+					},
+				}
+				ds.Spec.Template = podTemplateWithVolumeMount
+				return ds
+			}(),
+			ExpectAllowResult: true,
+		},
+		{
+			Title: "pod template volumeMounts with volumeClaimTemplates no match",
+			Ds: func() *appsv1alpha1.DaemonSet {
+				ds := newDaemonSet("ds1")
+				ds.Spec.Selector = &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						"key1": "value1",
+					},
+				}
+				ds.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume1",
+						},
+					},
+				}
+				ds.Spec.Template = podTemplateWithVolumeMount
+				return ds
+			}(),
+			ExpectAllowResult: false,
+		},
 	} {
 		result, _, err := validatingDaemonSetFn(context.TODO(), c.Ds)
 		if !reflect.DeepEqual(c.ExpectAllowResult, result) {
@@ -124,6 +184,13 @@ func TestValidateDaemonSetUpdate(t *testing.T) {
 			},
 		},
 	}
+	podTemplateWithVolumeMounts := podTemplateWithVolumeMount.DeepCopy()
+	podTemplateWithVolumeMounts.Spec.Containers[0].VolumeMounts = append(podTemplateWithVolumeMounts.Spec.Containers[0].VolumeMounts,
+		corev1.VolumeMount{
+			Name:      "volume1",
+			MountPath: "/data1",
+		},
+	)
 	intOrStr1 := intstr.FromInt(1)
 	intOrStr2 := intstr.FromInt(2)
 	successCases := []testCase{
@@ -175,6 +242,58 @@ func TestValidateDaemonSetUpdate(t *testing.T) {
 			},
 		},
 	},
+		{
+			spec: &appsv1alpha1.DaemonSetSpec{
+				Template: *podTemplateWithVolumeMounts,
+				Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
+					"key1": "value1",
+				}},
+				BurstReplicas: &intOrStr1,
+				UpdateStrategy: appsv1alpha1.DaemonSetUpdateStrategy{
+					Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType,
+					RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{
+						MaxUnavailable: &intOrStr1,
+					},
+				},
+				VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume",
+						},
+					},
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume1",
+						},
+					},
+				},
+			},
+			oldSpec: &appsv1alpha1.DaemonSetSpec{
+				Template: podTemplateWithVolumeMount,
+				Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
+					"key1": "value1",
+				}},
+				BurstReplicas: &intOrStr2,
+				UpdateStrategy: appsv1alpha1.DaemonSetUpdateStrategy{
+					Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType,
+					RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{
+						MaxUnavailable: &intOrStr1,
+					},
+				},
+				VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume",
+						},
+					},
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume1",
+						},
+					},
+				},
+			},
+		},
 	}
 
 	uid := uuid.NewUUID()
@@ -223,6 +342,49 @@ func TestValidateDaemonSetUpdate(t *testing.T) {
 			},
 		},
 	},
+		// the new template still mounts volume1, but volumeClaimTemplates no longer defines a matching claim
+		{
+			spec: &appsv1alpha1.DaemonSetSpec{
+				Template: *podTemplateWithVolumeMounts,
+				Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
+					"key1": "value1",
+				}},
+				BurstReplicas: &intOrStr1,
+				UpdateStrategy: appsv1alpha1.DaemonSetUpdateStrategy{
+					Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType,
+					RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{
+						MaxUnavailable: &intOrStr1,
+					},
+				},
+				VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume",
+						},
+					},
+				},
+			},
+			oldSpec: &appsv1alpha1.DaemonSetSpec{
+				Template: podTemplateWithVolumeMount,
+				Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
+					"key1": "value1",
+				}},
+				BurstReplicas: &intOrStr2,
+				UpdateStrategy: appsv1alpha1.DaemonSetUpdateStrategy{
+					Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType,
+					RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{
+						MaxUnavailable: &intOrStr1,
+					},
+				},
+				VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "volume",
+						},
+					},
+				},
+			},
+		},
 	}
 
 	for i, successCase := range errorCases {
 		obj := &appsv1alpha1.DaemonSet{
diff --git a/test/e2e/apps/daemonset.go b/test/e2e/apps/daemonset.go
index 93fc0e7d63..5d19aaa965 100644
--- a/test/e2e/apps/daemonset.go
+++ b/test/e2e/apps/daemonset.go
@@ -47,8 +47,23 @@ var _ = SIGDescribe("DaemonSet", func() {
 		if ginkgo.CurrentGinkgoTestDescription().Failed {
 			framework.DumpDebugInfo(c, ns)
 		}
+		ds, err := tester.GetDaemonSet(dsName)
+		if err != nil {
+			if !errors.IsNotFound(err) {
+				framework.Logf("Failed to get DaemonSet %s/%s: %v", ns, dsName, err)
+			}
+			return
+		}
+		nodes := tester.SchedulableNodes(ds)
 		framework.Logf("Deleting DaemonSet %s/%s in cluster", ns, dsName)
 		tester.DeleteDaemonSet(ns, dsName)
+
+		pvcs, err := tester.ListDaemonPvcs(ds.Spec.Selector.MatchLabels)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		gomega.Expect(len(pvcs.Items)).Should(gomega.Equal(len(nodes)))
+		framework.Logf("Deleting PVCs in cluster")
+		err = tester.DeleteDaemonPvcs(ds.Spec.Selector.MatchLabels)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	})
 
 	/*
diff --git a/test/e2e/framework/daemonset_util.go b/test/e2e/framework/daemonset_util.go
index 0361e999c7..bce7fd1088 100644
--- a/test/e2e/framework/daemonset_util.go
+++ b/test/e2e/framework/daemonset_util.go
@@ -14,6 +14,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
@@ -73,7 +74,13 @@ func (t *DaemonSetTester) NewDaemonSet(name string, label map[string]string, ima
 				{
 					Name:    "busybox",
 					Image:   image,
-					Command: []string{"/bin/sh", "-c", "sleep 10000000"},
+					Command: []string{"/bin/sh", "-c", "echo 'data' > /data/data1 && sleep 10000000"},
+					VolumeMounts: []v1.VolumeMount{
+						{
+							Name:      "data",
+							MountPath: "/data",
+						},
+					},
 				},
 			},
 			HostNetwork:                   true,
@@ -81,6 +88,23 @@ func (t *DaemonSetTester) NewDaemonSet(name string, label map[string]string, ima
 			TerminationGracePeriodSeconds: utilpointer.Int64(3),
 		},
 	},
+	VolumeClaimTemplates: []v1.PersistentVolumeClaim{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "data",
+			},
+			Spec: v1.PersistentVolumeClaimSpec{
+				AccessModes: []v1.PersistentVolumeAccessMode{
+					v1.ReadWriteOnce,
+				},
+				Resources: v1.VolumeResourceRequirements{
+					Requests: v1.ResourceList{
+						v1.ResourceStorage: *resource.NewQuantity(1<<20, resource.BinarySI), // 1Mi
+					},
+				},
+			},
+		},
+	},
 	UpdateStrategy: updateStrategy,
 	},
 }
@@ -372,6 +396,18 @@ func (t *DaemonSetTester) ListDaemonPods(label map[string]string) (*v1.PodList,
 	return t.c.CoreV1().Pods(t.ns).List(context.TODO(), options)
 }
 
+func (t *DaemonSetTester) ListDaemonPvcs(label map[string]string) (*v1.PersistentVolumeClaimList, error) {
+	selector := labels.Set(label).AsSelector()
+	options := metav1.ListOptions{LabelSelector: selector.String()}
+	return t.c.CoreV1().PersistentVolumeClaims(t.ns).List(context.TODO(), options)
+}
+
+func (t *DaemonSetTester) DeleteDaemonPvcs(label map[string]string) error {
+	selector := labels.Set(label).AsSelector()
+	options := metav1.ListOptions{LabelSelector: selector.String()}
+	return t.c.CoreV1().PersistentVolumeClaims(t.ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, options)
+}
+
 func (t *DaemonSetTester) DaemonPodHasReadinessGate(pods []v1.Pod) bool {
 	for _, pod := range pods {
 		if !daemonset.ContainsReadinessGate(&pod) {