Skip to content

Commit 6574852

Browse files
committed
optimize task creation
stack-info: PR: #4219, branch: GarrettBeatty/gcbeatty/taskoptimization/2 add cancellation stack-info: PR: #4221, branch: GarrettBeatty/gcbeatty/taskoptimization/4
1 parent 91abd13 commit 6574852

File tree

2 files changed

+318
-14
lines changed

2 files changed

+318
-14
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -379,12 +379,31 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
379379
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);
380380

381381
// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
382-
await _dataHandler.WaitForCapacityAsync(cancellationToken).ConfigureAwait(false);
382+
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);
383383

384384
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);
385385

386-
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
387-
downloadTasks.Add(task);
386+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
387+
partNum, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
388+
389+
// Acquire HTTP slot in the loop before creating task
390+
// Loop will block here if all slots are in use
391+
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);
392+
393+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);
394+
395+
try
396+
{
397+
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
398+
downloadTasks.Add(task);
399+
}
400+
catch (Exception ex)
401+
{
402+
// If task creation fails, release the HTTP slot we just acquired
403+
_httpConcurrencySlots.Release();
404+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
405+
throw;
406+
}
388407
}
389408

390409
var expectedTaskCount = downloadTasks.Count;
@@ -418,7 +437,27 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
418437
catch (Exception ex)
419438
{
420439
_downloadException = ex;
421-
_logger.Error(ex, "MultipartDownloadManager: Background download task failed");
440+
441+
442+
443+
// Cancel all remaining downloads immediately to prevent cascading timeout errors
444+
// This ensures that when one part fails, other tasks stop gracefully instead of
445+
// continuing until they hit their own timeout/cancellation errors
446+
// Check if cancellation was already requested to avoid ObjectDisposedException
447+
if (!internalCts.IsCancellationRequested)
448+
{
449+
try
450+
{
451+
internalCts.Cancel();
452+
_logger.DebugFormat("MultipartDownloadManager: Cancelled all in-flight downloads due to error");
453+
}
454+
catch (ObjectDisposedException)
455+
{
456+
// CancellationTokenSource was already disposed, ignore
457+
_logger.DebugFormat("MultipartDownloadManager: CancellationTokenSource already disposed during cancellation");
458+
}
459+
}
460+
422461
_dataHandler.OnDownloadComplete(ex);
423462
throw;
424463
}
@@ -440,6 +479,22 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
440479
_downloadException = ex;
441480
_logger.Error(ex, "MultipartDownloadManager: Download failed");
442481

482+
// Cancel all remaining downloads immediately to prevent cascading timeout errors
483+
// Check if cancellation was already requested to avoid ObjectDisposedException
484+
if (!internalCts.IsCancellationRequested)
485+
{
486+
try
487+
{
488+
internalCts.Cancel();
489+
_logger.DebugFormat("MultipartDownloadManager: Cancelled all in-flight downloads due to error");
490+
}
491+
catch (ObjectDisposedException)
492+
{
493+
// CancellationTokenSource was already disposed, ignore
494+
_logger.DebugFormat("MultipartDownloadManager: CancellationTokenSource already disposed during cancellation");
495+
}
496+
}
497+
443498
_dataHandler.OnDownloadComplete(ex);
444499

445500
// Dispose the CancellationTokenSource if background task was never started
@@ -459,15 +514,8 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
459514

460515
try
461516
{
462-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
463-
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
464-
465-
// Limit HTTP concurrency for both network download AND disk write
466-
// The semaphore is held until AFTER ProcessPartAsync completes to ensure
467-
// ConcurrentServiceRequests controls the entire I/O operation
468-
await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false);
469-
470-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNumber);
517+
// HTTP slot was already acquired in the for loop before this task was created
518+
// We just need to use it and release it when done
471519

472520
try
473521
{
@@ -544,7 +592,7 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
544592
finally
545593
{
546594
// Release semaphore after BOTH network download AND disk write complete
547-
// This ensures ConcurrentServiceRequests limits the entire I/O operation
595+
// Slot was acquired in the for loop before this task was created
548596
_httpConcurrencySlots.Release();
549597
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released (Available: {1}/{2})",
550598
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);

sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3337,5 +3337,261 @@ public async Task ProgressCallback_MultiplePartsComplete_AggregatesCorrectly()
33373337
}
33383338

