Skip to content
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
9927217
WIP: add GTID support
meiji163 Sep 22, 2025
4851e63
Cleanp
timvaillancourt Mar 26, 2021
4c70abe
Add doc for flag
timvaillancourt Mar 26, 2021
89a40db
Rename GTIDSet var
timvaillancourt Mar 26, 2021
2901ac2
Fix GTID SID parsing
timvaillancourt Mar 27, 2021
27160e3
Cleanup
timvaillancourt Mar 27, 2021
1cf3f4c
Add to docs
timvaillancourt Mar 27, 2021
7e54607
Require enforce_gtid_consistency=ON
timvaillancourt Mar 27, 2021
6de0e2e
Rename validator func
timvaillancourt Mar 27, 2021
70fc75c
simplify check in validateBinlogsAndGTID()
timvaillancourt Mar 27, 2021
977ee3d
Only update GTIDSet if there was no err
timvaillancourt Mar 27, 2021
eafe840
Simplify GTIDEvent -> GTIDSet
timvaillancourt Mar 27, 2021
3130fd4
Simplify GTIDEvent -> GTIDSet further, resolve go.uuid dep issue
timvaillancourt Mar 27, 2021
c60a86c
Fix UUIDSet GNO
timvaillancourt Mar 27, 2021
fbcbf5b
Add .ParseGTIDBinlogCoordinates()
timvaillancourt Mar 28, 2021
21643da
Add missing smaller-than/equal logic
timvaillancourt Mar 29, 2021
c34b437
Comment-out WIP test
timvaillancourt Mar 29, 2021
49e9920
Fail on SID change
timvaillancourt Apr 1, 2021
56e9080
Fix import err
timvaillancourt Apr 1, 2021
6e2c5b6
Add missing .Equals() check
timvaillancourt Apr 1, 2021
77d2c58
Fix .SmallerThan()/.SmallerThanEquals() funcs and tests
timvaillancourt Apr 2, 2021
29953f6
Fix type change issues
timvaillancourt Apr 2, 2021
9166dea
Fix panics
timvaillancourt Apr 3, 2021
baf92e7
Cleanup .SmallerThan()
timvaillancourt Apr 3, 2021
ec04b98
Add missing check in .Equals()
timvaillancourt Apr 3, 2021
089ced9
Add large GTID sets to test
timvaillancourt Apr 3, 2021
51e0276
Simplify if cond
timvaillancourt Apr 3, 2021
dee9dfe
Add localtest/gtid
timvaillancourt Jul 9, 2021
0413c2a
simplify localtest gtid_mode check
timvaillancourt Jul 9, 2021
d5a163d
print gtid config
timvaillancourt Jul 9, 2021
d614b40
Fix var typo
timvaillancourt Jul 9, 2021
551e6d0
support enforce_gtid_consistency=1
timvaillancourt Jul 9, 2021
76691cb
fix test
timvaillancourt Jul 10, 2021
472abbe
Handle PreviousGTIDsEvent
timvaillancourt Jul 11, 2021
7cd643b
gofmt
timvaillancourt Feb 22, 2022
9db9a32
Create `validateGTIDConfig` func
timvaillancourt Feb 23, 2022
f472fbd
Update copyrights
timvaillancourt Feb 23, 2022
13d218e
Update copyrights pt 2
timvaillancourt Feb 23, 2022
713f70b
Copyrights again
timvaillancourt Feb 24, 2022
a2b178b
Update docs
timvaillancourt Feb 24, 2022
a328c22
WIP
timvaillancourt Feb 28, 2022
ca0499b
WIP
timvaillancourt May 27, 2022
788911d
fix BinlogCoordinates interface usage
meiji163 Sep 23, 2025
9e76411
add binlog stream retry and toxiproxy test
meiji163 Oct 7, 2025
afb6e5c
fix max retry
meiji163 Oct 7, 2025
c2044cc
use AddGTID instead of AddSet
meiji163 Oct 8, 2025
59f2581
remove previous GTID event handling
meiji163 Oct 8, 2025
82e51cc
Merge remote-tracking branch 'origin' into add-gtid
meiji163 Oct 8, 2025
0794428
use AddSet
meiji163 Oct 8, 2025
de0417a
modify GTID coord tracking
meiji163 Oct 9, 2025
38fd3be
fix last trx coords
meiji163 Oct 9, 2025
5ce8778
re-enable Binlogsyncer retry
meiji163 Oct 9, 2025
a3684dd
fix localtest arg
meiji163 Oct 10, 2025
a3272a0
Merge remote-tracking branch 'origin' into add-gtid
meiji163 Oct 10, 2025
f3ac19c
remove unneccesary coordinate check
meiji163 Oct 10, 2025
99626bc
rm unused funcs
meiji163 Oct 10, 2025
ed0eba9
add back ReplicaTermFor
meiji163 Oct 10, 2025
1cd6095
Update localtests/test.sh
meiji163 Oct 10, 2025
6ef3086
Merge branch 'master' into add-gtid
meiji163 Oct 10, 2025
1a135cf
Merge branch 'master' into add-gtid
meiji163 Oct 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ Table name prefix to be used on the temporary tables.

