Source code for the_data_packet.utils.s3
"""AWS S3 storage backend."""
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional
from urllib.parse import quote

import boto3
from botocore.exceptions import ClientError, NoCredentialsError

from the_data_packet.core.config import get_config
from the_data_packet.core.exceptions import ConfigurationError, NetworkError
from the_data_packet.core.logging import get_logger
logger = get_logger(__name__)
[docs]
@dataclass
class S3UploadResult:
    """Outcome of a single S3 upload attempt.

    Attributes:
        success: ``True`` when the upload completed, ``False`` otherwise.
        s3_url: URL of the uploaded object; ``None`` unless provided.
        error_message: Human-readable failure description; ``None`` unless
            provided.
        file_size_bytes: Size of the uploaded file in bytes; ``None`` unless
            provided.
    """

    success: bool
    s3_url: Optional[str] = None
    error_message: Optional[str] = None
    file_size_bytes: Optional[int] = None
[docs]
class S3Storage:
    """AWS S3 storage backend bound to a single bucket.

    Wraps a boto3 S3 client and exposes helpers to upload a local file and
    to check that the configured bucket is reachable.
    """

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        region: Optional[str] = None,
    ):
        """
        Initialize S3 storage and verify that the credentials work.

        Args:
            bucket_name: S3 bucket name (defaults to ``config.s3_bucket_name``).
            aws_access_key_id: AWS access key. Only used when the secret key
                is also given; otherwise boto3 resolves credentials itself.
            aws_secret_access_key: AWS secret key. Only used when the access
                key is also given.
            region: AWS region (defaults to ``config.aws_region``).

        Raises:
            ConfigurationError: If no bucket name is available, credentials
                are missing or rejected, or the client cannot be initialized.
        """
        config = get_config()

        self.bucket_name = bucket_name or config.s3_bucket_name
        if not self.bucket_name:
            raise ConfigurationError("S3 bucket name is required")

        self.region = region or config.aws_region

        try:
            session_kwargs: Dict[str, Any] = {"region_name": self.region}

            # Explicit credentials are only forwarded as a complete pair.
            if aws_access_key_id and aws_secret_access_key:
                session_kwargs.update(
                    {
                        "aws_access_key_id": aws_access_key_id,
                        "aws_secret_access_key": aws_secret_access_key,
                    }
                )

            self.s3_client = boto3.client("s3", **session_kwargs)

            # Fail fast: issue one request so bad credentials surface here.
            self._test_credentials()
        except NoCredentialsError as e:
            raise ConfigurationError(
                "AWS credentials not found. Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY "
                "environment variables or configure AWS CLI."
            ) from e
        except ConfigurationError:
            # Already a specific, user-facing message; do not wrap it again.
            raise
        except Exception as e:
            # Everything else (including NetworkError from the credential
            # test) keeps surfacing as ConfigurationError, as before.
            raise ConfigurationError(f"Failed to initialize S3 client: {e}") from e

        logger.info(f"Initialized S3 storage for bucket: {self.bucket_name}")

    def upload_file(
        self,
        local_path: Path,
        s3_key: Optional[str] = None,
        content_type: Optional[str] = None,
    ) -> S3UploadResult:
        """
        Upload a file to S3.

        Failures are reported through the returned result rather than raised.

        Args:
            local_path: Path to the local file.
            s3_key: S3 object key (defaults to the file name).
            content_type: MIME type to store with the object, if any.

        Returns:
            S3UploadResult with ``success=True``, the object URL and the file
            size on success; otherwise ``success=False`` and an error message.
        """
        # Tolerate plain string paths as well as Path objects.
        local_path = Path(local_path)

        if not local_path.exists():
            return S3UploadResult(
                success=False, error_message=f"File not found: {local_path}"
            )
        if not local_path.is_file():
            # Directories and other non-regular files cannot be uploaded.
            return S3UploadResult(
                success=False, error_message=f"Not a file: {local_path}"
            )

        if s3_key is None:
            s3_key = local_path.name

        logger.info(f"Uploading {local_path} to s3://{self.bucket_name}/{s3_key}")

        try:
            file_size = local_path.stat().st_size

            upload_args: Dict[str, Any] = {
                "Filename": str(local_path),
                "Bucket": self.bucket_name,
                "Key": s3_key,
            }
            # ExtraArgs is only passed when there is something to put in it.
            if content_type:
                upload_args["ExtraArgs"] = {"ContentType": content_type}

            self.s3_client.upload_file(**upload_args)

            s3_url = self._build_object_url(self.bucket_name, self.region, s3_key)
            logger.info(f"Upload successful: {s3_url}")
            return S3UploadResult(
                success=True,
                s3_url=s3_url,
                file_size_bytes=file_size,
            )
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            error_message = f"S3 upload failed ({error_code}): {e}"
            logger.error(error_message)
            return S3UploadResult(
                success=False,
                error_message=error_message,
            )
        except Exception as e:
            error_message = f"Upload failed: {e}"
            logger.error(error_message)
            return S3UploadResult(
                success=False,
                error_message=error_message,
            )

    @staticmethod
    def _build_object_url(bucket_name: str, region: Optional[str], s3_key: str) -> str:
        """Return the virtual-hosted-style HTTPS URL for an object.

        The key is percent-encoded (``/`` is kept as the path separator) so
        keys containing spaces or reserved characters yield a valid URL. When
        no region is set, the region-less ``s3.amazonaws.com`` host is used.
        """
        if region:
            host = f"{bucket_name}.s3.{region}.amazonaws.com"
        else:
            host = f"{bucket_name}.s3.amazonaws.com"
        encoded_key = quote(s3_key, safe="/")
        return f"https://{host}/{encoded_key}"

    def _test_credentials(self) -> None:
        """Test S3 credentials by listing buckets.

        Raises:
            NoCredentialsError: Propagated unchanged so the caller can report
                missing credentials with its dedicated message.
            ConfigurationError: If the service rejects the request.
            NetworkError: For any other failure while contacting the service.
        """
        # NOTE(review): list_buckets presumably needs account-wide list
        # permission; credentials scoped to one bucket may fail here - confirm.
        try:
            self.s3_client.list_buckets()
        except NoCredentialsError:
            # Must not be folded into NetworkError below, or the caller's
            # "credentials not found" handler can never trigger.
            raise
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            if error_code == "AccessDenied":
                raise ConfigurationError(
                    "AWS credentials are invalid or lack permissions"
                ) from e
            if error_code == "SignatureDoesNotMatch":
                raise ConfigurationError("AWS secret access key is incorrect") from e
            raise ConfigurationError(f"AWS credentials test failed: {e}") from e
        except Exception as e:
            raise NetworkError(f"Failed to test AWS credentials: {e}") from e

    def bucket_exists(self) -> bool:
        """Check if the configured bucket exists and is accessible.

        Returns:
            ``True`` when ``head_bucket`` succeeds; ``False`` on any error
            (the reason is logged).
        """
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "Unknown")
            if error_code in ("404", "NoSuchBucket"):
                logger.warning(f"Bucket {self.bucket_name} does not exist")
            elif error_code == "403":
                logger.warning(f"No access to bucket {self.bucket_name}")
            else:
                logger.error(f"Error checking bucket {self.bucket_name}: {e}")
            return False
        except Exception as e:
            logger.error(f"Failed to check bucket {self.bucket_name}: {e}")
            return False
        return True