1
0

24 Commits

Author SHA1 Message Date
dede1988e6 Run Hunyuan's generate script with torchrun utility 2026-04-03 20:12:31 +02:00
9668088b27 Added HunyuanVideo folder to the python sources list 2026-04-03 19:35:44 +02:00
17ac033729 Run video generation model from python instead of calling subprocess 2026-04-03 18:34:21 +02:00
a4059aa4f8 Add protobuf and sentencepiece runtime dependencies 2026-04-03 17:41:08 +02:00
65ac72cb98 Run script with python instead of uv 2026-04-03 17:19:16 +02:00
a80ecf411e Use src.logging_config imports in entry scripts 2026-04-03 16:20:50 +02:00
008ee18ba8 Refactor pipeline to call script entrypoints directly 2026-04-03 16:15:19 +02:00
74f8159eff Define and set device 2026-04-03 15:50:11 +02:00
e767aca68c Exclude transformers 5.4.0 due to flash_attn KeyError 2026-04-03 13:32:11 +02:00
4fe7762262 Updated uv paths, do not use system python 2026-04-03 12:58:20 +02:00
8ed5554d80 Changed the name of env var name related to huggingface api token 2026-04-02 19:30:43 +02:00
7ad1a08692 Pin torch to CUDA 12 compatible version 2026-04-02 18:11:57 +02:00
4f3c568123 Changed container command to run script with uv 2026-04-02 18:00:21 +02:00
c3bfa5ba65 Fix project root path for image/audio generation 2026-04-02 17:40:25 +02:00
323a41c622 Updated dependecies installation process 2026-04-02 15:36:16 +02:00
f6f5b87329 Fixed Dockerfile 2026-04-02 15:12:39 +02:00
9d0c509acd Removed --system flag 2026-04-02 14:37:05 +02:00
17e0099265 Migrate to uv sync and pytest coverage workflow 2026-04-02 14:28:18 +02:00
08ebab6348 Add skip-audio mode and resilient merge handling 2026-04-02 13:25:54 +02:00
a0a66264d2 Refactor src layout and add logging lifecycle + tests 2026-04-02 12:32:02 +02:00
e3c2b9ddee Added generate reel script step 2026-04-02 11:54:29 +02:00
48a11705f3 Moved COPY code statement, added shell scripts to the dockerignore 2026-04-02 11:14:51 +02:00
686a458905 Refactored code, added Dockerfile, replaced bash scripts with python alternatives, added README with instructions on running a pipeline 2026-04-01 16:56:06 +02:00
ca116562fe Initial 2026-04-01 15:55:10 +02:00
21 changed files with 3560 additions and 72 deletions

64
.dockerignore Normal file
View File

@@ -0,0 +1,64 @@
# Git
.git/
.gitignore
# Python cache / bytecode
__pycache__/
*.py[cod]
*$py.class
# Virtual environments
.venv/
venv/
env/
ENV/
# Build / packaging artifacts
build/
dist/
*.egg-info/
.eggs/
pip-wheel-metadata/
# Test and tooling caches
.pytest_cache/
.mypy_cache/
.ruff_cache/
.tox/
.nox/
.coverage
.coverage.*
htmlcov/
# Notebooks
.ipynb_checkpoints/
# Editor and OS files
.vscode/
.idea/
.DS_Store
Thumbs.db
# Local env and logs
.env
.env.*
!.env.example
*.log
*.pid
# Optional large local media
*.mp4
*.mov
*.avi
*.mkv
# Project generated data and checkpoints
images/
audios/
videos/
merged/
results/
outputs/
ckpts/
HunyuanVideo-1.5/ckpts/
*.sh

1
.env
View File

@@ -1 +0,0 @@
ELEVENLABS_API_KEY=sk_e343522cb3fd4da2d46844e81e1152e3de2a72cd1430a383

19
.env.example Normal file
View File

@@ -0,0 +1,19 @@
# ElevenLabs
ELEVENLABS_API_KEY=
# Hugging Face (required for gated model downloads, e.g. FLUX.1-schnell)
HF_TOKEN=
# Hunyuan prompt rewrite endpoints (optional; rewrite is disabled in current generate_videos.py)
T2V_REWRITE_BASE_URL=
T2V_REWRITE_MODEL_NAME=
I2V_REWRITE_BASE_URL=
I2V_REWRITE_MODEL_NAME=
# AWS / S3 (used when initializing S3VideoStorage)
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_SESSION_TOKEN=
AWS_REGION=
AWS_S3_BUCKET=
AWS_S3_ENDPOINT_URL=

67
.gitignore vendored Normal file
View File

@@ -0,0 +1,67 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
dist/
downloads/
.eggs/
*.egg-info/
*.egg
pip-wheel-metadata/
# Virtual environments
.venv/
venv/
env/
ENV/
# Unit test / coverage reports
.pytest_cache/
.coverage
.coverage.*
htmlcov/
.tox/
.nox/
# Type checker / linter caches
.mypy_cache/
.pyre/
.ruff_cache/
.pytype/
.dmypy.json
dmypy.json
# Jupyter Notebook
.ipynb_checkpoints/
# IDE/editor
.vscode/
.idea/
*.swp
*.swo
# OS files
.DS_Store
Thumbs.db
# Logs and local runtime files
*.log
*.pid
# Local environment variables
.env
.env.*
!.env.example
# Project-specific artifacts
*.mp4
*.mov
*.avi
*.mkv

67
Dockerfile Normal file
View File

@@ -0,0 +1,67 @@
# syntax=docker/dockerfile:1
FROM nvidia/cuda:12.4.1-cudnn-devel-ubuntu22.04

# Runtime environment for the pipeline.
# NOTE(review): wheels come from the cu121 PyTorch index while the base image is
# CUDA 12.4 -- cu121 builds generally run on a 12.4 runtime, but confirm the
# pairing is intentional.
ENV PYTHONUNBUFFERED=1 \
    UV_EXTRA_INDEX_URL=https://download.pytorch.org/whl/cu121 \
    UV_NO_SYNC=true \
    PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True,max_split_size_mb:128

