@@ -23,11 +23,12 @@ const (
2323)
2424
2525type GoMySQLReader struct {
26- connectionConfig * mysql.ConnectionConfig
27- binlogSyncer * replication.BinlogSyncer
28- binlogStreamer * replication.BinlogStreamer
29- tableMap map [uint64 ]string
30- currentCoordinates mysql.BinlogCoordinates
26+ connectionConfig * mysql.ConnectionConfig
27+ binlogSyncer * replication.BinlogSyncer
28+ binlogStreamer * replication.BinlogStreamer
29+ tableMap map [uint64 ]string
30+ currentCoordinates mysql.BinlogCoordinates
31+ lastHandledCoordinates mysql.BinlogCoordinates
3132}
3233
3334func NewGoMySQLReader (connectionConfig * mysql.ConnectionConfig ) (binlogReader * GoMySQLReader , err error ) {
@@ -39,24 +40,91 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
3940 }
4041 binlogReader .binlogSyncer = replication .NewBinlogSyncer (serverId , "mysql" )
4142
42- // Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password
43- err = binlogReader .binlogSyncer .RegisterSlave (connectionConfig .Key .Hostname , uint16 (connectionConfig .Key .Port ), connectionConfig .User , connectionConfig .Password )
44- if err != nil {
45- return binlogReader , err
46- }
47-
4843 return binlogReader , err
4944}
5045
5146// ConnectBinlogStreamer
5247func (this * GoMySQLReader ) ConnectBinlogStreamer (coordinates mysql.BinlogCoordinates ) (err error ) {
48+ if coordinates .IsEmpty () {
49+ return log .Errorf ("Emptry coordinates at ConnectBinlogStreamer()" )
50+ }
51+ log .Infof ("Registering replica at %+v:%+v" , this .connectionConfig .Key .Hostname , uint16 (this .connectionConfig .Key .Port ))
52+ if err := this .binlogSyncer .RegisterSlave (this .connectionConfig .Key .Hostname , uint16 (this .connectionConfig .Key .Port ), this .connectionConfig .User , this .connectionConfig .Password ); err != nil {
53+ return err
54+ }
55+
5356 this .currentCoordinates = coordinates
57+ log .Infof ("Connecting binlog streamer at %+v" , this .currentCoordinates )
5458 // Start sync with sepcified binlog file and position
55- this .binlogStreamer , err = this .binlogSyncer .StartSync (gomysql.Position {coordinates . LogFile , uint32 (coordinates .LogPos )})
59+ this .binlogStreamer , err = this .binlogSyncer .StartSync (gomysql.Position {this . currentCoordinates . LogFile , uint32 (this . currentCoordinates .LogPos )})
5660
5761 return err
5862}
5963
64+ func (this * GoMySQLReader ) Reconnect () error {
65+ this .binlogSyncer .Close ()
66+
67+ connectCoordinates := & this .lastHandledCoordinates
68+ if connectCoordinates .IsEmpty () {
69+ connectCoordinates = & this .currentCoordinates
70+ }
71+ if err := this .ConnectBinlogStreamer (* connectCoordinates ); err != nil {
72+ return err
73+ }
74+ return nil
75+ }
76+
77+ func (this * GoMySQLReader ) GetCurrentBinlogCoordinates () * mysql.BinlogCoordinates {
78+ return & this .currentCoordinates
79+ }
80+
81+ // StreamEvents
82+ func (this * GoMySQLReader ) handleRowsEvent (ev * replication.BinlogEvent , rowsEvent * replication.RowsEvent , entriesChannel chan <- * BinlogEntry ) error {
83+ if this .currentCoordinates .SmallerThanOrEquals (& this .lastHandledCoordinates ) && ! this .lastHandledCoordinates .IsEmpty () {
84+ log .Infof ("Skipping handled query at %+v" , this .currentCoordinates )
85+ return nil
86+ }
87+
88+ dml := ToEventDML (ev .Header .EventType .String ())
89+ if dml == NotDML {
90+ return fmt .Errorf ("Unknown DML type: %s" , ev .Header .EventType .String ())
91+ }
92+ for i , row := range rowsEvent .Rows {
93+ if dml == UpdateDML && i % 2 == 1 {
94+ // An update has two rows (WHERE+SET)
95+ // We do both at the same time
96+ continue
97+ }
98+ binlogEntry := NewBinlogEntryAt (this .currentCoordinates )
99+ binlogEntry .DmlEvent = NewBinlogDMLEvent (
100+ string (rowsEvent .Table .Schema ),
101+ string (rowsEvent .Table .Table ),
102+ dml ,
103+ )
104+ switch dml {
105+ case InsertDML :
106+ {
107+ binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
108+ }
109+ case UpdateDML :
110+ {
111+ binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
112+ binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
113+ }
114+ case DeleteDML :
115+ {
116+ binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
117+ }
118+ }
119+ // The channel will do the throttling. Whoever is reding from the channel
120+ // decides whether action is taken sycnhronously (meaning we wait before
121+ // next iteration) or asynchronously (we keep pushing more events)
122+ // In reality, reads will be synchronous
123+ entriesChannel <- binlogEntry
124+ }
125+ return nil
126+ }
127+
60128// StreamEvents
61129func (this * GoMySQLReader ) StreamEvents (canStopStreaming func () bool , entriesChannel chan <- * BinlogEntry ) error {
62130 for {
@@ -77,44 +145,11 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
77145 // future I should remove this.
78146 this .tableMap [tableMapEvent .TableID ] = string (tableMapEvent .Table )
79147 } else if rowsEvent , ok := ev .Event .(* replication.RowsEvent ); ok {
80- dml := ToEventDML (ev .Header .EventType .String ())
81- if dml == NotDML {
82- return fmt .Errorf ("Unknown DML type: %s" , ev .Header .EventType .String ())
83- }
84- for i , row := range rowsEvent .Rows {
85- if dml == UpdateDML && i % 2 == 1 {
86- // An update has two rows (WHERE+SET)
87- // We do both at the same time
88- continue
89- }
90- binlogEntry := NewBinlogEntryAt (this .currentCoordinates )
91- binlogEntry .DmlEvent = NewBinlogDMLEvent (
92- string (rowsEvent .Table .Schema ),
93- string (rowsEvent .Table .Table ),
94- dml ,
95- )
96- switch dml {
97- case InsertDML :
98- {
99- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
100- }
101- case UpdateDML :
102- {
103- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
104- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
105- }
106- case DeleteDML :
107- {
108- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
109- }
110- }
111- // The channel will do the throttling. Whoever is reding from the channel
112- // decides whether action is taken sycnhronously (meaning we wait before
113- // next iteration) or asynchronously (we keep pushing more events)
114- // In reality, reads will be synchronous
115- entriesChannel <- binlogEntry
148+ if err := this .handleRowsEvent (ev , rowsEvent , entriesChannel ); err != nil {
149+ return err
116150 }
117151 }
152+ this .lastHandledCoordinates = this .currentCoordinates
118153 }
119154 log .Debugf ("done streaming events" )
120155
0 commit comments