Skip to content

Commit 70bfd90

Browse files
authored
ddl: record get owner TS and compare it before runReorgJob quit (#55049) (#55111)
close #54897
1 parent 6990c1f commit 70bfd90

File tree

3 files changed

+37
-9
lines changed

3 files changed

+37
-9
lines changed

pkg/ddl/backfilling.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -674,12 +674,8 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
674674
totalAddedCount := job.GetRowCount()
675675

676676
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
677+
677678
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
678-
if errors.ErrorEqual(err, dbterror.ErrNotOwner) {
679-
// This instance is not DDL owner, we remove reorgctx proactively
680-
// to avoid being used later.
681-
dc.removeReorgCtx(reorgInfo.ID)
682-
}
683679
return errors.Trace(err)
684680
}
685681

pkg/ddl/ddl.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,19 @@ type reorgContexts struct {
493493
sync.RWMutex
494494
// reorgCtxMap maps job ID to reorg context.
495495
reorgCtxMap map[int64]*reorgCtx
496+
beOwnerTS int64
497+
}
498+
499+
func (r *reorgContexts) getOwnerTS() int64 {
500+
r.RLock()
501+
defer r.RUnlock()
502+
return r.beOwnerTS
503+
}
504+
505+
func (r *reorgContexts) setOwnerTS(ts int64) {
506+
r.Lock()
507+
r.beOwnerTS = ts
508+
r.Unlock()
496509
}
497510

498511
func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
@@ -510,7 +523,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
510523
return existedRC
511524
}
512525
rc := &reorgCtx{}
513-
rc.doneCh = make(chan error, 1)
526+
rc.doneCh = make(chan reorgFnResult, 1)
514527
// initial reorgCtx
515528
rc.setRowCount(rowCount)
516529
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
@@ -746,6 +759,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
746759
if err != nil {
747760
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
748761
}
762+
d.reorgCtx.setOwnerTS(time.Now().Unix())
749763
d.runningJobs.clear()
750764
})
751765

pkg/ddl/reorg.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type reorgCtx struct {
5959
// If the reorganization job is done, we will use this channel to notify outer.
6060
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
6161
// use a persistent job list.
62-
doneCh chan error
62+
doneCh chan reorgFnResult
6363
// rowCount is used to simulate a job's row count.
6464
rowCount int64
6565
jobState model.JobState
@@ -74,6 +74,13 @@ type reorgCtx struct {
7474
references atomicutil.Int32
7575
}
7676

77+
// reorgFnResult records the DDL owner TS before executing reorg function, in order to help
78+
// receiver determine if the result is from reorg function of previous DDL owner in this instance.
79+
type reorgFnResult struct {
80+
ownerTS int64
81+
err error
82+
}
83+
7784
// newContext gets a context. It is only used for adding column in reorganization state.
7885
func newContext(store kv.Storage) sessionctx.Context {
7986
c := mock.NewContext()
@@ -200,11 +207,13 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,
200207
return dbterror.ErrCancelledDDLJob
201208
}
202209

210+
beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS()
203211
rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
204212
w.wg.Add(1)
205213
go func() {
206214
defer w.wg.Done()
207-
rc.doneCh <- f()
215+
err := f()
216+
rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
208217
}()
209218
}
210219

@@ -220,7 +229,16 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,
220229

221230
// wait reorganization job done or timeout
222231
select {
223-
case err := <-rc.doneCh:
232+
case res := <-rc.doneCh:
233+
err := res.err
234+
curTS := w.ddlCtx.reorgCtx.getOwnerTS()
235+
if res.ownerTS != curTS {
236+
d.removeReorgCtx(job.ID)
237+
logutil.BgLogger().Warn("owner ts mismatch, return timeout error and retry",
238+
zap.Int64("prevTS", res.ownerTS),
239+
zap.Int64("curTS", curTS))
240+
return dbterror.ErrWaitReorgTimeout
241+
}
224242
// Since job is cancelled,we don't care about its partial counts.
225243
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
226244
d.removeReorgCtx(job.ID)

0 commit comments

Comments
 (0)