Skip to content

Commit bc2eaa3

Browse files
committed
decouple cninode resource cleanup from deletion
1 parent 889809a commit bc2eaa3

File tree

4 files changed

+115
-40
lines changed

4 files changed

+115
-40
lines changed

config/controller/controller.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ spec:
3535
- --metrics-bind-address=:8443
3636
- --introspect-bind-addr=:22775
3737
- --vpc-id=VPC_ID
38+
- --aws-region=AWS_REGION
3839
image: controller:latest
3940
name: controller
4041
resources:

controllers/crds/cninode_controller.go

Lines changed: 89 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,33 @@ var (
5454
Help: "The number of requests that failed when controller tried to recreate the CNINode",
5555
},
5656
)
57+
cninodeOperationLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
58+
Name: "cninode_operation_latency",
59+
Help: "The latency of CNINode operation",
60+
Buckets: prometheus.DefBuckets,
61+
}, []string{"operation"})
62+
)
63+
64+
type CleanupTask struct {
65+
cniNode *v1alpha1.CNINode
66+
retryAfter time.Duration
67+
hasRetried int
68+
}
69+
70+
const (
71+
cleanupTaskRetryFactor = 2
72+
cleanupTaskMaxRetry = 5
73+
initalRetryDelay = 20
5774
)
5875

5976
func prometheusRegister() {
6077
prometheusRegistered = true
6178

6279
metrics.Registry.MustRegister(
6380
recreateCNINodeCallCount,
64-
recreateCNINodeErrCount)
81+
recreateCNINodeErrCount,
82+
cninodeOperationLatency,
83+
)
6584

6685
prometheusRegistered = true
6786
}
@@ -79,6 +98,7 @@ type CNINodeReconciler struct {
7998
finalizerManager k8s.FinalizerManager
8099
deletePool *semaphore.Weighted
81100
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner
101+
cleanupChan chan any
82102
}
83103

84104
func NewCNINodeReconciler(
@@ -106,6 +126,8 @@ func NewCNINodeReconciler(
106126
finalizerManager: finalizerManager,
107127
deletePool: semaphore.NewWeighted(int64(maxConcurrentWorkers)),
108128
newResourceCleaner: newResourceCleaner,
129+
// TODO: considering use 150% workers to high throughput
130+
cleanupChan: make(chan any, maxConcurrentWorkers),
109131
}
110132
}
111133

@@ -134,10 +156,11 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
134156

135157
if cniNode.GetDeletionTimestamp().IsZero() {
136158
cniNodeCopy := cniNode.DeepCopy()
137-
shouldPatch, err := r.ensureTagsAndLabels(cniNodeCopy, node)
138-
shouldPatch = controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer) || shouldPatch
159+
shouldPatchTags, err := r.ensureTagsAndLabels(cniNodeCopy, node)
160+
shouldPatchFinalizer := controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer)
161+
createAt := time.Now()
139162

