# Created by bilsimaging.com

import os
os.environ.setdefault("HF_PREFER_SAFETENSORS", "1")

import sys
import json
import uuid
import time
import shutil
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces
# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs" / "autosaved")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

SPACE_TITLE = "🎵 Shorti Foley Sound — HunyuanVideo-Foley"
SPACE_TAGLINE = "Bring your videos to life with AI-powered Foley Sound"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# ZeroGPU time limit (seconds) for a single @spaces.GPU call
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "120"))

# Globals
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None
# -------------
# Small helpers
# -------------
def _setup_device(pref: str = "cpu", gpu_id: int = 0) -> torch.device:
    """
    Pick device safely.

    IMPORTANT: Do NOT query torch.cuda.is_available() in main/non-GPU processes
    on Stateless GPU Spaces. Only set CUDA when called from a @spaces.GPU context.
    """
    if pref.startswith("cuda"):
        d = torch.device(f"cuda:{gpu_id}")
    elif pref == "mps":
        d = torch.device("mps")
    else:
        d = torch.device("cpu")
    logger.info(f"Using {d}")
    return d
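
# ZeroGPU usage pattern (sketch): resolve CUDA only inside a GPU-decorated
# function, never at import time. The decorator and duration mirror this file's
# own constants; the function body is illustrative only.
#
#   @spaces.GPU(duration=GPU_DURATION)
#   def _gpu_task(...):
#       dev = _setup_device("cuda")  # safe here; the GPU worker owns CUDA
#       ...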

def _ensure_repo() -> None:
    """Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout)."""
    if REPO_DIR.exists():
        return
    cmd = (
        "GIT_LFS_SKIP_SMUDGE=1 "
        "git -c filter.lfs.smudge= -c filter.lfs.required=false "
        f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
    )
    logger.info(f">> {cmd}")
    os.system(cmd)

def _download_weights_if_needed() -> None:
    """Snapshot only needed files from HF weights/model hub."""
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        resume_download=True,
        allow_patterns=[
            "hunyuanvideo_foley.pth",
            "synchformer_state_dict.pth",
            "vae_128d_48k.pth",
            "assets/*",
            "config.yaml",
        ],
    )
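
# Expected layout under WEIGHTS_DIR after the snapshot (derived from the
# allow_patterns above):
#   hunyuanvideo_foley.pth
#   synchformer_state_dict.pth
#   vae_128d_48k.pth
#   assets/...
#   config.yaml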

def prepare_once() -> None:
    _ensure_repo()
    _download_weights_if_needed()

# -----------------------
# Model load & inference
# -----------------------
def auto_load_models(device_str: str = "cpu") -> str:
    """
    Load HunyuanVideo-Foley + encoders on the chosen device.

    Use device_str="cuda" ONLY inside a @spaces.GPU function to avoid CUDA
    init in the main process.
    """
    global _model_dict, _cfg, _device
    if _model_dict is not None and _cfg is not None:
        return "✅ Model already loaded."

    # Make absolutely sure safetensors is preferred
    os.environ["HF_PREFER_SAFETENSORS"] = "1"
    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.model_utils import load_model

    _device = _setup_device(device_str, 0)
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    try:
        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
        return "✅ Model loaded."
    except OSError as e:
        logger.error(str(e))
        logger.info("Retrying after enforcing safetensors preference...")
        os.environ["HF_PREFER_SAFETENSORS"] = "1"
        try:
            _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
            return "✅ Model loaded (after safetensors retry)."
        except Exception as e2:
            logger.error(str(e2))
            return f"❌ Failed to load model: {e2}"
    except Exception as e:
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"

def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Prefer the project's merge util; fall back to ffmpeg."""
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video
        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",   # keep the video stream untouched
            "-c:a", "aac",    # encode the generated audio
            "-shortest",      # stop at the shorter of the two streams
            out_path,
        ]
        subprocess.run(cmd, check=True)

def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
                  prompt: str) -> str:
    """Save WAV + MP4 in autosaved/, add metadata with a soft watermark note."""
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)

    tmpdir = Path(tempfile.mkdtemp())
    wav_path = tmpdir / f"gen_{idx}.wav"
    torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)

    ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
    base = f"shortifoley_{ts}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"
    _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Sidecar JSON
    meta = {
        "id": base,
        "created_utc": datetime.datetime.utcnow().isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark_note": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)",
    }
    (OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2))
    return str(out_mp4)
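
# Illustrative sidecar written next to each MP4 (values are examples only):
#   {
#     "id": "shortifoley_20250101_120000_000000_1",
#     "created_utc": "2025-01-01T12:00:00.000000Z",
#     "source_video": "clip.mp4",
#     "output_video": "shortifoley_20250101_120000_000000_1.mp4",
#     "prompt": "glass shatter; roomy",
#     "watermark_note": "Made with ❤️ by bilsimaging.com",
#     "tool": "ShortiFoley (HunyuanVideo-Foley)"
#   }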

def _list_gallery(limit: int = 100) -> List[str]:
    vids: List[str] = []
    for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True):
        vids.append(str(p))
        if len(vids) >= limit:
            break
    return vids

# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)  # ZeroGPU: request a GPU for up to GPU_DURATION seconds
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1–6 variants).

    Returns: (list of output video paths, status message)
    """
    # Lazy-load on GPU (safe: we are inside a @spaces.GPU context here)
    if _model_dict is None or _cfg is None:
        msg = auto_load_models(device_str="cuda")
        if not str(msg).startswith("✅"):
            return [], f"❌ {msg}"
    if not video_file:
        return [], "❌ Please provide a video."
    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    # preprocess
    visual_feats, text_feats, audio_len_s = feature_process(
        video_file, (text_prompt or "").strip(), _model_dict, _cfg
    )

    # generate batch
    n = int(max(1, min(6, sample_nums)))
    audio, sr = denoise_process(
        visual_feats,
        text_feats,
        audio_len_s,
        _model_dict,
        _cfg,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(num_inference_steps),
        batch_size=n,
    )

    # save results
    outs = []
    for i in range(n):
        outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or ""))
    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
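
# Example call (sketch; file name, prompt, and counts are illustrative):
#   outs, msg = infer_single_video("clip.mp4", "glass shatter; roomy", 4.5, 50, 2)
#   -> (["outputs/autosaved/shortifoley_..._1.mp4", ".../shortifoley_..._2.mp4"], "✅ ...")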

# -------------
# Gradio UI
# -------------
def _about_html() -> str:
    return f"""
    <div style="line-height:1.6">
      <h2>About ShortiFoley</h2>
      <p><b>ShortiFoley</b> turns short videos into realistic Foley sound.<br/>
      Powered by Tencent’s HunyuanVideo-Foley (SigLIP2 + CLAP), with autosave and an MCP server for automation
      (<a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a> flows).</p>
      <p><b>Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a></b></p>

      <h3>Quick Steps</h3>
      <ol>
        <li>Upload a clip (ideally &lt; 120s).</li>
        <li>Optionally describe the sound (English).</li>
        <li>Pick variants (1–6), adjust CFG and steps.</li>
        <li>Hit <b>Generate</b>. Results show on the right and save into the Gallery.</li>
      </ol>

      <h3>Tips for Best Quality</h3>
      <ul>
        <li>Use tight clips (5–30s) around the action.</li>
        <li>Include material &amp; action cues: “metal clang”, “glass shatter”, “rubber on wet tile”.</li>
        <li>Describe ambience: “roomy”, “echoey”, “distant crowd”.</li>
        <li>Generate 2–4 variants and pick the most natural.</li>
      </ul>

      <h3>MCP &amp; API</h3>
      <p>This Space exposes an <b>MCP server</b> and simple REST endpoints (see the “API &amp; MCP” tab).
      Perfect for media-automation pipelines and tools like <b><a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a></b>.</p>
    </div>
    """

def create_ui() -> gr.Blocks:
    css = """
    .main-header{ text-align:center; padding:1.2rem; border-radius:18px; background:linear-gradient(135deg,#6366f1,#8b5cf6); color:white; box-shadow:0 12px 40px rgba(99,102,241,.35); margin-bottom:16px;}
    .main-header h1{ margin:0; font-size:2.0rem; font-weight:800;}
    .main-header p{ margin:.25rem 0 0; opacity:.95; font-weight:500;}
    .card{ background:white; border:1px solid #e7e9ef; border-radius:16px; padding:14px; box-shadow:0 10px 28px rgba(0,0,0,.06);}
    .generate-btn button{ font-weight:800; border-radius:12px; padding:10px 18px;}
    .minor-btn button{ border-radius:10px;}
    .muted{ color:#64748b; }
    .footer-text{ color:#64748b; text-align:center; padding:12px 0; font-size:.95rem; }
    """
    with gr.Blocks(title="ShortiFoley — HunyuanVideo-Foley", css=css) as demo:
        gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")

        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    # LEFT: input
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 📹 Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile; distant chatter; occasional splashes.",
                            lines=3,
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
                        with gr.Row():
                            load_btn = gr.Button("⚙️ Load model (CPU)", variant="secondary", elem_classes=["minor-btn"])
                            generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])
                        status = gr.Textbox(label="Status", interactive=False)

                    # RIGHT: results
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 🎥 Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        with gr.Row():
                            v2 = gr.Video(label="Sample 2", height=160, visible=False)
                            v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        with gr.Row():
                            v4 = gr.Video(label="Sample 4", height=160, visible=False)
                            v5 = gr.Video(label="Sample 5", height=160, visible=False)
                            v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        gr.Markdown("<span class='muted'>Autosaved to the Gallery tab.</span>")
                # Generate handler: returns six video slots plus a status string,
                # matching the seven outputs wired below. The Gallery tab has its
                # own Refresh button, so no gallery output is needed here.
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    vis = []
                    for i in range(6):
                        if outs and i < len(outs):
                            vis.append(gr.update(visible=True, value=outs[i]))
                        else:
                            # keep slot 1 visible (empty); hide unused slots
                            vis.append(gr.update(visible=(i == 0), value=None))
                    return (*vis, msg)

                generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[v1, v2, v3, v4, v5, v6, status],
                    api_name="/infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files.",
                )

                load_btn.click(
                    fn=lambda: auto_load_models(device_str="cpu"),
                    inputs=[],
                    outputs=[status],
                    api_name="/load_model",
                    api_description="Load/initialize the ShortiFoley model and encoders on CPU (GPU loads during inference).",
                )

                # Toggle visibility based on variants
                def _toggle_vis(n):
                    n = int(n)
                    return [
                        gr.update(visible=True),
                        gr.update(visible=n >= 2),
                        gr.update(visible=n >= 3),
                        gr.update(visible=n >= 4),
                        gr.update(visible=n >= 5),
                        gr.update(visible=n >= 6),
                    ]

                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])
| with gr.Tab("📁 Gallery"): | |
| gr.Markdown("Latest generated videos (autosaved to `outputs/autosaved/`).") | |
| gallery = gr.Gallery( | |
| value=_list_gallery(), | |
| columns=3, | |
| preview=True, | |
| label="Saved Results" | |
| ) | |
| refresh = gr.Button("🔄 Refresh Gallery") | |
| refresh.click(lambda: _list_gallery(), outputs=[gallery]) | |
| with gr.Tab("API & MCP"): | |
| gr.Markdown(""" | |
| ### REST examples | |
| **POST** `/api_generate_from_url` | |
| ```json | |
| { | |
| "video_url_or_b64": "https://yourhost/sample.mp4", | |
| "text_prompt": "metallic clink; hollow room reverb", | |
| "guidance_scale": 4.5, | |
| "num_inference_steps": 50, | |
| "sample_nums": 2 | |
| } | |
| ``` | |
| **POST** `/load_model_tool` | |
| Loads the model proactively (useful before batch runs). | |
| **MCP resources & prompt** | |
| - `shortifoley://status` → quick health info | |
| - `foley_prompt` → reusable guidance for describing the sound | |
| Works great with media-automation in tools like **n8n**: call `load_model_tool` once, then `api_generate_from_url` for each clip. | |
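
### Client example

A minimal `gradio_client` sketch (the Space id below is a placeholder; argument order follows the `/infer` signature above):

```python
from gradio_client import Client, handle_file

client = Client("your-username/your-space")   # placeholder Space id
client.predict(api_name="/load_model")        # optional warm-up
result = client.predict(
    handle_file("clip.mp4"),                  # local video file
    "metallic clink; hollow room reverb",     # text prompt
    4.5, 50, 2,                               # CFG, steps, variants
    api_name="/infer",
)                                             # tuple: six video slots + status
```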
| """) | |
| with gr.Tab("ℹ️ About"): | |
| gr.HTML(_about_html()) | |
| # Footer | |
| gr.HTML( | |
| """ | |
| <div class="footer-text"> | |
| 🚀 Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a> | |
| · Powered by HunyuanVideo-Foley | |
| </div> | |
| """ | |
| ) | |

        # ---- REST + MCP endpoints ----
        def _download_to_tmp(url: str) -> str:
            try:
                import requests
            except Exception:
                raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.")
            r = requests.get(url, timeout=30)
            r.raise_for_status()
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tmp.write(r.content)
            tmp.flush()
            tmp.close()
            return tmp.name

        def _maybe_from_base64(data_url_or_b64: str) -> str:
            b64 = data_url_or_b64
            if data_url_or_b64.startswith("data:"):
                b64 = data_url_or_b64.split(",", 1)[-1]
            raw = base64.b64decode(b64)
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tmp.write(raw)
            tmp.flush()
            tmp.close()
            return tmp.name

        def _normalize_video_input(video_url_or_b64: str) -> str:
            v = (video_url_or_b64 or "").strip()
            if v.startswith("http://") or v.startswith("https://"):
                return _download_to_tmp(v)
            return _maybe_from_base64(v)
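
        # Accepted input forms for _normalize_video_input (illustrative):
        #   "https://host/clip.mp4"          -> downloaded to a temp .mp4
        #   "data:video/mp4;base64,AAAA..."  -> data-URL payload decoded
        #   "AAAA..." (bare base64)          -> decoded as-is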

        def api_generate_from_url(
            video_url_or_b64: str,
            text_prompt: str = "",
            guidance_scale: float = 4.5,
            num_inference_steps: int = 50,
            sample_nums: int = 1,
        ) -> Dict[str, List[str]]:
            if _model_dict is None or _cfg is None:
                msg = auto_load_models(device_str="cpu")
                if not str(msg).startswith("✅"):
                    raise RuntimeError(msg)
            local = _normalize_video_input(video_url_or_b64)
            outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
            return {"videos": outs, "message": msg}

        def load_model_tool() -> str:
            """Ensure model is loaded on server (convenient for MCP/REST)."""
            return auto_load_models(device_str="cpu")

        def shortifoley_status() -> str:
            """Return a simple readiness string for MCP clients."""
            ready = _model_dict is not None and _cfg is not None
            dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
            return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

        def foley_prompt(name: str = "default") -> str:
            """Reusable guidance for describing sound ambience."""
            return (
                "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
                "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
            )
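
        # The endpoint names referenced in the "API & MCP" tab are not bound to
        # any event above. A minimal wiring sketch, ASSUMING Gradio >= 5.8's
        # gr.api helper (exposes a plain function as a REST endpoint / MCP tool;
        # verify the helper and its signature against your installed version):
        gr.api(api_generate_from_url, api_name="api_generate_from_url")
        gr.api(load_model_tool, api_name="load_model_tool")
        gr.api(shortifoley_status, api_name="shortifoley_status")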

    return demo

def set_seeds(s: int = 1):
    random.seed(s)
    np.random.seed(s)
    torch.manual_seed(s)

# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")
    set_seeds(1)

    logger.info("===== Application Startup =====\n")
    prepare_once()

    # Probe imports
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")

    ui = create_ui()
    # Enable MCP server
    ui.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,  # MCP
    )