#!/usr/bin/env python3
"""Run the full video pipeline: generate, merge, and concatenate."""

from __future__ import annotations

import argparse
import logging
import os
from pathlib import Path
from typing import Callable

from src import (
    concat_merged,
    generate_audios,
    generate_images,
    generate_script,
    generate_videos,
    merge_audio_video,
)
from src.logging_config import configure_logging, debug_log_lifecycle
from src.s3_video_storage import S3VideoStorage

# Every default location hangs off the directory that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parent
DEFAULT_BASE_DIR = PROJECT_ROOT
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images"
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged"
DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "final_output.mp4"

LOGGER = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        The parsed namespace: path options (as ``Path``), ``seed``, the
        ``skip_*`` boolean switches, and ``log_level`` (``None`` when absent).
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Registration order is kept stable because it drives the --help layout.
    path_options = (
        ("--base-dir", DEFAULT_BASE_DIR),
        ("--hunyuan-dir", DEFAULT_HUNYUAN_DIR),
        ("--reel-script", DEFAULT_REEL_SCRIPT),
        ("--images-dir", DEFAULT_IMAGES_DIR),
        ("--videos-dir", DEFAULT_VIDEOS_DIR),
        ("--audios-dir", DEFAULT_AUDIOS_DIR),
        ("--merged-dir", DEFAULT_MERGED_DIR),
        ("--output", DEFAULT_OUTPUT),
    )
    for flag, fallback in path_options:
        cli.add_argument(flag, type=Path, default=fallback)

    cli.add_argument("--seed", type=int, default=1)

    skip_switches = (
        "--skip-generate",
        "--skip-audio-generate",
        "--skip-merge",
        "--skip-concat",
        "--skip-s3-upload",
    )
    for flag in skip_switches:
        cli.add_argument(flag, action="store_true")

    cli.add_argument(
        "--log-level",
        default=None,
        help="Logging level (overrides LOG_LEVEL env var)",
    )
    return cli.parse_args()


@debug_log_lifecycle
def run_step(name: str, step: Callable[[], int]) -> None:
    """Log a banner for ``name``, invoke ``step`` and check its result.

    Raises:
        RuntimeError: if ``step`` returns anything that is not equal to 0.
    """
    LOGGER.info("=== %s ===", name)
    exit_code = step()
    if exit_code != 0:
        raise RuntimeError(f"Step '{name}' failed with exit code {exit_code}")


def _with_log_level(argv: list[str], log_level: str | None) -> list[str]:
    """Return ``argv`` with ``--log-level <log_level>`` appended.

    When ``log_level`` is falsy the very same ``argv`` object is returned
    untouched; otherwise a new list is built and ``argv`` is not mutated.
    """
    return [*argv, "--log-level", log_level] if log_level else argv


@debug_log_lifecycle
def maybe_upload_to_s3(output_path: Path) -> None:
    """Store ``output_path`` through ``S3VideoStorage`` if a bucket is configured.

    The bucket name is read from ``AWS_S3_BUCKET``; when it is unset or empty a
    warning is logged and nothing is uploaded. The remaining connection
    settings are read from the environment and passed along as-is (``None``
    for variables that are not set).
    """
    bucket = os.getenv("AWS_S3_BUCKET")
    if not bucket:
        LOGGER.warning("Skipping S3 upload: AWS_S3_BUCKET is not set")
        return

    # Storage option name -> environment variable that supplies its value.
    env_by_option = {
        "region_name": "AWS_REGION",
        "endpoint_url": "AWS_S3_ENDPOINT_URL",
        "aws_access_key_id": "AWS_ACCESS_KEY_ID",
        "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
        "aws_session_token": "AWS_SESSION_TOKEN",
    }
    settings = {"bucket_name": bucket}
    settings.update({option: os.getenv(var) for option, var in env_by_option.items()})

    uploaded_uri = S3VideoStorage(settings).store_file(output_path)
    LOGGER.info("Uploaded output to %s", uploaded_uri)


