Spaces:
Sleeping
Sleeping
| import spaces | |
| import subprocess | |
| # Install flash attention, skipping CUDA build if necessary | |
| subprocess.run( | |
| "pip install flash-attn --no-build-isolation", | |
| env={"FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"}, | |
| shell=True, | |
| ) | |
| import time | |
| import logging | |
| import gradio as gr | |
| import cv2 | |
| import os | |
| from transformers import AutoProcessor, AutoModelForImageTextToText | |
| import torch | |
| from PIL import Image | |
| import numpy as np | |
| from pathlib import Path | |
| # Cache for loaded model and processor | |
| default_cache = {'model_id': None, 'processor': None, 'model': None, 'device': None} | |
| model_cache = default_cache.copy() | |
| # Check for XPU availability | |
| has_xpu = hasattr(torch, 'xpu') and torch.xpu.is_available() | |
| def update_model(model_id, device): | |
| if model_cache['model_id'] != model_id or model_cache['device'] != device: | |
| logging.info(f'Loading model {model_id} on {device}') | |
| try: | |
| processor = AutoProcessor.from_pretrained(model_id) | |
| # Load model with appropriate precision for each device | |
| if device == 'cuda': | |
| # Use bfloat16 for CUDA for performance | |
| model = AutoModelForImageTextToText.from_pretrained( | |
| model_id, | |
| torch_dtype=torch.bfloat16, | |
| _attn_implementation='flash_attention_2' | |
| ).to('cuda') | |
| elif device == 'xpu' and has_xpu: | |
| # Use float32 on XPU to avoid bfloat16 layernorm issues | |
| model = AutoModelForImageTextToText.from_pretrained( | |
| model_id, | |
| torch_dtype=torch.float32 | |
| ).to('xpu') | |
| else: | |
| # Default to float32 on CPU | |
| model = AutoModelForImageTextToText.from_pretrained(model_id).to('cpu') | |
| model.eval() | |
| model_cache.update({'model_id': model_id, 'processor': processor, 'model': model, 'device': device}) | |
| except Exception as e: | |
| logging.error(f'Error loading model: {e}') | |
| raise e | |
| def extract_frames_from_video(video_path, max_frames=10): | |
| """Extract frames from video file for processing""" | |
| if not os.path.exists(video_path): | |
| raise FileNotFoundError(f"Video file not found: {video_path}") | |
| # Validate video file | |
| if not video_path.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.webm')): | |
| raise ValueError("Unsupported video format. Please use MP4, AVI, MOV, MKV, or WEBM.") | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise ValueError(f"Cannot open video file: {video_path}") | |
| frames = [] | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| if frame_count == 0: | |
| cap.release() | |
| raise ValueError("Video file appears to be empty or corrupted") | |
| # Calculate step size to extract evenly distributed frames | |
| step = max(1, frame_count // max_frames) | |
| frame_idx = 0 | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if frame_idx % step == 0: | |
| # Calculate timestamp for this frame | |
| timestamp = frame_idx / fps if fps > 0 else frame_idx | |
| frames.append((frame, timestamp)) | |
| if len(frames) >= max_frames: | |
| break | |
| frame_idx += 1 | |
| cap.release() | |
| return frames, fps | |
| def caption_frame(frame, model_id, interval_ms, sys_prompt, usr_prompt, device): | |
| """Caption a single frame (used for webcam streaming)""" | |
| debug_msgs = [] | |
| try: | |
| update_model(model_id, device) | |
| processor = model_cache['processor'] | |
| model = model_cache['model'] | |
| # Control capture interval | |
| time.sleep(interval_ms / 1000) | |
| # Preprocess frame | |
| t0 = time.time() | |
| rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| pil_img = Image.fromarray(rgb) | |
| temp_path = 'frame.jpg' | |
| pil_img.save(temp_path, format='JPEG', quality=50) | |
| debug_msgs.append(f'Preprocess: {int((time.time()-t0)*1000)} ms') | |
| # Prepare multimodal chat messages | |
| messages = [ | |
| {'role': 'system', 'content': [{'type': 'text', 'text': sys_prompt}]}, | |
| {'role': 'user', 'content': [ | |
| {'type': 'image', 'url': temp_path}, | |
| {'type': 'text', 'text': usr_prompt} | |
| ]} | |
| ] | |
| # Tokenize and encode | |
| t1 = time.time() | |
| inputs = processor.apply_chat_template( | |
| messages, | |
| add_generation_prompt=True, | |
| tokenize=True, | |
| return_dict=True, | |
| return_tensors='pt' | |
| ) | |
| # Move inputs to correct device and dtype (matching model parameters) | |
| param_dtype = next(model.parameters()).dtype | |
| cast_inputs = {} | |
| for k, v in inputs.items(): | |
| if isinstance(v, torch.Tensor): | |
| if v.dtype.is_floating_point: | |
| # cast floating-point tensors to model's parameter dtype | |
| cast_inputs[k] = v.to(device=model.device, dtype=param_dtype) | |
| else: | |
| # move integer/mask tensors without changing dtype | |
| cast_inputs[k] = v.to(device=model.device) | |
| else: | |
| cast_inputs[k] = v | |
| inputs = cast_inputs | |
| debug_msgs.append(f'Tokenize: {int((time.time()-t1)*1000)} ms') | |
| # Inference | |
| t2 = time.time() | |
| outputs = model.generate(**inputs, do_sample=False, max_new_tokens=128) | |
| debug_msgs.append(f'Inference: {int((time.time()-t2)*1000)} ms') | |
| # Decode and strip history | |
| t3 = time.time() | |
| raw = processor.batch_decode(outputs, skip_special_tokens=True)[0] | |
| debug_msgs.append(f'Decode: {int((time.time()-t3)*1000)} ms') | |
| if "Assistant:" in raw: | |
| caption = raw.split("Assistant:")[-1].strip() | |
| else: | |
| lines = raw.splitlines() | |
| caption = lines[-1].strip() if len(lines) > 1 else raw.strip() | |
| # Clean up temp file | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return caption, '\n'.join(debug_msgs) | |
| except Exception as e: | |
| return f"Error: {str(e)}", '\n'.join(debug_msgs) | |
| def process_single_frame(frame, model_id, sys_prompt, usr_prompt, device, frame_id=0): | |
| """Process a single frame similar to webcam mode - optimized for reuse""" | |
| debug_msgs = [] | |
| temp_path = None | |
| try: | |
| # Ensure model is loaded | |
| update_model(model_id, device) | |
| processor = model_cache['processor'] | |
| model = model_cache['model'] | |
| # Preprocess frame | |
| t0 = time.time() | |
| rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| pil_img = Image.fromarray(rgb) | |
| temp_path = f'video_frame_{frame_id}.jpg' | |
| pil_img.save(temp_path, format='JPEG', quality=50) | |
| debug_msgs.append(f'Preprocess: {int((time.time()-t0)*1000)} ms') | |
| # Prepare multimodal chat messages | |
| messages = [ | |
| {'role': 'system', 'content': [{'type': 'text', 'text': sys_prompt}]}, | |
| {'role': 'user', 'content': [ | |
| {'type': 'image', 'url': temp_path}, | |
| {'type': 'text', 'text': usr_prompt} | |
| ]} | |
| ] | |
| # Tokenize and encode | |
| t1 = time.time() | |
| inputs = processor.apply_chat_template( | |
| messages, | |
| add_generation_prompt=True, | |
| tokenize=True, | |
| return_dict=True, | |
| return_tensors='pt' | |
| ) | |
| # Move inputs to correct device and dtype (matching model parameters) | |
| param_dtype = next(model.parameters()).dtype | |
| cast_inputs = {} | |
| for k, v in inputs.items(): | |
| if isinstance(v, torch.Tensor): | |
| if v.dtype.is_floating_point: | |
| cast_inputs[k] = v.to(device=model.device, dtype=param_dtype) | |
| else: | |
| cast_inputs[k] = v.to(device=model.device) | |
| else: | |
| cast_inputs[k] = v | |
| inputs = cast_inputs | |
| debug_msgs.append(f'Tokenize: {int((time.time()-t1)*1000)} ms') | |
| # Inference | |
| t2 = time.time() | |
| outputs = model.generate(**inputs, do_sample=False, max_new_tokens=128) | |
| debug_msgs.append(f'Inference: {int((time.time()-t2)*1000)} ms') | |
| # Decode and strip history | |
| t3 = time.time() | |
| raw = processor.batch_decode(outputs, skip_special_tokens=True)[0] | |
| debug_msgs.append(f'Decode: {int((time.time()-t3)*1000)} ms') | |
| if "Assistant:" in raw: | |
| caption = raw.split("Assistant:")[-1].strip() | |
| else: | |
| lines = raw.splitlines() | |
| caption = lines[-1].strip() if len(lines) > 1 else raw.strip() | |
| return caption, debug_msgs, None | |
| except Exception as e: | |
| return f"Error: {str(e)}", debug_msgs, str(e) | |
| finally: | |
| # Clean up temp file | |
| if temp_path and os.path.exists(temp_path): | |
| try: | |
| os.remove(temp_path) | |
| except Exception as cleanup_error: | |
| logging.warning(f"Failed to cleanup {temp_path}: {cleanup_error}") | |
| def process_video_with_interval(video_file, model_id, sys_prompt, usr_prompt, device, max_frames, interval_ms): | |
| """Process video file with interval-based processing similar to webcam mode""" | |
| if video_file is None: | |
| return "No video file uploaded", "" | |
| debug_msgs = [] | |
| all_captions = [] | |
| try: | |
| # Extract frames from video | |
| t0 = time.time() | |
| frames_with_timestamps, fps = extract_frames_from_video(video_file, max_frames) | |
| debug_msgs.append(f'Extracted {len(frames_with_timestamps)} frames in {int((time.time()-t0)*1000)} ms') | |
| debug_msgs.append(f'Video FPS: {fps:.2f}') | |
| if not frames_with_timestamps: | |
| return "No frames could be extracted from the video", '\n'.join(debug_msgs) | |
| # Process each frame with interval delay (similar to webcam mode) | |
| for i, (frame, timestamp) in enumerate(frames_with_timestamps): | |
| # Apply interval delay (similar to webcam mode) | |
| if i > 0: # Don't delay the first frame | |
| time.sleep(interval_ms / 1000) | |
| # Process frame using the same logic as webcam mode | |
| caption, frame_debug_msgs, error = process_single_frame( | |
| frame, model_id, sys_prompt, usr_prompt, device, frame_id=i | |
| ) | |
| # Add timing information | |
| timestamp_str = f"{timestamp:.2f}s" | |
| if error: | |
| all_captions.append(f"Frame {i+1} (t={timestamp_str}): ERROR - {error}") | |
| else: | |
| all_captions.append(f"Frame {i+1} (t={timestamp_str}): {caption}") | |
| # Add frame-specific debug info | |
| debug_msgs.extend([f"Frame {i+1}: {msg}" for msg in frame_debug_msgs]) | |
| return '\n\n'.join(all_captions), '\n'.join(debug_msgs) | |
| except Exception as e: | |
| return f"Error processing video: {str(e)}", '\n'.join(debug_msgs) | |
| def toggle_input_mode(input_mode): | |
| """Toggle between webcam and video file input""" | |
| if input_mode == "Webcam": | |
| return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) | |
| else: # Video File | |
| return gr.update(visible=False), gr.update(visible=True), gr.update(visible=True) | |
| def main(): | |
| logging.basicConfig(level=logging.INFO) | |
| model_choices = [ | |
| 'HuggingFaceTB/SmolVLM2-256M-Video-Instruct', | |
| 'HuggingFaceTB/SmolVLM2-500M-Video-Instruct', | |
| 'HuggingFaceTB/SmolVLM2-2.2B-Instruct' | |
| ] | |
| # Determine available devices | |
| device_options = ['cpu'] | |
| if torch.cuda.is_available(): | |
| device_options.append('cuda') | |
| if has_xpu: | |
| device_options.append('xpu') | |
| default_device = 'cuda' if torch.cuda.is_available() else ('xpu' if has_xpu else 'cpu') | |
| with gr.Blocks() as demo: | |
| gr.Markdown('## 🎥 Real-Time Webcam & Video File Captioning with SmolVLM2 (Transformers)') | |
| with gr.Row(): | |
| input_mode = gr.Radio( | |
| choices=["Webcam", "Video File"], | |
| value="Webcam", | |
| label="Input Mode" | |
| ) | |
| with gr.Row(): | |
| model_dd = gr.Dropdown(model_choices, value=model_choices[0], label='Model ID') | |
| device_dd = gr.Dropdown(device_options, value=default_device, label='Device') | |
| # Webcam-specific controls | |
| with gr.Row() as webcam_controls: | |
| interval = gr.Slider(100, 20000, step=100, value=3000, label='Interval (ms)') | |
| # Video file-specific controls | |
| with gr.Row(visible=False) as video_controls: | |
| interval_video = gr.Slider(100, 10000, step=100, value=1000, label='Processing Interval (ms)') | |
| max_frames = gr.Slider(1, 20, step=1, value=5, label='Max Frames to Process') | |
| sys_p = gr.Textbox(lines=2, value='Describe the key action', label='System Prompt') | |
| usr_p = gr.Textbox(lines=1, value='What is happening in this image?', label='User Prompt') | |
| # Input components | |
| cam = gr.Image(sources=['webcam'], streaming=True, label='Webcam Feed') | |
| video_file = gr.File( | |
| label="Upload Video File", | |
| file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"], | |
| visible=False | |
| ) | |
| # Process button for video files | |
| process_btn = gr.Button("Process Video", visible=False) | |
| # Output components | |
| caption_tb = gr.Textbox(interactive=False, label='Caption') | |
| log_tb = gr.Textbox(lines=4, interactive=False, label='Debug Log') | |
| # Toggle input mode | |
| input_mode.change( | |
| fn=toggle_input_mode, | |
| inputs=[input_mode], | |
| outputs=[cam, video_file, process_btn] | |
| ) | |
| # Also toggle the control panels | |
| input_mode.change( | |
| fn=lambda mode: (gr.update(visible=mode=="Webcam"), gr.update(visible=mode=="Video File")), | |
| inputs=[input_mode], | |
| outputs=[webcam_controls, video_controls] | |
| ) | |
| # Webcam streaming | |
| cam.stream( | |
| fn=caption_frame, | |
| inputs=[cam, model_dd, interval, sys_p, usr_p, device_dd], | |
| outputs=[caption_tb, log_tb], | |
| time_limit=600 | |
| ) | |
| # Video file processing | |
| process_btn.click( | |
| fn=process_video_with_interval, | |
| inputs=[video_file, model_dd, sys_p, usr_p, device_dd, max_frames, interval_video], | |
| outputs=[caption_tb, log_tb] | |
| ) | |
| # Enable Gradio's async event queue | |
| demo.queue() | |
| # Launch the app | |
| demo.launch() | |
| if __name__ == '__main__': | |
| main() |