# Base OS tools + media stack + Python toolchain.
# DEBIAN_FRONTEND is an ARG (not ENV) so it applies during the build without
# leaking into the runtime environment of containers derived from this image.
# NOTE(review): the base image may not ship python3.10, so the /usr/bin/python
# symlink can dangle; the venv python placed on PATH below is what actually runs
# the app -- confirm the symlink is still needed.
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg \
    git \
    git-lfs \
    ca-certificates \
    curl \
    build-essential \
    pkg-config \
    ninja-build \
    libglib2.0-0 \
    libgl1 \
    && rm -rf /var/lib/apt/lists/* \
    && ln -sf /usr/bin/python3.10 /usr/bin/python \
    && git lfs install

# Install uv (pinned tag for reproducible builds).
COPY --from=ghcr.io/astral-sh/uv:0.6.17 /uv /uvx /usr/local/bin/
RUN uv python install 3.10

# Place executables in the environment at the front of the path.
ENV PATH="/app/.venv/bin:$PATH"
WORKDIR /app

# Install app Python dependencies first for better layer caching: this layer is
# reused as long as pyproject.toml/README.md are unchanged.
COPY pyproject.toml README.md /app/
RUN uv sync --no-dev --no-install-project

# Ensure HunyuanVideo source exists in the image (cloned only when the build
# context did not already provide it).
ARG HUNYUAN_REPO=https://github.com/Tencent-Hunyuan/HunyuanVideo-1.5.git
RUN if [ ! -f /app/HunyuanVideo-1.5/requirements.txt ]; then \
        rm -rf /app/HunyuanVideo-1.5 && \
        git clone --depth 1 "$HUNYUAN_REPO" /app/HunyuanVideo-1.5; \
    fi

# Install HunyuanVideo dependencies from upstream README guidance.
RUN uv pip install --index-strategy unsafe-best-match -r /app/HunyuanVideo-1.5/requirements.txt \
    && uv pip install --upgrade tencentcloud-sdk-python \
    && uv pip install sgl-kernel==0.3.18
ENV PYTHONPATH="/app/HunyuanVideo-1.5:$PYTHONPATH"

# Optional attention backends from Hunyuan docs.
# Build with: --build-arg INSTALL_OPTIONAL_ATTENTION=1
# Clean the /tmp clones in the same layer so they do not bloat the image.
ARG INSTALL_OPTIONAL_ATTENTION=0
RUN if [ "$INSTALL_OPTIONAL_ATTENTION" = "1" ]; then \
        uv pip install flash-attn --no-build-isolation && \
        git clone --depth 1 https://github.com/Tencent-Hunyuan/flex-block-attn.git /tmp/flex-block-attn && \
        cd /tmp/flex-block-attn && git submodule update --init --recursive && python setup.py install && \
        git clone --depth 1 https://github.com/cooper1637/SageAttention.git /tmp/SageAttention && \
        cd /tmp/SageAttention && python setup.py install && \
        rm -rf /tmp/flex-block-attn /tmp/SageAttention; \
    fi

# Copy application source after dependencies are installed.
COPY . /app

# Default pipeline entrypoint (exec form so the process runs as PID 1 and
# receives signals from `docker stop`).
CMD ["python", "run_video_pipeline.py"]

218
README.md Normal file
View File

@@ -0,0 +1,218 @@
# ContentGeneration Pipeline
This project runs a 3-step video pipeline:
1. Generate shot videos from images + prompts.
2. Merge each generated video with its audio.
3. Concatenate merged clips into one final output.
The pipeline entrypoint is `run_video_pipeline.py`.
## Quick Start
Local Python:
```bash
cp .env.example .env
uv sync --dev
uv run python run_video_pipeline.py
```
Docker (GPU):
```bash
cp .env.example .env
docker build -t content-generation:latest .
docker run --rm --gpus all --env-file .env \
-v "$(pwd)":/app \
-v "$HOME/.cache/huggingface":/root/.cache/huggingface \
-w /app content-generation:latest
```
First run (skip S3 upload):
```bash
python run_video_pipeline.py --skip-s3-upload
```
Docker first run (skip S3 upload):
```bash
docker run --rm --gpus all --env-file .env \
-v "$(pwd)":/app \
-v "$HOME/.cache/huggingface":/root/.cache/huggingface \
-w /app \
content-generation:latest \
python run_video_pipeline.py --skip-s3-upload
```
## Project Layout
- `run_video_pipeline.py`: main entrypoint.
- `src/`: helper scripts used by the pipeline.
- `HunyuanVideo-1.5/`: Hunyuan inference code and model dependencies.
- `reel_script.json`: required script input with `shots`.
- `images/`, `audios/`, `videos/`, `merged/`, `results/`: working/output folders.
- `.env.example`: environment variable template.
## Prerequisites
1. Linux with NVIDIA GPU and CUDA runtime.
2. `ffmpeg` and `ffprobe` available on PATH.
3. Python 3.10+.
4. `uv` installed (https://docs.astral.sh/uv/).
5. Hunyuan model checkpoints under `HunyuanVideo-1.5/ckpts`.
6. If using FLUX local download, access approved for `black-forest-labs/FLUX.1-schnell`.
## Environment Variables
1. Create local env file:
```bash
cp .env.example .env
```
2. Fill required variables in `.env`:
- `ELEVENLABS_API_KEY` for audio generation.
- `HF_TOKEN` if gated Hugging Face model access is needed (matches `.env.example`).
- `AWS_S3_BUCKET` (+ optional AWS vars) if you want final output uploaded to S3.
## Run Locally (Python)
1. Create and activate a virtual environment:
```bash
uv venv
source .venv/bin/activate
```
2. Install Python dependencies:
```bash
uv sync --dev
```
3. Install Hunyuan dependencies:
```bash
uv pip install -r HunyuanVideo-1.5/requirements.txt
uv pip install --upgrade tencentcloud-sdk-python
uv pip install sgl-kernel==0.3.18
```
4. Run full pipeline:
```bash
uv run python run_video_pipeline.py
```
5. Common options:
```bash
# Skip generation and only merge + concat
python run_video_pipeline.py --skip-generate
# Skip S3 upload
python run_video_pipeline.py --skip-s3-upload
# Override base directory
python run_video_pipeline.py --base-dir /absolute/path/to/workdir
# Change logging verbosity
python run_video_pipeline.py --log-level DEBUG
```
## Run with Docker
1. Build image:
```bash
docker build -t content-generation:latest .
```
2. Optional build with extra attention backends:
```bash
docker build -t content-generation:latest --build-arg INSTALL_OPTIONAL_ATTENTION=1 .
```
3. Run pipeline in container (GPU required):
```bash
docker run --rm --gpus all \
--env-file .env \
-v "$(pwd)":/app \
-v "$HOME/.cache/huggingface":/root/.cache/huggingface \
-w /app \
content-generation:latest
```
4. Pass extra pipeline args:
```bash
docker run --rm --gpus all \
--env-file .env \
-v "$(pwd)":/app \
-v "$HOME/.cache/huggingface":/root/.cache/huggingface \
-w /app \
content-generation:latest \
python run_video_pipeline.py --skip-s3-upload --log-level DEBUG
```
## Input Expectations
1. `reel_script.json` must exist and contain a `shots` array.
2. `images/shot_<n>.png` and `audios/output_<n>.mp3` should align by shot number.
3. Final output is written by default to `results/final_output.mp4`.
## S3 Upload Behavior
1. If `AWS_S3_BUCKET` is set, the pipeline uploads final output to S3 using `S3VideoStorage`.
2. If `AWS_S3_BUCKET` is missing, upload is skipped with a warning.
3. Disable upload explicitly with `--skip-s3-upload`.
## Troubleshooting
1. `torch.cuda.is_available()` is false in Docker.
- Run with GPU flags: `docker run --gpus all ...`
- Verify NVIDIA Container Toolkit is installed on host.
- Check host GPU visibility: `nvidia-smi`.
2. `ffmpeg` or `ffprobe` not found.
- Local: install ffmpeg with your package manager.
- Docker: ffmpeg is installed in the provided Dockerfile.
3. Hunyuan generate step fails due to missing checkpoints.
- Ensure checkpoints are available under `HunyuanVideo-1.5/ckpts`.
- Confirm mounted project path in Docker includes checkpoints.
4. Hugging Face model download fails (401/403).
- Accept model access terms for gated models (for example FLUX.1-schnell).
- Set `HF_TOKEN` in `.env` (the variable name used in `.env.example`).
5. S3 upload fails.
- Confirm `AWS_S3_BUCKET` is set.
- If needed, set `AWS_REGION` and credentials (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, optional `AWS_SESSION_TOKEN`).
- For S3-compatible providers, set `AWS_S3_ENDPOINT_URL`.
6. Permission issues when running Docker with mounted volumes.
- Use your host user mapping if needed:
`docker run --rm --gpus all -u "$(id -u):$(id -g)" ...`
7. Out-of-memory during video generation.
- Keep `PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True,max_split_size_mb:128`.
- Reduce workload by skipping optional enhancements or lowering resolution/steps in generation scripts.
8. Verify syntax quickly before running.
```bash
uv run python -m py_compile run_video_pipeline.py src/*.py
```
## Testing
Run tests with coverage:
```bash
uv run pytest
```

View File

@@ -1,35 +0,0 @@
from elevenlabs.client import ElevenLabs
from elevenlabs.play import play
import os
import json
from dotenv import load_dotenv
load_dotenv()
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
if __name__ == '__main__':
script_path = "reel_script.json"
with open(script_path, "r") as f:
reel_data = json.load(f)
client = ElevenLabs(
api_key=ELEVENLABS_API_KEY
)
for shot in reel_data["shots"]:
print(shot["shot_number"], shot["voiceover"])
prompt = shot["voiceover"]
audio = client.text_to_speech.convert(
text=prompt,
voice_id="JBFqnCBsd6RMkjVDRZzb",
model_id="eleven_multilingual_v2",
output_format="mp3_44100_128",
)
audio_bytes = b"".join(audio)
if not os.path.exists("audios"):
os.makedirs("audios")
with open(f"audios/output_{shot["shot_number"]}.mp3", "wb") as f:
f.write(audio_bytes)

View File

@@ -1,28 +0,0 @@
import torch
from diffusers import FluxPipeline
import json
import os
if __name__ == '__main__':
script_path = "reel_script.json"
with open(script_path, "r") as f:
reel_data = json.load(f)
pipe = FluxPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell", torch_dtype=torch.bfloat16)
pipe.enable_model_cpu_offload()
for shot in reel_data["shots"]:
print(shot["shot_number"], shot["image_description"])
prompt = shot["image_description"]
image = pipe(
prompt,
guidance_scale=0.0,
num_inference_steps=4,
max_sequence_length=256,
generator=torch.Generator("cpu").manual_seed(0)
).images[0]
if not os.path.exists("images"):
os.makedirs("images")
image.save(f"images/shot_{shot["shot_number"]}.png")

32
pyproject.toml Normal file
View File

@@ -0,0 +1,32 @@
[project]
name = "content-generation"
version = "0.1.0"
description = "Video content generation pipeline"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"boto3",
"python-dotenv",
"elevenlabs",
"torch==2.5.1",
"transformers!=5.4.0",
"diffusers",
"accelerate",
"safetensors",
"huggingface-hub",
"bitsandbytes",
"protobuf",
"sentencepiece",
]
[dependency-groups]
dev = [
"pytest",
"pytest-cov",
"coverage[toml]",
]
[tool.pytest.ini_options]
addopts = "-q --cov=run_video_pipeline --cov=src/logging_config.py --cov-report=term-missing --cov-fail-under=70"
testpaths = ["tests"]
python_files = ["test_*.py"]

236
run_video_pipeline.py Normal file
View File

@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""Run the full video pipeline: generate, merge, and concatenate."""
from __future__ import annotations
import argparse
import logging
import os
from pathlib import Path
from typing import Callable
from src import concat_merged, generate_audios, generate_images, generate_script, generate_videos, merge_audio_video
from src.logging_config import configure_logging, debug_log_lifecycle
from src.s3_video_storage import S3VideoStorage
PROJECT_ROOT = Path(__file__).resolve().parent
DEFAULT_BASE_DIR = PROJECT_ROOT
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images"
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged"
DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "final_output.mp4"
LOGGER = logging.getLogger(__name__)
def parse_args() -> argparse.Namespace:
    """Build and parse the pipeline's command-line interface."""
    cli = argparse.ArgumentParser(description=__doc__)
    # Path-valued options share a common shape; defaults live at module scope.
    for flag, default in (
        ("--base-dir", DEFAULT_BASE_DIR),
        ("--hunyuan-dir", DEFAULT_HUNYUAN_DIR),
        ("--reel-script", DEFAULT_REEL_SCRIPT),
        ("--images-dir", DEFAULT_IMAGES_DIR),
        ("--videos-dir", DEFAULT_VIDEOS_DIR),
        ("--audios-dir", DEFAULT_AUDIOS_DIR),
        ("--merged-dir", DEFAULT_MERGED_DIR),
        ("--output", DEFAULT_OUTPUT),
    ):
        cli.add_argument(flag, type=Path, default=default)
    cli.add_argument("--seed", type=int, default=1)
    # Step-skipping toggles.
    for flag in (
        "--skip-generate",
        "--skip-audio-generate",
        "--skip-merge",
        "--skip-concat",
        "--skip-s3-upload",
    ):
        cli.add_argument(flag, action="store_true")
    cli.add_argument(
        "--log-level",
        default=None,
        help="Logging level (overrides LOG_LEVEL env var)",
    )
    return cli.parse_args()
@debug_log_lifecycle
def run_step(name: str, step: Callable[[], int]) -> None:
    """Execute one pipeline step and raise RuntimeError on a non-zero exit code."""
    LOGGER.info("=== %s ===", name)
    exit_code = step()
    if exit_code == 0:
        return
    raise RuntimeError(f"Step '{name}' failed with exit code {exit_code}")
def _with_log_level(argv: list[str], log_level: str | None) -> list[str]:
if not log_level:
return argv
return [*argv, "--log-level", log_level]
@debug_log_lifecycle
def maybe_upload_to_s3(output_path: Path) -> None:
    """Upload the final video via S3VideoStorage when AWS_S3_BUCKET is configured.

    Silently (with a warning) skips the upload when no bucket is set; all other
    connection settings are read from the usual AWS environment variables.
    """
    bucket = os.getenv("AWS_S3_BUCKET")
    if not bucket:
        LOGGER.warning("Skipping S3 upload: AWS_S3_BUCKET is not set")
        return
    config: dict = {"bucket_name": bucket}
    # Optional settings default to None when the env var is absent.
    for option, env_name in (
        ("region_name", "AWS_REGION"),
        ("endpoint_url", "AWS_S3_ENDPOINT_URL"),
        ("aws_access_key_id", "AWS_ACCESS_KEY_ID"),
        ("aws_secret_access_key", "AWS_SECRET_ACCESS_KEY"),
        ("aws_session_token", "AWS_SESSION_TOKEN"),
    ):
        config[option] = os.getenv(env_name)
    s3_uri = S3VideoStorage(config).store_file(output_path)
    LOGGER.info("Uploaded output to %s", s3_uri)
@debug_log_lifecycle
def main() -> int:
    """Run the full pipeline: script -> audios -> images -> videos -> merge -> concat -> S3.

    Each stage is invoked through run_step, which raises on a non-zero exit
    code; any failure is logged and turns into return code 1. Returns 0 on
    success.
    """
    args = parse_args()
    configure_logging(args.log_level)
    # If only base-dir is overridden, derive the common subpaths from it.
    # Explicitly overridden paths (different from their defaults) are kept.
    if args.base_dir != DEFAULT_BASE_DIR:
        if args.hunyuan_dir == DEFAULT_HUNYUAN_DIR:
            args.hunyuan_dir = args.base_dir / "HunyuanVideo-1.5"
        if args.reel_script == DEFAULT_REEL_SCRIPT:
            args.reel_script = args.base_dir / "reel_script.json"
        if args.images_dir == DEFAULT_IMAGES_DIR:
            args.images_dir = args.base_dir / "images"
        if args.videos_dir == DEFAULT_VIDEOS_DIR:
            args.videos_dir = args.base_dir / "videos"
        if args.audios_dir == DEFAULT_AUDIOS_DIR:
            args.audios_dir = args.base_dir / "audios"
        if args.merged_dir == DEFAULT_MERGED_DIR:
            args.merged_dir = args.base_dir / "merged"
        if args.output == DEFAULT_OUTPUT:
            args.output = args.base_dir / "results" / "final_output.mp4"
    try:
        # Generate the reel script only when it does not already exist.
        if not args.skip_generate and not args.reel_script.exists():
            run_step(
                "Generate Reel Script",
                lambda: generate_script.main(
                    _with_log_level(
                        [
                            "--topic-description",
                            str(args.base_dir / "topic_description.txt"),
                            "--output-script",
                            str(args.reel_script),
                        ],
                        args.log_level,
                    )
                ),
            )
        # All later stages need the script, however it was produced.
        if not args.reel_script.exists():
            LOGGER.error("Reel script was not generated at %s", args.reel_script)
            return 1
        if not args.skip_generate and not args.skip_audio_generate:
            run_step(
                "Generate Audios",
                lambda: generate_audios.main(
                    _with_log_level(
                        [
                            "--reel-script",
                            str(args.reel_script),
                            "--audios-dir",
                            str(args.audios_dir),
                        ],
                        args.log_level,
                    )
                ),
            )
        if not args.skip_generate:
            run_step(
                "Generate Images",
                lambda: generate_images.main(
                    _with_log_level(
                        [
                            "--reel-script",
                            str(args.reel_script),
                            "--images-dir",
                            str(args.images_dir),
                        ],
                        args.log_level,
                    )
                ),
            )
        if not args.skip_generate:
            run_step(
                "Generate Videos",
                lambda: generate_videos.main(
                    _with_log_level(
                        [
                            "--hunyuan-dir",
                            str(args.hunyuan_dir),
                            "--reel-script",
                            str(args.reel_script),
                            "--images-dir",
                            str(args.images_dir),
                            "--videos-dir",
                            str(args.videos_dir),
                            "--audios-dir",
                            str(args.audios_dir),
                            "--seed",
                            str(args.seed),
                        ],
                        args.log_level,
                    )
                ),
            )
        if not args.skip_merge:
            merge_argv = [
                "--videos-dir",
                str(args.videos_dir),
                "--audios-dir",
                str(args.audios_dir),
                "--output-dir",
                str(args.merged_dir),
            ]
            # Without generated audio, let the merge step tolerate missing tracks.
            if args.skip_audio_generate:
                merge_argv.append("--allow-missing-audio")
            run_step(
                "Merge Audio + Video",
                lambda: merge_audio_video.main(_with_log_level(merge_argv, args.log_level)),
            )
        if not args.skip_concat:
            run_step(
                "Concatenate Merged Videos",
                lambda: concat_merged.main(
                    _with_log_level(
                        [
                            "--merged-dir",
                            str(args.merged_dir),
                            "--output",
                            str(args.output),
                        ],
                        args.log_level,
                    )
                ),
            )
    except Exception:
        LOGGER.exception("Pipeline failed")
        return 1
    # Upload is attempted after a successful pipeline run, unless disabled.
    if not args.skip_s3_upload:
        try:
            maybe_upload_to_s3(args.output)
        except Exception:
            LOGGER.exception("Failed uploading output to S3")
            return 1
    LOGGER.info("Pipeline complete")
    LOGGER.info("Final output: %s", args.output)
    return 0
if __name__ == "__main__":
raise SystemExit(main())

85
src/concat_merged.py Normal file
View File

@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""Concatenate merged_*.mp4 files into a single output using ffmpeg concat demuxer."""
from __future__ import annotations
import argparse
import logging
import re
import subprocess
import tempfile
from pathlib import Path
from src.logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_BASE_DIR = SCRIPT_DIR.parents[1]
DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged"
DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "run_3" / "final_output.mp4"
LOGGER = logging.getLogger(__name__)
def shot_number(path: Path) -> int:
    """Extract the shot index from a 'merged_<n>.mp4' filename; -1 when it doesn't match."""
    found = re.search(r"merged_(\d+)\.mp4$", path.name)
    if found is None:
        return -1
    return int(found.group(1))
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse CLI options for the concatenation step."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--merged-dir", type=Path, default=DEFAULT_MERGED_DIR)
    cli.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    cli.add_argument("--log-level", default=None, help="Logging level (overrides LOG_LEVEL env var)")
    return cli.parse_args(argv)
@debug_log_lifecycle
def main(argv: list[str] | None = None) -> int:
    """Concatenate merged_*.mp4 clips in shot-number order into one output file.

    Builds an ffmpeg concat-demuxer file list in a temporary file, runs a
    stream copy (no re-encode), and removes the list afterwards.

    Returns 0 on success, 1 when no merged clips are found.
    Raises subprocess.CalledProcessError if ffmpeg fails.
    """
    args = parse_args(argv)
    configure_logging(args.log_level)
    videos = sorted(args.merged_dir.glob("merged_*.mp4"), key=shot_number)
    if not videos:
        LOGGER.warning("No merged videos found in %s", args.merged_dir)
        return 1
    args.output.parent.mkdir(parents=True, exist_ok=True)
    # delete=False: ffmpeg must be able to open the list after the handle closes.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as tmp:
        filelist = Path(tmp.name)
        for video in videos:
            # Bug fix: the previous "\\n" wrote a literal backslash-n, so every
            # entry landed on a single line and ffmpeg could not parse the list.
            # NOTE(review): paths containing a single quote would still break
            # the concat demuxer's quoting -- confirm inputs never contain one.
            tmp.write(f"file '{video}'\n")
    try:
        LOGGER.info("Concatenating the following files:\n%s", filelist.read_text().rstrip())
        subprocess.run(
            [
                "ffmpeg",
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                str(filelist),
                "-c",
                "copy",  # stream copy: no re-encode, inputs must share codecs
                "-y",
                str(args.output),
            ],
            check=True,
        )
    finally:
        # Always clean up the temp list, even when ffmpeg fails.
        filelist.unlink(missing_ok=True)
    LOGGER.info("Done")
    return 0
if __name__ == "__main__":
raise SystemExit(main())

67
src/generate_audios.py Normal file
View File

@@ -0,0 +1,67 @@
from __future__ import annotations
import argparse
import json
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
from elevenlabs.client import ElevenLabs
from src.logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent
load_dotenv(PROJECT_ROOT / ".env")
LOGGER = logging.getLogger(__name__)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse CLI options for the audio-generation step."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--reel-script", type=Path, default=PROJECT_ROOT / "reel_script.json")
    cli.add_argument("--audios-dir", type=Path, default=PROJECT_ROOT / "audios")
    cli.add_argument("--log-level", default=None, help="Logging level (overrides LOG_LEVEL env var)")
    return cli.parse_args(argv)
@debug_log_lifecycle
def main(argv: list[str] | None = None) -> int:
    """Synthesize one MP3 voiceover per shot via the ElevenLabs TTS API.

    Reads shots from the reel script and writes `<audios-dir>/output_<n>.mp3`.
    Raises RuntimeError when ELEVENLABS_API_KEY is missing. Returns 0 on success.
    """
    args = parse_args(argv)
    configure_logging(args.log_level)
    api_key = os.getenv("ELEVENLABS_API_KEY")
    if not api_key:
        raise RuntimeError("ELEVENLABS_API_KEY is not set")
    args.audios_dir.mkdir(parents=True, exist_ok=True)
    shots = json.loads(args.reel_script.read_text())["shots"]
    client = ElevenLabs(api_key=api_key)
    for shot in shots:
        number = shot["shot_number"]
        voiceover = shot["voiceover"]
        LOGGER.info("Generating audio for shot %s: %s", number, voiceover)
        # convert() yields the MP3 as a chunk stream; join before writing.
        stream = client.text_to_speech.convert(
            text=voiceover,
            voice_id="JBFqnCBsd6RMkjVDRZzb",
            model_id="eleven_multilingual_v2",
            output_format="mp3_44100_128",
        )
        (args.audios_dir / f"output_{number}.mp3").write_bytes(b"".join(stream))
    return 0
if __name__ == "__main__":
raise SystemExit(main())

63
src/generate_images.py Normal file
View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import argparse
import json
import logging
from pathlib import Path
import torch
from diffusers import FluxPipeline
from src.logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent
LOGGER = logging.getLogger(__name__)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse CLI options for the image-generation step."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--reel-script", type=Path, default=PROJECT_ROOT / "reel_script.json")
    cli.add_argument("--images-dir", type=Path, default=PROJECT_ROOT / "images")
    cli.add_argument("--log-level", default=None, help="Logging level (overrides LOG_LEVEL env var)")
    return cli.parse_args(argv)
@debug_log_lifecycle
def main(argv: list[str] | None = None) -> int:
    """Generate one PNG per shot from `image_description` prompts via FLUX.1-schnell.

    Images are written to `<images-dir>/shot_<n>.png`. Returns 0 on success.
    """
    args = parse_args(argv)
    configure_logging(args.log_level)
    args.images_dir.mkdir(parents=True, exist_ok=True)
    reel_data = json.loads(args.reel_script.read_text())
    # NOTE(review): FLUX.1-schnell is a gated model -- requires approved
    # Hugging Face access / token at download time.
    pipe = FluxPipeline.from_pretrained(
        "black-forest-labs/FLUX.1-schnell",
        torch_dtype=torch.bfloat16,
    )
    # Offload idle submodules to CPU so the pipeline fits in limited VRAM.
    pipe.enable_model_cpu_offload()
    for shot in reel_data["shots"]:
        shot_num = shot["shot_number"]
        prompt = shot["image_description"]
        LOGGER.info("Generating image for shot %s: %s", shot_num, prompt)
        image = pipe(
            prompt,
            guidance_scale=0.0,
            num_inference_steps=4,
            max_sequence_length=256,
            # Fixed CPU-seeded generator keeps outputs deterministic across runs.
            generator=torch.Generator("cpu").manual_seed(0),
        ).images[0]
        image.save(args.images_dir / f"shot_{shot_num}.png")
    return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,10 +1,20 @@
import argparse
import torch import torch
import json import json
import logging
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import re import re
from typing import Optional from typing import Optional
from pathlib import Path
from src.logging_config import configure_logging, debug_log_lifecycle
LOGGER = logging.getLogger(__name__)
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent
device = 'cuda' if torch.cuda.is_available() else 'cpu'
MODEL_ID = "Qwen/Qwen3-14B" MODEL_ID = "Qwen/Qwen3-14B"
WORDS_PER_SECOND = 2.5 WORDS_PER_SECOND = 2.5
MAX_DEAD_AIR_SECONDS = 1 MAX_DEAD_AIR_SECONDS = 1
@@ -13,6 +23,33 @@ MAX_VOICEOVER_WORDS = int(MAX_VOICEOVER_SECONDS * WORDS_PER_SECOND)
MIN_VOICEOVER_WORDS = 5 MIN_VOICEOVER_WORDS = 5
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--topic-description",
type=Path,
default=PROJECT_ROOT / "topic_description.txt",
)
parser.add_argument(
"--output-script",
type=Path,
default=PROJECT_ROOT / "reel_script.json",
)
parser.add_argument(
"--log-level",
default=None,
help="Logging level (overrides LOG_LEVEL env var)",
)
return parser.parse_args(argv)
def get_device():
device = 'cuda' if torch.cuda.is_available() else 'cpu'
LOGGER.info("Using device: %s", device)
return device
@debug_log_lifecycle
def load_model(model_id: str = MODEL_ID): def load_model(model_id: str = MODEL_ID):
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True) tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
bnb_config = BitsAndBytesConfig( bnb_config = BitsAndBytesConfig(
@@ -24,13 +61,14 @@ def load_model(model_id: str = MODEL_ID):
model = AutoModelForCausalLM.from_pretrained( model = AutoModelForCausalLM.from_pretrained(
model_id, model_id,
quantization_config=bnb_config, quantization_config=bnb_config,
device_map="auto", device_map=get_device(),
trust_remote_code=True, trust_remote_code=True,
).eval() ).eval()
return model, tokenizer return model, tokenizer
@debug_log_lifecycle
def generate_reel_scenario( def generate_reel_scenario(
model, model,
tokenizer, tokenizer,
@@ -174,7 +212,7 @@ def generate_reel_scenario(
inputs = tokenizer(text, return_tensors="pt").to(model.device) inputs = tokenizer(text, return_tensors="pt").to(model.device)
print("Generating reel scenario..") LOGGER.info("Generating reel scenario")
with torch.no_grad(): with torch.no_grad():
output_ids = model.generate( output_ids = model.generate(
**inputs, **inputs,
@@ -225,6 +263,7 @@ def extract_field(label: str, next_label: Optional[str], text: str) -> str:
return "" return ""
@debug_log_lifecycle
def parse_reel_scenario(raw_scenario: str) -> dict: def parse_reel_scenario(raw_scenario: str) -> dict:
""" """
Parse the shot-by-shot reel scenario into a structured dict. Parse the shot-by-shot reel scenario into a structured dict.
@@ -329,16 +368,22 @@ def parse_reel_scenario(raw_scenario: str) -> dict:
return result return result
if __name__ == '__main__': @debug_log_lifecycle
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
configure_logging(args.log_level)
with open("topic_description.txt", "r") as f: topic = args.topic_description.read_text()
topic = f.read()
model, tokenizer = load_model() model, tokenizer = load_model()
scenario_raw = generate_reel_scenario(model, tokenizer, topic) scenario_raw = generate_reel_scenario(model, tokenizer, topic)
parsed = parse_reel_scenario(scenario_raw) parsed = parse_reel_scenario(scenario_raw)
with open("reel_script.json", "w") as f: args.output_script.write_text(json.dumps(parsed))
json.dump(parsed, f) return 0
if __name__ == '__main__':
raise SystemExit(main())

215
src/generate_videos.py Normal file
View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""Generate shot videos with HunyuanVideo based on reel script and audio durations."""
from __future__ import annotations
import argparse
import contextlib
import json
import logging
import os
import runpy
import subprocess
import sys
from pathlib import Path
import torch
from torch.distributed.run import main as torch_run
from src.logging_config import configure_logging, debug_log_lifecycle
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_BASE_DIR = SCRIPT_DIR.parents[1]
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images"
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
LOGGER = logging.getLogger(__name__)
@contextlib.contextmanager
def _temporary_environ(update: dict[str, str]):
previous: dict[str, str | None] = {key: os.environ.get(key) for key in update}
os.environ.update(update)
try:
yield
finally:
for key, value in previous.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
def _run_hunyuan_generate_in_process(
    hunyuan_dir: Path,
    argv: list[str],
    env_update: dict[str, str],
) -> None:
    """Invoke torchrun's entrypoint in-process with a temporary argv/cwd/env.

    torch.distributed.run reads sys.argv and resolves relative paths against
    the working directory, so both are swapped in for the call and restored in
    the finally block, even on failure. `env_update` is applied for the
    duration of the call via _temporary_environ.

    NOTE(review): not safe for concurrent use -- it mutates process-global
    state (sys.argv, cwd, os.environ).
    """
    old_argv = sys.argv[:]
    old_cwd = Path.cwd()
    try:
        with _temporary_environ(env_update):
            os.chdir(hunyuan_dir)
            sys.argv = argv
            torch_run()
    finally:
        sys.argv = old_argv
        os.chdir(old_cwd)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse CLI options for the video-generation step."""
    cli = argparse.ArgumentParser(description=__doc__)
    # Path-valued options share one shape; defaults live at module scope.
    for flag, default in (
        ("--hunyuan-dir", DEFAULT_HUNYUAN_DIR),
        ("--reel-script", DEFAULT_REEL_SCRIPT),
        ("--images-dir", DEFAULT_IMAGES_DIR),
        ("--videos-dir", DEFAULT_VIDEOS_DIR),
        ("--audios-dir", DEFAULT_AUDIOS_DIR),
    ):
        cli.add_argument(flag, type=Path, default=default)
    cli.add_argument("--seed", type=int, default=1)
    cli.add_argument(
        "--log-level",
        default=None,
        help="Logging level (overrides LOG_LEVEL env var)",
    )
    return cli.parse_args(argv)
@debug_log_lifecycle
def get_audio_duration(audio_path: Path) -> float:
    """Return the duration of an audio file in seconds, as reported by ffprobe.

    Raises subprocess.CalledProcessError when ffprobe fails (e.g. missing or
    unreadable file) and ValueError when its output is not a number.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(audio_path),
    ]
    completed = subprocess.run(cmd, capture_output=True, check=True, text=True)
    return float(completed.stdout.strip())
@debug_log_lifecycle
def duration_to_video_length(duration: float) -> int:
    """Map an audio duration in seconds to a frame count at 24 fps.

    The result is always odd and clamped to the inclusive range [49, 169].
    """
    frame_count = int(duration * 24) + 1
    # Bump even counts by one so the returned length is always odd.
    if frame_count % 2 == 0:
        frame_count += 1
    return max(49, min(frame_count, 169))
@debug_log_lifecycle
def main(argv: list[str] | None = None) -> int:
    """Generate one Hunyuan video clip per shot listed in the reel script.

    For each shot the narration audio's duration (via ffprobe) fixes the
    frame count, then Hunyuan's torchrun entry point is invoked in-process.
    Shots whose output already exists, or whose source image is missing, are
    skipped. Returns 0 on completion.
    """
    args = parse_args(argv)
    configure_logging(args.log_level)
    # Model checkpoints are expected under <hunyuan-dir>/ckpts.
    model_path = args.hunyuan_dir / "ckpts"
    args.videos_dir.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    # Mitigate CUDA memory fragmentation while loading the large video model.
    env["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True,max_split_size_mb:128"
    data = json.loads(args.reel_script.read_text())
    shots = data.get("shots", [])
    LOGGER.info("Found %s shots to generate", len(shots))
    for shot in shots:
        shot_number = shot["shot_number"]
        # Collapse tabs/newlines so the prompt is a single CLI-safe line.
        prompt = str(shot["image_description"]).replace("\t", " ").replace("\n", " ")
        image_path = args.images_dir / f"shot_{shot_number}.png"
        output_path = args.videos_dir / f"output_{shot_number}.mp4"
        audio_path = args.audios_dir / f"output_{shot_number}.mp3"
        if not audio_path.exists():
            LOGGER.warning("No audio found at %s, falling back to 5s default", audio_path)
            duration = 5.0
        else:
            duration = get_audio_duration(audio_path)
            LOGGER.info("Audio duration for shot %s: %ss", shot_number, duration)
        video_length = duration_to_video_length(duration)
        LOGGER.info("Shot %s | %ss -> %s frames", shot_number, duration, video_length)
        LOGGER.info("Prompt: %s", prompt)
        LOGGER.info("Image: %s", image_path)
        LOGGER.info("Audio: %s", audio_path)
        LOGGER.info("Output: %s", output_path)
        # Idempotency: re-running the pipeline skips already-rendered shots.
        if output_path.exists():
            LOGGER.info("Output path already exists, skipping")
            continue
        if not image_path.exists():
            LOGGER.warning("Image not found at %s, skipped", image_path)
            continue
        with _temporary_environ(env):
            torch.cuda.empty_cache()
            LOGGER.info("GPU cache cleared")
            # CLI arguments for Hunyuan's generate.py, run under torchrun.
            # NOTE(review): if torch_run() parses sys.argv[1:], the leading
            # "--nproc_per_node=1" element is consumed as the program name
            # rather than as an option — confirm against the torch_run import.
            run_argv = [
                "--nproc_per_node=1",
                "generate.py",
                "--prompt",
                prompt,
                "--image_path",
                str(image_path),
                "--resolution",
                "480p",
                "--aspect_ratio",
                "16:9",
                "--seed",
                str(args.seed),
                "--video_length",
                str(video_length),
                "--rewrite",
                "false",
                "--cfg_distilled",
                "true",
                "--enable_step_distill",
                "true",
                "--sparse_attn",
                "false",
                "--use_sageattn",
                "true",
                "--enable_cache",
                "false",
                "--overlap_group_offloading",
                "true",
                "--sr",
                "false",
                "--output_path",
                str(output_path),
                "--model_path",
                str(model_path),
            ]
            # Single-process "distributed" rendezvous variables for torchrun.
            run_env = {
                **env,
                "RANK": "0",
                "LOCAL_RANK": "0",
                "WORLD_SIZE": "1",
                "MASTER_ADDR": "127.0.0.1",
                "MASTER_PORT": os.environ.get("MASTER_PORT", "29500"),
            }
            _run_hunyuan_generate_in_process(
                hunyuan_dir=args.hunyuan_dir,
                argv=run_argv,
                env_update=run_env,
            )
        LOGGER.info("Shot %s done", shot_number)
    LOGGER.info("Done")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())

50
src/logging_config.py Normal file
View File

@@ -0,0 +1,50 @@
from __future__ import annotations
import functools
import logging
import os
from collections.abc import Callable
from typing import Any, TypeVar
# Shared logging defaults for every pipeline script.
DEFAULT_LOG_FORMAT = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
DEFAULT_LOG_LEVEL = "INFO"
LOG_LEVEL_ENV_VAR = "LOG_LEVEL"
# Generic callable type used by the debug_log_lifecycle decorator.
F = TypeVar("F", bound=Callable[..., Any])


def resolve_log_level(
    cli_level: str | None,
    *,
    default_level: str = DEFAULT_LOG_LEVEL,
    env_var: str = LOG_LEVEL_ENV_VAR,
) -> str:
    """Pick the effective log level: CLI argument > *env_var* > default.

    Empty strings count as "not provided" at every precedence step.
    """
    if cli_level:
        return cli_level
    env_level = os.getenv(env_var)
    if env_level:
        return env_level
    return default_level
def configure_logging(level: str | None = None, *, default_level: str = DEFAULT_LOG_LEVEL) -> None:
    """Initialise root logging with the project format at the resolved level."""
    level_name = resolve_log_level(level, default_level=default_level).upper()
    # Unknown level names silently fall back to INFO.
    logging.basicConfig(
        level=getattr(logging, level_name, logging.INFO),
        format=DEFAULT_LOG_FORMAT,
    )
def debug_log_lifecycle(func: F) -> F:
    """Decorator that emits DEBUG 'Start'/'End' records around each call.

    The 'End' record is logged even when *func* raises; the wrapped function's
    return value and exceptions pass through unchanged.
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        log = logging.getLogger(func.__module__)
        log.debug("Start %s", func.__qualname__)
        try:
            return func(*args, **kwargs)
        finally:
            log.debug("End %s", func.__qualname__)

    return wrapper  # type: ignore[return-value]

101
src/merge_audio_video.py Normal file
View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""Merge videos/output_n.mp4 with audios/output_n.mp3 into merged/merged_n.mp4."""
from __future__ import annotations
import argparse
import logging
import re
import shutil
import subprocess
from pathlib import Path
from src.logging_config import configure_logging
# Default directories, resolved relative to this script's location.
SCRIPT_DIR = Path(__file__).resolve().parent
# NOTE(review): parents[1] is two levels above this script — presumably the
# workspace holding the generated videos/audios; verify against the layout.
DEFAULT_BASE_DIR = SCRIPT_DIR.parents[1]
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
DEFAULT_OUTPUT_DIR = DEFAULT_BASE_DIR / "merged"
# Module-level logger; configured in main() via configure_logging().
LOGGER = logging.getLogger(__name__)
def shot_number(path: Path) -> int:
    """Extract the shot index from an 'output_<n>.mp4' filename, or -1."""
    found = re.search(r"output_(\d+)\.mp4$", path.name)
    if found is None:
        return -1
    return int(found.group(1))
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the audio/video merge step."""
    parser = argparse.ArgumentParser(description=__doc__)
    # Directory options all share the same shape; register them in bulk.
    for flag, default in (
        ("--videos-dir", DEFAULT_VIDEOS_DIR),
        ("--audios-dir", DEFAULT_AUDIOS_DIR),
        ("--output-dir", DEFAULT_OUTPUT_DIR),
    ):
        parser.add_argument(flag, type=Path, default=default)
    parser.add_argument(
        "--allow-missing-audio",
        action="store_true",
        help="If set, create merged output from video only when audio is missing.",
    )
    parser.add_argument("--log-level", default="INFO")
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Merge each output_<n>.mp4 with output_<n>.mp3 into merged/merged_<n>.mp4.

    Returns 1 when no input videos exist, otherwise 0. Shots whose merged
    output already exists are skipped; missing audio either skips the shot or
    (with --allow-missing-audio) produces a video-only copy.
    """
    args = parse_args(argv)
    configure_logging(args.log_level)
    args.output_dir.mkdir(parents=True, exist_ok=True)

    videos = sorted(args.videos_dir.glob("output_*.mp4"), key=shot_number)
    if not videos:
        LOGGER.warning("No videos found in %s", args.videos_dir)
        return 1

    for video in videos:
        num = shot_number(video)
        audio = args.audios_dir / f"output_{num}.mp3"
        output = args.output_dir / f"merged_{num}.mp4"
        # Idempotency: never re-encode a shot that was already merged.
        if output.exists():
            LOGGER.info("Already exists; skipped shot %s", num)
            continue
        if not audio.exists():
            if not args.allow_missing_audio:
                LOGGER.warning("No audio found for shot %s (%s); skipped", num, audio)
                continue
            LOGGER.warning(
                "No audio found for shot %s (%s); using video-only output",
                num,
                audio,
            )
            shutil.copy2(video, output)
            LOGGER.info("Done (video-only): %s", output)
            continue
        LOGGER.info("Merging shot %s: %s + %s -> %s", num, video, audio, output)
        # Copy the video stream untouched; only the audio is re-encoded (AAC).
        command = [
            "ffmpeg",
            "-i",
            str(video),
            "-i",
            str(audio),
            "-c:v",
            "copy",
            "-c:a",
            "aac",
            "-shortest",
            "-y",
            str(output),
        ]
        subprocess.run(command, check=True)
        LOGGER.info("Done: %s", output)
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())

71
src/s3_video_storage.py Normal file
View File

@@ -0,0 +1,71 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Mapping
import boto3
@dataclass(frozen=True)
class S3Config:
    """Immutable connection settings for the S3 video bucket."""

    bucket_name: str
    # Optional overrides; when None, boto3 resolves each value from its own
    # default chain (env vars, shared config/credentials files, instance role).
    region_name: str | None = None
    endpoint_url: str | None = None
    aws_access_key_id: str | None = None
    aws_secret_access_key: str | None = None
    aws_session_token: str | None = None
class S3VideoStorage:
    """Uploads local video files into a date-partitioned S3 prefix."""

    def __init__(self, s3_config: S3Config | Mapping[str, Any]) -> None:
        """Create a boto3 S3 client from an S3Config or an equivalent mapping."""
        self.config = self._normalize_config(s3_config)
        # Forward only explicitly-set options so boto3's default resolution
        # chain handles everything left as None.
        candidates: dict[str, Any] = {
            "region_name": self.config.region_name,
            "endpoint_url": self.config.endpoint_url,
            "aws_access_key_id": self.config.aws_access_key_id,
            "aws_secret_access_key": self.config.aws_secret_access_key,
            "aws_session_token": self.config.aws_session_token,
        }
        self._s3_client = boto3.client(
            "s3",
            **{name: value for name, value in candidates.items() if value is not None},
        )

    def store_file(self, file_path: str | Path) -> str:
        """Upload *file_path* to the bucket and return its s3:// URI.

        Raises FileNotFoundError when the path does not exist and ValueError
        when it exists but is not a regular file.
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File does not exist: {path}")
        if not path.is_file():
            raise ValueError(f"Path is not a file: {path}")
        # Partition keys by upload date (UTC) to keep bucket listings tidy.
        now = datetime.now(timezone.utc)
        key = f"video_content/{now.year:04d}/{now.month:02d}/{now.day:02d}/{path.name}"
        self._s3_client.upload_file(str(path), self.config.bucket_name, key)
        return f"s3://{self.config.bucket_name}/{key}"

    @staticmethod
    def _normalize_config(s3_config: S3Config | Mapping[str, Any]) -> S3Config:
        """Coerce a mapping into an S3Config; S3Config instances pass through."""
        if isinstance(s3_config, S3Config):
            return s3_config
        bucket_name = s3_config.get("bucket_name")
        if not bucket_name:
            raise ValueError("s3_config must contain non-empty 'bucket_name'")
        return S3Config(
            bucket_name=str(bucket_name),
            region_name=_optional_str(s3_config, "region_name"),
            endpoint_url=_optional_str(s3_config, "endpoint_url"),
            aws_access_key_id=_optional_str(s3_config, "aws_access_key_id"),
            aws_secret_access_key=_optional_str(s3_config, "aws_secret_access_key"),
            aws_session_token=_optional_str(s3_config, "aws_session_token"),
        )
def _optional_str(config: Mapping[str, Any], key: str) -> str | None:
value = config.get(key)
if value is None:
return None
return str(value)

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
import logging
from src.logging_config import debug_log_lifecycle
def test_logs_function_start_and_end(caplog) -> None:
    """debug_log_lifecycle must log Start/End records and preserve the result."""

    @debug_log_lifecycle
    def sample(a: int, b: int) -> int:
        return a + b

    with caplog.at_level(logging.DEBUG, logger=sample.__module__):
        result = sample(2, 3)

    assert result == 5
    qualname = "test_logs_function_start_and_end.<locals>.sample"
    assert f"Start {qualname}" in caplog.text
    assert f"End {qualname}" in caplog.text

View File

@@ -0,0 +1,119 @@
from __future__ import annotations
import json
import sys
import tempfile
from argparse import Namespace
from pathlib import Path
from types import SimpleNamespace
import pytest
# Stub boto3 before importing the pipeline so orchestration tests do not need
# the real AWS SDK; the stub's client() returns an inert placeholder object.
if "boto3" not in sys.modules:
    sys.modules["boto3"] = SimpleNamespace(client=lambda *args, **kwargs: object())
import run_video_pipeline as pipeline
def test_full_generation_process_calls_all_scripts(monkeypatch) -> None:
    """pipeline.main() must invoke every stage's main() in order.

    Each stage is replaced by a fake that records its name and writes the
    artifact the next stage expects, so only the orchestration order is
    exercised — none of the heavy generation code runs.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        # Sandbox directory layout mirroring the real pipeline workspace.
        base_dir = Path(tmpdir)
        hunyuan_dir = base_dir / "HunyuanVideo-1.5"
        images_dir = base_dir / "images"
        videos_dir = base_dir / "videos"
        audios_dir = base_dir / "audios"
        merged_dir = base_dir / "merged"
        output_path = base_dir / "results" / "final_output.mp4"
        reel_script = base_dir / "reel_script.json"
        hunyuan_dir.mkdir(parents=True)
        (base_dir / "topic_description.txt").write_text("Test topic")
        # Pre-built args; parse_args is monkeypatched to return this directly.
        args = Namespace(
            base_dir=base_dir,
            hunyuan_dir=hunyuan_dir,
            reel_script=reel_script,
            images_dir=images_dir,
            videos_dir=videos_dir,
            audios_dir=audios_dir,
            merged_dir=merged_dir,
            output=output_path,
            seed=1,
            skip_generate=False,
            skip_audio_generate=False,
            skip_merge=False,
            skip_concat=False,
            skip_s3_upload=True,
            log_level="DEBUG",
        )
        executed_steps: list[str] = []
        expected_steps = [
            "generate_script",
            "generate_audios",
            "generate_images",
            "generate_videos",
            "merge_audio_video",
            "concat_merged",
        ]

        # Each fake records its stage and produces the next stage's input.
        def fake_generate_script_main(argv=None) -> int:
            executed_steps.append("generate_script")
            payload = {
                "shots": [
                    {
                        "shot_number": 1,
                        "image_description": "A test image",
                        "voiceover": "A test voiceover",
                    }
                ]
            }
            reel_script.write_text(json.dumps(payload))
            return 0

        def fake_generate_audios_main(argv=None) -> int:
            executed_steps.append("generate_audios")
            audios_dir.mkdir(parents=True, exist_ok=True)
            (audios_dir / "output_1.mp3").write_bytes(b"audio")
            return 0

        def fake_generate_images_main(argv=None) -> int:
            executed_steps.append("generate_images")
            images_dir.mkdir(parents=True, exist_ok=True)
            (images_dir / "shot_1.png").write_bytes(b"image")
            return 0

        def fake_generate_videos_main(argv=None) -> int:
            executed_steps.append("generate_videos")
            videos_dir.mkdir(parents=True, exist_ok=True)
            (videos_dir / "output_1.mp4").write_bytes(b"video")
            return 0

        def fake_merge_audio_video_main(argv=None) -> int:
            executed_steps.append("merge_audio_video")
            merged_dir.mkdir(parents=True, exist_ok=True)
            (merged_dir / "merged_1.mp4").write_bytes(b"merged")
            return 0

        def fake_concat_merged_main(argv=None) -> int:
            executed_steps.append("concat_merged")
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_bytes(b"final")
            return 0

        monkeypatch.setattr(pipeline, "parse_args", lambda: args)
        monkeypatch.setattr(pipeline.generate_script, "main", fake_generate_script_main)
        monkeypatch.setattr(pipeline.generate_audios, "main", fake_generate_audios_main)
        monkeypatch.setattr(pipeline.generate_images, "main", fake_generate_images_main)
        monkeypatch.setattr(pipeline.generate_videos, "main", fake_generate_videos_main)
        monkeypatch.setattr(pipeline.merge_audio_video, "main", fake_merge_audio_video_main)
        monkeypatch.setattr(pipeline.concat_merged, "main", fake_concat_merged_main)
        rc = pipeline.main()
        assert rc == 0
        assert output_path.exists()
        # Coverage check for orchestration: ensure every required script stage was called.
        assert executed_steps == expected_steps

2015
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff