Skip to content

Commit 9f1149a

Browse files
Reduce watch updates (#4050)
* Reduce logging verbosity and increase utility Signed-off-by: John Belamaric <[email protected]> * More logging changes Signed-off-by: John Belamaric <[email protected]> * Fix fake type Signed-off-by: John Belamaric <[email protected]> * Only send MODIFIED if the package revision resource version has changed Signed-off-by: John Belamaric <[email protected]> * Add ResourceVersion to fake repo package revision Signed-off-by: John Belamaric <[email protected]> * PV controller should requeue for some errors Signed-off-by: John Belamaric <[email protected]> * Abort a poll if one is already running Signed-off-by: John Belamaric <[email protected]> * Remove use of ticker for cache repo polling Signed-off-by: John Belamaric <[email protected]> --------- Signed-off-by: John Belamaric <[email protected]>
1 parent c36a94e commit 9f1149a

File tree

13 files changed

+100
-55
lines changed

13 files changed

+100
-55
lines changed

porch/controllers/packagevariants/pkg/controllers/packagevariant/packagevariant_controller.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"strconv"
2222
"strings"
23+
"time"
2324

2425
porchapi "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
2526
configapi "github.com/GoogleContainerTools/kpt/porch/api/porchconfig/v1alpha1"
@@ -59,6 +60,8 @@ const (
5960

6061
ConditionTypeStalled = "Stalled" // whether or not the packagevariant object is making progress or not
6162
ConditionTypeReady = "Ready" // whether or notthe reconciliation succeded
63+
64+
requeueDuration = 30 * time.Second
6265
)
6366

6467
//go:generate go run sigs.k8s.io/controller-tools/cmd/[email protected] rbac:roleName=porch-controllers-packagevariants webhook paths="." output:rbac:artifacts:config=../../../config/rbac
@@ -119,12 +122,14 @@ func (r *PackageVariantReconciler) Reconcile(ctx context.Context, req ctrl.Reque
119122

120123
if errs := validatePackageVariant(pv); len(errs) > 0 {
121124
setStalledConditionsToTrue(pv, combineErrors(errs))
125+
// do not requeue; failed validation requires a PV change
122126
return ctrl.Result{}, nil
123127
}
124128
upstream, err := r.getUpstreamPR(pv.Spec.Upstream, prList)
125129
if err != nil {
126130
setStalledConditionsToTrue(pv, err.Error())
127-
return ctrl.Result{}, err
131+
// requeue, as the upstream may appear
132+
return ctrl.Result{RequeueAfter: requeueDuration}, err
128133
}
129134
meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{
130135
Type: ConditionTypeStalled,
@@ -134,7 +139,18 @@ func (r *PackageVariantReconciler) Reconcile(ctx context.Context, req ctrl.Reque
134139
})
135140

136141
targets, err := r.ensurePackageVariant(ctx, pv, upstream, prList)
137-
setTargetStatusConditions(pv, targets, err)
142+
if err != nil {
143+
meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{
144+
Type: ConditionTypeReady,
145+
Status: "False",
146+
Reason: "Error",
147+
Message: err.Error(),
148+
})
149+
// requeue; it may be an intermittent error
150+
return ctrl.Result{RequeueAfter: requeueDuration}, nil
151+
}
152+
153+
setTargetStatusConditions(pv, targets)
138154

139155
return ctrl.Result{}, nil
140156
}
@@ -682,19 +698,8 @@ func (r *PackageVariantReconciler) updateDraft(ctx context.Context,
682698
return draft, nil
683699
}
684700

