8 changes: 5 additions & 3 deletions cmd/csi-provisioner/csi-provisioner.go
@@ -467,6 +467,7 @@ func main() {
)

var capacityController *capacity.Controller
var topologyInformer topology.Informer
if *enableCapacity {
// Publishing storage capacity information uses its own client
// with separate rate limiting.
@@ -501,7 +502,6 @@ func main() {
klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)
}

var topologyInformer topology.Informer
if nodeDeployment == nil {
topologyRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
topologyInformer = topology.NewNodeTopology(
@@ -521,7 +521,6 @@ func main() {
klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)
topologyInformer = topology.NewFixedNodeTopology(&segment)
}
go topologyInformer.RunWorker(ctx)

managedByID := "external-provisioner"
if *enableNodeDeployment {
@@ -680,10 +679,13 @@ func main() {

factory.Start(ctx.Done())
if factoryForNamespace != nil {
// Starting is enough, the capacity controller will
// Starting is enough, the capacityController and topologyInformer will
// wait for sync.
factoryForNamespace.Start(ctx.Done())
}
if topologyInformer != nil {
go topologyInformer.RunWorker(ctx)
Contributor:
Please add documentation which explains that RunWorker may only be called once per instance and why.
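One possible wording, sketched from the new RunWorker body in nodes.go (the phrasing and the stated reason are my inference, not the PR author's):

// RunWorker waits for the Node and CSINode informers to sync, enqueues one
// initial work item, and then drains the queue until ctx is cancelled, at
// which point it shuts the queue down. It must be called at most once per
// instance: it is the single worker draining the shared queue, and only its
// handling of the initial item causes HasSynced to return true.
func (nt *nodeTopology) RunWorker(ctx context.Context) {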

}
cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
for _, v := range cacheSyncResult {
if !v {
22 changes: 14 additions & 8 deletions pkg/capacity/topology/nodes.go
@@ -21,6 +21,7 @@ import (
"reflect"
"sort"
"sync"
"sync/atomic"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
@@ -159,6 +160,7 @@ type nodeTopology struct {
nodeInformer coreinformersv1.NodeInformer
csiNodeInformer storageinformersv1.CSINodeInformer
queue workqueue.TypedRateLimitingInterface[string]
hasSynced atomic.Bool

mutex sync.Mutex
// segments hold a list of all currently known topology segments.
@@ -205,19 +207,22 @@ func (nt *nodeTopology) RunWorker(ctx context.Context) {
klog.Info("Started node topology worker")
defer klog.Info("Shutting node topology worker")

if !cache.WaitForCacheSync(ctx.Done(),
nt.nodeInformer.Informer().HasSynced, nt.csiNodeInformer.Informer().HasSynced) {
return
}

go func() {
<-ctx.Done()
nt.queue.ShutDown()
}()
Contributor Author (comment on lines +215 to +218):
synctest checks for leaked goroutines. So I have to clean it up.

nt.queue.Add("") // Initial sync to ensure HasSynced() will become true.
for nt.processNextWorkItem(ctx) {
}
}

func (nt *nodeTopology) HasSynced() bool {
if nt.nodeInformer.Informer().HasSynced() &&
nt.csiNodeInformer.Informer().HasSynced() {
// Now that both informers are up-to-date, use that
// information to update our own view of the world.
nt.sync(context.Background())
return true
}
return false
return nt.hasSynced.Load()
}

func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
@@ -227,6 +232,7 @@ func (nt *nodeTopology) processNextWorkItem(ctx context.Context) bool {
}
defer nt.queue.Done(obj)
nt.sync(ctx)
nt.hasSynced.Store(true)
return true
}

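A usage sketch (my illustration, not part of the diff): after this change, consumers can wait on the topology informer the same way they wait on shared informers, because HasSynced only reports true once the worker has processed the initial sync item.

// Sketch only: ctx and topologyInformer are assumed to be set up as in
// csi-provisioner.go. cache.WaitForCacheSync (k8s.io/client-go/tools/cache)
// polls HasSynced until it returns true or the stop channel closes.
if !cache.WaitForCacheSync(ctx.Done(), topologyInformer.HasSynced) {
	klog.Fatal("timed out waiting for the node topology informer to sync")
}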
34 changes: 34 additions & 0 deletions pkg/capacity/topology/nodes_test.go
@@ -23,6 +23,7 @@ import (
"reflect"
"sort"
"testing"
"testing/synctest"
"time"

v1 "k8s.io/api/core/v1"
@@ -566,6 +567,39 @@ func TestNodeTopology(t *testing.T) {
}
}

func TestHasSynced(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
client := fakeclientset.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 1*time.Hour)
Contributor Author:
For why the resync period is not set to 0, see kubernetes/kubernetes#133500.

nodeInformer := informerFactory.Core().V1().Nodes()
csiNodeInformer := informerFactory.Storage().V1().CSINodes()
rateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 2*time.Second)
queue := workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "items"})

nt := NewNodeTopology(
driverName,
client,
nodeInformer,
csiNodeInformer,
queue,
).(*nodeTopology)

ctx := t.Context()
go nt.RunWorker(ctx)
time.Sleep(10 * time.Second)
if nt.HasSynced() {
t.Fatalf("upstream informer not started yet, expected HasSynced to return false")
}

informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
synctest.Wait()
if !nt.HasSynced() {
t.Fatalf("nt should be synced now")
}
})
}
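A note on the synctest usage above (my reading of Go's testing/synctest, not stated in the PR): inside synctest.Test, goroutines run against a fake clock, so time.Sleep(10 * time.Second) advances virtual time instead of delaying the test, and synctest.Wait blocks until every goroutine in the bubble is durably blocked, which guarantees the worker has finished the initial sync item before the final HasSynced check.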

type segmentsFound map[*Segment]bool

func (sf segmentsFound) Found() []*Segment {