Skip to content

Commit a3854e5

Browse files
Fix watcher storm during topo outages (#18434)
Signed-off-by: Harshit Gangal <[email protected]>
1 parent 3baf4bc commit a3854e5

File tree

3 files changed

+379
-8
lines changed

3 files changed

+379
-8
lines changed

go/vt/srvtopo/watch.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,18 @@ func (entry *watchEntry) update(ctx context.Context, value any, err error, init
161161
entry.onValueLocked(value)
162162
}
163163

164-
listeners := entry.listeners
165-
entry.listeners = entry.listeners[:0]
166-
167-
for _, callback := range listeners {
168-
if callback(entry.value, entry.lastError) {
169-
entry.listeners = append(entry.listeners, callback)
164+
// Only notify listeners on success or when no cached value exists after error processing.
165+
// This prevents unnecessary notifications during topo outages when cached data is available.
166+
shouldNotifyListeners := err == nil || entry.value == nil
167+
168+
if shouldNotifyListeners {
169+
listeners := entry.listeners
170+
entry.listeners = entry.listeners[:0]
171+
172+
for _, callback := range listeners {
173+
if callback(entry.value, entry.lastError) {
174+
entry.listeners = append(entry.listeners, callback)
175+
}
170176
}
171177
}
172178
}

go/vt/srvtopo/watch_test.go

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
Copyright 2025 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package srvtopo
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync/atomic"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
29+
"vitess.io/vitess/go/stats"
30+
"vitess.io/vitess/go/vt/topo"
31+
"vitess.io/vitess/go/vt/topo/memorytopo"
32+
33+
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
34+
)
35+
36+
// TestWatcherOutageBehavior tests that watchers remain silent during topo outages.
37+
func TestWatcherOutageBehavior(t *testing.T) {
38+
originalCacheTTL := srvTopoCacheTTL
39+
originalCacheRefresh := srvTopoCacheRefresh
40+
srvTopoCacheTTL = 100 * time.Millisecond
41+
srvTopoCacheRefresh = 50 * time.Millisecond
42+
defer func() {
43+
srvTopoCacheTTL = originalCacheTTL
44+
srvTopoCacheRefresh = originalCacheRefresh
45+
}()
46+
47+
ctx, cancel := context.WithCancel(context.Background())
48+
defer cancel()
49+
ts, factory := memorytopo.NewServerAndFactory(ctx, "test_cell")
50+
counts := stats.NewCountersWithSingleLabel("", "Watcher outage test", "type")
51+
rs := NewResilientServer(ctx, ts, counts)
52+
53+
initialVSchema := &vschemapb.SrvVSchema{
54+
Keyspaces: map[string]*vschemapb.Keyspace{
55+
"ks1": {Sharded: false},
56+
},
57+
}
58+
err := ts.UpdateSrvVSchema(ctx, "test_cell", initialVSchema)
59+
require.NoError(t, err)
60+
61+
var watcherCallCount atomic.Int32
62+
var lastWatcherError atomic.Value
63+
64+
rs.WatchSrvVSchema(ctx, "test_cell", func(v *vschemapb.SrvVSchema, e error) bool {
65+
watcherCallCount.Add(1)
66+
if e != nil {
67+
lastWatcherError.Store(e)
68+
} else {
69+
lastWatcherError.Store((*error)(nil))
70+
}
71+
return true
72+
})
73+
74+
// Wait for initial callback
75+
assert.Eventually(t, func() bool {
76+
return watcherCallCount.Load() >= 1
77+
}, 2*time.Second, 10*time.Millisecond)
78+
79+
initialWatcherCalls := watcherCallCount.Load()
80+
require.GreaterOrEqual(t, initialWatcherCalls, int32(1))
81+
if errPtr := lastWatcherError.Load(); errPtr != nil {
82+
if err, ok := errPtr.(error); ok && err != nil {
83+
require.NoError(t, err)
84+
}
85+
}
86+
87+
// Verify Get operations work normally
88+
vschema, err := rs.GetSrvVSchema(ctx, "test_cell")
89+
require.NoError(t, err)
90+
require.NotNil(t, vschema)
91+
92+
// Simulate topo outage
93+
factory.SetError(fmt.Errorf("simulated topo error"))
94+
95+
// Get should still work from cache during outage
96+
vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
97+
assert.NoError(t, err)
98+
assert.NotNil(t, vschema)
99+
100+
// Wait during outage period
101+
outageDuration := 500 * time.Millisecond
102+
time.Sleep(outageDuration)
103+
104+
// Watchers should remain silent during outage
105+
watcherCallsDuringOutage := watcherCallCount.Load() - initialWatcherCalls
106+
assert.Equal(t, int32(0), watcherCallsDuringOutage, "watchers should be silent during outage")
107+
108+
// Get operations should continue working from cache
109+
vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
110+
assert.NoError(t, err)
111+
assert.NotNil(t, vschema)
112+
113+
// Clear the error and update VSchema
114+
factory.SetError(nil)
115+
updatedVSchema := &vschemapb.SrvVSchema{
116+
Keyspaces: map[string]*vschemapb.Keyspace{
117+
"ks2": {Sharded: false},
118+
},
119+
}
120+
err = ts.UpdateSrvVSchema(ctx, "test_cell", updatedVSchema)
121+
require.NoError(t, err)
122+
123+
// Verify recovery callback occurs
124+
watcherCallsBeforeRecovery := watcherCallCount.Load()
125+
assert.Eventually(t, func() bool {
126+
errPtr := lastWatcherError.Load()
127+
isNoError := errPtr == nil || (errPtr.(*error) == nil)
128+
return watcherCallCount.Load() > watcherCallsBeforeRecovery && isNoError
129+
}, 2*time.Second, 10*time.Millisecond)
130+
131+
// Verify recovery worked
132+
vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
133+
assert.NoError(t, err)
134+
assert.NotNil(t, vschema)
135+
}
136+
137+
// TestVSchemaWatcherCacheExpiryBehavior tests how the cache behaves after TTL expiry for non-topo versus topo errors.
138+
func TestVSchemaWatcherCacheExpiryBehavior(t *testing.T) {
139+
originalCacheTTL := srvTopoCacheTTL
140+
originalCacheRefresh := srvTopoCacheRefresh
141+
srvTopoCacheTTL = 100 * time.Millisecond
142+
srvTopoCacheRefresh = 50 * time.Millisecond
143+
defer func() {
144+
srvTopoCacheTTL = originalCacheTTL
145+
srvTopoCacheRefresh = originalCacheRefresh
146+
}()
147+
148+
ctx, cancel := context.WithCancel(context.Background())
149+
defer cancel()
150+
ts, factory := memorytopo.NewServerAndFactory(ctx, "test_cell")
151+
counts := stats.NewCountersWithSingleLabel("", "Cache expiry test", "type")
152+
rs := NewResilientServer(ctx, ts, counts)
153+
154+
// Set initial VSchema
155+
initialVSchema := &vschemapb.SrvVSchema{
156+
Keyspaces: map[string]*vschemapb.Keyspace{
157+
"ks1": {Sharded: false},
158+
},
159+
}
160+
err := ts.UpdateSrvVSchema(ctx, "test_cell", initialVSchema)
161+
require.NoError(t, err)
162+
163+
// Get the initial value to populate cache
164+
vschema, err := rs.GetSrvVSchema(ctx, "test_cell")
165+
require.NoError(t, err)
166+
require.NotNil(t, vschema)
167+
168+
// Wait for cache TTL to expire
169+
time.Sleep(srvTopoCacheTTL + 10*time.Millisecond)
170+
171+
// Set a non-topo error (like 500 HTTP error)
172+
nonTopoErr := fmt.Errorf("HTTP 500 internal server error")
173+
factory.SetError(nonTopoErr)
174+
175+
// Get VSchema after TTL expiry with non-topo error
176+
// Should still serve cached value (not the error)
177+
vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
178+
assert.NoError(t, err, "Should serve cached value for non-topo errors even after TTL")
179+
assert.NotNil(t, vschema, "Should return cached VSchema")
180+
181+
// Now test with a topo error
182+
factory.SetError(topo.NewError(topo.Timeout, "topo timeout error"))
183+
time.Sleep(srvTopoCacheTTL + 10*time.Millisecond) // Let TTL expire again
184+
185+
// With topo error after TTL expiry, cache should be cleared
186+
vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
187+
assert.Error(t, err, "Should return error for topo errors after TTL expiry")
188+
assert.True(t, topo.IsErrType(err, topo.Timeout), "Should return the topo error")
189+
assert.Nil(t, vschema, "Should not return vschema when error occurs")
190+
}
191+
192+
// TestWatcherShouldOnlyNotifyOnActualChanges tests that watchers are called when VSchema content changes.
193+
func TestWatcherShouldOnlyNotifyOnActualChanges(t *testing.T) {
194+
ctx, cancel := context.WithCancel(context.Background())
195+
defer cancel()
196+
ts := memorytopo.NewServer(ctx, "test_cell")
197+
counts := stats.NewCountersWithSingleLabel("", "Change detection test", "type")
198+
rs := NewResilientServer(ctx, ts, counts)
199+
200+
vschema := &vschemapb.SrvVSchema{
201+
Keyspaces: map[string]*vschemapb.Keyspace{
202+
"ks1": {Sharded: false},
203+
},
204+
}
205+
err := ts.UpdateSrvVSchema(ctx, "test_cell", vschema)
206+
require.NoError(t, err)
207+
208+
var callCount atomic.Int32
209+
rs.WatchSrvVSchema(ctx, "test_cell", func(v *vschemapb.SrvVSchema, e error) bool {
210+
callCount.Add(1)
211+
return true
212+
})
213+
214+
// Wait for initial call
215+
assert.Eventually(t, func() bool {
216+
return callCount.Load() >= 1
217+
}, 1*time.Second, 10*time.Millisecond)
218+
219+
initialCalls := callCount.Load()
220+
221+
// Update with same vschema content
222+
err = ts.UpdateSrvVSchema(ctx, "test_cell", vschema)
223+
require.NoError(t, err)
224+
225+
time.Sleep(100 * time.Millisecond)
226+
callsAfterSameUpdate := callCount.Load()
227+
228+
t.Logf("Calls after same content update: %d", callsAfterSameUpdate-initialCalls)
229+
230+
// Update with different vschema
231+
differentVSchema := &vschemapb.SrvVSchema{
232+
Keyspaces: map[string]*vschemapb.Keyspace{
233+
"ks2": {Sharded: true},
234+
},
235+
}
236+
err = ts.UpdateSrvVSchema(ctx, "test_cell", differentVSchema)
237+
require.NoError(t, err)
238+
239+
// Should trigger a call for actual changes
240+
assert.Eventually(t, func() bool {
241+
return callCount.Load() > callsAfterSameUpdate
242+
}, 1*time.Second, 10*time.Millisecond)
243+
}

0 commit comments

Comments
 (0)