@@ -30,13 +30,17 @@ type Coordinator struct {
3030 // Atomic counter for number of active workers
3131 busyWorkers atomic.Int64
3232
33+ // Mutex protecting currentCoordinates
3334 currentCoordinatesMutex sync.Mutex
3435 // The binlog coordinates of the low water mark transaction.
3536 currentCoordinates mysql.BinlogCoordinates
3637
3738 // Mutex to protect the fields below
3839 mu sync.Mutex
3940
41+ // list of workers
42+ workers []* Worker
43+
4044 // The low water mark. This is the sequence number of the last job that has been committed.
4145 lowWaterMark int64
4246
@@ -56,13 +60,25 @@ type Coordinator struct {
5660 finishedMigrating atomic.Bool
5761}
5862
63+ // Worker takes jobs from the Coordinator and applies the job's DML events.
5964type Worker struct {
6065 id int
6166 coordinator * Coordinator
67+ eventQueue chan * replication.BinlogEvent
6268
63- executedJobs int
69+ executedJobs atomic.Int64
70+ dmlEventsApplied atomic.Int64
71+ waitTimeNs atomic.Int64
72+ busyTimeNs atomic.Int64
73+ }
6474
65- eventQueue chan * replication.BinlogEvent
// stats is a point-in-time snapshot of one worker's counters together with
// the throughput rates derived from them.
type stats struct {
	// dmlRate is DML events applied per second of busy time; it stays zero
	// until busy time has been recorded.
	dmlRate float64
	// trxRate is executed jobs per second of busy time; it stays zero until
	// busy time has been recorded.
	trxRate float64
	// dmlEventsApplied is the worker's cumulative count of applied DML events.
	dmlEventsApplied int64
	// executedJobs is the worker's cumulative count of executed jobs.
	executedJobs int64
	// busyTime is the cumulative time the worker spent applying DML events.
	busyTime time.Duration
	// waitTime is the cumulative time the worker spent waiting on other transactions.
	waitTime time.Duration
}
6783
6884func (w * Worker ) ProcessEvents () error {
@@ -86,21 +102,15 @@ func (w *Worker) ProcessEvents() error {
86102 // Wait for conditions to be met
87103 waitChannel := w .coordinator .WaitForTransaction (gtidEvent .LastCommitted )
88104 if waitChannel != nil {
89- //fmt.Printf("Worker %d - transaction %d waiting for transaction: %d\n", w.id, gtidEvent.SequenceNumber, gtidEvent.LastCommitted)
90- t := time .Now ()
105+ waitStart := time .Now ()
91106 <- waitChannel
92- timeWaited := time .Since (t )
93- w .coordinator .migrationContext .Log .Infof (
94- "Worker %d waited for transaction %d for: %d\n " ,
95- w .id , gtidEvent .LastCommitted , timeWaited )
107+ timeWaited := time .Since (waitStart )
108+ w .waitTimeNs .Add (timeWaited .Nanoseconds ())
96109 }
97110
98111 // Process the transaction
99-
100112 var changelogEvent * binlog.BinlogDMLEvent
101-
102113 dmlEvents := make ([]* binlog.BinlogDMLEvent , 0 , int (atomic .LoadInt64 (& w .coordinator .migrationContext .DMLBatchSize )))
103-
104114 events:
105115 for {
106116 ev := <- w .eventQueue
@@ -166,21 +176,21 @@ func (w *Worker) ProcessEvents() error {
166176 dmlEvents = append (dmlEvents , dmlEvent )
167177
168178 if len (dmlEvents ) == cap (dmlEvents ) {
169- err := w .coordinator .applier .ApplyDMLEventQueries (dmlEvents )
170- if err != nil {
171- //TODO(meiji163) add retry
179+ if err := w .applyDMLEvents (dmlEvents ); err != nil {
172180 w .coordinator .migrationContext .Log .Errore (err )
173- w .coordinator .migrationContext .PanicAbort <- err
174181 }
175182 dmlEvents = dmlEvents [:0 ]
176183 }
177184 }
178185 }
179186 case * replication.XIDEvent :
180187 if len (dmlEvents ) > 0 {
181- w .coordinator .applier .ApplyDMLEventQueries (dmlEvents )
188+ if err := w .applyDMLEvents (dmlEvents ); err != nil {
189+ w .coordinator .migrationContext .Log .Errore (err )
190+ }
182191 }
183192
193+ w .executedJobs .Add (1 )
184194 break events
185195 }
186196 }
@@ -190,19 +200,33 @@ func (w *Worker) ProcessEvents() error {
190200 // Did we see a changelog event?
191201 // Handle it now
192202 if changelogEvent != nil {
203+ // wait for all transactions before this point
193204 waitChannel = w .coordinator .WaitForTransaction (gtidEvent .SequenceNumber - 1 )
194205 if waitChannel != nil {
206+ waitStart := time .Now ()
195207 <- waitChannel
208+ w .waitTimeNs .Add (time .Since (waitStart ).Nanoseconds ())
196209 }
197210 w .coordinator .HandleChangeLogEvent (changelogEvent )
198211 }
199212
200- w .executedJobs += 1
201213 w .coordinator .workerQueue <- w
202214 w .coordinator .busyWorkers .Add (- 1 )
203215 }
204216}
205217
218+ func (w * Worker ) applyDMLEvents (dmlEvents []* binlog.BinlogDMLEvent ) error {
219+ busyStart := time .Now ()
220+ err := w .coordinator .applier .ApplyDMLEventQueries (dmlEvents )
221+ if err != nil {
222+ //TODO(meiji163) add retry
223+ return err
224+ }
225+ w .busyTimeNs .Add (time .Since (busyStart ).Nanoseconds ())
226+ w .dmlEventsApplied .Add (int64 (len (dmlEvents )))
227+ return nil
228+ }
229+
206230func NewCoordinator (migrationContext * base.MigrationContext , applier * Applier , onChangelogEvent func (dmlEvent * binlog.BinlogDMLEvent ) error ) * Coordinator {
207231 connectionConfig := migrationContext .InspectorConnectionConfig
208232
@@ -338,7 +362,6 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
338362 if c .finishedMigrating .Load () {
339363 return nil
340364 }
341- // c.migrationContext.Log.Infof("Received event: %T - %+v", ev.Event, ev.Event)
342365
343366 switch binlogEvent := ev .Event .(type ) {
344367 case * replication.GTIDEvent :
@@ -358,13 +381,10 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
358381 worker := <- c .workerQueue
359382 c .busyWorkers .Add (1 )
360383
361- // c.migrationContext.Log.Infof("Submitting job %d to worker", ev.Event.(*replication.GTIDEvent).SequenceNumber)
362384 worker .eventQueue <- ev
363385
364386 ev = <- c .events
365387
366- // c.migrationContext.Log.Infof("Received event: %T - %+v", ev.Event, ev.Event)
367-
368388 switch binlogEvent := ev .Event .(type ) {
369389 case * replication.QueryEvent :
370390 if bytes .Equal ([]byte ("BEGIN" ), binlogEvent .Query ) {
@@ -381,9 +401,6 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
381401 events:
382402 for {
383403 ev = <- c .events
384-
385- // c.migrationContext.Log.Infof("Received event: %T - %+v", ev.Event, ev.Event)
386-
387404 switch ev .Event .(type ) {
388405 case * replication.RowsEvent :
389406 worker .eventQueue <- ev
@@ -402,8 +419,6 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
402419 busyWorkers := c .busyWorkers .Load ()
403420 if busyWorkers == 0 {
404421 return nil
405- } else {
406- //c.migrationContext.Log.Infof("%d/%d workers are busy\n", busyWorkers, cap(c.workerQueue))
407422 }
408423 }
409424 }
@@ -414,11 +429,35 @@ func (c *Coordinator) InitializeWorkers(count int) {
414429 c .workerQueue = make (chan * Worker , count )
415430 for i := 0 ; i < count ; i ++ {
416431 w := & Worker {id : i , coordinator : c , eventQueue : make (chan * replication.BinlogEvent , 1000 )}
432+
433+ c .mu .Lock ()
434+ c .workers = append (c .workers , w )
435+ c .mu .Unlock ()
436+
417437 c .workerQueue <- w
418438 go w .ProcessEvents ()
419439 }
420440}
421441
442+ func (c * Coordinator ) GetWorkerStats () []stats {
443+ c .mu .Lock ()
444+ defer c .mu .Unlock ()
445+ statSlice := make ([]stats , 0 , len (c .workers ))
446+ for _ , w := range c .workers {
447+ stat := stats {}
448+ stat .dmlEventsApplied = w .dmlEventsApplied .Load ()
449+ stat .executedJobs = w .executedJobs .Load ()
450+ stat .busyTime = time .Duration (w .busyTimeNs .Load ())
451+ stat .waitTime = time .Duration (w .waitTimeNs .Load ())
452+ if stat .busyTime .Milliseconds () > 0 {
453+ stat .dmlRate = 1000.0 * float64 (stat .dmlEventsApplied ) / float64 (stat .busyTime .Milliseconds ())
454+ stat .trxRate = 1000.0 * float64 (stat .executedJobs ) / float64 (stat .busyTime .Milliseconds ())
455+ }
456+ statSlice = append (statSlice , stat )
457+ }
458+ return statSlice
459+ }
460+
422461func (c * Coordinator ) WaitForTransaction (lastCommitted int64 ) chan struct {} {
423462 c .mu .Lock ()
424463 defer c .mu .Unlock ()
@@ -438,11 +477,8 @@ func (c *Coordinator) WaitForTransaction(lastCommitted int64) chan struct{} {
438477}
439478
440479func (c * Coordinator ) HandleChangeLogEvent (event * binlog.BinlogDMLEvent ) {
441- //c.migrationContext.Log.Infof("Coordinator: Handling changelog event: %+v\n", event)
442-
443480 c .mu .Lock ()
444481 defer c .mu .Unlock ()
445-
446482 c .onChangelogEvent (event )
447483}
448484
0 commit comments