Spaces:
Paused
Paused
| import gradio as gr | |
| import os | |
| import subprocess | |
| import cv2 | |
| import numpy as np | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips | |
| import math | |
| from huggingface_hub import snapshot_download | |
# Checkpoints pulled from the Hugging Face Hub at startup: the Stable
# Diffusion base model plus the three ControlNet conditioning models
# (depth / canny / openpose) used by inference.py.
model_ids = [
    'runwayml/stable-diffusion-v1-5',
    'lllyasviel/sd-controlnet-depth',
    'lllyasviel/sd-controlnet-canny',
    'lllyasviel/sd-controlnet-openpose',
]
for repo_id in model_ids:
    # Local folder name is the repo name after the owner prefix.
    model_name = repo_id.split('/')[-1]
    snapshot_download(repo_id, local_dir=f'checkpoints/{model_name}')
def get_frame_count(filepath):
    """Probe a video file and return a gradio update raising the
    video-length slider's maximum to the clip's total frame count."""
    capture = cv2.VideoCapture(filepath)
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    capture.release()
    return gr.update(maximum=total_frames)
def get_video_dimension(filepath):
    """Return (width, height, fps, frame_count) for the video at *filepath*.

    All four values are truncated to int (note: fps is truncated too,
    matching the original behavior).
    """
    capture = cv2.VideoCapture(filepath)
    props = tuple(
        int(capture.get(prop))
        for prop in (
            cv2.CAP_PROP_FRAME_WIDTH,
            cv2.CAP_PROP_FRAME_HEIGHT,
            cv2.CAP_PROP_FPS,
            cv2.CAP_PROP_FRAME_COUNT,
        )
    )
    capture.release()
    return props
def resize_video(input_vid, output_vid, width, height, fps):
    """Re-encode *input_vid* frame by frame at (width, height) and *fps*.

    Writes an mp4v-encoded file to *output_vid* and returns that path.
    Fix vs. original: removed the dead reads of the source video's
    original width/height, which were fetched but never used.
    """
    print(f"RESIZING ...")
    video = cv2.VideoCapture(input_vid)
    # mp4v codec for the output container.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output_video = cv2.VideoWriter(output_vid, fourcc, fps, (width, height))
    while True:
        ret, frame = video.read()
        if not ret:
            # End of stream (or unreadable frame) — stop copying.
            break
        # Stretch every frame to the target dimensions (no aspect-ratio
        # preservation — matches original behavior).
        output_video.write(cv2.resize(frame, (width, height)))
    video.release()
    output_video.release()
    print(f"RESIZE VIDEO DONE!")
    return output_vid
def normalize_and_save_video(input_video_path, output_video_path):
    """Copy *input_video_path* to *output_video_path* through a
    normalize/denormalize round trip and return the output path.

    Each frame is scaled to [0, 1] in float32 and immediately scaled back
    to uint8, so the output should be pixel-identical to the input.
    Fix vs. original: the original cast straight back with
    ``astype(np.uint8)``, which truncates toward zero — float32 rounding
    in ``x / 255.0 * 255.0`` can yield e.g. 2.9999998 for x == 3, so some
    pixels came back one level darker. ``np.rint`` before the cast makes
    the round trip lossless.
    """
    print(f"NORMALIZING ...")
    cap = cv2.VideoCapture(input_video_path)
    # Source properties, reused for the writer so dimensions/fps match.
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Specify the codec (e.g., 'mp4v', 'XVID', 'MPEG')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    for _ in range(frame_count):
        ret, frame = cap.read()
        if not ret:
            break
        # Normalize to [0, 1] in float, then convert back to 8-bit,
        # rounding (not truncating) to preserve the original values.
        frame = frame.astype(np.float32) / 255.0
        frame = np.rint(frame * 255.0).astype(np.uint8)
        out.write(frame)
    cap.release()
    out.release()
    print(f"NORMALIZE DONE!")
    return output_video_path
def chunkify(video_path, fps, nb_frames):
    """Split *video_path* into consecutive 12-frame mp4 chunks.

    Each chunk is written as ``chunk_<start>-<end>.mp4`` in the working
    directory; the list of chunk filenames is returned in order.
    Fix vs. original: the original seeked (CAP_PROP_POS_FRAMES) before
    every single frame read, which forces a codec re-seek per frame and
    is very slow on inter-frame-coded video. We now seek once per chunk
    and read sequentially; the frames written per chunk are identical.
    """
    chunks_array = []
    video_capture = cv2.VideoCapture(video_path)
    frames_per_chunk = 12
    chunk_start_frame = 0
    while chunk_start_frame < nb_frames:
        chunk_end_frame = min(chunk_start_frame + frames_per_chunk, nb_frames)
        # Single seek to the chunk start, then sequential reads.
        video_capture.set(cv2.CAP_PROP_POS_FRAMES, chunk_start_frame)
        success, frame = video_capture.read()
        if not success:
            break
        chunk_name = f"chunk_{chunk_start_frame}-{chunk_end_frame}.mp4"
        # Frame dimensions taken from the first frame of the chunk.
        chunk_video = cv2.VideoWriter(chunk_name, cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame.shape[1], frame.shape[0]))
        chunk_video.write(frame)  # first frame already read above
        for _ in range(chunk_start_frame + 1, chunk_end_frame):
            success, frame = video_capture.read()
            if not success:
                break
            chunk_video.write(frame)
        chunk_video.release()
        chunks_array.append(chunk_name)
        chunk_start_frame += frames_per_chunk
    video_capture.release()
    print(f"CHUNKS: {chunks_array}")
    return chunks_array
