@@ -185,51 +185,47 @@ func (s *jobScheduler) close() {
185185}
186186
187187// getJob reads tidb_ddl_job and returns the first runnable DDL job.
188- func (s * jobScheduler ) getJob (se * sess.Session , tp jobType ) (* model.Job , error ) {
188+ func (s * jobScheduler ) getJob (se * sess.Session ) (* model.Job , bool , error ) {
189189 defer s .runningJobs .resetAllPending ()
190190
191- not := "not"
192- label := "get_job_general"
193- if tp == jobTypeReorg {
194- not = ""
195- label = "get_job_reorg"
196- }
197- const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
191+ const getJobSQL = `select job_meta, processing, reorg from mysql.tidb_ddl_job where job_id in
198192 (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
199- and %s reorg %s order by processing desc, job_id`
193+ %s order by processing desc, job_id`
200194 var excludedJobIDs string
201195 if ids := s .runningJobs .allIDs (); len (ids ) > 0 {
202196 excludedJobIDs = fmt .Sprintf ("and job_id not in (%s)" , ids )
203197 }
204- sql := fmt .Sprintf (getJobSQL , not , excludedJobIDs )
205- rows , err := se .Execute (context .Background (), sql , label )
198+ sql := fmt .Sprintf (getJobSQL , excludedJobIDs )
199+ rows , err := se .Execute (context .Background (), sql , "get_job" )
206200 if err != nil {
207- return nil , errors .Trace (err )
201+ return nil , false , errors .Trace (err )
208202 }
209203 for _ , row := range rows {
210204 jobBinary := row .GetBytes (0 )
211205 isJobProcessing := row .GetInt64 (1 ) == 1
206+ isReorg := row .GetInt64 (2 ) != 0
212207
213208 job := model.Job {}
214209 err = job .Decode (jobBinary )
215210 if err != nil {
216- return nil , errors .Trace (err )
211+ return nil , isReorg , errors .Trace (err )
217212 }
218213
214+ involving := job .GetInvolvingSchemaInfo ()
219215 isRunnable , err := s .processJobDuringUpgrade (se , & job )
220216 if err != nil {
221- return nil , errors .Trace (err )
217+ return nil , isReorg , errors .Trace (err )
222218 }
223219 if ! isRunnable {
220+ s .runningJobs .addPending (involving )
224221 continue
225222 }
226223
227224 // The job has already been picked up, just return to continue it.
228225 if isJobProcessing {
229- return & job , nil
226+ return & job , isReorg , nil
230227 }
231228
232- involving := job .GetInvolvingSchemaInfo ()
233229 if ! s .runningJobs .checkRunnable (job .ID , involving ) {
234230 s .runningJobs .addPending (involving )
235231 continue
@@ -241,11 +237,11 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error)
241237 zap .Error (err ),
242238 zap .Stringer ("job" , & job ))
243239 s .runningJobs .addPending (involving )
244- return nil , errors .Trace (err )
240+ return nil , isReorg , errors .Trace (err )
245241 }
246- return & job , nil
242+ return & job , isReorg , nil
247243 }
248- return nil , nil
244+ return nil , false , nil
249245}
250246
251247func hasSysDB (job * model.Job ) bool {
@@ -394,8 +390,7 @@ func (s *jobScheduler) startDispatch() error {
394390 continue
395391 }
396392 failpoint .InjectCall ("beforeAllLoadDDLJobAndRun" )
397- s .loadDDLJobAndRun (se , s .generalDDLWorkerPool , jobTypeGeneral )
398- s .loadDDLJobAndRun (se , s .reorgWorkerPool , jobTypeReorg )
393+ s .loadDDLJobAndRun (se )
399394 }
400395}
401396
@@ -436,30 +431,32 @@ func (s *jobScheduler) checkAndUpdateClusterState(needUpdate bool) error {
436431 return nil
437432}
438433
439- func (s * jobScheduler ) loadDDLJobAndRun (se * sess.Session , pool * workerPool , tp jobType ) {
440- wk , err := pool .get ()
441- if err != nil || wk == nil {
442- logutil .DDLLogger ().Debug (fmt .Sprintf ("[ddl] no %v worker available now" , pool .tp ()), zap .Error (err ))
443- return
444- }
445-
434+ func (s * jobScheduler ) loadDDLJobAndRun (se * sess.Session ) {
446435 s .mu .RLock ()
447- s .mu .hook .OnGetJobBefore (pool . tp (). String () )
436+ s .mu .hook .OnGetJobBefore ()
448437 s .mu .RUnlock ()
449438
450439 startTime := time .Now ()
451- job , err := s .getJob (se , tp )
440+ job , isReorg , err := s .getJob (se )
452441 if job == nil || err != nil {
453442 if err != nil {
454- wk . jobLogger ( job ).Warn ("get job met error" , zap .Duration ("take time" , time .Since (startTime )), zap .Error (err ))
443+ logutil . DDLLogger ( ).Warn ("get job met error" , zap .Duration ("take time" , time .Since (startTime )), zap .Error (err ))
455444 }
456- pool .put (wk )
457445 return
458446 }
459447 s .mu .RLock ()
460- s .mu .hook .OnGetJobAfter (pool . tp (). String (), job )
448+ s .mu .hook .OnGetJobAfter (job )
461449 s .mu .RUnlock ()
462450
451+ pool := s .generalDDLWorkerPool
452+ if isReorg {
453+ pool = s .reorgWorkerPool
454+ }
455+ wk , err := pool .get ()
456+ if err != nil || wk == nil {
457+ logutil .DDLLogger ().Debug (fmt .Sprintf ("[ddl] no %v worker available now" , pool .tp ()), zap .Error (err ))
458+ return
459+ }
463460 s .delivery2Worker (wk , pool , job )
464461}
465462
@@ -526,10 +523,18 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
526523 jobID , involvedSchemaInfos := job .ID , job .GetInvolvingSchemaInfo ()
527524 s .runningJobs .addRunning (jobID , involvedSchemaInfos )
528525 metrics .DDLRunningJobCount .WithLabelValues (pool .tp ().String ()).Inc ()
529- s .wg .RunWithLog (func () {
526+ s .wg .Run (func () {
530527 defer func () {
528+ r := recover ()
529+ if r != nil {
530+ logutil .DDLLogger ().Error ("panic in delivery2Worker" , zap .Any ("recover" , r ), zap .Stack ("stack" ))
531+ }
531532 failpoint .InjectCall ("afterDelivery2Worker" , job )
532- s .runningJobs .removeRunning (jobID , involvedSchemaInfos )
533+ // Because there is a gap between `allIDs()` and `checkRunnable()`,
534+ // we append unfinished job to pending atomically to prevent `getJob()`
535+ // chosing another runnable job that involves the same schema object.
536+ moveRunningJobsToPending := r != nil || (job != nil && ! job .IsFinished ())
537+ s .runningJobs .finishOrPendJob (jobID , involvedSchemaInfos , moveRunningJobsToPending )
533538 asyncNotify (s .ddlJobNotifyCh )
534539 metrics .DDLRunningJobCount .WithLabelValues (pool .tp ().String ()).Dec ()
535540 pool .put (wk )
0 commit comments