Skip to content

Commit 943dd00

Browse files
authored
VReplication: Handle vstream stalls more effectively (#18752)
Signed-off-by: Matt Lord <[email protected]>
1 parent 6587874 commit 943dd00

File tree

7 files changed

+184
-16
lines changed

7 files changed

+184
-16
lines changed

go/vt/vtgate/vstream_manager.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ const tabletPickerContextTimeout = 90 * time.Second
7070
// ending the stream from the tablet.
7171
const stopOnReshardDelay = 500 * time.Millisecond
7272

73+
// livenessTimeout is the point at which we return an error to the client if the stream has received
74+
// no events, including heartbeats, from any of the shards.
75+
var livenessTimeout = 10 * time.Minute
76+
7377
// vstream contains the metadata for one VStream request.
7478
type vstream struct {
7579
// mu protects parts of vgtid, the semantics of a send, and journaler.
@@ -136,6 +140,9 @@ type vstream struct {
136140

137141
tabletPickerOptions discovery.TabletPickerOptions
138142

143+
// At what point, without any activity in the stream, should we consider it dead.
144+
streamLivenessTimer *time.Timer
145+
139146
flags *vtgatepb.VStreamFlags
140147
}
141148

@@ -319,6 +326,10 @@ func (vsm *vstreamManager) GetTotalStreamDelay() int64 {
319326

320327
func (vs *vstream) stream(ctx context.Context) error {
321328
ctx, vs.cancel = context.WithCancel(ctx)
329+
if vs.streamLivenessTimer == nil {
330+
vs.streamLivenessTimer = time.NewTimer(livenessTimeout)
331+
defer vs.streamLivenessTimer.Stop()
332+
}
322333

323334
vs.wg.Add(1)
324335
go func() {
@@ -401,6 +412,13 @@ func (vs *vstream) sendEvents(ctx context.Context) {
401412
})
402413
return
403414
}
415+
case <-vs.streamLivenessTimer.C:
416+
msg := fmt.Sprintf("vstream failed liveness checks as there was no activity, including heartbeats, within the last %v", livenessTimeout)
417+
log.Infof("Error in vstream: %s", msg)
418+
vs.once.Do(func() {
419+
vs.setError(vterrors.New(vtrpcpb.Code_UNAVAILABLE, msg), "vstream is fully throttled or otherwise hung")
420+
})
421+
return
404422
}
405423
}
406424
}
@@ -697,6 +715,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
697715

698716
sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
699717
for i, event := range events {
718+
vs.streamLivenessTimer.Reset(livenessTimeout) // Any event in the stream demonstrates liveness
700719
switch event.Type {
701720
case binlogdatapb.VEventType_FIELD:
702721
ev := maybeUpdateTableName(event, sgtid.Keyspace, vs.flags.GetExcludeKeyspaceFromTableName(), extractFieldTableName)

go/vt/vtgate/vstream_manager_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1748,6 +1748,84 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
17481748
}
17491749
}
17501750

