1
0

Implemented content upload app with tests and pre-commit hooks

This commit is contained in:
2026-03-13 11:30:28 +00:00
commit 342d39d457
33 changed files with 2740 additions and 0 deletions

59
tests/test_controller.py Normal file
View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import pytest
from content_automation.adapters.storage.base import StorageAdapterBase
from content_automation.controller import PublishController
from content_automation.settings import AppSettings
class FakeStorage(StorageAdapterBase):
def __init__(
self, exists_result: bool, public_url: str = "file:///tmp/video.mp4"
) -> None:
self._exists_result = exists_result
self._public_url = public_url
def exists(self, relative_path: str) -> bool:
return self._exists_result
def get_public_url(self, relative_path: str) -> str:
return self._public_url
class FakeAdapter:
def __init__(self, adapter_name: str) -> None:
self.name = adapter_name
def post_media(self, media_url: str, caption: str) -> str:
return f"{self.name}-post-id"
def test_controller_publishes_to_all_configured_networks() -> None:
settings = AppSettings.model_validate(
{"target_social_networks": ["instagram", "youtube"]}
)
controller = PublishController(
settings=settings,
storage=FakeStorage(exists_result=True),
social_adapters={
"instagram": FakeAdapter("instagram"),
"youtube": FakeAdapter("youtube"),
},
)
result = controller.publish(relative_path="video.mp4", caption="hello")
assert result == {"instagram": "instagram-post-id", "youtube": "youtube-post-id"}
def test_controller_raises_when_file_missing() -> None:
settings = AppSettings.model_validate({"target_social_networks": ["youtube"]})
controller = PublishController(
settings=settings,
storage=FakeStorage(exists_result=False),
social_adapters={"youtube": FakeAdapter("youtube")},
)
with pytest.raises(FileNotFoundError):
controller.publish(relative_path="video.mp4", caption="hello")

View File

@@ -0,0 +1,66 @@
from __future__ import annotations
from content_automation.adapters.social.instagram import InstagramAdapter
def test_instagram_post_media_happy_path(monkeypatch) -> None:
create_call_kwargs: dict = {}
publish_call_kwargs: dict = {}
def fake_create_container(self, *args, **kwargs):
create_call_kwargs.update(kwargs)
return {"id": "creation-123"}
def fake_publish_container(self, *args, **kwargs):
publish_call_kwargs.update(kwargs)
return {"id": "published-456"}
monkeypatch.setattr(
"content_automation.adapters.social.instagram.InstagramGraphClient.create_container",
fake_create_container,
)
monkeypatch.setattr(
"content_automation.adapters.social.instagram.InstagramGraphClient.publish_container",
fake_publish_container,
)
adapter = InstagramAdapter(access_token="token", user_id="user-1")
post_id = adapter.post_media(
media_url="https://cdn.example.com/reel.mp4",
caption="hello instagram",
)
assert post_id == "published-456"
assert create_call_kwargs["user_id"] == "user-1"
assert create_call_kwargs["payload"].media_type == "REELS"
assert create_call_kwargs["payload"].video_url == "https://cdn.example.com/reel.mp4"
assert create_call_kwargs["payload"].caption == "hello instagram"
assert publish_call_kwargs["user_id"] == "user-1"
assert publish_call_kwargs["payload"].creation_id == "creation-123"
def test_instagram_post_media_falls_back_to_creation_id(monkeypatch) -> None:
def fake_create_container(self, *args, **kwargs):
return {"id": "creation-abc"}
def fake_publish_container(self, *args, **kwargs):
return {}
monkeypatch.setattr(
"content_automation.adapters.social.instagram.InstagramGraphClient.create_container",
fake_create_container,
)
monkeypatch.setattr(
"content_automation.adapters.social.instagram.InstagramGraphClient.publish_container",
fake_publish_container,
)
adapter = InstagramAdapter(access_token="token", user_id="user-1")
post_id = adapter.post_media(
media_url="https://cdn.example.com/reel.mp4",
caption="hello instagram",
)
assert post_id == "creation-abc"

19
tests/test_settings.py Normal file
View File

@@ -0,0 +1,19 @@
from __future__ import annotations
from content_automation.settings import AppSettings
def test_settings_parse_nested_env(monkeypatch) -> None:
monkeypatch.setenv("CONTENT_AUTOMATION_TARGET_SOCIAL_NETWORKS", '["youtube"]')
monkeypatch.setenv("CONTENT_AUTOMATION_YOUTUBE__ACCESS_TOKEN", "yt-token")
monkeypatch.setenv("CONTENT_AUTOMATION_YOUTUBE__USE_RESUMABLE_UPLOAD", "true")
monkeypatch.setenv("CONTENT_AUTOMATION_STORAGE__BACKEND", "s3")
monkeypatch.setenv("CONTENT_AUTOMATION_STORAGE__S3__BUCKET_NAME", "bucket-a")
settings = AppSettings()
assert settings.target_social_networks == ["youtube"]
assert settings.youtube.access_token == "yt-token"
assert settings.youtube.use_resumable_upload is True
assert settings.storage.backend == "s3"
assert settings.storage.s3.bucket_name == "bucket-a"

View File

