Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions backend/python/app/sources/external/s3/s3.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from app.sources.client.s3.s3 import S3Client, S3Response
from codeflash.code_utils.codeflash_wrap_decorator import \
codeflash_performance_async

try:
import aioboto3 # type: ignore
from botocore.exceptions import ClientError # type: ignore
except ImportError:
raise ImportError("aioboto3 is not installed. Please install it with `pip install aioboto3`")

from app.sources.client.s3.s3 import S3Client, S3Response


class S3DataSource:
Expand Down Expand Up @@ -48,13 +51,12 @@ def _handle_s3_response(self, response: object) -> S3Response:
return S3Response(success=False, error="Empty response from S3 API")

if isinstance(response, dict):
if 'Error' in response:
error_info = response['Error']
error_code = error_info.get('Code', 'Unknown')
error_message = error_info.get('Message', 'No message')
error = response.get('Error')
if error is not None:
error_code = error.get('Code', 'Unknown')
error_message = error.get('Message', 'No message')
return S3Response(success=False, error=f"{error_code}: {error_message}")
return S3Response(success=True, data=response)

return S3Response(success=True, data=response)

except Exception as e:
Expand Down Expand Up @@ -1520,6 +1522,7 @@ async def get_bucket_logging(self,
except Exception as e:
return S3Response(success=False, error=f"Unexpected error: {str(e)}")

@codeflash_performance_async
async def get_bucket_metrics_configuration(self,
Bucket: str,
Id: str,
Expand Down Expand Up @@ -1763,10 +1766,9 @@ async def get_bucket_tagging(self,
kwargs['ExpectedBucketOwner'] = ExpectedBucketOwner

try:
session = await self._get_aioboto3_session()
async with session.client('s3') as s3_client:
response = await getattr(s3_client, 'get_bucket_tagging')(**kwargs)
return self._handle_s3_response(response)
s3_client = await self._get_s3_client()
response = await getattr(s3_client, 'get_bucket_tagging')(**kwargs)
return self._handle_s3_response(response)
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
error_message = e.response.get('Error', {}).get('Message', str(e))
Expand Down Expand Up @@ -4318,3 +4320,25 @@ async def get_sdk_info(self) -> S3Response:
'service': 's3'
}
return S3Response(success=True, data=info)


async def _get_s3_client(self) -> object:
"""
Get or create a cached S3 client as an async context manager.
Keeps a persistent client per S3DataSource instance for better reuse.
"""
# Only cache the client if not already active (best effort, still safe for concurrent calls)
if self._s3_client_cached is not None:
return self._s3_client_cached

session = await self._get_aioboto3_session()
# Intentionally not using 'async with' here: we cache the context manager and rely on the lifecycle of S3DataSource.
# This relies on the resource being closed properly when S3DataSource is cleaned up by user code.
self._s3_client_cached = await session.client('s3').__aenter__()
return self._s3_client_cached

async def aclose(self) -> None:
"""Cleanup the cached async S3 client context, if created."""
if self._s3_client_cached is not None:
await self._s3_client_cached.__aexit__(None, None, None)
self._s3_client_cached = None