Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions backend/python/app/sources/external/s3/s3.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from app.sources.client.s3.s3 import S3Client, S3Response
from codeflash.code_utils.codeflash_wrap_decorator import \
codeflash_performance_async

try:
import aioboto3 # type: ignore
from botocore.exceptions import ClientError # type: ignore
except ImportError:
raise ImportError("aioboto3 is not installed. Please install it with `pip install aioboto3`")

from app.sources.client.s3.s3 import S3Client, S3Response


class S3DataSource:
Expand Down Expand Up @@ -48,13 +51,12 @@ def _handle_s3_response(self, response: object) -> S3Response:
return S3Response(success=False, error="Empty response from S3 API")

if isinstance(response, dict):
if 'Error' in response:
error_info = response['Error']
error_code = error_info.get('Code', 'Unknown')
error_message = error_info.get('Message', 'No message')
error = response.get('Error')
if error is not None:
error_code = error.get('Code', 'Unknown')
error_message = error.get('Message', 'No message')
return S3Response(success=False, error=f"{error_code}: {error_message}")
return S3Response(success=True, data=response)

return S3Response(success=True, data=response)

except Exception as e:
Expand Down Expand Up @@ -1520,6 +1522,7 @@ async def get_bucket_logging(self,
except Exception as e:
return S3Response(success=False, error=f"Unexpected error: {str(e)}")

@codeflash_performance_async
async def get_bucket_metrics_configuration(self,
Bucket: str,
Id: str,
Expand Down Expand Up @@ -2457,10 +2460,9 @@ async def list_bucket_metrics_configurations(self,
kwargs['ExpectedBucketOwner'] = ExpectedBucketOwner

try:
session = await self._get_aioboto3_session()
async with session.client('s3') as s3_client:
response = await getattr(s3_client, 'list_bucket_metrics_configurations')(**kwargs)
return self._handle_s3_response(response)
s3_client = await self._get_s3_client()
response = await getattr(s3_client, 'list_bucket_metrics_configurations')(**kwargs)
return self._handle_s3_response(response)
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
error_message = e.response.get('Error', {}).get('Message', str(e))
Expand Down Expand Up @@ -4318,3 +4320,26 @@ async def get_sdk_info(self) -> S3Response:
'service': 's3'
}
return S3Response(success=True, data=info)


async def _get_s3_client(self):
"""Get or create an async aioboto3 S3 client, cache for reuse within this object."""
# Only create and enter the async client context manager once
if self._s3_client_obj is not None:
return self._s3_client_obj
session = await self._get_aioboto3_session()
self._s3_client_cm = session.client('s3')
self._s3_client_obj = await self._s3_client_cm.__aenter__()
return self._s3_client_obj

async def __aenter__(self):
"""Async context entry: allows for resource management."""
await self._get_s3_client()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context exit: close the S3 client properly."""
if self._s3_client_cm is not None:
await self._s3_client_cm.__aexit__(exc_type, exc_val, exc_tb)
self._s3_client_obj = None
self._s3_client_cm = None