forked from LiveCarta/ContentAutomation
72 lines
3.0 KiB
Python
72 lines
3.0 KiB
Python
from __future__ import annotations
|
|
|
|
from content_automation.adapters.publish_store.mongodb import MongoPublishedContentStore
|
|
from content_automation.adapters.social.instagram import InstagramAdapter
|
|
from content_automation.adapters.social.youtube import YouTubeAdapter
|
|
from content_automation.adapters.storage.base import StorageAdapterBase
|
|
from content_automation.adapters.storage.local import LocalFilesystemStorageAdapter
|
|
from content_automation.adapters.storage.s3 import S3StorageAdapter
|
|
from content_automation.interfaces import PublishedContentStore, SocialNetworkAdapter
|
|
from content_automation.settings import AppSettings
|
|
|
|
|
|
def build_storage_adapter(settings: AppSettings) -> StorageAdapterBase:
    """Create the storage adapter selected by ``settings.storage.backend``.

    The backend name is compared case-insensitively.

    Args:
        settings: Application settings; only the ``storage`` section is read.

    Returns:
        A ``LocalFilesystemStorageAdapter`` when the backend is ``"local"``,
        or an ``S3StorageAdapter`` when it is ``"s3"``.

    Raises:
        ValueError: If the backend is neither ``"local"`` nor ``"s3"``.
    """
    storage_cfg = settings.storage
    selected = storage_cfg.backend.lower()

    # Reject unknown backends up front. The message echoes the value exactly
    # as configured (original casing), not the lower-cased form.
    if selected not in ("local", "s3"):
        raise ValueError(f"Unsupported storage backend: {storage_cfg.backend}")

    if selected == "local":
        return LocalFilesystemStorageAdapter(
            root_directory=storage_cfg.local.root_directory
        )

    # Only "s3" can reach this point.
    s3_cfg = storage_cfg.s3
    return S3StorageAdapter(
        bucket_name=s3_cfg.bucket_name,
        key_prefix=s3_cfg.key_prefix,
        region_name=s3_cfg.region_name,
        endpoint_url=s3_cfg.endpoint_url,
        public_url_base=s3_cfg.public_url_base,
        url_expiration_seconds=s3_cfg.url_expiration_seconds,
    )
|
|
|
|
|
|
def build_social_adapters(settings: AppSettings) -> dict[str, SocialNetworkAdapter]:
    """Construct the social network adapters from application settings.

    Both adapters are always constructed, Instagram first and then YouTube.

    Args:
        settings: Application settings; the ``instagram`` and ``youtube``
            sections are read.

    Returns:
        A dict with the keys ``"instagram"`` and ``"youtube"`` mapped to an
        ``InstagramAdapter`` and a ``YouTubeAdapter`` respectively.
    """
    instagram_cfg = settings.instagram
    instagram_adapter = InstagramAdapter(
        access_token=instagram_cfg.access_token,
        user_id=instagram_cfg.user_id,
        api_version=instagram_cfg.api_version,
    )

    youtube_cfg = settings.youtube
    youtube_adapter = YouTubeAdapter(
        access_token=youtube_cfg.access_token,
        category_id=youtube_cfg.category_id,
        privacy_status=youtube_cfg.privacy_status,
        use_resumable_upload=youtube_cfg.use_resumable_upload,
        resumable_chunk_size=youtube_cfg.resumable_chunk_size,
        # ``or None`` turns any falsy setting (e.g. an empty string) into
        # None before it is handed to the adapter.
        credentials_file_path=youtube_cfg.credentials_file_path or None,
        refresh_token=youtube_cfg.refresh_token or None,
        client_id=youtube_cfg.client_id or None,
        client_secret=youtube_cfg.client_secret or None,
        token_uri=youtube_cfg.token_uri,
        scopes=youtube_cfg.scopes,
        expiry=youtube_cfg.expiry or None,
    )

    return {
        "instagram": instagram_adapter,
        "youtube": youtube_adapter,
    }
|
|
|
|
|
|
def build_published_content_store(
    settings: AppSettings,
) -> PublishedContentStore | None:
    """Create the MongoDB-backed published-content store, if enabled.

    Args:
        settings: Application settings; only the ``mongodb`` section is read.

    Returns:
        ``None`` when ``settings.mongodb.enabled`` is falsy, otherwise a
        ``MongoPublishedContentStore`` built from the configured connection
        URI, database name and collection name.

    Raises:
        ValueError: If MongoDB persistence is enabled but
            ``settings.mongodb.connection_uri`` is empty or otherwise falsy.
    """
    mongo_cfg = settings.mongodb

    if mongo_cfg.enabled:
        uri = mongo_cfg.connection_uri
        if uri:
            return MongoPublishedContentStore(
                connection_uri=uri,
                database_name=mongo_cfg.database_name,
                collection_name=mongo_cfg.collection_name,
            )
        # Enabled without a URI is a configuration error, not a silent no-op.
        raise ValueError(
            "CONTENT_AUTOMATION_MONGODB__CONNECTION_URI must be set when "
            "MongoDB persistence is enabled."
        )

    return None
|