#!/usr/bin/env python3 """ FastAPI Wrapper for HuggingFace Segment-Based Video Highlights Updated with the latest segment-based approach for better accuracy """ import os import tempfile # Set cache directories to writable locations for HuggingFace Spaces # Use /tmp which is guaranteed to be writable in containers CACHE_DIR = os.path.join("/tmp", ".cache", "huggingface") os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(os.path.join("/tmp", ".cache", "torch"), exist_ok=True) os.environ['HF_HOME'] = CACHE_DIR os.environ['TRANSFORMERS_CACHE'] = CACHE_DIR os.environ['HF_DATASETS_CACHE'] = CACHE_DIR os.environ['TORCH_HOME'] = os.path.join("/tmp", ".cache", "torch") os.environ['XDG_CACHE_HOME'] = os.path.join("/tmp", ".cache") os.environ['HUGGINGFACE_HUB_CACHE'] = CACHE_DIR os.environ['TOKENIZERS_PARALLELISM'] = 'false' from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks from fastapi.responses import FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import sys import uuid import json import asyncio from pathlib import Path from typing import Optional import logging # Add src directory to path for imports sys.path.append(str(Path(__file__).parent / "src")) try: from huggingface_exact_approach import VideoHighlightDetector except ImportError: print("❌ Cannot import huggingface_exact_approach.py") sys.exit(1) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # FastAPI app app = FastAPI( title="SmolVLM2 Optimized HuggingFace Video Highlights API", description="Generate intelligent video highlights using SmolVLM2 segment-based approach", version="2.0.0" ) # Enable CORS for web apps app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify your domains allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Request/Response models class AnalysisRequest(BaseModel): segment_length: float = 5.0 model_name: str = "HuggingFaceTB/SmolVLM2-256M-Video-Instruct" with_effects: bool = True class AnalysisResponse(BaseModel): job_id: str status: str message: str class JobStatus(BaseModel): job_id: str status: str # "processing", "completed", "failed" progress: int # 0-100 message: str highlights_url: Optional[str] = None analysis_url: Optional[str] = None total_segments: Optional[int] = None selected_segments: Optional[int] = None compression_ratio: Optional[float] = None # Global storage for jobs (in production, use Redis/database) active_jobs = {} completed_jobs = {} # Create output directories with proper permissions TEMP_DIR = os.path.join("/tmp", "temp") OUTPUTS_DIR = os.path.join("/tmp", "outputs") # Create directories with proper permissions os.makedirs(OUTPUTS_DIR, mode=0o755, exist_ok=True) os.makedirs(TEMP_DIR, mode=0o755, exist_ok=True) @app.get("/") async def read_root(): """Welcome message with API information""" return { "message": "SmolVLM2 Optimized HuggingFace Video Highlights API", "version": "3.0.0", "approach": "Optimized HuggingFace exact approach with STRICT prompting", "model": "SmolVLM2-256M-Video-Instruct (faster processing)", "improvements": [ "STRICT system prompting for selectivity", "Structured YES/NO user prompts", "Temperature 0.3 for consistent decisions", "Enhanced response processing with fallbacks" ], "endpoints": { "upload": "POST /upload-video", "status": "GET /job-status/{job_id}", "download": "GET /download/{filename}", "docs": "GET /docs" } } @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy", "model": "SmolVLM2-256M-Video-Instruct"} async def process_video_background(job_id: str, video_path: str, output_path: str, segment_length: float, model_name: str, with_effects: bool): """Background task to process video""" try: # Update job status active_jobs[job_id]["status"] = "processing" active_jobs[job_id]["progress"] = 10 active_jobs[job_id]["message"] = "Initializing AI model..." # Initialize detector detector = VideoHighlightDetector(model_path=model_name) active_jobs[job_id]["progress"] = 20 active_jobs[job_id]["message"] = "Analyzing video content..." # Process video results = detector.process_video( video_path=video_path, output_path=output_path, segment_length=segment_length, with_effects=with_effects ) if "error" in results: # Failed active_jobs[job_id]["status"] = "failed" active_jobs[job_id]["message"] = results["error"] active_jobs[job_id]["progress"] = 0 else: # Success - move to completed jobs output_filename = os.path.basename(output_path) analysis_filename = output_filename.replace('.mp4', '_analysis.json') analysis_path = os.path.join(OUTPUTS_DIR, analysis_filename) # Save analysis with open(analysis_path, 'w') as f: json.dump(results, f, indent=2) completed_jobs[job_id] = { "job_id": job_id, "status": "completed", "progress": 100, "message": f"Created highlights with {results['selected_segments']} segments", "highlights_url": f"/download/{output_filename}", "analysis_url": f"/download/{analysis_filename}", "total_segments": results["total_segments"], "selected_segments": results["selected_segments"], "compression_ratio": results["compression_ratio"] } # Remove from active jobs if job_id in active_jobs: del active_jobs[job_id] except Exception as e: logger.error(f"Error processing video {job_id}: {str(e)}") active_jobs[job_id]["status"] = "failed" active_jobs[job_id]["message"] = f"Processing error: {str(e)}" active_jobs[job_id]["progress"] = 0 finally: # Clean up temp video file if os.path.exists(video_path): os.unlink(video_path) @app.post("/upload-video", response_model=AnalysisResponse) async def upload_video( background_tasks: BackgroundTasks, video: UploadFile = File(...), segment_length: float = 5.0, model_name: str = "HuggingFaceTB/SmolVLM2-256M-Video-Instruct", with_effects: bool = True ): """ Upload video for highlight generation Args: video: Video file to process segment_length: Length of each segment in seconds (default: 5.0) model_name: SmolVLM2 model to use with_effects: Enable fade transitions (default: True) """ # Validate file type if not video.content_type.startswith('video/'): raise HTTPException(status_code=400, detail="File must be a video") # Generate unique job ID job_id = str(uuid.uuid4()) # Save uploaded video to temp file temp_video_path = os.path.join(TEMP_DIR, f"{job_id}_input.mp4") output_path = os.path.join(OUTPUTS_DIR, f"{job_id}_highlights.mp4") try: # Save uploaded file with open(temp_video_path, "wb") as buffer: content = await video.read() buffer.write(content) # Initialize job tracking active_jobs[job_id] = { "job_id": job_id, "status": "queued", "progress": 5, "message": "Video uploaded, queued for processing", "highlights_url": None, "analysis_url": None } # Start background processing background_tasks.add_task( process_video_background, job_id, temp_video_path, output_path, segment_length, model_name, with_effects ) return AnalysisResponse( job_id=job_id, status="queued", message="Video uploaded successfully. Processing started." ) except Exception as e: # Clean up on error if os.path.exists(temp_video_path): os.unlink(temp_video_path) raise HTTPException(status_code=500, detail=f"Failed to process upload: {str(e)}") @app.get("/job-status/{job_id}", response_model=JobStatus) async def get_job_status(job_id: str): """Get processing status for a job""" # Check completed jobs first if job_id in completed_jobs: return JobStatus(**completed_jobs[job_id]) # Check active jobs if job_id in active_jobs: return JobStatus(**active_jobs[job_id]) # Job not found raise HTTPException(status_code=404, detail="Job not found") @app.get("/download/{filename}") async def download_file(filename: str): """Download generated highlights or analysis file""" file_path = os.path.join(OUTPUTS_DIR, filename) if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found") # Determine media type if filename.endswith('.mp4'): media_type = 'video/mp4' elif filename.endswith('.json'): media_type = 'application/json' else: media_type = 'application/octet-stream' return FileResponse( path=file_path, media_type=media_type, filename=filename ) @app.get("/jobs") async def list_jobs(): """List all jobs (for debugging)""" return { "active_jobs": len(active_jobs), "completed_jobs": len(completed_jobs), "active": list(active_jobs.keys()), "completed": list(completed_jobs.keys()) } @app.delete("/cleanup") async def cleanup_old_jobs(): """Clean up old completed jobs and files""" cleaned_jobs = 0 cleaned_files = 0 # Keep only last 10 completed jobs if len(completed_jobs) > 10: jobs_to_remove = list(completed_jobs.keys())[:-10] for job_id in jobs_to_remove: del completed_jobs[job_id] cleaned_jobs += 1 # Clean up old files (keep only files from last 20 jobs) all_jobs = list(active_jobs.keys()) + list(completed_jobs.keys()) try: for filename in os.listdir(OUTPUTS_DIR): file_job_id = filename.split('_')[0] if file_job_id not in all_jobs: file_path = os.path.join(OUTPUTS_DIR, filename) os.unlink(file_path) cleaned_files += 1 except Exception as e: logger.error(f"Error during cleanup: {e}") return { "message": "Cleanup completed", "cleaned_jobs": cleaned_jobs, "cleaned_files": cleaned_files } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)