# app.py - ShortiFoley (Video -> Foley)
# Created by bilsimaging.com

import os

# ---- Prefer safetensors for all HF model loads (fixes CLAP .bin crash on ZeroGPU) ----
os.environ.setdefault("HF_PREFER_SAFETENSORS", "1")

import sys
import io
import json
import uuid
import time
import shutil
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download

import spaces  # HF Spaces ZeroGPU & MCP integration

# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

SPACE_TITLE = "🎵 ShortiFoley – HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video → Audio Foley. Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# Keep GPU <= 120s for ZeroGPU (default 110)
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))
# Globals
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None


# -------------
# Small helpers
# -------------
def _setup_device(pref: str = "auto", gpu_id: int = 0) -> torch.device:
    """Pick CUDA if available, else MPS, else CPU."""
    if pref == "auto":
        if torch.cuda.is_available():
            d = torch.device(f"cuda:{gpu_id}")
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            d = torch.device("mps")
        else:
            d = torch.device("cpu")
    else:
        d = torch.device(pref)
    if d.type == "cuda":
        logger.info(f"Using CUDA {d}")
    else:
        logger.info(f"Using {d}")
    return d
def _ensure_repo() -> None:
    """Shallow-clone the Tencent repo with LFS smudge disabled (avoids LFS quota checkout)."""
    if REPO_DIR.exists():
        return
    cmd = (
        "GIT_LFS_SKIP_SMUDGE=1 "
        "git -c filter.lfs.smudge= -c filter.lfs.required=false "
        f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
    )
    logger.info(f">> {cmd}")
    os.system(cmd)
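

# Optional hardening (illustrative sketch, not used above): os.system() silently
# ignores clone failures, so a subprocess-based variant with check=True would surface
# errors at startup instead of later, when imports from REPO_DIR fail. The function
# name below is hypothetical, not part of the original app.
def _ensure_repo_checked() -> None:
    import subprocess
    if REPO_DIR.exists():
        return
    env = dict(os.environ, GIT_LFS_SKIP_SMUDGE="1")
    subprocess.run(
        [
            "git", "-c", "filter.lfs.smudge=", "-c", "filter.lfs.required=false",
            "clone", "--depth", "1",
            "https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git", str(REPO_DIR),
        ],
        check=True,
        env=env,
    )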
def _download_weights_if_needed() -> None:
    """Snapshot only the needed files from the HF model hub."""
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        resume_download=True,
        allow_patterns=[
            "hunyuanvideo_foley.pth",
            "synchformer_state_dict.pth",
            "vae_128d_48k.pth",
            "assets/*",
            "config.yaml",
        ],
    )
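

# Sanity-check sketch (illustrative; not called by the app): verify that the
# checkpoint files requested via allow_patterns above actually landed in WEIGHTS_DIR
# before attempting to load the model. The helper name is hypothetical.
def _weights_present() -> bool:
    expected = ["hunyuanvideo_foley.pth", "synchformer_state_dict.pth", "vae_128d_48k.pth"]
    missing = [f for f in expected if not (WEIGHTS_DIR / f).exists()]
    if missing:
        logger.warning(f"Missing weight files: {missing}")
    return not missing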
def prepare_once() -> None:
    _ensure_repo()
    _download_weights_if_needed()


# -----------------------
# Model load & inference
# -----------------------
def auto_load_models() -> str:
    """
    Load HunyuanVideo-Foley + encoders on the chosen device.
    Ensures safetensors is preferred to avoid ZeroGPU issues with .bin checkpoints.
    """
    global _model_dict, _cfg, _device
    if _model_dict is not None and _cfg is not None:
        return "Model already loaded."

    # Ensure Transformers prefers safetensors for everything:
    os.environ["HF_PREFER_SAFETENSORS"] = "1"

    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.model_utils import load_model

    _device = _setup_device("auto", 0)
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    try:
        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
        return "✅ Model loaded."
    except OSError as e:
        # If any OSError (often from trying to read pytorch_model.bin), retry after enforcing safetensors.
        logger.error(str(e))
        logger.info("Retrying load after enforcing safetensors preference...")
        os.environ["HF_PREFER_SAFETENSORS"] = "1"
        try:
            _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
            return "✅ Model loaded (after safetensors retry)."
        except Exception as e2:
            logger.error(str(e2))
            return f"❌ Failed to load model: {e2}"
    except Exception as e:
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"
def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Use the project's helper (preferred) with a fallback to ffmpeg via subprocess."""
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video
        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            out_path
        ]
        subprocess.run(cmd, check=True)
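

# Visible-watermark sketch: the About tab mentions an optional ffmpeg overlay step.
# This is one way it could look (illustrative; not wired into the pipeline, and the
# function name is hypothetical). It burns a small text label into the bottom-right
# corner and assumes an ffmpeg build with the drawtext filter (libfreetype).
def _overlay_watermark(video_in: str, video_out: str, text: str = "bilsimaging.com") -> None:
    import subprocess
    vf = f"drawtext=text='{text}':x=w-tw-12:y=h-th-12:fontsize=18:fontcolor=white@0.7"
    subprocess.run(
        ["ffmpeg", "-y", "-i", video_in, "-vf", vf, "-c:a", "copy", video_out],
        check=True,
    )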
def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
                  prompt: str) -> str:
    """Save WAV + MP4 in outputs/, add metadata and a small watermark note (metadata only)."""
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)

    tmpdir = Path(tempfile.mkdtemp())
    wav_path = tmpdir / f"gen_{idx}.wav"
    torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)

    ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
    base = f"shortifoley_{ts}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"
    _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Save JSON sidecar
    meta = {
        "id": base,
        "created_utc": datetime.datetime.utcnow().isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)"
    }
    (OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2))
    return str(out_mp4)
def _list_gallery(limit: int = 100) -> List[str]:
    vids = []
    for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True):
        vids.append(str(p))
        if len(vids) >= limit:
            break
    return vids
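

# Metadata sketch (illustrative; not used by the UI): every MP4 listed above has a
# matching JSON sidecar written by _save_outputs(), so gallery entries can be paired
# with their prompt and timestamp. The helper name is hypothetical.
def _load_sidecar(video_path: str) -> Dict:
    sidecar = Path(video_path).with_suffix(".json")
    if sidecar.exists():
        return json.loads(sidecar.read_text())
    return {}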
# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)  # ZeroGPU: request a GPU slot for this call (duration set above)
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1–6 variants).
    Returns: (list of output video paths, status message)
    """
    if _model_dict is None or _cfg is None:
        return [], "❌ Load the model first (open the app once)."
    if not video_file:
        return [], "❌ Please provide a video."

    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    # preprocess
    visual_feats, text_feats, audio_len_s = feature_process(
        video_file, (text_prompt or "").strip(), _model_dict, _cfg
    )

    # generate batch
    n = int(max(1, min(6, sample_nums)))
    audio, sr = denoise_process(
        visual_feats,
        text_feats,
        audio_len_s,
        _model_dict,
        _cfg,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(num_inference_steps),
        batch_size=n,
    )

    # save results
    outs = []
    for i in range(n):
        outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or ""))
    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
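

# Example of a direct call outside the UI: a minimal sketch assuming the model has
# already been loaded via auto_load_models() and that "clip.mp4" exists locally.
#
#   paths, status = infer_single_video(
#       "clip.mp4",
#       "Soft leather footfalls on wet pavement, distant traffic hiss",
#       guidance_scale=4.5,
#       num_inference_steps=50,
#       sample_nums=2,
#   )
#   print(status, paths)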
# -------------
# Gradio UI (with MCP + API inside the same app)
# -------------
def _about_html() -> str:
    return f"""
<div style="line-height:1.6">
  <h2>About ShortiFoley</h2>
  <p><b>ShortiFoley</b> automatically generates realistic Foley soundtracks for short videos using
  Tencent's HunyuanVideo-Foley with CLAP & SigLIP2 encoders. It includes autosave and an MCP server so
  you can call it from agents or workflows (e.g., n8n).</p>
  <p><b>Created by <a href="https://bilsimaging.com" target="_blank">bilsimaging.com</a></b></p>

  <h3>How to use</h3>
  <ol>
    <li>Upload a video (ideally under 120 seconds).</li>
    <li>Optionally enter a text description of the sound (English).</li>
    <li>Adjust CFG scale, steps, and number of variants.</li>
    <li>Click <b>Generate</b>. Results appear on the right and are stored in the Gallery.</li>
  </ol>

  <h3>Tips</h3>
  <ul>
    <li>Trim clips to the key action (5–30 s) for faster, crisper results.</li>
    <li>Include material cues ("wood", "metal", "concrete"), action cues ("splash", "glass shatter"), and ambience ("roomy", "echoey").</li>
    <li>Generate multiple variants and pick the most natural.</li>
  </ul>

  <h3>MCP / Automation</h3>
  <p>This app runs as an <b>MCP server</b>. Open the footer "View API → MCP" to copy a ready config. You can also use the REST endpoints listed there. Perfect for n8n integrations.</p>

  <h3>Watermark</h3>
  <p>Each output's metadata includes: <i>{WATERMARK_NOTE}</i>. If you want a <b>visible video overlay</b>, I can add an ffmpeg overlay step on request.</p>
</div>
"""
def create_ui() -> gr.Blocks:
    with gr.Blocks(
        title="ShortiFoley – HunyuanVideo-Foley",
        css="""
        .main-header{ text-align:center; padding:1.2rem; border-radius:16px; background:linear-gradient(135deg,#667eea,#764ba2); color:white; }
        .card{ background:white; border:1px solid #e1e5e9; border-radius:16px; padding:1rem; box-shadow:0 8px 32px rgba(0,0,0,.06); }
        .generate-btn button{ font-weight:700; }
        """
    ) as demo:
        gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")

        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 📹 Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile, distant chatter.",
                            lines=3
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG Scale")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
                        generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])

                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 🎥 Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        v2 = gr.Video(label="Sample 2", height=160, visible=False)
                        v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        v4 = gr.Video(label="Sample 4", height=160, visible=False)
                        v5 = gr.Video(label="Sample 5", height=160, visible=False)
                        v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        status = gr.Textbox(label="Status", interactive=False)
                # Generate handler
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    vis_updates = []
                    for i in range(6):
                        if i < len(outs):
                            vis_updates.append(gr.update(visible=True, value=outs[i]))
                        else:
                            vis_updates.append(gr.update(visible=False, value=None))
                    return (*vis_updates, msg)

                generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[v1, v2, v3, v4, v5, v6, status],
                    api_name="/infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files."
                )

                # Toggle visibility when # of samples changes
                def _toggle_vis(n):
                    n = int(n)
                    return [
                        gr.update(visible=True),
                        gr.update(visible=n >= 2),
                        gr.update(visible=n >= 3),
                        gr.update(visible=n >= 4),
                        gr.update(visible=n >= 5),
                        gr.update(visible=n >= 6),
                    ]

                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])

            with gr.Tab("🖼️ Gallery"):
                gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results"
                )
                refresh = gr.Button("🔄 Refresh Gallery")
                refresh.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])

            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Keep gallery in sync after generate
        generate.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])
    # -------------------------
    # MCP + REST API endpoints
    # -------------------------
    def _download_to_tmp(url: str) -> str:
        try:
            import requests
        except Exception:
            raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.")
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        tmp.write(r.content)
        tmp.flush()
        tmp.close()
        return tmp.name

    def _maybe_from_base64(data_url_or_b64: str) -> str:
        b64 = data_url_or_b64
        if data_url_or_b64.startswith("data:"):
            b64 = data_url_or_b64.split(",", 1)[-1]
        raw = base64.b64decode(b64)
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        tmp.write(raw)
        tmp.flush()
        tmp.close()
        return tmp.name

    def _normalize_video_input(video_url_or_b64: str) -> str:
        v = (video_url_or_b64 or "").strip()
        if v.startswith("http://") or v.startswith("https://"):
            return _download_to_tmp(v)
        return _maybe_from_base64(v)

    def api_generate_from_url(
        video_url_or_b64: str,
        text_prompt: str = "",
        guidance_scale: float = 4.5,
        num_inference_steps: int = 50,
        sample_nums: int = 1,
    ) -> Dict[str, List[str]]:
        if _model_dict is None or _cfg is None:
            raise RuntimeError("Model not loaded. Open the UI once or call the /load_model tool.")
        local = _normalize_video_input(video_url_or_b64)
        outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
        return {"videos": outs, "message": msg}

    def load_model_tool() -> str:
        """Ensure the model is loaded on the server (MCP convenience)."""
        return auto_load_models()

    def shortifoley_status() -> str:
        """Return a simple readiness string for MCP clients."""
        ready = _model_dict is not None and _cfg is not None
        dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
        return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

    def foley_prompt(name: str = "default") -> str:
        """Reusable guidance for describing sound ambience."""
        return (
            "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
            "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
        )

    return demo
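

# Calling the app programmatically: a minimal sketch using gradio_client against the
# "/infer" endpoint registered above. The Space id is a placeholder; the named
# endpoint and its argument order are also listed on the app's "Use via API" page.
#
#   from gradio_client import Client, handle_file
#
#   client = Client("<your-username>/<your-space>")   # placeholder Space id
#   result = client.predict(
#       handle_file("clip.mp4"),          # video_input
#       "glass shatter on concrete",      # text_input
#       4.5,                              # CFG scale
#       50,                               # steps
#       1,                                # variants
#       api_name="/infer",
#   )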
def set_seeds(s: int = 1):
    random.seed(s)
    np.random.seed(s)
    torch.manual_seed(s)


# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")

    set_seeds(1)
    logger.info("===== Application Startup =====\n")

    prepare_once()

    # Ensure import paths after repo is present
    sys.path.append(str(REPO_DIR))
    try:
        # Probe key modules early (better error surfacing)
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")

    msg = auto_load_models()
    if not msg.startswith("✅"):
        logger.error(f"[BOOT][ERROR] auto_load_models() failed:\n{msg}")
    else:
        logger.info(msg)

    ui = create_ui()
    # Enable MCP server so tools/resources/prompts are discoverable
    ui.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,  # MCP on
    )
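
# MCP client sketch: per Gradio's MCP server docs, the app launched above exposes an
# SSE endpoint under /gradio_api/mcp/sse. A client config might look like the JSON
# below (the URL is a placeholder; the app footer shows the exact, copy-ready config
# for this deployment):
#
#   {
#     "mcpServers": {
#       "shortifoley": {
#         "url": "https://<your-space>.hf.space/gradio_api/mcp/sse"
#       }
#     }
#   }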