@@ -21,6 +21,7 @@ import (
2121 "strings"
2222 "testing"
2323
24+ "github.com/stretchr/testify/assert"
2425 "github.com/stretchr/testify/require"
2526 "github.com/tidwall/gjson"
2627
@@ -38,6 +39,50 @@ import (
3839 As part of a separate cleanup we will build on this framework to replace the existing one.
3940*/
4041
42+ var (
43+ seqVSchema = `{
44+ "sharded": false,
45+ "tables": {
46+ "customer_seq": {
47+ "type": "sequence"
48+ }
49+ }
50+ }`
51+ seqSchema = `create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';`
52+ commerceSchema = `create table customer(cid int, name varchar(128), ts timestamp(3) not null default current_timestamp(3), primary key(cid));`
53+ commerceVSchema = `
54+ {
55+ "tables": {
56+ "customer": {}
57+ }
58+ }
59+ `
60+ customerSequenceVSchema = `
61+ {
62+ "sharded": true,
63+ "vindexes": {
64+ "reverse_bits": {
65+ "type": "reverse_bits"
66+ }
67+ },
68+ "tables": {
69+ "customer": {
70+ "column_vindexes": [
71+ {
72+ "column": "cid",
73+ "name": "reverse_bits"
74+ }
75+ ],
76+ "auto_increment": {
77+ "column": "cid",
78+ "sequence": "customer_seq"
79+ }
80+ }
81+ }
82+ }
83+ `
84+ )
85+
4186type keyspace struct {
4287 name string
4388 vschema string
@@ -74,50 +119,6 @@ type vrepTestCase struct {
74119}
75120
76121func initPartialMoveTablesComplexTestCase (t * testing.T ) * vrepTestCase {
77- const (
78- seqVSchema = `{
79- "sharded": false,
80- "tables": {
81- "customer_seq": {
82- "type": "sequence"
83- }
84- }
85- }`
86- seqSchema = `create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';`
87- commerceSchema = `create table customer(cid int, name varchar(128), ts timestamp(3) not null default current_timestamp(3), primary key(cid));`
88- commerceVSchema = `
89- {
90- "tables": {
91- "customer": {}
92- }
93- }
94- `
95- customerSchema = ""
96- customerVSchema = `
97- {
98- "sharded": true,
99- "vindexes": {
100- "reverse_bits": {
101- "type": "reverse_bits"
102- }
103- },
104- "tables": {
105- "customer": {
106- "column_vindexes": [
107- {
108- "column": "cid",
109- "name": "reverse_bits"
110- }
111- ],
112- "auto_increment": {
113- "column": "cid",
114- "sequence": "customer_seq"
115- }
116- }
117- }
118- }
119- `
120- )
121122 tc := & vrepTestCase {
122123 t : t ,
123124 testName : t .Name (),
@@ -134,14 +135,14 @@ func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase {
134135 }
135136 tc .keyspaces ["customer" ] = & keyspace {
136137 name : "customer" ,
137- vschema : customerVSchema ,
138- schema : customerSchema ,
138+ vschema : customerSequenceVSchema ,
139+ schema : "" ,
139140 baseID : 200 ,
140141 shards : []string {"-80" , "80-" },
141142 }
142143 tc .keyspaces ["customer2" ] = & keyspace {
143144 name : "customer2" ,
144- vschema : customerVSchema ,
145+ vschema : customerSequenceVSchema ,
145146 schema : "" ,
146147 baseID : 1200 ,
147148 shards : []string {"-80" , "80-" },
@@ -165,6 +166,40 @@ func initPartialMoveTablesComplexTestCase(t *testing.T) *vrepTestCase {
165166 return tc
166167}
167168
169+ func initSequenceResetTestCase (t * testing.T ) * vrepTestCase {
170+ tc := & vrepTestCase {
171+ t : t ,
172+ testName : t .Name (),
173+ keyspaces : make (map [string ]* keyspace ),
174+ defaultCellName : "zone1" ,
175+ workflows : make (map [string ]* workflow ),
176+ }
177+ tc .keyspaces ["commerce" ] = & keyspace {
178+ name : "commerce" ,
179+ vschema : commerceVSchema ,
180+ schema : commerceSchema ,
181+ baseID : 100 ,
182+ shards : []string {"0" },
183+ }
184+ tc .keyspaces ["customer" ] = & keyspace {
185+ name : "customer" ,
186+ vschema : customerSequenceVSchema ,
187+ schema : "" ,
188+ baseID : 200 ,
189+ shards : []string {"-80" , "80-" },
190+ }
191+ tc .keyspaces ["seqSrc" ] = & keyspace {
192+ name : "seqSrc" ,
193+ vschema : seqVSchema ,
194+ schema : seqSchema ,
195+ baseID : 400 ,
196+ shards : []string {"0" },
197+ }
198+ tc .setupCluster ()
199+ tc .initData ()
200+ return tc
201+ }
202+
168203func (tc * vrepTestCase ) teardown () {
169204 tc .vtgateConn .Close ()
170205 vc .TearDown ()
@@ -268,6 +303,93 @@ func (wf *workflow) complete() {
268303 require .NoError (wf .tc .t , tstWorkflowExec (wf .tc .t , wf .tc .defaultCellName , wf .name , wf .fromKeyspace , wf .toKeyspace , "" , workflowActionComplete , "" , "" , "" , defaultWorkflowExecOptions ))
269304}
270305
306+ // TestSequenceResetOnSwitchTraffic tests that in-memory sequence info is
307+ // reset when switching traffic back and forth between keyspaces during
308+ // MoveTables workflow. This catches a bug where cached sequence values would
309+ // persist after traffic switches, causing sequence generation to produce
310+ // duplicate values in target keyspace.
311+ func TestSequenceResetOnSwitchTraffic (t * testing.T ) {
312+ origExtraVTGateArgs := extraVTGateArgs
313+ extraVTGateArgs = append (extraVTGateArgs , []string {
314+ "--enable-partial-keyspace-migration" ,
315+ "--schema_change_signal=false" ,
316+ }... )
317+ defer func () {
318+ extraVTGateArgs = origExtraVTGateArgs
319+ }()
320+
321+ tc := initSequenceResetTestCase (t )
322+ defer tc .teardown ()
323+
324+ currentCustomerCount = getCustomerCount (t , "" )
325+ newCustomerCount = 4
326+ t .Run ("Verify sequence reset during traffic switching" , func (t * testing.T ) {
327+ tc .setupKeyspaces ([]string {"customer" })
328+ wf := tc .newWorkflow ("MoveTables" , "customer" , "commerce" , "customer" , & workflowOptions {
329+ tables : []string {"customer" },
330+ })
331+ wf .create ()
332+
333+ vtgateConn , closeConn := getVTGateConn ()
334+ defer closeConn ()
335+
336+ getSequenceNextID := func () int64 {
337+ qr := execVtgateQuery (t , vtgateConn , "" , "SELECT next_id FROM seqSrc.customer_seq WHERE id = 0" )
338+ nextID , _ := qr .Rows [0 ][0 ].ToInt64 ()
339+ return nextID
340+ }
341+
342+ initialSeqValue := getSequenceNextID ()
343+ t .Logf ("Initial sequence next_id: %d" , initialSeqValue )
344+
345+ wf .switchTraffic ()
346+
347+ insertCustomers (t )
348+
349+ afterFirstSwitchSeqValue := getSequenceNextID ()
350+ t .Logf ("After first switch sequence next_id: %d" , afterFirstSwitchSeqValue )
351+ require .Greater (t , afterFirstSwitchSeqValue , initialSeqValue , "Sequence should increment after inserting customers" )
352+
353+ wf .reverseTraffic ()
354+
355+ afterReverseSeqValue := getSequenceNextID ()
356+ t .Logf ("After reverse switch sequence next_id: %d" , afterReverseSeqValue )
357+
358+ // Insert some random values when all writes are reversed back to
359+ // source keyspace. We are inserting here rows with IDs 1004, 1005,
360+ // 1006 (since the cache value was 1000) which would be the next
361+ // in-memory sequence IDs for inserting any new rows in `customer`
362+ // table if the sequence info isn't reset. This will result in
363+ // duplicate primary key value error in the next insert.
364+ //
365+ // Hence, this way we verify that even if there are any new
366+ // values inserted after the traffic has been reversed, the in-memory
367+ // sequence info is reset, so that on switching back the traffic to
368+ // the target keyspace the tablet refetches the next_id from the
369+ // sequence table for generating the next insert ID.
370+ _ , err := tc .vtgateConn .ExecuteFetch ("insert into customer(cid, name) values(1004, 'customer8'), (1005, 'customer9'),(1006, 'customer10')" , 1000 , false )
371+ require .NoError (t , err )
372+ _ , err = tc .vtgateConn .ExecuteFetch ("insert into customer(cid, name) values(2004, 'customer11'), (2005, 'customer12'),(2006, 'customer13')" , 1000 , false )
373+ require .NoError (t , err )
374+
375+ wf .switchTraffic ()
376+
377+ afterSecondSwitchSeqValue := getSequenceNextID ()
378+ // Since the highest ID before switching traffic was 2026, which is
379+ // greater than 2000 (the expected next_id from sequence table before switch.)
380+ assert .Equal (t , int64 (2007 ), afterSecondSwitchSeqValue )
381+
382+ currentCustomerCount = getCustomerCount (t , "after second switch" )
383+ newCustomerCount = 4
384+ insertCustomers (t )
385+
386+ finalSeqValue := getSequenceNextID ()
387+ assert .Equal (t , int64 (3007 ), finalSeqValue , "Since the cache is set to 1000, next_id is expected to be incremented to 3007" )
388+
389+ wf .complete ()
390+ })
391+ }
392+
271393// TestPartialMoveTablesWithSequences enhances TestPartialMoveTables by adding an unsharded keyspace which has a
272394// sequence. This tests that the sequence is migrated correctly and that we can reverse traffic back to the source
273395func TestPartialMoveTablesWithSequences (t * testing.T ) {
@@ -324,7 +446,6 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
324446 targetKs := "customer2"
325447 shard := "80-"
326448 var wf80Dash , wfDash80 * workflow
327- currentCustomerCount = getCustomerCount (t , "before customer2.80-" )
328449 vtgateConn , closeConn := getVTGateConn ()
329450 t .Run ("Start MoveTables on customer2.80-" , func (t * testing.T ) {
330451 // Now setup the customer2 keyspace so we can do a partial move tables for one of the two shards: 80-.
@@ -336,14 +457,11 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
336457 })
337458 wf80Dash .create ()
338459
339- currentCustomerCount = getCustomerCount (t , "after customer2.80-" )
340460 waitForRowCount (t , vtgateConn , "customer2:80-" , "customer" , 2 ) // customer2: 80-
341461 waitForRowCount (t , vtgateConn , "customer" , "customer" , 3 ) // customer: all shards
342462 waitForRowCount (t , vtgateConn , "customer2" , "customer" , 3 ) // customer2: all shards
343463 })
344464
345- currentCustomerCount = getCustomerCount (t , "after customer2.80-/2" )
346-
347465 // This query uses an ID that should always get routed to shard 80-
348466 shard80DashRoutedQuery := "select name from customer where cid = 1 and noexistcol = 'foo'"
349467 // This query uses an ID that should always get routed to shard -80
@@ -382,14 +500,12 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
382500
383501 _ , err = vtgateConn .ExecuteFetch ("use `customer`" , 0 , false ) // switch vtgate default db back to customer
384502 require .NoError (t , err )
385- currentCustomerCount = getCustomerCount (t , "" )
386503
387504 // Switch all traffic for the shard
388505 wf80Dash .switchTraffic ()
389506 expectedSwitchOutput := fmt .Sprintf ("SwitchTraffic was successful for workflow %s.%s\n \n Start State: Reads Not Switched. Writes Not Switched\n Current State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n \n " ,
390507 targetKs , wfName , shard , shard )
391508 require .Equal (t , expectedSwitchOutput , lastOutput )
392- currentCustomerCount = getCustomerCount (t , "" )
393509
394510 // Confirm global routing rules -- everything should still be routed
395511 // to the source side, customer, globally.
@@ -431,7 +547,6 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
431547 _ , err = vtgateConn .ExecuteFetch ("use `customer`" , 0 , false ) // switch vtgate default db back to customer
432548 require .NoError (t , err )
433549 })
434- currentCustomerCount = getCustomerCount (t , "" )
435550
436551 // Now move the other shard: -80
437552 t .Run ("Move shard -80 and validate routing rules" , func (t * testing.T ) {
0 commit comments