1
0

Refactor src layout and add logging lifecycle + tests

This commit is contained in:
2026-04-02 12:32:02 +02:00
parent e3c2b9ddee
commit a0a66264d2
13 changed files with 172 additions and 202 deletions

View File

@@ -43,7 +43,7 @@ docker run --rm --gpus all --env-file .env -v "$(pwd)":/app -w /app content-gene
## Project Layout
- `run_video_pipeline.py`: main entrypoint.
- `src/scripts/`: helper scripts used by the pipeline.
- `src/`: helper scripts used by the pipeline.
- `HunyuanVideo-1.5/`: Hunyuan inference code and model dependencies.
- `reel_script.json`: required script input with `shots`.
- `images/`, `audios/`, `videos/`, `merged/`, `results/`: working/output folders.
@@ -198,5 +198,5 @@ docker run --rm --gpus all \
8. Verify syntax quickly before running.
```bash
python3 -m py_compile run_video_pipeline.py src/scripts/*.py
python3 -m py_compile run_video_pipeline.py src/*.py
```

View File

@@ -10,12 +10,12 @@ import subprocess
import sys
from pathlib import Path
from src.scripts.logging_config import configure_logging
from src.scripts.s3_video_storage import S3VideoStorage
from src.logging_config import configure_logging, debug_log_lifecycle
from src.s3_video_storage import S3VideoStorage
PROJECT_ROOT = Path(__file__).resolve().parent
SCRIPT_DIR = PROJECT_ROOT / "src" / "scripts"
SCRIPT_DIR = PROJECT_ROOT / "src"
DEFAULT_BASE_DIR = PROJECT_ROOT
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
@@ -43,10 +43,15 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--skip-merge", action="store_true")
parser.add_argument("--skip-concat", action="store_true")
parser.add_argument("--skip-s3-upload", action="store_true")
parser.add_argument("--log-level", default="INFO")
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def run_step(name: str, cmd: list[str], cwd: Path | None = None) -> None:
LOGGER.info("=== %s ===", name)
LOGGER.info("$ %s", " ".join(str(part) for part in cmd))
@@ -55,6 +60,13 @@ def run_step(name: str, cmd: list[str], cwd: Path | None = None) -> None:
subprocess.run(cmd, check=True, cwd=str(cwd) if cwd else None)
def _with_log_level(cmd: list[str], log_level: str | None) -> list[str]:
if not log_level:
return cmd
return [*cmd, "--log-level", log_level]
@debug_log_lifecycle
def maybe_upload_to_s3(output_path: Path) -> None:
bucket = os.getenv("AWS_S3_BUCKET")
if not bucket:
@@ -75,6 +87,7 @@ def maybe_upload_to_s3(output_path: Path) -> None:
LOGGER.info("Uploaded output to %s", s3_uri)
@debug_log_lifecycle
def main() -> int:
args = parse_args()
configure_logging(args.log_level)
@@ -100,10 +113,10 @@ def main() -> int:
if not args.skip_generate and not args.reel_script.exists():
run_step(
"Generate Reel Script",
[
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "generate_script.py"),
],
], args.log_level),
cwd=args.base_dir,
)
if not args.reel_script.exists():
@@ -113,7 +126,7 @@ def main() -> int:
if not args.skip_generate:
run_step(
"Generate Videos",
[
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "generate_videos.py"),
"--hunyuan-dir",
@@ -128,13 +141,13 @@ def main() -> int:
str(args.audios_dir),
"--seed",
str(args.seed),
],
], args.log_level),
)
if not args.skip_merge:
run_step(
"Merge Audio + Video",
[
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "merge_audio_video.py"),
"--videos-dir",
@@ -143,20 +156,20 @@ def main() -> int:
str(args.audios_dir),
"--output-dir",
str(args.merged_dir),
],
], args.log_level),
)
if not args.skip_concat:
run_step(
"Concatenate Merged Videos",
[
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "concat_merged.py"),
"--merged-dir",
str(args.merged_dir),
"--output",
str(args.output),
],
], args.log_level),
)
except subprocess.CalledProcessError as exc:
LOGGER.exception("Pipeline failed at command: %s", exc.cmd)

View File

