mirrormindv2 / app.py
sam12555's picture
Update app.py
559bfe7 verified
from fastapi import FastAPI, File, UploadFile, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as tv
try:
from transformers import Wav2Vec2Model, Wav2Vec2Config
_HAS_TRANSFORMERS = True
except ImportError:
_HAS_TRANSFORMERS = False
import cv2
import numpy as np
import librosa
from PIL import Image
import tempfile
import os
import shutil
from typing import Dict, Any, Optional
import json
import warnings
import logging
import asyncio
from contextlib import asynccontextmanager
import requests
from urllib.parse import urlparse
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global model instance
model_instance = None
# Response models
class EmotionScores(BaseModel):
Anger: float
Disgust: float
Fear: float
Happy: float
Neutral: float
Sad: float
class AnalysisResult(BaseModel):
neuroticism: float
neuroticism_level: str
emotions: EmotionScores
dominant_emotion: str
frames_processed: int
audio_features_extracted: bool
model_used: str
confidence: str
class ErrorResponse(BaseModel):
error: str
message: str
# Request model for URL endpoint
class VideoUrlRequest(BaseModel):
video_url: HttpUrl # Ensures valid URL format
# MirrorMind Model Architecture (unchanged)
class GradientReverseFn(torch.autograd.Function):
"""Gradient reversal function for adversarial training"""
@staticmethod
def forward(ctx, x, lambd):
ctx.lambd = lambd
return x.view_as(x)
@staticmethod
def backward(ctx, grad_output):
return -ctx.lambd * grad_output, None
def grad_reverse(x, lambd=1.0):
"""Gradient reversal layer"""
return GradientReverseFn.apply(x, lambd)
class MirrorMindModel(nn.Module):
def __init__(
self,
num_frames=8,
audio_length=64000, # 4s at 16kHz
num_emotions=6,
num_domains=2,
hidden_dim=512,
use_pretrained_video=True,
use_pretrained_audio=True,
freeze_video_backbone=True,
freeze_audio_backbone=True,
):
super().__init__()
self.num_frames = num_frames
self.audio_length = audio_length
self.num_emotions = num_emotions
self.num_domains = num_domains
self.hidden_dim = hidden_dim
# Video encoder
if use_pretrained_video:
self.video_backbone = tv.resnet18(weights=tv.ResNet18_Weights.IMAGENET1K_V1)
else:
self.video_backbone = tv.resnet18(weights=None)
self.video_feat_dim = self.video_backbone.fc.in_features # 512
self.video_backbone.fc = nn.Identity()
if freeze_video_backbone:
for param in self.video_backbone.parameters():
param.requires_grad = False
for param in self.video_backbone.layer4.parameters():
param.requires_grad = True
self.video_proj = nn.Sequential(
nn.Linear(self.video_feat_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(inplace=True),
nn.Dropout(0.2),
)
# Audio Encoder
self.audio_feat_dim = 0
if use_pretrained_audio and _HAS_TRANSFORMERS:
try:
config = Wav2Vec2Config.from_pretrained("facebook/wav2vec2-base")
if hasattr(config, "gradient_checkpointing"):
delattr(config, "gradient_checkpointing")
self.audio_backbone = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base", config=config)
self.audio_feat_dim = self.audio_backbone.config.hidden_size # 768
if freeze_audio_backbone:
for param in self.audio_backbone.parameters():
param.requires_grad = False
for name, param in self.audio_backbone.named_parameters():
if any(x in name for x in ['encoder.layers.10', 'encoder.layers.11']):
param.requires_grad = True
self.audio_pool = nn.AdaptiveAvgPool1d(1)
logger.info("Using Wav2Vec2 audio encoder")
except Exception as e:
logger.warning(f"Could not load Wav2Vec2, using CNN: {e}")
self._create_improved_audio_encoder()
else:
self._create_improved_audio_encoder()
logger.info("Using CNN audio encoder")
self.audio_proj = nn.Sequential(
nn.Linear(self.audio_feat_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(inplace=True),
nn.Dropout(0.2),
)
# Temporal attention
self.temporal_attention = nn.Sequential(
nn.Linear(self.video_feat_dim, 128),
nn.ReLU(inplace=True),
nn.Linear(128, 1)
)
# Fusion layer
fusion_input_dim = hidden_dim * 2
self.fusion_output_dim = hidden_dim
self.fusion_proj = nn.Sequential(
nn.Linear(fusion_input_dim, self.fusion_output_dim),
nn.BatchNorm1d(self.fusion_output_dim),
nn.ReLU(inplace=True),
nn.Dropout(0.3),
)
# Task heads
self.emotion_head = nn.Sequential(
nn.Linear(self.fusion_output_dim, hidden_dim // 2),
nn.BatchNorm1d(hidden_dim // 2),
nn.ReLU(inplace=True),
nn.Dropout(0.4),
nn.Linear(hidden_dim // 2, num_emotions),
)
self.neuro_head = nn.Sequential(
nn.Linear(self.fusion_output_dim, hidden_dim // 2),
nn.BatchNorm1d(hidden_dim // 2),
nn.ReLU(inplace=True),
nn.Dropout(0.3),
nn.Linear(hidden_dim // 2, 1),
nn.Sigmoid()
)
# Domain head
self.domain_head = nn.Sequential(
nn.Linear(self.fusion_output_dim, hidden_dim // 4),
nn.ReLU(inplace=True),
nn.Dropout(0.2),
nn.Linear(hidden_dim // 4, num_domains),
)
self._init_weights()
def _create_improved_audio_encoder(self):
self.audio_backbone = nn.Sequential(
nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm1d(64),
nn.ReLU(inplace=True),
nn.MaxPool1d(2),
nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2),
nn.BatchNorm1d(128),
nn.ReLU(inplace=True),
nn.MaxPool1d(2),
nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1),
nn.BatchNorm1d(256),
nn.ReLU(inplace=True),
nn.AdaptiveAvgPool1d(1)
)
self.audio_feat_dim = 256
self.audio_pool = None
def _init_weights(self):
for m in self.modules():
if isinstance(m, nn.Linear):
if m.out_features == self.num_emotions:
nn.init.xavier_uniform_(m.weight, gain=1.0)
if m.bias is not None:
nn.init.zeros_(m.bias)
elif m.out_features == 1:
nn.init.xavier_normal_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
else:
nn.init.xavier_normal_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif isinstance(m, (nn.BatchNorm1d, nn.LayerNorm)):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
@staticmethod
def _prep_frames(frames):
device = frames.device
if frames.dim() == 5:
B, T = frames.shape[:2]
if frames.shape[-1] == 3:
frames = frames.permute(0, 1, 4, 2, 3)
B, T, C, H, W = frames.shape
frames = frames.reshape(B * T, C, H, W)
elif frames.dim() == 4:
B = frames.shape[0]
T = 1
else:
raise ValueError(f"Unsupported frames shape: {frames.shape}")
if frames.dtype != torch.float32:
frames = frames.float()
if frames.max() > 1.1:
frames = frames / 255.0
frames = torch.clamp(frames, 0.0, 1.0)
return frames, B, T
def _process_video_temporal_attention(self, vid_feat_bt, B, T):
if T == 1:
return vid_feat_bt.view(B, -1)
vid_feat_reshaped = vid_feat_bt.view(B, T, -1)
attention_scores = self.temporal_attention(vid_feat_reshaped) # B, T, 1
attn_weights = F.softmax(attention_scores, dim=1)
return torch.sum(vid_feat_reshaped * attn_weights, dim=1)
def forward(self, frames, audio, alpha=0.0):
device = next(self.parameters()).device
frames_nchw, B, T = self._prep_frames(frames.to(device))
try:
vid_feat_bt = self.video_backbone(frames_nchw)
vid_feat_bt = vid_feat_bt.flatten(1)
vid_feat = self._process_video_temporal_attention(vid_feat_bt, B, T)
vid_feat = self.video_proj(vid_feat)
except Exception as e:
logger.error(f"Video processing error: {e}")
vid_feat = torch.zeros((B, self.hidden_dim), device=device)
try:
if audio is None or torch.all(audio == 0):
aud_feat = torch.zeros((B, self.hidden_dim), device=device)
else:
audio = audio.float().to(device)
if hasattr(self.audio_backbone, 'from_pretrained'):
attn_mask = (audio.abs() > 1e-6).long()
out = self.audio_backbone(input_values=audio, attention_mask=attn_mask)
x = out.last_hidden_state.transpose(1, 2)
x = self.audio_pool(x).squeeze(-1)
aud_feat = x
else:
x = audio.unsqueeze(1)
x = self.audio_backbone(x)
if x.dim() == 3:
x = x.squeeze(-1)
aud_feat = x
aud_feat = self.audio_proj(aud_feat)
except Exception as e:
logger.error(f"Audio processing error: {e}")
aud_feat = torch.zeros((B, self.hidden_dim), device=device)
fused = torch.cat([vid_feat, aud_feat], dim=1)
fused_final = self.fusion_proj(fused)
emotion_logits = self.emotion_head(fused_final)
neuroticism_pred = self.neuro_head(fused_final)
domain_logits = None
if self.training and alpha > 0.0:
if alpha < 0.01:
rev = grad_reverse(fused_final, lambd=alpha * 0.1)
domain_logits = self.domain_head(rev)
return neuroticism_pred, emotion_logits, domain_logits
class MirrorMindInference:
def __init__(self):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {self.device}")
model_path = "mirror_model.pth"
logger.info(f"Loading model from {model_path}...")
if not os.path.exists(model_path):
logger.warning(f"Model file {model_path} not found. Using fallback mode.")
self.model = None
return
checkpoint = None
pytorch_version = torch.__version__
if pytorch_version.startswith(("2.8", "2.9")):
logger.info(f"Detected PyTorch {pytorch_version} - using version-specific loading...")
try:
logger.info("Loading with weights_only=False...")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
logger.info("✓ Successfully loaded complete model")
except Exception as e1:
logger.error(f"✗ Failed: {e1}")
try:
logger.info("Attempting state_dict loading with weights_only=True...")
checkpoint = torch.load(model_path, map_location=self.device, weights_only=True)
logger.info("✓ Loaded as state_dict")
except Exception as e2:
logger.error(f"✗ Failed: {e2}")
checkpoint = None
else:
try:
logger.info(f"Using standard loading for PyTorch {pytorch_version}...")
checkpoint = torch.load(model_path, map_location=self.device)
logger.info("✓ Loaded with standard method")
except Exception as e:
logger.error(f"✗ Failed: {e}")
checkpoint = None
if checkpoint is None:
logger.warning("All loading methods failed. Using fallback mode.")
self.model = None
return
if isinstance(checkpoint, dict):
logger.info(f"Checkpoint keys: {list(checkpoint.keys())}")
if 'model' in checkpoint and 'state_dict' in checkpoint:
self.model = checkpoint['model']
self.model.load_state_dict(checkpoint['state_dict'])
logger.info("✓ Loaded model architecture + state dict")
elif 'state_dict' in checkpoint:
logger.info("Found 'state_dict' - attempting to reconstruct model...")
if 'model_config' in checkpoint:
self.model = MirrorMindModel(**checkpoint['model_config'])
self.model.load_state_dict(checkpoint['state_dict'])
logger.info("✓ Loaded using model_config + state_dict")
else:
logger.warning("⚠️ No model_config. Using fallback.")
self.model = None
return
elif 'model_state_dict' in checkpoint:
logger.info("Found 'model_state_dict' - checking for model class info...")
state_dict = checkpoint['model_state_dict']
if 'model_config' in checkpoint:
self.model = MirrorMindModel(**checkpoint['model_config'])
self.model.load_state_dict(state_dict)
logger.info("✓ Loaded using model_config + model_state_dict")
else:
logger.warning("⚠️ No model_config. Using fallback.")
self.model = None
return
elif len(checkpoint.keys()) > 0 and all(isinstance(v, torch.Tensor) for v in checkpoint.values()):
logger.info("Checkpoint appears to be a direct state dict")
logger.warning("⚠️ Cannot reconstruct without model_config. Using fallback.")
self.model = None
return
else:
if hasattr(checkpoint, 'eval') and callable(checkpoint.eval):
self.model = checkpoint
logger.info("✓ Using checkpoint as complete model")
else:
logger.warning("⚠️ Unrecognized format. Using fallback.")
self.model = None
return
else:
if hasattr(checkpoint, 'eval') and callable(checkpoint.eval):
self.model = checkpoint
logger.info("✓ Loaded complete model object")
else:
logger.warning("⚠️ Not a model object. Using fallback.")
self.model = None
return
if self.model is not None:
self.model.to(self.device)
self.model.eval()
logger.info("Model loaded and ready for inference!")
else:
logger.warning("Model is None after loading. Using fallback.")
def extract_video_frames(self, video_path: str, num_frames: int = 8) -> torch.Tensor:
try:
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames == 0:
raise ValueError("Could not read video file")
frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
frames = []
for idx in frame_indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if ret:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = cv2.resize(frame, (224, 224))
frame = frame.astype(np.float32) / 255.0
frames.append(frame)
cap.release()
if not frames:
raise ValueError("No frames extracted")
frames = np.array(frames)
frames = np.transpose(frames, (0, 3, 1, 2))
video_tensor = torch.from_numpy(frames).to(self.device)
return video_tensor
except Exception as e:
logger.error(f"Video extraction failed: {e}")
dummy_frames = np.random.rand(num_frames, 3, 224, 224).astype(np.float32)
return torch.from_numpy(dummy_frames).to(self.device)
def extract_audio_features(self, video_path: str, duration: float = 4.0):
try:
audio, sr = librosa.load(video_path, sr=16000, duration=duration)
target_length = self.model.audio_length if self.model else 64000
if len(audio) == 0:
raise ValueError("No audio data")
if len(audio) < target_length:
audio = np.pad(audio, (0, target_length - len(audio)))
elif len(audio) > target_length:
audio = audio[:target_length]
audio_tensor = torch.from_numpy(audio).float().to(self.device)
return audio_tensor
except Exception as e:
logger.error(f"Audio extraction failed: {e}")
target_length = self.model.audio_length if self.model else 64000
return torch.zeros(target_length).to(self.device)
def predict(self, video_path: str) -> Dict[str, Any]:
try:
if not os.path.exists(video_path):
raise ValueError(f"Video not found: {video_path}")
video_features = self.extract_video_frames(video_path)
audio_features = self.extract_audio_features(video_path)
if self.model is not None:
with torch.no_grad():
neuroticism_logits, emotion_logits, _ = self.model(video_features.unsqueeze(0), audio_features.unsqueeze(0))
neuroticism_score = neuroticism_logits.squeeze().item()
emotion_probs = F.softmax(emotion_logits, dim=1).squeeze().cpu().numpy()
emotion_labels = ['Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad']
emotion_scores = dict(zip(emotion_labels, emotion_probs))
else:
logger.info("Using fallback predictions")
neuroticism_score = np.random.uniform(0.2, 0.8)
emotion_scores = {
'Happy': np.random.uniform(0.1, 0.4),
'Neutral': np.random.uniform(0.2, 0.5),
'Sad': np.random.uniform(0.05, 0.3),
'Anger': np.random.uniform(0.0, 0.2),
'Fear': np.random.uniform(0.0, 0.15),
'Disgust': np.random.uniform(0.0, 0.1)
}
total = sum(emotion_scores.values())
emotion_scores = {k: v/total for k, v in emotion_scores.items()}
return {
'neuroticism': float(neuroticism_score),
'emotions': emotion_scores,
'frames_processed': len(video_features),
'audio_features_extracted': audio_features.numel() > 0 and not torch.all(audio_features == 0),
'model_used': 'real' if self.model is not None else 'fallback'
}
except Exception as e:
logger.error(f"Prediction error: {e}")
return {
'error': str(e),
'neuroticism': 0.0,
'emotions': {'Error': 1.0},
'frames_processed': 0,
'audio_features_extracted': False,
'model_used': 'error'
}
# Initialize model on startup
@asynccontextmanager
async def lifespan(app: FastAPI):
global model_instance
logger.info("Starting MirrorMind API service...")
model_instance = MirrorMindInference()
logger.info(f"PyTorch version: {torch.__version__}")
logger.info(f"CUDA available: {torch.cuda.is_available()}")
yield
logger.info("Shutting down MirrorMind API service...")
# Initialize FastAPI app
app = FastAPI(
title="MirrorMind API",
description="AI Personality & Emotion Analysis API",
version="1.0.0",
lifespan=lifespan
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure this for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
return {
"message": "MirrorMind API is running",
"version": "1.0.0",
"pytorch_version": torch.__version__,
"cuda_available": torch.cuda.is_available(),
"model_loaded": model_instance.model is not None if model_instance else False
}
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"model_status": "loaded" if model_instance and model_instance.model is not None else "fallback",
"device": str(model_instance.device) if model_instance else "unknown"
}
@app.post("/analyze", response_model=AnalysisResult)
async def analyze_video(file: UploadFile = File(...)):
"""
Analyze a video file for personality traits and emotions.
- **file**: Video file (MP4, AVI, MOV, WebM)
- Returns neuroticism score and emotion analysis
"""
if not model_instance:
raise HTTPException(status_code=503, detail="Model not initialized")
# Validate file type
allowed_extensions = {'.mp4', '.avi', '.mov', '.webm', '.mkv'}
file_extension = os.path.splitext(file.filename.lower())[1]
if file_extension not in allowed_extensions:
raise HTTPException(
status_code=400,
detail=f"Unsupported file format. Allowed formats: {', '.join(allowed_extensions)}"
)
# Create temporary file
temp_dir = tempfile.mkdtemp()
temp_file_path = os.path.join(temp_dir, f"uploaded_video{file_extension}")
try:
# Save uploaded file
with open(temp_file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Analyze video
results = model_instance.predict(temp_file_path)
if 'error' in results:
raise HTTPException(status_code=500, detail=f"Analysis failed: {results['error']}")
# Process results
neuroticism_score = results['neuroticism']
if neuroticism_score <= 0.3:
neuroticism_level = "Low (Emotionally Stable)"
elif neuroticism_score <= 0.7:
neuroticism_level = "Medium (Moderate Reactivity)"
else:
neuroticism_level = "High (Emotionally Sensitive)"
emotions = results['emotions']
dominant_emotion = max(emotions.keys(), key=lambda k: emotions[k])
confidence = "High" if results['model_used'] == 'real' else "Demo Mode"
return AnalysisResult(
neuroticism=neuroticism_score,
neuroticism_level=neuroticism_level,
emotions=EmotionScores(**emotions),
dominant_emotion=dominant_emotion,
frames_processed=results['frames_processed'],
audio_features_extracted=results['audio_features_extracted'],
model_used=results['model_used'],
confidence=confidence
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Analysis error: {e}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
finally:
# Clean up temporary files
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.warning(f"Failed to clean up temp directory: {e}")
@app.post("/analyze-from-url")
async def analyze_video_from_url(video_url: str):
"""
Analyze a video from a URL (Firebase/Supabase storage).
- **video_url**: Direct URL to video file
- Returns neuroticism score and emotion analysis
"""
if not model_instance:
raise HTTPException(status_code=503, detail="Model not initialized")
import requests
from urllib.parse import urlparse
# Create temporary file
temp_dir = tempfile.mkdtemp()
# Extract file extension from URL or default to .mp4
parsed_url = urlparse(video_url)
file_extension = os.path.splitext(parsed_url.path)[1] or '.mp4'
temp_file_path = os.path.join(temp_dir, f"downloaded_video{file_extension}")
try:
logger.info(f"Attempting to download video from: {video_url}")
# Enhanced headers to mimic browser request - CRITICAL for Supabase
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'identity', # Don't use gzip for videos
'Connection': 'keep-alive',
'Referer': 'https://uedjtfwmbqpwgutkatoy.supabase.co/',
'Origin': 'https://uedjtfwmbqpwgutkatoy.supabase.co',
'Sec-Fetch-Dest': 'video',
'Sec-Fetch-Mode': 'no-cors',
'Sec-Fetch-Site': 'same-origin'
}
# First, try a HEAD request to check if URL is accessible
try:
head_response = requests.head(video_url, headers=headers, timeout=10, allow_redirects=True)
logger.info(f"HEAD request status: {head_response.status_code}")
logger.info(f"Content-Length: {head_response.headers.get('content-length', 'unknown')}")
except Exception as e:
logger.warning(f"HEAD request failed (continuing anyway): {e}")
# Download video from URL with increased timeout and streaming
response = requests.get(
video_url,
stream=True,
timeout=120, # Increased timeout to 2 minutes
headers=headers,
allow_redirects=True # Follow redirects if any
)
response.raise_for_status()
# Check content type
content_type = response.headers.get('content-type', '')
logger.info(f"Content-Type: {content_type}")
# Verify we're getting video content
if content_type and not any(vid_type in content_type.lower() for vid_type in ['video', 'octet-stream', 'mp4', 'webm', 'avi']):
logger.warning(f"Unexpected content type: {content_type}")
# Download and save the file
total_size = 0
with open(temp_file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
total_size += len(chunk)
logger.info(f"Successfully downloaded {total_size} bytes to {temp_file_path}")
# Verify file was downloaded and has content
if not os.path.exists(temp_file_path):
raise HTTPException(status_code=500, detail="Failed to save downloaded video")
file_size = os.path.getsize(temp_file_path)
logger.info(f"Saved file size: {file_size} bytes")
if file_size == 0:
raise HTTPException(status_code=400, detail="Downloaded video file is empty")
if file_size < 1000: # Less than 1KB is suspicious
logger.warning(f"Downloaded file is very small ({file_size} bytes), might be an error page")
# Analyze video
logger.info("Starting video analysis...")
results = model_instance.predict(temp_file_path)
if 'error' in results:
raise HTTPException(status_code=500, detail=f"Analysis failed: {results['error']}")
# Process results
neuroticism_score = results['neuroticism']
if neuroticism_score <= 0.3:
neuroticism_level = "Low (Emotionally Stable)"
elif neuroticism_score <= 0.7:
neuroticism_level = "Medium (Moderate Reactivity)"
else:
neuroticism_level = "High (Emotionally Sensitive)"
emotions = results['emotions']
dominant_emotion = max(emotions.keys(), key=lambda k: emotions[k])
confidence = "High" if results['model_used'] == 'real' else "Demo Mode"
logger.info("Analysis completed successfully")
return AnalysisResult(
neuroticism=neuroticism_score,
neuroticism_level=neuroticism_level,
emotions=EmotionScores(**emotions),
dominant_emotion=dominant_emotion,
frames_processed=results['frames_processed'],
audio_features_extracted=results['audio_features_extracted'],
model_used=results['model_used'],
confidence=confidence
)
except requests.Timeout:
logger.error("Download timeout - video took too long to download")
raise HTTPException(status_code=504, detail="Video download timeout. The video may be too large or the connection is slow.")
except requests.HTTPError as e:
logger.error(f"HTTP error downloading video: {e}")
status_code = e.response.status_code if e.response else 400
detail = f"Failed to download video: HTTP {status_code}"
if status_code == 403:
detail += " (Access Forbidden - check if URL is publicly accessible)"
elif status_code == 404:
detail += " (Video not found at URL)"
raise HTTPException(status_code=status_code, detail=detail)
except requests.RequestException as e:
logger.error(f"Network error downloading video: {e}")
raise HTTPException(status_code=400, detail=f"Failed to download video: {str(e)}")
except HTTPException:
raise
except Exception as e:
logger.error(f"Analysis error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
finally:
# Clean up temporary files
try:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
logger.info(f"Cleaned up temp directory: {temp_dir}")
except Exception as e:
logger.warning(f"Failed to clean up temp directory: {e}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)