forked from LiveCarta/ContentGeneration
Initial
This commit is contained in:
71
s3_video_storage.py
Normal file
71
s3_video_storage.py
Normal file
@@ -0,0 +1,71 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Mapping
|
||||
|
||||
import boto3
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class S3Config:
|
||||
bucket_name: str
|
||||
region_name: str | None = None
|
||||
endpoint_url: str | None = None
|
||||
aws_access_key_id: str | None = None
|
||||
aws_secret_access_key: str | None = None
|
||||
aws_session_token: str | None = None
|
||||
|
||||
|
||||
class S3VideoStorage:
|
||||
def __init__(self, s3_config: S3Config | Mapping[str, Any]) -> None:
|
||||
self.config = self._normalize_config(s3_config)
|
||||
|
||||
client_kwargs: dict[str, Any] = {
|
||||
"region_name": self.config.region_name,
|
||||
"endpoint_url": self.config.endpoint_url,
|
||||
"aws_access_key_id": self.config.aws_access_key_id,
|
||||
"aws_secret_access_key": self.config.aws_secret_access_key,
|
||||
"aws_session_token": self.config.aws_session_token,
|
||||
}
|
||||
filtered_kwargs = {k: v for k, v in client_kwargs.items() if v is not None}
|
||||
self._s3_client = boto3.client("s3", **filtered_kwargs)
|
||||
|
||||
def store_file(self, file_path: str | Path) -> str:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"File does not exist: {path}")
|
||||
if not path.is_file():
|
||||
raise ValueError(f"Path is not a file: {path}")
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
key = f"video_content/{now.year:04d}/{now.month:02d}/{now.day:02d}/{path.name}"
|
||||
|
||||
self._s3_client.upload_file(str(path), self.config.bucket_name, key)
|
||||
return f"s3://{self.config.bucket_name}/{key}"
|
||||
|
||||
@staticmethod
|
||||
def _normalize_config(s3_config: S3Config | Mapping[str, Any]) -> S3Config:
|
||||
if isinstance(s3_config, S3Config):
|
||||
return s3_config
|
||||
|
||||
bucket_name = s3_config.get("bucket_name")
|
||||
if not bucket_name:
|
||||
raise ValueError("s3_config must contain non-empty 'bucket_name'")
|
||||
|
||||
return S3Config(
|
||||
bucket_name=str(bucket_name),
|
||||
region_name=_optional_str(s3_config, "region_name"),
|
||||
endpoint_url=_optional_str(s3_config, "endpoint_url"),
|
||||
aws_access_key_id=_optional_str(s3_config, "aws_access_key_id"),
|
||||
aws_secret_access_key=_optional_str(s3_config, "aws_secret_access_key"),
|
||||
aws_session_token=_optional_str(s3_config, "aws_session_token"),
|
||||
)
|
||||
|
||||
|
||||
def _optional_str(config: Mapping[str, Any], key: str) -> str | None:
|
||||
value = config.get(key)
|
||||
if value is None:
|
||||
return None
|
||||
return str(value)
|
||||
Reference in New Issue
Block a user