#!/usr/bin/env python3 """Run the full video pipeline: generate, merge, and concatenate.""" from __future__ import annotations import argparse import logging import os import subprocess import sys from pathlib import Path from src.scripts.logging_config import configure_logging from src.scripts.s3_video_storage import S3VideoStorage PROJECT_ROOT = Path(__file__).resolve().parent SCRIPT_DIR = PROJECT_ROOT / "src" / "scripts" DEFAULT_BASE_DIR = PROJECT_ROOT DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5" DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json" DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images" DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos" DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios" DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged" DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "final_output.mp4" LOGGER = logging.getLogger(__name__) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--base-dir", type=Path, default=DEFAULT_BASE_DIR) parser.add_argument("--hunyuan-dir", type=Path, default=DEFAULT_HUNYUAN_DIR) parser.add_argument("--reel-script", type=Path, default=DEFAULT_REEL_SCRIPT) parser.add_argument("--images-dir", type=Path, default=DEFAULT_IMAGES_DIR) parser.add_argument("--videos-dir", type=Path, default=DEFAULT_VIDEOS_DIR) parser.add_argument("--audios-dir", type=Path, default=DEFAULT_AUDIOS_DIR) parser.add_argument("--merged-dir", type=Path, default=DEFAULT_MERGED_DIR) parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT) parser.add_argument("--seed", type=int, default=1) parser.add_argument("--skip-generate", action="store_true") parser.add_argument("--skip-merge", action="store_true") parser.add_argument("--skip-concat", action="store_true") parser.add_argument("--skip-s3-upload", action="store_true") parser.add_argument("--log-level", default="INFO") return parser.parse_args() def run_step(name: str, cmd: list[str], cwd: Path | None = None) -> None: LOGGER.info("=== %s ===", name) LOGGER.info("$ %s", " ".join(str(part) for part in cmd)) if cwd is not None: LOGGER.info("(cwd: %s)", cwd) subprocess.run(cmd, check=True, cwd=str(cwd) if cwd else None) def maybe_upload_to_s3(output_path: Path) -> None: bucket = os.getenv("AWS_S3_BUCKET") if not bucket: LOGGER.warning("Skipping S3 upload: AWS_S3_BUCKET is not set") return storage = S3VideoStorage( { "bucket_name": bucket, "region_name": os.getenv("AWS_REGION"), "endpoint_url": os.getenv("AWS_S3_ENDPOINT_URL"), "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"), "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"), "aws_session_token": os.getenv("AWS_SESSION_TOKEN"), } ) s3_uri = storage.store_file(output_path) LOGGER.info("Uploaded output to %s", s3_uri) def main() -> int: args = parse_args() configure_logging(args.log_level) # If only base-dir is overridden, derive the common subpaths from it. if args.base_dir != DEFAULT_BASE_DIR: if args.hunyuan_dir == DEFAULT_HUNYUAN_DIR: args.hunyuan_dir = args.base_dir / "HunyuanVideo-1.5" if args.reel_script == DEFAULT_REEL_SCRIPT: args.reel_script = args.base_dir / "reel_script.json" if args.images_dir == DEFAULT_IMAGES_DIR: args.images_dir = args.base_dir / "images" if args.videos_dir == DEFAULT_VIDEOS_DIR: args.videos_dir = args.base_dir / "videos" if args.audios_dir == DEFAULT_AUDIOS_DIR: args.audios_dir = args.base_dir / "audios" if args.merged_dir == DEFAULT_MERGED_DIR: args.merged_dir = args.base_dir / "merged" if args.output == DEFAULT_OUTPUT: args.output = args.base_dir / "results" / "final_output.mp4" try: if not args.skip_generate and not args.reel_script.exists(): run_step( "Generate Reel Script", [ sys.executable, str(SCRIPT_DIR / "generate_script.py"), ], cwd=args.base_dir, ) if not args.reel_script.exists(): LOGGER.error("Reel script was not generated at %s", args.reel_script) return 1 if not args.skip_generate: run_step( "Generate Videos", [ sys.executable, str(SCRIPT_DIR / "generate_videos.py"), "--hunyuan-dir", str(args.hunyuan_dir), "--reel-script", str(args.reel_script), "--images-dir", str(args.images_dir), "--videos-dir", str(args.videos_dir), "--audios-dir", str(args.audios_dir), "--seed", str(args.seed), ], ) if not args.skip_merge: run_step( "Merge Audio + Video", [ sys.executable, str(SCRIPT_DIR / "merge_audio_video.py"), "--videos-dir", str(args.videos_dir), "--audios-dir", str(args.audios_dir), "--output-dir", str(args.merged_dir), ], ) if not args.skip_concat: run_step( "Concatenate Merged Videos", [ sys.executable, str(SCRIPT_DIR / "concat_merged.py"), "--merged-dir", str(args.merged_dir), "--output", str(args.output), ], ) except subprocess.CalledProcessError as exc: LOGGER.exception("Pipeline failed at command: %s", exc.cmd) return exc.returncode if not args.skip_s3_upload: try: maybe_upload_to_s3(args.output) except Exception: LOGGER.exception("Failed uploading output to S3") return 1 LOGGER.info("Pipeline complete") LOGGER.info("Final output: %s", args.output) return 0 if __name__ == "__main__": raise SystemExit(main())