Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ internal class BufferedMultipartStream : Stream

private bool _initialized = false;
private bool _disposed = false;
private DownloadDiscoveryResult _discoveryResult;
private DownloadResult _discoveryResult;
private long _totalBytesRead = 0;

private readonly Logger _logger = Logger.GetLogger(typeof(BufferedMultipartStream));

/// <summary>
/// Gets the <see cref="DownloadDiscoveryResult"/> containing metadata from the initial GetObject response.
/// Gets the <see cref="DownloadResult"/> containing metadata from the initial GetObject response.
/// Available after <see cref="InitializeAsync"/> completes successfully.
/// </summary>
public DownloadDiscoveryResult DiscoveryResult => _discoveryResult;
public DownloadResult DiscoveryResult => _discoveryResult;

/// <summary>
/// Creates a new <see cref="BufferedMultipartStream"/> with dependency injection.
Expand Down Expand Up @@ -112,16 +112,14 @@ public async Task InitializeAsync(CancellationToken cancellationToken)

_logger.DebugFormat("BufferedMultipartStream: Starting initialization");

_discoveryResult = await _downloadCoordinator.DiscoverDownloadStrategyAsync(cancellationToken)
.ConfigureAwait(false);

_logger.DebugFormat("BufferedMultipartStream: Discovery completed - ObjectSize={0}, TotalParts={1}, IsSinglePart={2}",
// Start unified download operation (discovers strategy and starts downloads)
_discoveryResult = await _downloadCoordinator.StartDownloadAsync(null, cancellationToken)
.ConfigureAwait(false);

_logger.DebugFormat("BufferedMultipartStream: Download started - ObjectSize={0}, TotalParts={1}, IsSinglePart={2}",
_discoveryResult.ObjectSize,
_discoveryResult.TotalParts,
_discoveryResult.IsSinglePart);

await _downloadCoordinator.StartDownloadsAsync(_discoveryResult, null, cancellationToken)
.ConfigureAwait(false);

_initialized = true;
_logger.DebugFormat("BufferedMultipartStream: Initialization completed successfully");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public BufferedPartDataHandler(
_config = config ?? throw new ArgumentNullException(nameof(config));
}

public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken)
public Task PrepareAsync(DownloadResult discoveryResult, CancellationToken cancellationToken)
{
// No preparation needed for buffered handler - buffers are created on demand
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public FilePartDataHandler(FileDownloadConfiguration config)
}

/// <inheritdoc/>
public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken)
public Task PrepareAsync(DownloadResult discoveryResult, CancellationToken cancellationToken)
{
// Create temporary file once during preparation phase
_tempFilePath = _fileHandler.CreateTemporaryFile(_config.DestinationFilePath);
Expand Down
41 changes: 24 additions & 17 deletions sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,29 @@ namespace Amazon.S3.Transfer.Internal
internal interface IDownloadManager : IDisposable
{
/// <summary>
/// Discovers whether the object requires single-part or multipart downloading.
/// Discovers the download strategy and starts concurrent downloads in a single operation.
/// This unified method eliminates resource leakage by managing HTTP slots and buffer capacity
/// internally throughout the entire download lifecycle.
/// </summary>
/// <param name="cancellationToken">A token to cancel the discovery operation.</param>
/// <returns>
/// A task containing discovery results including total parts, object size,
/// and initial response data if single-part.
/// </returns>
Task<DownloadDiscoveryResult> DiscoverDownloadStrategyAsync(CancellationToken cancellationToken);

/// <summary>
/// Starts concurrent downloads with HTTP concurrency control and part range calculations.
/// </summary>
/// <param name="discoveryResult">Results from the discovery phase.</param>
/// <param name="progressCallback">Optional callback for progress tracking events.</param>
/// <param name="cancellationToken">A token to cancel the download operation.</param>
/// <returns>A task that completes when all downloads finish or an error occurs.</returns>
Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, EventHandler<WriteObjectProgressArgs> progressCallback, CancellationToken cancellationToken);
/// <returns>
/// A task containing download results including total parts, object size,
/// and initial response data.
/// </returns>
/// <remarks>
/// This method performs both discovery and download operations atomically:
/// 1. Acquires HTTP slot and buffer capacity
/// 2. Makes initial GetObject request to discover download strategy
/// 3. Processes Part 1 immediately
/// 4. Starts background downloads for remaining parts (if multipart)
/// 5. Returns after Part 1 is processed, allowing consumer to begin reading
///
/// Resources (HTTP slots, buffer capacity) are managed internally and released
/// at the appropriate times, eliminating the awkward resource holding that existed
/// with the previous two-method API.
/// </remarks>
Task<DownloadResult> StartDownloadAsync(EventHandler<WriteObjectProgressArgs> progressCallback, CancellationToken cancellationToken);

/// <summary>
/// Exception that occurred during downloads, if any.
Expand All @@ -57,9 +63,9 @@ internal interface IDownloadManager : IDisposable
}

/// <summary>
/// Download discovery results with metadata for determining download strategy.
/// Download results with metadata about the completed discovery and initial download.
/// </summary>
internal class DownloadDiscoveryResult
internal class DownloadResult
{
/// <summary>
/// Total parts needed (1 = single-part, >1 = multipart).
Expand All @@ -72,7 +78,8 @@ internal class DownloadDiscoveryResult
public long ObjectSize { get; set; }

/// <summary>
/// GetObjectResponse obtained during download initialization, containing the ResponseStream. Represents the complete object for single-part downloads or the first range/part for multipart downloads.
/// GetObjectResponse obtained during download initialization, containing the ResponseStream.
/// Represents the complete object for single-part downloads or the first range/part for multipart downloads.
/// </summary>
public GetObjectResponse InitialResponse { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal interface IPartDataHandler : IDisposable
/// <param name="discoveryResult">Discovery result containing object metadata</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>Task that completes when preparation is done</returns>
Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken);
Task PrepareAsync(DownloadResult discoveryResult, CancellationToken cancellationToken);

/// <summary>
/// Process a downloaded part from the GetObjectResponse.
Expand Down
Loading