66package binlog
77
88import (
9- "bytes"
10- "fmt"
119 "sync"
1210
1311 "github.com/github/gh-ost/go/base"
1412 "github.com/github/gh-ost/go/mysql"
15- "github.com/github/gh-ost/go/sql"
1613
1714 "time"
1815
@@ -77,116 +74,13 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
7774 return & returnCoordinates
7875}
7976
80- // handleRowsEvents processes a RowEvent from the binlog and sends the DML event to the entriesChannel.
81- func (this * GoMySQLReader ) handleRowsEvent (ev * replication.BinlogEvent , rowsEvent * replication.RowsEvent , entriesChannel chan <- * BinlogEntry ) error {
82- if this .currentCoordinates .IsLogPosOverflowBeyond4Bytes (& this .LastAppliedRowsEventHint ) {
83- return fmt .Errorf ("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes" , this .currentCoordinates )
84- }
85-
86- if this .currentCoordinates .SmallerThanOrEquals (& this .LastAppliedRowsEventHint ) {
87- this .migrationContext .Log .Debugf ("Skipping handled query at %+v" , this .currentCoordinates )
88- return nil
89- }
90-
91- dml := ToEventDML (ev .Header .EventType .String ())
92- if dml == NotDML {
93- return fmt .Errorf ("Unknown DML type: %s" , ev .Header .EventType .String ())
94- }
95- for i , row := range rowsEvent .Rows {
96- if dml == UpdateDML && i % 2 == 1 {
97- // An update has two rows (WHERE+SET)
98- // We do both at the same time
99- continue
100- }
101- binlogEntry := NewBinlogEntryAt (this .currentCoordinates )
102- binlogEntry .DmlEvent = NewBinlogDMLEvent (
103- string (rowsEvent .Table .Schema ),
104- string (rowsEvent .Table .Table ),
105- dml ,
106- )
107- switch dml {
108- case InsertDML :
109-
110- {
111- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
112- }
113- case UpdateDML :
114- {
115- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
116- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
117- }
118- case DeleteDML :
119- {
120- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
121- }
122- }
123- // The channel will do the throttling. Whoever is reading from the channel
124- // decides whether action is taken synchronously (meaning we wait before
125- // next iteration) or asynchronously (we keep pushing more events)
126- // In reality, reads will be synchronous
127- entriesChannel <- binlogEntry
128- }
129- this .LastAppliedRowsEventHint = this .currentCoordinates
130- return nil
131- }
132-
133- // RowsEventToBinlogEntry processes MySQL RowsEvent into our BinlogEntry for later application.
134- // copied from handleRowEvents
135- func RowsEventToBinlogEntry (eventType replication.EventType , rowsEvent * replication.RowsEvent , binlogCoords mysql.BinlogCoordinates ) (* BinlogEntry , error ) {
136- dml := ToEventDML (eventType .String ())
137- if dml == NotDML {
138- return nil , fmt .Errorf ("Unknown DML type: %s" , eventType .String ())
139- }
140- binlogEntry := NewBinlogEntryAt (binlogCoords )
141- for i , row := range rowsEvent .Rows {
142- if dml == UpdateDML && i % 2 == 1 {
143- // An update has two rows (WHERE+SET)
144- // We do both at the same time
145- continue
146- }
147- binlogEntry .DmlEvent = NewBinlogDMLEvent (
148- string (rowsEvent .Table .Schema ),
149- string (rowsEvent .Table .Table ),
150- dml ,
151- )
152- switch dml {
153- case InsertDML :
154- {
155- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
156- }
157- case UpdateDML :
158- {
159- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
160- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
161- }
162- case DeleteDML :
163- {
164- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
165- }
166- }
167- }
168- return binlogEntry , nil
169- }
170-
171- type Transaction struct {
172- SequenceNumber int64
173- LastCommitted int64
174- Changes chan * BinlogEntry
175- }
176-
177- func (this * GoMySQLReader ) StreamTransactions (ctx context.Context , transactionsChannel chan <- * Transaction ) error {
178- if err := ctx .Err (); err != nil {
179- return err
180- }
181-
182- previousSequenceNumber := int64 (0 )
183-
184- groups:
77+ // StreamEvents reads binlog events and sends them to the given channel.
78+ // It is blocking and should be executed in a goroutine.
79+ func (this * GoMySQLReader ) StreamEvents (ctx context.Context , eventChannel chan <- * replication.BinlogEvent ) error {
18580 for {
18681 if err := ctx .Err (); err != nil {
18782 return err
18883 }
189-
19084 ev , err := this .binlogStreamer .GetEvent (ctx )
19185 if err != nil {
19286 return err
@@ -197,160 +91,8 @@ groups:
19791 this .currentCoordinates .LogPos = int64 (ev .Header .LogPos )
19892 this .currentCoordinates .EventSize = int64 (ev .Header .EventSize )
19993 }()
200-
201- fmt .Printf ("Event: %s\n " , ev .Header .EventType )
202-
203- // Read each event and do something with it
204- //
205- // First, ignore all events until we find the next GTID event so that we can start
206- // at a transaction boundary.
207- //
208- // Once we find a GTID event, we can start an event group,
209- // and then process all events in that group.
210- // An event group is defined as all events that are part of the same transaction,
211- // which is defined as all events between the GTID event, a `QueryEvent` containing a `BEGIN` query and ends with
212- // either a XIDEvent or a `QueryEvent` containing a `COMMIT` or `ROLLBACK` query.
213- //
214- // Each group is a struct containing the SequenceNumber, LastCommitted, and a channel of events.
215- //
216- // Once the group has ended, we can start looking for the next GTID event.
217-
218- var group * Transaction
219- switch binlogEvent := ev .Event .(type ) {
220- case * replication.GTIDEvent :
221- this .migrationContext .Log .Infof ("GTIDEvent: %+v" , binlogEvent )
222-
223- // Bail out if we find a gap in the sequence numbers
224- if previousSequenceNumber != 0 && binlogEvent .SequenceNumber != previousSequenceNumber + 1 {
225- return fmt .Errorf ("unexpected sequence number: %d, expected %d" , binlogEvent .SequenceNumber , previousSequenceNumber + 1 )
226- }
227-
228- group = & Transaction {
229- SequenceNumber : binlogEvent .SequenceNumber ,
230- LastCommitted : binlogEvent .LastCommitted ,
231- Changes : make (chan * BinlogEntry , 1000 ),
232- }
233-
234- previousSequenceNumber = binlogEvent .SequenceNumber
235-
236- // We are good to send the transaction, the transaction events arrive async
237- this .migrationContext .Log .Infof ("sending transaction: %d %d" , group .SequenceNumber , group .LastCommitted )
238- transactionsChannel <- group
239- default :
240- this .migrationContext .Log .Infof ("Ignoring Event: %+v" , ev .Event )
241- continue
242- }
243-
244- // Next event should be a query event
245-
246- ev , err = this .binlogStreamer .GetEvent (ctx )
247- if err != nil {
248- close (group .Changes )
249- return err
250- }
251- this .migrationContext .Log .Infof ("1 - Event: %s" , ev .Header .EventType )
252-
253- switch binlogEvent := ev .Event .(type ) {
254- case * replication.QueryEvent :
255- if bytes .Equal ([]byte ("BEGIN" ), binlogEvent .Query ) {
256- this .migrationContext .Log .Infof ("BEGIN for transaction in schema %s" , binlogEvent .Schema )
257- } else {
258- this .migrationContext .Log .Infof ("QueryEvent: %+v" , binlogEvent )
259- this .migrationContext .Log .Infof ("Query: %s" , binlogEvent .Query )
260-
261- close (group .Changes )
262-
263- // wait for the next event group
264- continue groups
265- }
266- default :
267- this .migrationContext .Log .Infof ("unexpected Event: %+v" , ev .Event )
268- close (group .Changes )
269-
270- // TODO: handle the group - we want to make sure we process the group's LastCommitted and SequenceNumber
271-
272- // wait for the next event group
273- continue groups
274- }
275-
276- // Next event should be a table map event
277-
278- events:
279- // Now we can start processing the group
280- for {
281- ev , err = this .binlogStreamer .GetEvent (ctx )
282- if err != nil {
283- close (group .Changes )
284- return err
285- }
286- this .migrationContext .Log .Infof ("3 - Event: %s" , ev .Header .EventType )
287-
288- switch binlogEvent := ev .Event .(type ) {
289- case * replication.TableMapEvent :
290- this .migrationContext .Log .Infof ("TableMapEvent for %s.%s: %+v" , binlogEvent .Schema , binlogEvent .Table , binlogEvent )
291- case * replication.RowsEvent :
292- binlogEntry , err := RowsEventToBinlogEntry (ev .Header .EventType , binlogEvent , this .currentCoordinates )
293- if err != nil {
294- close (group .Changes )
295- return err
296- }
297- this .migrationContext .Log .Infof ("RowsEvent: %v" , binlogEvent )
298- group .Changes <- binlogEntry
299- this .migrationContext .Log .Infof ("Length of group.Changes: %d" , len (group .Changes ))
300- case * replication.XIDEvent :
301- this .migrationContext .Log .Infof ("XIDEvent: %+v" , binlogEvent )
302- this .migrationContext .Log .Infof ("COMMIT for transaction" )
303- break events
304- default :
305- close (group .Changes )
306- this .migrationContext .Log .Infof ("unexpected Event: %+v" , ev .Event )
307- return fmt .Errorf ("unexpected Event: %+v" , ev .Event )
308- }
309- }
310-
311- close (group .Changes )
312-
313- this .migrationContext .Log .Infof ("done processing group - %d events" , len (group .Changes ))
314- }
315- }
316-
317- // StreamEvents
318- func (this * GoMySQLReader ) StreamEvents (canStopStreaming func () bool , entriesChannel chan <- * BinlogEntry ) error {
319- if canStopStreaming () {
320- return nil
321- }
322- for {
323- if canStopStreaming () {
324- break
325- }
326- ev , err := this .binlogStreamer .GetEvent (context .Background ())
327- if err != nil {
328- return err
329- }
330- func () {
331- this .currentCoordinatesMutex .Lock ()
332- defer this .currentCoordinatesMutex .Unlock ()
333- this .currentCoordinates .LogPos = int64 (ev .Header .LogPos )
334- this .currentCoordinates .EventSize = int64 (ev .Header .EventSize )
335- }()
336-
337- switch binlogEvent := ev .Event .(type ) {
338- case * replication.RotateEvent :
339- func () {
340- this .currentCoordinatesMutex .Lock ()
341- defer this .currentCoordinatesMutex .Unlock ()
342- this .currentCoordinates .LogFile = string (binlogEvent .NextLogName )
343- }()
344- this .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , this .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), binlogEvent .NextLogName )
345- case * replication.RowsEvent :
346- if err := this .handleRowsEvent (ev , binlogEvent , entriesChannel ); err != nil {
347- return err
348- }
349- }
94+ eventChannel <- ev
35095 }
351- this .migrationContext .Log .Debugf ("done streaming events" )
352-
353- return nil
35496}
35597
35698func (this * GoMySQLReader ) Close () error {
0 commit comments