Spaces:
Paused
Paused
| import gradio as gr | |
| import os | |
| import subprocess | |
| import cv2 | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips | |
| import math | |
| from huggingface_hub import snapshot_download | |
| model_ids = [ | |
| 'runwayml/stable-diffusion-v1-5', | |
| 'lllyasviel/sd-controlnet-depth', | |
| 'lllyasviel/sd-controlnet-canny', | |
| 'lllyasviel/sd-controlnet-openpose', | |
| ] | |
| for model_id in model_ids: | |
| model_name = model_id.split('/')[-1] | |
| snapshot_download(model_id, local_dir=f'checkpoints/{model_name}') | |
| def get_frame_count_in_duration(filepath): | |
| video = cv2.VideoCapture(filepath) | |
| fps = video.get(cv2.CAP_PROP_FPS) | |
| frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| duration = frame_count / fps | |
| width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| video.release() | |
| return gr.update(maximum=frame_count) | |
| def cut_mp4_into_chunks(input_file, chunk_size): | |
| video = VideoFileClip(input_file) | |
| frame_count = int(video.fps * video.duration) | |
| num_chunks = (frame_count + chunk_size - 1) // chunk_size # Ceiling division | |
| chunks = [] | |
| for i in range(num_chunks): | |
| start_frame = i * chunk_size | |
| end_frame = min((i + 1) * chunk_size, frame_count) | |
| chunk = video.subclip(start_frame / video.fps, end_frame / video.fps) | |
| chunk_frame_count = end_frame - start_frame | |
| chunks.append((chunk, chunk_frame_count)) | |
| return chunks | |
| def run_inference(prompt, video_path, condition, video_length): | |
| chunk_size = 12 | |
| chunks = cut_mp4_into_chunks(video_path, chunk_size) | |
| output_path = 'output/' | |
| os.makedirs(output_path, exist_ok=True) | |
| # Accessing chunks and frame counts by index | |
| for i, (chunk, frame_count) in enumerate(chunks): | |
| chunk.write_videofile(f'chunk_{i}.mp4') # Saving the chunk to a file | |
| chunk_path = f'chunk_{i}.mp4' | |
| print(f"Chunk {i}: Frame Count = {frame_count}") | |
| command = f"python inference.py --prompt '{prompt}' --condition '{condition}' --video_path '{chunk_path}' --output_path '{output_path}' --video_length {frame_count}" | |
| subprocess.run(command, shell=True) | |
| def working_run_inference(prompt, video_path, condition, video_length): | |
| output_path = 'output/' | |
| os.makedirs(output_path, exist_ok=True) | |
| # Construct the final video path | |
| video_path_output = os.path.join(output_path, f"{prompt}.mp4") | |
| # Check if the file already exists | |
| if os.path.exists(video_path_output): | |
| # Delete the existing file | |
| os.remove(video_path_output) | |
| if video_length > 12: | |
| command = f"python inference.py --prompt '{prompt}' --condition '{condition}' --video_path '{video_path}' --output_path '{output_path}' --video_length {video_length} --is_long_video" | |
| else: | |
| command = f"python inference.py --prompt '{prompt}' --condition '{condition}' --video_path '{video_path}' --output_path '{output_path}' --video_length {video_length}" | |
| subprocess.run(command, shell=True) | |
| # Construct the video path | |
| video_path_output = os.path.join(output_path, f"{prompt}.mp4") | |
| return "done", video_path_output | |
| with gr.Blocks() as demo: | |
| with gr.Column(): | |
| prompt = gr.Textbox(label="prompt") | |
| video_path = gr.Video(source="upload", type="filepath") | |
| condition = gr.Textbox(label="Condition", value="depth") | |
| video_length = gr.Slider(label="video length", minimum=1, maximum=15, step=1, value=2) | |
| #seed = gr.Number(label="seed", value=42) | |
| submit_btn = gr.Button("Submit") | |
| video_res = gr.Video(label="result") | |
| status = gr.Textbox(label="result") | |
| video_path.change(fn=get_frame_count_in_duration, | |
| inputs=[video_path], | |
| outputs=[video_length] | |
| ) | |
| submit_btn.click(fn=run_inference, | |
| inputs=[prompt, | |
| video_path, | |
| condition, | |
| video_length | |
| ], | |
| outputs=[status, video_res]) | |
| demo.queue(max_size=12).launch() |