def run_inference_by_chunkify(prompt, video_path, condition, video_length):
    """Chunked inference pipeline (marked non-functional by the author).

    Resizes the input to 512x512, splits it into 12-frame chunks, and
    runs inference.py on the first chunk only. Returns ("done", path to
    the first processed chunk).
    Fix vs. original: the shell command was built by f-string
    interpolation of the user-supplied prompt with ``shell=True`` — a
    shell-injection hole that also broke on any quote character in the
    prompt. The command is now an argv list with ``shell=False``.
    Also probes the input video once instead of twice.
    """
    # DOESN'T WORK
    # Probe the input once for fps and total frame count.
    _, _, target_fps, total_frames = get_video_dimension(video_path)
    print(f"INPUT FPS: {target_fps}")
    # Resize the video
    resized = resize_video(video_path, 'resized.mp4', 512, 512, target_fps)
    # Chunkify the video into 12 frames chunks
    chunks = chunkify(resized, target_fps, total_frames)
    output_path = 'output/'
    os.makedirs(output_path, exist_ok=True)
    processed_chunks = []
    for index, chunk_path in enumerate(chunks):
        if index == 0:
            print(f"Chunk #{index}: {chunk_path}")
            # Remove a stale result for this chunk, if any.
            result_file = os.path.join(output_path, f"{index}.mp4")
            if os.path.exists(result_file):
                os.remove(result_file)
            # Argument list (no shell) — prompt passed verbatim, safely.
            command = [
                "python", "inference.py",
                "--prompt", prompt,
                "--condition", condition,
                "--video_path", chunk_path,
                "--output_path", output_path,
                "--temp_chunk_path", str(index),
                "--width", "512",
                "--height", "512",
                "--fps", "8",
                "--video_length", str(video_length),
                "--is_long_video",
            ]
            subprocess.run(command)
            # Construct the video path
            processed_chunks.append(result_file)
        else:
            print("finished")
    print(f"PROCESSED CHUNKS: {processed_chunks}")
    return "done", processed_chunks[0]
def run_inference(prompt, video_path, condition, video_length):
    """Run ControlVideo inference.py on the uploaded video.

    Resizes the input to 512x512 at its own fps, runs the (no-op)
    pixel-normalization pass, then shells out to inference.py, adding
    ``--is_long_video`` when more than 12 frames are requested.
    Returns ("done", path to output/result.mp4) for the Gradio outputs.
    Fix vs. original: the command was an f-string run with
    ``shell=True``, so a quote or shell metacharacter in the
    user-supplied prompt could inject arbitrary shell commands. The
    command is now an argv list with ``shell=False``. Also probes the
    input video once instead of twice.
    """
    # Probe the input once for fps and total frame count.
    _, _, target_fps, total_frames = get_video_dimension(video_path)
    print(f"INPUT FPS: {target_fps}")
    # Resize the video
    resized = resize_video(video_path, 'resized.mp4', 512, 512, target_fps)
    # normalize pixels
    normalized = normalize_and_save_video(resized, 'normalized.mp4')
    output_path = 'output/'
    os.makedirs(output_path, exist_ok=True)
    # Remove a stale result so a failed run can't show an old video.
    result_file = os.path.join(output_path, "result.mp4")
    if os.path.exists(result_file):
        os.remove(result_file)
    print(f"RUNNING INFERENCE ...")
    # Argument list (no shell) — prompt passed verbatim, safely.
    command = [
        "python", "inference.py",
        "--prompt", prompt,
        "--condition", condition,
        "--video_path", normalized,
        "--output_path", output_path,
        "--temp_chunk_path", "result",
        "--width", "512",
        "--height", "512",
        "--fps", str(target_fps),
        "--video_length", str(video_length),
    ]
    if video_length > 12:
        # Long-video mode processes in overlapping windows inside inference.py.
        command.append("--is_long_video")
    subprocess.run(command)
    print(f"FINISHED !")
    return "done", result_file
# Center the main column and cap its width.
css = """
#col-container {max-width: 810px; margin-left: auto; margin-right: auto;}
"""

# --- Gradio UI: one column with inputs on the left, results on the right ---
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("""
<h1 style="text-align: center;">ControlVideo</h1>
""")
        with gr.Row():
            with gr.Column():
                # Uploaded source clip (filepath handed to run_inference).
                video_path = gr.Video(source="upload", type="filepath", visible=True)
                prompt = gr.Textbox(label="prompt")
                with gr.Row():
                    condition = gr.Dropdown(label="Condition", choices=["depth", "canny", "pose"], value="depth")
                    video_length = gr.Slider(label="Video length", info="How many frames do you want to process ?", minimum=1, maximum=12, step=1, value=2)
                submit_btn = gr.Button("Submit")
            with gr.Column():
                video_res = gr.Video(label="result")
                status = gr.Textbox(label="result")

    # When a new clip is uploaded, raise the slider's maximum to its frame count.
    video_path.change(
        fn=get_frame_count,
        inputs=[video_path],
        outputs=[video_length],
    )
    # Run the pipeline and fill both the status box and the result player.
    submit_btn.click(
        fn=run_inference,
        inputs=[prompt, video_path, condition, video_length],
        outputs=[status, video_res],
    )

demo.queue(max_size=12).launch()