140-
if shouldPatch {
163+
if shouldPatchTags || shouldPatchFinalizer {
141164
r.log.Info("patching CNINode to add fields Tags, Labels and finalizer", "cninode", cniNode.Name)
142165
if err := r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{})); err != nil {
143166
if apierrors.IsConflict(err) {
@@ -146,29 +169,25 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
146169
}
147170
return ctrl.Result{}, err
148171
}
172+
if shouldPatchTags {
173+
cninodeOperationLatency.WithLabelValues("add_tag").Observe(time.Since(createAt).Seconds())
174+
}
175+
if shouldPatchFinalizer {
176+
cninodeOperationLatency.WithLabelValues("add_finalizer").Observe(time.Since(createAt).Seconds())
177+
}
149178
}
150179
return ctrl.Result{}, err
151180
} else { // CNINode is marked for deletion
181+
startAt := time.Now()
152182
if !nodeFound {
153183
// node is also deleted, proceed with running the cleanup routine and remove the finalizer
154184
// run cleanup for Linux nodes only
155185
if val, ok := cniNode.ObjectMeta.Labels[config.NodeLabelOS]; ok && val == config.OSLinux {
156-
r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
157-
// run cleanup when node id is present
158-
if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
159-
if !r.deletePool.TryAcquire(1) {
160-
r.log.Info("d, will requeue request")
161-
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
162-
}
163-
go func(nodeID string) {
164-
defer r.deletePool.Release(1)
165-
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
166-
defer cancel()
167-
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
168-
r.log.Error(err, "failed to cleanup resources during node termination")
169-
ec2API.NodeTerminationENICleanupFailure.Inc()
170-
}
171-
}(nodeID)
186+
// add the CNINode to the cleanup channel to run the cleanup routine
187+
r.cleanupChan <- CleanupTask{
188+
cniNode: cniNode,
189+
retryAfter: initalRetryDelay * time.Millisecond,
190+
hasRetried: 0,
172191
}
173192
}
174193

@@ -179,6 +198,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
179198
}
180199
return ctrl.Result{}, err
181200
}
201+
cninodeOperationLatency.WithLabelValues("remove_finalizer").Observe(time.Since(startAt).Seconds())
182202
return ctrl.Result{}, nil
183203
} else {
184204
// node exists, do not run the cleanup routine(periodic cleanup routine will delete leaked ENIs), remove the finalizer,
@@ -200,6 +220,8 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
200220
r.log.Error(err, "failed to remove finalizer on CNINode, will retry")
201221
return ctrl.Result{}, err
202222
}
223+
cninodeOperationLatency.WithLabelValues("remove_finalizer").Observe(time.Since(startAt).Seconds())
224+
203225
// wait till CNINode is deleted before recreation as the new object will be created with same name to avoid "object already exists" error
204226
if err := r.waitTillCNINodeDeleted(client.ObjectKeyFromObject(newCNINode)); err != nil {
205227
// raise event if CNINode was not deleted after removing the finalizer
@@ -219,6 +241,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
219241
// return nil as object is deleted and we cannot recreate the object now
220242
return ctrl.Result{}, nil
221243
}
244+
cninodeOperationLatency.WithLabelValues("re_create").Observe(time.Since(startAt).Seconds())
222245
r.log.Info("successfully recreated CNINode", "cniNode", newCNINode.Name)
223246
}
224247
}
@@ -230,12 +253,58 @@ func (r *CNINodeReconciler) SetupWithManager(mgr ctrl.Manager, maxNodeConcurrent
230253
if !prometheusRegistered {
231254
prometheusRegister()
232255
}
256+
257+
// start a watching goroutine for taking cninode cleanup tasks
258+
go r.watchCleanupTasks()
259+
233260
return ctrl.NewControllerManagedBy(mgr).
234261
For(&v1alpha1.CNINode{}).
235262
WithOptions(controller.Options{MaxConcurrentReconciles: maxNodeConcurrentReconciles}).
236263
Complete(r)
237264
}
238265

266+
func (r *CNINodeReconciler) watchCleanupTasks() {
267+
for {
268+
select {
269+
case task := <-r.cleanupChan:
270+
r.processCleanupTasks(r.context, task.(CleanupTask))
271+
case <-r.context.Done():
272+
r.log.Info("context cancelled and stop cninodes cleanup task")
273+
return
274+
}
275+
}
276+
}
277+
278+
func (r *CNINodeReconciler) processCleanupTasks(ctx context.Context, task CleanupTask) {
279+
log := r.log.WithValues("cniNode", task.cniNode.Name)
280+
log.Info("running the finalizer routine on cniNode", "cniNode", task.cniNode.Name)
281+
// run cleanup when node id is present
282+
if nodeID, ok := task.cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
283+
if !r.deletePool.TryAcquire(1) {
284+
if task.hasRetried >= cleanupTaskMaxRetry {
285+
log.Info("will not requeue request as max retries are already done")
286+
return
287+
}
288+
log.Info("will requeue request after", "after", task.retryAfter)
289+
time.Sleep(task.retryAfter)
290+
task.retryAfter *= cleanupTaskRetryFactor
291+
task.hasRetried += 1
292+
r.cleanupChan <- task
293+
return
294+
}
295+
go func(nodeID string) {
296+
defer r.deletePool.Release(1)
297+
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
298+
defer cancel()
299+
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
300+
log.Error(err, "failed to cleanup resources during node termination")
301+
ec2API.NodeTerminationENICleanupFailure.Inc()
302+
}
303+
log.Info("successfully cleaned up resources during node termination", "nodeID", nodeID)
304+
}(nodeID)
305+
}
306+
}
307+
239308
// waitTillCNINodeDeleted waits for CNINode to be deleted with timeout and returns error
240309
func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.NamespacedName) error {
241310
oldCNINode := &v1alpha1.CNINode{}

controllers/crds/cninode_controller_test.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/go-logr/logr"
1515
"github.com/golang/mock/gomock"
1616
"github.com/stretchr/testify/assert"
17-
"golang.org/x/sync/semaphore"
1817
corev1 "k8s.io/api/core/v1"
1918
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2019
"k8s.io/apimachinery/pkg/runtime"
@@ -56,14 +55,7 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN
5655
_ = v1alpha1.AddToScheme(scheme)
5756
client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build()
5857
return &CNINodeMock{
59-
Reconciler: CNINodeReconciler{
60-
Client: client,
61-
scheme: scheme,
62-
log: zap.New(),
63-
clusterName: mockClusterName,
64-
vpcId: "vpc-000000000000",
65-
deletePool: semaphore.NewWeighted(10),
66-
},
58+
Reconciler: *NewCNINodeReconciler(client, scheme, context.Background(), zap.New(), nil, nil, mockClusterName, "vpc-000000000000", nil, 10, nil),
6759
}
6860
}
6961

