1
0

Refactored code, added a Dockerfile, replaced bash scripts with Python alternatives, and added a README with instructions on running the pipeline

This commit is contained in:
2026-04-01 16:56:06 +02:00
parent ca116562fe
commit 686a458905
19 changed files with 1103 additions and 65 deletions

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""Concatenate merged_*.mp4 files into a single output using ffmpeg concat demuxer."""
from __future__ import annotations
import argparse
import logging
import re
import subprocess
import tempfile
from pathlib import Path
from logging_config import configure_logging
# Resolve all default paths relative to this script so it works from any CWD.
SCRIPT_DIR = Path(__file__).resolve().parent
# Project root: two directories above this script.
DEFAULT_BASE_DIR = SCRIPT_DIR.parents[1]
DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged"
DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "run_3" / "final_output.mp4"
LOGGER = logging.getLogger(__name__)
def shot_number(path: Path) -> int:
    """Return the numeric shot index from a merged_<n>.mp4 filename, or -1."""
    found = re.search(r"merged_(\d+)\.mp4$", path.name)
    if found is None:
        return -1
    return int(found.group(1))
def parse_args() -> argparse.Namespace:
    """Build and parse the CLI arguments for the concat step."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--merged-dir", type=Path, default=DEFAULT_MERGED_DIR)
    cli.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    cli.add_argument("--log-level", default="INFO")
    return cli.parse_args()
def main() -> int:
    """Concatenate all merged shot videos into one final MP4 with ffmpeg.

    Returns 0 on success, 1 when no input videos are found.
    Raises subprocess.CalledProcessError if ffmpeg fails (check=True).
    """
    args = parse_args()
    configure_logging(args.log_level)
    videos = sorted(args.merged_dir.glob("merged_*.mp4"), key=shot_number)
    if not videos:
        LOGGER.warning("No merged videos found in %s", args.merged_dir)
        return 1
    args.output.parent.mkdir(parents=True, exist_ok=True)
    # Write the ffmpeg concat-demuxer file list. delete=False so the file
    # survives the `with` block (it must be closed before ffmpeg reads it);
    # we remove it ourselves in the `finally` below.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as tmp:
        filelist = Path(tmp.name)
        for video in videos:
            # BUG FIX: the original wrote a literal backslash-n ("\\n"), which
            # put every entry on a single line and broke the concat demuxer.
            tmp.write(f"file '{video}'\n")
    try:
        LOGGER.info("Concatenating the following files:\n%s", filelist.read_text().rstrip())
        subprocess.run(
            [
                "ffmpeg",
                "-f", "concat",
                "-safe", "0",  # allow absolute paths in the file list
                "-i", str(filelist),
                "-c", "copy",  # stream copy: no re-encode
                "-y",          # overwrite any existing output
                str(args.output),
            ],
            check=True,
        )
    finally:
        filelist.unlink(missing_ok=True)
    LOGGER.info("Done")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
import json
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
from elevenlabs.client import ElevenLabs
from logging_config import configure_logging
SCRIPT_DIR = Path(__file__).resolve().parent
# Project root: two directories above this script.
PROJECT_ROOT = SCRIPT_DIR.parents[1]
# Load ELEVENLABS_API_KEY (and any other secrets) from the project-root .env.
load_dotenv(PROJECT_ROOT / ".env")
LOGGER = logging.getLogger(__name__)
def main() -> int:
    """Generate one ElevenLabs voiceover MP3 per shot listed in reel_script.json.

    Raises RuntimeError when ELEVENLABS_API_KEY is not configured.
    """
    configure_logging("INFO")
    api_key = os.getenv("ELEVENLABS_API_KEY")
    if not api_key:
        raise RuntimeError("ELEVENLABS_API_KEY is not set")
    script_path = PROJECT_ROOT / "reel_script.json"
    output_dir = PROJECT_ROOT / "audios"
    output_dir.mkdir(parents=True, exist_ok=True)
    shots = json.loads(script_path.read_text())["shots"]
    tts_client = ElevenLabs(api_key=api_key)
    for shot in shots:
        number = shot["shot_number"]
        line = shot["voiceover"]
        LOGGER.info("Generating audio for shot %s: %s", number, line)
        # convert() yields the audio in chunks; join them into one MP3 payload.
        chunks = tts_client.text_to_speech.convert(
            text=line,
            voice_id="JBFqnCBsd6RMkjVDRZzb",
            model_id="eleven_multilingual_v2",
            output_format="mp3_44100_128",
        )
        (output_dir / f"output_{number}.mp3").write_bytes(b"".join(chunks))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
