Skip to content

Commit 8778d34

Browse files
committed
queue fixes
1 parent 6fa4e5a commit 8778d34

File tree

3 files changed

+786
-2
lines changed

3 files changed

+786
-2
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,9 @@ private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult,
414414
_logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount);
415415

416416
// Wait for all downloads to complete (fails fast on first exception)
417-
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);
417+
// Use fault-priority variant to ensure original exceptions propagate instead of
418+
// OperationCanceledException when cancellation occurs due to a task failure
419+
await TaskHelpers.WhenAllOrFirstExceptionWithFaultPriorityAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);
418420

419421
_logger.DebugFormat("MultipartDownloadManager: All download tasks completed successfully");
420422

@@ -451,13 +453,21 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
451453
// Pre-acquire capacity in sequential order to prevent race condition deadlock
452454
// This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order
453455
// parts from consuming all buffer slots and blocking the next expected part
454-
for (int partNum = 2; partNum <= downloadResult.TotalParts; partNum++)
456+
for (int partNum = 2; partNum <= downloadResult.TotalParts && !internalCts.IsCancellationRequested; partNum++)
455457
{
456458
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);
457459

458460
// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
459461
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);
460462

463+
// Check cancellation after acquiring capacity - a task may have failed while waiting
464+
if (internalCts.IsCancellationRequested)
465+
{
466+
_logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after capacity acquired", partNum);
467+
_dataHandler.ReleaseCapacity();
468+
break;
469+
}
470+
461471
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);
462472

463473
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
@@ -466,22 +476,58 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
466476
// Acquire HTTP slot in the loop before creating task
467477
// Loop will block here if all slots are in use
468478
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);
479+
480+
// Check cancellation after acquiring HTTP slot - a task may have failed while waiting
481+
if (internalCts.IsCancellationRequested)
482+
{
483+
_logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after HTTP slot acquired", partNum);
484+
_httpConcurrencySlots.Release();
485+
_dataHandler.ReleaseCapacity();
486+
break;
487+
}
469488

470489
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);
471490

472491
try
473492
{
474493
var task = CreateDownloadTaskAsync(partNum, downloadResult.ObjectSize, wrappedCallback, internalCts.Token);
494+
495+
// Add failure detection to immediately cancel internal token on first error
496+
// This prevents the for loop from queuing additional parts after a failure
497+
_ = task.ContinueWith(t =>
498+
{
499+
if (t.IsFaulted && !internalCts.IsCancellationRequested)
500+
{
501+
// Then cancel to stop queuing more parts
502+
// Note: The original exception will be propagated by WhenAllOrFirstExceptionAsync
503+
try
504+
{
505+
internalCts.Cancel();
506+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Cancelled internal token to stop queuing", partNum);
507+
}
508+
catch (ObjectDisposedException)
509+
{
510+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] CancellationTokenSource already disposed during cancellation", partNum);
511+
}
512+
}
513+
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
514+
475515
downloadTasks.Add(task);
476516
}
477517
catch (Exception ex)
478518
{
479519
// If task creation fails, release the HTTP slot we just acquired
480520
_httpConcurrencySlots.Release();
521+
_dataHandler.ReleaseCapacity();
481522
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
482523
throw;
483524
}
484525
}
526+
527+
if (internalCts.IsCancellationRequested && downloadTasks.Count < downloadResult.TotalParts - 1)
528+
{
529+
_logger.InfoFormat("MultipartDownloadManager: Stopped queuing early at {0} parts due to cancellation", downloadTasks.Count);
530+
}
485531
}
486532

487533
/// <summary>

sdk/src/Services/S3/Custom/Transfer/Internal/TaskHelpers.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,53 @@ namespace Amazon.S3.Transfer.Internal
2727
/// </summary>
2828
internal static class TaskHelpers
2929
{
30+
/// <summary>
31+
/// Waits for all tasks to complete or till any task fails or is canceled.
32+
/// Prioritizes returning already-completed (especially faulted) tasks before checking cancellation.
33+
/// This ensures the original exception is propagated rather than OperationCanceledException when
34+
/// cancellation occurs due to a task failure.
35+
/// </summary>
36+
/// <param name="pendingTasks">List of tasks to wait for completion</param>
37+
/// <param name="cancellationToken">Cancellation token to observe</param>
38+
/// <returns>A task that represents the completion of all tasks or the first exception</returns>
39+
internal static async Task WhenAllOrFirstExceptionWithFaultPriorityAsync(List<Task> pendingTasks, CancellationToken cancellationToken)
40+
{
41+
int processed = 0;
42+
int total = pendingTasks.Count;
43+
44+
Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllOrFirstExceptionWithFaultPriorityAsync: Starting with TotalTasks={0}", total);
45+
46+
while (processed < total)
47+
{
48+
// First check if any task has already completed (synchronously)
49+
// This ensures faulted tasks are processed before cancellation is checked,
50+
// so the original exception takes precedence over OperationCanceledException
51+
var completedTask = pendingTasks.FirstOrDefault(t => t.IsCompleted);
52+
53+
if (completedTask == null)
54+
{
55+
// No task completed yet - check cancellation before blocking
56+
cancellationToken.ThrowIfCancellationRequested();
57+
58+
// Now wait for one to complete
59+
completedTask = await Task.WhenAny(pendingTasks)
60+
.ConfigureAwait(continueOnCapturedContext: false);
61+
}
62+
63+
// Process the completed task - this will throw if faulted
64+
await completedTask
65+
.ConfigureAwait(continueOnCapturedContext: false);
66+
67+
pendingTasks.Remove(completedTask);
68+
processed++;
69+
70+
Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllOrFirstExceptionWithFaultPriorityAsync: Task completed (Processed={0}/{1}, Remaining={2})",
71+
processed, total, pendingTasks.Count);
72+
}
73+
74+
Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllOrFirstExceptionWithFaultPriorityAsync: All tasks completed (Total={0})", total);
75+
}
76+
3077
/// <summary>
3178
/// Waits for all tasks to complete or till any task fails or is canceled.
3279
/// </summary>

0 commit comments

Comments
 (0)