685-
func setTargetStatusConditions(pv *api.PackageVariant, targets []*porchapi.PackageRevision, err error) {
701+
func setTargetStatusConditions(pv *api.PackageVariant, targets []*porchapi.PackageRevision) {
686702
pv.Status.DownstreamTargets = nil
687-
if err != nil {
688-
klog.Infoln(fmt.Sprintf("setting status to error: %s", err.Error()))
689-
meta.SetStatusCondition(&pv.Status.Conditions, metav1.Condition{
690-
Type: ConditionTypeReady,
691-
Status: "False",
692-
Reason: "Error",
693-
Message: err.Error(),
694-
})
695-
klog.Infoln(fmt.Sprintf("Conditions: %v", pv.Status.Conditions))
696-
return
697-
}
698703
for _, t := range targets {
699704
pv.Status.DownstreamTargets = append(pv.Status.DownstreamTargets, api.DownstreamTarget{
700705
Name: t.GetName(),

porch/pkg/cache/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type Cache struct {
5353
}
5454

5555
type objectNotifier interface {
56-
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta)
56+
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int
5757
}
5858

5959
type CacheOptions struct {

porch/pkg/cache/fake/objectnotifier.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ import (
2222

2323
type ObjectNotifier struct{}
2424

25-
func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) {
25+
func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) int {
26+
return 0
2627
}

porch/pkg/cache/repository.go

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ func (r *cachedRepository) Close() error {
311311

312312
// Make sure that watch events are sent for packagerevisions that are
313313
// removed as part of closing the repository.
314+
sent := 0
314315
for _, pr := range r.cachedPackageRevisions {
315316
nn := types.NamespacedName{
316317
Name: pr.KubeObjectName(),
@@ -319,7 +320,7 @@ func (r *cachedRepository) Close() error {
319320
// There isn't really any correct way to handle finalizers here. We are removing
320321
// the repository, so we have to just delete the PackageRevision regardless of any
321322
// finalizers.
322-
klog.Infof("Deleting packagerev %s/%s because repository is closed", nn.Namespace, nn.Name)
323+
klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name)
323324
pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true)
324325
if err != nil {
325326
// There isn't much use in returning an error here, so we just log it
@@ -331,29 +332,31 @@ func (r *cachedRepository) Close() error {
331332
Namespace: nn.Namespace,
332333
}
333334
}
334-
r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
335+
sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
335336
}
337+
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
336338
return r.repo.Close()
337339
}
338340

339341
// pollForever will continue polling until signal channel is closed or ctx is done.
340342
func (r *cachedRepository) pollForever(ctx context.Context) {
341343
r.pollOnce(ctx)
342-
ticker := time.NewTicker(1 * time.Minute)
343344
for {
344345
select {
345-
case <-ticker.C:
346-
r.pollOnce(ctx)
347-
348346
case <-ctx.Done():
349-
klog.V(2).Infof("exiting repository poller, because context is done: %v", ctx.Err())
347+
klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.id, ctx.Err())
350348
return
349+
default:
350+
r.pollOnce(ctx)
351+
time.Sleep(60 * time.Second)
351352
}
352353
}
353354
}
354355

355356
func (r *cachedRepository) pollOnce(ctx context.Context) {
356-
klog.Infof("background-refreshing repo %q", r.id)
357+
start := time.Now()
358+
klog.Infof("repo %s: poll started", r.id)
359+
defer func() { klog.Infof("repo %s: poll finished in %f secs", r.id, time.Since(start).Seconds()) }()
357360
ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes())
358361
defer span.End()
359362

@@ -384,6 +387,9 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
384387
// TODO: Avoid simultaneous fetches?
385388
// TODO: Push-down partial refresh?
386389

390+
start := time.Now()
391+
defer func() { klog.Infof("repo %s: refresh finished in %f secs", r.id, time.Since(start).Seconds()) }()
392+
387393
// Look up all existing PackageRevCRs so we an compare those to the
388394
// actual Packagerevisions found in git/oci, and add/prune PackageRevCRs
389395
// as necessary.
@@ -406,12 +412,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
406412

407413
// Build mapping from kubeObjectName to PackageRevisions for new PackageRevisions.
408414
newPackageRevisionNames := make(map[string]*cachedPackageRevision, len(newPackageRevisions))
409-
klog.Infof("New packages:")
410415
for _, newPackage := range newPackageRevisions {
411-
klog.Infof("- %s", newPackage.KubeObjectName())
412416
kname := newPackage.KubeObjectName()
413417
if newPackageRevisionNames[kname] != nil {
414-
klog.Warningf("found duplicate packages with name %v", kname)
418+
klog.Warningf("repo %s: found duplicate packages with name %v", kname)
415419
}
416420

417421
pkgRev := &cachedPackageRevision{
@@ -432,16 +436,16 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
432436
// PackageRevision. The ones that doesn't is removed.
433437
for _, prm := range existingPkgRevCRs {
434438
if _, found := newPackageRevisionNames[prm.Name]; !found {
435-
klog.Infof("Deleting PackageRev %s/%s because parent PackageRevision was not found",
436-
prm.Namespace, prm.Name)
439+
klog.Infof("repo %s: deleting PackageRev %s/%s because parent PackageRevision was not found",
440+
r.id, prm.Namespace, prm.Name)
437441
if _, err := r.metadataStore.Delete(ctx, types.NamespacedName{
438442
Name: prm.Name,
439443
Namespace: prm.Namespace,
440444
}, true); err != nil {
441445
if !apierrors.IsNotFound(err) {
442446
// This will be retried the next time the sync runs.
443-
klog.Warningf("unable to delete PackageRev CR for %s/%s: %w",
444-
prm.Name, prm.Namespace, err)
447+
klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %w",
448+
r.id, prm.Name, prm.Namespace, err)
445449
}
446450
}
447451
}
@@ -466,41 +470,45 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
466470
}
467471

468472
// Send notification for packages that changed.
473+
addSent := 0
474+
modSent := 0
469475
for kname, newPackage := range newPackageRevisionNames {
470476
oldPackage := oldPackageRevisionNames[kname]
471477
metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()]
472478
if !found {
473479
klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName())
474480
}
475481
if oldPackage == nil {
476-
r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
482+
addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
477483
} else {
478-
// TODO: only if changed
479-
klog.Warningf("over-notifying of package updates (even on unchanged packages)")
480-
r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
484+
if oldPackage.ResourceVersion() != newPackage.ResourceVersion() {
485+
modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
486+
}
481487
}
482488
}
483489

490+
delSent := 0
484491
// Send notifications for packages that was deleted in the SoT
485492
for kname, oldPackage := range oldPackageRevisionNames {
486493
if newPackageRevisionNames[kname] == nil {
487494
nn := types.NamespacedName{
488495
Name: oldPackage.KubeObjectName(),
489496
Namespace: oldPackage.KubeObjectNamespace(),
490497
}
491-
klog.Infof("Deleting PackageRev %s/%s because PackageRevision was removed from SoT",
492-
nn.Namespace, nn.Name)
498+
klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT",
499+
r.id, nn.Namespace, nn.Name)
493500
metaPackage, err := r.metadataStore.Delete(ctx, nn, true)
494501
if err != nil {
495-
klog.Warningf("Error deleting PkgRevMeta %s: %v")
502+
klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err)
496503
metaPackage = meta.PackageRevisionMeta{
497504
Name: nn.Name,
498505
Namespace: nn.Namespace,
499506
}
500507
}
501-
r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
508+
delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
502509
}
503510
}
511+
klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))
504512