import json
import logging
from pathlib import Path
import torch
from diffusers import FluxPipeline
from logging_config import configure_logging
SCRIPT_DIR = Path(__file__).resolve().parent
# Project root: two directories above this script.
PROJECT_ROOT = SCRIPT_DIR.parents[1]
LOGGER = logging.getLogger(__name__)
def main() -> int:
    """Render one FLUX.1-schnell still image per shot in reel_script.json."""
    configure_logging("INFO")
    script_path = PROJECT_ROOT / "reel_script.json"
    out_dir = PROJECT_ROOT / "images"
    out_dir.mkdir(parents=True, exist_ok=True)
    shots = json.loads(script_path.read_text())["shots"]
    pipe = FluxPipeline.from_pretrained(
        "black-forest-labs/FLUX.1-schnell",
        torch_dtype=torch.bfloat16,
    )
    # Stream weights between CPU and GPU to fit within limited VRAM.
    pipe.enable_model_cpu_offload()
    for shot in shots:
        number = shot["shot_number"]
        description = shot["image_description"]
        LOGGER.info("Generating image for shot %s: %s", number, description)
        rendered = pipe(
            description,
            guidance_scale=0.0,
            num_inference_steps=4,
            max_sequence_length=256,
            # Fixed seed keeps image generation reproducible across runs.
            generator=torch.Generator("cpu").manual_seed(0),
        ).images[0]
        rendered.save(out_dir / f"shot_{number}.png")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,351 @@
import torch
import json
import logging
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import re
from typing import Optional
from logging_config import configure_logging
LOGGER = logging.getLogger(__name__)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
MODEL_ID = "Qwen/Qwen3-14B"
# Pacing model: reels are narrated at roughly 2.5 spoken words per second.
WORDS_PER_SECOND = 2.5
MAX_DEAD_AIR_SECONDS = 1
MAX_VOICEOVER_SECONDS = 5.0
# 5.0 s ceiling at 2.5 wps -> 12 words max per shot voiceover.
MAX_VOICEOVER_WORDS = int(MAX_VOICEOVER_SECONDS * WORDS_PER_SECOND)
MIN_VOICEOVER_WORDS = 5
def load_model(model_id: str = MODEL_ID):
    """Load the tokenizer and a 4-bit NF4-quantized causal LM.

    Returns a (model, tokenizer) pair with the model in eval mode.
    """
    tok = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
    quant = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
    )
    lm = AutoModelForCausalLM.from_pretrained(
        model_id,
        quantization_config=quant,
        device_map="auto",
        trust_remote_code=True,
    ).eval()
    return lm, tok
