Skip to content

Commit d2c51c7

Browse files
committed
dont queue more task if other one fails
1 parent 821729c commit d2c51c7

File tree

2 files changed

+211
-32
lines changed

2 files changed

+211
-32
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
using System.Collections.Generic;
2525
using System.Diagnostics.CodeAnalysis;
2626
using System.IO;
27+
using System.Linq;
2728
using System.Threading;
2829
using System.Threading.Tasks;
2930
using Amazon.Runtime;
@@ -377,37 +378,75 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
377378
// Pre-acquire capacity in sequential order to prevent race condition deadlock
378379
// This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order
379380
// parts from consuming all buffer slots and blocking the next expected part
380-
for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++)
381+
try
381382
{
382-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);
383-
384-
// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
385-
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);
386-
387-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);
388-
389-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
390-
partNum, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
391-
392-
// Acquire HTTP slot in the loop before creating task
393-
// Loop will block here if all slots are in use
394-
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);
395-
396-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);
397-
398-
try
399-
{
400-
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
401-
downloadTasks.Add(task);
402-
}
403-
catch (Exception ex)
383+
for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++)
404384
{
405-
// If task creation fails, release the HTTP slot we just acquired
406-
_httpConcurrencySlots.Release();
407-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
408-
throw;
385+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);
386+
387+
// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
388+
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);
389+
390+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);
391+
392+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
393+
partNum, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
394+
395+
// Acquire HTTP slot in the loop before creating task
396+
// Loop will block here if all slots are in use
397+
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);
398+
399+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);
400+
401+
try
402+
{
403+
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
404+
405+
// Add failure detection to immediately cancel internal token on first error
406+
// This prevents the for loop from queuing additional parts after a failure
407+
_ = task.ContinueWith(t =>
408+
{
409+
if (t.IsFaulted && !internalCts.IsCancellationRequested)
410+
{
411+
// Capture and log the ORIGINAL exception immediately
412+
var originalException = t.Exception?.GetBaseException();
413+
if (_downloadException == null && originalException != null)
414+
{
415+
_downloadException = originalException;
416+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Download failed with exception: {1}",
417+
partNum, originalException.Message);
418+
}
419+
420+
// Then cancel to stop queuing more parts
421+
try
422+
{
423+
internalCts.Cancel();
424+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Cancelled internal token to stop queuing", partNum);
425+
}
426+
catch (ObjectDisposedException)
427+
{
428+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] CancellationTokenSource already disposed during cancellation", partNum);
429+
}
430+
}
431+
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
432+
433+
downloadTasks.Add(task);
434+
}
435+
catch (Exception ex)
436+
{
437+
// If task creation fails, release the HTTP slot we just acquired
438+
_httpConcurrencySlots.Release();
439+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
440+
throw;
441+
}
409442
}
410443
}
444+
catch (OperationCanceledException e)
445+
{
446+
// Expected when a task fails and ContinueWith cancels internalCts
447+
// Original exception already captured and logged in ContinueWith callback
448+
_logger.InfoFormat("MultipartDownloadManager: Stopped queuing early at {0} parts due to failure: {1}", downloadTasks.Count, e);
449+
}
411450

412451
var expectedTaskCount = downloadTasks.Count;
413452
_logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount);
@@ -439,9 +478,11 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
439478

