102 changes: 21 additions & 81 deletions src/Agent.Listener/JobDispatcher.cs
@@ -383,7 +383,6 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDisp

var jobRequestCancellationToken = newJobDispatch.WorkerCancellationTokenSource.Token;
var workerCancelTimeoutKillToken = newJobDispatch.WorkerCancelTimeoutKillTokenSource.Token;
var workerFlushLogsTimeoutToken = newJobDispatch.WorkerFlushLogsTimeoutTokenSource.Token;
var term = HostContext.GetService<ITerminal>();
term.WriteLine(StringUtil.Loc("RunningJob", DateTime.UtcNow, message.JobDisplayName));

@@ -451,7 +450,6 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDisp
var featureFlagProvider = HostContext.GetService<IFeatureFlagProvider>();
var newMaskerAndRegexesFeatureFlagStatus = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.EnableNewMaskerAndRegexes", Trace);
var enhancedLoggingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.UseEnhancedLogging", Trace);

var environment = new Dictionary<string, string>();
if (newMaskerAndRegexesFeatureFlagStatus?.EffectiveState == "On")
{
@@ -736,19 +734,24 @@ await processChannel.SendAsync(
}

Trace.Info($"Waiting for worker to exit gracefully for job: {message.JobId}");
// Wait for the worker to exit.
// If the worker doesn't exit within the timeout, kill it.
var exitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));

// Wait for worker to complete within the original timeout
var gracefulExitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerFlushLogsTimeoutToken));

if (gracefulExitTask != workerProcessTask)
// The worker hasn't exited within the cancellation timeout.
if (exitTask != workerProcessTask)
{
// Original timeout expired, handle with timeout log flushing if enabled
await HandleWorkerTimeoutAsync(
message.JobId,
processChannel,
workerProcessTask,
workerProcessCancelTokenSource,
workerCancelTimeoutKillToken);
Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker.");
workerProcessCancelTokenSource.Cancel();
try
{
await workerProcessTask;
Trace.Info("Worker process forceful termination completed");
}
catch (OperationCanceledException)
{
Trace.Info("worker process has been killed.");
}
}
else
{
@@ -1067,7 +1070,6 @@ private class WorkerDispatcher : IDisposable
public TaskCompletionSource<JobMetadataMessage> MetadataSource { get; set; }
public CancellationTokenSource WorkerCancellationTokenSource { get; private set; }
public CancellationTokenSource WorkerCancelTimeoutKillTokenSource { get; private set; }
public CancellationTokenSource WorkerFlushLogsTimeoutTokenSource { get; private set; }
private readonly object _lock = new object();

const int maxValueInMinutes = 35790; // 35790 * 60 * 1000 = 2147400000
@@ -1078,19 +1080,18 @@ public WorkerDispatcher(Guid jobId, long requestId)
{
JobId = jobId;
RequestId = requestId;
WorkerCancellationTokenSource = new CancellationTokenSource();
WorkerCancelTimeoutKillTokenSource = new CancellationTokenSource();
WorkerFlushLogsTimeoutTokenSource = new CancellationTokenSource();
WorkerCancellationTokenSource = new CancellationTokenSource();
MetadataSource = new TaskCompletionSource<JobMetadataMessage>();
}

public bool Cancel(TimeSpan timeout)
{
if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null && WorkerFlushLogsTimeoutTokenSource != null)
if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null)
{
lock (_lock)
{
if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null && WorkerFlushLogsTimeoutTokenSource != null)
if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null)
{
WorkerCancellationTokenSource.Cancel();

@@ -1106,12 +1107,7 @@ public bool Cancel(TimeSpan timeout)
timeout = TimeSpan.FromMinutes(maxValueInMinutes);
}

// Use the original timeout for worker execution (no flush signal beforehand)
WorkerFlushLogsTimeoutTokenSource.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15)));

// Set kill timeout to original timeout + 1 minute for log flushing
TimeSpan killTimeout = timeout.Add(TimeSpan.FromMinutes(1));
WorkerCancelTimeoutKillTokenSource.CancelAfter(killTimeout);
WorkerCancelTimeoutKillTokenSource.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15)));
return true;
}
}
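
The Cancel path above clamps the timeout before arming the kill timer: CancellationTokenSource.CancelAfter rejects delays beyond Int32.MaxValue milliseconds, so anything larger is capped at 35790 minutes before the 15-second grace slice is subtracted. Below is a minimal sketch of that clamp, assuming a timeout comfortably above 15 seconds; ArmKillTimer is an illustrative name, not the agent's API.

using System;
using System.Threading;

class CancelAfterClampSketch
{
    // 35790 * 60 * 1000 = 2,147,400,000 ms, just under int.MaxValue (2,147,483,647).
    const int MaxValueInMinutes = 35790;

    static void ArmKillTimer(CancellationTokenSource killCts, TimeSpan timeout)
    {
        // CancelAfter overflows for delays above int.MaxValue milliseconds, so clamp first.
        if (timeout.TotalMilliseconds > int.MaxValue)
        {
            timeout = TimeSpan.FromMinutes(MaxValueInMinutes);
        }

        // Fire the kill token 15 seconds before the nominal timeout, as in Cancel().
        killCts.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15)));
    }

    static void Main()
    {
        using var killCts = new CancellationTokenSource();
        ArmKillTimer(killCts, TimeSpan.FromDays(365)); // clamped to roughly 24.85 days
        Console.WriteLine("kill timer armed");
    }
}
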
@@ -1143,7 +1139,7 @@ private void Dispose(bool disposing)
{
if (disposing)
{
if (WorkerCancellationTokenSource != null || WorkerCancelTimeoutKillTokenSource != null || WorkerFlushLogsTimeoutTokenSource != null)
if (WorkerCancellationTokenSource != null || WorkerCancelTimeoutKillTokenSource != null)
{
lock (_lock)
{
@@ -1158,66 +1154,10 @@ private void Dispose(bool disposing)
WorkerCancelTimeoutKillTokenSource.Dispose();
WorkerCancelTimeoutKillTokenSource = null;
}

if (WorkerFlushLogsTimeoutTokenSource != null)
{
WorkerFlushLogsTimeoutTokenSource.Dispose();
WorkerFlushLogsTimeoutTokenSource = null;
}
}
}
}
}
}

private async Task HandleWorkerTimeoutAsync(
Guid jobId,
IProcessChannel processChannel,
Task<int> workerProcessTask,
CancellationTokenSource workerProcessCancelTokenSource,
CancellationToken workerCancelTimeoutKillToken)
{
Trace.Info($"Worker process for job {jobId} hasn't completed within original timeout, sending flush logs request and waiting 1 minute before forceful kill.");
try
{
// Send special flush logs request to worker
using (var csSendFlush = new CancellationTokenSource(_channelTimeout))
{
await processChannel.SendAsync(
messageType: MessageType.FlushLogsRequest,
body: string.Empty,
cancellationToken: csSendFlush.Token);
}
Trace.Info("Flush logs request sent to worker, waiting 1 minute for log flushing before forceful kill.");
}
catch (Exception ex)
{
Trace.Warning($"Failed to send flush logs request to worker: {ex.Message}");
Trace.Warning(ex.ToString());
}

// Now wait for the additional 1 minute log flushing period
try
{
await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));

if (!workerProcessTask.IsCompleted)
{
// Worker still hasn't exited after 1 minute log flushing period, force kill
Trace.Info($"Worker process for job {jobId} hasn't exited after 1 minute log flushing period, proceeding to forceful kill.");
workerProcessCancelTokenSource.Cancel();
await workerProcessTask;
Trace.Info("Worker process forceful termination completed");
}
else
{
Trace.Info("Worker process exited gracefully after flush logs signal");
}
}
catch (OperationCanceledException)
{
Trace.Info("worker process has been killed.");
}
}
}
}
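
With the flush-logs stage removed, the dispatcher is back to a single race: wait on the worker process task against an infinite delay gated by the kill token, and force-cancel the worker if the delay wins. A compact, self-contained sketch of that pattern follows; RunWorkerAsync and the timeouts are stand-ins, not the agent's API.

using System;
using System.Threading;
using System.Threading.Tasks;

class WorkerWaitSketch
{
    static async Task Main()
    {
        using var killCts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); // kill timeout
        using var workerCts = new CancellationTokenSource();                      // forceful-kill signal

        Task workerTask = RunWorkerAsync(workerCts.Token); // stands in for the worker process task

        // Race the worker against an infinite delay gated by the kill token.
        Task finished = await Task.WhenAny(workerTask, Task.Delay(Timeout.Infinite, killCts.Token));
        if (finished != workerTask)
        {
            // The worker overran the kill timeout: force termination.
            workerCts.Cancel();
            try
            {
                await workerTask;
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Worker process has been killed.");
            }
        }
    }

    // Simulates a worker that runs longer than the kill timeout.
    static async Task RunWorkerAsync(CancellationToken ct)
        => await Task.Delay(TimeSpan.FromSeconds(10), ct);
}
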
8 changes: 0 additions & 8 deletions src/Agent.Sdk/Knob/AgentKnobs.cs
@@ -709,14 +709,6 @@ public class AgentKnobs
new EnvironmentKnobSource("AZP_ENABLE_NEW_MASKER_AND_REGEXES"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob EnableTimeoutLogFlushing = new Knob(
nameof(EnableTimeoutLogFlushing),
"If true, enables timeout log flushing where worker gets 1 minute to flush logs after job timeout before force kill.",
new PipelineFeatureSource("EnableTimeoutLogFlushing"),
new RuntimeKnobSource("AZP_ENABLE_TIMEOUT_LOG_FLUSHING"),
new EnvironmentKnobSource("AZP_ENABLE_TIMEOUT_LOG_FLUSHING"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob SendSecretMaskerTelemetry = new Knob(
nameof(SendSecretMaskerTelemetry),
"If true, the agent will send telemetry about secret masking",
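
The deleted EnableTimeoutLogFlushing knob followed the usual knob shape: a name, a description, an ordered list of sources (pipeline feature, runtime variable, environment variable), and a built-in default that applies only when no source has a value. Here is a hedged sketch of that first-source-wins resolution using plain delegates; it is illustrative only and not the agent's Knob/KnobSource API.

using System;
using System.Collections.Generic;

sealed class KnobSketch
{
    private readonly IEnumerable<Func<string>> _sources;
    private readonly string _builtInDefault;

    public KnobSketch(string builtInDefault, params Func<string>[] sources)
    {
        _builtInDefault = builtInDefault;
        _sources = sources;
    }

    // The first source that yields a value wins; the built-in default is last.
    public bool AsBoolean()
    {
        foreach (var source in _sources)
        {
            var value = source();
            if (value != null)
            {
                return bool.TryParse(value, out var parsed) && parsed;
            }
        }
        return bool.TryParse(_builtInDefault, out var fallback) && fallback;
    }
}

class KnobDemo
{
    static void Main()
    {
        var flag = new KnobSketch(
            "false", // built-in default, as in the removed knob
            () => Environment.GetEnvironmentVariable("AZP_ENABLE_TIMEOUT_LOG_FLUSHING"));
        Console.WriteLine(flag.AsBoolean()); // False unless the variable is set to "true"
    }
}
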
16 changes: 1 addition & 15 deletions src/Agent.Worker/JobRunner.cs
@@ -103,7 +103,6 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,

IExecutionContext jobContext = null;
CancellationTokenRegistration? agentShutdownRegistration = null;
CancellationTokenRegistration? workerTimeoutRegistration = null;
VssConnection taskConnection = null;
VssConnection legacyTaskConnection = null;
IResourceMetricsManager resourceDiagnosticManager = null;
@@ -160,13 +159,6 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage });
});

// Register for worker timeout cancellation - similar to agent shutdown
workerTimeoutRegistration = HostContext.WorkerShutdownForTimeout.Register(() =>
{
Trace.Warning($"Worker shutdown for timeout triggered [JobId:{message.JobId}]");
jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = "Job cancelled due to worker timeout." });
});

// Validate directory permissions.
string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work);
Trace.Info($"Validating directory permissions for: '{workDirectory}'");
@@ -462,12 +454,6 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
agentShutdownRegistration = null;
}

if (workerTimeoutRegistration != null)
{
workerTimeoutRegistration.Value.Dispose();
workerTimeoutRegistration = null;
}

legacyTaskConnection?.Dispose();
taskConnection?.Dispose();
jobConnection?.Dispose();
@@ -753,4 +739,4 @@ private void PublishTelemetry(IExecutionContext context, string area, String fea
}
}
}
}
}
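
The removed worker-timeout registration copied the lifecycle that JobRunner still uses for agent shutdown: register a callback on a token before the job body runs, then dispose the registration in the cleanup path so the callback can never fire after the job ends. A minimal sketch of that register-then-dispose pattern, with illustrative names:

using System;
using System.Threading;

class ShutdownRegistrationSketch
{
    static void Main()
    {
        using var agentShutdownCts = new CancellationTokenSource();
        CancellationTokenRegistration? registration = null;
        try
        {
            // Register once, before the job body runs.
            registration = agentShutdownCts.Token.Register(() =>
                Console.WriteLine("Agent shutdown: mark the job with an error issue."));

            agentShutdownCts.Cancel(); // simulate a shutdown arriving mid-job
        }
        finally
        {
            // Dispose in cleanup so the callback cannot fire after the job ends.
            registration?.Dispose();
            registration = null;
        }
    }
}
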
57 changes: 3 additions & 54 deletions src/Agent.Worker/StepsRunner.cs
@@ -146,14 +146,6 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
conditionReTestResult = false;
Trace.Info($"Condition re-evaluation skipped [Step:'{step.DisplayName}', Reason:AgentShutdown]");
}
else if (AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean() &&
HostContext.WorkerShutdownForTimeout.IsCancellationRequested)
{
jobContext.Result = TaskResult.Canceled;
jobContext.Variables.Agent_JobStatus = jobContext.Result;
conditionReTestResult = false;
Trace.Info($"Condition re-evaluation skipped [Step:'{step.DisplayName}', Reason:WorkerTimeout]");
}
else
{
try
@@ -211,14 +203,6 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
conditionResult = false;
Trace.Info($"Condition evaluation skipped due to agent shutdown: '{step.DisplayName}'");
}
else if (AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean() &&
HostContext.WorkerShutdownForTimeout.IsCancellationRequested)
{
jobContext.Result = TaskResult.Canceled;
jobContext.Variables.Agent_JobStatus = jobContext.Result;
conditionResult = false;
Trace.Info($"Condition evaluation skipped due to worker timeout: '{step.DisplayName}'");
}
else
{
try
@@ -256,8 +240,8 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
else
{
Trace.Info($"RunStepAsync execution initiated for step: '{step.DisplayName}'");
// Run the step with worker timeout integration.
await RunStepWithTimeoutAsync(step, jobContext.CancellationToken);
// Run the step.
await RunStepAsync(step, jobContext.CancellationToken);
Trace.Info($"RunStepAsync execution completed for step: '{step.DisplayName}' - Result: {step.ExecutionContext.Result}");
}
}
@@ -296,41 +280,6 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
}
}

private async Task RunStepWithTimeoutAsync(IStep step, CancellationToken jobCancellationToken)
{
Trace.Info($"Individual step execution initiated: '{step.DisplayName}'");

// Check if timeout log flushing feature is enabled
bool timeoutLogFlushingEnabled = AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean();

// Register for worker timeout to cancel the step only if timeout log flushing is enabled
CancellationTokenRegistration? workerTimeoutRegistration = null;
if (timeoutLogFlushingEnabled && !HostContext.WorkerShutdownForTimeout.IsCancellationRequested)
{
workerTimeoutRegistration = HostContext.WorkerShutdownForTimeout.Register(() =>
{
Trace.Warning($"Worker timeout detected during step execution: '{step.DisplayName}' - cancelling step");
step.ExecutionContext.Error("Step cancelled due to worker timeout");
step.ExecutionContext.CancelToken();
});
Trace.Info($"Worker timeout registration active for step: '{step.DisplayName}'");
}

try
{
await RunStepAsync(step, jobCancellationToken);
}
finally
{
// Dispose worker timeout registration
if (workerTimeoutRegistration != null)
{
workerTimeoutRegistration.Value.Dispose();
Trace.Info($"Worker timeout registration disposed for step: '{step.DisplayName}'");
}
}
}

private async Task RunStepAsync(IStep step, CancellationToken jobCancellationToken)
{
Trace.Info($"Individual step execution initiated: '{step.DisplayName}'");
@@ -453,7 +402,7 @@ private async Task RunStepAsync(IStep step, CancellationToken jobCancellationTok
Trace.Info($"Step result merged with command result - Step: {step.DisplayName}, CommandResult:{step.ExecutionContext.CommandResult} FinalResult: {step.ExecutionContext.Result}");
}

// Fixup the step result if ContinueOnError.
// Fixup the step result if ContinueOnError.
if (step.ExecutionContext.Result == TaskResult.Failed && step.ContinueOnError)
{
step.ExecutionContext.Result = TaskResult.SucceededWithIssues;
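
After the revert, StepsRunner gates cancelled steps on two inputs only: the job cancellation token and agent shutdown. When the job token is cancelled, the step's condition is re-evaluated to decide whether it still runs (so always()-style steps survive cancellation), except during agent shutdown, where re-evaluation is skipped and the step is simply not run. The following is a hedged sketch of that gate with stand-in types, not the agent's IStep/IExecutionContext interfaces.

using System;
using System.Threading;

enum StepGateResult { Run, Skip }

class ConditionGateSketch
{
    static StepGateResult Gate(
        Func<bool> condition,               // stands in for the step's condition expression
        CancellationToken jobToken,
        CancellationToken agentShutdownToken)
    {
        if (jobToken.IsCancellationRequested && agentShutdownToken.IsCancellationRequested)
        {
            // Agent shutdown: skip re-evaluation, the step will not run.
            return StepGateResult.Skip;
        }

        // Normal path, and the cancelled-job path: the condition decides.
        return condition() ? StepGateResult.Run : StepGateResult.Skip;
    }

    static void Main()
    {
        using var job = new CancellationTokenSource();
        using var agent = new CancellationTokenSource();
        job.Cancel();

        // An always()-style condition keeps running after job cancellation...
        Console.WriteLine(Gate(() => true, job.Token, agent.Token)); // Run

        // ...but not once the agent itself is shutting down.
        agent.Cancel();
        Console.WriteLine(Gate(() => true, job.Token, agent.Token)); // Skip
    }
}
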
5 changes: 1 addition & 4 deletions src/Agent.Worker/Worker.cs
@@ -114,6 +114,7 @@ public async Task<int> RunAsync(string pipeIn, string pipeOut)
{
case MessageType.CancelRequest:
Trace.Info("Job cancellation request received - initiating graceful job termination");
cancel = true;
jobRequestCancellationToken.Cancel(); // Expire the host cancellation token.
break;
case MessageType.AgentShutdown:
@@ -132,10 +133,6 @@ public async Task<int> RunAsync(string pipeIn, string pipeOut)
jobRunner.UpdateMetadata(metadataMessage);
Trace.Info("Job metadata update processed successfully");
break;
case MessageType.FlushLogsRequest:
Trace.Info("FlushLogsRequest received in main message loop");
HostContext.ShutdownWorkerForTimeout();
break;
default:
throw new ArgumentOutOfRangeException(nameof(channelMessage.MessageType), channelMessage.MessageType, nameof(channelMessage.MessageType));
}
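
Without the FlushLogsRequest case, the worker's message loop is a plain dispatch over the remaining message types, with unknown types treated as a programming error. A minimal sketch of that shape follows; the enum and handler bodies are illustrative stand-ins for the agent's channel message handling.

using System;
using System.Threading;

enum MessageKind { CancelRequest, AgentShutdown, JobMetadataUpdate }

class MessageLoopSketch
{
    static void Dispatch(MessageKind kind, CancellationTokenSource jobCts)
    {
        switch (kind)
        {
            case MessageKind.CancelRequest:
                jobCts.Cancel(); // expire the job cancellation token
                break;
            case MessageKind.AgentShutdown:
                Console.WriteLine("Graceful shutdown requested by the agent.");
                break;
            case MessageKind.JobMetadataUpdate:
                Console.WriteLine("Job metadata updated.");
                break;
            default:
                // Unknown message types are a programming error, as in Worker.cs.
                throw new ArgumentOutOfRangeException(nameof(kind), kind, null);
        }
    }

    static void Main()
    {
        using var jobCts = new CancellationTokenSource();
        Dispatch(MessageKind.CancelRequest, jobCts);
        Console.WriteLine(jobCts.IsCancellationRequested); // True
    }
}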