Skip to content

Commit a3e79e0

Browse files
authored
Merge branch 'master' into update-codeql
2 parents da4f1e9 + 005043d commit a3e79e0

25 files changed

+672
-305
lines changed

doc/command-line-flags.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ Table name prefix to be used on the temporary tables.
160160

161161
Add this flag when executing on a 1st generation Google Cloud Platform (GCP).
162162

163+
### gtid
164+
165+
Add this flag to enable support for [MySQL replication GTIDs](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-concepts.html) for replication positioning. This requires `gtid_mode` and `enforce_gtid_consistency` to be set to `ON`.
166+
163167
### heartbeat-interval-millis
164168

165169
Default 100. See [`subsecond-lag`](subsecond-lag.md) for details.

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ type MigrationContext struct {
108108
// This is useful when connecting to a MySQL instance where the external port
109109
// may not match the internal port.
110110
SkipPortValidation bool
111+
UseGTIDs bool
111112

112113
config ContextConfig
113114
configMutex *sync.Mutex

go/base/context_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2022 GitHub Inc.
2+
Copyright 2021 GitHub Inc.
33
See https://github.com/github/gh-ost/blob/master/LICENSE
44
*/
55

go/binlog/binlog_entry.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,20 @@
11
/*
2-
Copyright 2016 GitHub Inc.
2+
Copyright 2022 GitHub Inc.
33
See https://github.com/github/gh-ost/blob/master/LICENSE
44
*/
55

66
package binlog
77

88
import (
99
"fmt"
10+
1011
"github.com/github/gh-ost/go/mysql"
1112
)
1213

1314
// BinlogEntry describes an entry in the binary log
1415
type BinlogEntry struct {
1516
Coordinates mysql.BinlogCoordinates
16-
EndLogPos uint64
17-
18-
DmlEvent *BinlogDMLEvent
19-
}
20-
21-
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
22-
func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry {
23-
binlogEntry := &BinlogEntry{
24-
Coordinates: mysql.BinlogCoordinates{LogFile: logFile, LogPos: int64(logPos)},
25-
}
26-
return binlogEntry
17+
DmlEvent *BinlogDMLEvent
2718
}
2819

2920
// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
@@ -34,13 +25,6 @@ func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
3425
return binlogEntry
3526
}
3627

37-
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
38-
func (this *BinlogEntry) Duplicate() *BinlogEntry {
39-
binlogEntry := NewBinlogEntry(this.Coordinates.LogFile, uint64(this.Coordinates.LogPos))
40-
binlogEntry.EndLogPos = this.EndLogPos
41-
return binlogEntry
42-
}
43-
4428
// String() returns a string representation of this binlog entry
4529
func (this *BinlogEntry) String() string {
4630
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)

go/binlog/gomysql_reader.go

Lines changed: 77 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,27 @@ import (
1717

1818
gomysql "github.com/go-mysql-org/go-mysql/mysql"
1919
"github.com/go-mysql-org/go-mysql/replication"
20+
uuid "github.com/google/uuid"
2021
"golang.org/x/net/context"
2122
)
2223

2324
type GoMySQLReader struct {
24-
migrationContext *base.MigrationContext
25-
connectionConfig *mysql.ConnectionConfig
26-
binlogSyncer *replication.BinlogSyncer
27-
binlogStreamer *replication.BinlogStreamer
28-
currentCoordinates mysql.BinlogCoordinates
29-
currentCoordinatesMutex *sync.Mutex
30-
LastAppliedRowsEventHint mysql.BinlogCoordinates
25+
migrationContext *base.MigrationContext
26+
connectionConfig *mysql.ConnectionConfig
27+
binlogSyncer *replication.BinlogSyncer
28+
binlogStreamer *replication.BinlogStreamer
29+
currentCoordinates mysql.BinlogCoordinates
30+
currentCoordinatesMutex *sync.Mutex
31+
// LastTrxCoords are the coordinates of the last transaction completely read.
32+
// If using the file coordinates it is binlog position of the transaction's XID event.
33+
LastTrxCoords mysql.BinlogCoordinates
3134
}
3235

3336
func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
3437
connectionConfig := migrationContext.InspectorConnectionConfig
3538
return &GoMySQLReader{
3639
migrationContext: migrationContext,
3740
connectionConfig: connectionConfig,
38-
currentCoordinates: mysql.BinlogCoordinates{},
3941
currentCoordinatesMutex: &sync.Mutex{},
4042
binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
4143
ServerID: uint32(migrationContext.ReplicaServerId),
@@ -46,8 +48,8 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
4648
Password: connectionConfig.Password,
4749
TLSConfig: connectionConfig.TLSConfig(),
4850
UseDecimal: true,
49-
MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
5051
TimestampStringLocation: time.UTC,
52+
MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
5153
}),
5254
}
5355
}
@@ -58,35 +60,33 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
5860
return this.migrationContext.Log.Errorf("Empty coordinates at ConnectBinlogStreamer()")
5961
}
6062

