Forked from LiveCarta/ContentGeneration.
Commit: "Migrate to uv sync and pytest coverage workflow."
This commit is contained in:
@@ -1,25 +1,18 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import unittest
|
||||
|
||||
from src.logging_config import debug_log_lifecycle
|
||||
|
||||
|
||||
def test_logs_function_start_and_end(caplog) -> None:
    """Verify that ``debug_log_lifecycle`` emits Start/End DEBUG records.

    Decorates a trivial function, calls it while capturing DEBUG-level
    records on the decorated function's module logger, and checks that:
    1. the wrapped function's return value is unchanged, and
    2. "Start ..."/"End ..." messages naming the wrapped function appear.

    Args:
        caplog: pytest's built-in log-capture fixture.
    """

    @debug_log_lifecycle
    def sample(a: int, b: int) -> int:
        return a + b

    # Capture on sample.__module__ so the test works regardless of which
    # module the decorator routes its records through.
    with caplog.at_level(logging.DEBUG, logger=sample.__module__):
        result = sample(2, 3)

    # Decoration must not alter the wrapped function's behavior.
    assert result == 5
    # The decorator logs the function's __qualname__; for a function defined
    # inside this test that is "test_logs_function_start_and_end.<locals>.sample".
    assert "Start test_logs_function_start_and_end.<locals>.sample" in caplog.text
    assert "End test_logs_function_start_and_end.<locals>.sample" in caplog.text
|
||||
|
||||
tests/test_pipeline_full_process.py — new file, 109 lines added.
@@ -0,0 +1,109 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from argparse import Namespace
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
# Avoid requiring boto3 for orchestration tests.
|
||||
if "boto3" not in sys.modules:
|
||||
sys.modules["boto3"] = SimpleNamespace(client=lambda *args, **kwargs: object())
|
||||
|
||||
import run_video_pipeline as pipeline
|
||||
|
||||
|
||||
def test_full_generation_process_calls_all_scripts(monkeypatch) -> None:
    """End-to-end orchestration test for ``run_video_pipeline.main``.

    Replaces ``subprocess.run`` with a fake that records which stage script
    was invoked and produces the artifacts the next stage expects, then
    asserts that main() returns 0, writes the final output file, and runs
    every stage script exactly once, in order.

    Args:
        monkeypatch: pytest's built-in attribute-patching fixture.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        base_dir = Path(tmpdir)
        hunyuan_dir = base_dir / "HunyuanVideo-1.5"
        images_dir = base_dir / "images"
        videos_dir = base_dir / "videos"
        audios_dir = base_dir / "audios"
        merged_dir = base_dir / "merged"
        output_path = base_dir / "results" / "final_output.mp4"
        reel_script = base_dir / "reel_script.json"

        # Minimal on-disk preconditions the pipeline checks before running.
        hunyuan_dir.mkdir(parents=True)
        (base_dir / "topic_description.txt").write_text("Test topic")

        # Mirrors the CLI surface of run_video_pipeline.parse_args();
        # S3 upload is skipped so no AWS access is attempted.
        args = Namespace(
            base_dir=base_dir,
            hunyuan_dir=hunyuan_dir,
            reel_script=reel_script,
            images_dir=images_dir,
            videos_dir=videos_dir,
            audios_dir=audios_dir,
            merged_dir=merged_dir,
            output=output_path,
            seed=1,
            skip_generate=False,
            skip_audio_generate=False,
            skip_merge=False,
            skip_concat=False,
            skip_s3_upload=True,
            log_level="DEBUG",
        )

        executed_scripts: list[str] = []

        # Expected invocation order of the pipeline's stage scripts.
        expected_scripts = [
            "generate_script.py",
            "generate_audios.py",
            "generate_images.py",
            "generate_videos.py",
            "merge_audio_video.py",
            "concat_merged.py",
        ]

        def fake_subprocess_run(cmd: list[str], check: bool, cwd: str | None = None):
            """Record the stage script and fabricate its output artifacts."""
            # Commands look like ["python", "<script path>", ...]; cmd[1] is the script.
            script_name = Path(cmd[1]).name if len(cmd) > 1 else ""
            if script_name not in expected_scripts:
                pytest.fail(f"Unexpected external process call: {cmd}")

            executed_scripts.append(script_name)

            # Each branch writes the file(s) the NEXT stage requires to exist.
            if script_name == "generate_script.py":
                payload = {
                    "shots": [
                        {
                            "shot_number": 1,
                            "image_description": "A test image",
                            "voiceover": "A test voiceover",
                        }
                    ]
                }
                reel_script.write_text(json.dumps(payload))
            elif script_name == "generate_audios.py":
                audios_dir.mkdir(parents=True, exist_ok=True)
                (audios_dir / "output_1.mp3").write_bytes(b"audio")
            elif script_name == "generate_images.py":
                images_dir.mkdir(parents=True, exist_ok=True)
                (images_dir / "shot_1.png").write_bytes(b"image")
            elif script_name == "generate_videos.py":
                videos_dir.mkdir(parents=True, exist_ok=True)
                (videos_dir / "output_1.mp4").write_bytes(b"video")
            elif script_name == "merge_audio_video.py":
                merged_dir.mkdir(parents=True, exist_ok=True)
                (merged_dir / "merged_1.mp4").write_bytes(b"merged")
            elif script_name == "concat_merged.py":
                output_path.parent.mkdir(parents=True, exist_ok=True)
                output_path.write_bytes(b"final")

            class Result:
                # Only the attribute the pipeline inspects on CompletedProcess.
                returncode = 0

            return Result()

        monkeypatch.setattr(pipeline, "parse_args", lambda: args)
        monkeypatch.setattr(pipeline.subprocess, "run", fake_subprocess_run)

        rc = pipeline.main()

        assert rc == 0
        assert output_path.exists()
        # Coverage check for orchestration: ensure every required script stage was called.
        assert executed_scripts == expected_scripts
|
||||
Reference in New Issue
Block a user