from fastapi import FastAPI, File, UploadFile, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as tv
try:
    from transformers import Wav2Vec2Model, Wav2Vec2Config
    _HAS_TRANSFORMERS = True
except ImportError:
    _HAS_TRANSFORMERS = False
import cv2
import numpy as np
import librosa
from PIL import Image
import tempfile
import os
import shutil
from typing import Dict, Any, Optional
import json
import warnings
import logging
import asyncio
from contextlib import asynccontextmanager
import requests
from urllib.parse import urlparse

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global model instance
model_instance = None

# Response models
class EmotionScores(BaseModel):
    Anger: float
    Disgust: float
    Fear: float
    Happy: float
    Neutral: float
    Sad: float

class AnalysisResult(BaseModel):
    neuroticism: float
    neuroticism_level: str
    emotions: EmotionScores
    dominant_emotion: str
    frames_processed: int
    audio_features_extracted: bool
    model_used: str
    confidence: str

class ErrorResponse(BaseModel):
    error: str
    message: str

# Request model for URL endpoint
class VideoUrlRequest(BaseModel):
    video_url: HttpUrl  # Ensures valid URL format

# MirrorMind Model Architecture (unchanged)
class GradientReverseFn(torch.autograd.Function):
    """Gradient reversal function for adversarial training"""

    @staticmethod
    def forward(ctx, x, lambd):
        ctx.lambd = lambd
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        return -ctx.lambd * grad_output, None

def grad_reverse(x, lambd=1.0):
    """Gradient reversal layer"""
    return GradientReverseFn.apply(x, lambd)
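
# Illustrative sketch (assumption, not from the original file): grad_reverse is the identity
# in the forward pass and negates/scales gradients in the backward pass.
#   x = torch.ones(2, requires_grad=True)
#   grad_reverse(x, lambd=0.5).sum().backward()
#   # x.grad == tensor([-0.5, -0.5]): gradients are reversed and scaled by lambd.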

class MirrorMindModel(nn.Module):
    def __init__(
        self,
        num_frames=8,
        audio_length=64000,  # 4s at 16kHz
        num_emotions=6,
        num_domains=2,
        hidden_dim=512,
        use_pretrained_video=True,
        use_pretrained_audio=True,
        freeze_video_backbone=True,
        freeze_audio_backbone=True,
    ):
        super().__init__()
        self.num_frames = num_frames
        self.audio_length = audio_length
        self.num_emotions = num_emotions
        self.num_domains = num_domains
        self.hidden_dim = hidden_dim

        # Video encoder
        if use_pretrained_video:
            self.video_backbone = tv.resnet18(weights=tv.ResNet18_Weights.IMAGENET1K_V1)
        else:
            self.video_backbone = tv.resnet18(weights=None)
        self.video_feat_dim = self.video_backbone.fc.in_features  # 512
        self.video_backbone.fc = nn.Identity()
        if freeze_video_backbone:
            for param in self.video_backbone.parameters():
                param.requires_grad = False
            for param in self.video_backbone.layer4.parameters():
                param.requires_grad = True
        self.video_proj = nn.Sequential(
            nn.Linear(self.video_feat_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
        )

        # Audio encoder
        self.audio_feat_dim = 0
        if use_pretrained_audio and _HAS_TRANSFORMERS:
            try:
                config = Wav2Vec2Config.from_pretrained("facebook/wav2vec2-base")
                if hasattr(config, "gradient_checkpointing"):
                    delattr(config, "gradient_checkpointing")
                self.audio_backbone = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base", config=config)
                self.audio_feat_dim = self.audio_backbone.config.hidden_size  # 768
                if freeze_audio_backbone:
                    for param in self.audio_backbone.parameters():
                        param.requires_grad = False
                    for name, param in self.audio_backbone.named_parameters():
                        if any(x in name for x in ['encoder.layers.10', 'encoder.layers.11']):
                            param.requires_grad = True
                self.audio_pool = nn.AdaptiveAvgPool1d(1)
                logger.info("Using Wav2Vec2 audio encoder")
            except Exception as e:
                logger.warning(f"Could not load Wav2Vec2, using CNN: {e}")
                self._create_improved_audio_encoder()
        else:
            self._create_improved_audio_encoder()
            logger.info("Using CNN audio encoder")
        self.audio_proj = nn.Sequential(
            nn.Linear(self.audio_feat_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
        )

        # Temporal attention
        self.temporal_attention = nn.Sequential(
            nn.Linear(self.video_feat_dim, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 1)
        )

        # Fusion layer
        fusion_input_dim = hidden_dim * 2
        self.fusion_output_dim = hidden_dim
        self.fusion_proj = nn.Sequential(
            nn.Linear(fusion_input_dim, self.fusion_output_dim),
            nn.BatchNorm1d(self.fusion_output_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
        )

        # Task heads
        self.emotion_head = nn.Sequential(
            nn.Linear(self.fusion_output_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            nn.Linear(hidden_dim // 2, num_emotions),
        )
        self.neuro_head = nn.Sequential(
            nn.Linear(self.fusion_output_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim // 2, 1),
            nn.Sigmoid()
        )

        # Domain head
        self.domain_head = nn.Sequential(
            nn.Linear(self.fusion_output_dim, hidden_dim // 4),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim // 4, num_domains),
        )

        self._init_weights()
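
        # Feature-shape sketch (illustrative, assuming the default constructor arguments above):
        #   video: (B*T, 3, 224, 224) -> ResNet18 -> (B*T, 512) -> temporal attention -> (B, 512) -> video_proj -> (B, 512)
        #   audio: (B, 64000) -> Wav2Vec2 pooled (B, 768) or CNN (B, 256) -> audio_proj -> (B, 512)
        #   fusion: concat -> (B, 1024) -> fusion_proj -> (B, 512) -> emotion (B, 6), neuroticism (B, 1), domain (B, 2)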

    def _create_improved_audio_encoder(self):
        self.audio_backbone = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(2),
            nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(2),
            nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool1d(1)
        )
        self.audio_feat_dim = 256
        self.audio_pool = None

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                if m.out_features == self.num_emotions:
                    nn.init.xavier_uniform_(m.weight, gain=1.0)
                    if m.bias is not None:
                        nn.init.zeros_(m.bias)
                elif m.out_features == 1:
                    nn.init.xavier_normal_(m.weight)
                    if m.bias is not None:
                        nn.init.zeros_(m.bias)
                else:
                    nn.init.xavier_normal_(m.weight)
                    if m.bias is not None:
                        nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm1d, nn.LayerNorm)):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def _prep_frames(self, frames):
        device = frames.device
        if frames.dim() == 5:
            B, T = frames.shape[:2]
            if frames.shape[-1] == 3:
                frames = frames.permute(0, 1, 4, 2, 3)
            B, T, C, H, W = frames.shape
            frames = frames.reshape(B * T, C, H, W)
        elif frames.dim() == 4:
            B = frames.shape[0]
            T = 1
        else:
            raise ValueError(f"Unsupported frames shape: {frames.shape}")
        if frames.dtype != torch.float32:
            frames = frames.float()
        if frames.max() > 1.1:
            frames = frames / 255.0
        frames = torch.clamp(frames, 0.0, 1.0)
        return frames, B, T

    def _process_video_temporal_attention(self, vid_feat_bt, B, T):
        if T == 1:
            return vid_feat_bt.view(B, -1)
        vid_feat_reshaped = vid_feat_bt.view(B, T, -1)
        attention_scores = self.temporal_attention(vid_feat_reshaped)  # B, T, 1
        attn_weights = F.softmax(attention_scores, dim=1)
        return torch.sum(vid_feat_reshaped * attn_weights, dim=1)

    def forward(self, frames, audio, alpha=0.0):
        device = next(self.parameters()).device
        frames_nchw, B, T = self._prep_frames(frames.to(device))
        try:
            vid_feat_bt = self.video_backbone(frames_nchw)
            vid_feat_bt = vid_feat_bt.flatten(1)
            vid_feat = self._process_video_temporal_attention(vid_feat_bt, B, T)
            vid_feat = self.video_proj(vid_feat)
        except Exception as e:
            logger.error(f"Video processing error: {e}")
            vid_feat = torch.zeros((B, self.hidden_dim), device=device)
        try:
            if audio is None or torch.all(audio == 0):
                aud_feat = torch.zeros((B, self.hidden_dim), device=device)
            else:
                audio = audio.float().to(device)
                if hasattr(self.audio_backbone, 'from_pretrained'):
                    attn_mask = (audio.abs() > 1e-6).long()
                    out = self.audio_backbone(input_values=audio, attention_mask=attn_mask)
                    x = out.last_hidden_state.transpose(1, 2)
                    x = self.audio_pool(x).squeeze(-1)
                    aud_feat = x
                else:
                    x = audio.unsqueeze(1)
                    x = self.audio_backbone(x)
                    if x.dim() == 3:
                        x = x.squeeze(-1)
                    aud_feat = x
                aud_feat = self.audio_proj(aud_feat)
        except Exception as e:
            logger.error(f"Audio processing error: {e}")
            aud_feat = torch.zeros((B, self.hidden_dim), device=device)
        fused = torch.cat([vid_feat, aud_feat], dim=1)
        fused_final = self.fusion_proj(fused)
        emotion_logits = self.emotion_head(fused_final)
        neuroticism_pred = self.neuro_head(fused_final)
        domain_logits = None
        if self.training and alpha > 0.0:
            if alpha < 0.01:
                rev = grad_reverse(fused_final, lambd=alpha * 0.1)
                domain_logits = self.domain_head(rev)
        return neuroticism_pred, emotion_logits, domain_logits
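
# Expected MirrorMindModel.forward I/O (illustrative summary, assuming default constructor args):
#   frames: (B, T, 3, 224, 224) or (B, T, 224, 224, 3), float in [0, 1] or uint8 in [0, 255]
#   audio:  (B, 64000) mono waveform at 16 kHz (an all-zero tensor is treated as "no audio")
#   returns: neuroticism (B, 1) in [0, 1], emotion logits (B, 6), domain logits (B, 2) or None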

class MirrorMindInference:
    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")
        model_path = "mirror_model.pth"
        logger.info(f"Loading model from {model_path}...")
        if not os.path.exists(model_path):
            logger.warning(f"Model file {model_path} not found. Using fallback mode.")
            self.model = None
            return
        checkpoint = None
        pytorch_version = torch.__version__
        if pytorch_version.startswith(("2.8", "2.9")):
            logger.info(f"Detected PyTorch {pytorch_version} - using version-specific loading...")
            try:
                logger.info("Loading with weights_only=False...")
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
                logger.info("✓ Successfully loaded complete model")
            except Exception as e1:
                logger.error(f"✗ Failed: {e1}")
                try:
                    logger.info("Attempting state_dict loading with weights_only=True...")
                    checkpoint = torch.load(model_path, map_location=self.device, weights_only=True)
                    logger.info("✓ Loaded as state_dict")
                except Exception as e2:
                    logger.error(f"✗ Failed: {e2}")
                    checkpoint = None
        else:
            try:
                logger.info(f"Using standard loading for PyTorch {pytorch_version}...")
                checkpoint = torch.load(model_path, map_location=self.device)
                logger.info("✓ Loaded with standard method")
            except Exception as e:
                logger.error(f"✗ Failed: {e}")
                checkpoint = None
        if checkpoint is None:
            logger.warning("All loading methods failed. Using fallback mode.")
            self.model = None
            return
        if isinstance(checkpoint, dict):
            logger.info(f"Checkpoint keys: {list(checkpoint.keys())}")
            if 'model' in checkpoint and 'state_dict' in checkpoint:
                self.model = checkpoint['model']
                self.model.load_state_dict(checkpoint['state_dict'])
                logger.info("✓ Loaded model architecture + state dict")
            elif 'state_dict' in checkpoint:
                logger.info("Found 'state_dict' - attempting to reconstruct model...")
                if 'model_config' in checkpoint:
                    self.model = MirrorMindModel(**checkpoint['model_config'])
                    self.model.load_state_dict(checkpoint['state_dict'])
                    logger.info("✓ Loaded using model_config + state_dict")
                else:
                    logger.warning("⚠️ No model_config. Using fallback.")
                    self.model = None
                    return
            elif 'model_state_dict' in checkpoint:
                logger.info("Found 'model_state_dict' - checking for model class info...")
                state_dict = checkpoint['model_state_dict']
                if 'model_config' in checkpoint:
                    self.model = MirrorMindModel(**checkpoint['model_config'])
                    self.model.load_state_dict(state_dict)
                    logger.info("✓ Loaded using model_config + model_state_dict")
                else:
                    logger.warning("⚠️ No model_config. Using fallback.")
                    self.model = None
                    return
            elif len(checkpoint.keys()) > 0 and all(isinstance(v, torch.Tensor) for v in checkpoint.values()):
                logger.info("Checkpoint appears to be a direct state dict")
                logger.warning("⚠️ Cannot reconstruct without model_config. Using fallback.")
                self.model = None
                return
            else:
                if hasattr(checkpoint, 'eval') and callable(checkpoint.eval):
                    self.model = checkpoint
                    logger.info("✓ Using checkpoint as complete model")
                else:
                    logger.warning("⚠️ Unrecognized format. Using fallback.")
                    self.model = None
                    return
        else:
            if hasattr(checkpoint, 'eval') and callable(checkpoint.eval):
                self.model = checkpoint
                logger.info("✓ Loaded complete model object")
            else:
                logger.warning("⚠️ Not a model object. Using fallback.")
                self.model = None
                return
        if self.model is not None:
            self.model.to(self.device)
            self.model.eval()
            logger.info("Model loaded and ready for inference!")
        else:
            logger.warning("Model is None after loading. Using fallback.")

    def extract_video_frames(self, video_path: str, num_frames: int = 8) -> torch.Tensor:
        try:
            cap = cv2.VideoCapture(video_path)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total_frames == 0:
                raise ValueError("Could not read video file")
            frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
            frames = []
            for idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if ret:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frame = cv2.resize(frame, (224, 224))
                    frame = frame.astype(np.float32) / 255.0
                    frames.append(frame)
            cap.release()
            if not frames:
                raise ValueError("No frames extracted")
            frames = np.array(frames)
            frames = np.transpose(frames, (0, 3, 1, 2))
            video_tensor = torch.from_numpy(frames).to(self.device)
            return video_tensor
        except Exception as e:
            logger.error(f"Video extraction failed: {e}")
            dummy_frames = np.random.rand(num_frames, 3, 224, 224).astype(np.float32)
            return torch.from_numpy(dummy_frames).to(self.device)

    def extract_audio_features(self, video_path: str, duration: float = 4.0):
        try:
            audio, sr = librosa.load(video_path, sr=16000, duration=duration)
            target_length = self.model.audio_length if self.model else 64000
            if len(audio) == 0:
                raise ValueError("No audio data")
            if len(audio) < target_length:
                audio = np.pad(audio, (0, target_length - len(audio)))
            elif len(audio) > target_length:
                audio = audio[:target_length]
            audio_tensor = torch.from_numpy(audio).float().to(self.device)
            return audio_tensor
        except Exception as e:
            logger.error(f"Audio extraction failed: {e}")
            target_length = self.model.audio_length if self.model else 64000
            return torch.zeros(target_length).to(self.device)

    def predict(self, video_path: str) -> Dict[str, Any]:
        try:
            if not os.path.exists(video_path):
                raise ValueError(f"Video not found: {video_path}")
            video_features = self.extract_video_frames(video_path)
            audio_features = self.extract_audio_features(video_path)
            if self.model is not None:
                with torch.no_grad():
                    neuroticism_logits, emotion_logits, _ = self.model(video_features.unsqueeze(0), audio_features.unsqueeze(0))
                neuroticism_score = neuroticism_logits.squeeze().item()
                emotion_probs = F.softmax(emotion_logits, dim=1).squeeze().cpu().numpy()
                emotion_labels = ['Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad']
                # Cast numpy floats to Python floats so they validate/serialize cleanly
                emotion_scores = {label: float(p) for label, p in zip(emotion_labels, emotion_probs)}
            else:
                logger.info("Using fallback predictions")
                neuroticism_score = np.random.uniform(0.2, 0.8)
                emotion_scores = {
                    'Happy': np.random.uniform(0.1, 0.4),
                    'Neutral': np.random.uniform(0.2, 0.5),
                    'Sad': np.random.uniform(0.05, 0.3),
                    'Anger': np.random.uniform(0.0, 0.2),
                    'Fear': np.random.uniform(0.0, 0.15),
                    'Disgust': np.random.uniform(0.0, 0.1)
                }
                total = sum(emotion_scores.values())
                emotion_scores = {k: v / total for k, v in emotion_scores.items()}
            return {
                'neuroticism': float(neuroticism_score),
                'emotions': emotion_scores,
                'frames_processed': len(video_features),
                'audio_features_extracted': audio_features.numel() > 0 and not torch.all(audio_features == 0),
                'model_used': 'real' if self.model is not None else 'fallback'
            }
        except Exception as e:
            logger.error(f"Prediction error: {e}")
            return {
                'error': str(e),
                'neuroticism': 0.0,
                'emotions': {'Error': 1.0},
                'frames_processed': 0,
                'audio_features_extracted': False,
                'model_used': 'error'
            }
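
# Illustrative local usage of the inference wrapper (assumption; "sample.mp4" is a hypothetical path):
#   engine = MirrorMindInference()
#   result = engine.predict("sample.mp4")
#   print(result["neuroticism"], result["emotions"], result["model_used"])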

# Initialize model on startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    global model_instance
    logger.info("Starting MirrorMind API service...")
    model_instance = MirrorMindInference()
    logger.info(f"PyTorch version: {torch.__version__}")
    logger.info(f"CUDA available: {torch.cuda.is_available()}")
    yield
    logger.info("Shutting down MirrorMind API service...")

# Initialize FastAPI app
app = FastAPI(
    title="MirrorMind API",
    description="AI Personality & Emotion Analysis API",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure this for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def root():
    return {
        "message": "MirrorMind API is running",
        "version": "1.0.0",
        "pytorch_version": torch.__version__,
        "cuda_available": torch.cuda.is_available(),
        "model_loaded": model_instance.model is not None if model_instance else False
    }

@app.get("/health")  # route path assumed
async def health_check():
    return {
        "status": "healthy",
        "model_status": "loaded" if model_instance and model_instance.model is not None else "fallback",
        "device": str(model_instance.device) if model_instance else "unknown"
    }
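
# Example health probe (illustrative; uses the assumed /health path and the port set below):
#   curl http://localhost:7860/health
#   # -> {"status": "healthy", "model_status": "loaded" | "fallback", "device": "cuda" | "cpu"}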

@app.post("/analyze", response_model=AnalysisResult)  # route path assumed
async def analyze_video(file: UploadFile = File(...)):
    """
    Analyze a video file for personality traits and emotions.

    - **file**: Video file (MP4, AVI, MOV, WebM)
    - Returns neuroticism score and emotion analysis
    """
    if not model_instance:
        raise HTTPException(status_code=503, detail="Model not initialized")

    # Validate file type
    allowed_extensions = {'.mp4', '.avi', '.mov', '.webm', '.mkv'}
    file_extension = os.path.splitext(file.filename.lower())[1]
    if file_extension not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format. Allowed formats: {', '.join(allowed_extensions)}"
        )

    # Create temporary file
    temp_dir = tempfile.mkdtemp()
    temp_file_path = os.path.join(temp_dir, f"uploaded_video{file_extension}")
    try:
        # Save uploaded file
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Analyze video
        results = model_instance.predict(temp_file_path)
        if 'error' in results:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {results['error']}")

        # Process results
        neuroticism_score = results['neuroticism']
        if neuroticism_score <= 0.3:
            neuroticism_level = "Low (Emotionally Stable)"
        elif neuroticism_score <= 0.7:
            neuroticism_level = "Medium (Moderate Reactivity)"
        else:
            neuroticism_level = "High (Emotionally Sensitive)"
        emotions = results['emotions']
        dominant_emotion = max(emotions.keys(), key=lambda k: emotions[k])
        confidence = "High" if results['model_used'] == 'real' else "Demo Mode"
        return AnalysisResult(
            neuroticism=neuroticism_score,
            neuroticism_level=neuroticism_level,
            emotions=EmotionScores(**emotions),
            dominant_emotion=dominant_emotion,
            frames_processed=results['frames_processed'],
            audio_features_extracted=results['audio_features_extracted'],
            model_used=results['model_used'],
            confidence=confidence
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Analysis error: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
    finally:
        # Clean up temporary files
        try:
            shutil.rmtree(temp_dir)
        except Exception as e:
            logger.warning(f"Failed to clean up temp directory: {e}")

@app.post("/analyze-url", response_model=AnalysisResult)  # route path assumed
async def analyze_video_from_url(video_url: str):
    """
    Analyze a video from a URL (Firebase/Supabase storage).

    - **video_url**: Direct URL to video file
    - Returns neuroticism score and emotion analysis
    """
    if not model_instance:
        raise HTTPException(status_code=503, detail="Model not initialized")

    # Create temporary file
    temp_dir = tempfile.mkdtemp()
    # Extract file extension from URL or default to .mp4
    parsed_url = urlparse(video_url)
    file_extension = os.path.splitext(parsed_url.path)[1] or '.mp4'
    temp_file_path = os.path.join(temp_dir, f"downloaded_video{file_extension}")
    try:
        logger.info(f"Attempting to download video from: {video_url}")
        # Enhanced headers to mimic browser request - CRITICAL for Supabase
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'identity',  # Don't use gzip for videos
            'Connection': 'keep-alive',
            'Referer': 'https://uedjtfwmbqpwgutkatoy.supabase.co/',
            'Origin': 'https://uedjtfwmbqpwgutkatoy.supabase.co',
            'Sec-Fetch-Dest': 'video',
            'Sec-Fetch-Mode': 'no-cors',
            'Sec-Fetch-Site': 'same-origin'
        }
        # First, try a HEAD request to check if URL is accessible
        try:
            head_response = requests.head(video_url, headers=headers, timeout=10, allow_redirects=True)
            logger.info(f"HEAD request status: {head_response.status_code}")
            logger.info(f"Content-Length: {head_response.headers.get('content-length', 'unknown')}")
        except Exception as e:
            logger.warning(f"HEAD request failed (continuing anyway): {e}")
        # Download video from URL with increased timeout and streaming
        response = requests.get(
            video_url,
            stream=True,
            timeout=120,  # Increased timeout to 2 minutes
            headers=headers,
            allow_redirects=True  # Follow redirects if any
        )
        response.raise_for_status()
        # Check content type
        content_type = response.headers.get('content-type', '')
        logger.info(f"Content-Type: {content_type}")
        # Verify we're getting video content
        if content_type and not any(vid_type in content_type.lower() for vid_type in ['video', 'octet-stream', 'mp4', 'webm', 'avi']):
            logger.warning(f"Unexpected content type: {content_type}")
        # Download and save the file
        total_size = 0
        with open(temp_file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    total_size += len(chunk)
        logger.info(f"Successfully downloaded {total_size} bytes to {temp_file_path}")
        # Verify file was downloaded and has content
        if not os.path.exists(temp_file_path):
            raise HTTPException(status_code=500, detail="Failed to save downloaded video")
        file_size = os.path.getsize(temp_file_path)
        logger.info(f"Saved file size: {file_size} bytes")
        if file_size == 0:
            raise HTTPException(status_code=400, detail="Downloaded video file is empty")
        if file_size < 1000:  # Less than 1KB is suspicious
            logger.warning(f"Downloaded file is very small ({file_size} bytes), might be an error page")
        # Analyze video
        logger.info("Starting video analysis...")
        results = model_instance.predict(temp_file_path)
        if 'error' in results:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {results['error']}")
        # Process results
        neuroticism_score = results['neuroticism']
        if neuroticism_score <= 0.3:
            neuroticism_level = "Low (Emotionally Stable)"
        elif neuroticism_score <= 0.7:
            neuroticism_level = "Medium (Moderate Reactivity)"
        else:
            neuroticism_level = "High (Emotionally Sensitive)"
        emotions = results['emotions']
        dominant_emotion = max(emotions.keys(), key=lambda k: emotions[k])
        confidence = "High" if results['model_used'] == 'real' else "Demo Mode"
        logger.info("Analysis completed successfully")
        return AnalysisResult(
            neuroticism=neuroticism_score,
            neuroticism_level=neuroticism_level,
            emotions=EmotionScores(**emotions),
            dominant_emotion=dominant_emotion,
            frames_processed=results['frames_processed'],
            audio_features_extracted=results['audio_features_extracted'],
            model_used=results['model_used'],
            confidence=confidence
        )
    except requests.Timeout:
        logger.error("Download timeout - video took too long to download")
        raise HTTPException(status_code=504, detail="Video download timeout. The video may be too large or the connection is slow.")
    except requests.HTTPError as e:
        logger.error(f"HTTP error downloading video: {e}")
        status_code = e.response.status_code if e.response else 400
        detail = f"Failed to download video: HTTP {status_code}"
        if status_code == 403:
            detail += " (Access Forbidden - check if URL is publicly accessible)"
        elif status_code == 404:
            detail += " (Video not found at URL)"
        raise HTTPException(status_code=status_code, detail=detail)
    except requests.RequestException as e:
        logger.error(f"Network error downloading video: {e}")
        raise HTTPException(status_code=400, detail=f"Failed to download video: {str(e)}")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Analysis error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
    finally:
        # Clean up temporary files
        try:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
                logger.info(f"Cleaned up temp directory: {temp_dir}")
        except Exception as e:
            logger.warning(f"Failed to clean up temp directory: {e}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)