diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs index 662076ded6da..3c54bfcd8b00 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs @@ -55,11 +55,6 @@ internal interface IDownloadManager : IDisposable /// with the previous two-method API. /// Task StartDownloadAsync(EventHandler progressCallback, CancellationToken cancellationToken); - - /// - /// Exception that occurred during downloads, if any. - /// - Exception DownloadException { get; } } /// diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs index ef50b5f4a7bf..a961fa0141f8 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs @@ -46,8 +46,6 @@ internal class MultipartDownloadManager : IDownloadManager private readonly SemaphoreSlim _httpConcurrencySlots; private readonly bool _ownsHttpThrottler; private readonly RequestEventHandler _requestEventHandler; - - private Exception _downloadException; private bool _disposed = false; private bool _discoveryCompleted = false; private Task _downloadCompletionTask; @@ -166,15 +164,6 @@ public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request, } } - /// - public Exception DownloadException - { - get - { - return _downloadException; - } - } - /// /// Discovers the download strategy and starts concurrent downloads in a single unified operation. /// This eliminates resource leakage by managing HTTP slots and buffer capacity internally. @@ -259,7 +248,6 @@ private async Task PerformDiscoveryAsync(CancellationToken cance } catch (Exception ex) { - _downloadException = ex; _logger.Error(ex, "MultipartDownloadManager: Discovery failed"); throw; } @@ -336,7 +324,6 @@ private async Task PerformDownloadsAsync(DownloadResult downloadResult, EventHan } catch (Exception ex) { - _downloadException = ex; _logger.Error(ex, "MultipartDownloadManager: Download failed"); HandleDownloadError(ex, internalCts); @@ -414,7 +401,7 @@ private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult, _logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount); // Wait for all downloads to complete (fails fast on first exception) - await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, internalCts.Token).ConfigureAwait(false); + await TaskHelpers.WhenAllFailFastAsync(downloadTasks, internalCts.Token).ConfigureAwait(false); _logger.DebugFormat("MultipartDownloadManager: All download tasks completed successfully"); @@ -429,7 +416,6 @@ private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult, #pragma warning disable CA1031 // Do not catch general exception types catch (Exception ex) { - _downloadException = ex; HandleDownloadError(ex, internalCts); throw; } @@ -451,13 +437,21 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event // Pre-acquire capacity in sequential order to prevent race condition deadlock // This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order // parts from consuming all buffer slots and blocking the next expected part - for (int partNum = 2; partNum <= downloadResult.TotalParts; partNum++) + for (int partNum = 2; partNum <= downloadResult.TotalParts && !internalCts.IsCancellationRequested; partNum++) { _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum); // Acquire capacity sequentially - guarantees Part 2 before Part 3, etc. await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false); + // Check cancellation after acquiring capacity - a task may have failed while waiting + if (internalCts.IsCancellationRequested) + { + _logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after capacity acquired", partNum); + _dataHandler.ReleaseCapacity(); + break; + } + _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum); _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})", @@ -466,6 +460,15 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event // Acquire HTTP slot in the loop before creating task // Loop will block here if all slots are in use await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false); + + // Check cancellation after acquiring HTTP slot - a task may have failed while waiting + if (internalCts.IsCancellationRequested) + { + _logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after HTTP slot acquired", partNum); + _httpConcurrencySlots.Release(); + _dataHandler.ReleaseCapacity(); + break; + } _logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum); @@ -478,10 +481,16 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event { // If task creation fails, release the HTTP slot we just acquired _httpConcurrencySlots.Release(); + _dataHandler.ReleaseCapacity(); _logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex); throw; } } + + if (internalCts.IsCancellationRequested && downloadTasks.Count < downloadResult.TotalParts - 1) + { + _logger.InfoFormat("MultipartDownloadManager: Stopped queuing early at {0} parts due to cancellation", downloadTasks.Count); + } } /// @@ -491,7 +500,7 @@ private void ValidateDownloadCompletion(int expectedTaskCount, int totalParts) { // SEP Part GET Step 6 / Ranged GET Step 8: // "validate that the total number of part GET requests sent matches with the expected PartsCount" - // Note: This should always be true if we reach this point, since WhenAllOrFirstException + // Note: This should always be true if we reach this point, since WhenAllFailFastAsync // ensures all tasks completed successfully (or threw on first failure). // The check serves as a defensive assertion for SEP compliance. // Note: expectedTaskCount + 1 accounts for Part 1 being buffered during discovery diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/TaskHelpers.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/TaskHelpers.cs index 4ca8db0c4fea..acc3dce55b6f 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/TaskHelpers.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/TaskHelpers.cs @@ -27,10 +27,52 @@ namespace Amazon.S3.Transfer.Internal /// internal static class TaskHelpers { + /// + /// Waits for all tasks to complete, failing fast on the first exception. + /// When any task faults, its exception is immediately propagated without waiting for other tasks. + /// + /// List of tasks to wait for completion. This list is not modified. + /// Cancellation token to observe (not actively checked - caller handles cancellation) + /// A task that represents the completion of all tasks or throws on first exception + /// + /// This method creates an internal copy of the task list for tracking purposes, + /// so the caller's list remains unchanged after this method completes. + /// The caller is responsible for cancelling remaining tasks when this method throws. + /// + internal static async Task WhenAllFailFastAsync(List pendingTasks, CancellationToken cancellationToken) + { + var remaining = new HashSet(pendingTasks); + int total = remaining.Count; + int processed = 0; + + Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: Starting with TotalTasks={0}", total); + + while (remaining.Count > 0) + { + // Wait for any task to complete + var completedTask = await Task.WhenAny(remaining) + .ConfigureAwait(continueOnCapturedContext: false); + + // Process the completed task - will throw if faulted + // The caller's catch block handles cancellation AFTER this exception propagates, + // which ensures the original exception is always thrown (not OperationCanceledException) + await completedTask + .ConfigureAwait(continueOnCapturedContext: false); + + remaining.Remove(completedTask); + processed++; + + Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: Task completed (Processed={0}/{1}, Remaining={2})", + processed, total, remaining.Count); + } + + Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: All tasks completed (Total={0})", total); + } + /// /// Waits for all tasks to complete or till any task fails or is canceled. /// - /// List of tasks to wait for completion + /// List of tasks to wait for completion. Note: This list is mutated during processing. /// Cancellation token to observe /// A task that represents the completion of all tasks or the first exception internal static async Task WhenAllOrFirstExceptionAsync(List pendingTasks, CancellationToken cancellationToken) @@ -47,8 +89,8 @@ internal static async Task WhenAllOrFirstExceptionAsync(List pendingTasks, var completedTask = await Task.WhenAny(pendingTasks) .ConfigureAwait(continueOnCapturedContext: false); - //If RanToCompletion a response will be returned - //If Faulted or Canceled an appropriate exception will be thrown + // If RanToCompletion a response will be returned + // If Faulted or Canceled an appropriate exception will be thrown await completedTask .ConfigureAwait(continueOnCapturedContext: false); diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs index f77562ac617f..ed047675bcb3 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs @@ -130,7 +130,6 @@ public void Constructor_WithValidParameters_CreatesCoordinator() // Assert Assert.IsNotNull(coordinator); - Assert.IsNull(coordinator.DownloadException); } [DataTestMethod] @@ -192,26 +191,6 @@ public void Constructor_WithEncryptionClient_ExceptionMessageIsDescriptive() #endregion - #region Property Tests - - [TestMethod] - public void DownloadException_InitiallyNull() - { - // Arrange - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); - var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); - var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - - // Act - var exception = coordinator.DownloadException; - - // Assert - Assert.IsNull(exception); - } - - #endregion - #region Discovery - PART Strategy - Single Part Tests [TestMethod] @@ -1156,8 +1135,6 @@ public async Task StartDownloadsAsync_BackgroundTaskSuccess_DisposesCancellation !coordinator.DownloadCompletionTask.IsCanceled, "Background task should complete successfully"); - Assert.IsNull(coordinator.DownloadException, - "No download exception should occur"); } [TestMethod] @@ -1217,10 +1194,8 @@ public async Task StartDownloadsAsync_BackgroundTaskFailure_DisposesCancellation // Assert - Background task should have failed but cleanup should be done Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, "Background task should be completed (even with failure)"); - Assert.IsNotNull(coordinator.DownloadException, - "Download exception should be captured"); - Assert.IsInstanceOfType(coordinator.DownloadException, typeof(InvalidOperationException), - "Should capture the simulated failure"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted, + "Background task should be faulted"); } [TestMethod] @@ -1270,8 +1245,7 @@ public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource Assert.AreEqual("Simulated prepare failure", ex.Message); } - // Assert - Exception should be captured and no background task should exist - Assert.IsNotNull(coordinator.DownloadException, "Download exception should be captured"); + // Assert - DownloadCompletionTask should return completed task when no background work exists Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, "DownloadCompletionTask should return completed task when no background work exists"); } @@ -1341,8 +1315,8 @@ public async Task StartDownloadsAsync_BackgroundTaskCancellation_HandlesTokenDis // Assert - Cancellation should be handled properly with cleanup Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, "Background task should be completed"); - Assert.IsNotNull(coordinator.DownloadException, - "Cancellation exception should be captured"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted || coordinator.DownloadCompletionTask.IsCanceled, + "Background task should be faulted or canceled"); } #endregion @@ -1424,36 +1398,6 @@ public async Task StartDownloadAsync_SinglePart_WithPreCancelledToken_ThrowsOper } - [TestMethod] - public async Task DiscoverDownloadStrategyAsync_WhenCancelled_SetsDownloadException() - { - // Arrange - var mockClient = new Mock(); - var cancelledException = new OperationCanceledException(); - mockClient.Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) - .ThrowsAsync(cancelledException); - - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); - var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); - var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - - var cts = new CancellationTokenSource(); - cts.Cancel(); - - // Act - try - { - await coordinator.StartDownloadAsync(null, cts.Token); - } - catch (OperationCanceledException) - { - // Expected - } - - // Assert - Assert.IsNotNull(coordinator.DownloadException); - Assert.IsInstanceOfType(coordinator.DownloadException, typeof(OperationCanceledException)); - } [TestMethod] public async Task DiscoverDownloadStrategyAsync_PassesCancellationTokenToS3Client() @@ -1549,13 +1493,14 @@ public async Task StartDownloadsAsync_WhenCancelledDuringDownloads_NotifiesBuffe // Expected } - // Assert - Assert.IsNotNull(coordinator.DownloadException); - Assert.IsInstanceOfType(coordinator.DownloadException, typeof(OperationCanceledException)); + // Assert - Verify DownloadCompletionTask is faulted with the cancellation exception + Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, "DownloadCompletionTask should be completed"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted || coordinator.DownloadCompletionTask.IsCanceled, + "DownloadCompletionTask should be faulted or canceled"); } [TestMethod] - public async Task StartDownloadsAsync_WhenCancelled_SetsDownloadException() + public async Task StartDownloadsAsync_WhenCancelled_CompletionTaskIsFaulted() { // Arrange var totalParts = 3; @@ -1594,9 +1539,10 @@ public async Task StartDownloadsAsync_WhenCancelled_SetsDownloadException() // Expected } - // Assert - Assert.IsNotNull(coordinator.DownloadException); - Assert.IsInstanceOfType(coordinator.DownloadException, typeof(OperationCanceledException)); + // Assert - Verify DownloadCompletionTask is faulted with the cancellation + Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, "DownloadCompletionTask should be completed"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted || coordinator.DownloadCompletionTask.IsCanceled, + "DownloadCompletionTask should be faulted or canceled"); } [TestMethod] @@ -1680,8 +1626,9 @@ public async Task StartDownloadsAsync_CancellationPropagatesAcrossConcurrentDown // Expected } - // Assert - Error should be captured - Assert.IsNotNull(coordinator.DownloadException); + // Assert - DownloadCompletionTask should be faulted + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted || coordinator.DownloadCompletionTask.IsCanceled, + "DownloadCompletionTask should be faulted or canceled when errors occur"); } [TestMethod] @@ -3418,10 +3365,8 @@ public async Task StartDownloadsAsync_BackgroundPartFails_CancelsInternalToken() Assert.IsTrue(part3SawCancellation, "Part 3 should have received cancellation via internalCts.Token (deterministic with TaskCompletionSource)"); - Assert.IsNotNull(coordinator.DownloadException, - "Download exception should be captured when background part fails"); - Assert.IsInstanceOfType(coordinator.DownloadException, typeof(InvalidOperationException), - "Download exception should be the Part 2 failure"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted, + "DownloadCompletionTask should be faulted when background part fails"); } [TestMethod] @@ -3484,7 +3429,7 @@ public async Task StartDownloadsAsync_MultiplePartsFail_HandlesGracefully() // Assert - Should handle multiple failures gracefully Assert.IsTrue(failedParts.Count > 0, "At least one part should have failed"); - Assert.IsNotNull(coordinator.DownloadException, "Download exception should be captured"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted, "DownloadCompletionTask should be faulted"); } [TestMethod] @@ -3559,10 +3504,674 @@ public async Task StartDownloadsAsync_CancellationRacesWithDispose_HandlesGracef // by checking IsCancellationRequested before calling Cancel() Assert.IsFalse(objectDisposedExceptionCaught, "ObjectDisposedException should not propagate due to IsCancellationRequested check"); - Assert.IsNotNull(coordinator.DownloadException, - "Download exception should be the original failure, not ObjectDisposedException"); - Assert.IsInstanceOfType(coordinator.DownloadException, typeof(InvalidOperationException), - "Download exception should be the original InvalidOperationException from Part 2 failure"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted, + "DownloadCompletionTask should be faulted with the original failure"); + + // Verify the exception type via the Task's exception + var aggregateException = coordinator.DownloadCompletionTask.Exception; + Assert.IsNotNull(aggregateException, "Task should have an exception"); + Assert.IsInstanceOfType(aggregateException.InnerException, typeof(InvalidOperationException), + "Inner exception should be the original InvalidOperationException from Part 2 failure"); + } + + [TestMethod] + public async Task StartDownloadsAsync_PartFailsDuringDownload_OriginalExceptionPropagatesFromCompletionTask() + { + // Arrange - Test that when a part fails with InvalidOperationException, + // the DownloadCompletionTask throws InvalidOperationException (not OperationCanceledException) + // This validates the WhenAllOrFirstExceptionWithFaultPriorityAsync fix + var totalParts = 5; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var mockDataHandler = new Mock(); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // Part 1, 2 succeed; Part 3 fails with InvalidOperationException + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((partNum, response, ct) => + { + if (partNum <= 2) + { + return Task.CompletedTask; // Parts 1-2 succeed + } + if (partNum == 3) + { + throw new InvalidOperationException("Simulated Part 3 failure"); + } + // Parts 4-5 may or may not run depending on cancellation timing + ct.ThrowIfCancellationRequested(); + return Task.CompletedTask; + }); + + mockDataHandler.Setup(x => x.ReleaseCapacity()); + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + + Exception caughtException = null; + try + { + await coordinator.DownloadCompletionTask; + } + catch (Exception ex) + { + caughtException = ex; + } + + // Assert - The key validation: exception should be InvalidOperationException, NOT OperationCanceledException + // Before fix: WhenAllOrFirstExceptionAsync checked cancellation before processing faulted tasks, + // so OperationCanceledException would be thrown instead of the original exception + // After fix: WhenAllOrFirstExceptionWithFaultPriorityAsync checks for completed tasks first, + // ensuring the original InvalidOperationException propagates + Assert.IsNotNull(caughtException, "DownloadCompletionTask should throw an exception"); + Assert.IsInstanceOfType(caughtException, typeof(InvalidOperationException), + "DownloadCompletionTask should throw InvalidOperationException (the original failure), " + + "NOT OperationCanceledException. If this fails, WhenAllOrFirstExceptionWithFaultPriorityAsync " + + "is not properly prioritizing faulted tasks over cancellation checks."); + Assert.AreEqual("Simulated Part 3 failure", caughtException.Message, + "The original exception message should be preserved"); + + // Also verify DownloadCompletionTask is faulted + Assert.IsTrue(coordinator.DownloadCompletionTask.IsFaulted, "DownloadCompletionTask should be faulted"); + } + + #endregion + + #region Semaphore and Capacity Release Tests + + [TestMethod] + public async Task CreateDownloadTasksAsync_CancellationAfterCapacityBeforeHttpSlot_ReleasesCapacityExactlyOnce() + { + // Arrange - Test that when cancellation happens after acquiring capacity but before HTTP slot, + // capacity is released exactly once (not double-released) + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityReleaseCount = 0; + var capacityAcquireCount = 0; + var httpSlotAcquireCount = 0; + + // Use a blocking HTTP throttler that we control + var httpThrottler = new SemaphoreSlim(1, 1); + + // Control when Part 2 can acquire HTTP slot + var part2CanAcquireHttpSlot = new TaskCompletionSource(); + var part2AcquiredCapacity = new TaskCompletionSource(); + + var mockDataHandler = new Mock(); + + // Track capacity acquisition + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(ct => + { + var count = Interlocked.Increment(ref capacityAcquireCount); + if (count == 2) // Part 2's capacity acquisition + { + part2AcquiredCapacity.SetResult(true); + // Wait a bit to let the cancellation happen + return Task.Delay(50); + } + return Task.CompletedTask; + }); + + // Track capacity release + mockDataHandler + .Setup(x => x.ReleaseCapacity()) + .Callback(() => + { + Interlocked.Increment(ref capacityReleaseCount); + }); + + // Part 1 processing succeeds + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + // S3 client: Part 1 succeeds, Part 2 will be cancelled before HTTP request + var callCount = 0; + var mockClient = new Mock(); + mockClient + .Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) + .Returns(async (req, ct) => + { + await Task.Yield(); + var count = Interlocked.Increment(ref callCount); + if (count == 1) + { + // Part 1 discovery succeeds + return MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( + partSize, totalParts, totalObjectSize, "test-etag"); + } + + Interlocked.Increment(ref httpSlotAcquireCount); + // Part 2 HTTP request - should not reach here if cancellation works + throw new OperationCanceledException(); + }); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act + var startTask = coordinator.StartDownloadAsync(null, CancellationToken.None); + + // Wait for Part 1 to complete and Part 2 to acquire capacity + await startTask; + + // The background task will cancel when Part 2 tries to acquire the HTTP slot + // and finds the slot is held (we're not releasing it) + try + { + await coordinator.DownloadCompletionTask; + } + catch (OperationCanceledException) + { + // Expected - timed out or cancelled + } + catch (Exception) + { + // Other exceptions are also acceptable for this test + } + + // Assert - Capacity should be released exactly once per acquisition (no double-release) + // Part 1 capacity is released in ProcessFirstPartAsync's finally block (not ReleaseCapacity) + // Part 2+ capacity is released via ReleaseCapacity when cancellation or error occurs + Assert.IsTrue(capacityReleaseCount <= capacityAcquireCount - 1, + $"Capacity should not be double-released. Acquired={capacityAcquireCount}, Released={capacityReleaseCount}"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task CreateDownloadTasksAsync_CancellationAfterBothAcquired_ReleasesBothExactlyOnce() + { + // Arrange - Test that when cancellation happens after acquiring both capacity and HTTP slot, + // both are released exactly once + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityReleaseCount = 0; + var capacityAcquireCount = 0; + + var httpThrottler = new SemaphoreSlim(2, 2); + var initialHttpCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + // Track capacity acquisition + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + Interlocked.Increment(ref capacityAcquireCount); + return Task.CompletedTask; + }); + + // Track capacity release + mockDataHandler + .Setup(x => x.ReleaseCapacity()) + .Callback(() => + { + Interlocked.Increment(ref capacityReleaseCount); + }); + + // Part 1 processing succeeds + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // Part 2 processing fails after both capacity and HTTP slot are acquired + mockDataHandler + .Setup(x => x.ProcessPartAsync(2, It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated Part 2 processing failure")); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + + try + { + await coordinator.DownloadCompletionTask; + } + catch (InvalidOperationException) + { + // Expected failure from Part 2 + } + + // Assert - HTTP semaphore should be back to initial count (all slots released) + Assert.AreEqual(initialHttpCount, httpThrottler.CurrentCount, + $"HTTP semaphore should be fully released. Initial={initialHttpCount}, Current={httpThrottler.CurrentCount}"); + + // Capacity releases should match acquisitions minus Part 1 (which doesn't use ReleaseCapacity) + // Part 2 will release capacity in error handler + Assert.IsTrue(capacityReleaseCount >= 1, + $"At least Part 2's capacity should be released. Released={capacityReleaseCount}"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task CreateDownloadTasksAsync_TaskCreationFails_ReleasesHttpSlotAndCapacity() + { + // Arrange - Test that if task creation fails, both HTTP slot and capacity are released + // This tests the catch block in CreateDownloadTasksAsync + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityReleaseCount = 0; + + var httpThrottler = new SemaphoreSlim(2, 2); + var initialHttpCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler + .Setup(x => x.ReleaseCapacity()) + .Callback(() => + { + Interlocked.Increment(ref capacityReleaseCount); + }); + + // Part 1 processing succeeds + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + // S3 client: Part 1 succeeds, Part 2 HTTP request fails + var callCount = 0; + var mockClient = new Mock(); + mockClient + .Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) + .Returns(() => + { + callCount++; + if (callCount == 1) + { + return Task.FromResult(MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( + partSize, totalParts, totalObjectSize, "test-etag")); + } + // Part 2 HTTP request fails + throw new AmazonS3Exception("Simulated HTTP failure"); + }); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + + try + { + await coordinator.DownloadCompletionTask; + } + catch (AmazonS3Exception) + { + // Expected + } + + // Assert - HTTP semaphore should be fully released + Assert.AreEqual(initialHttpCount, httpThrottler.CurrentCount, + $"HTTP semaphore should be fully released after HTTP failure. Initial={initialHttpCount}, Current={httpThrottler.CurrentCount}"); + + // Capacity should be released for failed part + Assert.IsTrue(capacityReleaseCount >= 1, + $"Capacity should be released for failed Part 2. Released={capacityReleaseCount}"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task CreateDownloadTasksAsync_MultiplePartsFailConcurrently_NoDoubleRelease() + { + // Arrange - Test that when multiple parts fail concurrently, no double releases occur + var totalParts = 5; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityReleaseCount = 0; + var capacityAcquireCount = 0; + + var httpThrottler = new SemaphoreSlim(3, 3); // Allow 3 concurrent requests + var initialHttpCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + Interlocked.Increment(ref capacityAcquireCount); + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.ReleaseCapacity()) + .Callback(() => + { + Interlocked.Increment(ref capacityReleaseCount); + }); + + // Part 1 succeeds + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // Parts 2, 3, 4 all fail concurrently + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsInRange(2, 5, Moq.Range.Inclusive), It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated concurrent failure")); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 3); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + + try + { + await coordinator.DownloadCompletionTask; + } + catch (InvalidOperationException) + { + // Expected - first failure propagates + } + + // Assert - No double releases should occur + // HTTP semaphore should be back to initial count + Assert.AreEqual(initialHttpCount, httpThrottler.CurrentCount, + $"HTTP semaphore should be fully released. Initial={initialHttpCount}, Current={httpThrottler.CurrentCount}"); + + // Capacity releases should not exceed acquisitions minus Part 1 + Assert.IsTrue(capacityReleaseCount <= capacityAcquireCount - 1, + $"Capacity should not be double-released. Acquired={capacityAcquireCount}, Released={capacityReleaseCount}"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task CreateDownloadTasksAsync_CancellationDuringCapacityWait_DoesNotReleaseUnacquiredResources() + { + // Arrange - Test that when cancellation happens DURING capacity wait, + // no resources are released (since they weren't acquired) + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityReleaseCount = 0; + + var httpThrottler = new SemaphoreSlim(2, 2); + var initialHttpCount = httpThrottler.CurrentCount; + + var cts = new CancellationTokenSource(); + var mockDataHandler = new Mock(); + + var callCount = 0; + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(ct => + { + callCount++; + if (callCount == 1) + { + // Part 1 discovery succeeds + return Task.CompletedTask; + } + // Part 2 capacity wait is cancelled + cts.Cancel(); + throw new OperationCanceledException(); + }); + + mockDataHandler + .Setup(x => x.ReleaseCapacity()) + .Callback(() => + { + Interlocked.Increment(ref capacityReleaseCount); + }); + + // Part 1 processing succeeds + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + + try + { + await coordinator.DownloadCompletionTask; + } + catch (OperationCanceledException) + { + // Expected + } + + // Assert - No resources should be released for Part 2 since capacity was never acquired + Assert.AreEqual(0, capacityReleaseCount, + $"No capacity should be released when cancelled during WaitForCapacityAsync. Released={capacityReleaseCount}"); + + // HTTP semaphore should still be at initial count (Part 1's slot was released normally) + Assert.AreEqual(initialHttpCount, httpThrottler.CurrentCount, + $"HTTP semaphore should be at initial count. Initial={initialHttpCount}, Current={httpThrottler.CurrentCount}"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task CreateDownloadTasksAsync_SuccessfulDownload_AllResourcesReleasedProperly() + { + // Arrange - Test that on successful download, all resources are released properly + var totalParts = 4; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityAcquireCount = 0; + var capacityReleaseCount = 0; + + var httpThrottler = new SemaphoreSlim(2, 2); + var initialHttpCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + Interlocked.Increment(ref capacityAcquireCount); + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.ReleaseCapacity()) + .Callback(() => + { + Interlocked.Increment(ref capacityReleaseCount); + }); + + // All parts succeed + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + await coordinator.DownloadCompletionTask; + + // Assert - All resources should be released properly + // HTTP semaphore should be back to initial count + Assert.AreEqual(initialHttpCount, httpThrottler.CurrentCount, + $"HTTP semaphore should be fully released after successful download. Initial={initialHttpCount}, Current={httpThrottler.CurrentCount}"); + + // Capacity is acquired for all parts but released differently: + // - Part 1: Capacity is managed by the stream (not via ReleaseCapacity) + // - Parts 2-4: Should NOT call ReleaseCapacity on success (handler manages it) + // Note: ReleaseCapacity is only called on ERROR paths in CreateDownloadTaskAsync + Assert.AreEqual(0, capacityReleaseCount, + $"ReleaseCapacity should not be called on success path (handler manages capacity). Released={capacityReleaseCount}"); + + // Verify all parts acquired capacity + Assert.AreEqual(totalParts, capacityAcquireCount, + $"All parts should have acquired capacity. Acquired={capacityAcquireCount}"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task CreateDownloadTasksAsync_CancellationImmediatelyAfterHttpSlot_ReleasesResourcesCorrectly() + { + // Arrange - Test the specific code path where cancellation is detected + // immediately after acquiring HTTP slot (the second cancellation check in CreateDownloadTasksAsync) + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityReleaseCount = 0; + + var httpThrottler = new SemaphoreSlim(1, 1); + var initialHttpCount = httpThrottler.CurrentCount; + + var internalCts = new CancellationTokenSource(); + var mockDataHandler = new Mock(); + + var capacityCallCount = 0; + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + capacityCallCount++; + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.ReleaseCapacity()) + .Callback(() => + { + Interlocked.Increment(ref capacityReleaseCount); + }); + + // Part 1 succeeds + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // Part 2 processing will trigger cancellation + mockDataHandler + .Setup(x => x.ProcessPartAsync(2, It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Part 2 failure triggers cancellation")); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + + try + { + await coordinator.DownloadCompletionTask; + } + catch (InvalidOperationException) + { + // Expected + } + + // Assert - Resources should be released correctly + Assert.AreEqual(initialHttpCount, httpThrottler.CurrentCount, + $"HTTP semaphore should be fully released. Initial={initialHttpCount}, Current={httpThrottler.CurrentCount}"); + + // Part 2 should have its capacity released due to error + Assert.IsTrue(capacityReleaseCount >= 1, + $"At least Part 2's capacity should be released on error. Released={capacityReleaseCount}"); + + // Cleanup + httpThrottler.Dispose(); } #endregion