forked from LiveCarta/ContentGeneration
72 lines
2.5 KiB
Python
72 lines
2.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Mapping
|
|
|
|
import boto3
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class S3Config:
|
|
bucket_name: str
|
|
region_name: str | None = None
|
|
endpoint_url: str | None = None
|
|
aws_access_key_id: str | None = None
|
|
aws_secret_access_key: str | None = None
|
|
aws_session_token: str | None = None
|
|
|
|
|
|
class S3VideoStorage:
|
|
def __init__(self, s3_config: S3Config | Mapping[str, Any]) -> None:
|
|
self.config = self._normalize_config(s3_config)
|
|
|
|
client_kwargs: dict[str, Any] = {
|
|
"region_name": self.config.region_name,
|
|
"endpoint_url": self.config.endpoint_url,
|
|
"aws_access_key_id": self.config.aws_access_key_id,
|
|
"aws_secret_access_key": self.config.aws_secret_access_key,
|
|
"aws_session_token": self.config.aws_session_token,
|
|
}
|
|
filtered_kwargs = {k: v for k, v in client_kwargs.items() if v is not None}
|
|
self._s3_client = boto3.client("s3", **filtered_kwargs)
|
|
|
|
def store_file(self, file_path: str | Path) -> str:
|
|
path = Path(file_path)
|
|
if not path.exists():
|
|
raise FileNotFoundError(f"File does not exist: {path}")
|
|
if not path.is_file():
|
|
raise ValueError(f"Path is not a file: {path}")
|
|
|
|
now = datetime.now(timezone.utc)
|
|
key = f"video_content/{now.year:04d}/{now.month:02d}/{now.day:02d}/{path.name}"
|
|
|
|
self._s3_client.upload_file(str(path), self.config.bucket_name, key)
|
|
return f"s3://{self.config.bucket_name}/{key}"
|
|
|
|
@staticmethod
|
|
def _normalize_config(s3_config: S3Config | Mapping[str, Any]) -> S3Config:
|
|
if isinstance(s3_config, S3Config):
|
|
return s3_config
|
|
|
|
bucket_name = s3_config.get("bucket_name")
|
|
if not bucket_name:
|
|
raise ValueError("s3_config must contain non-empty 'bucket_name'")
|
|
|
|
return S3Config(
|
|
bucket_name=str(bucket_name),
|
|
region_name=_optional_str(s3_config, "region_name"),
|
|
endpoint_url=_optional_str(s3_config, "endpoint_url"),
|
|
aws_access_key_id=_optional_str(s3_config, "aws_access_key_id"),
|
|
aws_secret_access_key=_optional_str(s3_config, "aws_secret_access_key"),
|
|
aws_session_token=_optional_str(s3_config, "aws_session_token"),
|
|
)
|
|
|
|
|
|
def _optional_str(config: Mapping[str, Any], key: str) -> str | None:
|
|
value = config.get(key)
|
|
if value is None:
|
|
return None
|
|
return str(value)
|