@@ -10,7 +10,7 @@ import subprocess
import tempfile
from pathlib import Path
from logging_config import configure_logging
from logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -30,10 +30,15 @@ def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--merged-dir", type=Path, default=DEFAULT_MERGED_DIR)
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
parser.add_argument("--log-level", default="INFO")
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def main() -> int:
args = parse_args()
configure_logging(args.log_level)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import argparse
import json
import logging
import os
@@ -7,7 +8,7 @@ from pathlib import Path
from dotenv import load_dotenv
from elevenlabs.client import ElevenLabs
from logging_config import configure_logging
from logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -18,8 +19,20 @@ load_dotenv(PROJECT_ROOT / ".env")
LOGGER = logging.getLogger(__name__)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def main() -> int:
configure_logging("INFO")
args = parse_args()
configure_logging(args.log_level)
api_key = os.getenv("ELEVENLABS_API_KEY")
if not api_key:
raise RuntimeError("ELEVENLABS_API_KEY is not set")

View File

@@ -1,12 +1,13 @@
from __future__ import annotations
import argparse
import json
import logging
from pathlib import Path
import torch
from diffusers import FluxPipeline
from logging_config import configure_logging
from logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -15,8 +16,20 @@ PROJECT_ROOT = SCRIPT_DIR.parents[1]
LOGGER = logging.getLogger(__name__)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def main() -> int:
configure_logging("INFO")
args = parse_args()
configure_logging(args.log_level)
reel_script = PROJECT_ROOT / "reel_script.json"
images_dir = PROJECT_ROOT / "images"
images_dir.mkdir(parents=True, exist_ok=True)

View File