63+
this.currentCoordinatesMutex.Lock()
64+
defer this.currentCoordinatesMutex.Unlock()
6165
this.currentCoordinates = coordinates
62-
this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
63-
// Start sync with specified binlog file and position
64-
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{
65-
Name: this.currentCoordinates.LogFile,
66-
Pos: uint32(this.currentCoordinates.LogPos),
67-
})
68-
66+
this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", coordinates)
67+
68+
// Start sync with specified GTID set or binlog file and position
69+
if this.migrationContext.UseGTIDs {
70+
coords := coordinates.(*mysql.GTIDBinlogCoordinates)
71+
this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet)
72+
} else {
73+
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
74+
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{
75+
Name: coords.LogFile,
76+
Pos: uint32(coords.LogPos)},
77+
)
78+
}
6979
return err
7080
}
7181

72-
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
82+
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates {
7383
this.currentCoordinatesMutex.Lock()
7484
defer this.currentCoordinatesMutex.Unlock()
75-
returnCoordinates := this.currentCoordinates
76-
return &returnCoordinates
85+
return this.currentCoordinates.Clone()
7786
}
7887

79-
// StreamEvents
8088
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
81-
if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) {
82-
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
83-
}
84-
85-
if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
86-
this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
87-
return nil
88-
}
89-
89+
currentCoords := this.GetCurrentBinlogCoordinates()
9090
dml := ToEventDML(ev.Header.EventType.String())
9191
if dml == NotDML {
9292
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
@@ -97,7 +97,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
9797
// We do both at the same time
9898
continue
9999
}
100-
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
100+
binlogEntry := NewBinlogEntryAt(currentCoords)
101101
binlogEntry.DmlEvent = NewBinlogDMLEvent(
102102
string(rowsEvent.Table.Schema),
103103
string(rowsEvent.Table.Table),
@@ -118,13 +118,13 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
118118
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
119119
}
120120
}
121+
121122
// The channel will do the throttling. Whoever is reading from the channel
122123
// decides whether action is taken synchronously (meaning we wait before
123124
// next iteration) or asynchronously (we keep pushing more events)
124125
// In reality, reads will be synchronous
125126
entriesChannel <- binlogEntry
126127
}
127-
this.LastAppliedRowsEventHint = this.currentCoordinates
128128
return nil
129129
}
130130

@@ -141,23 +141,56 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
141141
if err != nil {
142142
return err
143143
}
144-
func() {
144+
145+
// Update binlog coords if using file-based coords.
146+
// GTID coordinates are updated on receiving GTID events.
147+
if !this.migrationContext.UseGTIDs {
145148
this.currentCoordinatesMutex.Lock()
146-
defer this.currentCoordinatesMutex.Unlock()
147-
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
148-
this.currentCoordinates.EventSize = int64(ev.Header.EventSize)
149-
}()
149+
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
150+
prevCoords := coords.Clone().(*mysql.FileBinlogCoordinates)
151+
coords.LogPos = int64(ev.Header.LogPos)
152+
coords.EventSize = int64(ev.Header.EventSize)
153+
if coords.IsLogPosOverflowBeyond4Bytes(prevCoords) {
154+
this.currentCoordinatesMutex.Unlock()
155+
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", coords)
156+
}
157+
this.currentCoordinatesMutex.Unlock()
158+
}
150159

151-
switch binlogEvent := ev.Event.(type) {
160+
switch event := ev.Event.(type) {
161+
case *replication.GTIDEvent:
162+
if !this.migrationContext.UseGTIDs {
163+
continue
164+
}
165+
sid, err := uuid.FromBytes(event.SID)
166+
if err != nil {
167+
return err
168+
}
169+
this.currentCoordinatesMutex.Lock()
170+
if this.LastTrxCoords != nil {
171+
this.currentCoordinates = this.LastTrxCoords.Clone()
172+
}
173+
coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates)
174+
trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1})
175+
coords.GTIDSet.AddSet(trxGset)
176+
this.currentCoordinatesMutex.Unlock()
152177
case *replication.RotateEvent:
153-
func() {
154-
this.currentCoordinatesMutex.Lock()
155-
defer this.currentCoordinatesMutex.Unlock()
156-
this.currentCoordinates.LogFile = string(binlogEvent.NextLogName)
157-
}()
158-
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName)
178+
if this.migrationContext.UseGTIDs {
179+
continue
180+
}
181+
this.currentCoordinatesMutex.Lock()
182+
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
183+
coords.LogFile = string(event.NextLogName)
184+
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", coords.LogFile, int64(ev.Header.LogPos), event.NextLogName)
185+
this.currentCoordinatesMutex.Unlock()
186+
case *replication.XIDEvent:
187+
if this.migrationContext.UseGTIDs {
188+
this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)}
189+
} else {
190+
this.LastTrxCoords = this.currentCoordinates.Clone()
191+
}
159192
case *replication.RowsEvent:
160-
if err := this.handleRowsEvent(ev, binlogEvent, entriesChannel); err != nil {
193+
if err := this.handleRowsEvent(ev, event, entriesChannel); err != nil {
161194
return err
162195
}
163196
}

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func main() {
8787
flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.")
8888
flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).")
8989
flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.")
90+
flag.BoolVar(&migrationContext.UseGTIDs, "gtid", false, "(experimental) set to 'true' to use MySQL GTIDs for binlog positioning.")
9091