def generate_reel_scenario(
    model,
    tokenizer,
    content_summary: str,
    temperature: float = 0.75,
    top_p: float = 0.9,
    repetition_penalty: float = 1.1,
) -> str:
    """
    Generate a shot-by-shot Instagram Reel scenario where every script beat
    has its own image description for AI image/video generation.
    Each shot in the output contains:
    - Timestamp
    - Beat label (HOOK, PROBLEM, etc.)
    - Voiceover line
    - Text on screen
    - Image description (AI-generation-ready prompt)
    The hook question from the topic summary is preserved as the opening beat.
    """
    system_prompt = (
        "You are a professional Instagram Reel director and scriptwriter specializing in EdTech content. "
        "You think like a filmmaker: every line of voiceover has a visual that amplifies it. "
        "You write punchy, cinematic, scroll-stopping reels that feel native to social media. "
        "You write image descriptions like a cinematographer briefing an AI image generator — "
        "specific, vivid, atmospheric, with clear subject, composition, lighting, mood, and style."
    )
    # NOTE(review): several numeric ranges in this prompt look garbled by an
    # earlier copy/paste (e.g. "25 seconds", "23 seconds (58 words)" — probably
    # "2-5", "2-3 (5-8)"); confirm against the original authoring source.
    user_prompt = f"""You are given a topic summary for an Instagram Reel.
Your job is to direct a complete shot-by-shot reel scenario — like a real filmmaker laying out a storyboard.
## TOPIC SUMMARY
{content_summary}
---
## PACING RULES — read these before writing a single shot
These rules exist because every shot in this reel will be rendered as a real video clip. Timestamps must be tight and honest.
**Speech rate:** spoken voiceover moves at roughly 2.5 words per second in a reel (energetic, not rushed).
**VOICEOVER LENGTH: vary naturally between 25 seconds per shot. Diversity is encouraged.**
Different beats call for different rhythms:
- A hook or payoff line can be short and punchy: 23 seconds (58 words). Let it land.
- A problem or tension beat needs more breath: 45 seconds (1012 words). Build the feeling.
- A CTA can be medium: 34 seconds (810 words). Direct and warm.
The one hard constraint is the 5-second ceiling — the audio renderer cannot handle more than 5 seconds per shot (12 words). Never exceed this. If an idea needs more than 12 words, split it into two shots.
VARIETY IN ACTION — a good reel sounds like this:
SHOT 1 (HOOK, 2s): "Will your major still matter in five years?" — short, punchy, stops the scroll
SHOT 2 (PROBLEM, 5s): "AI is already replacing writers, designers, and even doctors — fields we thought were safe." — builds tension
SHOT 3 (TENSION, 4s): "Most students have no idea this is already happening to them." — personal, lands hard
SHOT 4 (PAYOFF, 3s): "Will your major still matter? It depends on you." — echo + flip
SHOT 5 (SOLUTION, 5s): "The students who'll thrive are learning skills AI simply cannot replicate yet." — concrete, hopeful
SHOT 6 (CTA, 3s): "Follow us for tips on future-proofing your career." — warm, direct
Notice how the lengths vary. That variation is intentional — it creates rhythm and keeps the viewer engaged.
**Shot duration = voiceover duration + 01 second of breathing room.** Match the shot length to what you actually wrote, not to a preset slot.
**Continuity.** The end timestamp of one shot is the start timestamp of the next. No gaps, no overlaps. The reel runs like a stopwatch.
**Total reel length:** 4560 seconds. Count it up before you finalize. If you're over, trim voiceover. If you're under, add a shot.
**Shot count:** aim for 710 shots. More shots = faster pace = more energy. Fewer shots = slower, more contemplative. Match the tone of the topic.
---
## BRAND CONTEXT — read before writing any shot
These reels are published on the **LiveCarta** Instagram account. LiveCarta is an AI-powered EdTech platform for higher education that lets educators build custom coursepacks by mixing and matching chapters from top publishers, adding their own materials, video, and notes — all in one place. Students get flexible, affordable access: buy whole books or individual chapters, read online or via the app. Key features: AI Curriculum Builder, Remix-on-Demand, pay-by-chapter pricing, LMS integration (Canvas, Blackboard), and real-time content updates.
**Brand mention rules:**
1. **CTA beat — always name LiveCarta.** The final CTA shot(s) must mention LiveCarta by name. Reference it naturally in context — e.g. "Follow LiveCarta for more", "Check out LiveCarta", "LiveCarta helps students stay ahead."
2. **Mid-reel mention — only when there's a genuine, obvious overlap.** If the topic directly connects to what LiveCarta does, drop a natural product mention in the SOLUTION beat. Do NOT force it. The overlap is genuine when the topic is about: customizing course content, affordable textbooks, AI in education, flexible learning, keeping up with fast-changing fields, building skills alongside a degree. If the overlap isn't obvious, skip the mid-reel mention entirely — a forced plug feels fake and loses the audience.
3. **Never open with the brand.** The HOOK and PROBLEM beats must earn attention first. LiveCarta enters only once the viewer is already hooked.
Write shots that follow these beats in order. Each beat can be one or more shots — distribute them based on how long the idea takes to say out loud, not on a fixed slot.
HOOK — The hook question from the topic, spoken word-for-word. One punchy sentence. Creates immediate tension or curiosity.
PROBLEM — Build the relatable pain. Show the viewer their own situation. Make it vivid and specific, not abstract.
TENSION / RELATE — Go deeper into the problem. A concrete, personal detail that makes the viewer think "that's exactly me."
HOOK PAYOFF — Echo the opening hook question word-for-word, then immediately flip it with the insight. This is the emotional turning point.
SOLUTION — The concrete, actionable answer. Specific enough to be immediately useful.
CTA — Direct call to action. **Must name LiveCarta by name.** End with a follow or save prompt. Warm, not salesy.
## OUTPUT FORMAT
Output exactly two sections. Start immediately with the first separator — no preamble.
------------------------------------------------------------
SHOT LIST
For each shot use EXACTLY this format:
SHOT [N]
Timestamp: [start][end]
Beat: [BEAT LABEL]
Voiceover: [25 seconds of speech (512 words). Vary length by beat — short for hooks/payoffs, longer for problem/tension/solution. Hard ceiling: 12 words.]
Text on screen: [36 word punchy overlay that punches up the voiceover, not just repeats it]
Image description: [Standalone AI image generation prompt. Describe: subject, composition, camera angle, lighting, color palette, mood, visual style. Make it cinematic and specific. 24 sentences. No references to "the reel", "the previous shot", or any other shot.]
------------------------------------------------------------
CAPTION
Write a 45 line Instagram caption:
Line 1: hook statement (echoes the reel's opening question)
Lines 23: expand the insight in 12 casual, direct sentences
Line 4: engagement question to the audience
Line 5: 810 relevant hashtags
---
## FINAL CHECKS before outputting
- Does the voiceover length vary across shots? Hooks and payoffs should be short (23s), problems and solutions longer (45s).
- Does any voiceover exceed 12 words? If so, split that shot.
- Does every shot's timestamp connect cleanly to the next?
- Does the total add up to 4560 seconds?
- Does the CTA beat name LiveCarta explicitly?
- If the topic overlaps with LiveCarta's features, is there a natural mid-reel mention in the SOLUTION beat?
- Does the HOOK PAYOFF echo it verbatim before the flip?
- Is every Image description usable as a standalone AI generation prompt?
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    # Render the chat template into a single prompt string; tokenize separately.
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    inputs = tokenizer(text, return_tensors="pt").to(model.device)
    LOGGER.info("Generating reel scenario")
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=2000,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
        )
    # Drop the prompt tokens; decode only the newly generated continuation.
    generated_ids = output_ids[0][inputs["input_ids"].shape[1]:]
    result = tokenizer.decode(generated_ids, skip_special_tokens=True)
    return result
def _parse_timestamp_seconds(ts_str: str) -> tuple[Optional[int], Optional[int]]:
ts_str = re.sub(r'\s*[–—]+\s*', '-', ts_str).strip()
parts = ts_str.split('-')
if len(parts) != 2:
return None, None
def to_seconds(s: str) -> Optional[int]:
s = s.strip()
if ':' in s:
m, sec = s.split(':', 1)
try:
return int(m) * 60 + int(sec)
except ValueError:
return None
try:
return int(s)
except ValueError:
return None
return to_seconds(parts[0]), to_seconds(parts[1])
def extract_field(label: str, next_label: Optional[str], text: str) -> str:
    """Pull the value after 'label:' up to the line starting 'next_label:'.

    When next_label is None the match runs to the end of the text. Returns ""
    when the label is absent; surrounding quotes and whitespace are stripped.
    """
    if next_label:
        pattern = rf'{label}:\s*(.*?)(?=\n{next_label}:|$)'
    else:
        pattern = rf'{label}:\s*(.*?)$'
    hit = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    return hit.group(1).strip().strip('"') if hit else ""
def parse_reel_scenario(raw_scenario: str) -> dict:
    """
    Parse the shot-by-shot reel scenario into a structured dict.
    Returns:
    {
        "shots": [
            {
                "shot_number": 1,
                "timestamp": "04",
                "start_sec": 0,
                "end_sec": 4,
                "duration_sec": 4,
                "beat": "HOOK",
                "voiceover": "Will your major still matter in 5 years?",
                "word_count": 9,
                "speech_duration_sec": 3.6,  # word_count / WORDS_PER_SECOND
                "dead_air_sec": 0.4,  # duration_sec - speech_duration_sec
                "text_on_screen": "Your degree. Obsolete?",
                "image_description": "Close-up of a university diploma ...",
            },
            ...
        ],
        "caption": {
            "body": "Will your major still matter in 5 years? ...",
            "hashtags": ["#EdTech", "#AIEducation", ...],
        },
        "total_duration_sec": 55,
    }
    """
    result = {
        "shots": [],
        "caption": {"body": "", "hashtags": []},
        "total_duration_sec": 0,
    }
    # NOTE(review): total_duration_sec is initialized but never updated below,
    # despite the docstring example showing 55 — confirm whether summing shot
    # durations was intended.
    # Drop any chain-of-thought block the model emitted, then strip markdown bold.
    raw_scenario = re.sub(r'<think>.*?</think>', '', raw_scenario, flags=re.DOTALL).strip()
    cleaned = re.sub(r'\*+', '', raw_scenario)
    # Everything between "SHOT LIST" and the dashed CAPTION separator (or EOF).
    shot_section = re.search(
        r'SHOT LIST\s*\n(.*?)(?=-{4,}\s*CAPTION|$)',
        cleaned, re.DOTALL | re.IGNORECASE
    )
    if shot_section:
        shot_text = shot_section.group(1)
        # Split into per-shot blocks at each "SHOT <n>" heading.
        shot_blocks = re.split(r'\n(?=SHOT\s+\d+)', shot_text.strip())
        for block in shot_blocks:
            block = block.strip()
            if not block:
                continue
            shot_num_match = re.match(r'SHOT\s+(\d+)', block)
            if not shot_num_match:
                continue
            shot_number = int(shot_num_match.group(1))
            timestamp = extract_field("Timestamp", "Beat", block)
            beat = extract_field("Beat", "Voiceover", block)
            voiceover = extract_field("Voiceover", "Text on screen", block)
            text_on_screen = extract_field("Text on screen", "Image description", block)
            image_description = extract_field("Image description", None, block)
            # NOTE(review): this removes the dash separator entirely
            # ("0-4" -> "04"); if a visible separator was intended for display,
            # confirm the replacement string was not lost in a paste.
            timestamp_display = re.sub(r'\s*[–—-]+\s*', '', timestamp)
            start_sec, end_sec = _parse_timestamp_seconds(timestamp)
            duration_sec = (end_sec - start_sec) if (start_sec is not None and end_sec is not None) else None
            word_count = len(voiceover.split()) if voiceover else 0
            # Estimated speech time under the global words-per-second pacing model.
            speech_duration = round(word_count / WORDS_PER_SECOND, 1)
            dead_air = round(duration_sec - speech_duration, 1) if duration_sec is not None else None
            result["shots"].append({
                "shot_number": shot_number,
                "timestamp": timestamp_display,
                "start_sec": start_sec,
                "end_sec": end_sec,
                "duration_sec": duration_sec,
                "beat": beat.upper(),
                "voiceover": voiceover,
                "word_count": word_count,
                "speech_duration_sec": speech_duration,
                "dead_air_sec": dead_air,
                "text_on_screen": text_on_screen,
                "image_description": image_description,
            })
    caption_section = re.search(
        r'CAPTION\s*\n(.*?)$',
        cleaned, re.DOTALL | re.IGNORECASE
    )
    if caption_section:
        caption_text = caption_section.group(1).strip()
        lines = [l.strip() for l in caption_text.splitlines() if l.strip()]
        # First hashtag line becomes the tag list; every other line is body text.
        hashtag_line = next((l for l in lines if l.startswith("#")), "")
        body_lines = [l for l in lines if not l.startswith("#")]
        result["caption"] = {
            "body": "\n".join(body_lines).strip().strip('"'),
            "hashtags": re.findall(r'#\w+', hashtag_line),
        }
    return result
if __name__ == '__main__':
    # Script entry point: topic description -> generated scenario -> parsed JSON.
    configure_logging("INFO")
    with open("topic_description.txt", "r") as handle:
        topic_text = handle.read()
    lm, tok = load_model()
    raw = generate_reel_scenario(lm, tok, topic_text)
    structured = parse_reel_scenario(raw)
    with open("reel_script.json", "w") as handle:
        json.dump(structured, handle)

View File

@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""Generate shot videos with HunyuanVideo based on reel script and audio durations."""
from __future__ import annotations
import argparse
import json
import logging
import os
import subprocess
from pathlib import Path
from logging_config import configure_logging
SCRIPT_DIR = Path(__file__).resolve().parent
# Project root: two directories above this script.
DEFAULT_BASE_DIR = SCRIPT_DIR.parents[1]
# Local checkout of the HunyuanVideo-1.5 repo (provides generate.py and ckpts/).
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images"
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
LOGGER = logging.getLogger(__name__)
def parse_args() -> argparse.Namespace:
    """Build and parse the CLI arguments for the video generation step."""
    cli = argparse.ArgumentParser(description=__doc__)
    for flag, default in (
        ("--hunyuan-dir", DEFAULT_HUNYUAN_DIR),
        ("--reel-script", DEFAULT_REEL_SCRIPT),
        ("--images-dir", DEFAULT_IMAGES_DIR),
        ("--videos-dir", DEFAULT_VIDEOS_DIR),
        ("--audios-dir", DEFAULT_AUDIOS_DIR),
    ):
        cli.add_argument(flag, type=Path, default=default)
    cli.add_argument("--seed", type=int, default=1)
    cli.add_argument("--log-level", default="INFO")
    return cli.parse_args()