@@ -1,3 +1,4 @@
import argparse
import torch
import json
import logging
@@ -5,7 +6,7 @@ from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import re
from typing import Optional
from logging_config import configure_logging
from logging_config import configure_logging, debug_log_lifecycle
LOGGER = logging.getLogger(__name__)
@@ -19,6 +20,17 @@ MAX_VOICEOVER_WORDS = int(MAX_VOICEOVER_SECONDS * WORDS_PER_SECOND)
MIN_VOICEOVER_WORDS = 5
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def load_model(model_id: str = MODEL_ID):
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
bnb_config = BitsAndBytesConfig(
@@ -37,6 +49,7 @@ def load_model(model_id: str = MODEL_ID):
return model, tokenizer
@debug_log_lifecycle
def generate_reel_scenario(
model,
tokenizer,
@@ -231,6 +244,7 @@ def extract_field(label: str, next_label: Optional[str], text: str) -> str:
return ""
@debug_log_lifecycle
def parse_reel_scenario(raw_scenario: str) -> dict:
"""
Parse the shot-by-shot reel scenario into a structured dict.
@@ -336,7 +350,8 @@ def parse_reel_scenario(raw_scenario: str) -> dict:
if __name__ == '__main__':
configure_logging("INFO")
args = parse_args()
configure_logging(args.log_level)
with open("topic_description.txt", "r") as f:
topic = f.read()

View File

@@ -10,7 +10,7 @@ import os
import subprocess
from pathlib import Path
from logging_config import configure_logging
from logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -32,10 +32,15 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--videos-dir", type=Path, default=DEFAULT_VIDEOS_DIR)
parser.add_argument("--audios-dir", type=Path, default=DEFAULT_AUDIOS_DIR)
parser.add_argument("--seed", type=int, default=1)
parser.add_argument("--log-level", default="INFO")
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def get_audio_duration(audio_path: Path) -> float:
result = subprocess.run(
[
@@ -55,6 +60,7 @@ def get_audio_duration(audio_path: Path) -> float:
return float(result.stdout.strip())
@debug_log_lifecycle
def duration_to_video_length(duration: float) -> int:
frames = int(duration * 24) + 1
if frames % 2 == 0:
@@ -62,6 +68,7 @@ def duration_to_video_length(duration: float) -> int:
return max(49, min(frames, 169))
@debug_log_lifecycle
def main() -> int:
args = parse_args()
configure_logging(args.log_level)

50
src/logging_config.py Normal file
View File

@@ -0,0 +1,50 @@
from __future__ import annotations
import functools
import logging
import os
from collections.abc import Callable
from typing import Any, TypeVar
DEFAULT_LOG_FORMAT = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
DEFAULT_LOG_LEVEL = "INFO"
LOG_LEVEL_ENV_VAR = "LOG_LEVEL"

# Generic callable type used to type the decorator below.
F = TypeVar("F", bound=Callable[..., Any])


def resolve_log_level(
    cli_level: str | None,
    *,
    default_level: str = DEFAULT_LOG_LEVEL,
    env_var: str = LOG_LEVEL_ENV_VAR,
) -> str:
    """Pick the effective log level name.

    Precedence (highest wins): an explicit CLI value, then the ``env_var``
    environment variable, then ``default_level``. Falsy values (None, "")
    are treated as "not provided".
    """
    return cli_level or os.getenv(env_var) or default_level
def configure_logging(level: str | None = None, *, default_level: str = DEFAULT_LOG_LEVEL) -> None:
    """Initialise root logging with the shared format at the resolved level.

    ``level`` (typically the --log-level CLI value) overrides the LOG_LEVEL
    environment variable, which overrides ``default_level``.
    """
    name = resolve_log_level(level, default_level=default_level).upper()
    # Unknown level names silently fall back to INFO rather than raising.
    logging.basicConfig(format=DEFAULT_LOG_FORMAT, level=getattr(logging, name, logging.INFO))
def debug_log_lifecycle(func: F) -> F:
    """Decorator that DEBUG-logs entry and exit of *func*.

    Emits "Start <qualname>" before the call and "End <qualname>" after it
    (also on exceptions, via finally) to the logger named after the wrapped
    function's module.
    """
    log = logging.getLogger(func.__module__)

    @functools.wraps(func)
    def _wrapped(*args: Any, **kwargs: Any) -> Any:
        log.debug("Start %s", func.__qualname__)
        try:
            return func(*args, **kwargs)
        finally:
            log.debug("End %s", func.__qualname__)

    return _wrapped  # type: ignore[return-value]

View File

@@ -9,7 +9,7 @@ import re
import subprocess
from pathlib import Path
from logging_config import configure_logging
from logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -31,10 +31,15 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--videos-dir", type=Path, default=DEFAULT_VIDEOS_DIR)
parser.add_argument("--audios-dir", type=Path, default=DEFAULT_AUDIOS_DIR)
parser.add_argument("--output-dir", type=Path, default=DEFAULT_OUTPUT_DIR)
parser.add_argument("--log-level", default="INFO")
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def main() -> int:
args = parse_args()
configure_logging(args.log_level)

View File

@@ -1,13 +0,0 @@
from __future__ import annotations
import logging
DEFAULT_LOG_FORMAT = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"


def configure_logging(level: str = "INFO") -> None:
    """Set up root logging with the shared format at *level*.

    Unrecognised level names fall back to INFO instead of raising.
    """
    numeric = getattr(logging, level.upper(), logging.INFO)
    logging.basicConfig(format=DEFAULT_LOG_FORMAT, level=numeric)

View File

@@ -1,163 +0,0 @@
#!/usr/bin/env python3
"""Run the full video pipeline: generate, merge, and concatenate."""
from __future__ import annotations
import argparse
import logging
import os
import subprocess
import sys
from pathlib import Path
from logging_config import configure_logging
from s3_video_storage import S3VideoStorage
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parents[1]
DEFAULT_BASE_DIR = PROJECT_ROOT
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images"
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged"
DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "final_output.mp4"
LOGGER = logging.getLogger(__name__)
def parse_args() -> argparse.Namespace:
    """Parse pipeline CLI options; every path option has a project-relative default."""
    parser = argparse.ArgumentParser(description=__doc__)
    # (flag, default) pairs for all path-valued options, in help order.
    path_options = [
        ("--base-dir", DEFAULT_BASE_DIR),
        ("--hunyuan-dir", DEFAULT_HUNYUAN_DIR),
        ("--reel-script", DEFAULT_REEL_SCRIPT),
        ("--images-dir", DEFAULT_IMAGES_DIR),
        ("--videos-dir", DEFAULT_VIDEOS_DIR),
        ("--audios-dir", DEFAULT_AUDIOS_DIR),
        ("--merged-dir", DEFAULT_MERGED_DIR),
        ("--output", DEFAULT_OUTPUT),
    ]
    for flag, default in path_options:
        parser.add_argument(flag, type=Path, default=default)
    parser.add_argument("--seed", type=int, default=1)
    # Boolean switches that skip individual pipeline stages.
    for flag in ("--skip-generate", "--skip-merge", "--skip-concat", "--skip-s3-upload"):
        parser.add_argument(flag, action="store_true")
    parser.add_argument("--log-level", default="INFO")
    return parser.parse_args()
def run_step(name: str, cmd: list[str]) -> None:
    """Announce *name*, echo *cmd*, then execute it.

    Uses check=True so a non-zero exit raises CalledProcessError, which
    main() catches to abort the pipeline.
    """
    LOGGER.info("=== %s ===", name)
    LOGGER.info("$ %s", " ".join(map(str, cmd)))
    subprocess.run(cmd, check=True)
def maybe_upload_to_s3(output_path: Path) -> None:
    """Upload *output_path* to S3 when AWS_S3_BUCKET is configured.

    Skips with a warning when the bucket env var is unset; all remaining
    AWS credentials/settings are likewise read from the environment.
    """
    bucket = os.getenv("AWS_S3_BUCKET")
    if not bucket:
        LOGGER.warning("Skipping S3 upload: AWS_S3_BUCKET is not set")
        return
    config = {
        "bucket_name": bucket,
        "region_name": os.getenv("AWS_REGION"),
        "endpoint_url": os.getenv("AWS_S3_ENDPOINT_URL"),
        "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "aws_session_token": os.getenv("AWS_SESSION_TOKEN"),
    }
    s3_uri = S3VideoStorage(config).store_file(output_path)
    LOGGER.info("Uploaded output to %s", s3_uri)
def main() -> int:
    """Orchestrate the pipeline: generate -> merge -> concat, then S3 upload.

    Returns 0 on success, the failing subprocess's return code when a step
    fails, and 1 when the final S3 upload raises.
    """
    args = parse_args()
    configure_logging(args.log_level)

    # When only --base-dir is overridden, re-root every still-default
    # subpath underneath the new base directory.
    if args.base_dir != DEFAULT_BASE_DIR:
        rerooted = (
            ("hunyuan_dir", DEFAULT_HUNYUAN_DIR, ("HunyuanVideo-1.5",)),
            ("reel_script", DEFAULT_REEL_SCRIPT, ("reel_script.json",)),
            ("images_dir", DEFAULT_IMAGES_DIR, ("images",)),
            ("videos_dir", DEFAULT_VIDEOS_DIR, ("videos",)),
            ("audios_dir", DEFAULT_AUDIOS_DIR, ("audios",)),
            ("merged_dir", DEFAULT_MERGED_DIR, ("merged",)),
            ("output", DEFAULT_OUTPUT, ("results", "final_output.mp4")),
        )
        for attr, default, parts in rerooted:
            if getattr(args, attr) == default:
                setattr(args, attr, args.base_dir.joinpath(*parts))

    # Collect the (name, command) pairs for stages that were not skipped,
    # preserving the generate -> merge -> concat order.
    steps: list[tuple[str, list[str]]] = []
    if not args.skip_generate:
        steps.append((
            "Generate Videos",
            [
                sys.executable,
                str(SCRIPT_DIR / "generate_videos.py"),
                "--hunyuan-dir", str(args.hunyuan_dir),
                "--reel-script", str(args.reel_script),
                "--images-dir", str(args.images_dir),
                "--videos-dir", str(args.videos_dir),
                "--audios-dir", str(args.audios_dir),
                "--seed", str(args.seed),
            ],
        ))
    if not args.skip_merge:
        steps.append((
            "Merge Audio + Video",
            [
                sys.executable,
                str(SCRIPT_DIR / "merge_audio_video.py"),
                "--videos-dir", str(args.videos_dir),
                "--audios-dir", str(args.audios_dir),
                "--output-dir", str(args.merged_dir),
            ],
        ))
    if not args.skip_concat:
        steps.append((
            "Concatenate Merged Videos",
            [
                sys.executable,
                str(SCRIPT_DIR / "concat_merged.py"),
                "--merged-dir", str(args.merged_dir),
                "--output", str(args.output),
            ],
        ))

    try:
        for step_name, step_cmd in steps:
            run_step(step_name, step_cmd)
    except subprocess.CalledProcessError as exc:
        LOGGER.exception("Pipeline failed at command: %s", exc.cmd)
        return exc.returncode

    if not args.skip_s3_upload:
        try:
            maybe_upload_to_s3(args.output)
        except Exception:
            # Best-effort upload: log and signal failure without a traceback crash.
            LOGGER.exception("Failed uploading output to S3")
            return 1

    LOGGER.info("Pipeline complete")
    LOGGER.info("Final output: %s", args.output)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
import logging
import unittest
from src.logging_config import debug_log_lifecycle
class TestDebugLogLifecycle(unittest.TestCase):
    """Verifies that debug_log_lifecycle emits Start/End DEBUG records."""

    def test_logs_function_start_and_end(self) -> None:
        @debug_log_lifecycle
        def sample(a: int, b: int) -> int:
            return a + b

        # Capture DEBUG records from this module's logger while calling.
        with self.assertLogs(sample.__module__, level="DEBUG") as captured:
            result = sample(2, 3)

        self.assertEqual(result, 5)
        qualname = "TestDebugLogLifecycle.test_logs_function_start_and_end.<locals>.sample"
        joined = "\n".join(captured.output)
        self.assertIn(f"Start {qualname}", joined)
        self.assertIn(f"End {qualname}", joined)


if __name__ == "__main__":
    unittest.main()