@@ -10,6 +10,7 @@ import (
1010 gosql "database/sql"
1111 "strings"
1212 "testing"
13+ "time"
1314
1415 "github.com/stretchr/testify/require"
1516 "github.com/stretchr/testify/suite"
@@ -199,6 +200,30 @@ type ApplierTestSuite struct {
199200 mysqlContainer testcontainers.Container
200201}
201202
203+ func (suite * ApplierTestSuite ) getConnectionConfig (ctx context.Context ) (* mysql.ConnectionConfig , error ) {
204+ host , err := suite .mysqlContainer .ContainerIP (ctx )
205+ if err != nil {
206+ return nil , err
207+ }
208+
209+ config := mysql .NewConnectionConfig ()
210+ config .Key .Hostname = host
211+ config .Key .Port = 3306
212+ config .User = "root"
213+ config .Password = "root-password"
214+
215+ return config , nil
216+ }
217+
218+ func (suite * ApplierTestSuite ) getDb (ctx context.Context ) (* gosql.DB , error ) {
219+ host , err := suite .mysqlContainer .ContainerIP (ctx )
220+ if err != nil {
221+ return nil , err
222+ }
223+
224+ return gosql .Open ("mysql" , "root:root-password@tcp(" + host + ":3306)/test" )
225+ }
226+
202227func (suite * ApplierTestSuite ) SetupSuite () {
203228 ctx := context .Background ()
204229 req := testcontainers.ContainerRequest {
@@ -229,7 +254,7 @@ func (suite *ApplierTestSuite) SetupTest() {
229254 suite .Require ().NoError (err )
230255 suite .Require ().Equalf (0 , rc , "failed to created database: expected exit code 0, got %d" , rc )
231256
232- rc , _ , err = suite .mysqlContainer .Exec (ctx , []string {"mysql" , "-uroot" , "-proot-password" , "-e" , "CREATE TABLE test.testing (id INT, item_id INT);" })
257+ rc , _ , err = suite .mysqlContainer .Exec (ctx , []string {"mysql" , "-uroot" , "-proot-password" , "-e" , "CREATE TABLE test.testing (id INT, item_id INT, PRIMARY KEY (id) );" })
233258 suite .Require ().NoError (err )
234259 suite .Require ().Equalf (0 , rc , "failed to created table: expected exit code 0, got %d" , rc )
235260}
@@ -245,15 +270,11 @@ func (suite *ApplierTestSuite) TearDownTest() {
245270func (suite * ApplierTestSuite ) TestInitDBConnections () {
246271 ctx := context .Background ()
247272
248- host , err := suite .mysqlContainer . ContainerIP (ctx )
273+ connectionConfig , err := suite .getConnectionConfig (ctx )
249274 suite .Require ().NoError (err )
250275
251276 migrationContext := base .NewMigrationContext ()
252- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
253- migrationContext .ApplierConnectionConfig .Key .Hostname = host
254- migrationContext .ApplierConnectionConfig .Key .Port = 3306
255- migrationContext .ApplierConnectionConfig .User = "root"
256- migrationContext .ApplierConnectionConfig .Password = "root-password"
277+ migrationContext .ApplierConnectionConfig = connectionConfig
257278 migrationContext .DatabaseName = "test"
258279 migrationContext .OriginalTableName = "testing"
259280 migrationContext .SetConnectionConfig ("innodb" )
@@ -274,15 +295,11 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
274295func (suite * ApplierTestSuite ) TestApplyDMLEventQueries () {
275296 ctx := context .Background ()
276297
277- host , err := suite .mysqlContainer . ContainerIP (ctx )
298+ connectionConfig , err := suite .getConnectionConfig (ctx )
278299 suite .Require ().NoError (err )
279300
280301 migrationContext := base .NewMigrationContext ()
281- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
282- migrationContext .ApplierConnectionConfig .Key .Hostname = host
283- migrationContext .ApplierConnectionConfig .Key .Port = 3306
284- migrationContext .ApplierConnectionConfig .User = "root"
285- migrationContext .ApplierConnectionConfig .Password = "root-password"
302+ migrationContext .ApplierConnectionConfig = connectionConfig
286303 migrationContext .DatabaseName = "test"
287304 migrationContext .OriginalTableName = "testing"
288305 migrationContext .SetConnectionConfig ("innodb" )
@@ -313,7 +330,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
313330 suite .Require ().NoError (err )
314331
315332 // Check that the row was inserted
316- db , err := gosql . Open ( "mysql" , "root:root-password@tcp(" + host + ":3306)/test" )
333+ db , err := suite . getDb ( ctx )
317334 suite .Require ().NoError (err )
318335 defer db .Close ()
319336
@@ -340,15 +357,11 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
340357func (suite * ApplierTestSuite ) TestValidateOrDropExistingTables () {
341358 ctx := context .Background ()
342359
343- host , err := suite .mysqlContainer . ContainerIP (ctx )
360+ connectionConfig , err := suite .getConnectionConfig (ctx )
344361 suite .Require ().NoError (err )
345362
346363 migrationContext := base .NewMigrationContext ()
347- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
348- migrationContext .ApplierConnectionConfig .Key .Hostname = host
349- migrationContext .ApplierConnectionConfig .Key .Port = 3306
350- migrationContext .ApplierConnectionConfig .User = "root"
351- migrationContext .ApplierConnectionConfig .Password = "root-password"
364+ migrationContext .ApplierConnectionConfig = connectionConfig
352365 migrationContext .DatabaseName = "test"
353366 migrationContext .OriginalTableName = "testing"
354367 migrationContext .SetConnectionConfig ("innodb" )
@@ -367,6 +380,139 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
367380 suite .Require ().NoError (err )
368381}
369382
383+ func (suite * ApplierTestSuite ) TestApplyIterationInsertQuery () {
384+ ctx := context .Background ()
385+
386+ connectionConfig , err := suite .getConnectionConfig (ctx )
387+ suite .Require ().NoError (err )
388+
389+ migrationContext := base .NewMigrationContext ()
390+ migrationContext .ApplierConnectionConfig = connectionConfig
391+ migrationContext .DatabaseName = "test"
392+ migrationContext .OriginalTableName = "testing"
393+ migrationContext .ChunkSize = 10
394+ migrationContext .SetConnectionConfig ("innodb" )
395+
396+ db , err := suite .getDb (ctx )
397+ suite .Require ().NoError (err )
398+ defer db .Close ()
399+
400+ _ , err = db .Exec ("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))" )
401+ suite .Require ().NoError (err )
402+
403+ // Insert some test values
404+ for i := 1 ; i <= 10 ; i ++ {
405+ _ , err = db .Exec ("INSERT INTO test.testing (id, item_id) VALUES (?, ?)" , i , i )
406+ suite .Require ().NoError (err )
407+ }
408+
409+ migrationContext .SharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
410+ migrationContext .MappedSharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
411+ migrationContext .UniqueKey = & sql.UniqueKey {
412+ Name : "PRIMARY" ,
413+ Columns : * sql .NewColumnList ([]string {"id" }),
414+ }
415+
416+ migrationContext .MigrationIterationRangeMinValues = sql .ToColumnValues ([]interface {}{1 })
417+ migrationContext .MigrationIterationRangeMaxValues = sql .ToColumnValues ([]interface {}{10 })
418+
419+ applier := NewApplier (migrationContext )
420+ defer applier .Teardown ()
421+
422+ err = applier .InitDBConnections ()
423+ suite .Require ().NoError (err )
424+
425+ chunkSize , rowsAffected , duration , err := applier .ApplyIterationInsertQuery ()
426+ suite .Require ().NoError (err )
427+
428+ suite .Require ().Equal (int64 (migrationContext .ChunkSize ), chunkSize )
429+ suite .Require ().Equal (int64 (10 ), rowsAffected )
430+ suite .Require ().Greater (duration , time .Duration (0 ))
431+
432+ // Check that the rows were inserted
433+ rows , err := db .Query ("SELECT * FROM test._testing_gho" )
434+ suite .Require ().NoError (err )
435+ defer rows .Close ()
436+
437+ var count , id , item_id int
438+ for rows .Next () {
439+ err = rows .Scan (& id , & item_id )
440+ suite .Require ().NoError (err )
441+ count += 1
442+ }
443+ suite .Require ().NoError (rows .Err ())
444+
445+ suite .Require ().Equal (10 , count )
446+ }
447+
448+ func (suite * ApplierTestSuite ) TestApplyIterationInsertQueryFailsFastWhenSelectingLockedRows () {
449+ ctx := context .Background ()
450+
451+ connectionConfig , err := suite .getConnectionConfig (ctx )
452+ suite .Require ().NoError (err )
453+
454+ migrationContext := base .NewMigrationContext ()
455+ migrationContext .ApplierConnectionConfig = connectionConfig
456+ migrationContext .DatabaseName = "test"
457+ migrationContext .OriginalTableName = "testing"
458+ migrationContext .ChunkSize = 10
459+ migrationContext .SetConnectionConfig ("innodb" )
460+
461+ db , err := suite .getDb (ctx )
462+ suite .Require ().NoError (err )
463+ defer db .Close ()
464+
465+ _ , err = db .Exec ("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))" )
466+ suite .Require ().NoError (err )
467+
468+ // Insert some test values
469+ for i := 1 ; i <= 10 ; i ++ {
470+ _ , err = db .Exec ("INSERT INTO test.testing (id, item_id) VALUES (?, ?)" , i , i )
471+ suite .Require ().NoError (err )
472+ }
473+
474+ migrationContext .SharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
475+ migrationContext .MappedSharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
476+ migrationContext .UniqueKey = & sql.UniqueKey {
477+ Name : "PRIMARY" ,
478+ Columns : * sql .NewColumnList ([]string {"id" }),
479+ }
480+
481+ migrationContext .MigrationIterationRangeMinValues = sql .ToColumnValues ([]interface {}{1 })
482+ migrationContext .MigrationIterationRangeMaxValues = sql .ToColumnValues ([]interface {}{10 })
483+
484+ applier := NewApplier (migrationContext )
485+ defer applier .Teardown ()
486+
487+ err = applier .InitDBConnections ()
488+ suite .Require ().NoError (err )
489+
490+ // Lock one of the rows
491+ tx , err := db .Begin ()
492+ suite .Require ().NoError (err )
493+ defer func () {
494+ suite .Require ().NoError (tx .Rollback ())
495+ }()
496+
497+ _ , err = tx .Exec ("SELECT * FROM test.testing WHERE id = 5 FOR UPDATE" )
498+ suite .Require ().NoError (err )
499+
500+ chunkSize , rowsAffected , duration , err := applier .ApplyIterationInsertQuery ()
501+ suite .Require ().Error (err )
502+ suite .Require ().EqualError (err , "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set." )
503+
504+ suite .Require ().Equal (int64 (migrationContext .ChunkSize ), chunkSize )
505+ suite .Require ().Equal (int64 (0 ), rowsAffected )
506+ suite .Require ().Equal (time .Duration (0 ), duration )
507+
508+ // Check that the no rows were inserted
509+ var count int
510+ err = db .QueryRow ("SELECT COUNT(*) FROM test._testing_gho" ).Scan (& count )
511+ suite .Require ().NoError (err )
512+
513+ suite .Require ().Equal (0 , count )
514+ }
515+
370516func TestApplier (t * testing.T ) {
371517 suite .Run (t , new (ApplierTestSuite ))
372518}
0 commit comments