440479
catch (Exception ex)
441480
{
442-
_downloadException = ex;
443-
444-
481+
// Store the exception - this is the real failure from WhenAll, not a cancellation side-effect
482+
if (_downloadException == null)
483+
{
484+
_downloadException = ex;
485+
}
445486

446487
// Cancel all remaining downloads immediately to prevent cascading timeout errors
447488
// This ensures that when one part fails, other tasks stop gracefully instead of
@@ -688,8 +729,9 @@ private async Task<DownloadDiscoveryResult> DiscoverUsingPartStrategyAsync(Cance
688729
};
689730
}
690731
}
691-
catch
732+
catch (Exception ex)
692733
{
734+
_logger.Error(ex, "MultipartDownloadManager: Discovery using PART strategy failed");
693735
// On error, release semaphore and dispose response before rethrowing
694736
_httpConcurrencySlots.Release();
695737
firstPartResponse?.Dispose();
@@ -796,8 +838,9 @@ private async Task<DownloadDiscoveryResult> DiscoverUsingRangeStrategyAsync(Canc
796838
InitialResponse = firstRangeResponse // Keep response with stream
797839
};
798840
}
799-
catch
841+
catch (Exception ex)
800842
{
843+
_logger.Error(ex, "MultipartDownloadManager: Discovery using RANGE strategy failed");
801844
// On error, release semaphore and dispose response before rethrowing
802845
_httpConcurrencySlots.Release();
803846
firstRangeResponse?.Dispose();

sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using Amazon;
12
using Amazon.S3;
23
using Amazon.S3.Model;
34
using Amazon.S3.Transfer;
@@ -3597,5 +3598,140 @@ public async Task StartDownloadsAsync_CancellationRacesWithDispose_HandlesGracef
35973598
}
35983599

