1
0
Files
ContentGeneration/run_video_pipeline.py

207 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""Run the full video pipeline: generate, merge, and concatenate."""
from __future__ import annotations
import argparse
import logging
import os
import subprocess
import sys
from pathlib import Path
from src.logging_config import configure_logging, debug_log_lifecycle
from src.s3_video_storage import S3VideoStorage
PROJECT_ROOT = Path(__file__).resolve().parent
SCRIPT_DIR = PROJECT_ROOT / "src"
DEFAULT_BASE_DIR = PROJECT_ROOT
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images"
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged"
DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "final_output.mp4"
LOGGER = logging.getLogger(__name__)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--base-dir", type=Path, default=DEFAULT_BASE_DIR)
parser.add_argument("--hunyuan-dir", type=Path, default=DEFAULT_HUNYUAN_DIR)
parser.add_argument("--reel-script", type=Path, default=DEFAULT_REEL_SCRIPT)
parser.add_argument("--images-dir", type=Path, default=DEFAULT_IMAGES_DIR)
parser.add_argument("--videos-dir", type=Path, default=DEFAULT_VIDEOS_DIR)
parser.add_argument("--audios-dir", type=Path, default=DEFAULT_AUDIOS_DIR)
parser.add_argument("--merged-dir", type=Path, default=DEFAULT_MERGED_DIR)
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
parser.add_argument("--seed", type=int, default=1)
parser.add_argument("--skip-generate", action="store_true")
parser.add_argument("--skip-audio-generate", action="store_true")
parser.add_argument("--skip-merge", action="store_true")
parser.add_argument("--skip-concat", action="store_true")
parser.add_argument("--skip-s3-upload", action="store_true")
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args()
@debug_log_lifecycle
def run_step(name: str, cmd: list[str], cwd: Path | None = None) -> None:
LOGGER.info("=== %s ===", name)
LOGGER.info("$ %s", " ".join(str(part) for part in cmd))
if cwd is not None:
LOGGER.info("(cwd: %s)", cwd)
subprocess.run(cmd, check=True, cwd=str(cwd) if cwd else None)
def _with_log_level(cmd: list[str], log_level: str | None) -> list[str]:
if not log_level:
return cmd
return [*cmd, "--log-level", log_level]
@debug_log_lifecycle
def maybe_upload_to_s3(output_path: Path) -> None:
bucket = os.getenv("AWS_S3_BUCKET")
if not bucket:
LOGGER.warning("Skipping S3 upload: AWS_S3_BUCKET is not set")
return
storage = S3VideoStorage(
{
"bucket_name": bucket,
"region_name": os.getenv("AWS_REGION"),
"endpoint_url": os.getenv("AWS_S3_ENDPOINT_URL"),
"aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": os.getenv("AWS_SESSION_TOKEN"),
}
)
s3_uri = storage.store_file(output_path)
LOGGER.info("Uploaded output to %s", s3_uri)
@debug_log_lifecycle
def main() -> int:
args = parse_args()
configure_logging(args.log_level)
# If only base-dir is overridden, derive the common subpaths from it.
if args.base_dir != DEFAULT_BASE_DIR:
if args.hunyuan_dir == DEFAULT_HUNYUAN_DIR:
args.hunyuan_dir = args.base_dir / "HunyuanVideo-1.5"
if args.reel_script == DEFAULT_REEL_SCRIPT:
args.reel_script = args.base_dir / "reel_script.json"
if args.images_dir == DEFAULT_IMAGES_DIR:
args.images_dir = args.base_dir / "images"
if args.videos_dir == DEFAULT_VIDEOS_DIR:
args.videos_dir = args.base_dir / "videos"
if args.audios_dir == DEFAULT_AUDIOS_DIR:
args.audios_dir = args.base_dir / "audios"
if args.merged_dir == DEFAULT_MERGED_DIR:
args.merged_dir = args.base_dir / "merged"
if args.output == DEFAULT_OUTPUT:
args.output = args.base_dir / "results" / "final_output.mp4"
try:
if not args.skip_generate and not args.reel_script.exists():
run_step(
"Generate Reel Script",
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "generate_script.py"),
], args.log_level),
cwd=args.base_dir,
)
if not args.reel_script.exists():
LOGGER.error("Reel script was not generated at %s", args.reel_script)
return 1
if not args.skip_generate and not args.skip_audio_generate:
run_step(
"Generate Audios",
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "generate_audios.py"),
], args.log_level),
cwd=args.base_dir,
)
if not args.skip_generate:
run_step(
"Generate Videos",
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "generate_videos.py"),
"--hunyuan-dir",
str(args.hunyuan_dir),
"--reel-script",
str(args.reel_script),
"--images-dir",
str(args.images_dir),
"--videos-dir",
str(args.videos_dir),
"--audios-dir",
str(args.audios_dir),
"--seed",
str(args.seed),
], args.log_level),
)
if not args.skip_merge:
merge_cmd = [
sys.executable,
str(SCRIPT_DIR / "merge_audio_video.py"),
"--videos-dir",
str(args.videos_dir),
"--audios-dir",
str(args.audios_dir),
"--output-dir",
str(args.merged_dir),
]
if args.skip_audio_generate:
merge_cmd.append("--allow-missing-audio")
run_step(
"Merge Audio + Video",
_with_log_level(merge_cmd, args.log_level),
)
if not args.skip_concat:
run_step(
"Concatenate Merged Videos",
_with_log_level([
sys.executable,
str(SCRIPT_DIR / "concat_merged.py"),
"--merged-dir",
str(args.merged_dir),
"--output",
str(args.output),
], args.log_level),
)
except subprocess.CalledProcessError as exc:
LOGGER.exception("Pipeline failed at command: %s", exc.cmd)
return exc.returncode
if not args.skip_s3_upload:
try:
maybe_upload_to_s3(args.output)
except Exception:
LOGGER.exception("Failed uploading output to S3")
return 1
LOGGER.info("Pipeline complete")
LOGGER.info("Final output: %s", args.output)
return 0
if __name__ == "__main__":
raise SystemExit(main())