@@ -10,6 +10,7 @@ import (
1010 gosql "database/sql"
1111 "strings"
1212 "testing"
13+ "time"
1314
1415 "github.com/stretchr/testify/require"
1516 "github.com/stretchr/testify/suite"
@@ -199,6 +200,30 @@ type ApplierTestSuite struct {
199200 mysqlContainer testcontainers.Container
200201}
201202
203+ func (suite * ApplierTestSuite ) getConnectionConfig (ctx context.Context ) (* mysql.ConnectionConfig , error ) {
204+ host , err := suite .mysqlContainer .ContainerIP (ctx )
205+ if err != nil {
206+ return nil , err
207+ }
208+
209+ config := mysql .NewConnectionConfig ()
210+ config .Key .Hostname = host
211+ config .Key .Port = 3306
212+ config .User = "root"
213+ config .Password = "root-password"
214+
215+ return config , nil
216+ }
217+
218+ func (suite * ApplierTestSuite ) getDb (ctx context.Context ) (* gosql.DB , error ) {
219+ host , err := suite .mysqlContainer .ContainerIP (ctx )
220+ if err != nil {
221+ return nil , err
222+ }
223+
224+ return gosql .Open ("mysql" , "root:root-password@tcp(" + host + ":3306)/test" )
225+ }
226+
202227func (suite * ApplierTestSuite ) SetupSuite () {
203228 ctx := context .Background ()
204229 req := testcontainers.ContainerRequest {
@@ -229,7 +254,7 @@ func (suite *ApplierTestSuite) SetupTest() {
229254 suite .Require ().NoError (err )
230255 suite .Require ().Equalf (0 , rc , "failed to created database: expected exit code 0, got %d" , rc )
231256
232- rc , _ , err = suite .mysqlContainer .Exec (ctx , []string {"mysql" , "-uroot" , "-proot-password" , "-e" , "CREATE TABLE test.testing (id INT, item_id INT);" })
257+ rc , _ , err = suite .mysqlContainer .Exec (ctx , []string {"mysql" , "-uroot" , "-proot-password" , "-e" , "CREATE TABLE test.testing (id INT, item_id INT, PRIMARY KEY (id) );" })
233258 suite .Require ().NoError (err )
234259 suite .Require ().Equalf (0 , rc , "failed to created table: expected exit code 0, got %d" , rc )
235260}
@@ -245,15 +270,11 @@ func (suite *ApplierTestSuite) TearDownTest() {
245270func (suite * ApplierTestSuite ) TestInitDBConnections () {
246271 ctx := context .Background ()
247272
248- host , err := suite .mysqlContainer . ContainerIP (ctx )
273+ connectionConfig , err := suite .getConnectionConfig (ctx )
249274 suite .Require ().NoError (err )
250275
251276 migrationContext := base .NewMigrationContext ()
252- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
253- migrationContext .ApplierConnectionConfig .Key .Hostname = host
254- migrationContext .ApplierConnectionConfig .Key .Port = 3306
255- migrationContext .ApplierConnectionConfig .User = "root"
256- migrationContext .ApplierConnectionConfig .Password = "root-password"
277+ migrationContext .ApplierConnectionConfig = connectionConfig
257278 migrationContext .DatabaseName = "test"
258279 migrationContext .OriginalTableName = "testing"
259280 migrationContext .SetConnectionConfig ("innodb" )
@@ -274,15 +295,11 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
274295func (suite * ApplierTestSuite ) TestApplyDMLEventQueries () {
275296 ctx := context .Background ()
276297
277- host , err := suite .mysqlContainer . ContainerIP (ctx )
298+ connectionConfig , err := suite .getConnectionConfig (ctx )
278299 suite .Require ().NoError (err )
279300
280301 migrationContext := base .NewMigrationContext ()
281- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
282- migrationContext .ApplierConnectionConfig .Key .Hostname = host
283- migrationContext .ApplierConnectionConfig .Key .Port = 3306
284- migrationContext .ApplierConnectionConfig .User = "root"
285- migrationContext .ApplierConnectionConfig .Password = "root-password"
302+ migrationContext .ApplierConnectionConfig = connectionConfig
286303 migrationContext .DatabaseName = "test"
287304 migrationContext .OriginalTableName = "testing"
288305 migrationContext .SetConnectionConfig ("innodb" )
@@ -313,7 +330,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
313330 suite .Require ().NoError (err )
314331
315332 // Check that the row was inserted
316- db , err := gosql . Open ( "mysql" , "root:root-password@tcp(" + host + ":3306)/test" )
333+ db , err := suite . getDb ( ctx )
317334 suite .Require ().NoError (err )
318335 defer db .Close ()
319336
@@ -340,15 +357,11 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
340357func (suite * ApplierTestSuite ) TestValidateOrDropExistingTables () {
341358 ctx := context .Background ()
342359
343- host , err := suite .mysqlContainer . ContainerIP (ctx )
360+ connectionConfig , err := suite .getConnectionConfig (ctx )
344361 suite .Require ().NoError (err )
345362
346363 migrationContext := base .NewMigrationContext ()
347- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
348- migrationContext .ApplierConnectionConfig .Key .Hostname = host
349- migrationContext .ApplierConnectionConfig .Key .Port = 3306
350- migrationContext .ApplierConnectionConfig .User = "root"
351- migrationContext .ApplierConnectionConfig .Password = "root-password"
364+ migrationContext .ApplierConnectionConfig = connectionConfig
352365 migrationContext .DatabaseName = "test"
353366 migrationContext .OriginalTableName = "testing"
354367 migrationContext .SetConnectionConfig ("innodb" )
@@ -367,6 +380,140 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
367380 suite .Require ().NoError (err )
368381}
369382
383+ func (suite * ApplierTestSuite ) TestApplyIterationInsertQuery () {
384+ ctx := context .Background ()
385+
386+ connectionConfig , err := suite .getConnectionConfig (ctx )
387+ suite .Require ().NoError (err )
388+
389+ migrationContext := base .NewMigrationContext ()
390+ migrationContext .ApplierConnectionConfig = connectionConfig
391+ migrationContext .DatabaseName = "test"
392+ migrationContext .OriginalTableName = "testing"
393+ migrationContext .ChunkSize = 10
394+ migrationContext .SetConnectionConfig ("innodb" )
395+
396+ db , err := suite .getDb (ctx )
397+ suite .Require ().NoError (err )
398+ defer db .Close ()
399+
400+ _ , err = db .Exec ("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))" )
401+ suite .Require ().NoError (err )
402+
403+ // Insert some test values
404+ for i := 1 ; i <= 10 ; i ++ {
405+ _ , err = db .Exec ("INSERT INTO test.testing (id, item_id) VALUES (?, ?)" , i , i )
406+ suite .Require ().NoError (err )
407+ }
408+
409+ migrationContext .SharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
410+ migrationContext .MappedSharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
411+ migrationContext .UniqueKey = & sql.UniqueKey {
412+ Name : "PRIMARY" ,
413+ Columns : * sql .NewColumnList ([]string {"id" }),
414+ }
415+
416+ migrationContext .MigrationIterationRangeMinValues = sql .ToColumnValues ([]interface {}{1 })
417+ migrationContext .MigrationIterationRangeMaxValues = sql .ToColumnValues ([]interface {}{10 })
418+
419+ applier := NewApplier (migrationContext )
420+ defer applier .Teardown ()
421+
422+ err = applier .InitDBConnections ()
423+ suite .Require ().NoError (err )
424+
425+ chunkSize , rowsAffected , duration , err := applier .ApplyIterationInsertQuery ()
426+ suite .Require ().NoError (err )
427+
428+ suite .Require ().Equal (migrationContext .ChunkSize , chunkSize )
429+ suite .Require ().Equal (int64 (10 ), rowsAffected )
430+ suite .Require ().Greater (duration , time .Duration (0 ))
431+
432+ // Check that the rows were inserted
433+ rows , err := db .Query ("SELECT * FROM test._testing_gho" )
434+ suite .Require ().NoError (err )
435+ defer rows .Close ()
436+
437+ var count , id , item_id int
438+ for rows .Next () {
439+ err = rows .Scan (& id , & item_id )
440+ suite .Require ().NoError (err )
441+ count += 1
442+ }
443+ suite .Require ().NoError (rows .Err ())
444+
445+ suite .Require ().Equal (10 , count )
446+ }
447+
448+ func (suite * ApplierTestSuite ) TestApplyIterationInsertQueryFailsFastWhenSelectingLockedRows () {
449+ ctx := context .Background ()
450+
451+ connectionConfig , err := suite .getConnectionConfig (ctx )
452+ suite .Require ().NoError (err )
453+
454+ migrationContext := base .NewMigrationContext ()
455+ migrationContext .ApplierConnectionConfig = connectionConfig
456+ migrationContext .DatabaseName = "test"
457+ migrationContext .OriginalTableName = "testing"
458+ migrationContext .ChunkSize = 10
459+ migrationContext .TableEngine = "innodb"
460+ migrationContext .SetConnectionConfig ("innodb" )
461+
462+ db , err := suite .getDb (ctx )
463+ suite .Require ().NoError (err )
464+ defer db .Close ()
465+
466+ _ , err = db .Exec ("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))" )
467+ suite .Require ().NoError (err )
468+
469+ // Insert some test values
470+ for i := 1 ; i <= 10 ; i ++ {
471+ _ , err = db .Exec ("INSERT INTO test.testing (id, item_id) VALUES (?, ?)" , i , i )
472+ suite .Require ().NoError (err )
473+ }
474+
475+ migrationContext .SharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
476+ migrationContext .MappedSharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
477+ migrationContext .UniqueKey = & sql.UniqueKey {
478+ Name : "PRIMARY" ,
479+ Columns : * sql .NewColumnList ([]string {"id" }),
480+ }
481+
482+ migrationContext .MigrationIterationRangeMinValues = sql .ToColumnValues ([]interface {}{1 })
483+ migrationContext .MigrationIterationRangeMaxValues = sql .ToColumnValues ([]interface {}{10 })
484+
485+ applier := NewApplier (migrationContext )
486+ defer applier .Teardown ()
487+
488+ err = applier .InitDBConnections ()
489+ suite .Require ().NoError (err )
490+
491+ // Lock one of the rows
492+ tx , err := db .Begin ()
493+ suite .Require ().NoError (err )
494+ defer func () {
495+ suite .Require ().NoError (tx .Rollback ())
496+ }()
497+
498+ _ , err = tx .Exec ("SELECT * FROM test.testing WHERE id = 5 FOR UPDATE" )
499+ suite .Require ().NoError (err )
500+
501+ chunkSize , rowsAffected , duration , err := applier .ApplyIterationInsertQuery ()
502+ suite .Require ().Error (err )
503+ suite .Require ().EqualError (err , "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set." )
504+
505+ suite .Require ().Equal (migrationContext .ChunkSize , chunkSize )
506+ suite .Require ().Equal (int64 (0 ), rowsAffected )
507+ suite .Require ().Equal (time .Duration (0 ), duration )
508+
509+ // Check that the no rows were inserted
510+ var count int
511+ err = db .QueryRow ("SELECT COUNT(*) FROM test._testing_gho" ).Scan (& count )
512+ suite .Require ().NoError (err )
513+
514+ suite .Require ().Equal (0 , count )
515+ }
516+
370517func TestApplier (t * testing.T ) {
371518 suite .Run (t , new (ApplierTestSuite ))
372519}
0 commit comments