35993600
#endregion
3601+
3602+
#region Issue #2 - Continued Queuing After Failure Tests
3603+
3604+
[TestMethod]
3605+
public async Task StartDownloadsAsync_PartFailsDuringDownload_StopsQueuingNewParts()
3606+
{
3607+
3608+
AWSConfigs.LoggingConfig.LogTo = LoggingOptions.Console;
3609+
AWSConfigs.LoggingConfig.LogResponses = ResponseLoggingOption.Always;
3610+
AWSConfigs.LoggingConfig.LogMetrics = true;
3611+
3612+
// Arrange - Test Issue #2: When Part 3 fails, loop should stop immediately (not queue all 10 parts)
3613+
var totalParts = 10;
3614+
var partSize = 8 * 1024 * 1024;
3615+
var totalObjectSize = totalParts * partSize;
3616+
3617+
// Track which parts get queued (when GetObjectAsync is called)
3618+
var queuedParts = new System.Collections.Concurrent.ConcurrentBag<int>();
3619+
var queuedPartsLock = new object();
3620+
3621+
var mockDataHandler = new Mock<IPartDataHandler>();
3622+
3623+
// WaitForCapacityAsync succeeds for all parts
3624+
mockDataHandler
3625+
.Setup(x => x.WaitForCapacityAsync(It.IsAny<CancellationToken>()))
3626+
.Returns(Task.CompletedTask);
3627+
3628+
// PrepareAsync succeeds
3629+
mockDataHandler
3630+
.Setup(x => x.PrepareAsync(It.IsAny<DownloadDiscoveryResult>(), It.IsAny<CancellationToken>()))
3631+
.Returns(Task.CompletedTask);
3632+
3633+
// ProcessPartAsync: Parts 1-2 succeed, Part 3 fails, Parts 4+ should not be processed
3634+
mockDataHandler
3635+
.Setup(x => x.ProcessPartAsync(It.IsAny<int>(), It.IsAny<GetObjectResponse>(), It.IsAny<CancellationToken>()))
3636+
.Returns<int, GetObjectResponse, CancellationToken>((partNum, response, ct) =>
3637+
{
3638+
if (partNum == 3)
3639+
{
3640+
// Part 3 fails during processing
3641+
throw new InvalidOperationException("Simulated Part 3 failure");
3642+
}
3643+
return Task.CompletedTask;
3644+
});
3645+
3646+
mockDataHandler.Setup(x => x.ReleaseCapacity());
3647+
mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny<Exception>()));
3648+
3649+
// Create mock S3 client that tracks GetObjectAsync calls
3650+
var callCount = 0;
3651+
var mockClient = new Mock<IAmazonS3>();
3652+
mockClient.Setup(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()))
3653+
.Returns<GetObjectRequest, CancellationToken>((req, ct) =>
3654+
{
3655+
callCount++;
3656+
int partNum = callCount; // Part numbers are 1-based, callCount tracks them
3657+
3658+
lock (queuedPartsLock)
3659+
{
3660+
queuedParts.Add(partNum);
3661+
}
3662+
3663+
// Return appropriate response for each part
3664+
if (callCount == 1)
3665+
{
3666+
// Discovery call - return multipart response
3667+
return Task.FromResult(MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse(
3668+
partSize, totalParts, totalObjectSize, "test-etag"));
3669+
}
3670+
else
3671+
{
3672+
// Subsequent parts
3673+
var startByte = (partNum - 1) * partSize;
3674+
var endByte = Math.Min(startByte + partSize - 1, totalObjectSize - 1);
3675+
return Task.FromResult(MultipartDownloadTestHelpers.CreateMockGetObjectResponse(
3676+
partSize, totalParts,
3677+
$"bytes {startByte}-{endByte}/{totalObjectSize}",
3678+
"test-etag"));
3679+
}
3680+
});
3681+
3682+
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
3683+
downloadType: MultipartDownloadType.PART);
3684+
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(
3685+
concurrentRequests: 5); // Allow some concurrency
3686+
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);
3687+
3688+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
3689+
3690+
// Act
3691+
await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
3692+
3693+
// Wait for background task to fail - expect OperationCanceledException (natural flow)
3694+
try
3695+
{
3696+
await coordinator.DownloadCompletionTask;
3697+
Assert.Fail("Expected OperationCanceledException to be thrown");
3698+
}
3699+
catch (OperationCanceledException)
3700+
{
3701+
// Expected - OperationCanceledException is the public API response
3702+
// Original exception is captured in _downloadException
3703+
}
3704+
3705+
// Assert - Before fix: loop continues queuing all 10 parts even after Part 3 fails
3706+
// After fix: loop stops immediately, only ~4-5 parts should be queued
3707+
lock (queuedPartsLock)
3708+
{
3709+
var maxPartQueued = queuedParts.Max();
3710+
var queuedPartsList = queuedParts.OrderBy(p => p).ToList();
3711+
3712+
// Key assertion: Part 10 should NOT be queued (proves loop stopped early)
3713+
Assert.IsFalse(queuedParts.Contains(10),
3714+
$"Part 10 should NOT be queued after Part 3 fails. Queued parts: {string.Join(", ", queuedPartsList)}");
3715+
3716+
// Verify max queued part is reasonable (should stop around Part 4-5)
3717+
// Allow up to Part 6 to account for already-in-flight downloads with concurrency=5
3718+
Assert.IsTrue(maxPartQueued <= 6,
3719+
$"Max queued part should be <= 6 after Part 3 fails. Actual max: {maxPartQueued}. Queued parts: {string.Join(", ", queuedPartsList)}");
3720+
3721+
// Verify we didn't queue all parts (the bug behavior)
3722+
Assert.IsTrue(queuedParts.Count < totalParts,
3723+
$"Should not queue all {totalParts} parts after Part 3 fails. Queued {queuedParts.Count} parts: {string.Join(", ", queuedPartsList)}");
3724+
}
3725+
3726+
// Verify ORIGINAL exception was captured in _downloadException (not the cancellation)
3727+
Assert.IsNotNull(coordinator.DownloadException,
3728+
"Download exception should be captured when Part 3 fails");
3729+
Assert.IsInstanceOfType(coordinator.DownloadException, typeof(InvalidOperationException),
3730+
"Download exception should be the ORIGINAL Part 3 failure (InvalidOperationException), not OperationCanceledException");
3731+
Assert.IsTrue(coordinator.DownloadException.Message.Contains("Simulated Part 3 failure"),
3732+
"Download exception message should match the original Part 3 failure");
3733+
}
3734+
3735+
#endregion
36003736
}
36013737
}

0 commit comments

Comments
 (0)