505513
newPackageRevisionMap := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(newPackageRevisions))
506514
for _, newPackage := range newPackageRevisions {

porch/pkg/engine/engine.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj *
342342
if err != nil {
343343
return nil, err
344344
}
345-
cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta)
345+
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta)
346+
klog.Infof("engine: sent %d for new PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
346347
return &PackageRevision{
347348
repoPackageRevision: repoPkgRev,
348349
packageRevisionMeta: pkgRevMeta,
@@ -576,7 +577,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
576577
return nil, err
577578
}
578579

579-
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
580+
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
581+
klog.Infof("engine: sent %d for updated PackageRevision metadata %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
580582
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
581583
}
582584
switch lifecycle := newObj.Spec.Lifecycle; lifecycle {
@@ -601,7 +603,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
601603
return nil, err
602604
}
603605

604-
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
606+
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
607+
klog.Infof("engine: sent %d for reclone and replay PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
605608
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
606609
}
607610

@@ -701,7 +704,8 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
701704
return nil, err
702705
}
703706

704-
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
707+
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
708+
klog.Infof("engine: sent %d for updated PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
705709
return ToPackageRevision(repoPkgRev, pkgRevMeta), nil
706710
}
707711

@@ -851,7 +855,8 @@ func (cad *cadEngine) DeletePackageRevision(ctx context.Context, repositoryObj *
851855

852856
if len(pkgRevMeta.Finalizers) > 0 {
853857
klog.Infof("PackageRevision %s deleted, but still have finalizers: %s", oldPackage.KubeObjectName(), strings.Join(pkgRevMeta.Finalizers, ","))
854-
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta)
858+
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta)
859+
klog.Infof("engine: sent %d modified for deleted PackageRevision %s/%s with finalizers", sent, oldPackage.repoPackageRevision.KubeObjectNamespace(), oldPackage.KubeObjectName())
855860
return nil
856861
}
857862
klog.Infof("PackageRevision %s deleted for real since no finalizers", oldPackage.KubeObjectName())
@@ -876,7 +881,8 @@ func (cad *cadEngine) deletePackageRevision(ctx context.Context, repo repository
876881
klog.Warningf("Error deleting PkgRevMeta %s: %v", nn.String(), err)
877882
}
878883