@debug_log_lifecycle
def main() -> int:
    """Run the pipeline stages selected on the command line.

    Returns:
        0 when every requested stage (and the optional S3 upload) succeeded,
        1 when a stage raised, the reel script is still missing after its
        generation step, or the upload failed.
    """
    args = parse_args()
    configure_logging(args.log_level)

    # If only base-dir is overridden, derive the common subpaths from it.
    if args.base_dir != DEFAULT_BASE_DIR:
        # (namespace attribute, built-in default, path parts under base-dir)
        derived_paths = (
            ("hunyuan_dir", DEFAULT_HUNYUAN_DIR, ("HunyuanVideo-1.5",)),
            ("reel_script", DEFAULT_REEL_SCRIPT, ("reel_script.json",)),
            ("images_dir", DEFAULT_IMAGES_DIR, ("images",)),
            ("videos_dir", DEFAULT_VIDEOS_DIR, ("videos",)),
            ("audios_dir", DEFAULT_AUDIOS_DIR, ("audios",)),
            ("merged_dir", DEFAULT_MERGED_DIR, ("merged",)),
            ("output", DEFAULT_OUTPUT, ("results", "final_output.mp4")),
        )
        for attr, builtin_default, parts in derived_paths:
            # A value still equal to the built-in default is treated as
            # "not overridden" and re-rooted under the requested base dir.
            if getattr(args, attr) == builtin_default:
                setattr(args, attr, args.base_dir.joinpath(*parts))

    level = args.log_level
    generating = not args.skip_generate

    try:
        if generating and not args.reel_script.exists():
            script_argv = [
                "--topic-description",
                str(args.base_dir / "topic_description.txt"),
                "--output-script",
                str(args.reel_script),
            ]
            run_step(
                "Generate Reel Script",
                lambda: generate_script.main(_with_log_level(script_argv, level)),
            )
            # NOTE(review): this re-check is scoped to the generation branch,
            # matching the "was not generated" wording - confirm the layout.
            if not args.reel_script.exists():
                LOGGER.error("Reel script was not generated at %s", args.reel_script)
                return 1

        if generating and not args.skip_audio_generate:
            audio_argv = [
                "--reel-script",
                str(args.reel_script),
                "--audios-dir",
                str(args.audios_dir),
            ]
            run_step(
                "Generate Audios",
                lambda: generate_audios.main(_with_log_level(audio_argv, level)),
            )

        if generating:
            image_argv = [
                "--reel-script",
                str(args.reel_script),
                "--images-dir",
                str(args.images_dir),
            ]
            run_step(
                "Generate Images",
                lambda: generate_images.main(_with_log_level(image_argv, level)),
            )

            video_argv = [
                "--hunyuan-dir",
                str(args.hunyuan_dir),
                "--reel-script",
                str(args.reel_script),
                "--images-dir",
                str(args.images_dir),
                "--videos-dir",
                str(args.videos_dir),
                "--audios-dir",
                str(args.audios_dir),
                "--seed",
                str(args.seed),
            ]
            run_step(
                "Generate Videos",
                lambda: generate_videos.main(_with_log_level(video_argv, level)),
            )

        if not args.skip_merge:
            merge_argv = [
                "--videos-dir",
                str(args.videos_dir),
                "--audios-dir",
                str(args.audios_dir),
                "--output-dir",
                str(args.merged_dir),
            ]
            # Audio generation was skipped on request, so the merge step is
            # told that clips without audio are acceptable.
            if args.skip_audio_generate:
                merge_argv.append("--allow-missing-audio")
            run_step(
                "Merge Audio + Video",
                lambda: merge_audio_video.main(_with_log_level(merge_argv, level)),
            )

        if not args.skip_concat:
            concat_argv = [
                "--merged-dir",
                str(args.merged_dir),
                "--output",
                str(args.output),
            ]
            run_step(
                "Concatenate Merged Videos",
                lambda: concat_merged.main(_with_log_level(concat_argv, level)),
            )
    except Exception:
        # Top-level boundary: log the traceback and signal failure to the shell.
        LOGGER.exception("Pipeline failed")
        return 1

    if not args.skip_s3_upload:
        try:
            maybe_upload_to_s3(args.output)
        except Exception:
            LOGGER.exception("Failed uploading output to S3")
            return 1

    LOGGER.info("Pipeline complete")
    LOGGER.info("Final output: %s", args.output)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())