@@ -437,15 +437,13 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
437437 {
438438 bool shouldDrain = ForceDrainTimelineQueue ;
439439
440- List < PendingTimelineRecord > pendingUpdates = new List < PendingTimelineRecord > ( ) ;
440+ var pendingUpdates = new List < PendingTimelineRecord > ( ) ;
441441 foreach ( var timeline in _allTimelines )
442442 {
443- ConcurrentQueue < TimelineRecord > recordQueue ;
444- if ( _timelineUpdateQueue . TryGetValue ( timeline , out recordQueue ) )
443+ if ( _timelineUpdateQueue . TryGetValue ( timeline , out ConcurrentQueue < TimelineRecord > recordQueue ) )
445444 {
446- List < TimelineRecord > records = new List < TimelineRecord > ( ) ;
447- TimelineRecord record ;
448- while ( recordQueue . TryDequeue ( out record ) )
445+ var records = new List < TimelineRecord > ( ) ;
446+ while ( recordQueue . TryDequeue ( out TimelineRecord record ) )
449447 {
450448 records . Add ( record ) ;
451449 // process at most 25 timeline records update for each timeline.
@@ -470,8 +468,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
470468 {
471469 foreach ( var update in pendingUpdates )
472470 {
473- List < TimelineRecord > bufferedRecords ;
474- if ( _bufferedRetryRecords . TryGetValue ( update . TimelineId , out bufferedRecords ) )
471+ if ( _bufferedRetryRecords . TryGetValue ( update . TimelineId , out List < TimelineRecord > bufferedRecords ) )
475472 {
476473 update . PendingRecords . InsertRange ( 0 , bufferedRecords ) ;
477474 }
@@ -484,7 +481,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
484481 {
485482 try
486483 {
487- Timeline newTimeline = await _jobServer . CreateTimelineAsync ( _scopeIdentifier , _hubName , _planId , detailTimeline . Details . Id , default ( CancellationToken ) ) ;
484+ Timeline newTimeline = await _jobServer . CreateTimelineAsync ( _scopeIdentifier , _hubName , _planId , detailTimeline . Details . Id , CancellationToken . None ) ;
488485 _allTimelines . Add ( newTimeline . Id ) ;
489486 pendingSubtimelineUpdate = true ;
490487 }
@@ -502,7 +499,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
502499
503500 try
504501 {
505- await _jobServer . UpdateTimelineRecordsAsync ( _scopeIdentifier , _hubName , _planId , update . TimelineId , update . PendingRecords , default ( CancellationToken ) ) ;
502+ await _jobServer . UpdateTimelineRecordsAsync ( _scopeIdentifier , _hubName , _planId , update . TimelineId , update . PendingRecords , CancellationToken . None ) ;
506503 if ( _bufferedRetryRecords . Remove ( update . TimelineId ) )
507504 {
508505 Trace . Verbose ( "Cleanup buffered timeline record for timeline: {0}." , update . TimelineId ) ;
@@ -512,7 +509,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
512509 {
513510 Trace . Info ( "Catch exception during update timeline records, try to update these timeline records next time." ) ;
514511 Trace . Error ( ex ) ;
515- _bufferedRetryRecords [ update . TimelineId ] = update . PendingRecords . ToList ( ) ;
512+ _bufferedRetryRecords [ update . TimelineId ] = update . PendingRecords ;
516513 if ( update . TimelineId == _jobTimelineId )
517514 {
518515 mainTimelineRecordsUpdateErrors . Add ( ex ) ;
@@ -532,26 +529,25 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
532529 }
533530 else
534531 {
535- if ( mainTimelineRecordsUpdateErrors . Count > 0 &&
532+ if ( ForceDrainTimelineQueue )
533+ {
534+ ForceDrainTimelineQueue = false ;
535+ }
536+ }
537+ }
538+
539+ if ( runOnce )
540+ {
541+ if ( mainTimelineRecordsUpdateErrors . Count > 0 &&
536542 _bufferedRetryRecords . ContainsKey ( _jobTimelineId ) &&
537543 _bufferedRetryRecords [ _jobTimelineId ] != null &&
538544 _bufferedRetryRecords [ _jobTimelineId ] . Any ( r => r . Variables . Count > 0 ) )
539- {
540- Trace . Info ( "Fail to update timeline records with output variables. Throw exception to fail the job since output variables are critical to downstream jobs." ) ;
541- throw new AggregateException ( StringUtil . Loc ( "OutputVariablePublishFailed" ) , mainTimelineRecordsUpdateErrors ) ;
542- }
543- else
544- {
545- if ( ForceDrainTimelineQueue )
546- {
547- ForceDrainTimelineQueue = false ;
548- }
549- if ( runOnce )
550- {
551- break ;
552- }
553- }
545+ {
546+ Trace . Info ( "Fail to update timeline records with output variables. Throw exception to fail the job since output variables are critical to downstream jobs." ) ;
547+ throw new AggregateException ( StringUtil . Loc ( "OutputVariablePublishFailed" ) , mainTimelineRecordsUpdateErrors ) ;
554548 }
549+
550+ break ;
555551 }
556552
557553 await Task . Delay ( _delayForTimelineUpdateDequeue ) ;
0 commit comments