Spaces:
Running
on
Zero
Running
on
Zero
| import os, sys, json, tempfile, subprocess, shutil, uuid | |
| from pathlib import Path | |
| from typing import Optional, Tuple, List | |
| import gradio as gr | |
| import spaces | |
| from huggingface_hub import snapshot_download | |
| from loguru import logger | |
| import torch, torchaudio | |
# ========= Paths & Config =========
# All working directories are siblings of this file.
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"  # target of the upstream git clone
WEIGHTS_DIR = ROOT / "weights"  # target of the model snapshot download
CACHE_DIR = ROOT / "cache"
OUT_DIR = ROOT / "outputs"  # generated WAVs and batch manifests
ASSETS = ROOT / "assets"
ASSETS.mkdir(exist_ok=True)

# Branding, overridable through environment variables.
APP_TITLE = os.getenv("APP_TITLE", "Foley Studio · ZeroGPU")
APP_TAGLINE = os.getenv(
    "APP_TAGLINE",
    "Generate scene-true foley for short clips (ZeroGPU-ready).",
)
PRIMARY_COLOR = os.getenv("PRIMARY_COLOR", "#6B5BFF")  # UI accent only

# ZeroGPU-safe defaults (tweak in Space Secrets if needed).
MAX_SECS = int(os.getenv("MAX_SECS", "15"))  # keep clips short for ZeroGPU window
TARGET_H = int(os.getenv("TARGET_H", "480"))  # downscale target height
SR = int(os.getenv("TARGET_SR", "48000"))  # WAV sample rate
ZEROGPU_DURATION = int(os.getenv("ZEROGPU_DURATION", "110"))  # must be <= platform limit
def sh(cmd: str):
    """Echo a shell command line, then run it through the shell.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status.

    The string is handed to the shell verbatim, so callers are responsible
    for quoting anything they interpolate into it.
    """
    print(f">> {cmd}")
    subprocess.check_call(cmd, shell=True)
