Skip to content

Commit 5203ed4

Browse files
tanjinxmaksimovtimvaillancourt
authored
Fix bug where query consolidator returns empty result without error when the waiter cap exceeded (#18782)
Signed-off-by: Tanjin Xu <[email protected]> Signed-off-by: Tim Vaillancourt <[email protected]> Co-authored-by: Stas Maksimov <[email protected]> Co-authored-by: Tim Vaillancourt <[email protected]>
1 parent 01a9b3d commit 5203ed4

File tree

3 files changed

+96
-8
lines changed

3 files changed

+96
-8
lines changed

go/sync2/fake_consolidator.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,12 @@ type FakePendingResult struct {
5151
BroadcastCalls int
5252
// WaitCalls can be used to inspect Wait calls.
5353
WaitCalls int
54-
err error
55-
result *sqltypes.Result
54+
// AddWaiterCounterCalls can be used to inspect AddWaiterCounter calls.
55+
AddWaiterCounterCalls []int64
56+
// WaiterCount simulates the current waiter count
57+
WaiterCount int64
58+
err error
59+
result *sqltypes.Result
5660
}
5761

5862
var (
@@ -113,7 +117,9 @@ func (fr *FakePendingResult) Wait() {
113117
fr.WaitCalls++
114118
}
115119

116-
// AddWaiterCounter is currently a no-op.
117-
func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 {
118-
return new(int64)
120+
// AddWaiterCounter records the call and simulates waiter count changes.
121+
func (fr *FakePendingResult) AddWaiterCounter(delta int64) *int64 {
122+
fr.AddWaiterCounterCalls = append(fr.AddWaiterCounterCalls, delta)
123+
fr.WaiterCount += delta
124+
return &fr.WaiterCount
119125
}

go/vt/vttablet/tabletserver/query_executor.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
744744
// Check tablet type.
745745
if qre.shouldConsolidate() {
746746
q, original := qre.tsv.qe.consolidator.Create(sqlWithoutComments)
747+
waiterCapExceeded := false
747748
if original {
748749
defer q.Broadcast()
749750
conn, err := qre.getConn()
@@ -763,13 +764,21 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
763764
startTime := time.Now()
764765
q.Wait()
765766
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
767+
} else {
768+
// Waiter cap exceeded, fall back to independent query execution
769+
waiterCapExceeded = true
766770
}
767771
q.AddWaiterCounter(-1)
768772
}
769-
if q.Err() != nil {
770-
return nil, q.Err()
773+
774+
// Return consolidation results unless waiter cap was exceeded
775+
if !waiterCapExceeded {
776+
if q.Err() != nil {
777+
return nil, q.Err()
778+
}
779+
return q.Result(), nil
771780
}
772-
return q.Result(), nil
781+
// If waiter cap exceeded, fall through to independent execution
773782
}
774783
conn, err := qre.getConn()
775784
if err != nil {

go/vt/vttablet/tabletserver/query_executor_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,6 +1466,79 @@ func TestQueryExecutorShouldConsolidate(t *testing.T) {
14661466
}
14671467
}
14681468

1469+
func TestQueryExecutorConsolidatorWaiterCapFallback(t *testing.T) {
1470+
// Test that when the consolidator waiter cap is reached, queries fall back
1471+
// to independent execution instead of returning empty results.
1472+
1473+
db := setUpQueryExecutorTest(t)
1474+
defer db.Close()
1475+
1476+
ctx := context.Background()
1477+
tsv := newTestTabletServer(ctx, enableConsolidator, db)
1478+
defer tsv.StopService()
1479+
1480+
// Set a waiter cap of 1
1481+
tsv.config.ConsolidatorQueryWaiterCap = 1
1482+
1483+
fakeConsolidator := sync2.NewFakeConsolidator()
1484+
tsv.qe.consolidator = fakeConsolidator
1485+
1486+
input := "select * from t limit 10001"
1487+
result := &sqltypes.Result{
1488+
Fields: getTestTableFields(),
1489+
Rows: [][]sqltypes.Value{{
1490+
sqltypes.NewInt32(1), // pk
1491+
sqltypes.NewInt32(100), // name
1492+
sqltypes.NewInt32(200), // addr
1493+
}},
1494+
}
1495+
1496+
// Set up consolidator to simulate an identical query already running (Created=false)
1497+
fakePendingResult := &sync2.FakePendingResult{}
1498+
fakePendingResult.SetResult(result)
1499+
// Start with waiter count above the cap (2 > 1), so the condition fails
1500+
fakePendingResult.WaiterCount = 2
1501+
1502+
fakeConsolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{
1503+
Created: false, // Simulate identical query already running
1504+
PendingResult: fakePendingResult,
1505+
}
1506+
1507+
// Set up database query/response for fallback execution
1508+
db.AddQuery(input, result)
1509+
1510+
qre := newTestQueryExecutor(context.Background(), tsv, input, 0)
1511+
qre.options = &querypb.ExecuteOptions{Consolidator: querypb.ExecuteOptions_CONSOLIDATOR_ENABLED}
1512+
1513+
// Execute query
1514+
actualResult, err := qre.Execute()
1515+
require.NoError(t, err)
1516+
require.NotNil(t, actualResult)
1517+
1518+
// Verify we got the correct result (not empty)
1519+
require.Equal(t, result.Fields, actualResult.Fields)
1520+
require.Equal(t, result.Rows, actualResult.Rows)
1521+
1522+
// Verify consolidator was attempted
1523+
require.Len(t, fakeConsolidator.CreateCalls, 1)
1524+
1525+
// Verify we did NOT wait (because waiter cap was exceeded)
1526+
require.Equal(t, 0, fakePendingResult.WaitCalls)
1527+
1528+
// Verify we did NOT broadcast (because we're not the original)
1529+
require.Equal(t, 0, fakePendingResult.BroadcastCalls)
1530+
1531+
// Verify AddWaiterCounter was called: once with 0 (to check count), once with -1 (cleanup)
1532+
require.Len(t, fakePendingResult.AddWaiterCounterCalls, 2)
1533+
require.Equal(t, int64(0), fakePendingResult.AddWaiterCounterCalls[0]) // Check current count
1534+
require.Equal(t, int64(-1), fakePendingResult.AddWaiterCounterCalls[1]) // Decrement
1535+
1536+
// Verify fallback executed the query independently
1537+
require.Equal(t, 1, db.GetQueryCalledNum(input))
1538+
1539+
db.VerifyAllExecutedOrFail()
1540+
}
1541+
14691542
func TestGetConnectionLogStats(t *testing.T) {
14701543
db := setUpQueryExecutorTest(t)
14711544
defer db.Close()

0 commit comments

Comments
 (0)