@@ -25,8 +25,9 @@ import (
2525 "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
2626 "github.com/go-logr/logr"
2727 "github.com/prometheus/client_golang/prometheus"
28+ "golang.org/x/sync/semaphore"
2829 v1 "k8s.io/api/core/v1"
29- "k8s.io/apimachinery/pkg/api/errors"
30+ apierrors "k8s.io/apimachinery/pkg/api/errors"
3031 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132 "k8s.io/apimachinery/pkg/runtime"
3233 "k8s.io/apimachinery/pkg/types"
@@ -35,6 +36,7 @@ import (
3536 ctrl "sigs.k8s.io/controller-runtime"
3637 "sigs.k8s.io/controller-runtime/pkg/client"
3738 "sigs.k8s.io/controller-runtime/pkg/controller"
39+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3840 "sigs.k8s.io/controller-runtime/pkg/metrics"
3941)
4042
@@ -75,7 +77,8 @@ type CNINodeReconciler struct {
7577 clusterName string
7678 vpcId string
7779 finalizerManager k8s.FinalizerManager
78- newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string ) cleanup.ResourceCleaner
80+ deletePool * semaphore.Weighted
81+ newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string , log logr.Logger ) cleanup.ResourceCleaner
7982}
8083
8184func NewCNINodeReconciler (
@@ -88,7 +91,8 @@ func NewCNINodeReconciler(
8891 clusterName string ,
8992 vpcId string ,
9093 finalizerManager k8s.FinalizerManager ,
91- newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string ) cleanup.ResourceCleaner ,
94+ maxConcurrentWorkers int ,
95+ newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string , log logr.Logger ) cleanup.ResourceCleaner ,
9296) * CNINodeReconciler {
9397 return & CNINodeReconciler {
9498 Client : client ,
@@ -100,6 +104,7 @@ func NewCNINodeReconciler(
100104 clusterName : clusterName ,
101105 vpcId : vpcId ,
102106 finalizerManager : finalizerManager ,
107+ deletePool : semaphore .NewWeighted (int64 (maxConcurrentWorkers )),
103108 newResourceCleaner : newResourceCleaner ,
104109 }
105110}
@@ -118,7 +123,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
118123 nodeFound := true
119124 node := & v1.Node {}
120125 if err := r .Client .Get (ctx , req .NamespacedName , node ); err != nil {
121- if errors .IsNotFound (err ) {
126+ if apierrors .IsNotFound (err ) {
122127 nodeFound = false
123128 } else {
124129 r .log .Error (err , "failed to get the node object in CNINode reconciliation, will retry" )
@@ -128,66 +133,50 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
128133 }
129134
130135 if cniNode .GetDeletionTimestamp ().IsZero () {
131- shouldPatch := false
132136 cniNodeCopy := cniNode .DeepCopy ()
133- // Add cluster name tag if it does not exist
134- val , ok := cniNode .Spec .Tags [config .VPCCNIClusterNameKey ]
135- if ! ok || val != r .clusterName {
136- if len (cniNodeCopy .Spec .Tags ) != 0 {
137- cniNodeCopy .Spec .Tags [config .VPCCNIClusterNameKey ] = r .clusterName
138- } else {
139- cniNodeCopy .Spec .Tags = map [string ]string {
140- config .VPCCNIClusterNameKey : r .clusterName ,
141- }
142- }
143- shouldPatch = true
144- }
145- // if node exists, get & add OS label if it does not exist on CNINode
146- if nodeFound {
147- nodeLabelOS := node .ObjectMeta .Labels [config .NodeLabelOS ]
148- val , ok = cniNode .ObjectMeta .Labels [config .NodeLabelOS ]
149- if ! ok || val != nodeLabelOS {
150- if len (cniNodeCopy .ObjectMeta .Labels ) != 0 {
151- cniNodeCopy .ObjectMeta .Labels [config .NodeLabelOS ] = nodeLabelOS
152- } else {
153- cniNodeCopy .ObjectMeta .Labels = map [string ]string {
154- config .NodeLabelOS : nodeLabelOS ,
155- }
156- }
157- shouldPatch = true
158- }
159- }
137+ shouldPatch , err := r .ensureTagsAndLabels (cniNodeCopy , node )
138+ shouldPatch = controllerutil .AddFinalizer (cniNodeCopy , config .NodeTerminationFinalizer ) || shouldPatch
160139
161140 if shouldPatch {
162- r .log .Info ("patching CNINode to add required fields Tags and Labels " , "cninode" , cniNode .Name )
163- return ctrl. Result {}, r .Client .Patch (ctx , cniNodeCopy , client .MergeFromWithOptions (cniNode , client.MergeFromWithOptimisticLock {}))
164- }
165-
166- // Add finalizer if it does not exist
167- if err := r . finalizerManager . AddFinalizers ( ctx , cniNode , config . NodeTerminationFinalizer ); err != nil {
168- r . log . Error ( err , "failed to add finalizer on CNINode, will retry" , "cniNode" , cniNode . Name , "finalizer" , config . NodeTerminationFinalizer )
169- return ctrl. Result {}, err
141+ r .log .Info ("patching CNINode to add fields Tags, Labels and finalizer " , "cninode" , cniNode .Name )
142+ if err := r .Client .Patch (ctx , cniNodeCopy , client .MergeFromWithOptions (cniNode , client.MergeFromWithOptimisticLock {})); err != nil {
143+ if apierrors . IsConflict ( err ) {
144+ r . log . Info ( "failed to update cninode" , "cninode" , cniNode . Name , "error" , err )
145+ return ctrl. Result { Requeue : true }, nil
146+ }
147+ return ctrl. Result {}, err
148+ }
170149 }
171- return ctrl.Result {}, nil
172-
150+ return ctrl.Result {}, err
173151 } else { // CNINode is marked for deletion
174152 if ! nodeFound {
175153 // node is also deleted, proceed with running the cleanup routine and remove the finalizer
176-
177154 // run cleanup for Linux nodes only
178155 if val , ok := cniNode .ObjectMeta .Labels [config .NodeLabelOS ]; ok && val == config .OSLinux {
179156 r .log .Info ("running the finalizer routine on cniNode" , "cniNode" , cniNode .Name )
180157 // run cleanup when node id is present
181158 if nodeID , ok := cniNode .Spec .Tags [config .NetworkInterfaceNodeIDKey ]; ok && nodeID != "" {
182- if err := r . newResourceCleaner ( nodeID , r . eC2Wrapper , r . vpcId ). DeleteLeakedResources (); err != nil {
183- r .log .Error ( err , "failed to cleanup resources during node termination " )
184- ec2API . NodeTerminationENICleanupFailure . Inc ()
159+ if ! r . deletePool . TryAcquire ( 1 ) {
160+ r .log .Info ( "d, will requeue request " )
161+ return ctrl. Result { RequeueAfter : 10 * time . Second }, nil
185162 }
163+ go func (nodeID string ) {
164+ defer r .deletePool .Release (1 )
165+ childCtx , cancel := context .WithTimeout (ctx , config .NodeTerminationTimeout )
166+ defer cancel ()
167+ if err := r .newResourceCleaner (nodeID , r .eC2Wrapper , r .vpcId , r .log ).DeleteLeakedResources (childCtx ); err != nil {
168+ r .log .Error (err , "failed to cleanup resources during node termination" )
169+ ec2API .NodeTerminationENICleanupFailure .Inc ()
170+ }
171+ }(nodeID )
186172 }
187173 }
188174
189- if err := r .finalizerManager . RemoveFinalizers (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
175+ if err := r .removeFinalizer (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
190176 r .log .Error (err , "failed to remove finalizer on CNINode, will retry" , "cniNode" , cniNode .Name , "finalizer" , config .NodeTerminationFinalizer )
177+ if apierrors .IsConflict (err ) {
178+ return ctrl.Result {Requeue : true }, nil
179+ }
191180 return ctrl.Result {}, err
192181 }
193182 return ctrl.Result {}, nil
@@ -207,7 +196,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
207196 Spec : cniNode .Spec ,
208197 }
209198
210- if err := r .finalizerManager . RemoveFinalizers (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
199+ if err := r .removeFinalizer (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
211200 r .log .Error (err , "failed to remove finalizer on CNINode, will retry" )
212201 return ctrl.Result {}, err
213202 }
@@ -252,7 +241,7 @@ func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.Names
252241 oldCNINode := & v1alpha1.CNINode {}
253242
254243 return wait .PollUntilContextTimeout (context .TODO (), 500 * time .Millisecond , time .Second * 3 , true , func (ctx context.Context ) (bool , error ) {
255- if err := r .Client .Get (ctx , nameSpacedCNINode , oldCNINode ); err != nil && errors .IsNotFound (err ) {
244+ if err := r .Client .Get (ctx , nameSpacedCNINode , oldCNINode ); err != nil && apierrors .IsNotFound (err ) {
256245 return true , nil
257246 }
258247 return false , nil
@@ -266,3 +255,45 @@ func (r *CNINodeReconciler) createCNINodeFromObj(ctx context.Context, newCNINode
266255 return r .Client .Create (ctx , newCNINode )
267256 })
268257}
258+
259+ func (r * CNINodeReconciler ) ensureTagsAndLabels (cniNode * v1alpha1.CNINode , node * v1.Node ) (bool , error ) {
260+ shouldPatch := false
261+ var err error
262+ if cniNode .Spec .Tags == nil {
263+ cniNode .Spec .Tags = make (map [string ]string )
264+ }
265+ // add cluster name tag if it does not exist
266+ if cniNode .Spec .Tags [config .VPCCNIClusterNameKey ] != r .clusterName {
267+ cniNode .Spec .Tags [config .VPCCNIClusterNameKey ] = r .clusterName
268+ shouldPatch = true
269+ }
270+ if node != nil {
271+ var nodeID string
272+ nodeID , err = utils .GetNodeID (node )
273+
274+ if nodeID != "" && cniNode .Spec .Tags [config .NetworkInterfaceNodeIDKey ] != nodeID {
275+ cniNode .Spec .Tags [config .NetworkInterfaceNodeIDKey ] = nodeID
276+ shouldPatch = true
277+ }
278+
279+ // add node label if it does not exist
280+ if cniNode .ObjectMeta .Labels == nil {
281+ cniNode .ObjectMeta .Labels = make (map [string ]string )
282+ }
283+ if cniNode .ObjectMeta .Labels [config .NodeLabelOS ] != node .ObjectMeta .Labels [config .NodeLabelOS ] {
284+ cniNode .ObjectMeta .Labels [config .NodeLabelOS ] = node .ObjectMeta .Labels [config .NodeLabelOS ]
285+ shouldPatch = true
286+ }
287+ }
288+ return shouldPatch , err
289+ }
290+
291+ func (r * CNINodeReconciler ) removeFinalizer (ctx context.Context , cniNode * v1alpha1.CNINode , finalizer string ) error {
292+ cniNodeCopy := cniNode .DeepCopy ()
293+
294+ if controllerutil .RemoveFinalizer (cniNodeCopy , finalizer ) {
295+ r .log .Info ("removing finalizer for cninode" , "name" , cniNode .GetName (), "finalizer" , finalizer )
296+ return r .Client .Patch (ctx , cniNodeCopy , client .MergeFromWithOptions (cniNode , client.MergeFromWithOptimisticLock {}))
297+ }
298+ return nil
299+ }
0 commit comments