#!/usr/bin/env python3
"""
FastAPI Wrapper for Audio-Enhanced Video Highlights
Converts your SmolVLM2 + Whisper system into a web API for Android apps
"""

import asyncio
import contextlib
import json
import logging
import os
import sys
import tempfile
import uuid
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# Add src directory to path for imports
sys.path.append(str(Path(__file__).parent / "src"))

try:
    from audio_enhanced_highlights_final import (
        AudioVisualAnalyzer,
        extract_frames_at_intervals,
        save_frame_at_time,
        create_highlights_video,
    )
except ImportError:
    print("❌ Cannot import audio_enhanced_highlights_final.py")
    sys.exit(1)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Working directories (relative to the process working directory)
OUTPUT_DIR = "outputs"
TEMP_DIR = "temp"

# Extensions accepted by /upload-video
ALLOWED_VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv')

# Uploads are copied to disk in chunks of this many bytes so that a large
# video is never held in memory in one piece.
UPLOAD_CHUNK_SIZE = 1024 * 1024

# FastAPI app
app = FastAPI(
    title="SmolVLM2 Video Highlights API",
    description="Generate intelligent video highlights using SmolVLM2 + Whisper",
    version="1.0.0"
)

# Enable CORS for Android apps
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify your Android app's domain
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Request/Response models
class AnalysisRequest(BaseModel):
    """Analysis settings; the defaults mirror the /upload-video parameters."""

    interval: float = 20.0
    min_score: float = 6.5
    max_highlights: int = 3
    whisper_model: str = "base"
    timeout: int = 35


class AnalysisResponse(BaseModel):
    """Body returned by /upload-video once a job has been queued."""

    job_id: str
    status: str
    message: str


class JobStatus(BaseModel):
    """Body returned by /job-status/{job_id}."""

    job_id: str
    status: str  # "processing", "completed", "failed"
    progress: int  # 0-100
    message: str
    highlights_url: Optional[str] = None
    analysis_url: Optional[str] = None


# Global storage for jobs (in production, use Redis/database)
active_jobs = {}
completed_jobs = {}

# Create output directories
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)


def _safe_basename(filename: str) -> str:
    """Return the last path component of *filename*, treating '\\' like '/'."""
    return os.path.basename(filename.replace("\\", "/"))


def _remove_quietly(path: str) -> None:
    """Delete *path* on a best-effort basis; a missing file is not an error."""
    with contextlib.suppress(OSError):
        os.unlink(path)


def _set_progress(job: dict, progress: int, message: str) -> None:
    """Record the progress percentage and status message on a job entry."""
    job["progress"] = progress
    job["message"] = message


@app.get("/")
async def root():
    """Describe the API and list its endpoints."""
    return {
        "message": "SmolVLM2 Video Highlights API",
        "version": "1.0.0",
        "endpoints": {
            "upload": "/upload-video",
            "status": "/job-status/{job_id}",
            "download": "/download/{filename}"
        }
    }


