diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedMultipartStream.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedMultipartStream.cs index 7093c2aa6a7b..fa3616a34f3d 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedMultipartStream.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedMultipartStream.cs @@ -43,16 +43,16 @@ internal class BufferedMultipartStream : Stream private bool _initialized = false; private bool _disposed = false; - private DownloadDiscoveryResult _discoveryResult; + private DownloadResult _discoveryResult; private long _totalBytesRead = 0; private readonly Logger _logger = Logger.GetLogger(typeof(BufferedMultipartStream)); /// - /// Gets the containing metadata from the initial GetObject response. + /// Gets the containing metadata from the initial GetObject response. /// Available after completes successfully. /// - public DownloadDiscoveryResult DiscoveryResult => _discoveryResult; + public DownloadResult DiscoveryResult => _discoveryResult; /// /// Creates a new with dependency injection. @@ -112,16 +112,14 @@ public async Task InitializeAsync(CancellationToken cancellationToken) _logger.DebugFormat("BufferedMultipartStream: Starting initialization"); - _discoveryResult = await _downloadCoordinator.DiscoverDownloadStrategyAsync(cancellationToken) - .ConfigureAwait(false); - - _logger.DebugFormat("BufferedMultipartStream: Discovery completed - ObjectSize={0}, TotalParts={1}, IsSinglePart={2}", + // Start unified download operation (discovers strategy and starts downloads) + _discoveryResult = await _downloadCoordinator.StartDownloadAsync(null, cancellationToken) + .ConfigureAwait(false); + + _logger.DebugFormat("BufferedMultipartStream: Download started - ObjectSize={0}, TotalParts={1}, IsSinglePart={2}", _discoveryResult.ObjectSize, _discoveryResult.TotalParts, _discoveryResult.IsSinglePart); - - await _downloadCoordinator.StartDownloadsAsync(_discoveryResult, null, cancellationToken) - .ConfigureAwait(false); _initialized = true; _logger.DebugFormat("BufferedMultipartStream: Initialization completed successfully"); diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs index 256a0228d086..2bb4cf198eb6 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs @@ -71,7 +71,7 @@ public BufferedPartDataHandler( _config = config ?? throw new ArgumentNullException(nameof(config)); } - public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken) + public Task PrepareAsync(DownloadResult discoveryResult, CancellationToken cancellationToken) { // No preparation needed for buffered handler - buffers are created on demand return Task.CompletedTask; diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs index 4d7415a4a8f5..d85556a34ecc 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs @@ -55,7 +55,7 @@ public FilePartDataHandler(FileDownloadConfiguration config) } /// - public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken) + public Task PrepareAsync(DownloadResult discoveryResult, CancellationToken cancellationToken) { // Create temporary file once during preparation phase _tempFilePath = _fileHandler.CreateTemporaryFile(_config.DestinationFilePath); diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs index 86f7240988a1..662076ded6da 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs @@ -32,23 +32,29 @@ namespace Amazon.S3.Transfer.Internal internal interface IDownloadManager : IDisposable { /// - /// Discovers whether the object requires single-part or multipart downloading. + /// Discovers the download strategy and starts concurrent downloads in a single operation. + /// This unified method eliminates resource leakage by managing HTTP slots and buffer capacity + /// internally throughout the entire download lifecycle. /// - /// A token to cancel the discovery operation. - /// - /// A task containing discovery results including total parts, object size, - /// and initial response data if single-part. - /// - Task DiscoverDownloadStrategyAsync(CancellationToken cancellationToken); - - /// - /// Starts concurrent downloads with HTTP concurrency control and part range calculations. - /// - /// Results from the discovery phase. /// Optional callback for progress tracking events. /// A token to cancel the download operation. - /// A task that completes when all downloads finish or an error occurs. - Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, EventHandler progressCallback, CancellationToken cancellationToken); + /// + /// A task containing download results including total parts, object size, + /// and initial response data. + /// + /// + /// This method performs both discovery and download operations atomically: + /// 1. Acquires HTTP slot and buffer capacity + /// 2. Makes initial GetObject request to discover download strategy + /// 3. Processes Part 1 immediately + /// 4. Starts background downloads for remaining parts (if multipart) + /// 5. Returns after Part 1 is processed, allowing consumer to begin reading + /// + /// Resources (HTTP slots, buffer capacity) are managed internally and released + /// at the appropriate times, eliminating the awkward resource holding that existed + /// with the previous two-method API. + /// + Task StartDownloadAsync(EventHandler progressCallback, CancellationToken cancellationToken); /// /// Exception that occurred during downloads, if any. @@ -57,9 +63,9 @@ internal interface IDownloadManager : IDisposable } /// - /// Download discovery results with metadata for determining download strategy. + /// Download results with metadata about the completed discovery and initial download. /// - internal class DownloadDiscoveryResult + internal class DownloadResult { /// /// Total parts needed (1 = single-part, >1 = multipart). @@ -72,7 +78,8 @@ internal class DownloadDiscoveryResult public long ObjectSize { get; set; } /// - /// GetObjectResponse obtained during download initialization, containing the ResponseStream. Represents the complete object for single-part downloads or the first range/part for multipart downloads. + /// GetObjectResponse obtained during download initialization, containing the ResponseStream. + /// Represents the complete object for single-part downloads or the first range/part for multipart downloads. /// public GetObjectResponse InitialResponse { get; set; } diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartDataHandler.cs index 864a49acbaa7..43cdce2075f6 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartDataHandler.cs @@ -40,7 +40,7 @@ internal interface IPartDataHandler : IDisposable /// Discovery result containing object metadata /// Cancellation token /// Task that completes when preparation is done - Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken); + Task PrepareAsync(DownloadResult discoveryResult, CancellationToken cancellationToken); /// /// Process a downloaded part from the GetObjectResponse. diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs index ecdcde369441..ef50b5f4a7bf 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs @@ -133,8 +133,7 @@ public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request, /// /// /// - /// - /// + /// /// Thrown when using S3 encryption client, which does not support multipart downloads. public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request, DownloadManagerConfiguration config, IPartDataHandler dataHandler, RequestEventHandler requestEventHandler, SemaphoreSlim sharedHttpThrottler) { @@ -177,42 +176,61 @@ public Exception DownloadException } /// - /// Discovers the download strategy (single-part vs multipart) by making an initial GetObject request. + /// Discovers the download strategy and starts concurrent downloads in a single unified operation. + /// This eliminates resource leakage by managing HTTP slots and buffer capacity internally. /// - /// Cancellation token to cancel the discovery operation. + /// Optional callback for progress tracking events. + /// A token to cancel the download operation. /// - /// A containing information about the object size, part count, + /// A containing information about the object size, part count, /// and the initial GetObject response. /// /// - /// IMPORTANT - HTTP Semaphore Lifecycle: - /// - /// This method acquires an HTTP concurrency slot from the configured semaphore and downloads Part 1. - /// The semaphore slot is HELD until completes processing Part 1. - /// Callers MUST call after this method to release the semaphore. - /// Failure to call will cause the semaphore slot to remain held indefinitely, - /// potentially blocking other downloads and causing deadlocks. - /// - /// Concurrency Implications: - /// - /// With limited HTTP concurrency (e.g., ConcurrentServiceRequests=1 for shared throttlers in directory downloads), - /// concurrent calls to this method will block until previous downloads complete their full lifecycle - /// (discover → start). This is by design to ensure the entire I/O operation (network + disk) is - /// within the concurrency limit. For single-slot throttlers, downloads must be processed sequentially: - /// complete one download's full lifecycle before starting the next. - /// - /// Typical Usage Pattern: - /// - /// var discovery = await manager.DiscoverDownloadStrategyAsync(cancellationToken); - /// await manager.StartDownloadsAsync(discovery, progressCallback, cancellationToken); - /// await manager.DownloadCompletionTask; // Wait for multipart downloads to finish - /// + /// This method performs both discovery and download operations atomically: + /// 1. Acquires HTTP slot and buffer capacity + /// 2. Makes initial GetObject request to discover download strategy + /// 3. Processes Part 1 immediately + /// 4. Starts background downloads for remaining parts (if multipart) + /// 5. Returns after Part 1 is processed, allowing consumer to begin reading + /// + /// Resources (HTTP slots, buffer capacity) are managed internally and released + /// at the appropriate times /// /// Thrown if the manager has been disposed. - /// Thrown if discovery has already been performed. + /// Thrown if download has already been started. /// Thrown if the operation is cancelled. /// - public async Task DiscoverDownloadStrategyAsync(CancellationToken cancellationToken) + public async Task StartDownloadAsync(EventHandler progressCallback, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + + if (_discoveryCompleted) + throw new InvalidOperationException("Download has already been started"); + + // Step 1: Perform discovery (acquires resources, downloads Part 1) + var discoveryResult = await PerformDiscoveryAsync(cancellationToken).ConfigureAwait(false); + + // Step 2: Process Part 1 and start remaining downloads + await PerformDownloadsAsync(discoveryResult, progressCallback, cancellationToken).ConfigureAwait(false); + + // Step 3: Return results to caller + return discoveryResult; + } + + /// + /// Performs the discovery phase by making an initial GetObject request. + /// + /// Cancellation token to cancel the discovery operation. + /// + /// A containing information about the object size, part count, + /// and the initial GetObject response. + /// + /// + /// This method acquires an HTTP concurrency slot and buffer capacity, then makes the initial + /// GetObject request to determine the download strategy. The HTTP slot is held until + /// PerformDownloadsAsync processes Part 1. + /// + private async Task PerformDiscoveryAsync(CancellationToken cancellationToken) { ThrowIfDisposed(); @@ -251,9 +269,8 @@ public async Task DiscoverDownloadStrategyAsync(Cancell /// Processes Part 1 and starts downloading remaining parts for multipart downloads. /// Returns immediately after processing Part 1 to allow the consumer to begin reading. /// - /// - /// The discovery result from containing object metadata - /// and the initial GetObject response. + /// + /// The download result from discovery containing object metadata and the initial GetObject response. /// /// /// Optional progress callback that will be invoked as parts are downloaded. For multipart downloads, @@ -265,46 +282,22 @@ public async Task DiscoverDownloadStrategyAsync(Cancell /// continue downloading in the background (monitor via ). /// /// - /// HTTP Semaphore Release: - /// - /// This method processes Part 1 (downloaded during ) - /// and releases the HTTP semaphore slot that was acquired during discovery. - /// The semaphore is released after both the network download and disk write - /// operations complete for Part 1. This ensures the ConcurrentServiceRequests limit - /// controls the entire I/O operation (network + disk), not just the network download. - /// - /// Background Processing (Multipart Only): - /// - /// For multipart downloads (when TotalParts > 1), this method starts a background task - /// to download and process remaining parts (Part 2+) and returns immediately. This allows the - /// consumer to start reading from the buffer without waiting for all downloads to complete, - /// which prevents deadlocks when the buffer fills up before the consumer begins reading. - /// Monitor to detect when all background downloads have finished. - /// - /// Single-Part Downloads: - /// - /// For single-part downloads (when TotalParts = 1), this method processes Part 1 synchronously - /// and returns immediately. No background task is created, and - /// will already be completed when this method returns. - /// + /// This is a private method called by StartDownloadAsync after discovery completes. + /// It processes Part 1 and starts background downloads for remaining parts. /// - /// Thrown if the manager has been disposed. - /// Thrown if is null. - /// Thrown if the operation is cancelled. - /// - public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, EventHandler progressCallback, CancellationToken cancellationToken) + private async Task PerformDownloadsAsync(DownloadResult downloadResult, EventHandler progressCallback, CancellationToken cancellationToken) { ThrowIfDisposed(); - if (discoveryResult == null) - throw new ArgumentNullException(nameof(discoveryResult)); + if (downloadResult == null) + throw new ArgumentNullException(nameof(downloadResult)); // Store for progress aggregation _userProgressCallback = progressCallback; - _totalObjectSize = discoveryResult.ObjectSize; + _totalObjectSize = downloadResult.ObjectSize; _logger.DebugFormat("MultipartDownloadManager: Starting downloads - TotalParts={0}, IsSinglePart={1}", - discoveryResult.TotalParts, discoveryResult.IsSinglePart); + downloadResult.TotalParts, downloadResult.IsSinglePart); var internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); @@ -316,9 +309,9 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E : null; // Process Part 1 (downloaded during discovery) - await ProcessFirstPartAsync(discoveryResult, wrappedCallback, cancellationToken).ConfigureAwait(false); + await ProcessFirstPartAsync(downloadResult, wrappedCallback, cancellationToken).ConfigureAwait(false); - if (discoveryResult.IsSinglePart) + if (downloadResult.IsSinglePart) { // Single-part: Part 1 is the entire object _logger.DebugFormat("MultipartDownloadManager: Single-part download complete"); @@ -334,7 +327,7 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E // which prevents deadlock when MaxInMemoryParts is reached before consumer begins reading _downloadCompletionTask = Task.Run(async () => { - await StartBackgroundDownloadsAsync(discoveryResult, wrappedCallback, internalCts).ConfigureAwait(false); + await StartBackgroundDownloadsAsync(downloadResult, wrappedCallback, internalCts).ConfigureAwait(false); }, cancellationToken); // Return immediately to allow consumer to start reading @@ -361,23 +354,23 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E /// /// Processes Part 1 (downloaded during discovery) including preparation, progress tracking, and semaphore release. /// - private async Task ProcessFirstPartAsync(DownloadDiscoveryResult discoveryResult, EventHandler wrappedCallback, CancellationToken cancellationToken) + private async Task ProcessFirstPartAsync(DownloadResult downloadResult, EventHandler wrappedCallback, CancellationToken cancellationToken) { try { // Prepare the data handler (e.g., create temp files for file-based downloads) - await _dataHandler.PrepareAsync(discoveryResult, cancellationToken).ConfigureAwait(false); + await _dataHandler.PrepareAsync(downloadResult, cancellationToken).ConfigureAwait(false); // Attach progress callback to Part 1's response if provided if (wrappedCallback != null) { - discoveryResult.InitialResponse.WriteObjectProgressEvent += wrappedCallback; + downloadResult.InitialResponse.WriteObjectProgressEvent += wrappedCallback; } // Process Part 1 from InitialResponse (applies to both single-part and multipart) // NOTE: Semaphore is still held from discovery phase and will be released in finally block _logger.DebugFormat("MultipartDownloadManager: Processing Part 1 from discovery response"); - await _dataHandler.ProcessPartAsync(1, discoveryResult.InitialResponse, cancellationToken).ConfigureAwait(false); + await _dataHandler.ProcessPartAsync(1, downloadResult.InitialResponse, cancellationToken).ConfigureAwait(false); _logger.DebugFormat("MultipartDownloadManager: Part 1 processing completed"); } @@ -386,7 +379,7 @@ private async Task ProcessFirstPartAsync(DownloadDiscoveryResult discoveryResult // Always detach the event handler to prevent memory leak if (wrappedCallback != null) { - discoveryResult.InitialResponse.WriteObjectProgressEvent -= wrappedCallback; + downloadResult.InitialResponse.WriteObjectProgressEvent -= wrappedCallback; } // Release semaphore after BOTH network download AND disk write complete for Part 1 @@ -402,7 +395,7 @@ private async Task ProcessFirstPartAsync(DownloadDiscoveryResult discoveryResult /// Starts background downloads for remaining parts (Part 2+) in a multipart download. /// Handles capacity acquisition, task creation, completion validation, and error handling. /// - private async Task StartBackgroundDownloadsAsync(DownloadDiscoveryResult discoveryResult, EventHandler wrappedCallback, CancellationTokenSource internalCts) + private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult, EventHandler wrappedCallback, CancellationTokenSource internalCts) { var downloadTasks = new List(); @@ -412,10 +405,10 @@ private async Task StartBackgroundDownloadsAsync(DownloadDiscoveryResult discove // Multipart: Start concurrent downloads for remaining parts (Part 2 onwards) _logger.InfoFormat("MultipartDownloadManager: Starting concurrent downloads for parts 2-{0}", - discoveryResult.TotalParts); + downloadResult.TotalParts); // Create download tasks for all remaining parts - await CreateDownloadTasksAsync(discoveryResult, wrappedCallback, internalCts, downloadTasks).ConfigureAwait(false); + await CreateDownloadTasksAsync(downloadResult, wrappedCallback, internalCts, downloadTasks).ConfigureAwait(false); var expectedTaskCount = downloadTasks.Count; _logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount); @@ -426,11 +419,11 @@ private async Task StartBackgroundDownloadsAsync(DownloadDiscoveryResult discove _logger.DebugFormat("MultipartDownloadManager: All download tasks completed successfully"); // Validate completion and mark successful - ValidateDownloadCompletion(expectedTaskCount, discoveryResult.TotalParts); + ValidateDownloadCompletion(expectedTaskCount, downloadResult.TotalParts); // Mark successful completion _logger.InfoFormat("MultipartDownloadManager: Download completed successfully - TotalParts={0}", - discoveryResult.TotalParts); + downloadResult.TotalParts); _dataHandler.OnDownloadComplete(null); } #pragma warning disable CA1031 // Do not catch general exception types @@ -453,12 +446,12 @@ private async Task StartBackgroundDownloadsAsync(DownloadDiscoveryResult discove /// Creates download tasks for all remaining parts (Part 2+) with sequential capacity acquisition. /// Pre-acquires capacity in sequential order to prevent race condition deadlock. /// - private async Task CreateDownloadTasksAsync(DownloadDiscoveryResult discoveryResult, EventHandler wrappedCallback, CancellationTokenSource internalCts, List downloadTasks) + private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, EventHandler wrappedCallback, CancellationTokenSource internalCts, List downloadTasks) { // Pre-acquire capacity in sequential order to prevent race condition deadlock // This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order // parts from consuming all buffer slots and blocking the next expected part - for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++) + for (int partNum = 2; partNum <= downloadResult.TotalParts; partNum++) { _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum); @@ -478,7 +471,7 @@ private async Task CreateDownloadTasksAsync(DownloadDiscoveryResult discoveryRes try { - var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token); + var task = CreateDownloadTaskAsync(partNum, downloadResult.ObjectSize, wrappedCallback, internalCts.Token); downloadTasks.Add(task); } catch (Exception ex) @@ -642,7 +635,7 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even } - private async Task DiscoverUsingPartStrategyAsync(CancellationToken cancellationToken) + private async Task DiscoverUsingPartStrategyAsync(CancellationToken cancellationToken) { // Check for cancellation before making any S3 calls cancellationToken.ThrowIfCancellationRequested(); @@ -693,7 +686,7 @@ private async Task DiscoverUsingPartStrategyAsync(Cance // SEP Part GET Step 7 will use this response for creating DownloadResponse // Keep the response with its stream (will be buffered in StartDownloadsAsync) - return new DownloadDiscoveryResult + return new DownloadResult { TotalParts = firstPartResponse.PartsCount.Value, ObjectSize = totalObjectSize, @@ -706,7 +699,7 @@ private async Task DiscoverUsingPartStrategyAsync(Cance _discoveredPartCount = 1; // Single part upload - return the response for immediate use (SEP Step 7) - return new DownloadDiscoveryResult + return new DownloadResult { TotalParts = 1, ObjectSize = firstPartResponse.ContentLength, @@ -723,7 +716,7 @@ private async Task DiscoverUsingPartStrategyAsync(Cance } } - private async Task DiscoverUsingRangeStrategyAsync(CancellationToken cancellationToken) + private async Task DiscoverUsingRangeStrategyAsync(CancellationToken cancellationToken) { // Check for cancellation before making any S3 calls cancellationToken.ThrowIfCancellationRequested(); @@ -771,7 +764,7 @@ private async Task DiscoverUsingRangeStrategyAsync(Canc // No ContentRange means we got the entire small object _discoveredPartCount = 1; - return new DownloadDiscoveryResult + return new DownloadResult { TotalParts = 1, ObjectSize = firstRangeResponse.ContentLength, @@ -792,7 +785,7 @@ private async Task DiscoverUsingRangeStrategyAsync(Canc // This request contains all of the data _discoveredPartCount = 1; - return new DownloadDiscoveryResult + return new DownloadResult { TotalParts = 1, ObjectSize = totalContentLength, @@ -815,7 +808,7 @@ private async Task DiscoverUsingRangeStrategyAsync(Canc // SEP Ranged GET Step 9 will use this response for creating DownloadResponse // Keep the response with its stream (will be buffered in StartDownloadsAsync) - return new DownloadDiscoveryResult + return new DownloadResult { TotalParts = _discoveredPartCount, ObjectSize = totalContentLength, diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartDownloadCommand.async.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartDownloadCommand.async.cs index cc58ffbbadac..c0b0a99c8709 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartDownloadCommand.async.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartDownloadCommand.async.cs @@ -62,23 +62,17 @@ public override async Task ExecuteAsync(Cancell long totalBytes = -1; try { - // Step 1: Discover download strategy (PART or RANGE) and get metadata - _logger.DebugFormat("MultipartDownloadCommand: Discovering download strategy"); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(cancellationToken) + // Start unified download operation (discovers strategy and starts downloads) + _logger.DebugFormat("MultipartDownloadCommand: Starting unified download operation"); + var downloadResult = await coordinator.StartDownloadAsync(DownloadPartProgressEventCallback, cancellationToken) .ConfigureAwait(false); - totalBytes = discoveryResult.ObjectSize; - - - _logger.DebugFormat("MultipartDownloadCommand: Discovered {0} part(s), total size: {1} bytes, IsSinglePart={2}", - discoveryResult.TotalParts, discoveryResult.ObjectSize, discoveryResult.IsSinglePart); + totalBytes = downloadResult.ObjectSize; - // Step 2: Start concurrent downloads for all parts - _logger.DebugFormat("Starting downloads for {0} part(s)", discoveryResult.TotalParts); - await coordinator.StartDownloadsAsync(discoveryResult, DownloadPartProgressEventCallback, cancellationToken) - .ConfigureAwait(false); + _logger.DebugFormat("MultipartDownloadCommand: Downloaded {0} part(s), total size: {1} bytes, IsSinglePart={2}", + downloadResult.TotalParts, downloadResult.ObjectSize, downloadResult.IsSinglePart); - // Step 2b: Wait for all downloads to complete before returning + // Wait for all downloads to complete before returning // This ensures file is fully written and committed for file-based downloads // For stream-based downloads, this task completes immediately (no-op) _logger.DebugFormat("MultipartDownloadCommand: Waiting for download completion"); @@ -86,23 +80,23 @@ await coordinator.StartDownloadsAsync(discoveryResult, DownloadPartProgressEvent _logger.DebugFormat("MultipartDownloadCommand: Completed multipart download"); - // Step 3: Map the response from the initial GetObject response + // Map the response from the initial GetObject response // The initial response contains all the metadata we need - var mappedResponse = ResponseMapper.MapGetObjectResponse(discoveryResult.InitialResponse); + var mappedResponse = ResponseMapper.MapGetObjectResponse(downloadResult.InitialResponse); // SEP Part GET Step 7 / Ranged GET Step 9: // Set ContentLength to total object size (not just first part) - mappedResponse.Headers.ContentLength = discoveryResult.ObjectSize; + mappedResponse.Headers.ContentLength = downloadResult.ObjectSize; // Set ContentRange to represent the entire object: bytes 0-(ContentLength-1)/ContentLength // S3 returns null for 0-byte objects, so we match that behavior - if (discoveryResult.ObjectSize == 0) + if (downloadResult.ObjectSize == 0) { mappedResponse.ContentRange = null; } else { - mappedResponse.ContentRange = $"bytes 0-{discoveryResult.ObjectSize - 1}/{discoveryResult.ObjectSize}"; + mappedResponse.ContentRange = $"bytes 0-{downloadResult.ObjectSize - 1}/{downloadResult.ObjectSize}"; } // SEP Part GET Step 7 / Ranged GET Step 9: diff --git a/sdk/test/Services/S3/UnitTests/Custom/BufferedMultipartStreamTests.cs b/sdk/test/Services/S3/UnitTests/Custom/BufferedMultipartStreamTests.cs index bf0e14a6dda3..29f966f34569 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/BufferedMultipartStreamTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/BufferedMultipartStreamTests.cs @@ -64,17 +64,16 @@ private async Task CreateInitializedStreamAsync( ? MultipartDownloadTestHelpers.CreateSinglePartResponse(objectSize) : new GetObjectResponse(); - var discoveryResult = new DownloadDiscoveryResult + var discoveryResult = new DownloadResult { TotalParts = totalParts, ObjectSize = objectSize, InitialResponse = mockResponse }; - _mockCoordinator.Setup(x => x.DiscoverDownloadStrategyAsync(It.IsAny())) + _mockCoordinator.Setup(x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny())) .ReturnsAsync(discoveryResult); - _mockCoordinator.Setup(x => x.StartDownloadsAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(Task.CompletedTask); var stream = CreateStream(); await stream.InitializeAsync(CancellationToken.None); @@ -157,11 +156,11 @@ public void Create_WithNullParameter_ThrowsArgumentNullException( #region InitializeAsync Tests - Single Part [TestMethod] - public async Task InitializeAsync_SinglePart_UsesSinglePartHandler() + public async Task InitializeAsync_SinglePart_SetsCorrectDiscoveryResult() { // Arrange var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); - var discoveryResult = new DownloadDiscoveryResult + var discoveryResult = new DownloadResult { TotalParts = 1, ObjectSize = 1024, @@ -169,7 +168,8 @@ public async Task InitializeAsync_SinglePart_UsesSinglePartHandler() }; var mockCoordinator = new Mock(); - mockCoordinator.Setup(x => x.DiscoverDownloadStrategyAsync(It.IsAny())) + mockCoordinator.Setup(x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny())) .ReturnsAsync(discoveryResult); var mockBufferManager = new Mock(); @@ -182,25 +182,25 @@ public async Task InitializeAsync_SinglePart_UsesSinglePartHandler() // Assert Assert.IsNotNull(stream.DiscoveryResult); Assert.AreEqual(1, stream.DiscoveryResult.TotalParts); + Assert.AreEqual(1024, stream.DiscoveryResult.ObjectSize); } + [TestMethod] - public async Task InitializeAsync_SinglePart_CallsStartDownloads() + public async Task InitializeAsync_Multipart_UsesMultipartHandler() { // Arrange - var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); - var discoveryResult = new DownloadDiscoveryResult + var discoveryResult = new DownloadResult { - TotalParts = 1, - ObjectSize = 1024, - InitialResponse = mockResponse + TotalParts = 5, + ObjectSize = 50 * 1024 * 1024, + InitialResponse = new GetObjectResponse() }; var mockCoordinator = new Mock(); - mockCoordinator.Setup(x => x.DiscoverDownloadStrategyAsync(It.IsAny())) + mockCoordinator.Setup(x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny())) .ReturnsAsync(discoveryResult); - mockCoordinator.Setup(x => x.StartDownloadsAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(Task.CompletedTask); var mockBufferManager = new Mock(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -210,31 +210,25 @@ public async Task InitializeAsync_SinglePart_CallsStartDownloads() await stream.InitializeAsync(CancellationToken.None); // Assert - mockCoordinator.Verify( - x => x.StartDownloadsAsync(discoveryResult, It.IsAny>(), It.IsAny()), - Times.Once); + Assert.AreEqual(5, stream.DiscoveryResult.TotalParts); } - #endregion - - #region InitializeAsync Tests - Multipart - [TestMethod] - public async Task InitializeAsync_Multipart_UsesMultipartHandler() + public async Task InitializeAsync_SinglePart_CallsStartDownloads() { // Arrange - var discoveryResult = new DownloadDiscoveryResult + var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); + var discoveryResult = new DownloadResult { - TotalParts = 5, - ObjectSize = 50 * 1024 * 1024, - InitialResponse = new GetObjectResponse() + TotalParts = 1, + ObjectSize = 1024, + InitialResponse = mockResponse }; var mockCoordinator = new Mock(); - mockCoordinator.Setup(x => x.DiscoverDownloadStrategyAsync(It.IsAny())) + mockCoordinator.Setup(x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny())) .ReturnsAsync(discoveryResult); - mockCoordinator.Setup(x => x.StartDownloadsAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(Task.CompletedTask); var mockBufferManager = new Mock(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -244,14 +238,21 @@ public async Task InitializeAsync_Multipart_UsesMultipartHandler() await stream.InitializeAsync(CancellationToken.None); // Assert - Assert.AreEqual(5, stream.DiscoveryResult.TotalParts); + mockCoordinator.Verify( + x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny()), + Times.Once); } + #endregion + + #region InitializeAsync Tests - Multipart + [TestMethod] public async Task InitializeAsync_Multipart_StartsDownloads() { // Arrange - var discoveryResult = new DownloadDiscoveryResult + var discoveryResult = new DownloadResult { TotalParts = 5, ObjectSize = 50 * 1024 * 1024, @@ -259,10 +260,9 @@ public async Task InitializeAsync_Multipart_StartsDownloads() }; var mockCoordinator = new Mock(); - mockCoordinator.Setup(x => x.DiscoverDownloadStrategyAsync(It.IsAny())) + mockCoordinator.Setup(x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny())) .ReturnsAsync(discoveryResult); - mockCoordinator.Setup(x => x.StartDownloadsAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(Task.CompletedTask); var mockBufferManager = new Mock(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -273,7 +273,8 @@ public async Task InitializeAsync_Multipart_StartsDownloads() // Assert mockCoordinator.Verify( - x => x.StartDownloadsAsync(discoveryResult, It.IsAny>(), It.IsAny()), + x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny()), Times.Once); } @@ -285,7 +286,7 @@ public async Task InitializeAsync_Multipart_StartsDownloads() public async Task InitializeAsync_SetsDiscoveryResult() { // Arrange - var discoveryResult = new DownloadDiscoveryResult + var discoveryResult = new DownloadResult { TotalParts = 1, ObjectSize = 1024, @@ -293,7 +294,8 @@ public async Task InitializeAsync_SetsDiscoveryResult() }; var mockCoordinator = new Mock(); - mockCoordinator.Setup(x => x.DiscoverDownloadStrategyAsync(It.IsAny())) + mockCoordinator.Setup(x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny())) .ReturnsAsync(discoveryResult); var mockBufferManager = new Mock(); @@ -314,7 +316,7 @@ public async Task InitializeAsync_CalledTwice_ThrowsInvalidOperationException() { // Arrange var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); - var discoveryResult = new DownloadDiscoveryResult + var discoveryResult = new DownloadResult { TotalParts = 1, ObjectSize = 1024, @@ -322,7 +324,8 @@ public async Task InitializeAsync_CalledTwice_ThrowsInvalidOperationException() }; var mockCoordinator = new Mock(); - mockCoordinator.Setup(x => x.DiscoverDownloadStrategyAsync(It.IsAny())) + mockCoordinator.Setup(x => x.StartDownloadAsync( + It.IsAny>(), It.IsAny())) .ReturnsAsync(discoveryResult); var mockBufferManager = new Mock(); diff --git a/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerConcurrencyTests.cs b/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerConcurrencyTests.cs index 3d0c41243648..819ea95b2f2a 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerConcurrencyTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerConcurrencyTests.cs @@ -56,7 +56,7 @@ private async Task ExecuteConcurrentWriteTest( destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Determine write order (default to sequential if not specified) var order = writeOrder ?? Enumerable.Range(1, partCount).ToArray(); @@ -101,7 +101,7 @@ private async Task ExecuteVaryingSizeTest( destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var totalSize = partDefinitions.Sum(p => p.Size); var order = writeOrder ?? Enumerable.Range(0, partDefinitions.Length).ToArray(); diff --git a/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerTests.cs index 37bf03a2c179..46c2a7536bb4 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/FilePartDataHandlerTests.cs @@ -62,7 +62,7 @@ public async Task PrepareAsync_CreatesTempFile() var config = MultipartDownloadTestHelpers.CreateFileDownloadConfiguration( destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - var discoveryResult = new DownloadDiscoveryResult + var discoveryResult = new DownloadResult { TotalParts = 1, ObjectSize = 1024 @@ -85,7 +85,7 @@ public async Task PrepareAsync_TempFileFollowsPattern() var config = MultipartDownloadTestHelpers.CreateFileDownloadConfiguration( destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - var discoveryResult = new DownloadDiscoveryResult(); + var discoveryResult = new DownloadResult(); // Act await handler.PrepareAsync(discoveryResult, CancellationToken.None); @@ -102,7 +102,7 @@ public async Task PrepareAsync_ReturnsCompletedTask() var config = MultipartDownloadTestHelpers.CreateFileDownloadConfiguration( destinationPath: Path.Combine(_testDirectory, "test.dat")); var handler = new FilePartDataHandler(config); - var discoveryResult = new DownloadDiscoveryResult(); + var discoveryResult = new DownloadResult(); // Act var task = handler.PrepareAsync(discoveryResult, CancellationToken.None); @@ -125,7 +125,7 @@ public async Task ProcessPartAsync_WritesDataToFile() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(1024, 0); var response = new GetObjectResponse @@ -156,7 +156,7 @@ public async Task ProcessPartAsync_WritesAtCorrectOffset() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Write part 2 (offset 1024) var part2Data = MultipartDownloadTestHelpers.GenerateTestData(1024, 1024); @@ -192,7 +192,7 @@ public async Task ProcessPartAsync_ParsesContentRangeForOffset() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(100, 0); var response = new GetObjectResponse @@ -225,7 +225,7 @@ public async Task ProcessPartAsync_MissingContentRange_ThrowsInvalidOperationExc destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(100, 0); var response = new GetObjectResponse @@ -250,7 +250,7 @@ public async Task ProcessPartAsync_InvalidContentRange_ThrowsInvalidOperationExc destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(100, 0); var response = new GetObjectResponse @@ -277,7 +277,7 @@ public async Task ProcessPartAsync_PreservesDataIntegrity() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.CreateMixedPattern(10240, 42); var response = new GetObjectResponse @@ -305,7 +305,7 @@ public async Task ProcessPartAsync_HandlesZeroByteResponse() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var response = new GetObjectResponse { @@ -327,7 +327,7 @@ public async Task ProcessPartAsync_HandlesSmallPart() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(100, 0); var response = new GetObjectResponse @@ -355,7 +355,7 @@ public async Task ProcessPartAsync_HandlesLargePart() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partSize = 16 * 1024 * 1024; // 16MB var partData = MultipartDownloadTestHelpers.GenerateTestData(partSize, 0); @@ -384,7 +384,7 @@ public async Task ProcessPartAsync_MultipleWritesPreserveAllData() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Write part 1 var part1Data = MultipartDownloadTestHelpers.GenerateTestData(1024, 0); @@ -431,7 +431,7 @@ public async Task ProcessPartAsync_SupportsConcurrentWrites() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Create multiple parts var part1Data = MultipartDownloadTestHelpers.GenerateTestData(1024, 0); @@ -489,7 +489,7 @@ public async Task ProcessPartAsync_ConcurrentWritesDontInterfere() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Create 10 parts with distinct patterns var tasks = new Task[10]; @@ -563,7 +563,7 @@ public async Task ProcessPartAsync_WithCancelledToken_ThrowsTaskCanceledExceptio destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(1024, 0); var response = new GetObjectResponse @@ -657,7 +657,7 @@ public async Task OnDownloadComplete_WithSuccess_CommitsTempFile() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(1024, 0); var response = new GetObjectResponse @@ -689,7 +689,7 @@ public async Task OnDownloadComplete_WithSuccess_DestinationContainsAllData() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Write 3 parts for (int i = 0; i < 3; i++) @@ -726,7 +726,7 @@ public async Task OnDownloadComplete_WithFailure_CleansTempFile() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Act handler.OnDownloadComplete(new Exception("Download failed")); @@ -744,7 +744,7 @@ public async Task OnDownloadComplete_WithDifferentExceptions_AllHandledCorrectly var config1 = MultipartDownloadTestHelpers.CreateFileDownloadConfiguration( destinationPath: destinationPath1); var handler1 = new FilePartDataHandler(config1); - await handler1.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler1.PrepareAsync(new DownloadResult(), CancellationToken.None); handler1.OnDownloadComplete(new OperationCanceledException()); Assert.IsFalse(File.Exists(destinationPath1)); @@ -753,7 +753,7 @@ public async Task OnDownloadComplete_WithDifferentExceptions_AllHandledCorrectly var config2 = MultipartDownloadTestHelpers.CreateFileDownloadConfiguration( destinationPath: destinationPath2); var handler2 = new FilePartDataHandler(config2); - await handler2.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler2.PrepareAsync(new DownloadResult(), CancellationToken.None); handler2.OnDownloadComplete(new IOException("IO error")); Assert.IsFalse(File.Exists(destinationPath2)); } @@ -771,7 +771,7 @@ public async Task Dispose_CleansUpUncommittedFile() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Act handler.Dispose(); @@ -790,7 +790,7 @@ public async Task Dispose_AfterCommit_DoesNotDeleteDestination() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(1024, 0); var response = new GetObjectResponse @@ -853,7 +853,7 @@ public async Task Integration_CompleteWorkflow_ProducesCorrectFile() var handler = new FilePartDataHandler(config); // Act - Simulate complete download workflow - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Download 5 parts for (int i = 0; i < 5; i++) @@ -888,7 +888,7 @@ public async Task Integration_ParallelDownload_ProducesCorrectFile() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Act - Download parts in parallel (reverse order to test offset handling) var tasks = new Task[5]; @@ -926,7 +926,7 @@ public async Task Integration_FailedDownload_CleansUpProperly() var handler = new FilePartDataHandler(config); // Act - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); var partData = MultipartDownloadTestHelpers.GenerateTestData(1024, 0); var response = new GetObjectResponse @@ -955,7 +955,7 @@ public async Task Integration_LargeFileDownload_HandlesCorrectly() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Act - Download 3 parts of 1MB each for (int i = 0; i < 3; i++) @@ -990,7 +990,7 @@ public async Task Integration_SingleByteFile_HandlesCorrectly() destinationPath: destinationPath); var handler = new FilePartDataHandler(config); - await handler.PrepareAsync(new DownloadDiscoveryResult(), CancellationToken.None); + await handler.PrepareAsync(new DownloadResult(), CancellationToken.None); // Act - Download single byte var partData = new byte[] { 0x42 }; diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs index d39db70c38d3..f77562ac617f 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs @@ -231,7 +231,7 @@ public async Task DiscoverUsingPartStrategy_WithNullPartsCount_ReturnsSinglePart var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.IsNotNull(result); @@ -259,7 +259,7 @@ public async Task DiscoverUsingPartStrategy_WithPartsCountOne_ReturnsSinglePart( var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.AreEqual(1, result.TotalParts); @@ -280,7 +280,7 @@ public async Task DiscoverUsingPartStrategy_SinglePart_DoesNotBufferFirstPart() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - Single-part does not buffer during discovery Assert.IsNotNull(result.InitialResponse); @@ -310,7 +310,7 @@ public async Task DiscoverUsingPartStrategy_WithMultipleParts_ReturnsMultipart() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.AreEqual(5, result.TotalParts); @@ -338,7 +338,7 @@ public async Task DiscoverUsingPartStrategy_Multipart_BuffersFirstPart() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - Multipart returns response with stream for buffering in StartDownloadsAsync Assert.IsNotNull(result.InitialResponse); @@ -360,7 +360,7 @@ public async Task DiscoverUsingPartStrategy_SavesETag() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - ETag is saved internally (verified through subsequent validation) Assert.IsNotNull(result); @@ -389,7 +389,7 @@ public async Task DiscoverUsingPartStrategy_ParsesContentRange() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.AreEqual(totalObjectSize, result.ObjectSize); @@ -415,7 +415,7 @@ public async Task DiscoverUsingPartStrategy_WithInvalidContentRange_ThrowsExcept var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); } #endregion @@ -443,7 +443,7 @@ public async Task DiscoverUsingRangeStrategy_SmallObject_ReturnsSinglePart() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.AreEqual(1, result.TotalParts); @@ -477,7 +477,7 @@ public async Task DiscoverUsingRangeStrategy_SinglePartRange_ReturnsSinglePart() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.AreEqual(1, result.TotalParts); @@ -509,7 +509,7 @@ public async Task DiscoverUsingRangeStrategy_Multipart_ReturnsMultipart() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.AreEqual(7, result.TotalParts); // 52428800 / 8388608 = 6.25 -> 7 parts @@ -541,7 +541,7 @@ public async Task DiscoverUsingRangeStrategy_Multipart_ValidatesContentLength() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); } [TestMethod] @@ -561,7 +561,7 @@ public async Task DiscoverUsingRangeStrategy_SavesETag() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - ETag is saved internally Assert.IsNotNull(result); @@ -587,7 +587,7 @@ public async Task DiscoverUsingRangeStrategy_CalculatesPartCount() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.AreEqual(7, result.TotalParts); // Ceiling(52428800 / 8388608) = 7 @@ -609,25 +609,30 @@ public async Task StartDownloadsAsync_SinglePart_ReturnsImmediately() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - should complete without any additional downloads (discovery already made the call) mockClient.Verify(x => x.GetObjectAsync(It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] - [ExpectedException(typeof(ArgumentNullException))] - public async Task StartDownloadsAsync_WithNullDiscoveryResult_ThrowsArgumentNullException() + public async Task StartDownloadsAsync_SinglePart_ProcessesPartSynchronously() { // Arrange - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( + (req, ct) => Task.FromResult(mockResponse)); var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); // Act - await coordinator.StartDownloadsAsync(null, null, CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); + + // Assert - should return discovery result immediately for single-part downloads + Assert.IsNotNull(result); + Assert.AreEqual(1, result.TotalParts); } #endregion @@ -647,10 +652,10 @@ public async Task Validation_Failures_ThrowInvalidOperationException( // Arrange var mockClient = MultipartDownloadTestHelpers.CreateMockClientWithValidationFailure(failureType); var coordinator = MultipartDownloadTestHelpers.CreateCoordinatorForValidationTest(mockClient.Object, failureType); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act & Assert (exception expected via attribute) - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Wait for background task to observe exceptions } @@ -671,10 +676,10 @@ public async Task Validation_ETag_Matching_Succeeds() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - should succeed with matching ETags - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - no exception thrown } @@ -713,10 +718,10 @@ public async Task Validation_ContentRange_ValidRange_Succeeds() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - should succeed with valid ranges - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - no exception thrown } @@ -776,10 +781,10 @@ public async Task StartDownloadsAsync_MultipartDownload_AcquiresCapacitySequenti var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Wait for background task completion await coordinator.DownloadCompletionTask; @@ -836,10 +841,10 @@ public async Task StartDownloadsAsync_MultipartDownload_DoesNotCallWaitForCapaci var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Assert @@ -903,10 +908,10 @@ public async Task StartDownloadsAsync_BackgroundTask_InterleavesCapacityAcquisit var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Assert @@ -1023,11 +1028,11 @@ public async Task StartDownloadsAsync_PreventRaceConditionDeadlock_WithLimitedBu var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 3); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - This should not deadlock with the new sequential approach var startTime = DateTime.UtcNow; - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; var endTime = DateTime.UtcNow; @@ -1095,10 +1100,10 @@ public async Task StartDownloadsAsync_SequentialCapacityAcquisition_PreventsOutO var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Assert - Capacity acquisition should be in order, preventing blocking @@ -1137,10 +1142,10 @@ public async Task StartDownloadsAsync_BackgroundTaskSuccess_DisposesCancellation var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Wait for background task to complete await coordinator.DownloadCompletionTask; @@ -1194,10 +1199,10 @@ public async Task StartDownloadsAsync_BackgroundTaskFailure_DisposesCancellation var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Wait for background task to complete (with failure) try @@ -1236,7 +1241,7 @@ public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource // Simulate error during PrepareAsync (before background task is created) mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) .ThrowsAsync(new InvalidOperationException("Simulated prepare failure")); var totalParts = 2; @@ -1252,12 +1257,12 @@ public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act & Assert try { - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); Assert.Fail("Expected InvalidOperationException to be thrown"); } catch (InvalidOperationException ex) @@ -1318,10 +1323,10 @@ public async Task StartDownloadsAsync_BackgroundTaskCancellation_HandlesTokenDis var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Wait for background task cancellation try @@ -1372,7 +1377,7 @@ public async Task Operations_AfterDispose_ThrowObjectDisposedException() // Act coordinator.Dispose(); - await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); } #endregion @@ -1396,9 +1401,29 @@ public async Task DiscoverDownloadStrategyAsync_WhenCancelled_ThrowsOperationCan cts.Cancel(); // Act - await coordinator.DiscoverDownloadStrategyAsync(cts.Token); + await coordinator.StartDownloadAsync(null, cts.Token); + } + + [TestMethod] + [ExpectedException(typeof(OperationCanceledException))] + public async Task StartDownloadAsync_SinglePart_WithPreCancelledToken_ThrowsOperationCanceledException() + { + var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( + (req, ct) => Task.FromResult(mockResponse)); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, CreateMockDataHandler().Object); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + await coordinator.StartDownloadAsync(null, cts.Token); } + [TestMethod] public async Task DiscoverDownloadStrategyAsync_WhenCancelled_SetsDownloadException() { @@ -1418,7 +1443,7 @@ public async Task DiscoverDownloadStrategyAsync_WhenCancelled_SetsDownloadExcept // Act try { - await coordinator.DiscoverDownloadStrategyAsync(cts.Token); + await coordinator.StartDownloadAsync(null, cts.Token); } catch (OperationCanceledException) { @@ -1447,7 +1472,7 @@ public async Task DiscoverDownloadStrategyAsync_PassesCancellationTokenToS3Clien var cts = new CancellationTokenSource(); // Act - await coordinator.DiscoverDownloadStrategyAsync(cts.Token); + await coordinator.StartDownloadAsync(null, cts.Token); // Assert Assert.AreEqual(cts.Token, capturedToken); @@ -1470,13 +1495,13 @@ public async Task StartDownloadsAsync_WhenCancelledBeforeStart_ThrowsOperationCa var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var cts = new CancellationTokenSource(); cts.Cancel(); // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, cts.Token); + await coordinator.StartDownloadAsync(null, cts.Token); await coordinator.DownloadCompletionTask; // Wait for background task to observe exceptions } @@ -1512,12 +1537,11 @@ public async Task StartDownloadsAsync_WhenCancelledDuringDownloads_NotifiesBuffe var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); // Act try { - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Wait for background task to observe exceptions } catch (OperationCanceledException) @@ -1557,12 +1581,12 @@ public async Task StartDownloadsAsync_WhenCancelled_SetsDownloadException() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act try { - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Wait for background task to observe exceptions } catch (OperationCanceledException) @@ -1591,41 +1615,17 @@ public async Task StartDownloadsAsync_PassesCancellationTokenToBufferManager() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var cts = new CancellationTokenSource(); // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, cts.Token); + var result = await coordinator.StartDownloadAsync(null, cts.Token); // Assert - The cancellation token was passed through to the data handler - Assert.IsNotNull(discoveryResult); + Assert.IsNotNull(result); } - [TestMethod] - public async Task StartDownloadsAsync_SinglePart_DoesNotThrowOnCancellation() - { - // Arrange - Single part download should return immediately without using cancellation token - var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( - (req, ct) => Task.FromResult(mockResponse)); - - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); - var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); - var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - - // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - - var cts = new CancellationTokenSource(); - cts.Cancel(); - - // Act - should complete without throwing even though token is cancelled - await coordinator.StartDownloadsAsync(discoveryResult, null, cts.Token); - - // Assert - discovery already made the S3 call, StartDownloadsAsync doesn't make additional calls for single-part - mockClient.Verify(x => x.GetObjectAsync(It.IsAny(), It.IsAny()), Times.Once); - } [TestMethod] public async Task StartDownloadsAsync_CancellationPropagatesAcrossConcurrentDownloads() @@ -1667,12 +1667,12 @@ public async Task StartDownloadsAsync_CancellationPropagatesAcrossConcurrentDown var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act try { - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Wait for background task to observe exceptions } catch (OperationCanceledException) @@ -1702,7 +1702,7 @@ public async Task Coordinator_CanBeDisposedAfterCancellation() // Act try { - await coordinator.DiscoverDownloadStrategyAsync(cts.Token); + await coordinator.StartDownloadAsync(null, cts.Token); } catch (OperationCanceledException) { @@ -1746,10 +1746,10 @@ public async Task StartDownloadsAsync_RangeStrategy_CancellationDuringDownloads( var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); await coordinator.DownloadCompletionTask; // Wait for background task to observe exceptions } @@ -1800,11 +1800,11 @@ public async Task StartDownloadsAsync_ReturnsImmediately_PreventsDeadlock() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - StartDownloadsAsync should return immediately (not wait for all downloads) var stopwatch = System.Diagnostics.Stopwatch.StartNew(); - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); stopwatch.Stop(); // Assert - StartDownloadsAsync should return almost immediately @@ -1847,11 +1847,11 @@ public async Task StartDownloadsAsync_SinglePart_ReturnsImmediatelyWithoutBackgr var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act var stopwatch = System.Diagnostics.Stopwatch.StartNew(); - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); stopwatch.Stop(); // DownloadCompletionTask should be completed immediately (no background work) @@ -1867,13 +1867,12 @@ public async Task StartDownloadsAsync_SinglePart_ReturnsImmediatelyWithoutBackgr #region Capacity Checking Tests [TestMethod] - public async Task DiscoverUsingPartStrategy_CallsWaitForCapacityAsync() + public async Task Discovery_PartStrategy_CallsWaitForCapacityAsync() { - // Arrange + // Arrange - PART strategy should check capacity during discovery var capacityCallCount = 0; var mockDataHandler = new Mock(); - // Track WaitForCapacityAsync calls mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) .Returns(() => @@ -1886,8 +1885,12 @@ public async Task DiscoverUsingPartStrategy_CallsWaitForCapacityAsync() .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); + var totalObjectSize = 24 * 1024 * 1024; // 24MB -> 3 parts @ 8MB + var partSize = 8 * 1024 * 1024; // 8MB + var totalParts = 3; + var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( - 8 * 1024 * 1024, 3, 24 * 1024 * 1024, "test-etag"); + partSize, totalParts, totalObjectSize, "test-etag"); var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( (req, ct) => Task.FromResult(mockResponse)); @@ -1895,28 +1898,27 @@ public async Task DiscoverUsingPartStrategy_CallsWaitForCapacityAsync() var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( downloadType: MultipartDownloadType.PART); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); - var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - Assert.AreEqual(1, capacityCallCount, "WaitForCapacityAsync should be called exactly once during Part 1 discovery"); Assert.IsNotNull(result); Assert.AreEqual(3, result.TotalParts); - - // Verify the mock was called with correct setup - mockDataHandler.Verify(x => x.WaitForCapacityAsync(It.IsAny()), Times.Once); + Assert.AreEqual(1, capacityCallCount, + "PART strategy should call WaitForCapacityAsync during Part 1 discovery"); } + [TestMethod] - public async Task DiscoverUsingRangeStrategy_CallsWaitForCapacityAsync() + public async Task Discovery_RangeStrategy_CallsWaitForCapacityAsync() { - // Arrange + // Arrange - RANGE strategy should also check capacity during discovery var capacityCallCount = 0; var mockDataHandler = new Mock(); - // Track WaitForCapacityAsync calls mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) .Returns(() => @@ -1929,8 +1931,9 @@ public async Task DiscoverUsingRangeStrategy_CallsWaitForCapacityAsync() .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); - var totalObjectSize = 52428800; // 50MB - var partSize = 8388608; // 8MB + var totalObjectSize = 17 * 1024 * 1024; // 17MB -> 3 parts @ 8MB + var partSize = 8 * 1024 * 1024; // 8MB + var mockResponse = MultipartDownloadTestHelpers.CreateRangeResponse( 0, partSize - 1, totalObjectSize, "test-etag"); @@ -1941,21 +1944,19 @@ public async Task DiscoverUsingRangeStrategy_CallsWaitForCapacityAsync() partSize: partSize, downloadType: MultipartDownloadType.RANGE); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); - var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert - Assert.AreEqual(1, capacityCallCount, "WaitForCapacityAsync should be called exactly once during Part 1 discovery"); Assert.IsNotNull(result); - Assert.AreEqual(7, result.TotalParts); // 52428800 / 8388608 = 6.25 -> 7 parts - - // Verify the mock was called with correct setup - mockDataHandler.Verify(x => x.WaitForCapacityAsync(It.IsAny()), Times.Once); + Assert.AreEqual(3, result.TotalParts); // 17MB / 8MB = 3 parts (ceiling) + Assert.AreEqual(1, capacityCallCount, + "RANGE strategy should call WaitForCapacityAsync during Part 1 discovery"); } - [TestMethod] public async Task MultipleDownloads_WithSharedHttpThrottler_RespectsLimits() { @@ -1980,11 +1981,8 @@ public async Task MultipleDownloads_WithSharedHttpThrottler_RespectsLimits() var coordinator1 = new MultipartDownloadManager(mockClient1.Object, request1, config, mockDataHandler1.Object, null, sharedThrottler); var coordinator2 = new MultipartDownloadManager(mockClient2.Object, request2, config, mockDataHandler2.Object, null, sharedThrottler); - var discovery1 = await coordinator1.DiscoverDownloadStrategyAsync(CancellationToken.None); - await coordinator1.StartDownloadsAsync(discovery1, null, CancellationToken.None); - - var discovery2 = await coordinator2.DiscoverDownloadStrategyAsync(CancellationToken.None); - await coordinator2.StartDownloadsAsync(discovery2, null, CancellationToken.None); + var download1 = await coordinator1.StartDownloadAsync(null, CancellationToken.None); + var download2 = await coordinator2.StartDownloadAsync(null, CancellationToken.None); // Wait for all background work to complete await Task.WhenAll( @@ -1993,8 +1991,8 @@ await Task.WhenAll( ); // Assert - Both should complete successfully and semaphore should be fully released - Assert.IsNotNull(discovery1); - Assert.IsNotNull(discovery2); + Assert.IsNotNull(download1); + Assert.IsNotNull(download2); Assert.AreEqual(1, sharedThrottler.CurrentCount, "HTTP throttler should be fully released after complete download lifecycle"); // Cleanup @@ -2025,7 +2023,7 @@ public async Task Discovery_HttpRequestFails_ReleasesCapacityProperly() // Act & Assert try { - await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); Assert.Fail("Expected InvalidOperationException to be thrown"); } catch (InvalidOperationException ex) @@ -2068,7 +2066,7 @@ public async Task Discovery_CancellationDuringCapacityWait_ReleasesHttpSlotPrope // Act & Assert try { - await coordinator.DiscoverDownloadStrategyAsync(cts.Token); + await coordinator.StartDownloadAsync(null, cts.Token); Assert.Fail("Expected OperationCanceledException to be thrown"); } catch (OperationCanceledException) @@ -2117,7 +2115,7 @@ public async Task Discovery_CancellationAfterCapacityButBeforeHttp_ReleasesHttpS try { cts.Cancel(); // Cancel before discovery - await coordinator.DiscoverDownloadStrategyAsync(cts.Token); + await coordinator.StartDownloadAsync(null, cts.Token); Assert.Fail("Expected OperationCanceledException to be thrown"); } catch (OperationCanceledException) @@ -2161,7 +2159,7 @@ public async Task Discovery_SinglePart_StillCallsCapacityCheck() var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + var result = await coordinator.StartDownloadAsync(null, CancellationToken.None); // Assert Assert.IsNotNull(result); @@ -2170,6 +2168,7 @@ public async Task Discovery_SinglePart_StillCallsCapacityCheck() "Even single-part downloads should call WaitForCapacityAsync during discovery"); } + #endregion #region Concurrency Control Tests @@ -2231,10 +2230,10 @@ public async Task HttpSemaphore_HeldThroughProcessPartAsync() var coordinator = new MultipartDownloadManager( mockClient.Object, request, config, mockDataHandler.Object, null, httpSemaphore); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - var startTask = coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + var startTask = coordinator.StartDownloadAsync(null, CancellationToken.None); // Wait for Part 1 to enter ProcessPartAsync await part1EnteredProcessPart.Task; @@ -2306,10 +2305,10 @@ public async Task HttpSemaphore_RangeStrategy_HeldThroughProcessPartAsync() var coordinator = new MultipartDownloadManager( mockClient.Object, request, config, mockDataHandler.Object, null, httpSemaphore); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - var startTask = coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + var startTask = coordinator.StartDownloadAsync(null, CancellationToken.None); await part1EnteredProcessPart.Task; // Check semaphore state while Part 1 is in ProcessPartAsync @@ -2332,136 +2331,105 @@ public async Task HttpSemaphore_RangeStrategy_HeldThroughProcessPartAsync() #region Semaphore Release Error Path Tests [TestMethod] - public async Task StartDownloadsAsync_PrepareAsyncFails_ReleasesHttpSemaphore() + public async Task Discovery_WaitForCapacityFails_DoesNotReleaseHttpSemaphore() { - // Arrange - PrepareAsync fails but semaphore was acquired during discovery + // Arrange - Test that semaphore is NOT released when it was never acquired var httpThrottler = new SemaphoreSlim(2, 2); var initialCount = httpThrottler.CurrentCount; var mockDataHandler = new Mock(); - // WaitForCapacityAsync succeeds (buffer space available) + // WaitForCapacityAsync fails BEFORE HTTP semaphore is acquired mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) - .Returns(Task.CompletedTask); - - // PrepareAsync fails BEFORE Part 1 processing - mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) - .ThrowsAsync(new InvalidOperationException("Simulated prepare failure")); - - var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( - 8 * 1024 * 1024, 2, 16 * 1024 * 1024, "test-etag"); - - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( - (req, ct) => Task.FromResult(mockResponse)); + .ThrowsAsync(new InvalidOperationException("Simulated capacity wait failure")); - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( - downloadType: MultipartDownloadType.PART); + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager( mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); - - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - - // After discovery, semaphore should have 1 slot held (2 total - 1 used = 1 available) - Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, - "After discovery, semaphore should have 1 slot held"); // Act & Assert try { - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); Assert.Fail("Expected InvalidOperationException to be thrown"); } catch (InvalidOperationException ex) { - Assert.AreEqual("Simulated prepare failure", ex.Message); + Assert.AreEqual("Simulated capacity wait failure", ex.Message); } + // Assert - Semaphore should NOT be released (it was never acquired) Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP semaphore should be released when PrepareAsync fails"); + "HTTP semaphore should NOT be released when it was never acquired (failed before WaitAsync)"); // Cleanup httpThrottler.Dispose(); } + + [TestMethod] - public async Task StartDownloadsAsync_Part1ProcessingFails_ReleasesHttpSemaphore() + public async Task StartDownloadAsync_WaitForCapacityFails_DoesNotReleaseHttpSemaphore() { - // Arrange - Test that finally block correctly releases semaphore when Part 1 processing fails + // Arrange - Test that semaphore is NOT released when it was never acquired var httpThrottler = new SemaphoreSlim(2, 2); var initialCount = httpThrottler.CurrentCount; var mockDataHandler = new Mock(); - // WaitForCapacityAsync succeeds + // WaitForCapacityAsync fails BEFORE HTTP semaphore is acquired mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) - .Returns(Task.CompletedTask); - - // PrepareAsync succeeds - mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); - - // ProcessPartAsync fails for Part 1 - mockDataHandler - .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) - .ThrowsAsync(new InvalidOperationException("Simulated Part 1 processing failure")); - - var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( - 8 * 1024 * 1024, 2, 16 * 1024 * 1024, "test-etag"); - - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( - (req, ct) => Task.FromResult(mockResponse)); + .ThrowsAsync(new InvalidOperationException("Simulated capacity wait failure")); - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( - downloadType: MultipartDownloadType.PART); + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager( mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); - - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - - // After discovery, semaphore should have 1 slot held - Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, - "After discovery, semaphore should have 1 slot held"); // Act & Assert try { - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); Assert.Fail("Expected InvalidOperationException to be thrown"); } catch (InvalidOperationException ex) { - Assert.AreEqual("Simulated Part 1 processing failure", ex.Message); + Assert.AreEqual("Simulated capacity wait failure", ex.Message); } - // Assert - Finally block should release semaphore + // Assert - Semaphore should NOT be released (it was never acquired) Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP semaphore should be released by finally block when Part 1 processing fails"); + "HTTP semaphore should NOT be released when it was never acquired (failed before WaitAsync)"); // Cleanup httpThrottler.Dispose(); } [TestMethod] - public async Task Discovery_WaitForCapacityFails_DoesNotReleaseHttpSemaphore() + public async Task Discovery_HttpRequestAfterCapacityFails_ReleasesHttpSemaphore() { - // Arrange - Test that semaphore is NOT released when it was never acquired + // Arrange - Test semaphore release when HTTP request fails after capacity is acquired var httpThrottler = new SemaphoreSlim(2, 2); var initialCount = httpThrottler.CurrentCount; var mockDataHandler = new Mock(); - // WaitForCapacityAsync fails BEFORE HTTP semaphore is acquired + // WaitForCapacityAsync succeeds (capacity acquired) mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) - .ThrowsAsync(new InvalidOperationException("Simulated capacity wait failure")); + .Returns(Task.CompletedTask); + + // HTTP request fails AFTER both capacity types are acquired + var mockClient = new Mock(); + mockClient + .Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated S3 failure after capacity acquired")); - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager( @@ -2470,74 +2438,111 @@ public async Task Discovery_WaitForCapacityFails_DoesNotReleaseHttpSemaphore() // Act & Assert try { - await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); Assert.Fail("Expected InvalidOperationException to be thrown"); } catch (InvalidOperationException ex) { - Assert.AreEqual("Simulated capacity wait failure", ex.Message); + Assert.AreEqual("Simulated S3 failure after capacity acquired", ex.Message); } - // Assert - Semaphore should NOT be released (it was never acquired) + // Assert - HTTP semaphore should be released by catch block in discovery Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP semaphore should NOT be released when it was never acquired (failed before WaitAsync)"); + "HTTP semaphore should be released when HTTP request fails in discovery"); // Cleanup httpThrottler.Dispose(); } [TestMethod] - public async Task StartDownloadsAsync_BackgroundPartHttpFails_ReleasesHttpSemaphore() + public async Task StartDownloadAsync_PrepareAsyncFails_ReleasesHttpSemaphore() { - // Arrange - Test that background part download failures properly release semaphore - var totalParts = 3; - var partSize = 8 * 1024 * 1024; - var totalObjectSize = totalParts * partSize; - + // Arrange - Test that HTTP semaphore is released when PrepareAsync fails after discovery var httpThrottler = new SemaphoreSlim(2, 2); var initialCount = httpThrottler.CurrentCount; var mockDataHandler = new Mock(); - // WaitForCapacityAsync succeeds for all parts + // WaitForCapacityAsync succeeds mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) .Returns(Task.CompletedTask); - // PrepareAsync succeeds + // ProcessPartAsync succeeds for Part 1 (discovery) mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); - // ProcessPartAsync succeeds for Part 1, but not called for Part 2 (HTTP fails first) + // PrepareAsync FAILS (this happens after Part 1 processing in StartDownloadAsync) mockDataHandler - .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated prepare failure")); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + 2, 8 * 1024 * 1024, 16 * 1024 * 1024, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act & Assert + try + { + await coordinator.StartDownloadAsync(null, CancellationToken.None); + Assert.Fail("Expected InvalidOperationException to be thrown"); + } + catch (InvalidOperationException ex) + { + Assert.AreEqual("Simulated prepare failure", ex.Message); + } + + // Assert - HTTP semaphore should be released even when PrepareAsync fails + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be released when PrepareAsync fails"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task StartDownloadAsync_BackgroundPartHttpFails_ReleasesHttpSemaphore() + { + // Arrange - Test that HTTP semaphore is released when background part HTTP request fails + var httpThrottler = new SemaphoreSlim(2, 2); + var initialCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); - // ReleaseCapacity is called on failure + // Capacity checks succeed mockDataHandler - .Setup(x => x.ReleaseCapacity()); + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + // Part 1 processing succeeds mockDataHandler - .Setup(x => x.OnDownloadComplete(It.IsAny())); + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + // HTTP client: Part 1 succeeds, Part 2 HTTP request FAILS var callCount = 0; var mockClient = new Mock(); - mockClient.Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) + mockClient + .Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) .Returns(() => { callCount++; if (callCount == 1) { - // Discovery call succeeds + // Part 1 discovery succeeds return Task.FromResult(MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( - partSize, totalParts, totalObjectSize, "test-etag")); - } - else - { - // Background part HTTP request fails - throw new InvalidOperationException("Simulated HTTP failure for background part"); + 8 * 1024 * 1024, 2, 16 * 1024 * 1024, "test-etag")); } + // Part 2 HTTP request fails + throw new AmazonS3Exception("Simulated S3 HTTP failure"); }); var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( @@ -2545,164 +2550,128 @@ public async Task StartDownloadsAsync_BackgroundPartHttpFails_ReleasesHttpSemaph var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager( mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); - - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - - // After discovery, semaphore should have 1 slot held (for Part 1) - Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, - "After discovery, semaphore should have 1 slot held"); // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); - // Wait for background task to fail + // Wait for background task to complete with failure try { await coordinator.DownloadCompletionTask; } - catch (InvalidOperationException) + catch (AmazonS3Exception) { - // Expected failure from background task + // Expected } - // Assert - Semaphore should be fully released (Part 1 released in StartDownloadsAsync, - // Parts 2 and 3 released in CreateDownloadTaskAsync catch blocks) - Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP semaphore should be fully released after background part HTTP failure"); - - // Verify ReleaseCapacity was called twice (once for Part 2 that failed, once for Part 3 that got cancelled) - // With sequential capacity acquisition, Part 3 acquired capacity before Part 2's HTTP call failed - mockDataHandler.Verify(x => x.ReleaseCapacity(), Times.Exactly(2), - "ReleaseCapacity should be called for both Part 2 (failed) and Part 3 (cancelled after acquiring capacity)"); + // Assert - HTTP semaphore should be fully released after background failure + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be released when background part HTTP request fails"); // Cleanup httpThrottler.Dispose(); } [TestMethod] - public async Task StartDownloadsAsync_BackgroundPartProcessingFails_ReleasesHttpSemaphore() + public async Task StartDownloadAsync_Part1ProcessingFails_ReleasesHttpSemaphore() { - // Arrange - Test that background part ProcessPartAsync failures properly release semaphore - var totalParts = 3; - var partSize = 8 * 1024 * 1024; - var totalObjectSize = totalParts * partSize; - + // Arrange - Test that HTTP semaphore is released when Part 1 processing fails during discovery var httpThrottler = new SemaphoreSlim(2, 2); var initialCount = httpThrottler.CurrentCount; var mockDataHandler = new Mock(); - // WaitForCapacityAsync succeeds for all parts + // WaitForCapacityAsync succeeds mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) .Returns(Task.CompletedTask); - // PrepareAsync succeeds - mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); - - // ProcessPartAsync succeeds for Part 1, fails for Part 2 - var processCallCount = 0; - mockDataHandler - .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) - .Returns((partNum, response, ct) => - { - processCallCount++; - if (partNum == 1) - { - return Task.CompletedTask; // Part 1 succeeds - } - throw new InvalidOperationException($"Simulated processing failure for Part {partNum}"); - }); - - // ReleaseCapacity is called on failure - mockDataHandler - .Setup(x => x.ReleaseCapacity()); - + // Part 1 ProcessPartAsync FAILS (during discovery phase of StartDownloadAsync) mockDataHandler - .Setup(x => x.OnDownloadComplete(It.IsAny())); + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated Part 1 processing failure")); var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( - totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + 2, 8 * 1024 * 1024, 16 * 1024 * 1024, "test-etag", usePartStrategy: true); var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( downloadType: MultipartDownloadType.PART); - var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager( mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); - - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - - // After discovery, semaphore should have 1 slot held - Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, - "After discovery, semaphore should have 1 slot held"); - // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); - - // Wait for background task to fail + // Act & Assert try { - await coordinator.DownloadCompletionTask; + await coordinator.StartDownloadAsync(null, CancellationToken.None); + Assert.Fail("Expected InvalidOperationException to be thrown"); } - catch (InvalidOperationException) + catch (InvalidOperationException ex) { - // Expected failure from background task + Assert.AreEqual("Simulated Part 1 processing failure", ex.Message); } - // Assert - Semaphore should be fully released - Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP semaphore should be fully released after background part processing failure"); - - // Verify ReleaseCapacity was called twice (once for Part 2 that failed, once for Part 3 that may have continued) - // With sequential capacity acquisition, Part 3 acquired capacity before Part 2's processing failed - mockDataHandler.Verify(x => x.ReleaseCapacity(), Times.Exactly(2), - "ReleaseCapacity should be called for both Part 2 (failed) and Part 3 (cancelled/failed after acquiring capacity)"); + // Assert - HTTP semaphore should be released when Part 1 processing fails + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be released when Part 1 processing fails during discovery"); // Cleanup httpThrottler.Dispose(); } [TestMethod] - public async Task Discovery_HttpRequestAfterCapacityFails_ReleasesHttpSemaphore() + public async Task StartDownloadAsync_BackgroundPartProcessingFails_ReleasesHttpSemaphore() { - // Arrange - Test semaphore release when HTTP request fails after capacity is acquired + // Arrange - Test that HTTP semaphore is released when background part processing fails var httpThrottler = new SemaphoreSlim(2, 2); var initialCount = httpThrottler.CurrentCount; var mockDataHandler = new Mock(); - // WaitForCapacityAsync succeeds (capacity acquired) + // Capacity checks succeed mockDataHandler .Setup(x => x.WaitForCapacityAsync(It.IsAny())) .Returns(Task.CompletedTask); - // HTTP request fails AFTER both capacity types are acquired - var mockClient = new Mock(); - mockClient - .Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) - .ThrowsAsync(new InvalidOperationException("Simulated S3 failure after capacity acquired")); + // Part 1 processing succeeds, Part 2 processing FAILS + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((partNum, response, ct) => + { + if (partNum == 1) + { + return Task.CompletedTask; // Part 1 succeeds + } + throw new InvalidOperationException("Simulated Part 2 processing failure"); + }); - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); - var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + 2, 8 * 1024 * 1024, 16 * 1024 * 1024, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager( mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); - // Act & Assert + // Act + await coordinator.StartDownloadAsync(null, CancellationToken.None); + + // Wait for background task to complete with failure try { - await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - Assert.Fail("Expected InvalidOperationException to be thrown"); + await coordinator.DownloadCompletionTask; } - catch (InvalidOperationException ex) + catch (InvalidOperationException) { - Assert.AreEqual("Simulated S3 failure after capacity acquired", ex.Message); + // Expected } - // Assert - HTTP semaphore should be released by catch block in discovery - Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP semaphore should be released when HTTP request fails in discovery"); + // Assert - HTTP semaphore should be fully released after background failure + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be released when background part processing fails"); // Cleanup httpThrottler.Dispose(); @@ -3253,10 +3222,10 @@ public async Task ProgressCallback_ConcurrentCompletion_FiresOnlyOneCompletionEv concurrentRequests: 3); // Allow all parts to complete simultaneously var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, progressCallback, CancellationToken.None); + await coordinator.StartDownloadAsync(progressCallback, CancellationToken.None); // Wait for async progress events to complete var success = await WaitForProgressEventsAsync(progressEvents, progressLock, totalObjectSize); @@ -3309,10 +3278,10 @@ public async Task ProgressCallback_MultiplePartsComplete_AggregatesCorrectly() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, progressCallback, CancellationToken.None); + await coordinator.StartDownloadAsync(progressCallback, CancellationToken.None); // Wait for async progress events to complete var success = await WaitForProgressEventsAsync(progressEvents, progressLock, totalObjectSize); @@ -3370,7 +3339,7 @@ public async Task StartDownloadsAsync_BackgroundPartFails_CancelsInternalToken() // PrepareAsync succeeds mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); // ProcessPartAsync: Controlled execution order using TaskCompletionSource @@ -3417,10 +3386,10 @@ public async Task StartDownloadsAsync_BackgroundPartFails_CancelsInternalToken() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - Start downloads - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); // Wait for Part 3 to reach synchronization point await part3ReachedSyncPoint.Task; @@ -3471,7 +3440,7 @@ public async Task StartDownloadsAsync_MultiplePartsFail_HandlesGracefully() .Returns(Task.CompletedTask); mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); // Part 1 succeeds, Parts 2, 3, 4 all fail @@ -3499,10 +3468,10 @@ public async Task StartDownloadsAsync_MultiplePartsFail_HandlesGracefully() var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 3); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); try { @@ -3534,7 +3503,7 @@ public async Task StartDownloadsAsync_CancellationRacesWithDispose_HandlesGracef .Returns(Task.CompletedTask); mockDataHandler - .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); // Part 1 succeeds, Part 2 fails triggering cancellation @@ -3572,10 +3541,10 @@ public async Task StartDownloadsAsync_CancellationRacesWithDispose_HandlesGracef var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + // Act - await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.StartDownloadAsync(null, CancellationToken.None); try {