@@ -210,6 +202,7 @@ func TestCNINodeReconcile(t *testing.T) {
210202
if tt.prepare != nil {
211203
tt.prepare(&f)
212204
}
205+
go mock.Reconciler.watchCleanupTasks()
213206
res, err := mock.Reconciler.Reconcile(context.Background(), reconcileRequest)
214207

215208
cniNode := &v1alpha1.CNINode{}

pkg/aws/ec2/api/wrapper.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ import (
2222

2323
vpc_rc_config "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
2424
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
25+
"github.com/samber/lo"
2526

2627
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/version"
2728
smithymiddleware "github.com/aws/smithy-go/middleware"
2829
smithyhttp "github.com/aws/smithy-go/transport/http"
2930

3031
"github.com/aws/aws-sdk-go-v2/aws"
32+
"github.com/aws/aws-sdk-go-v2/aws/arn"
3133
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
3234
"github.com/aws/aws-sdk-go-v2/aws/retry"
3335
"github.com/aws/aws-sdk-go-v2/config"
@@ -441,7 +443,7 @@ func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, insta
441443

442444
ec2Wrapper := &ec2Wrapper{log: log}
443445

444-
cfg, err := ec2Wrapper.getInstanceConfig()
446+
cfg, err := ec2Wrapper.getInstanceConfig(region, lo.Must1(arn.Parse(roleARN)).AccountID)
445447
if err != nil {
446448
return nil, err
447449
}
@@ -481,7 +483,7 @@ func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, insta
481483
return ec2Wrapper, nil
482484
}
483485

484-
func (e *ec2Wrapper) getInstanceConfig() (*aws.Config, error) {
486+
func (e *ec2Wrapper) getInstanceConfig(regionStr, accountID string) (*aws.Config, error) {
485487
// Create a new config
486488
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithAPIOptions([]func(stack *smithymiddleware.Stack) error{
487489
awsmiddleware.AddUserAgentKeyValue(AppName, version.GitVersion),
@@ -491,17 +493,27 @@ func (e *ec2Wrapper) getInstanceConfig() (*aws.Config, error) {
491493
}
492494

493495
ec2Metadata := imds.NewFromConfig(cfg)
494-
region, err := ec2Metadata.GetRegion(context.TODO(), &imds.GetRegionInput{})
495-
if err != nil {
496-
return nil, fmt.Errorf("failed to find the region from ec2 metadata: %v", err)
496+
if regionStr == "" {
497+
region, err := ec2Metadata.GetRegion(context.TODO(), &imds.GetRegionInput{})
498+
if err != nil {
499+
return nil, fmt.Errorf("failed to find the region from ec2 metadata: %v", err)
500+
}
501+
cfg.Region = region.Region
502+
} else {
503+
cfg.Region = regionStr
497504
}
498-
cfg.Region = region.Region
499-
instanceIdentity, err := ec2Metadata.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
500-
if err != nil {
501-
return nil, fmt.Errorf("failed to get the instance identity document %v", err)
505+
506+
if accountID == "" {
507+
instanceIdentity, err := ec2Metadata.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
508+
if err != nil {
509+
return nil, fmt.Errorf("failed to get the instance identity document %v", err)
510+
}
511+
// Set the Account ID
512+
e.accountID = instanceIdentity.AccountID
513+
} else {
514+
e.accountID = accountID
502515
}
503-
// Set the Account ID
504-
e.accountID = instanceIdentity.AccountID
516+
505517
return &cfg, nil
506518
}
507519

0 commit comments

Comments
 (0)