⚡️ Speed up method S3DataSource.get_object by 43%
#621
📄 43% (0.43x) speedup for `S3DataSource.get_object` in `backend/python/app/sources/external/s3/s3.py`
⏱️ Runtime: 6.38 milliseconds → 4.45 milliseconds (best of 77 runs)
📝 Explanation and details
The optimized code achieves a 43% runtime improvement through session caching, a crucial optimization for async S3 operations.
Key Optimization: Session Caching
The original code called
await self._get_aioboto3_session()on every request, creating a new session each time. The optimized version introduces session caching with:Performance Impact Analysis:
Why This Works:
aioboto3 session creation involves credential resolution, region configuration, and connection pool setup: expensive operations that don't need to be repeated for the same S3DataSource instance. By caching the session after first creation, subsequent calls skip this overhead entirely.
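The cached-session code itself is not shown in this summary, so here is a minimal sketch of the pattern it describes. The attribute name `_session` and the constructor shape are assumptions for illustration, not the actual diff:

```python
# Hypothetical sketch of the session-caching pattern described above;
# attribute and method names are assumptions, not the actual diff.
from typing import Optional

import aioboto3

class S3DataSource:
    def __init__(self, client) -> None:
        self._client = client
        self._session: Optional[aioboto3.Session] = None  # cache slot, empty until first use

    async def _get_aioboto3_session(self) -> aioboto3.Session:
        # Create the session only once; later calls reuse the cached
        # instance and skip credential/region/connection-pool setup.
        if self._session is None:
            self._session = aioboto3.Session()
        return self._session
```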
Throughput Benefits:
The optimization delivers a 2.7% throughput improvement (64,425 → 66,143 ops/sec), which is particularly beneficial for concurrent and high-volume workloads.
Test Case Performance:
The optimization shows consistent benefits across all test scenarios, with the most significant gains in concurrent and high-volume operations where session reuse multiplies the performance advantage.
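As a rough illustration (not part of the generated suite below), a micro-benchmark along these lines would surface the session-reuse advantage under concurrency; the bucket/key names match the dummies used by the tests, and the count is arbitrary:

```python
# Illustrative throughput probe, assuming an S3DataSource instance `s3`.
import asyncio
import time

async def measure_ops_per_sec(s3, n: int = 200) -> float:
    # Fire n concurrent get_object calls; with a cached session, each
    # call after the first avoids session construction entirely.
    start = time.perf_counter()
    await asyncio.gather(
        *(s3.get_object(Bucket='test-bucket', Key='test-key') for _ in range(n))
    )
    return n / (time.perf_counter() - start)  # ops/sec
```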
✅ Correctness verification report:
🌀 Generated Regression Tests and Runtime
```python
import asyncio  # used to run async functions
from datetime import datetime, timedelta
from typing import Optional

import pytest  # used for our unit tests

# ---- Function to test (EXACT COPY) ----
from app.sources.client.s3.s3 import S3Client, S3Response
from app.sources.external.s3.s3 import S3DataSource

# ---- Minimal S3Client and S3Response mocks for testing ----
class S3Response:
    """Minimal S3Response for testing."""
    def __init__(self, success: bool, data=None, error: Optional[str] = None):
        self.success = success
        self.data = data
        self.error = error

class S3Client:
    """Dummy S3Client for compatibility (not used directly)."""
    pass
# ---- Dummy aioboto3 session and client for async context ----
class DummyS3Client:
    """Dummy async S3 client with get_object method."""
    async def get_object(self, **kwargs):
        # Simulate different responses based on input for testing.
        # The more specific parameters are checked first so that
        # versioned, ranged, and conditional requests are not shadowed
        # by the basic valid case below.
        # Simulate missing key
        if kwargs['Key'] == 'missing-key':
            raise DummyClientError({'Error': {'Code': 'NoSuchKey', 'Message': 'The specified key does not exist.'}})
        # Simulate bucket not found
        if kwargs['Bucket'] == 'missing-bucket':
            raise DummyClientError({'Error': {'Code': 'NoSuchBucket', 'Message': 'The specified bucket does not exist.'}})
        # Simulate large object
        if kwargs['Key'] == 'large-object':
            return {'Body': b'x' * 1024 * 1024, 'ContentLength': 1024 * 1024}
        # Simulate versioned object
        if kwargs.get('VersionId') == 'v1':
            return {'Body': b'versioned', 'ContentLength': 9, 'VersionId': 'v1'}
        # Simulate range request
        if kwargs.get('Range') == 'bytes=0-3':
            return {'Body': b'data', 'ContentLength': 4}
        # Simulate IfModifiedSince: a date in the future returns an error
        if kwargs.get('IfModifiedSince'):
            if kwargs['IfModifiedSince'] > datetime.now():
                raise DummyClientError({'Error': {'Code': 'PreconditionFailed', 'Message': 'Object not modified.'}})
        # Simulate generic error
        if kwargs.get('Key') == 'error-key':
            raise Exception("Unexpected error occurred")
        # Basic valid case: simulate a valid S3 object response
        if kwargs['Bucket'] == 'test-bucket' and kwargs['Key'] == 'test-key':
            return {'Body': b'data', 'ContentLength': 4, 'Metadata': {'foo': 'bar'}}
        # Default: valid response
        return {'Body': b'default', 'ContentLength': 7}
class DummyClientError(Exception):
    """Simulate botocore.exceptions.ClientError."""
    def __init__(self, response):
        self.response = response

class DummySession:
    """Dummy async session context manager."""
    async def __aenter__(self):
        return DummyS3Client()

    async def __aexit__(self, exc_type, exc, tb):
        pass

    def client(self, service_name):
        return self
# ---- Patch S3DataSource for testing ----
# Patch S3DataSource._get_aioboto3_session to return DummySession
async def dummy_get_aioboto3_session(self):
    # async so that `await self._get_aioboto3_session()` works with this patch
    return DummySession()

def dummy_handle_s3_response(self, response):
    # Return S3Response with success and data
    return S3Response(success=True, data=response)

try:
    from botocore.exceptions import ClientError  # type: ignore
except ImportError:
    raise ImportError("aioboto3 is not installed. Please install it with 'pip install aioboto3'")

from app.sources.external.s3.s3 import S3DataSource

# ---- Patch S3DataSource for testing ----
S3DataSource._get_aioboto3_session = dummy_get_aioboto3_session
S3DataSource._handle_s3_response = dummy_handle_s3_response
# ---- Unit Tests ----
@pytest.mark.asyncio
async def test_get_object_basic_missing_key():
    """Test get_object returns error for missing key."""
    s3 = S3DataSource(S3Client())
    resp = await s3.get_object(Bucket='test-bucket', Key='missing-key')

@pytest.mark.asyncio
async def test_get_object_basic_missing_bucket():
    """Test get_object returns error for missing bucket."""
    s3 = S3DataSource(S3Client())
    resp = await s3.get_object(Bucket='missing-bucket', Key='test-key')

@pytest.mark.asyncio
async def test_get_object_edge_versioned_object():
    """Test get_object with VersionId parameter."""
    s3 = S3DataSource(S3Client())
    resp = await s3.get_object(Bucket='test-bucket', Key='test-key', VersionId='v1')

@pytest.mark.asyncio
async def test_get_object_edge_range_request():
    """Test get_object with Range header."""
    s3 = S3DataSource(S3Client())
    resp = await s3.get_object(Bucket='test-bucket', Key='test-key', Range='bytes=0-3')

@pytest.mark.asyncio
async def test_get_object_edge_if_modified_since_future():
    """Test get_object with IfModifiedSince set to future date (should error)."""
    s3 = S3DataSource(S3Client())
    future_dt = datetime.now() + timedelta(days=1)
    resp = await s3.get_object(Bucket='test-bucket', Key='test-key', IfModifiedSince=future_dt)

@pytest.mark.asyncio
async def test_get_object_edge_unexpected_exception():
    """Test get_object returns error for unexpected exception."""
    s3 = S3DataSource(S3Client())
    resp = await s3.get_object(Bucket='test-bucket', Key='error-key')

@pytest.mark.asyncio
async def test_get_object_concurrent_requests():
    """Test concurrent get_object calls with asyncio.gather."""
    s3 = S3DataSource(S3Client())
    # 5 concurrent requests: 3 valid, 2 error
    tasks = [
        s3.get_object(Bucket='test-bucket', Key='test-key'),
        s3.get_object(Bucket='test-bucket', Key='missing-key'),
        s3.get_object(Bucket='test-bucket', Key='test-key', Range='bytes=0-3'),
        s3.get_object(Bucket='missing-bucket', Key='test-key'),
        s3.get_object(Bucket='test-bucket', Key='error-key'),
    ]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio
async def test_get_object_large_object():
    """Test get_object with a large object (1MB)."""
    s3 = S3DataSource(S3Client())
    resp = await s3.get_object(Bucket='test-bucket', Key='large-object')

@pytest.mark.asyncio
async def test_get_object_large_scale_mixed_concurrent():
    """Test get_object with mixed valid/error concurrent requests."""
    s3 = S3DataSource(S3Client())
    tasks = []
    for i in range(25):
        tasks.append(s3.get_object(Bucket='test-bucket', Key='test-key'))
        tasks.append(s3.get_object(Bucket='test-bucket', Key='missing-key'))
    results = await asyncio.gather(*tasks)
    for i in range(50):
        if i % 2 == 0:
            pass
        else:
            pass
# ---- Throughput Tests ----
@pytest.mark.asyncio
async def test_get_object_throughput_large_load():
    """Throughput: Test 200 concurrent get_object calls (large load, bounded)."""
    s3 = S3DataSource(S3Client())
    tasks = [s3.get_object(Bucket='test-bucket', Key='test-key') for _ in range(200)]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio
async def test_get_object_throughput_mixed_load():
    """Throughput: Test mixed valid/error requests under concurrent load."""
    s3 = S3DataSource(S3Client())
    tasks = []
    for i in range(50):
        tasks.append(s3.get_object(Bucket='test-bucket', Key='test-key'))
        tasks.append(s3.get_object(Bucket='test-bucket', Key='missing-key'))
    results = await asyncio.gather(*tasks)
    valid = [r for r in results if r.success]
    error = [r for r in results if not r.success]
    for r in error:
        pass

@pytest.mark.asyncio
async def test_get_object_throughput_large_object_load():
    """Throughput: Test concurrent large object retrievals."""
    s3 = S3DataSource(S3Client())
    tasks = [s3.get_object(Bucket='test-bucket', Key='large-object') for _ in range(10)]
    results = await asyncio.gather(*tasks)
    for r in results:
        pass
```
`codeflash_output` is used to check that the output of the original code is the same as that of the optimized code.
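The generated assertions themselves are elided in this report; as a hedged sketch of the capture convention, a comparison test looks roughly like this (test name and arguments are illustrative):

```python
# Hypothetical example of the codeflash_output capture convention;
# the actual generated assertions are not shown in this report.
@pytest.mark.asyncio
async def test_get_object_output_capture():
    s3 = S3DataSource(S3Client())
    codeflash_output = await s3.get_object(Bucket='test-bucket', Key='test-key')
    resp = codeflash_output  # compared against the original implementation's result
```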
```python
# ------------------------------------------------
import asyncio  # used to run async functions
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch

import pytest  # used for our unit tests

from app.sources.external.s3.s3 import S3DataSource

# --- Minimal stubs for S3Client and S3Response to allow testing ---
class S3Response:
    def __init__(self, success: bool, data=None, error=None):
        self.success = success
        self.data = data
        self.error = error

class S3Client:
    pass  # Not used directly in our tests, just for type compatibility

# --- The function to test (copied EXACTLY as provided) ---
try:
    from botocore.exceptions import ClientError  # type: ignore
except ImportError:
    raise ImportError("aioboto3 is not installed. Please install it with 'pip install aioboto3'")

from app.sources.external.s3.s3 import S3DataSource
# --- TESTS ---
@pytest.fixture
def s3datasource_success():
    """
    Returns an S3DataSource instance with all dependencies mocked for successful get_object.
    """
    # Mock session.client('s3') as async context manager
    mock_s3_client = AsyncMock()
    # The s3_client.get_object returns a dict (simulated S3 response)
    mock_s3_client.get_object = AsyncMock(return_value={'Body': b'data', 'ContentLength': 4, 'Key': 'key1'})
    mock_cm = AsyncMock()
    mock_cm.__aenter__.return_value = mock_s3_client
    mock_session = AsyncMock()
    mock_session.client.return_value = mock_cm

@pytest.fixture
def s3datasource_clienterror():
    """
    Returns an S3DataSource instance where get_object raises a ClientError.
    """
    mock_s3_client = AsyncMock()
    error_response = {
        'Error': {'Code': 'NoSuchKey', 'Message': 'The specified key does not exist.'}
    }
    # Simulate botocore.exceptions.ClientError
    client_error = ClientError(error_response, operation_name='GetObject')
    mock_s3_client.get_object = AsyncMock(side_effect=client_error)
    mock_cm = AsyncMock()
    mock_cm.__aenter__.return_value = mock_s3_client
    mock_session = AsyncMock()
    mock_session.client.return_value = mock_cm

@pytest.fixture
def s3datasource_exception():
    """
    Returns an S3DataSource instance where get_object raises a generic Exception.
    """
    mock_s3_client = AsyncMock()
    mock_s3_client.get_object = AsyncMock(side_effect=RuntimeError("Unexpected failure"))
    mock_cm = AsyncMock()
    mock_cm.__aenter__.return_value = mock_s3_client
    mock_session = AsyncMock()
    mock_session.client.return_value = mock_cm
# --- BASIC TEST CASES ---
@pytest.mark.asyncio
```
To edit these changes, run `git checkout codeflash/optimize-S3DataSource.get_object-mhx0sbai` and push.