879-
cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta)
884+
sent := cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, repoPkgRev, pkgRevMeta)
885+
klog.Infof("engine: sent %d for deleted PackageRevision %s/%s", sent, repoPkgRev.KubeObjectNamespace(), repoPkgRev.KubeObjectName())
880886
return nil
881887
}
882888

porch/pkg/engine/fake/packagerevision.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func (pr *PackageRevision) UID() types.UID {
4747
return pr.Uid
4848
}
4949

50+
func (pr *PackageRevision) ResourceVersion() string {
51+
return pr.PackageRevision.ResourceVersion
52+
}
53+
5054
func (pr *PackageRevision) Key() repository.PackageRevisionKey {
5155
return pr.PackageRevisionKey
5256
}

porch/pkg/engine/watchermanager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,11 @@ func (r *watcherManager) WatchPackageRevisions(ctx context.Context, filter repos
8989
}
9090

9191
// notifyPackageRevisionChange is called to send a change notification to all interested listeners.
92-
func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) {
92+
func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int {
9393
r.mutex.Lock()
9494
defer r.mutex.Unlock()
9595

96+
sent := 0
9697
for i, watcher := range r.watchers {
9798
if watcher == nil {
9899
continue
@@ -106,5 +107,8 @@ func (r *watcherManager) NotifyPackageRevisionChange(eventType watch.EventType,
106107
klog.Infof("stopping watcher in response to !keepGoing")
107108
r.watchers[i] = nil
108109
}
110+
sent += 1
109111
}
112+
113+
return sent
110114
}

porch/pkg/git/git.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1658,7 +1658,9 @@ func (r *gitRepository) discoverPackagesInTree(commit *object.Commit, opt Discov
16581658
return nil, err
16591659
}
16601660

1661-
klog.V(2).Infof("discovered packages @%v with prefix %q: %#v", commit.Hash, opt.FilterPrefix, t.packages)
1661+
if opt.FilterPrefix == "" {
1662+
klog.Infof("discovered %d packages @%v", len(t.packages), commit.Hash)
1663+
}
16621664
return t, nil
16631665
}
16641666

porch/pkg/git/package.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ func (p *gitPackageRevision) UID() types.UID {
7878
return p.uid()
7979
}
8080

81+
func (p *gitPackageRevision) ResourceVersion() string {
82+
return p.commit.String()
83+
}
84+
8185
func (p *gitPackageRevision) Key() repository.PackageRevisionKey {
8286
// if the repository has been registered with a directory, then the
8387
// package name is the package path relative to the registered directory

porch/pkg/git/package_tree.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ func (t *packageList) discoverPackages(tree *object.Tree, treePath string, recur
131131
}
132132

133133
// Found a package
134-
klog.Infof("found package %q with Kptfile hash %q", p, e.Hash)
135134
t.packages[treePath] = &packageListEntry{
136135
path: treePath,
137136
treeHash: tree.Hash,

0 commit comments

Comments
 (0)