@@ -568,7 +568,15 @@ func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueK
568568 return err
569569 }
570570 }
571- this .migrationContext .Log .Infof ("Migration max values: [%s]" , this .migrationContext .MigrationRangeMaxValues )
571+
572+ // Save a snapshot copy of the initial MigrationRangeMaxValues
573+ if this .migrationContext .MigrationRangeMaxValues == nil {
574+ this .migrationContext .MigrationRangeMaxValuesInitial = nil
575+ } else {
576+ abstractValues := make ([]interface {}, len (this .migrationContext .MigrationRangeMaxValues .AbstractValues ()))
577+ copy (abstractValues , this .migrationContext .MigrationRangeMaxValues .AbstractValues ())
578+ this .migrationContext .MigrationRangeMaxValuesInitial = sql .ToColumnValues (abstractValues )
579+ }
572580
573581 return rows .Err ()
574582}
@@ -611,6 +619,63 @@ func (this *Applier) ReadMigrationRangeValues() error {
611619 return tx .Commit ()
612620}
613621
622+ // ResetMigrationRangeMaxValues updates the MigrationRangeMaxValues with new values
623+ func (this * Applier ) ResetMigrationRangeMaxValues (uniqueKeyAbstractValues []interface {}) {
624+ abstractValues := make ([]interface {}, len (uniqueKeyAbstractValues ))
625+ copy (abstractValues , uniqueKeyAbstractValues )
626+ this .migrationContext .MigrationRangeMaxValues = sql .ToColumnValues (abstractValues )
627+ this .migrationContext .Log .Debugf ("Reset migration max values: [%s]" , this .migrationContext .MigrationRangeMaxValues )
628+ }
629+
630+ // LockMigrationRangeMaxValues locks the MigrationRangeMaxValues to prevent further updates
631+ func (this * Applier ) LockMigrationRangeMaxValues () {
632+ if this .migrationContext .IsMigrationRangeMaxValuesLocked {
633+ return
634+ }
635+ this .migrationContext .IsMigrationRangeMaxValuesLocked = true
636+ this .migrationContext .Log .Infof ("Lock migration max values: [%s]" , this .migrationContext .MigrationRangeMaxValues )
637+ }
638+
639+ // AttemptToLockMigrationRangeMaxValues attempts to lock MigrationRangeMaxValues to prevent endless copying.
640+ // To avoid infinite updates of MigrationRangeMaxValues causing the copy to never end,
641+ // we need a strategy to stop updates. When the initial copy target is achieved,
642+ // MigrationRangeMaxValues will be locked.
643+ func (this * Applier ) AttemptToLockMigrationRangeMaxValues () {
644+ if this .migrationContext .IsMigrationRangeMaxValuesLocked {
645+ return
646+ }
647+
648+ // Currently only supports single-column unique index of int type
649+ uniqueKeyCols := this .migrationContext .UniqueKey .Columns .Columns ()
650+ if len (uniqueKeyCols ) != 1 {
651+ this .LockMigrationRangeMaxValues ()
652+ return
653+ }
654+ uniqueKeyCol := uniqueKeyCols [0 ]
655+ if uniqueKeyCol .CompareValueFunc == nil {
656+ this .LockMigrationRangeMaxValues ()
657+ return
658+ }
659+
660+ // Compare MigrationIterationRangeMinValues with MigrationRangeMaxValuesInitial to determine copy progress
661+ if this .migrationContext .MigrationIterationRangeMinValues == nil {
662+ return
663+ }
664+ than , err := uniqueKeyCol .CompareValueFunc (
665+ this .migrationContext .MigrationIterationRangeMinValues .AbstractValues ()[0 ],
666+ this .migrationContext .MigrationRangeMaxValuesInitial .AbstractValues ()[0 ],
667+ )
668+ if err != nil {
669+ // If comparison fails, fallback to locking MigrationRangeMaxValues
670+ this .migrationContext .Log .Errore (err )
671+ this .LockMigrationRangeMaxValues ()
672+ return
673+ }
674+ if than >= 0 {
675+ this .LockMigrationRangeMaxValues ()
676+ }
677+ }
678+
614679// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
615680// which will be used for copying the next chunk of rows. Ir returns "false" if there is
616681// no further chunk to work through, i.e. we're past the last chunk and are done with
@@ -620,6 +685,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
620685 if this .migrationContext .MigrationIterationRangeMinValues == nil {
621686 this .migrationContext .MigrationIterationRangeMinValues = this .migrationContext .MigrationRangeMinValues
622687 }
688+ this .AttemptToLockMigrationRangeMaxValues ()
623689 for i := 0 ; i < 2 ; i ++ {
624690 buildFunc := sql .BuildUniqueKeyRangeEndPreparedQueryViaOffset
625691 if i == 1 {
@@ -661,6 +727,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
661727 }
662728 }
663729 this .migrationContext .Log .Debugf ("Iteration complete: no further range to iterate" )
730+ // Ensure MigrationRangeMaxValues is locked after iteration is complete
731+ this .LockMigrationRangeMaxValues ()
664732 return hasFurtherRange , nil
665733}
666734
@@ -1282,7 +1350,14 @@ func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{})
12821350 case than < 0 :
12831351 return true , nil
12841352 case than > 0 :
1285- return false , nil
1353+ // When the value is greater than MigrationRangeMaxValues boundary, attempt to dynamically expand MigrationRangeMaxValues
1354+ // After expand, treat this comparison as equal, otherwise it cannot be ignored
1355+ if ! this .migrationContext .IsMigrationRangeMaxValuesLocked {
1356+ this .ResetMigrationRangeMaxValues (uniqueKeyArgs )
1357+ return true , nil
1358+ } else {
1359+ return false , nil
1360+ }
12861361 default :
12871362 // Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored.
12881363 return true , nil
0 commit comments