def ffprobe_duration(path: str) -> float:
    """Return the container duration of ``path`` in seconds, as reported by ffprobe.

    This is a best-effort probe: any failure (ffprobe missing, unreadable
    file, unparsable output) yields ``0.0`` instead of raising.
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        raw = subprocess.check_output(probe_cmd)
        return float(raw.decode().strip())
    except Exception:
        # Callers treat 0.0 as "duration unknown".
        return 0.0
def _clone_without_lfs():
    """Clone the upstream repo into ``REPO_DIR`` without downloading LFS blobs.

    Does nothing when ``REPO_DIR`` already exists. Otherwise it first tries a
    shallow clone; if that fails it falls back to a sparse checkout limited to
    the paths the app needs (trying ``main``, then ``master``).

    Every git invocation runs with the LFS smudge filter disabled. ``git -c``
    only affects the single command it is passed to, so the overrides are
    applied to each call (and ``GIT_LFS_SKIP_SMUDGE=1`` is exported as well)
    rather than only to ``remote add``.

    Raises:
        subprocess.CalledProcessError: if the sparse-checkout fallback fails.
        FileNotFoundError: if the ``git`` executable is not installed.
    """
    if REPO_DIR.exists():
        return

    repo_url = "https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git"
    lfs_off = ["-c", "filter.lfs.smudge=", "-c", "filter.lfs.required=false"]
    git_env = {**os.environ, "GIT_LFS_SKIP_SMUDGE": "1"}
    repo = str(REPO_DIR)

    def _git(*args: str) -> None:
        # Argument list (no shell) so a REPO_DIR containing spaces is safe.
        cmd = ["git", *lfs_off, *args]
        print(">>", " ".join(cmd))
        subprocess.run(cmd, check=True, env=git_env)

    # Attempt 1: shallow clone with LFS disabled
    try:
        _git("clone", "--depth", "1", repo_url, repo)
    except subprocess.CalledProcessError as e:
        print("Shallow clone with LFS skipped failed, trying sparse checkout…", e)
    else:
        # The demo assets are not needed at runtime.
        shutil.rmtree(REPO_DIR / "assets", ignore_errors=True)
        return

    # Attempt 2: sparse checkout minimal files
    # Start from a clean directory in case the failed clone left debris.
    shutil.rmtree(REPO_DIR, ignore_errors=True)
    try:
        REPO_DIR.mkdir(parents=True, exist_ok=True)
        _git("-C", repo, "init")
        _git("-C", repo, "remote", "add", "origin", repo_url)
        _git("-C", repo, "config", "core.sparseCheckout", "true")

        sparse_file = REPO_DIR / ".git" / "info" / "sparse-checkout"
        sparse_file.parent.mkdir(parents=True, exist_ok=True)
        sparse_file.write_text("\n".join([
            "hunyuanvideo_foley/",
            "configs/",
            "gradio_app.py",
            "requirements.txt",
            "LICENSE",
            "README.md",
        ]) + "\n")

        # Try main, fallback to master
        try:
            _git("-C", repo, "fetch", "--depth", "1", "origin", "main")
            _git("-C", repo, "checkout", "main")
        except subprocess.CalledProcessError:
            _git("-C", repo, "fetch", "--depth", "1", "origin", "master")
            _git("-C", repo, "checkout", "master")
    except Exception:
        # Do not leave a half-initialised checkout behind: the early-return
        # guard at the top would mistake it for a finished clone next start.
        shutil.rmtree(REPO_DIR, ignore_errors=True)
        raise
def prepare_once():
    """One-time bootstrap: fetch code and weights, then prepare the environment.

    Steps, in order: clone the upstream repo (LFS skipped), put it on
    ``sys.path``, download the model snapshot into ``WEIGHTS_DIR``, export
    ``HIFI_FOLEY_MODEL_PATH``, and create the cache and output directories.
    """
    _clone_without_lfs()

    # The cloned repo must be importable for the upstream package imports.
    repo_on_path = str(REPO_DIR)
    if repo_on_path not in sys.path:
        sys.path.insert(0, repo_on_path)

    weights_path = str(WEIGHTS_DIR)
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        repo_type="model",
        local_dir=weights_path,
        local_dir_use_symlinks=False,
        resume_download=True,
    )
    os.environ["HIFI_FOLEY_MODEL_PATH"] = weights_path

    for work_dir in (CACHE_DIR, OUT_DIR):
        work_dir.mkdir(exist_ok=True)
# Bootstrap must run before anything is imported from the cloned repo.
prepare_once()

# Fail fast, with an actionable message, when a runtime dependency is absent.
try:
    import audiotools
except Exception as exc:
    raise RuntimeError(
        "Missing module 'audiotools'. "
        "Install it via the PyPI package 'descript-audiotools' "
        "(e.g., add 'descript-audiotools>=0.7.2' to requirements.txt) "
        "and restart the Space."
    ) from exc

try:
    import omegaconf  # noqa: F401
    import yaml  # from pyyaml
    import easydict  # noqa: F401
except Exception as exc:
    raise RuntimeError(
        "Missing config deps. "
        "Please add to requirements.txt: 'omegaconf>=2.3.0', 'pyyaml', and 'easydict'."
    ) from exc

# Now safe to import Tencent internals
from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process
from hunyuanvideo_foley.utils.feature_utils import feature_process
from hunyuanvideo_foley.utils.media_utils import merge_audio_video

# ========= Native Model Setup =========
MODEL_PATH = os.getenv("HIFI_FOLEY_MODEL_PATH", str(WEIGHTS_DIR))
CONFIG_PATH = str(REPO_DIR.joinpath("configs", "hunyuanvideo-foley-xxl.yaml"))

# Populated by auto_load_models(); None until the model has been loaded.
_model_dict = _cfg = _device = None
def _setup_device(device_str: str = "auto", gpu_id: int = 0) -> torch.device:
    """Resolve the torch device to run on and log the choice.

    Args:
        device_str: ``"auto"`` picks CUDA, then MPS, then CPU, by
            availability. Any other value is passed to ``torch.device``, with
            a bare ``"cuda"`` expanded to ``cuda:<gpu_id>``.
        gpu_id: CUDA device index used whenever a CUDA device is selected.

    Returns:
        The selected ``torch.device``.
    """
    if device_str != "auto":
        explicit = f"cuda:{gpu_id}" if device_str == "cuda" else device_str
        chosen = torch.device(explicit)
        logger.info(f"Using specified device: {chosen}")
        return chosen

    if torch.cuda.is_available():
        chosen = torch.device(f"cuda:{gpu_id}")
        logger.info(f"Using CUDA {chosen}")
        return chosen

    if torch.backends.mps.is_available():
        chosen = torch.device("mps")
        logger.info("Using MPS")
        return chosen

    chosen = torch.device("cpu")
    logger.info("Using CPU")
    return chosen
def auto_load_models() -> str:
    """Load the model into the module-level globals and return a status line.

    Creates ``MODEL_PATH`` if it does not exist. When ``CONFIG_PATH`` is
    missing, nothing is loaded and an error string is returned. Otherwise
    ``_device``, ``_model_dict`` and ``_cfg`` are set.
    """
    global _model_dict, _cfg, _device

    weights_root = Path(MODEL_PATH)
    if not weights_root.exists():
        weights_root.mkdir(parents=True, exist_ok=True)

    if not Path(CONFIG_PATH).exists():
        return f"❌ Config file not found: {CONFIG_PATH}"

    _device = _setup_device("auto", 0)

    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {MODEL_PATH}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    _model_dict, _cfg = load_model(MODEL_PATH, CONFIG_PATH, _device)

    status = "✅ Model loaded"
    logger.info(status)
    return status
# Init logger and load model once
def _stdout_sink(message) -> None:
    """Loguru sink: write the already-formatted record to stdout unchanged."""
    print(message, end="")


logger.remove()  # drop the handlers registered so far
logger.add(_stdout_sink, level="INFO")
logger.info(auto_load_models())
| # ========= Preprocessing ========= | |
def preprocess_video(in_path: str) -> Tuple[str, float]:
    """Normalise an uploaded clip for inference.

    - Validate the duration and trim to <= ``MAX_SECS``.
    - Downscale to ``TARGET_H`` (keeping aspect ratio) and strip the
      original audio.

    ffmpeg is invoked with an argument list rather than a shell string, so
    characters in the upload's file name cannot be interpreted by a shell.

    Args:
        in_path: Path of the source video.

    Returns:
        ``(processed_mp4_path, final_duration_seconds)``.

    Raises:
        RuntimeError: if the duration cannot be determined.
        subprocess.CalledProcessError: if an ffmpeg pass fails.
        FileNotFoundError: if the ``ffmpeg`` executable is not installed.
    """
    dur = ffprobe_duration(in_path)
    # "not > 0" also rejects negative and NaN durations, not just exactly 0.
    if not dur > 0:
        raise RuntimeError("Unable to read the video duration.")

    temp_dir = Path(tempfile.mkdtemp(prefix="pre_"))
    trimmed = temp_dir / "trim.mp4"
    processed = temp_dir / "proc.mp4"

    def _ffmpeg(*args: str) -> None:
        cmd = ["ffmpeg", "-y", *args]
        print(">>", " ".join(cmd))
        subprocess.run(cmd, check=True)

    trim_args = ["-t", str(MAX_SECS)] if dur > MAX_SECS else []
    try:
        # Normalize container & remove audio
        _ffmpeg(
            "-i", str(in_path),
            *trim_args,
            "-an",
            "-vcodec", "libx264", "-preset", "veryfast", "-crf", "23",
            "-movflags", "+faststart",
            str(trimmed),
        )
        # Downscale to TARGET_H; ensure mod2 width, baseline profile
        _ffmpeg(
            "-i", str(trimmed),
            "-vf", f"scale=-2:{TARGET_H}:flags=bicubic",
            "-an",
            "-vcodec", "libx264", "-profile:v", "baseline", "-level", "3.1",
            "-pix_fmt", "yuv420p",
            "-preset", "veryfast", "-crf", "24",
            "-movflags", "+faststart",
            str(processed),
        )
    except Exception:
        # Nothing usable was produced; do not leak the scratch directory.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    # The intermediate file is no longer needed once the final clip exists.
    try:
        os.remove(trimmed)
    except OSError:
        pass

    final_dur = min(dur, float(MAX_SECS))
    return str(processed), final_dur
# ========= Inference (ZeroGPU) =========
# On ZeroGPU a GPU is attached only while a @spaces.GPU-decorated function is
# running; the window length is tuned via the ZEROGPU_DURATION env var.
# NOTE(review): the decorator is restored here because `spaces` and
# ZEROGPU_DURATION were otherwise unused — confirm against the deployed Space.
@spaces.GPU(duration=ZEROGPU_DURATION)
def run_model(video_path: str, prompt_text: str,
              guidance_scale: float = 4.5,
              num_inference_steps: int = 50,
              sample_nums: int = 1) -> Tuple[List[str], int]:
    """Run native inference (no shell) and write the generated audio to disk.

    Args:
        video_path: Path of the preprocessed clip.
        prompt_text: Optional text prompt; ``None`` is treated as empty and
            surrounding whitespace is stripped.
        guidance_scale: Forwarded to ``denoise_process``.
        num_inference_steps: Forwarded to ``denoise_process``.
        sample_nums: Number of samples to generate (used as the batch size).

    Returns:
        ``([wav_paths], sample_rate)``; one WAV per sample, written to a fresh
        ``job_<id>`` directory under ``OUT_DIR``.

    Raises:
        RuntimeError: if the model has not been loaded.
    """
    if _model_dict is None or _cfg is None:
        raise RuntimeError("Model not loaded yet.")

    text_prompt = (prompt_text or "").strip()

    # Extract features
    visual_feats, text_feats, audio_len_s = feature_process(
        video_path, text_prompt, _model_dict, _cfg
    )

    # Generate audio (B x C x T)
    logger.info(f"Generating {sample_nums} sample(s)...")
    audio_batch, sr = denoise_process(
        visual_feats, text_feats, audio_len_s, _model_dict, _cfg,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        batch_size=sample_nums
    )

    # Save each sample as WAV
    out_dir = OUT_DIR / f"job_{uuid.uuid4().hex[:8]}"
    out_dir.mkdir(parents=True, exist_ok=True)
    wav_paths = []
    for i in range(sample_nums):
        wav_p = out_dir / f"generated_audio_{i+1}.wav"
        # Move the sample to host memory before saving; this is a no-op when
        # it is already there.
        torchaudio.save(str(wav_p), audio_batch[i].detach().cpu(), sr)
        wav_paths.append(str(wav_p))
    return wav_paths, sr
| # ========= Optional: Mux Foley back to video ========= | |
def mux_audio_with_video(video_path: str, audio_path: str) -> str:
    """Mux an audio track onto a video and return the path of the new MP4.

    The video stream is copied as-is, the audio is encoded to AAC at 192k,
    and the output stops at the shorter of the two inputs. ffmpeg is invoked
    with an argument list rather than a shell string, so path contents are
    never interpreted by a shell.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with an error.
        FileNotFoundError: if the ``ffmpeg`` executable is not installed.
    """
    work_dir = Path(tempfile.mkdtemp(prefix="mux_"))
    out_path = work_dir / "with_foley.mp4"
    cmd = [
        "ffmpeg", "-y",
        "-i", str(video_path),
        "-i", str(audio_path),
        "-map", "0:v:0", "-map", "1:a:0",
        "-c:v", "copy", "-c:a", "aac", "-b:a", "192k",
        "-shortest",
        str(out_path),
    ]
    print(">>", " ".join(cmd))
    try:
        subprocess.run(cmd, check=True)
    except Exception:
        # No output was produced; do not leak the scratch directory.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise
    return str(out_path)
| # ========= UI Handlers ========= | |
def single_generate(video: str, prompt: str, want_mux: bool, project_name: str):
    """Gradio handler for the single-clip tab.

    Runs preprocess -> inference -> optional mux and returns
    ``(wav_path, muxed_video_path, status_markdown, activity_rows)``. Any
    exception is reported through the status text and the activity rows
    instead of propagating. ``project_name`` is accepted but not used.
    """
    steps = []
    try:
        if not video:
            return None, None, "⚠️ Please upload a video.", steps

        steps.append(["Preprocess", "Downscaling & trimming"])
        clip_path, clip_secs = preprocess_video(video)

        steps.append(["Inference", "ZeroGPU native pipeline"])
        wavs, _sample_rate = run_model(
            clip_path,
            prompt or "",
            guidance_scale=4.5,
            num_inference_steps=50,
            sample_nums=1,
        )
        if not wavs:
            raise RuntimeError("No audio produced.")
        first_wav = wavs[0]

        muxed_path = None
        if want_mux:
            steps.append(["Mux", "Merging foley with video"])
            muxed_path = mux_audio_with_video(clip_path, first_wav)

        steps.append(["Done", f"OK · ~{clip_secs:.1f}s"])
        return first_wav, muxed_path, f"✅ Completed (~{clip_secs:.1f}s)", steps
    except Exception as err:
        steps.append(["Error", str(err)])
        return None, None, f"❌ {type(err).__name__}: {err}", steps
def batch_lite_generate(files: List[str], prompt: str, want_mux: bool):
    """Gradio handler for the batch-lite tab (at most three clips).

    Each clip is processed independently: a failure is recorded in the log
    and the remaining clips still run. A JSON manifest listing the produced
    ``wav`` / ``video`` paths is written under ``OUT_DIR`` and its location
    is appended to the log.

    Args:
        files: Uploaded clips; only the first three are processed.
        prompt: Optional prompt applied to every clip.
        want_mux: Whether to also mux each result into an MP4.

    Returns:
        ``(status_markdown, log_rows)``.
    """
    log = []
    if not files:
        return "⚠️ Please upload 1–3 videos.", log
    if len(files) > 3:
        files = files[:3]
        log.append(["Info", "Limiting to first 3 videos."])

    outputs = []
    for i, f in enumerate(files, 1):
        try:
            # NOTE(review): some Gradio versions appear to pass file objects
            # exposing `.name` instead of plain paths — confirm for the
            # pinned version. Plain paths pass through unchanged.
            src = f if isinstance(f, (str, os.PathLike)) else getattr(f, "name", f)
            log.append([f"Preprocess {i}", Path(src).name])
            pre, final_dur = preprocess_video(src)
            log.append([f"Run {i}", f"ZeroGPU ~{final_dur:.1f}s"])
            wav_list, sr = run_model(pre, prompt or "", sample_nums=1)
            if not wav_list:
                raise RuntimeError("No audio produced.")
            wav = wav_list[0]
            muxed = mux_audio_with_video(pre, wav) if want_mux else None
            outputs.append((wav, muxed))
            log.append([f"Done {i}", "OK"])
        except Exception as e:
            log.append([f"Error {i}", str(e)])

    manifest = OUT_DIR / f"batchlite_{uuid.uuid4().hex[:6]}.json"
    # ensure_ascii=False can emit non-ASCII characters, so the encoding is
    # pinned instead of depending on the process locale.
    manifest.write_text(
        json.dumps(
            [{"wav": w, "video": v} for (w, v) in outputs],
            ensure_ascii=False,
            indent=2,
        ),
        encoding="utf-8",
    )
    # Surface where the results were recorded; the UI has no other output slot.
    log.append(["Manifest", str(manifest)])
    return f"✅ Batch-lite finished · items: {len(outputs)}", log
| # ========= UI (refreshed design) ========= | |
# Stylesheet for the Gradio app. Kept as a plain template (single braces) and
# finished by swapping the placeholder token for the accent colour.
THEME_CSS = """
:root {
--brand: @@BRAND@@;
--bg: #0f1120;
--panel: #181a2e;
--text: #edf0ff;
--muted: #b7bce3;
--card: #15172a;
}
.gradio-container {
font-family: Inter, ui-sans-serif, -apple-system, Segoe UI, Roboto, Cairo, Noto Sans, Arial;
background: var(--bg);
color: var(--text);
}
#hero {
background: linear-gradient(135deg, var(--brand) 0%, #2f2e8b 40%, #1b1a3a 100%);
border-radius: 18px;
padding: 18px 20px;
color: white;
box-shadow: 0 10px 30px rgba(0,0,0,.35);
}
#hero h1 {
margin: 0 0 6px 0;
font-size: 20px;
font-weight: 700;
letter-spacing: .2px;
}
#hero p {
margin: 0;
opacity: .95;
}
.gr-tabitem, .gr-block.gr-group, .gr-panel {
background: var(--panel);
border-radius: 16px !important;
box-shadow: 0 6px 18px rgba(0,0,0,.28);
border: 1px solid rgba(255,255,255,.04);
}
.gr-button {
border-radius: 12px !important;
border: 1px solid rgba(255,255,255,.08) !important;
}
.gradio-container .tabs .tab-nav button.selected {
background: rgba(255,255,255,.06);
border-radius: 12px;
border: 1px solid rgba(255,255,255,.08);
}
.badge {
display:inline-block; padding:2px 8px; border-radius:999px;
background: rgba(255,255,255,.12); color:#fff; font-size:12px
}
""".replace("@@BRAND@@", PRIMARY_COLOR)
# Static markup and labels are assembled up front so the layout code below
# reads as pure structure.
_hero_html = f"""
<div id="hero">
<h1>{APP_TITLE}</h1>
<p>{APP_TAGLINE}</p>
<div style="margin-top:8px"><span class="badge">ZeroGPU</span> <span class="badge">Auto-trim ≤ {MAX_SECS}s</span> <span class="badge">Downscale {TARGET_H}p</span></div>
</div>
"""

_tips_md = f"""
**Usage guidelines**
- Keep clips short (the tool trims to **≤ {MAX_SECS}s** automatically).
- The video is downscaled to **{TARGET_H}p** to fit the ZeroGPU time window.
- If you see a quota message, try again later (ZeroGPU limits GPU minutes per visitor).
**Outputs**
- WAV is **{SR//1000} kHz** stereo.
- Enable **Mux** to get a ready MP4 with the generated foley track.
"""

_video_label = f"Video (≤ ~{MAX_SECS}s recommended)"
_audio_label = f"Generated Foley ({SR//1000} kHz WAV)"

with gr.Blocks(css=THEME_CSS, title=APP_TITLE, analytics_enabled=False) as demo:
    # Hero banner
    with gr.Row():
        gr.HTML(_hero_html)

    with gr.Tabs():
        # --- Tab 1: one clip in, WAV (and optionally a muxed MP4) out ---
        with gr.Tab("🎬 Single Clip"):
            with gr.Group():
                project_name = gr.Textbox(
                    label="Project name (optional)",
                    placeholder="Enter a short label for this clip",
                )
                with gr.Row():
                    v_single = gr.Video(label=_video_label)
                    p_single = gr.Textbox(
                        label="Sound prompt (optional)",
                        placeholder="e.g., soft footsteps on wood, light rain, indoor reverb",
                    )
                with gr.Row():
                    want_mux_single = gr.Checkbox(value=True, label="Mux foley into MP4 output")
                    run_btn = gr.Button("Generate", variant="primary")
            with gr.Row():
                out_audio = gr.Audio(label=_audio_label, type="filepath")
                out_mux = gr.Video(label="Video + Foley (MP4)", visible=True)
            status_md = gr.Markdown()
            history_table = gr.Dataframe(
                headers=["Step", "Note"],
                datatype=["str", "str"],
                interactive=False,
                wrap=True,
                label="Activity",
            )
            run_btn.click(
                single_generate,
                inputs=[v_single, p_single, want_mux_single, project_name],
                outputs=[out_audio, out_mux, status_md, history_table],
            )

        # --- Tab 2: up to three clips processed one after another ---
        with gr.Tab("📦 Batch-Lite (1–3 clips)"):
            files = gr.Files(
                label="Upload 1–3 short videos",
                file_types=[".mp4", ".mov"],
                file_count="multiple",
            )
            prompt_b = gr.Textbox(label="Global prompt (optional)")
            want_mux_b = gr.Checkbox(value=True, label="Mux each output")
            go_b = gr.Button("Run batch-lite")
            batch_status = gr.Markdown()
            batch_log = gr.Dataframe(
                headers=["Step", "Note"],
                datatype=["str", "str"],
                interactive=False,
                wrap=True,
                label="Batch Log",
            )
            go_b.click(
                batch_lite_generate,
                inputs=[files, prompt_b, want_mux_b],
                outputs=[batch_status, batch_log],
            )

        # --- Tab 3: static usage notes ---
        with gr.Tab("ℹ️ Tips"):
            gr.Markdown(_tips_md)

# Enable the request queue (up to 24 waiting jobs), then start serving.
demo.queue(max_size=24)
demo.launch()