import spaces
import os
import uuid
import shutil
import subprocess
import torch
import logging
import tempfile
import numpy as np
import gradio as gr
from datetime import datetime
from diffusers import WanImageToVideoPipeline
from diffusers.utils import export_to_video
from huggingface_hub import upload_file
from PIL import Image
# ----------------- Setup -----------------
logging.basicConfig(level=logging.INFO)

HF_MODEL = "rahul7star/rahulAI"
dtype = torch.bfloat16
device = "cuda"

model_id = "FastDM/Wan2.2-I2V-A14B-Merge-Lightning-V1.0-Diffusers"
pipe = WanImageToVideoPipeline.from_pretrained(model_id, torch_dtype=dtype)
pipe.to(device)
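# Optional (assumption, not in the original): on GPUs with less VRAM, model CPU
# offload trades speed for memory headroom. enable_model_cpu_offload() is a
# standard diffusers API; uncomment instead of pipe.to(device) if you hit OOM.
# pipe.enable_model_cpu_offload()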
# Standard Chinese-language Wan negative prompt, kept verbatim since the model was
# tuned on it. English gloss: "vivid colors, overexposed, static, blurry details,
# subtitles, style, artwork, painting, frame, motionless, overall gray, worst
# quality, low quality, JPEG artifacts, ugly, mutilated, extra fingers, poorly
# drawn hands, poorly drawn face, deformed, disfigured, malformed limbs, fused
# fingers, still frame, cluttered background, three legs, crowded background,
# walking backwards".
default_negative_prompt = (
    "色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,"
    "JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,"
    "手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走"
)
# ----------------- Upload helper -----------------
def upscale_and_upload_4k(input_video_path: str, input_image, summary_text: str) -> str:
    """
    Upscale the video to 4K with ffmpeg, then upload the video, the input image,
    and a summary text file to the Hugging Face Hub. Returns the Hub folder path.
    """
    logging.info(f"Upscaling video to 4K for upload: {input_video_path}")
    # Upscale to 4K (lanczos scaling, near-lossless x264 at CRF 18)
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_upscaled:
        upscaled_path = tmp_upscaled.name
    cmd = [
        "ffmpeg", "-i", input_video_path,
        "-vf", "scale=3840:2160:flags=lanczos",
        "-c:v", "libx264", "-crf", "18", "-preset", "slow", "-y", upscaled_path,
    ]
    # subprocess.run avoids the shell-quoting pitfalls of os.system on paths with spaces
    subprocess.run(cmd, check=True)
    # Create a dated, unique folder in the target repo
    today_str = datetime.now().strftime("%Y-%m-%d")
    unique_subfolder = f"upload_{uuid.uuid4().hex[:8]}"
    hf_folder = f"{today_str}-WAN-I2V/{unique_subfolder}"

    # Upload video (upload_file takes keyword-only arguments in recent huggingface_hub)
    video_filename = os.path.basename(input_video_path)
    video_hf_path = f"{hf_folder}/{video_filename}"
    upload_file(
        path_or_fileobj=upscaled_path,
        path_in_repo=video_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
    )
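    # Assumption: HUGGINGFACE_HUB_TOKEN is set as a Space secret with write access
    # to HF_MODEL; all three upload_file calls below fail without a valid token.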
    # Upload the input image (accepts either a file path or a PIL image)
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_img:
        if isinstance(input_image, str):
            shutil.copy(input_image, tmp_img.name)
        else:
            input_image.save(tmp_img.name, format="PNG")
        tmp_img_path = tmp_img.name
    image_hf_path = f"{hf_folder}/input_image.png"
    upload_file(
        path_or_fileobj=tmp_img_path,
        path_in_repo=image_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
    )
    # Upload the summary text
    summary_file = tempfile.NamedTemporaryFile(delete=False, suffix=".txt").name
    with open(summary_file, "w", encoding="utf-8") as f:
        f.write(summary_text)
    summary_hf_path = f"{hf_folder}/summary.txt"
    upload_file(
        path_or_fileobj=summary_file,
        path_in_repo=summary_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
    )

    # Cleanup
    os.remove(upscaled_path)
    os.remove(tmp_img_path)
    os.remove(summary_file)
    return hf_folder
# ----------------- Video generation -----------------
def get_duration(input_image, prompt, negative_prompt, duration_seconds,
                 guidance_scale, steps, seed):
    # ZeroGPU duration estimate: roughly 15 GPU-seconds per inference step.
    # The signature must mirror generate_video, since spaces passes the call
    # arguments through to this function.
    return steps * 15
@spaces.GPU(duration=get_duration)
def generate_video(input_image, prompt, negative_prompt=default_negative_prompt,
                   duration_seconds=2, guidance_scale=3.5, steps=40, seed=0):
    if input_image is None:
        return None, "Please upload an image!"

    # Pick the largest ~480x832-area resolution whose sides are divisible by the
    # model's spatial patch granularity, preserving the input aspect ratio
    max_area = 480 * 832
    aspect_ratio = input_image.height / input_image.width
    mod_value = pipe.vae_scale_factor_spatial * pipe.transformer.config.patch_size[1]
    height = round(np.sqrt(max_area * aspect_ratio)) // mod_value * mod_value
    width = round(np.sqrt(max_area / aspect_ratio)) // mod_value * mod_value
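    # Worked example (assuming mod_value == 16, i.e. VAE scale 8 x patch size 2):
    # a 1280x720 input has aspect_ratio 0.5625, giving height 464 and width 832.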
    input_image = input_image.resize((width, height))

    # Wan samples at 16 fps; the pipeline rounds num_frames to 4k + 1 anyway,
    # so round explicitly to avoid the warning
    num_frames = int(duration_seconds * 16) // 4 * 4 + 1

    generator = torch.Generator(device=device).manual_seed(int(seed))
    with torch.inference_mode():
        output_frames_list = pipe(
            image=input_image,
            prompt=prompt,
            negative_prompt=negative_prompt,
            height=height,
            width=width,
            num_frames=num_frames,
            guidance_scale=float(guidance_scale),
            num_inference_steps=int(steps),
            generator=generator,
        ).frames[0]
    # Save the frames to a temp video file
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name
    export_to_video(output_frames_list, video_path, fps=16)

    # Upscale to 4K and upload video, input image, and prompt to the Hub
    # (this was commented out but the return below references hf_folder)
    hf_folder = upscale_and_upload_4k(video_path, input_image, prompt)
    return video_path, f"✅ Uploaded to HF: {hf_folder}"
# ----------------- Gradio UI -----------------
with gr.Blocks() as demo:
    gr.Markdown("# 🖼️➡️🎥 Image to Video with Wan 2.2 I2V (14B Lightning)")
    with gr.Row():
        with gr.Column():
            input_image = gr.Image(type="pil", label="Upload an Image")
            prompt = gr.Textbox(lines=4, label="Prompt")
            negative_prompt = gr.Textbox(value=default_negative_prompt, lines=3, label="Negative Prompt")
            duration = gr.Slider(1, 4, value=2, step=1, label="Duration (seconds)")
            guidance_scale = gr.Slider(0, 10, value=3.5, step=0.5, label="Guidance Scale")
            steps = gr.Slider(10, 50, value=40, step=1, label="Inference Steps")
            seed = gr.Number(value=0, precision=0, label="Seed")
            generate_btn = gr.Button("🚀 Generate Video")
        with gr.Column():
            output_video = gr.Video(label="Generated Video")
            upload_status = gr.Textbox(label="Upload Status", interactive=False)

    generate_btn.click(
        generate_video,
        inputs=[input_image, prompt, negative_prompt, duration, guidance_scale, steps, seed],
        outputs=[output_video, upload_status],
    )

if __name__ == "__main__":
    demo.launch()