Add this flag when executing on a 1st generation Google Cloud Platform (GCP).

### gtid

Add this flag to enable support for [MySQL replication GTIDs](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-concepts.html) for replication positioning. This requires `gtid_mode` and `enforce_gtid_consistency` to be set to `ON`.

### heartbeat-interval-millis

Default 100. See [`subsecond-lag`](subsecond-lag.md) for details.
Expand Down
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type MigrationContext struct {
// This is useful when connecting to a MySQL instance where the external port
// may not match the internal port.
SkipPortValidation bool
UseGTIDs bool

config ContextConfig
configMutex *sync.Mutex
Expand Down
2 changes: 1 addition & 1 deletion go/base/context_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2022 GitHub Inc.
Copyright 2021 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

Expand Down
22 changes: 3 additions & 19 deletions go/binlog/binlog_entry.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
/*
Copyright 2016 GitHub Inc.
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package binlog

import (
"fmt"

"github.com/github/gh-ost/go/mysql"
)

// BinlogEntry describes an entry in the binary log
type BinlogEntry struct {
Coordinates mysql.BinlogCoordinates
EndLogPos uint64

DmlEvent *BinlogDMLEvent
}

// NewBinlogEntry creates an empty, ready to go BinlogEntry object
func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry {
binlogEntry := &BinlogEntry{
Coordinates: mysql.BinlogCoordinates{LogFile: logFile, LogPos: int64(logPos)},
}
return binlogEntry
DmlEvent *BinlogDMLEvent
}

// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
Expand All @@ -34,13 +25,6 @@ func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
return binlogEntry
}

// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
func (this *BinlogEntry) Duplicate() *BinlogEntry {
binlogEntry := NewBinlogEntry(this.Coordinates.LogFile, uint64(this.Coordinates.LogPos))
binlogEntry.EndLogPos = this.EndLogPos
return binlogEntry
}

// String() returns a string representation of this binlog entry
func (this *BinlogEntry) String() string {
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
Expand Down
121 changes: 77 additions & 44 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@ import (

gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
uuid "github.com/google/uuid"
"golang.org/x/net/context"
)

type GoMySQLReader struct {
migrationContext *base.MigrationContext
connectionConfig *mysql.ConnectionConfig
binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
currentCoordinates mysql.BinlogCoordinates
currentCoordinatesMutex *sync.Mutex
LastAppliedRowsEventHint mysql.BinlogCoordinates
migrationContext *base.MigrationContext
connectionConfig *mysql.ConnectionConfig
binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
currentCoordinates mysql.BinlogCoordinates
currentCoordinatesMutex *sync.Mutex
// LastTrxCoords are the coordinates of the last transaction completely read.
// If using the file coordinates it is binlog position of the transaction's XID event.
LastTrxCoords mysql.BinlogCoordinates
}

func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
connectionConfig := migrationContext.InspectorConnectionConfig
return &GoMySQLReader{
migrationContext: migrationContext,
connectionConfig: connectionConfig,
currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{},
binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
ServerID: uint32(migrationContext.ReplicaServerId),
Expand All @@ -46,8 +48,8 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
Password: connectionConfig.Password,
TLSConfig: connectionConfig.TLSConfig(),
UseDecimal: true,
MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
TimestampStringLocation: time.UTC,
MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
}),
}
}
Expand All @@ -58,35 +60,33 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
return this.migrationContext.Log.Errorf("Empty coordinates at ConnectBinlogStreamer()")
}

this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates = coordinates
this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
// Start sync with specified binlog file and position
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{
Name: this.currentCoordinates.LogFile,
Pos: uint32(this.currentCoordinates.LogPos),
})

this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", coordinates)

// Start sync with specified GTID set or binlog file and position
if this.migrationContext.UseGTIDs {
coords := coordinates.(*mysql.GTIDBinlogCoordinates)
this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet)
} else {
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{
Name: coords.LogFile,
Pos: uint32(coords.LogPos)},
)
}
return err
}

func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
returnCoordinates := this.currentCoordinates
return &returnCoordinates
return this.currentCoordinates.Clone()
}

// StreamEvents
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) {
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
}

if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
return nil
}

currentCoords := this.GetCurrentBinlogCoordinates()
dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
Expand All @@ -97,7 +97,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
// We do both at the same time
continue
}
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
binlogEntry := NewBinlogEntryAt(currentCoords)
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
Expand All @@ -118,13 +118,13 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
}
}

// The channel will do the throttling. Whoever is reading from the channel
// decides whether action is taken synchronously (meaning we wait before
// next iteration) or asynchronously (we keep pushing more events)
// In reality, reads will be synchronous
entriesChannel <- binlogEntry
}
this.LastAppliedRowsEventHint = this.currentCoordinates
return nil
}

Expand All @@ -141,23 +141,56 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
if err != nil {
return err
}
func() {

// Update binlog coords if using file-based coords.
// GTID coordinates are updated on receiving GTID events.
if !this.migrationContext.UseGTIDs {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
this.currentCoordinates.EventSize = int64(ev.Header.EventSize)
}()
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
prevCoords := coords.Clone().(*mysql.FileBinlogCoordinates)
coords.LogPos = int64(ev.Header.LogPos)
coords.EventSize = int64(ev.Header.EventSize)
if coords.IsLogPosOverflowBeyond4Bytes(prevCoords) {
this.currentCoordinatesMutex.Unlock()
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", coords)
}
this.currentCoordinatesMutex.Unlock()
}

switch binlogEvent := ev.Event.(type) {
switch event := ev.Event.(type) {
case *replication.GTIDEvent:
if !this.migrationContext.UseGTIDs {
continue
}
sid, err := uuid.FromBytes(event.SID)
if err != nil {
return err
}
this.currentCoordinatesMutex.Lock()
if this.LastTrxCoords != nil {
this.currentCoordinates = this.LastTrxCoords.Clone()
}
coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates)
trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1})
coords.GTIDSet.AddSet(trxGset)
this.currentCoordinatesMutex.Unlock()
case *replication.RotateEvent:
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(binlogEvent.NextLogName)
}()
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName)
if this.migrationContext.UseGTIDs {
continue
}
this.currentCoordinatesMutex.Lock()
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
coords.LogFile = string(event.NextLogName)
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", coords.LogFile, int64(ev.Header.LogPos), event.NextLogName)
this.currentCoordinatesMutex.Unlock()
case *replication.XIDEvent:
if this.migrationContext.UseGTIDs {
this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)}
} else {
this.LastTrxCoords = this.currentCoordinates.Clone()
}
case *replication.RowsEvent:
if err := this.handleRowsEvent(ev, binlogEvent, entriesChannel); err != nil {
if err := this.handleRowsEvent(ev, event, entriesChannel); err != nil {
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func main() {
flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.")
flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).")
flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.")
flag.BoolVar(&migrationContext.UseGTIDs, "gtid", false, "(experimental) set to 'true' to use MySQL GTIDs for binlog positioning.")

executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")
Expand Down
6 changes: 3 additions & 3 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2022 GitHub Inc.
Copyright 2021 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

Expand Down Expand Up @@ -989,11 +989,11 @@ func (this *Applier) StopReplication() error {
return err
}

readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.migrationContext.ApplierMySQLVersion, this.db)
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.migrationContext.ApplierMySQLVersion, this.db, this.migrationContext.UseGTIDs)
if err != nil {
return err
}
this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", readBinlogCoordinates, executeBinlogCoordinates)
return nil
}

Expand Down
23 changes: 22 additions & 1 deletion go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.validateBinlogs(); err != nil {
return err
}
if this.migrationContext.UseGTIDs {
if err := this.validateGTIDConfig(); err != nil {
return err
}
}
if err := this.applyBinlogFormat(); err != nil {
return err
}
Expand Down Expand Up @@ -379,7 +384,7 @@ func (this *Inspector) applyBinlogFormat() error {

// validateBinlogs checks that binary log configuration is good to go
func (this *Inspector) validateBinlogs() error {
query := `select /* gh-ost */ @@global.log_bin, @@global.binlog_format`
query := `select @@global.log_bin, @@global.binlog_format`
var hasBinaryLogs bool
if err := this.db.QueryRow(query).Scan(&hasBinaryLogs, &this.migrationContext.OriginalBinlogFormat); err != nil {
return err
Expand Down Expand Up @@ -418,6 +423,22 @@ func (this *Inspector) validateBinlogs() error {
return nil
}

// validateGTIDConfig checks that the GTID configuration is good to go
func (this *Inspector) validateGTIDConfig() error {
var gtidMode, enforceGtidConsistency string
query := `select @@global.gtid_mode, @@global.enforce_gtid_consistency`
if err := this.db.QueryRow(query).Scan(&gtidMode, &enforceGtidConsistency); err != nil {
return err
}
enforceGtidConsistency = strings.ToUpper(enforceGtidConsistency)
if strings.ToUpper(gtidMode) != "ON" || (enforceGtidConsistency != "ON" && enforceGtidConsistency != "1") {
return fmt.Errorf("%s must have gtid_mode=ON and enforce_gtid_consistency=ON to use GTID support", this.connectionConfig.Key.String())
}

this.migrationContext.Log.Infof("gtid config validated on %s", this.connectionConfig.Key.String())
return nil
}

// validateLogSlaveUpdates checks that binary log log_slave_updates is set. This test is not required when migrating on replica or when migrating directly on master
func (this *Inspector) validateLogSlaveUpdates() error {
query := `select /* gh-ost */ @@global.log_slave_updates`
Expand Down
4 changes: 2 additions & 2 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
return
}

currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
currentBinlogCoordinates := this.eventsStreamer.GetCurrentBinlogCoordinates()

status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s",
totalRowsCopied, rowsEstimate, progressPct,
Expand Down Expand Up @@ -1140,7 +1140,7 @@ func (this *Migrator) initiateStreaming() error {
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return
}
this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates())
this.migrationContext.SetRecentBinlogCoordinates(this.eventsStreamer.GetCurrentBinlogCoordinates())
}
}()
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/logic/server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2022 GitHub Inc.
Copyright 2021 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

Expand Down
Loading