def get_audio_duration(audio_path: Path) -> float:
    """Return the duration of an audio file in seconds, as reported by ffprobe."""
    probe = subprocess.run(
        [
            "ffprobe",
            "-v", "error",                       # suppress everything but errors
            "-show_entries", "format=duration",  # only the container duration
            "-of", "default=noprint_wrappers=1:nokey=1",  # bare value output
            str(audio_path),
        ],
        check=True,
        text=True,
        capture_output=True,
    )
    return float(probe.stdout.strip())
def duration_to_video_length(duration: float) -> int:
    """Convert a duration in seconds to a frame count at 24 fps, clamped to [49, 169].

    The result is always odd (odd frame counts appear to be a requirement of
    the downstream video model — inferred from the original logic).
    """
    frames = int(duration * 24) + 1
    if frames % 2 == 0:
        frames += 1
    return min(max(frames, 49), 169)
def main() -> int:
    """Generate a HunyuanVideo clip for every shot in the reel script.

    The per-shot video length is derived from the matching voiceover audio
    duration (5 s fallback when the audio file is missing). Shots whose output
    already exists or whose source image is missing are skipped.
    """
    args = parse_args()
    configure_logging(args.log_level)
    model_path = args.hunyuan_dir / "ckpts"
    args.videos_dir.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    # Reduce CUDA allocator fragmentation for the large video model.
    env["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True,max_split_size_mb:128"
    data = json.loads(args.reel_script.read_text())
    shots = data.get("shots", [])
    LOGGER.info("Found %s shots to generate", len(shots))
    for shot in shots:
        shot_number = shot["shot_number"]
        # Flatten whitespace so the prompt travels safely as one CLI argument.
        prompt = str(shot["image_description"]).replace("\t", " ").replace("\n", " ")
        image_path = args.images_dir / f"shot_{shot_number}.png"
        output_path = args.videos_dir / f"output_{shot_number}.mp4"
        audio_path = args.audios_dir / f"output_{shot_number}.mp3"
        if not audio_path.exists():
            LOGGER.warning("No audio found at %s, falling back to 5s default", audio_path)
            duration = 5.0
        else:
            duration = get_audio_duration(audio_path)
            LOGGER.info("Audio duration for shot %s: %ss", shot_number, duration)
        video_length = duration_to_video_length(duration)
        LOGGER.info("Shot %s | %ss -> %s frames", shot_number, duration, video_length)
        LOGGER.info("Prompt: %s", prompt)
        LOGGER.info("Image: %s", image_path)
        LOGGER.info("Audio: %s", audio_path)
        LOGGER.info("Output: %s", output_path)
        if output_path.exists():
            LOGGER.info("Output path already exists, skipping")
            continue
        if not image_path.exists():
            LOGGER.warning("Image not found at %s, skipped", image_path)
            continue
        # NOTE(review): this empties the CUDA cache of a *fresh* subprocess,
        # not of any process actually holding GPU memory — likely a no-op;
        # confirm whether it can simply be removed.
        subprocess.run(
            [
                "python3",
                "-c",
                "import torch; torch.cuda.empty_cache()",
            ],
            check=True,
            env=env,
        )
        LOGGER.info("GPU cache cleared")
        # Run HunyuanVideo's generate.py from inside its own checkout.
        subprocess.run(
            [
                "torchrun",
                "--nproc_per_node=1",
                "generate.py",
                "--prompt",
                prompt,
                "--image_path",
                str(image_path),
                "--resolution",
                "480p",
                "--aspect_ratio",
                "16:9",
                "--seed",
                str(args.seed),
                "--video_length",
                str(video_length),
                "--rewrite",
                "false",
                "--cfg_distilled",
                "true",
                "--enable_step_distill",
                "true",
                "--sparse_attn",
                "false",
                "--use_sageattn",
                "true",
                "--enable_cache",
                "false",
                "--overlap_group_offloading",
                "true",
                "--sr",
                "false",
                "--output_path",
                str(output_path),
                "--model_path",
                str(model_path),
            ],
            check=True,
            cwd=args.hunyuan_dir,
            env=env,
        )
        LOGGER.info("Shot %s done", shot_number)
    LOGGER.info("Done")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,13 @@
from __future__ import annotations
import logging
# Shared log-line layout used by every script in the pipeline.
DEFAULT_LOG_FORMAT = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"


def configure_logging(level: str = "INFO") -> None:
    """Configure root logging with the shared pipeline format.

    Unknown level names silently fall back to INFO.
    """
    resolved = getattr(logging, level.upper(), logging.INFO)
    logging.basicConfig(level=resolved, format=DEFAULT_LOG_FORMAT)

View File

@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""Merge videos/output_n.mp4 with audios/output_n.mp3 into merged/merged_n.mp4."""
from __future__ import annotations
import argparse
import logging
import re
import subprocess
from pathlib import Path
from logging_config import configure_logging
SCRIPT_DIR = Path(__file__).resolve().parent
# Project root: two directories above this script.
DEFAULT_BASE_DIR = SCRIPT_DIR.parents[1]
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
DEFAULT_OUTPUT_DIR = DEFAULT_BASE_DIR / "merged"
LOGGER = logging.getLogger(__name__)
def shot_number(path: Path) -> int:
    """Return the numeric shot index from an output_<n>.mp4 filename, or -1."""
    hit = re.search(r"output_(\d+)\.mp4$", path.name)
    return -1 if hit is None else int(hit.group(1))
def parse_args() -> argparse.Namespace:
    """Build and parse the CLI arguments for the merge step."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--videos-dir", type=Path, default=DEFAULT_VIDEOS_DIR)
    cli.add_argument("--audios-dir", type=Path, default=DEFAULT_AUDIOS_DIR)
    cli.add_argument("--output-dir", type=Path, default=DEFAULT_OUTPUT_DIR)
    cli.add_argument("--log-level", default="INFO")
    return cli.parse_args()