9192
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
9293
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")

go/logic/applier.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2022 GitHub Inc.
2+
Copyright 2021 GitHub Inc.
33
See https://github.com/github/gh-ost/blob/master/LICENSE
44
*/
55

@@ -575,6 +575,7 @@ func (this *Applier) InitiateHeartbeat() {
575575
continue
576576
}
577577
if err := injectHeartbeat(); err != nil {
578+
this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err)
578579
return
579580
}
580581
}
@@ -989,11 +990,11 @@ func (this *Applier) StopReplication() error {
989990
return err
990991
}
991992

992-
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.migrationContext.ApplierMySQLVersion, this.db)
993+
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.migrationContext.ApplierMySQLVersion, this.db, this.migrationContext.UseGTIDs)
993994
if err != nil {
994995
return err
995996
}
996-
this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
997+
this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", readBinlogCoordinates, executeBinlogCoordinates)
997998
return nil
998999
}
9991000

go/logic/inspect.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ func (this *Inspector) InitDBConnections() (err error) {
7373
if err := this.validateBinlogs(); err != nil {
7474
return err
7575
}
76+
if this.migrationContext.UseGTIDs {
77+
if err := this.validateGTIDConfig(); err != nil {
78+
return err
79+
}
80+
}
7681
if err := this.applyBinlogFormat(); err != nil {
7782
return err
7883
}
@@ -379,7 +384,7 @@ func (this *Inspector) applyBinlogFormat() error {
379384

380385
// validateBinlogs checks that binary log configuration is good to go
381386
func (this *Inspector) validateBinlogs() error {
382-
query := `select /* gh-ost */ @@global.log_bin, @@global.binlog_format`
387+
query := `select /* gh-ost */@@global.log_bin, @@global.binlog_format`
383388
var hasBinaryLogs bool
384389
if err := this.db.QueryRow(query).Scan(&hasBinaryLogs, &this.migrationContext.OriginalBinlogFormat); err != nil {
385390
return err
@@ -418,6 +423,22 @@ func (this *Inspector) validateBinlogs() error {
418423
return nil
419424
}
420425

426+
// validateGTIDConfig checks that the GTID configuration is good to go
427+
func (this *Inspector) validateGTIDConfig() error {
428+
var gtidMode, enforceGtidConsistency string
429+
query := `select @@global.gtid_mode, @@global.enforce_gtid_consistency`
430+
if err := this.db.QueryRow(query).Scan(&gtidMode, &enforceGtidConsistency); err != nil {
431+
return err
432+
}
433+
enforceGtidConsistency = strings.ToUpper(enforceGtidConsistency)
434+
if strings.ToUpper(gtidMode) != "ON" || (enforceGtidConsistency != "ON" && enforceGtidConsistency != "1") {
435+
return fmt.Errorf("%s must have gtid_mode=ON and enforce_gtid_consistency=ON to use GTID support", this.connectionConfig.Key.String())
436+
}
437+
438+
this.migrationContext.Log.Infof("gtid config validated on %s", this.connectionConfig.Key.String())
439+
return nil
440+
}
441+
421442
// validateLogSlaveUpdates checks that binary log log_slave_updates is set. This test is not required when migrating on replica or when migrating directly on master
422443
func (this *Inspector) validateLogSlaveUpdates() error {
423444
query := `select /* gh-ost */ @@global.log_slave_updates`

go/logic/migrator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,7 +1075,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
10751075
return
10761076
}
10771077

1078-
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
1078+
currentBinlogCoordinates := this.eventsStreamer.GetCurrentBinlogCoordinates()
10791079

10801080
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s",
10811081
totalRowsCopied, rowsEstimate, progressPct,
@@ -1140,7 +1140,7 @@ func (this *Migrator) initiateStreaming() error {
11401140
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
11411141
return
11421142
}
1143-
this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates())
1143+
this.migrationContext.SetRecentBinlogCoordinates(this.eventsStreamer.GetCurrentBinlogCoordinates())
11441144
}
11451145
}()
11461146
return nil

go/logic/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2022 GitHub Inc.
2+
Copyright 2021 GitHub Inc.
33
See https://github.com/github/gh-ost/blob/master/LICENSE
44
*/
55

0 commit comments

Comments
 (0)