Skip to content

Commit a51ba6f

Browse files
authored
Introduce ResourceSnapshot interface. (#491)
Introduce a ResourceSnaphot interface to abstract the type of the snapshot stored in a SnapshotCache. This allows applications to use SnapshotCache without also having to use Snapshot. The usefulness of that is that it is possible to use SnapshotCache with non-Envoy xDS applications. This fixes #489. Signed-off-by: James Peach <[email protected]>
1 parent c047955 commit a51ba6f

File tree

10 files changed

+283
-105
lines changed

10 files changed

+283
-105
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe
88
github.com/envoyproxy/protoc-gen-validate v0.1.0
99
github.com/golang/protobuf v1.5.0
10-
github.com/google/go-cmp v0.5.5
10+
github.com/google/go-cmp v0.5.6
1111
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
1212
github.com/stretchr/testify v1.7.0
1313
go.opentelemetry.io/proto/otlp v0.7.0

go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
3838
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
3939
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
4040
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
41-
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
4241
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
42+
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
43+
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
4344
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
4445
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
4546
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

internal/example/resource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func makeConfigSource() *core.ConfigSource {
165165
return source
166166
}
167167

168-
func GenerateSnapshot() cache.Snapshot {
168+
func GenerateSnapshot() *cache.Snapshot {
169169
snap, _ := cache.NewSnapshot("1",
170170
map[resource.Type][]types.Resource{
171171
resource.ClusterType: {makeCluster(ClusterName)},

pkg/cache/v3/delta_test.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/google/go-cmp/cmp"
1011
"github.com/stretchr/testify/assert"
12+
"google.golang.org/protobuf/testing/protocmp"
1113

1214
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
1315
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -18,6 +20,14 @@ import (
1820
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
1921
)
2022

23+
func assertResourceMapEqual(t *testing.T, want map[string]types.Resource, got map[string]types.Resource) {
24+
t.Helper()
25+
26+
if !cmp.Equal(want, got, protocmp.Transform()) {
27+
t.Errorf("got resources %v, want %v", got, want)
28+
}
29+
}
30+
2131
func TestSnapshotCacheDeltaWatch(t *testing.T) {
2232
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
2333
watches := make(map[string]chan cache.DeltaResponse)
@@ -34,7 +44,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
3444
}, stream.NewStreamState(true, nil), watches[typ])
3545
}
3646

37-
if err := c.SetSnapshot(context.Background(), key, snapshot); err != nil {
47+
if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
3848
t.Fatal(err)
3949
}
4050

@@ -43,9 +53,8 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
4353
t.Run(typ, func(t *testing.T) {
4454
select {
4555
case out := <-watches[typ]:
46-
if !reflect.DeepEqual(cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) {
47-
t.Errorf("got resources %v, want %v", out.(*cache.RawDeltaResponse).Resources, snapshot.GetResources(typ))
48-
}
56+
snapshot := fixture.snapshot()
57+
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
4958
vMap := out.GetNextVersionMap()
5059
versionMap[typ] = vMap
5160
case <-time.After(time.Second):
@@ -72,8 +81,8 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
7281
}
7382

7483
// set partially-versioned snapshot
75-
snapshot2 := snapshot
76-
snapshot2.Resources[types.Endpoint] = cache.NewResources(version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
84+
snapshot2 := fixture.snapshot()
85+
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
7786
if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil {
7887
t.Fatal(err)
7988
}
@@ -84,9 +93,9 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
8493
// validate response for endpoints
8594
select {
8695
case out := <-watches[testTypes[0]]:
87-
if !reflect.DeepEqual(cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) {
88-
t.Fatalf("got resources %v, want %v", out.(*cache.RawDeltaResponse).Resources, snapshot2.GetResources(rsrc.EndpointType))
89-
}
96+
snapshot2 := fixture.snapshot()
97+
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
98+
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
9099
vMap := out.GetNextVersionMap()
91100
versionMap[testTypes[0]] = vMap
92101
case <-time.After(time.Second):
@@ -113,17 +122,16 @@ func TestDeltaRemoveResources(t *testing.T) {
113122
}, *streams[typ], watches[typ])
114123
}
115124

116-
if err := c.SetSnapshot(context.Background(), key, snapshot); err != nil {
125+
if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
117126
t.Fatal(err)
118127
}
119128

120129
for _, typ := range testTypes {
121130
t.Run(typ, func(t *testing.T) {
122131
select {
123132
case out := <-watches[typ]:
124-
if !reflect.DeepEqual(cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) {
125-
t.Errorf("got resources %v, want %v", out.(*cache.RawDeltaResponse).Resources, snapshot.GetResources(typ))
126-
}
133+
snapshot := fixture.snapshot()
134+
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
127135
nextVersionMap := out.GetNextVersionMap()
128136
streams[typ].SetResourceVersions(nextVersionMap)
129137
case <-time.After(time.Second):
@@ -149,18 +157,18 @@ func TestDeltaRemoveResources(t *testing.T) {
149157
}
150158

151159
// set a partially versioned snapshot with no endpoints
152-
snapshot2 := snapshot
153-
snapshot2.Resources[types.Endpoint] = cache.NewResources(version2, []types.Resource{})
160+
snapshot2 := fixture.snapshot()
161+
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
154162
if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil {
155163
t.Fatal(err)
156164
}
157165

158166
// validate response for endpoints
159167
select {
160168
case out := <-watches[testTypes[0]]:
161-
if !reflect.DeepEqual(cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) {
162-
t.Fatalf("got resources %v, want %v", out.(*cache.RawDeltaResponse).Resources, snapshot2.GetResources(rsrc.EndpointType))
163-
}
169+
snapshot2 := fixture.snapshot()
170+
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
171+
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
164172
nextVersionMap := out.GetNextVersionMap()
165173

166174
// make sure the version maps are different since we no longer are tracking any endpoint resources
@@ -225,7 +233,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
225233
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
226234
defer cancel()
227235

228-
err := c.SetSnapshot(ctx, key, snapshot)
236+
err := c.SetSnapshot(ctx, key, fixture.snapshot())
229237
assert.EqualError(t, err, context.Canceled.Error())
230238

231239
// Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails,
@@ -238,7 +246,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
238246
close(watchTriggeredCh)
239247
}()
240248

241-
err = c.SetSnapshot(context.WithValue(context.Background(), testKey{}, "bar"), key, snapshot)
249+
err = c.SetSnapshot(context.WithValue(context.Background(), testKey{}, "bar"), key, fixture.snapshot())
242250
assert.NoError(t, err)
243251

244252
// The channel should get closed due to the watch trigger.

pkg/cache/v3/fixtures_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package cache_test
2+
3+
import (
4+
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
5+
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
6+
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
7+
)
8+
9+
var fixture = &fixtureGenerator{
10+
version: "x",
11+
version2: "y",
12+
}
13+
14+
type fixtureGenerator struct {
15+
version string
16+
version2 string
17+
}
18+
19+
func (f *fixtureGenerator) snapshot() *cache.Snapshot {
20+
snapshot, err := cache.NewSnapshot(
21+
f.version,
22+
map[rsrc.Type][]types.Resource{
23+
rsrc.EndpointType: {testEndpoint},
24+
rsrc.ClusterType: {testCluster},
25+
rsrc.RouteType: {testRoute},
26+
rsrc.ListenerType: {testListener},
27+
rsrc.RuntimeType: {testRuntime},
28+
rsrc.SecretType: {testSecret[0]},
29+
rsrc.ExtensionConfigType: {testExtensionConfig},
30+
},
31+
)
32+
33+
if err != nil {
34+
panic(err.Error())
35+
}
36+
37+
return snapshot
38+
}

pkg/cache/v3/simple.go

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,35 @@ import (
2626
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2727
)
2828

29+
// ResourceSnapshot is an abstract snapshot of a collection of resources that
30+
// can be stored in a SnapshotCache. This enables applications to use the
31+
// SnapshotCache watch machinery with their own resource types. Most
32+
// applications will use Snapshot.
33+
type ResourceSnapshot interface {
34+
// GetVersion should return the current version of the resource indicated
35+
// by typeURL. The version string that is returned is opaque and should
36+
// only be compared for equality.
37+
GetVersion(typeURL string) string
38+
39+
// GetResourcesAndTTL returns all resources of the type indicted by
40+
// typeURL, together with their TTL.
41+
GetResourcesAndTTL(typeURL string) map[string]types.ResourceWithTTL
42+
43+
// GetResources returns all resources of the type indicted by
44+
// typeURL. This is identical to GetResourcesAndTTL, except that
45+
// the TTL is omitted.
46+
GetResources(typeURL string) map[string]types.Resource
47+
48+
// ConstructVersionMap is a hint that a delta watch will soon make a
49+
// call to GetVersionMap. The snapshot should construct an internal
50+
// opaque version string for each collection of resource types.
51+
ConstructVersionMap() error
52+
53+
// GetVersionMap returns a map of resource name to resource version for
54+
// all the resources of type indicated by typeURL.
55+
GetVersionMap(typeURL string) map[string]string
56+
}
57+
2958
// SnapshotCache is a snapshot-based cache that maintains a single versioned
3059
// snapshot of responses per node. SnapshotCache consistently replies with the
3160
// latest snapshot. For the protocol to work correctly in ADS mode, EDS/RDS
@@ -46,10 +75,10 @@ type SnapshotCache interface {
4675
//
4776
// This method will cause the server to respond to all open watches, for which
4877
// the version differs from the snapshot version.
49-
SetSnapshot(ctx context.Context, node string, snapshot Snapshot) error
78+
SetSnapshot(ctx context.Context, node string, snapshot ResourceSnapshot) error
5079

5180
// GetSnapshots gets the snapshot for a node.
52-
GetSnapshot(node string) (Snapshot, error)
81+
GetSnapshot(node string) (ResourceSnapshot, error)
5382

5483
// ClearSnapshot removes all status and snapshot information associated with a node.
5584
ClearSnapshot(node string)
@@ -75,7 +104,7 @@ type snapshotCache struct {
75104
ads bool
76105

77106
// snapshots are cached resources indexed by node IDs
78-
snapshots map[string]Snapshot
107+
snapshots map[string]ResourceSnapshot
79108

80109
// status information for all nodes indexed by node IDs
81110
status map[string]*statusInfo
@@ -109,7 +138,7 @@ func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache
109138
cache := &snapshotCache{
110139
log: logger,
111140
ads: ads,
112-
snapshots: make(map[string]Snapshot),
141+
snapshots: make(map[string]ResourceSnapshot),
113142
status: make(map[string]*statusInfo),
114143
hash: hash,
115144
}
@@ -188,7 +217,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
188217
}
189218

190219
// SetSnapshotCacheContext updates a snapshot for a node.
191-
func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapshot Snapshot) error {
220+
func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapshot ResourceSnapshot) error {
192221
cache.mu.Lock()
193222
defer cache.mu.Unlock()
194223

@@ -229,7 +258,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
229258
for id, watch := range info.deltaWatches {
230259
res, err := cache.respondDelta(
231260
ctx,
232-
&snapshot,
261+
snapshot,
233262
watch.Request,
234263
watch.Response,
235264
watch.StreamState,
@@ -249,13 +278,13 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
249278
}
250279

251280
// GetSnapshots gets the snapshot for a node, and returns an error if not found.
252-
func (cache *snapshotCache) GetSnapshot(node string) (Snapshot, error) {
281+
func (cache *snapshotCache) GetSnapshot(node string) (ResourceSnapshot, error) {
253282
cache.mu.RLock()
254283
defer cache.mu.RUnlock()
255284

256285
snap, ok := cache.snapshots[node]
257286
if !ok {
258-
return Snapshot{}, fmt.Errorf("no snapshot found for node %s", node)
287+
return nil, fmt.Errorf("no snapshot found for node %s", node)
259288
}
260289
return snap, nil
261290
}
@@ -306,8 +335,12 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
306335
info.lastWatchRequestTime = time.Now()
307336
info.mu.Unlock()
308337

338+
var version string
339+
309340
snapshot, exists := cache.snapshots[nodeID]
310-
version := snapshot.GetVersion(request.TypeUrl)
341+
if exists {
342+
version = snapshot.GetVersion(request.TypeUrl)
343+
}
311344

312345
if exists {
313346
knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl)
@@ -339,7 +372,6 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
339372
if !exists || request.VersionInfo == version {
340373
watchID := cache.nextWatchID()
341374
cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)
342-
343375
info.mu.Lock()
344376
info.watches[watchID] = ResponseWatch{Request: request, Response: value}
345377
info.mu.Unlock()
@@ -452,29 +484,34 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
452484
if exists {
453485
err := snapshot.ConstructVersionMap()
454486
if err != nil {
455-
cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update")
487+
cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err)
456488
}
457-
response, err := cache.respondDelta(context.Background(), &snapshot, request, value, state)
489+
response, err := cache.respondDelta(context.Background(), snapshot, request, value, state)
458490
if err != nil {
459-
cache.log.Errorf("failed to respond with delta response, waiting for next snapshot update: %s", err)
491+
cache.log.Errorf("failed to respond with delta response: %s", err)
460492
}
461493

462494
delayedResponse = response == nil
463495
}
464496

465497
if delayedResponse {
466498
watchID := cache.nextDeltaWatchID()
467-
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, system version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t))
468-
info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})
469499

500+
if exists {
501+
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t))
502+
} else {
503+
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetResourceVersions(), nodeID)
504+
}
505+
506+
info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})
470507
return cache.cancelDeltaWatch(nodeID, watchID)
471508
}
472509

473510
return nil
474511
}
475512

476513
// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change.
477-
func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot *Snapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) {
514+
func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) {
478515
resp := createDeltaResponse(ctx, request, state, resourceContainer{
479516
resourceMap: snapshot.GetResources(request.TypeUrl),
480517
versionMap: snapshot.GetVersionMap(request.TypeUrl),

0 commit comments

Comments
 (0)