def main() -> int:
    """Mux each shot's video with its voiceover track via ffmpeg.

    Returns 0 on success, 1 when no input videos are found. Shots with no
    matching audio or with an existing output are skipped.
    """
    args = parse_args()
    configure_logging(args.log_level)
    args.output_dir.mkdir(parents=True, exist_ok=True)
    videos = sorted(args.videos_dir.glob("output_*.mp4"), key=shot_number)
    if not videos:
        LOGGER.warning("No videos found in %s", args.videos_dir)
        return 1
    for video in videos:
        num = shot_number(video)
        audio = args.audios_dir / f"output_{num}.mp3"
        output = args.output_dir / f"merged_{num}.mp4"
        if not audio.exists():
            LOGGER.warning("No audio found for shot %s (%s); skipped", num, audio)
            continue
        if output.exists():
            LOGGER.info("Already exists; skipped shot %s", num)
            continue
        LOGGER.info("Merging shot %s: %s + %s -> %s", num, video, audio, output)
        cmd = [
            "ffmpeg",
            "-i", str(video),
            "-i", str(audio),
            "-c:v", "copy",  # keep the video stream untouched
            "-c:a", "aac",   # re-encode audio to AAC for MP4
            "-shortest",     # stop at the shorter of the two streams
            "-y",
            str(output),
        ]
        subprocess.run(cmd, check=True)
        LOGGER.info("Done: %s", output)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""Run the full video pipeline: generate, merge, and concatenate."""
from __future__ import annotations
import argparse
import logging
import os
import subprocess
import sys
from pathlib import Path
from logging_config import configure_logging
from s3_video_storage import S3VideoStorage
SCRIPT_DIR = Path(__file__).resolve().parent
# Project root: two directories above this script.
PROJECT_ROOT = SCRIPT_DIR.parents[1]
DEFAULT_BASE_DIR = PROJECT_ROOT
# Local checkout of the HunyuanVideo-1.5 repo used by the generate step.
DEFAULT_HUNYUAN_DIR = DEFAULT_BASE_DIR / "HunyuanVideo-1.5"
DEFAULT_REEL_SCRIPT = DEFAULT_BASE_DIR / "reel_script.json"
DEFAULT_IMAGES_DIR = DEFAULT_BASE_DIR / "images"
DEFAULT_VIDEOS_DIR = DEFAULT_BASE_DIR / "videos"
DEFAULT_AUDIOS_DIR = DEFAULT_BASE_DIR / "audios"
DEFAULT_MERGED_DIR = DEFAULT_BASE_DIR / "merged"
DEFAULT_OUTPUT = DEFAULT_BASE_DIR / "results" / "final_output.mp4"
LOGGER = logging.getLogger(__name__)
def parse_args() -> argparse.Namespace:
    """Build and parse the CLI arguments for the end-to-end pipeline runner."""
    cli = argparse.ArgumentParser(description=__doc__)
    for flag, default in (
        ("--base-dir", DEFAULT_BASE_DIR),
        ("--hunyuan-dir", DEFAULT_HUNYUAN_DIR),
        ("--reel-script", DEFAULT_REEL_SCRIPT),
        ("--images-dir", DEFAULT_IMAGES_DIR),
        ("--videos-dir", DEFAULT_VIDEOS_DIR),
        ("--audios-dir", DEFAULT_AUDIOS_DIR),
        ("--merged-dir", DEFAULT_MERGED_DIR),
        ("--output", DEFAULT_OUTPUT),
    ):
        cli.add_argument(flag, type=Path, default=default)
    cli.add_argument("--seed", type=int, default=1)
    for flag in ("--skip-generate", "--skip-merge", "--skip-concat", "--skip-s3-upload"):
        cli.add_argument(flag, action="store_true")
    cli.add_argument("--log-level", default="INFO")
    return cli.parse_args()
def run_step(name: str, cmd: list[str]) -> None:
    """Log and execute one pipeline stage; raises CalledProcessError on failure."""
    LOGGER.info("=== %s ===", name)
    printable = " ".join(str(part) for part in cmd)
    LOGGER.info("$ %s", printable)
    subprocess.run(cmd, check=True)
def maybe_upload_to_s3(output_path: Path) -> None:
    """Upload the final video to S3 when AWS_S3_BUCKET is set; otherwise skip."""
    bucket = os.getenv("AWS_S3_BUCKET")
    if not bucket:
        LOGGER.warning("Skipping S3 upload: AWS_S3_BUCKET is not set")
        return
    # Optional settings stay None here; S3VideoStorage filters them out so
    # boto3 can fall back to its default credential/region resolution.
    settings = {
        "bucket_name": bucket,
        "region_name": os.getenv("AWS_REGION"),
        "endpoint_url": os.getenv("AWS_S3_ENDPOINT_URL"),
        "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "aws_session_token": os.getenv("AWS_SESSION_TOKEN"),
    }
    s3_uri = S3VideoStorage(settings).store_file(output_path)
    LOGGER.info("Uploaded output to %s", s3_uri)
def main() -> int:
    """Run generate -> merge -> concat, then optionally upload the result to S3.

    Returns 0 on success, the failing subprocess's return code when a pipeline
    step fails, or 1 when the S3 upload raises.
    """
    args = parse_args()
    configure_logging(args.log_level)
    # If only base-dir is overridden, derive the common subpaths from it.
    if args.base_dir != DEFAULT_BASE_DIR:
        if args.hunyuan_dir == DEFAULT_HUNYUAN_DIR:
            args.hunyuan_dir = args.base_dir / "HunyuanVideo-1.5"
        if args.reel_script == DEFAULT_REEL_SCRIPT:
            args.reel_script = args.base_dir / "reel_script.json"
        if args.images_dir == DEFAULT_IMAGES_DIR:
            args.images_dir = args.base_dir / "images"
        if args.videos_dir == DEFAULT_VIDEOS_DIR:
            args.videos_dir = args.base_dir / "videos"
        if args.audios_dir == DEFAULT_AUDIOS_DIR:
            args.audios_dir = args.base_dir / "audios"
        if args.merged_dir == DEFAULT_MERGED_DIR:
            args.merged_dir = args.base_dir / "merged"
        if args.output == DEFAULT_OUTPUT:
            args.output = args.base_dir / "results" / "final_output.mp4"
    try:
        # Each stage is a sibling script launched with the current interpreter.
        if not args.skip_generate:
            run_step(
                "Generate Videos",
                [
                    sys.executable,
                    str(SCRIPT_DIR / "generate_videos.py"),
                    "--hunyuan-dir",
                    str(args.hunyuan_dir),
                    "--reel-script",
                    str(args.reel_script),
                    "--images-dir",
                    str(args.images_dir),
                    "--videos-dir",
                    str(args.videos_dir),
                    "--audios-dir",
                    str(args.audios_dir),
                    "--seed",
                    str(args.seed),
                ],
            )
        if not args.skip_merge:
            run_step(
                "Merge Audio + Video",
                [
                    sys.executable,
                    str(SCRIPT_DIR / "merge_audio_video.py"),
                    "--videos-dir",
                    str(args.videos_dir),
                    "--audios-dir",
                    str(args.audios_dir),
                    "--output-dir",
                    str(args.merged_dir),
                ],
            )
        if not args.skip_concat:
            run_step(
                "Concatenate Merged Videos",
                [
                    sys.executable,
                    str(SCRIPT_DIR / "concat_merged.py"),
                    "--merged-dir",
                    str(args.merged_dir),
                    "--output",
                    str(args.output),
                ],
            )
    except subprocess.CalledProcessError as exc:
        # Propagate the failing step's exit code so callers see the real status.
        LOGGER.exception("Pipeline failed at command: %s", exc.cmd)
        return exc.returncode
    if not args.skip_s3_upload:
        try:
            maybe_upload_to_s3(args.output)
        except Exception:
            # An upload failure should not masquerade as a successful run.
            LOGGER.exception("Failed uploading output to S3")
            return 1
    LOGGER.info("Pipeline complete")
    LOGGER.info("Final output: %s", args.output)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,71 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Mapping
import boto3
@dataclass(frozen=True)
class S3Config:
    """Connection settings for S3-compatible storage; only bucket_name is required."""
    bucket_name: str
    # The remaining fields are optional; when None they are omitted from the
    # boto3 client kwargs so boto3's own default resolution applies.
    region_name: str | None = None
    endpoint_url: str | None = None
    aws_access_key_id: str | None = None
    aws_secret_access_key: str | None = None
    aws_session_token: str | None = None
class S3VideoStorage:
    """Uploads video files to S3 under date-partitioned keys."""

    def __init__(self, s3_config: S3Config | Mapping[str, Any]) -> None:
        """Accept an S3Config or a plain mapping and build a boto3 S3 client."""
        self.config = self._normalize_config(s3_config)
        candidates: dict[str, Any] = {
            "region_name": self.config.region_name,
            "endpoint_url": self.config.endpoint_url,
            "aws_access_key_id": self.config.aws_access_key_id,
            "aws_secret_access_key": self.config.aws_secret_access_key,
            "aws_session_token": self.config.aws_session_token,
        }
        # Drop unset options so boto3 falls back to its own credential chain.
        self._s3_client = boto3.client(
            "s3", **{k: v for k, v in candidates.items() if v is not None}
        )

    def store_file(self, file_path: str | Path) -> str:
        """Upload a local file and return its s3:// URI.

        Raises FileNotFoundError when the path is missing and ValueError when
        it is not a regular file.
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File does not exist: {path}")
        if not path.is_file():
            raise ValueError(f"Path is not a file: {path}")
        # Partition keys by UTC upload date: video_content/YYYY/MM/DD/<name>.
        now = datetime.now(timezone.utc)
        key = f"video_content/{now.year:04d}/{now.month:02d}/{now.day:02d}/{path.name}"
        self._s3_client.upload_file(str(path), self.config.bucket_name, key)
        return f"s3://{self.config.bucket_name}/{key}"

    @staticmethod
    def _normalize_config(s3_config: S3Config | Mapping[str, Any]) -> S3Config:
        """Coerce a plain mapping into S3Config; pass S3Config through as-is."""
        if isinstance(s3_config, S3Config):
            return s3_config
        bucket_name = s3_config.get("bucket_name")
        if not bucket_name:
            raise ValueError("s3_config must contain non-empty 'bucket_name'")
        return S3Config(
            bucket_name=str(bucket_name),
            region_name=_optional_str(s3_config, "region_name"),
            endpoint_url=_optional_str(s3_config, "endpoint_url"),
            aws_access_key_id=_optional_str(s3_config, "aws_access_key_id"),
            aws_secret_access_key=_optional_str(s3_config, "aws_secret_access_key"),
            aws_session_token=_optional_str(s3_config, "aws_session_token"),
        )
def _optional_str(config: Mapping[str, Any], key: str) -> str | None:
value = config.get(key)
if value is None:
return None
return str(value)