@@ -17,6 +17,9 @@ limitations under the License.
1717package clusters
1818
1919import (
20+ "context"
21+ "fmt"
22+
2023 "github.com/go-logr/logr"
2124
2225 "sigs.k8s.io/controller-runtime/pkg/cluster"
@@ -27,11 +30,31 @@ import (
2730)
2831
2932var _ multicluster.Provider = & Provider {}
33+ var _ multicluster.ProviderRunnable = & Provider {}
34+
35+ // InputChannelSize is the size of the input channel used to queue
36+ // clusters to be added to the provider.
37+ var InputChannelSize = 10
3038
31- // Provider is a provider that only embeds clusters.Clusters.
39+ // Provider is a provider that embeds clusters.Clusters.
40+ //
41+ // It showcases how to implement a multicluster.Provider using
42+ // clusters.Clusters and can be used as a starting point for building
43+ // custom providers.
44+ // Other providers should utilize clusters.Clusters directly instead of
45+ // this type, as this type is primarily for demonstration purposes and
46+ // can lead to opaque errors as it e.g. overwrites the .Add method.
3247type Provider struct {
3348 clusters.Clusters [cluster.Cluster ]
3449 log logr.Logger
50+
51+ waiting map [string ]cluster.Cluster
52+ input chan item
53+ }
54+
55+ type item struct {
56+ clusterName string
57+ cluster cluster.Cluster
3558}
3659
3760// New creates a new provider that embeds clusters.Clusters.
@@ -40,5 +63,63 @@ func New() *Provider {
4063 p .log = log .Log .WithName ("clusters-cluster-provider" )
4164 p .Clusters = clusters .New [cluster.Cluster ]()
4265 p .Clusters .ErrorHandler = p .log .Error
66+ p .waiting = make (map [string ]cluster.Cluster )
4367 return p
4468}
69+
70+ func (p * Provider ) startOnce () error {
71+ p .Lock .Lock ()
72+ defer p .Lock .Unlock ()
73+ if p .input != nil {
74+ return fmt .Errorf ("provider already started" )
75+ }
76+ p .input = make (chan item , InputChannelSize )
77+ return nil
78+ }
79+
80+ // Start starts the provider.
81+ func (p * Provider ) Start (ctx context.Context , aware multicluster.Aware ) error {
82+ if err := p .startOnce (); err != nil {
83+ return err
84+ }
85+
86+ p .log .Info ("starting provider" )
87+ for name , cl := range p .waiting {
88+ if err := p .Clusters .AddOrReplace (ctx , name , cl , aware ); err != nil {
89+ p .log .Error (err , "error adding cluster" , "name" , name )
90+ }
91+ }
92+ p .Lock .Lock ()
93+ p .waiting = nil
94+ p .Lock .Unlock ()
95+
96+ for {
97+ select {
98+ case <- ctx .Done ():
99+ p .log .Info ("stopping provider" )
100+ return nil
101+ case it := <- p .input :
102+ p .log .Info ("adding cluster to provider" , "name" , it .clusterName )
103+ if err := p .Clusters .AddOrReplace (ctx , it .clusterName , it .cluster , aware ); err != nil {
104+ p .log .Error (err , "error adding cluster" , "name" , it .clusterName )
105+ }
106+ }
107+ }
108+ }
109+
110+ // Add adds a new cluster to the provider. If the provider has not been
111+ // started yet it queues the cluster to be added when the provider
112+ // starts.
113+ func (p * Provider ) Add (ctx context.Context , clusterName string , cl cluster.Cluster ) error {
114+ p .Lock .Lock ()
115+ defer p .Lock .Unlock ()
116+ if p .input != nil {
117+ p .input <- item {
118+ clusterName : clusterName ,
119+ cluster : cl ,
120+ }
121+ return nil
122+ }
123+ p .waiting [clusterName ] = cl
124+ return nil
125+ }
0 commit comments