#!/usr/bin/env python3 """Run the full video pipeline: generate, merge, and concatenate.""" from __future__ import annotations import argparse import logging import os import subprocess import sys from pathlib import Path from src.logging_config import configure_logging, debug_log_lifecycle from src.s3_video_storage import S3VideoStorage PROJECT_ROOT = Path(__file__).resolve().parent SCRIPT_DIR = PROJECT_ROOT / "src" DEFAULT_BASE_DIR = PROJECT_ROOT DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5" DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json" DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images" DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos" DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios" DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged" DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "final_output.mp4" LOGGER = logging.getLogger(__name__) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--base-dir", type=Path, default=DEFAULT_BASE_DIR) parser.add_argument("--hunyuan-dir", type=Path, default=DEFAULT_HUNYUAN_DIR) parser.add_argument("--reel-script", type=Path, default=DEFAULT_REEL_SCRIPT) parser.add_argument("--images-dir", type=Path, default=DEFAULT_IMAGES_DIR) parser.add_argument("--videos-dir", type=Path, default=DEFAULT_VIDEOS_DIR) parser.add_argument("--audios-dir", type=Path, default=DEFAULT_AUDIOS_DIR) parser.add_argument("--merged-dir", type=Path, default=DEFAULT_MERGED_DIR) parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT) parser.add_argument("--seed", type=int, default=1) parser.add_argument("--skip-generate", action="store_true") parser.add_argument("--skip-audio-generate", action="store_true") parser.add_argument("--skip-merge", action="store_true") parser.add_argument("--skip-concat", action="store_true") parser.add_argument("--skip-s3-upload", action="store_true") parser.add_argument( "--log-level", default=None, help="Logging level (overrides LOG_LEVEL env var)", ) return parser.parse_args() @debug_log_lifecycle def run_step(name: str, cmd: list[str], cwd: Path | None = None) -> None: LOGGER.info("=== %s ===", name) LOGGER.info("$ %s", " ".join(str(part) for part in cmd)) if cwd is not None: LOGGER.info("(cwd: %s)", cwd) subprocess.run(cmd, check=True, cwd=str(cwd) if cwd else None) def _with_log_level(cmd: list[str], log_level: str | None) -> list[str]: if not log_level: return cmd return [*cmd, "--log-level", log_level] @debug_log_lifecycle def maybe_upload_to_s3(output_path: Path) -> None: bucket = os.getenv("AWS_S3_BUCKET") if not bucket: LOGGER.warning("Skipping S3 upload: AWS_S3_BUCKET is not set") return storage = S3VideoStorage( { "bucket_name": bucket, "region_name": os.getenv("AWS_REGION"), "endpoint_url": os.getenv("AWS_S3_ENDPOINT_URL"), "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"), "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"), "aws_session_token": os.getenv("AWS_SESSION_TOKEN"), } ) s3_uri = storage.store_file(output_path) LOGGER.info("Uploaded output to %s", s3_uri) @debug_log_lifecycle def main() -> int: args = parse_args() configure_logging(args.log_level) # If only base-dir is overridden, derive the common subpaths from it. if args.base_dir != DEFAULT_BASE_DIR: if args.hunyuan_dir == DEFAULT_HUNYUAN_DIR: args.hunyuan_dir = args.base_dir / "HunyuanVideo-1.5" if args.reel_script == DEFAULT_REEL_SCRIPT: args.reel_script = args.base_dir / "reel_script.json" if args.images_dir == DEFAULT_IMAGES_DIR: args.images_dir = args.base_dir / "images" if args.videos_dir == DEFAULT_VIDEOS_DIR: args.videos_dir = args.base_dir / "videos" if args.audios_dir == DEFAULT_AUDIOS_DIR: args.audios_dir = args.base_dir / "audios" if args.merged_dir == DEFAULT_MERGED_DIR: args.merged_dir = args.base_dir / "merged" if args.output == DEFAULT_OUTPUT: args.output = args.base_dir / "results" / "final_output.mp4" try: if not args.skip_generate and not args.reel_script.exists(): run_step( "Generate Reel Script", _with_log_level([ sys.executable, str(SCRIPT_DIR / "generate_script.py"), ], args.log_level), cwd=args.base_dir, ) if not args.reel_script.exists(): LOGGER.error("Reel script was not generated at %s", args.reel_script) return 1 if not args.skip_generate and not args.skip_audio_generate: run_step( "Generate Audios", _with_log_level([ sys.executable, str(SCRIPT_DIR / "generate_audios.py"), ], args.log_level), cwd=args.base_dir, ) if not args.skip_generate: run_step( "Generate Images", _with_log_level([ sys.executable, str(SCRIPT_DIR / "generate_images.py"), ], args.log_level), cwd=args.base_dir, ) if not args.skip_generate: run_step( "Generate Videos", _with_log_level([ sys.executable, str(SCRIPT_DIR / "generate_videos.py"), "--hunyuan-dir", str(args.hunyuan_dir), "--reel-script", str(args.reel_script), "--images-dir", str(args.images_dir), "--videos-dir", str(args.videos_dir), "--audios-dir", str(args.audios_dir), "--seed", str(args.seed), ], args.log_level), ) if not args.skip_merge: merge_cmd = [ sys.executable, str(SCRIPT_DIR / "merge_audio_video.py"), "--videos-dir", str(args.videos_dir), "--audios-dir", str(args.audios_dir), "--output-dir", str(args.merged_dir), ] if args.skip_audio_generate: merge_cmd.append("--allow-missing-audio") run_step( "Merge Audio + Video", _with_log_level(merge_cmd, args.log_level), ) if not args.skip_concat: run_step( "Concatenate Merged Videos", _with_log_level([ sys.executable, str(SCRIPT_DIR / "concat_merged.py"), "--merged-dir", str(args.merged_dir), "--output", str(args.output), ], args.log_level), ) except subprocess.CalledProcessError as exc: LOGGER.exception("Pipeline failed at command: %s", exc.cmd) return exc.returncode if not args.skip_s3_upload: try: maybe_upload_to_s3(args.output) except Exception: LOGGER.exception("Failed uploading output to S3") return 1 LOGGER.info("Pipeline complete") LOGGER.info("Final output: %s", args.output) return 0 if __name__ == "__main__": raise SystemExit(main())