@@ -48,6 +48,7 @@ type Migrator struct {
4848 voluntaryLockAcquired chan bool
4949 panicAbort chan error
5050
51+ allEventsUpToLockProcessedFlag int64
5152 // copyRowsQueue should not be buffered; if buffered some non-damaging but
5253 // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
5354 copyRowsQueue chan tableWriteFunc
@@ -65,6 +66,8 @@ func NewMigrator() *Migrator {
6566 voluntaryLockAcquired : make (chan bool , 1 ),
6667 panicAbort : make (chan error ),
6768
69+ allEventsUpToLockProcessedFlag : 0 ,
70+
6871 copyRowsQueue : make (chan tableWriteFunc ),
6972 applyEventsQueue : make (chan tableWriteFunc , applyEventsQueueBuffer ),
7073 handledChangelogStates : make (map [string ]bool ),
@@ -106,7 +109,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
106109 if time .Duration (lag ) > time .Duration (this .migrationContext .MaxLagMillisecondsThrottleThreshold )* time .Millisecond {
107110 return true , fmt .Sprintf ("lag=%fs" , time .Duration (lag ).Seconds ())
108111 }
109- if this .migrationContext .TestOnReplica {
112+ if this .migrationContext .TestOnReplica && ( atomic . LoadInt64 ( & this . allEventsUpToLockProcessedFlag ) == 0 ) {
110113 replicationLag , err := mysql .GetMaxReplicationLag (this .migrationContext .InspectorConnectionConfig , this .migrationContext .ThrottleControlReplicaKeys , this .migrationContext .ReplictionLagQuery )
111114 if err != nil {
112115 return true , err .Error ()
@@ -198,6 +201,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
198201 return nil
199202}
200203
204+ // consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
205+ // consumers and drops any further incoming events that may be left hanging.
206+ func (this * Migrator ) consumeRowCopyComplete () {
207+ <- this .rowCopyComplete
208+ go func () {
209+ for <- this .rowCopyComplete {
210+ }
211+ }()
212+ }
213+
201214func (this * Migrator ) canStopStreaming () bool {
202215 return false
203216}
@@ -215,33 +228,18 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
215228 }
216229 case AllEventsUpToLockProcessed :
217230 {
218- this .allEventsUpToLockProcessed <- true
219- }
220- default :
221- {
222- return fmt .Errorf ("Unknown changelog state: %+v" , changelogState )
223- }
224- }
225- log .Debugf ("Received state %+v" , changelogState )
226- return nil
227- }
228-
229- func (this * Migrator ) onChangelogState (stateValue string ) (err error ) {
230- log .Fatalf ("I shouldn't be here" )
231- if this .handledChangelogStates [stateValue ] {
232- return nil
233- }
234- this .handledChangelogStates [stateValue ] = true
235-
236- changelogState := ChangelogState (stateValue )
237- switch changelogState {
238- case TablesInPlace :
239- {
240- this .tablesInPlace <- true
241- }
242- case AllEventsUpToLockProcessed :
243- {
244- this .allEventsUpToLockProcessed <- true
231+ applyEventFunc := func () error {
232+ this .allEventsUpToLockProcessed <- true
233+ return nil
234+ }
235+ // at this point we know all events up to lock have been read from the streamer,
236+ // because the streamer works sequentially. So those events are either already handled,
237+ // or have event functions in applyEventsQueue.
238+ // So as not to create a potential deadlock, we write this func to applyEventsQueue
239+ // asynchronously, understanding it doesn't really matter.
240+ go func () {
241+ this .applyEventsQueue <- applyEventFunc
242+ }()
245243 }
246244 default :
247245 {
@@ -295,6 +293,9 @@ func (this *Migrator) Migrate() (err error) {
295293 if err := this .inspector .InspectOriginalAndGhostTables (); err != nil {
296294 return err
297295 }
296+ if err := this .addDMLEventsListener (); err != nil {
297+ return err
298+ }
298299 go this .initiateHeartbeatListener ()
299300
300301 if err := this .applier .ReadMigrationRangeValues (); err != nil {
@@ -307,7 +308,7 @@ func (this *Migrator) Migrate() (err error) {
307308 go this .initiateStatus ()
308309
309310 log .Debugf ("Operating until row copy is complete" )
310- <- this .rowCopyComplete
311+ this .consumeRowCopyComplete ()
311312 log .Debugf ("Row copy complete" )
312313 this .printStatus ()
313314
@@ -336,18 +337,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
336337 if this .migrationContext .QuickAndBumpySwapTables {
337338 return this .stopWritesAndCompleteMigrationOnMasterQuickAndBumpy ()
338339 }
339- // Lock-based solution: we use low timeout and multiple attempts. But for
340- // each failed attempt, we throttle until replication lag is back to normal
341- if err := this .retryOperation (
342- func () error {
343- return this .executeAndThrottleOnError (this .stopWritesAndCompleteMigrationOnMasterViaLock )
344- }); err != nil {
345- return err
346- }
347- if err := this .dropOldTableIfRequired (); err != nil {
348- return err
349- }
350340
341+ {
342+ // Lock-based solution: we use low timeout and multiple attempts. But for
343+ // each failed attempt, we throttle until replication lag is back to normal
344+ if err := this .retryOperation (
345+ func () error {
346+ return this .executeAndThrottleOnError (this .stopWritesAndCompleteMigrationOnMasterViaLock )
347+ }); err != nil {
348+ return err
349+ }
350+ if err := this .dropOldTableIfRequired (); err != nil {
351+ return err
352+ }
353+ }
351354 return
352355}
353356
@@ -364,6 +367,21 @@ func (this *Migrator) dropOldTableIfRequired() (err error) {
364367 return nil
365368}
366369
370+ // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
371+ // make sure the queue is drained.
372+ func (this * Migrator ) waitForEventsUpToLock () (err error ) {
373+ if _ , err := this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed )); err != nil {
374+ return err
375+ }
376+ log .Debugf ("Waiting for events up to lock" )
377+ <- this .allEventsUpToLockProcessed
378+ atomic .StoreInt64 (& this .allEventsUpToLockProcessedFlag , 1 )
379+ log .Debugf ("Done waiting for events up to lock" )
380+ this .printStatus ()
381+
382+ return nil
383+ }
384+
367385// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
368386// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
369387// There is a point in time where the "original" table does not exist and queries are non-blocked
@@ -373,11 +391,9 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
373391 return err
374392 }
375393
376- this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed ))
377- log .Debugf ("Waiting for events up to lock" )
378- <- this .allEventsUpToLockProcessed
379- log .Debugf ("Done waiting for events up to lock" )
380-
394+ if err := this .retryOperation (this .waitForEventsUpToLock ); err != nil {
395+ return err
396+ }
381397 if err := this .retryOperation (this .applier .SwapTablesQuickAndBumpy ); err != nil {
382398 return err
383399 }
@@ -438,10 +454,7 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
438454 log .Infof ("Found RENAME to be executing" )
439455
440456 // OK, at this time we know any newly incoming DML on original table is blocked.
441- this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed ))
442- log .Debugf ("Waiting for events up to lock" )
443- <- this .allEventsUpToLockProcessed
444- log .Debugf ("Done waiting for events up to lock" )
457+ this .waitForEventsUpToLock ()
445458
446459 okToReleaseLock <- true
447460 // BAM: voluntary lock is released, blocking query is released, rename is released.
@@ -466,14 +479,11 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
466479// in sync. There is no table swap.
467480func (this * Migrator ) stopWritesAndCompleteMigrationOnReplica () (err error ) {
468481 log .Debugf ("testing on replica. Instead of LOCK tables I will STOP SLAVE" )
469- if err := this .retryOperation (this .applier .StopSlaveIOThread ); err != nil {
482+ if err := this .retryOperation (this .applier .StopSlaveNicely ); err != nil {
470483 return err
471484 }
472485
473- this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed ))
474- log .Debugf ("Waiting for events up to lock" )
475- <- this .allEventsUpToLockProcessed
476- log .Debugf ("Done waiting for events up to lock" )
486+ this .waitForEventsUpToLock ()
477487
478488 log .Info ("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation" )
479489 return nil
@@ -612,8 +622,18 @@ func (this *Migrator) initiateStreaming() error {
612622 return this .onChangelogStateEvent (dmlEvent )
613623 },
614624 )
615- this .eventsStreamer .AddListener (
616- true ,
625+
626+ go func () {
627+ log .Debugf ("Beginning streaming" )
628+ this .eventsStreamer .StreamEvents (func () bool { return this .canStopStreaming () })
629+ }()
630+ return nil
631+ }
632+
633+ // addDMLEventsListener registers a listener for DML events on the original table with the events streamer.
634+ func (this * Migrator ) addDMLEventsListener () error {
635+ err := this .eventsStreamer .AddListener (
636+ false ,
617637 this .migrationContext .DatabaseName ,
618638 this .migrationContext .OriginalTableName ,
619639 func (dmlEvent * binlog.BinlogDMLEvent ) error {
@@ -624,12 +644,7 @@ func (this *Migrator) initiateStreaming() error {
624644 return nil
625645 },
626646 )
627-
628- go func () {
629- log .Debugf ("Beginning streaming" )
630- this .eventsStreamer .StreamEvents (func () bool { return this .canStopStreaming () })
631- }()
632- return nil
647+ return err
633648}
634649
635650func (this * Migrator ) initiateApplier () error {
@@ -680,13 +695,16 @@ func (this *Migrator) iterateChunks() error {
680695 if ! hasFurtherRange {
681696 return terminateRowIteration (nil )
682697 }
683- _ , rowsAffected , _ , err := this .applier .ApplyIterationInsertQuery ()
684- if err != nil {
685- return terminateRowIteration (err )
698+ applyCopyRowsFunc := func () error {
699+ _ , rowsAffected , _ , err := this .applier .ApplyIterationInsertQuery ()
700+ if err != nil {
701+ return terminateRowIteration (err )
702+ }
703+ atomic .AddInt64 (& this .migrationContext .TotalRowsCopied , rowsAffected )
704+ atomic .AddInt64 (& this .migrationContext .Iteration , 1 )
705+ return nil
686706 }
687- atomic .AddInt64 (& this .migrationContext .TotalRowsCopied , rowsAffected )
688- atomic .AddInt64 (& this .migrationContext .Iteration , 1 )
689- return nil
707+ return this .retryOperation (applyCopyRowsFunc )
690708 }
691709 this .copyRowsQueue <- copyRowsFunc
692710 }
@@ -714,7 +732,8 @@ func (this *Migrator) executeWriteFuncs() error {
714732 select {
715733 case copyRowsFunc := <- this .copyRowsQueue :
716734 {
717- if err := this .retryOperation (copyRowsFunc ); err != nil {
735+ // Retries are handled within the copyRowsFunc
736+ if err := copyRowsFunc (); err != nil {
718737 return log .Errore (err )
719738 }
720739 }
0 commit comments