@@ -0,0 +1,208 @@
from __future__ import annotations
import json
from datetime import UTC, datetime, timedelta
from pathlib import Path
from content_automation.adapters.social.youtube import (
YouTubeAdapter,
YouTubeDataApiClient,
YouTubeSnippet,
YouTubeStatus,
YouTubeVideoInsertPayload,
)
class FakeInsertRequest:
def __init__(self, response: dict[str, object]) -> None:
self._response = response
def execute(self) -> dict[str, object]:
return self._response
class FakeResumableRequest:
def __init__(self) -> None:
self._calls = 0
def next_chunk(self):
self._calls += 1
if self._calls == 1:
return object(), None
return None, {"id": "video-123"}
class FakeVideosResource:
def __init__(self, request) -> None:
self._request = request
def insert(self, part: str, body: dict, media_body=None):
return self._request
class FakeService:
def __init__(self, request) -> None:
self._videos = FakeVideosResource(request)
def videos(self) -> FakeVideosResource:
return self._videos
def test_resumable_upload_happy_path(monkeypatch, tmp_path: Path) -> None:
media_file = tmp_path / "clip.mp4"
media_file.write_bytes(b"abcdef")
monkeypatch.setattr(
"content_automation.adapters.social.youtube.build",
lambda *args, **kwargs: FakeService(FakeResumableRequest()),
)
monkeypatch.setattr(
"content_automation.adapters.social.youtube.MediaFileUpload",
lambda *args, **kwargs: object(),
)
adapter = YouTubeAdapter(
access_token="token",
use_resumable_upload=True,
resumable_chunk_size=3,
)
post_id = adapter.post_media(media_url=media_file.as_uri(), caption="caption")
assert post_id == "video-123"
def test_regular_upload_happy_path(monkeypatch, tmp_path: Path) -> None:
media_file = tmp_path / "clip.mp4"
media_file.write_bytes(b"abcdef")
monkeypatch.setattr(
"content_automation.adapters.social.youtube.build",
lambda *args, **kwargs: FakeService(
FakeInsertRequest({"id": "video-regular-123"})
),
)
monkeypatch.setattr(
"content_automation.adapters.social.youtube.MediaFileUpload",
lambda *args, **kwargs: object(),
)
adapter = YouTubeAdapter(
access_token="token",
use_resumable_upload=False,
)
post_id = adapter.post_media(media_url=media_file.as_uri(), caption="caption")
assert post_id == "video-regular-123"
def test_insert_video_happy_path_for_non_local_url(monkeypatch) -> None:
monkeypatch.setattr(
"content_automation.adapters.social.youtube.build",
lambda *args, **kwargs: FakeService(
FakeInsertRequest({"id": "video-insert-123"})
),
)
adapter = YouTubeAdapter(access_token="token")
post_id = adapter.post_media(
media_url="https://cdn.example.com/path/to/video.mp4", caption="caption"
)
assert post_id == "video-insert-123"
def test_client_refreshes_expired_token_before_request(monkeypatch) -> None:
refreshed_tokens: list[str] = []
def fake_refresh(self, request) -> None:
self.token = "new-token"
self.expiry = datetime.now(UTC).replace(tzinfo=None) + timedelta(minutes=30)
refreshed_tokens.append(self.token)
monkeypatch.setattr(
"content_automation.adapters.social.youtube.Credentials.refresh",
fake_refresh,
)
monkeypatch.setattr(
"content_automation.adapters.social.youtube.build",
lambda *args, **kwargs: FakeService(
FakeInsertRequest({"id": "video-refreshed"})
),
)
client = YouTubeDataApiClient(
access_token="expired-token",
category_id="22",
privacy_status="public",
refresh_token="refresh-token",
client_id="client-id",
client_secret="client-secret",
expiry="2024-01-01T00:00:00Z",
)
payload = YouTubeVideoInsertPayload(
snippet=YouTubeSnippet(
title="title",
description="description",
categoryId="22",
),
status=YouTubeStatus(privacyStatus="public"),
sourceUrl="https://cdn.example.com/video.mp4",
)
response = client.insert_video(part="snippet,status", payload=payload)
assert response["id"] == "video-refreshed"
assert refreshed_tokens == ["new-token"]
def test_obtain_credentials_from_client_secret_file(
monkeypatch, tmp_path: Path
) -> None:
captured: dict[str, object] = {}
class FakeCredentials:
def to_json(self) -> str:
return json.dumps(
{
"token": "token-123",
"refresh_token": "refresh-123",
"token_uri": "https://oauth2.googleapis.com/token",
"client_id": "client-123",
"client_secret": "secret-123",
"scopes": ["https://www.googleapis.com/auth/youtube.upload"],
}
)
class FakeFlow:
def run_local_server(self):
return FakeCredentials()
def fake_from_client_secrets_file(client_secret_file: str, scopes: list[str]):
captured["client_secret_file"] = client_secret_file
captured["scopes"] = scopes
return FakeFlow()
monkeypatch.setattr(
"content_automation.adapters.social.youtube.InstalledAppFlow.from_client_secrets_file",
fake_from_client_secrets_file,
)
client_secret_path = tmp_path / "client_secret.json"
token_output_path = tmp_path / "youtube_credentials.json"
credentials_payload = YouTubeAdapter.obtain_credentials_from_client_secret_file(
client_secret_file_path=client_secret_path,
scopes=["https://www.googleapis.com/auth/youtube.upload"],
token_output_path=token_output_path,
)
assert captured["client_secret_file"] == str(client_secret_path)
assert captured["scopes"] == ["https://www.googleapis.com/auth/youtube.upload"]
assert credentials_payload["token"] == "token-123"
assert token_output_path.exists()
assert (
json.loads(token_output_path.read_text(encoding="utf-8"))["token"]
== "token-123"
)