1751+
func TestVStreamLivenessChecks(t *testing.T) {
1752+
ctx := utils.LeakCheckContext(t)
1753+
cell := "aa"
1754+
ks := "TestVStream"
1755+
_ = createSandbox(ks)
1756+
hc := discovery.NewFakeHealthCheck(nil)
1757+
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
1758+
vsm := newTestVStreamManager(ctx, hc, st, cell)
1759+
origLivenessTimeout := livenessTimeout
1760+
defer func() {
1761+
livenessTimeout = origLivenessTimeout
1762+
}()
1763+
fakeTablet := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
1764+
addTabletToSandboxTopo(t, ctx, st, fakeTablet.Tablet().Keyspace, fakeTablet.Tablet().Shard, fakeTablet.Tablet())
1765+
vgtid := &binlogdatapb.VGtid{
1766+
ShardGtids: []*binlogdatapb.ShardGtid{{
1767+
Keyspace: fakeTablet.Tablet().Keyspace,
1768+
Shard: fakeTablet.Tablet().Shard,
1769+
}},
1770+
}
1771+
1772+
type testcase struct {
1773+
name string
1774+
livenessTimeout time.Duration
1775+
// We use simulated tablet vstreamer heartbeats as a substitute for the hardcoded 900ms
1776+
// heartbeats that come from a real tablet server's vstreamer because we have no real
1777+
// tablet server here.
1778+
simulateVstreamerHeartbeats bool
1779+
wantErr string
1780+
}
1781+
testcases := []testcase{
1782+
{
1783+
name: "should fail liveness check",
1784+
livenessTimeout: 100 * time.Millisecond,
1785+
wantErr: fmt.Sprintf("vstream is fully throttled or otherwise hung: vstream failed liveness checks as there was no activity, including heartbeats, within the last %v", 100*time.Millisecond),
1786+
},
1787+
{
1788+
name: "should not fail liveness check",
1789+
livenessTimeout: 100 * time.Millisecond,
1790+
simulateVstreamerHeartbeats: true,
1791+
},
1792+
}
1793+
1794+
for _, tcase := range testcases {
1795+
t.Run(tcase.name, func(t *testing.T) {
1796+
vstreamCtx, vstreamCancel := context.WithTimeout(ctx, tcase.livenessTimeout*2)
1797+
defer vstreamCancel()
1798+
1799+
livenessTimeout = tcase.livenessTimeout
1800+
if tcase.simulateVstreamerHeartbeats {
1801+
// We need to ensure that there's a steady stream of vtgate<-vttablet vstream heartbeat
1802+
// events so that we stay within the defined livenessTimeout and the stream ends when
1803+
// the vstreamCtx times out.
1804+
numEvents := tcase.livenessTimeout.Nanoseconds() / 1e5
1805+
for range numEvents {
1806+
event := &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_HEARTBEAT}
1807+
fakeTablet.AddVStreamEvents([]*binlogdatapb.VEvent{event}, nil)
1808+
}
1809+
origVStreamEventDelay := fakeTablet.VStreamEventDelay
1810+
defer func() {
1811+
fakeTablet.VStreamEventDelay = origVStreamEventDelay
1812+
}()
1813+
fakeTablet.VStreamEventDelay = time.Duration((tcase.livenessTimeout.Nanoseconds() / numEvents) * 2)
1814+
}
1815+
1816+
err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
1817+
return nil
1818+
})
1819+
if tcase.wantErr == "" {
1820+
// Then we expect the context ended error, which means that no real errors occurred
1821+
// in the stream.
1822+
tcase.wantErr = "context ended while sending events: context deadline exceeded"
1823+
}
1824+
require.EqualError(t, err, tcase.wantErr)
1825+
})
1826+
}
1827+
}
1828+
17511829
func TestKeyspaceHasBeenSharded(t *testing.T) {
17521830
ctx := utils.LeakCheckContext(t)
17531831

go/vt/vttablet/sandboxconn/sandboxconn.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,11 @@ type SandboxConn struct {
110110
MessageIDs []*querypb.Value
111111

112112
// vstream expectations.
113-
StartPos string
114-
VStreamEvents [][]*binlogdatapb.VEvent
115-
VStreamErrors []error
116-
VStreamCh chan *binlogdatapb.VEvent
113+
StartPos string
114+
VStreamEvents [][]*binlogdatapb.VEvent
115+
VStreamErrors []error
116+
VStreamCh chan *binlogdatapb.VEvent
117+
VStreamEventDelay time.Duration // Any sleep that should be introduced before each event is streamed
117118

118119
// transaction id generator
119120
TransactionID atomic.Int64
@@ -625,6 +626,9 @@ func (sbc *SandboxConn) VStream(ctx context.Context, request *binlogdatapb.VStre
625626
if ev == nil {
626627
return err
627628
}
629+
if sbc.VStreamEventDelay > 0 {
630+
time.Sleep(sbc.VStreamEventDelay)
631+
}
628632
if err := send(ev); err != nil {
629633
return err
630634
}

go/vt/vttablet/tabletserver/throttle/client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,9 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName throttlera
9494
return emptyCheckResult, true
9595
}
9696
if c.throttler == nil {
97-
// no throttler
98-
return emptyCheckResult, true
97+
// No throttler, so we only throttle if we're in a test and have set the
98+
// always throttle app name.
99+
return emptyCheckResult, overrideAppName != throttlerapp.TestingAlwaysThrottledName
99100
}
100101
checkApp := c.appName
101102
if overrideAppName != "" {

go/vt/vttablet/tabletserver/vstreamer/main_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,13 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog
287287
}
288288
}
289289

290+
func startFullyThrottledStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) (*sync.WaitGroup, <-chan []*binlogdatapb.VEvent) {
291+
return startStreamWithAllOrNothingThrottlingOption(ctx, t, filter, position, tablePKs, true)
292+
}
290293
func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) (*sync.WaitGroup, <-chan []*binlogdatapb.VEvent) {
294+
return startStreamWithAllOrNothingThrottlingOption(ctx, t, filter, position, tablePKs, false)
295+
}
296+
func startStreamWithAllOrNothingThrottlingOption(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK, alwaysThrottle bool) (*sync.WaitGroup, <-chan []*binlogdatapb.VEvent) {
291297
switch position {
292298
case "":
293299
position = primaryPosition(t)
@@ -302,14 +308,14 @@ func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter,
302308
go func() {
303309
defer close(ch)
304310
defer wg.Done()
305-
if vstream(ctx, t, position, tablePKs, filter, ch) != nil {
311+
if vstream(ctx, t, position, tablePKs, filter, ch, alwaysThrottle) != nil {
306312
t.Log("vstream returned error")
307313
}
308314
}()
309315
return &wg, ch
310316
}
311317

312-
func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error {
318+
func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent, fullyThrottle bool) error {
313319
if filter == nil {
314320
filter = &binlogdatapb.Filter{
315321
Rules: []*binlogdatapb.Rule{{
@@ -330,7 +336,12 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda
330336
utils.SetFlagVariantsForTests(options.ConfigOverrides, "vstream-dynamic-packet-size", dynamicPacketSize)
331337
utils.SetFlagVariantsForTests(options.ConfigOverrides, "vstream-packet-size", packetSize)
332338

333-
return engine.Stream(ctx, pos, tablePKs, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error {
339+
appName := throttlerapp.VStreamerName
340+
if fullyThrottle {
341+
appName = throttlerapp.TestingAlwaysThrottledName
342+
}
343+
344+
return engine.Stream(ctx, pos, tablePKs, filter, appName, func(evs []*binlogdatapb.VEvent) error {
334345
timer := time.NewTimer(2 * time.Second)
335346
defer timer.Stop()
336347

go/vt/vttablet/tabletserver/vstreamer/vstreamer.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"fmt"
2424
"io"
2525
"strings"
26+
"sync/atomic"
2627
"time"
2728

2829
"google.golang.org/protobuf/encoding/prototext"
@@ -51,17 +52,24 @@ import (
5152
)
5253

5354
const (
54-
trxHistoryLenQuery = `select count as history_len from information_schema.INNODB_METRICS where name = 'trx_rseg_history_len'`
55-
replicaLagQuery = `show replica status`
56-
legacyLagQuery = `show slave status`
57-
hostQuery = `select @@hostname as hostname, @@port as port`
55+
trxHistoryLenQuery = `select count as history_len from information_schema.INNODB_METRICS where name = 'trx_rseg_history_len'`
56+
replicaLagQuery = `show replica status`
57+
legacyLagQuery = `show slave status`
58+
hostQuery = `select @@hostname as hostname, @@port as port`
59+
fullyThrottledMetricLabel = "FullyThrottledTimeout"
5860
)
5961

6062
// HeartbeatTime is set to slightly below 1s, compared to idleTimeout
6163
// set by VPlayer at slightly above 1s. This minimizes conflicts
6264
// between the two timeouts.
6365
var HeartbeatTime = 900 * time.Millisecond
6466

67+
// How long we can be fully throttled before returning an error.
68+
// If we hit this then we can surface a metric for operators and we can run the tablet picker again
69+
// to try and pick another tablet which is perhaps less burdened. Running the tablet picker also gives
70+
// us a natural backoff period.
71+
var fullyThrottledTimeout = 10 * time.Minute
72+
6573
// vstreamer is for serving a single vreplication stream on the source side.
6674
type vstreamer struct {
6775
ctx context.Context
@@ -330,7 +338,10 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
330338
if vs.filter != nil && vs.filter.WorkflowName != "" {
331339
wfNameLog = " in workflow " + vs.filter.WorkflowName
332340
}
341+
throttlerErrs := make(chan error, 1) // How we share the error when we've been fully throttled too long
342+
defer close(throttlerErrs)
333343
throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
344+
throttledTime := atomic.Int64{}
334345
for {
335346
// Check throttler.
336347
if checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp); !ok {
@@ -341,9 +352,17 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
341352
default:
342353
// Do nothing special.
343354
}
355+
curtime := time.Now().Unix()
356+
if !throttledTime.CompareAndSwap(0, curtime) {
357+
if curtime-throttledTime.Load() > int64(fullyThrottledTimeout.Seconds()) {
358+
throttlerErrs <- vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vstreamer has been fully throttled for more than %v, giving up so that we can retry", fullyThrottledTimeout)
359+
return
360+
}
361+
}
344362
logger.Infof("vstreamer throttled%s: %s.", wfNameLog, checkResult.Summary())
345363
continue
346364
}
365+
throttledTime.Store(0) // We are no longer fully throttled
347366
select {
348367
case ev, ok := <-events:
349368
if ok {
@@ -414,6 +433,9 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
414433
}
415434
case err := <-errs:
416435
return err
436+
case throttlerErr := <-throttlerErrs:
437+
vs.vse.errorCounts.Add(fullyThrottledMetricLabel, 1)
438+
return throttlerErr
417439
case <-ctx.Done():
418440
return nil
419441
case <-hbTimer.C:

go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ func TestVStreamMissingFieldsInLastPK(t *testing.T) {
530530
}
531531
ctx := context.Background()
532532
ch := make(chan []*binlogdatapb.VEvent)
533-
err := vstream(ctx, t, "", tablePKs, filter, ch)
533+
err := vstream(ctx, t, "", tablePKs, filter, ch, false)
534534
require.ErrorContains(t, err, "lastpk for table t1 has no fields defined")
535535
}
536536

@@ -1483,7 +1483,7 @@ func TestDDLDropColumn(t *testing.T) {
14831483
}
14841484
}()
14851485
defer close(ch)
1486-
err := vstream(ctx, t, pos, nil, nil, ch)
1486+
err := vstream(ctx, t, pos, nil, nil, ch, false)
14871487
want := "cannot determine table columns"
14881488
if err == nil || !strings.Contains(err.Error(), want) {
14891489
t.Errorf("err: %v, must contain %s", err, want)
@@ -1962,6 +1962,39 @@ func TestHeartbeat(t *testing.T) {
19621962
cancel()
19631963
}
19641964

1965+
func TestFullyThrottledTimeout(t *testing.T) {
1966+
ctx, cancel := context.WithCancel(context.Background())
1967+
defer cancel()
1968+
origTimeout := fullyThrottledTimeout
1969+
origHeartbeatTime := HeartbeatTime
1970+
startingMetric := engine.errorCounts.Counts()[fullyThrottledMetricLabel]
1971+
defer func() {
1972+
fullyThrottledTimeout = origTimeout
1973+
HeartbeatTime = origHeartbeatTime
1974+
}()
1975+
1976+
fullyThrottledTimeout = 100 * time.Millisecond
1977+
HeartbeatTime = fullyThrottledTimeout * 15
1978+
waitTimer := time.NewTimer(HeartbeatTime)
1979+
defer waitTimer.Stop()
1980+
done := make(chan struct{})
1981+
go func() {
1982+
wg, evs := startFullyThrottledStream(ctx, t, nil, "", nil) // Fully throttled
1983+
wg.Wait()
1984+
require.Zero(t, len(evs))
1985+
close(done)
1986+
}()
1987+
1988+
select {
1989+
case <-done:
1990+
endingMetric := engine.errorCounts.Counts()[fullyThrottledMetricLabel]
1991+
require.Equal(t, startingMetric+1, endingMetric)
1992+
return
1993+
case <-waitTimer.C:
1994+
require.FailNow(t, "fully throttled stall handler did not fire as expected")
1995+
}
1996+
}
1997+
19651998
func TestNoFutureGTID(t *testing.T) {
19661999
// Execute something to make sure we have ranges in GTIDs.
19672000
execStatements(t, []string{
@@ -1990,7 +2023,7 @@ func TestNoFutureGTID(t *testing.T) {
19902023
}
19912024
}()
19922025
defer close(ch)
1993-
err = vstream(ctx, t, future, nil, nil, ch)
2026+
err = vstream(ctx, t, future, nil, nil, ch, false)
19942027
want := "GTIDSet Mismatch"
19952028
if err == nil || !strings.Contains(err.Error(), want) {
19962029
t.Errorf("err: %v, must contain %s", err, want)

0 commit comments

Comments
 (0)