33393339
#endregion
3340+
3341+
#region Cancellation Enhancement Tests
3342+
3343+
[TestMethod]
3344+
public async Task StartDownloadsAsync_BackgroundPartFails_CancelsInternalToken()
3345+
{
3346+
// Arrange - Deterministic test using TaskCompletionSource to control execution order
3347+
// This ensures Part 3 waits at synchronization point, Part 2 fails, then Part 3 checks cancellation
3348+
var totalParts = 3;
3349+
var partSize = 8 * 1024 * 1024;
3350+
var totalObjectSize = totalParts * partSize;
3351+
3352+
var part2Failed = false;
3353+
var part3SawCancellation = false;
3354+
3355+
// Synchronization primitives to control execution order
3356+
var part3ReachedSyncPoint = new TaskCompletionSource<bool>();
3357+
var part2CanFail = new TaskCompletionSource<bool>();
3358+
var part3CanCheckCancellation = new TaskCompletionSource<bool>();
3359+
3360+
var mockDataHandler = new Mock<IPartDataHandler>();
3361+
3362+
// Capacity acquisition succeeds for all parts
3363+
mockDataHandler
3364+
.Setup(x => x.WaitForCapacityAsync(It.IsAny<CancellationToken>()))
3365+
.Returns(Task.CompletedTask);
3366+
3367+
// PrepareAsync succeeds
3368+
mockDataHandler
3369+
.Setup(x => x.PrepareAsync(It.IsAny<DownloadDiscoveryResult>(), It.IsAny<CancellationToken>()))
3370+
.Returns(Task.CompletedTask);
3371+
3372+
// ProcessPartAsync: Controlled execution order using TaskCompletionSource
3373+
mockDataHandler
3374+
.Setup(x => x.ProcessPartAsync(It.IsAny<int>(), It.IsAny<GetObjectResponse>(), It.IsAny<CancellationToken>()))
3375+
.Returns<int, GetObjectResponse, CancellationToken>(async (partNum, response, ct) =>
3376+
{
3377+
if (partNum == 1)
3378+
{
3379+
return; // Part 1 succeeds immediately
3380+
}
3381+
else if (partNum == 2)
3382+
{
3383+
// Part 2 waits for Part 3 to reach sync point before failing
3384+
await part2CanFail.Task;
3385+
part2Failed = true;
3386+
throw new InvalidOperationException("Simulated Part 2 failure");
3387+
}
3388+
else // Part 3
3389+
{
3390+
// Part 3 reaches sync point and signals to Part 2
3391+
part3ReachedSyncPoint.SetResult(true);
3392+
3393+
// Wait for Part 2 to fail and cancellation to propagate
3394+
await part3CanCheckCancellation.Task;
3395+
3396+
// Now check if cancellation was received from internalCts
3397+
if (ct.IsCancellationRequested)
3398+
{
3399+
part3SawCancellation = true;
3400+
throw new OperationCanceledException("Part 3 cancelled due to Part 2 failure");
3401+
}
3402+
}
3403+
});
3404+
3405+
mockDataHandler.Setup(x => x.ReleaseCapacity());
3406+
mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny<Exception>()));
3407+
3408+
var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
3409+
totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true);
3410+
3411+
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
3412+
downloadType: MultipartDownloadType.PART);
3413+
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2);
3414+
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);
3415+
3416+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
3417+
3418+
// Act - Start downloads
3419+
await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
3420+
3421+
// Wait for Part 3 to reach synchronization point
3422+
await part3ReachedSyncPoint.Task;
3423+
3424+
// Allow Part 2 to fail
3425+
part2CanFail.SetResult(true);
3426+
3427+
// Give cancellation time to propagate
3428+
await Task.Delay(100);
3429+
3430+
// Allow Part 3 to check cancellation
3431+
part3CanCheckCancellation.SetResult(true);
3432+
3433+
// Wait for background task to complete
3434+
try
3435+
{
3436+
await coordinator.DownloadCompletionTask;
3437+
}
3438+
catch (InvalidOperationException)
3439+
{
3440+
// Expected failure from Part 2
3441+
}
3442+
3443+
// Assert - Deterministic verification that cancellation propagated
3444+
Assert.IsTrue(part2Failed, "Part 2 should have failed");
3445+
Assert.IsTrue(part3SawCancellation,
3446+
"Part 3 should have received cancellation via internalCts.Token (deterministic with TaskCompletionSource)");
3447+
3448+
Assert.IsNotNull(coordinator.DownloadException,
3449+
"Download exception should be captured when background part fails");
3450+
Assert.IsInstanceOfType(coordinator.DownloadException, typeof(InvalidOperationException),
3451+
"Download exception should be the Part 2 failure");
3452+
}
3453+
3454+
[TestMethod]
3455+
public async Task StartDownloadsAsync_MultiplePartsFail_HandlesGracefully()
3456+
{
3457+
// Arrange - Test simultaneous failures from multiple parts
3458+
var totalParts = 4;
3459+
var partSize = 8 * 1024 * 1024;
3460+
var totalObjectSize = totalParts * partSize;
3461+
3462+
var failedParts = new System.Collections.Concurrent.ConcurrentBag<int>();
3463+
var mockDataHandler = new Mock<IPartDataHandler>();
3464+
3465+
mockDataHandler
3466+
.Setup(x => x.WaitForCapacityAsync(It.IsAny<CancellationToken>()))
3467+
.Returns(Task.CompletedTask);
3468+
3469+
mockDataHandler
3470+
.Setup(x => x.PrepareAsync(It.IsAny<DownloadDiscoveryResult>(), It.IsAny<CancellationToken>()))
3471+
.Returns(Task.CompletedTask);
3472+
3473+
// Part 1 succeeds, Parts 2, 3, 4 all fail
3474+
mockDataHandler
3475+
.Setup(x => x.ProcessPartAsync(It.IsAny<int>(), It.IsAny<GetObjectResponse>(), It.IsAny<CancellationToken>()))
3476+
.Returns<int, GetObjectResponse, CancellationToken>((partNum, response, ct) =>
3477+
{
3478+
if (partNum == 1)
3479+
{
3480+
return Task.CompletedTask;
3481+
}
3482+
3483+
failedParts.Add(partNum);
3484+
throw new InvalidOperationException($"Simulated Part {partNum} failure");
3485+
});
3486+
3487+
mockDataHandler.Setup(x => x.ReleaseCapacity());
3488+
mockDataHandler.Setup(x => x.OnDownloadComplete(It.IsAny<Exception>()));
3489+
3490+
var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
3491+
totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true);
3492+
3493+
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
3494+
downloadType: MultipartDownloadType.PART);
3495+
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 3);
3496+
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);
3497+
3498+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
3499+
3500+
// Act
3501+
await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
3502+
3503+
try
3504+
{
3505+
await coordinator.DownloadCompletionTask;
3506+
}
3507+
catch (InvalidOperationException)
3508+
{
3509+
// Expected - at least one part failed
3510+
}
3511+
3512+
// Assert - Should handle multiple failures gracefully
3513+
Assert.IsTrue(failedParts.Count > 0, "At least one part should have failed");
3514+
Assert.IsNotNull(coordinator.DownloadException, "Download exception should be captured");
3515+
}
3516+
3517+
[TestMethod]
3518+
public async Task StartDownloadsAsync_CancellationRacesWithDispose_HandlesGracefully()
3519+
{
3520+
// Arrange - Test race condition between Cancel() and Dispose()
3521+
var totalParts = 3;
3522+
var partSize = 8 * 1024 * 1024;
3523+
var totalObjectSize = totalParts * partSize;
3524+
3525+
var objectDisposedExceptionCaught = false;
3526+
var mockDataHandler = new Mock<IPartDataHandler>();
3527+
3528+
mockDataHandler
3529+
.Setup(x => x.WaitForCapacityAsync(It.IsAny<CancellationToken>()))
3530+
.Returns(Task.CompletedTask);
3531+
3532+
mockDataHandler
3533+
.Setup(x => x.PrepareAsync(It.IsAny<DownloadDiscoveryResult>(), It.IsAny<CancellationToken>()))
3534+
.Returns(Task.CompletedTask);
3535+
3536+
// Part 1 succeeds, Part 2 fails triggering cancellation
3537+
mockDataHandler
3538+
.Setup(x => x.ProcessPartAsync(It.IsAny<int>(), It.IsAny<GetObjectResponse>(), It.IsAny<CancellationToken>()))
3539+
.Returns<int, GetObjectResponse, CancellationToken>((partNum, response, ct) =>
3540+
{
3541+
if (partNum == 1)
3542+
{
3543+
return Task.CompletedTask;
3544+
}
3545+
3546+
// Part 2 failure will trigger Cancel() in catch block
3547+
// The enhancement should check IsCancellationRequested to avoid ObjectDisposedException
3548+
throw new InvalidOperationException("Simulated Part 2 failure");
3549+
});
3550+
3551+
mockDataHandler.Setup(x => x.ReleaseCapacity());
3552+
mockDataHandler
3553+
.Setup(x => x.OnDownloadComplete(It.IsAny<Exception>()))
3554+
.Callback<Exception>(ex =>
3555+
{
3556+
// Check if ObjectDisposedException was handled
3557+
if (ex is ObjectDisposedException)
3558+
{
3559+
objectDisposedExceptionCaught = true;
3560+
}
3561+
});
3562+
3563+
var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
3564+
totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true);
3565+
3566+
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
3567+
downloadType: MultipartDownloadType.PART);
3568+
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2);
3569+
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);
3570+
3571+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
3572+
3573+
// Act
3574+
await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
3575+
3576+
try
3577+
{
3578+
await coordinator.DownloadCompletionTask;
3579+
}
3580+
catch (InvalidOperationException)
3581+
{
3582+
// Expected failure
3583+
}
3584+
3585+
// Assert - The enhancement should prevent ObjectDisposedException from being thrown
3586+
// by checking IsCancellationRequested before calling Cancel()
3587+
Assert.IsFalse(objectDisposedExceptionCaught,
3588+
"ObjectDisposedException should not propagate due to IsCancellationRequested check");
3589+
Assert.IsNotNull(coordinator.DownloadException,
3590+
"Download exception should be the original failure, not ObjectDisposedException");
3591+
Assert.IsInstanceOfType(coordinator.DownloadException, typeof(InvalidOperationException),
3592+
"Download exception should be the original InvalidOperationException from Part 2 failure");
3593+
}
3594+
3595+
#endregion
33403596
}
33413597
}

0 commit comments

Comments
 (0)