@app.post("/upload-video", response_model=AnalysisResponse)
async def upload_video(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    interval: float = 20.0,
    min_score: float = 6.5,
    max_highlights: int = 3,
    whisper_model: str = "base",
    timeout: int = 35
):
    """
    Upload a video and start processing highlights

    The upload is written to the temp directory, a job entry is registered in
    ``active_jobs`` and the analysis is scheduled as a background task.

    Raises:
        HTTPException: 400 for an unsupported file type or invalid settings,
            500 if the upload cannot be stored.
    """
    # Validate file (the client may omit the filename entirely)
    filename = video.filename or ""
    if not filename.lower().endswith(ALLOWED_VIDEO_EXTENSIONS):
        raise HTTPException(status_code=400, detail="Only video files are supported")

    # Validate settings
    # NOTE(review): a non-positive interval is assumed to be meaningless for
    # extract_frames_at_intervals - confirm against that helper.
    if interval <= 0:
        raise HTTPException(status_code=400, detail="interval must be greater than 0")
    # A negative value would silently drop results through the
    # [:max_highlights] slice, and 0 can never produce a highlight.
    if max_highlights < 1:
        raise HTTPException(status_code=400, detail="max_highlights must be at least 1")

    # Generate unique job ID
    job_id = str(uuid.uuid4())

    # The client controls the filename, so keep only its final component;
    # otherwise a name containing "../" could write outside the temp directory.
    temp_video_path = f"{TEMP_DIR}/{job_id}_{_safe_basename(filename)}"

    try:
        # Save uploaded video, streaming it to disk chunk by chunk
        with open(temp_video_path, "wb") as f:
            while True:
                chunk = await video.read(UPLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                f.write(chunk)

        # Store job info
        active_jobs[job_id] = {
            "status": "processing",
            "progress": 0,
            "message": "Video uploaded, starting analysis...",
            "video_path": temp_video_path,
            "settings": {
                "interval": interval,
                "min_score": min_score,
                "max_highlights": max_highlights,
                "whisper_model": whisper_model,
                "timeout": timeout
            }
        }

        # Start processing in background
        background_tasks.add_task(
            process_video_highlights,
            job_id,
            temp_video_path,
            interval,
            min_score,
            max_highlights,
            whisper_model,
            timeout
        )

        return AnalysisResponse(
            job_id=job_id,
            status="processing",
            message="Video uploaded successfully. Processing started."
        )

    except Exception as e:
        logger.exception("Upload failed: %s", e)
        # Do not leave a half-written upload or an orphaned job entry behind
        active_jobs.pop(job_id, None)
        _remove_quietly(temp_video_path)
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e


@app.get("/job-status/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
    """
    Get the status of a processing job

    Raises:
        HTTPException: 404 if the job id is unknown.
    """
    # Check active jobs. A single .get() is used instead of "in" followed by
    # indexing because the worker thread may remove the entry in between.
    job = active_jobs.get(job_id)
    if job is not None:
        return JobStatus(
            job_id=job_id,
            status=job["status"],
            progress=job["progress"],
            message=job["message"]
        )

    # Check completed jobs. The worker publishes here *before* removing the
    # job from active_jobs, so a known job is always found in one of the two.
    job = completed_jobs.get(job_id)
    if job is not None:
        return JobStatus(
            job_id=job_id,
            status=job["status"],
            progress=100,
            message=job["message"],
            highlights_url=job.get("highlights_url"),
            analysis_url=job.get("analysis_url")
        )

    raise HTTPException(status_code=404, detail="Job not found")


@app.get("/download/{filename}")
async def download_file(filename: str):
    """
    Download generated files

    Only plain file names located directly in the output directory are served.

    Raises:
        HTTPException: 404 if the name is not a plain file name or the file
            does not exist.
    """
    # Refuse anything carrying path components so that a request can never
    # escape the output directory.
    safe_name = _safe_basename(filename)
    if safe_name != filename or safe_name in ("", ".", ".."):
        raise HTTPException(status_code=404, detail="File not found")

    file_path = f"{OUTPUT_DIR}/{safe_name}"
    # isfile (rather than exists) also rejects directories
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(
        file_path,
        media_type='application/octet-stream',
        filename=safe_name
    )


def _run_highlights_job(
    job_id: str,
    video_path: str,
    interval: float,
    min_score: float,
    max_highlights: int,
    whisper_model: str,
    timeout: int
) -> None:
    """Blocking implementation of the highlights pipeline for a single job.

    Progress is reported through the job's entry in ``active_jobs``; the
    final outcome (success or failure) is recorded in ``completed_jobs``.
    The uploaded video is deleted when the job ends, whatever the outcome.
    """
    # Normally registered by upload_video; created here if the pipeline is
    # invoked directly so that progress reporting still works.
    job = active_jobs.setdefault(
        job_id,
        {"status": "processing", "progress": 0, "message": ""}
    )
    temp_frame_path = f"{TEMP_DIR}/{job_id}_frame.jpg"

    try:
        # Update status
        _set_progress(job, 10, "Initializing AI models...")

        # Initialize analyzer
        analyzer = AudioVisualAnalyzer(
            whisper_model_size=whisper_model,
            timeout_seconds=timeout
        )

        _set_progress(job, 20, "Extracting video segments...")

        # Extract segments
        segments = extract_frames_at_intervals(video_path, interval)
        total_segments = len(segments)

        _set_progress(job, 30, f"Analyzing {total_segments} segments...")

        # Analyze segments
        analyzed_segments = []
        for i, segment in enumerate(segments):
            # Update progress
            _set_progress(
                job,
                30 + int((i / total_segments) * 50),  # 30-80%
                f"Analyzing segment {i+1}/{total_segments}"
            )

            try:
                # Save frame for visual analysis
                if save_frame_at_time(video_path, segment['start_time'], temp_frame_path):
                    # Analyze segment
                    analysis = analyzer.analyze_segment(video_path, segment, temp_frame_path)
                    analyzed_segments.append(analysis)
            finally:
                # Cleanup temp frame, even when the analysis raised
                _remove_quietly(temp_frame_path)

        _set_progress(job, 85, "Selecting best highlights...")

        # Select best segments
        analyzed_segments.sort(key=lambda x: x['combined_score'], reverse=True)
        selected_segments = [
            s for s in analyzed_segments if s['combined_score'] >= min_score
        ][:max_highlights]

        if not selected_segments:
            raise RuntimeError(f"No segments met minimum score of {min_score}")

        _set_progress(
            job,
            90,
            f"Creating highlights video with {len(selected_segments)} segments..."
        )

        # Create output filenames
        highlights_filename = f"{job_id}_highlights.mp4"
        analysis_filename = f"{job_id}_analysis.json"
        highlights_path = f"{OUTPUT_DIR}/{highlights_filename}"
        analysis_path = f"{OUTPUT_DIR}/{analysis_filename}"

        # Create highlights video
        success = create_highlights_video(video_path, selected_segments, highlights_path)

        if not success:
            raise RuntimeError("Failed to create highlights video")

        # Save analysis
        analysis_data = {
            'job_id': job_id,
            'input_video': video_path,
            'output_video': highlights_path,
            'settings': {
                'interval': interval,
                'min_score': min_score,
                'max_highlights': max_highlights,
                'whisper_model': whisper_model,
                'timeout': timeout
            },
            'segments': analyzed_segments,
            'selected_segments': selected_segments,
            'summary': {
                'total_segments': len(analyzed_segments),
                'selected_segments': len(selected_segments),
                'processing_time': "Completed successfully"
            }
        }

        with open(analysis_path, 'w', encoding='utf-8') as f:
            json.dump(analysis_data, f, indent=2)

        # Mark as completed
        completed_jobs[job_id] = {
            "status": "completed",
            "message": f"Successfully created highlights with {len(selected_segments)} segments",
            "highlights_url": f"/download/{highlights_filename}",
            "analysis_url": f"/download/{analysis_filename}",
            "summary": analysis_data['summary']
        }

    except Exception as e:
        logger.exception("Processing failed for job %s: %s", job_id, e)

        # Mark as failed
        completed_jobs[job_id] = {
            "status": "failed",
            "message": f"Processing failed: {str(e)}",
            "highlights_url": None,
            "analysis_url": None
        }

    finally:
        # Remove from active jobs only after the outcome has been published,
        # so status polling never observes the job in neither dictionary.
        active_jobs.pop(job_id, None)

        # Cleanup temp video
        _remove_quietly(video_path)


async def process_video_highlights(
    job_id: str,
    video_path: str,
    interval: float,
    min_score: float,
    max_highlights: int,
    whisper_model: str,
    timeout: int
):
    """
    Background task to process video highlights

    The pipeline is entirely blocking work, so it is run in the event loop's
    default executor; running it directly in this coroutine would stall the
    event loop and make /job-status unresponsive until the job finished.
    Failures are recorded in ``completed_jobs`` rather than raised.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        _run_highlights_job,
        job_id,
        video_path,
        interval,
        min_score,
        max_highlights,
        whisper_model,
        timeout
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)