import gradio as gr
import torch
from PIL import Image
from transformers import AutoModel, AutoTokenizer
from decord import VideoReader, cpu
from scipy.spatial import cKDTree
import numpy as np
import math
import spaces

# Model initialization (loaded lazily on the first request)
model = None
tokenizer = None

MAX_NUM_FRAMES = 180   # max temporal positions the model sees
MAX_NUM_PACKING = 3    # max frames packed into each temporal position
TIME_SCALE = 0.1       # seconds per temporal-id step

def load_model():
    global model, tokenizer
    if model is None:
        gr.Info("Loading model... This may take a moment.")
        model = AutoModel.from_pretrained(
            'openbmb/MiniCPM-V-4_5',
            trust_remote_code=True,
            attn_implementation='sdpa',
            torch_dtype=torch.bfloat16
        )
        model = model.eval()
        tokenizer = AutoTokenizer.from_pretrained(
            'openbmb/MiniCPM-V-4_5',
            trust_remote_code=True
        )
        gr.Success("Model loaded successfully!")
    return model, tokenizer
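
# Note: bfloat16 weights with SDPA attention keep memory use modest, and
# trust_remote_code=True is required because MiniCPM-V ships custom modeling
# code with the checkpoint.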

def map_to_nearest_scale(values, scale):
    tree = cKDTree(np.asarray(scale)[:, None])
    _, indices = tree.query(np.asarray(values)[:, None])
    return np.asarray(scale)[indices]
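
# Hypothetical example: with scale = np.arange(0, 2.0, TIME_SCALE),
# map_to_nearest_scale([0.33, 1.48], scale) -> [0.3, 1.5]; dividing by
# TIME_SCALE (0.1) then yields the integer temporal ids [3, 15].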

def group_array(arr, size):
    return [arr[i:i+size] for i in range(0, len(arr), size)]
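
# Hypothetical example: group_array([3, 6, 9, 12, 15], 2)
# -> [[3, 6], [9, 12], [15]] (the final group may be shorter).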

def encode_video(video_path, choose_fps=3, force_packing=None):
    def uniform_sample(l, n):
        gap = len(l) / n
        idxs = [int(i * gap + gap / 2) for i in range(n)]
        return [l[i] for i in idxs]

    vr = VideoReader(video_path, ctx=cpu(0))
    fps = vr.get_avg_fps()
    video_duration = len(vr) / fps

    if choose_fps * int(video_duration) <= MAX_NUM_FRAMES:
        # Short video: every sampled frame gets its own temporal position
        packing_nums = 1
        choose_frames = round(min(choose_fps, round(fps)) * min(MAX_NUM_FRAMES, video_duration))
    else:
        # Long video: pack several frames into each temporal position
        packing_nums = math.ceil(video_duration * choose_fps / MAX_NUM_FRAMES)
        if packing_nums <= MAX_NUM_PACKING:
            choose_frames = round(video_duration * choose_fps)
        else:
            choose_frames = round(MAX_NUM_FRAMES * MAX_NUM_PACKING)
            packing_nums = MAX_NUM_PACKING
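
    # Worked example (hypothetical numbers): a 120 s video at choose_fps=3
    # wants 360 frames > MAX_NUM_FRAMES (180), so packing_nums =
    # ceil(360 / 180) = 2 and choose_frames = 360, i.e. 180 temporal
    # positions holding 2 frames each.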

    frame_idx = list(range(len(vr)))
    frame_idx = np.array(uniform_sample(frame_idx, choose_frames))

    if force_packing:
        packing_nums = min(force_packing, MAX_NUM_PACKING)

    frames = vr.get_batch(frame_idx).asnumpy()

    frame_idx_ts = frame_idx / fps
    scale = np.arange(0, video_duration, TIME_SCALE)
    frame_ts_id = map_to_nearest_scale(frame_idx_ts, scale) / TIME_SCALE
    frame_ts_id = frame_ts_id.astype(np.int32)

    assert len(frames) == len(frame_ts_id)

    frames = [Image.fromarray(v.astype('uint8')).convert('RGB') for v in frames]
    frame_ts_id_group = group_array(frame_ts_id, packing_nums)

    return frames, frame_ts_id_group, video_duration, len(frame_idx), packing_nums
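
# Usage sketch (hypothetical 10 s clip at 30 fps, choose_fps=3):
#   frames, ts_groups, duration, n, packing = encode_video("clip.mp4", 3)
# yields 30 PIL frames, packing == 1, and ts_groups like [[2], [5], [8], ...]
# (one temporal id per frame on the 0.1 s grid).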

# ZeroGPU: the Space runs on Zero, so the handler needs @spaces.GPU to be
# granted a GPU for the duration of the call (this is what `import spaces`
# is for).
@spaces.GPU
def process_video_and_question(video, question, fps, force_packing, history):
    if video is None:
        gr.Warning("Please upload a video first.")
        return history, ""

    if not question:
        gr.Warning("Please enter a question.")
        return history, ""

    try:
        # Load model if not already loaded
        model, tokenizer = load_model()
        model = model.cuda()

        # Encode video (cast to int in case the sliders deliver floats)
        gr.Info(f"Processing video with {int(fps)} FPS...")
        frames, frame_ts_id_group, duration, num_frames, packing_nums = encode_video(
            video,
            int(fps),
            force_packing=int(force_packing) if force_packing > 0 else None
        )

        # Prepare messages
        msgs = [
            {'role': 'user', 'content': frames + [question]},
        ]

        # Get model response
        gr.Info("Generating response...")
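        # temporal_ids passes the grouped 0.1 s-grid timestamps from
        # encode_video; frames grouped together share one temporal position,
        # so several packed frames are compressed into a single set of
        # visual tokens.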
        answer = model.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            use_image_id=False,
            max_slice_nums=1,
            temporal_ids=frame_ts_id_group
        )

        # Update chat history
        history.append({
            "role": "user",
            "content": f"📹 [Video: {duration:.1f}s, {num_frames} frames, packing: {packing_nums}]\n{question}"
        })
        history.append({
            "role": "assistant",
            "content": answer
        })

        return history, ""
    except Exception as e:
        # gr.Error must be raised (not merely called) for Gradio to show it
        raise gr.Error(f"Error processing video: {e}")

def clear_chat():
    return [], None, "", 3, 0
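
# Note: the return order must match the outputs list of clear_btn.click
# below: chatbot, video_input, question_input, fps_slider, force_packing_slider.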

# Create Gradio interface with theme
theme = gr.themes.Soft(
    primary_hue=gr.themes.colors.blue,
    secondary_hue=gr.themes.colors.gray,
    neutral_hue=gr.themes.colors.gray,
    spacing_size="md",
    radius_size="md",
    text_size="md",
    font=[gr.themes.GoogleFont("Inter"), "SF Pro Display", "-apple-system", "BlinkMacSystemFont", "sans-serif"],
    font_mono=[gr.themes.GoogleFont("SF Mono"), "Monaco", "Menlo", "monospace"]
).set(
    body_background_fill="*neutral_50",
    body_background_fill_dark="*neutral_950",
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
    button_primary_text_color="white",
    button_primary_border_color="*primary_500",
    block_background_fill="white",
    block_background_fill_dark="*neutral_900",
    block_border_width="1px",
    block_border_color="*neutral_200",
    block_border_color_dark="*neutral_800",
    block_radius="*radius_lg",
    block_shadow="0px 1px 3px 0px rgba(0, 0, 0, 0.02), 0px 0px 0px 1px rgba(0, 0, 0, 0.05)",
    block_shadow_dark="0px 1px 3px 0px rgba(0, 0, 0, 0.1), 0px 0px 0px 1px rgba(255, 255, 255, 0.05)",
    input_background_fill="*neutral_50",
    input_background_fill_dark="*neutral_900",
    input_border_color="*neutral_300",
    input_border_color_dark="*neutral_700",
    input_border_width="1px",
    input_radius="*radius_md",
    slider_color="*primary_500",
)

with gr.Blocks(theme=theme, title="Video Chat with MiniCPM-V") as demo:
    gr.Markdown(
        """
        # 🎥 Video Chat with MiniCPM-V-4.5

        Upload a video and ask questions about it! The model uses advanced
        3D-resampler compression to process multiple frames efficiently.

        **Note:** First run will download the model (~8GB), which may take a few minutes.
        """
    )

    with gr.Row():
        # Main video area (takes most of the space)
        with gr.Column(scale=3):
            video_input = gr.Video(
                label="Upload Video",
                height=600
            )

        # Sidebar with all controls
        with gr.Column(scale=1):
            chatbot = gr.Chatbot(
                label="Chat",
                height=300,
                type="messages"
            )

            with gr.Row():
                question_input = gr.Textbox(
                    label="Ask about the video",
                    placeholder="e.g., Describe what happens in this video...",
                    lines=2,
                    scale=4
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)

            with gr.Row():
                clear_btn = gr.Button("Clear Chat", variant="secondary", size="sm")
                example_btn1 = gr.Button("Describe", size="sm")
                example_btn2 = gr.Button("Action", size="sm")
                example_btn3 = gr.Button("People", size="sm")

            with gr.Accordion("Advanced Settings", open=False):
                fps_slider = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=3,
                    step=1,
                    label="FPS for frame extraction",
                    info="Higher FPS captures more detail but uses more memory"
                )
                force_packing_slider = gr.Slider(
                    minimum=0,
                    maximum=MAX_NUM_PACKING,
                    value=0,
                    step=1,
                    label="Force Packing",
                    info=f"0 = auto, 1-{MAX_NUM_PACKING} = force specific packing number"
                )

            with gr.Accordion("ℹ️ Video Info", open=False):
                gr.Markdown(
                    """
                    - **Max frames:** 180 temporal positions × 3 packing = 540 frames
                    - **Temporal compression:** each packed group of frames is compressed into 64 visual tokens
                    - **Supported formats:** MP4, AVI, MOV, etc.
                    """
                )

    # Example questions
    example_btn1.click(
        lambda: "Describe this video in detail.",
        outputs=question_input
    )
    example_btn2.click(
        lambda: "What actions or events occur in this video?",
        outputs=question_input
    )
    example_btn3.click(
        lambda: "Are there any people in this video? If so, what are they doing?",
        outputs=question_input
    )

    # Event handlers
    submit_btn.click(
        fn=process_video_and_question,
        inputs=[video_input, question_input, fps_slider, force_packing_slider, chatbot],
        outputs=[chatbot, question_input]
    )
    question_input.submit(
        fn=process_video_and_question,
        inputs=[video_input, question_input, fps_slider, force_packing_slider, chatbot],
        outputs=[chatbot, question_input]
    )
    clear_btn.click(
        fn=clear_chat,
        outputs=[chatbot, video_input, question_input, fps_slider, force_packing_slider]
    )

    # Examples
    gr.Examples(
        examples=[
            ["Describe what happens in this video"],
            ["What is the main subject of this video?"],
            ["Count the number of objects or people in the video"],
            ["What emotions or mood does this video convey?"],
            ["Summarize the key moments in this video"],
        ],
        inputs=question_input,
        label="Example Questions"
    )

if __name__ == "__main__":
    demo.launch()