Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| """ | |
| Security utilities for TTS Arena to prevent vote manipulation and botting. | |
| """ | |
| from datetime import datetime, timedelta | |
| from models import db, Vote, User | |
| from sqlalchemy import func, and_, or_ | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| def detect_suspicious_voting_patterns(user_id, hours_back=24, max_votes_per_hour=30): | |
| """ | |
| Detect if a user has suspicious voting patterns. | |
| Updated to allow rapid voting for reasonable periods (30 votes/hour = 1 vote every 2 minutes) | |
| Returns (is_suspicious, reason, vote_count) | |
| """ | |
| if not user_id: | |
| return False, None, 0 | |
| # Check voting frequency over 24 hours | |
| time_threshold = datetime.utcnow() - timedelta(hours=hours_back) | |
| recent_votes = Vote.query.filter( | |
| and_( | |
| Vote.user_id == user_id, | |
| Vote.vote_date >= time_threshold | |
| ) | |
| ).count() | |
| # Allow up to 30 votes per hour (720 votes in 24 hours) | |
| # This allows rapid voting for several hours but catches extended botting | |
| max_votes_24h = max_votes_per_hour * hours_back | |
| if recent_votes > max_votes_24h: | |
| return True, f"Too many votes: {recent_votes} in {hours_back} hours (max: {max_votes_24h})", recent_votes | |
| # Additional check: if someone votes more than 100 times in 3 hours, that's suspicious | |
| # (100 votes in 3 hours = 1 vote every 1.8 minutes, which is very sustained) | |
| if hours_back >= 3: | |
| three_hour_threshold = datetime.utcnow() - timedelta(hours=3) | |
| votes_3h = Vote.query.filter( | |
| and_( | |
| Vote.user_id == user_id, | |
| Vote.vote_date >= three_hour_threshold | |
| ) | |
| ).count() | |
| if votes_3h > 100: | |
| return True, f"Excessive voting in short period: {votes_3h} votes in 3 hours", recent_votes | |
| return False, None, recent_votes | |
| def detect_model_bias(user_id, model_id, min_votes=5, bias_threshold=0.8): | |
| """ | |
| Detect if a user consistently votes for a specific model. | |
| Returns (is_biased, bias_ratio, total_votes_for_model, total_votes) | |
| """ | |
| if not user_id: | |
| return False, 0, 0, 0 | |
| # Get all votes by this user | |
| total_votes = Vote.query.filter_by(user_id=user_id).count() | |
| if total_votes < min_votes: | |
| return False, 0, 0, total_votes | |
| # Get votes where this user chose the specific model | |
| votes_for_model = Vote.query.filter( | |
| and_( | |
| Vote.user_id == user_id, | |
| Vote.model_chosen == model_id | |
| ) | |
| ).count() | |
| bias_ratio = votes_for_model / total_votes if total_votes > 0 else 0 | |
| is_biased = bias_ratio >= bias_threshold and total_votes >= min_votes | |
| return is_biased, bias_ratio, votes_for_model, total_votes | |
| def detect_coordinated_voting(model_id, hours_back=6, min_users=3, vote_threshold=10): | |
| """ | |
| Detect coordinated voting campaigns for a specific model. | |
| Returns (is_coordinated, user_count, vote_count, suspicious_users) | |
| """ | |
| time_threshold = datetime.utcnow() - timedelta(hours=hours_back) | |
| # Get recent votes for this model | |
| recent_votes = db.session.query(Vote.user_id, Vote.vote_date).filter( | |
| and_( | |
| Vote.model_chosen == model_id, | |
| Vote.vote_date >= time_threshold | |
| ) | |
| ).all() | |
| if len(recent_votes) < vote_threshold: | |
| return False, 0, len(recent_votes), [] | |
| # Count unique users and analyze patterns | |
| user_vote_data = {} | |
| for vote in recent_votes: | |
| if vote.user_id: | |
| if vote.user_id not in user_vote_data: | |
| user_vote_data[vote.user_id] = [] | |
| user_vote_data[vote.user_id].append(vote.vote_date) | |
| user_count = len(user_vote_data) | |
| # Enhanced detection logic | |
| if user_count >= min_users and len(recent_votes) >= vote_threshold: | |
| suspicious_users = [] | |
| high_suspicion_users = [] | |
| for user_id, vote_dates in user_vote_data.items(): | |
| user_votes_for_model = len(vote_dates) | |
| if user_votes_for_model > 1: # Multiple votes for same model in short time | |
| user = User.query.get(user_id) | |
| if user: | |
| # Calculate suspicion level | |
| account_age_days = (datetime.utcnow() - user.join_date).days if user.join_date else 0 | |
| vote_frequency = user_votes_for_model / hours_back # votes per hour | |
| # Determine suspicion level | |
| suspicion_level = "low" | |
| if account_age_days < 30 or vote_frequency > 3: | |
| suspicion_level = "high" | |
| high_suspicion_users.append(user_id) | |
| elif account_age_days < 90 or vote_frequency > 1: | |
| suspicion_level = "medium" | |
| user_data = { | |
| 'user_id': user_id, | |
| 'username': user.username, | |
| 'votes_for_model': user_votes_for_model, | |
| 'account_age_days': account_age_days, | |
| 'suspicion_level': suspicion_level, | |
| 'first_vote_at': min(vote_dates), | |
| 'last_vote_at': max(vote_dates) | |
| } | |
| suspicious_users.append(user_data) | |
| # Calculate confidence score | |
| confidence_factors = [] | |
| # Factor 1: Ratio of high suspicion users | |
| if suspicious_users: | |
| high_suspicion_ratio = len(high_suspicion_users) / len(suspicious_users) | |
| confidence_factors.append(min(high_suspicion_ratio * 0.4, 0.4)) | |
| # Factor 2: Vote concentration (more votes in shorter time = higher confidence) | |
| vote_concentration = min(len(recent_votes) / (hours_back * user_count), 1.0) | |
| confidence_factors.append(vote_concentration * 0.3) | |
| # Factor 3: New account participation | |
| new_account_ratio = sum(1 for u in suspicious_users if u['account_age_days'] < 30) / len(suspicious_users) if suspicious_users else 0 | |
| confidence_factors.append(new_account_ratio * 0.3) | |
| confidence_score = sum(confidence_factors) | |
| # Only consider it coordinated if confidence is above threshold | |
| is_coordinated = confidence_score >= 0.6 | |
| if is_coordinated: | |
| # Log the campaign automatically | |
| try: | |
| from models import log_coordinated_campaign, Model | |
| model = Model.query.get(model_id) | |
| model_type = model.model_type if model else "unknown" | |
| participants_data = [{ | |
| 'user_id': u['user_id'], | |
| 'votes_in_campaign': u['votes_for_model'], | |
| 'first_vote_at': u['first_vote_at'], | |
| 'last_vote_at': u['last_vote_at'], | |
| 'suspicion_level': u['suspicion_level'] | |
| } for u in suspicious_users] | |
| campaign = log_coordinated_campaign( | |
| model_id=model_id, | |
| model_type=model_type, | |
| vote_count=len(recent_votes), | |
| user_count=user_count, | |
| time_window_hours=hours_back, | |
| confidence_score=confidence_score, | |
| participants_data=participants_data | |
| ) | |
| # Automatically timeout high suspicion users | |
| from models import create_user_timeout | |
| timeout_count = 0 | |
| for user_id in high_suspicion_users: | |
| try: | |
| create_user_timeout( | |
| user_id=user_id, | |
| reason=f"Automatic timeout for participation in coordinated voting campaign (Campaign ID: {campaign.id})", | |
| timeout_type="coordinated_voting", | |
| duration_days=30, | |
| related_campaign_id=campaign.id | |
| ) | |
| timeout_count += 1 | |
| except Exception as e: | |
| logger.error(f"Error creating timeout for user {user_id}: {str(e)}") | |
| logger.warning(f"Coordinated voting campaign detected and logged (ID: {campaign.id}). {timeout_count} users timed out.") | |
| except Exception as e: | |
| logger.error(f"Error logging coordinated campaign: {str(e)}") | |
| return is_coordinated, user_count, len(recent_votes), suspicious_users | |
| return False, user_count, len(recent_votes), [] | |
| def detect_rapid_voting(user_id, min_interval_seconds=3): | |
| """ | |
| Detect if a user is voting too rapidly (potential bot behavior). | |
| This allows rapid voting (3+ seconds) for reasonable periods, but flags | |
| extended periods of very rapid voting that indicate bot behavior. | |
| Returns (is_rapid, intervals, avg_interval) | |
| """ | |
| if not user_id: | |
| return False, [], 0 | |
| # Get more recent votes to better analyze patterns (last 50 instead of 10) | |
| recent_votes = Vote.query.filter_by(user_id=user_id).order_by( | |
| Vote.vote_date.desc() | |
| ).limit(50).all() | |
| if len(recent_votes) < 50: # Need at least 50 votes to detect patterns | |
| return False, [], 0 | |
| # Calculate intervals between votes | |
| intervals = [] | |
| for i in range(len(recent_votes) - 1): | |
| interval = (recent_votes[i].vote_date - recent_votes[i + 1].vote_date).total_seconds() | |
| intervals.append(interval) | |
| avg_interval = sum(intervals) / len(intervals) if intervals else 0 | |
| # More sophisticated bot detection: | |
| # 1. Count votes with intervals < 3 seconds (very rapid) | |
| very_rapid_votes = sum(1 for interval in intervals if interval < 3) | |
| # 2. Count votes with intervals < 1 second (extremely rapid - likely bot) | |
| extremely_rapid_votes = sum(1 for interval in intervals if interval < 1) | |
| # 3. Check for sustained rapid voting patterns | |
| # Look for sequences of 10+ votes all under 5 seconds | |
| sustained_rapid_sequences = 0 | |
| current_sequence = 0 | |
| for interval in intervals: | |
| if interval < 5: | |
| current_sequence += 1 | |
| else: | |
| if current_sequence >= 10: # 10+ votes in a row under 5 seconds | |
| sustained_rapid_sequences += 1 | |
| current_sequence = 0 | |
| # Final check for remaining sequence | |
| if current_sequence >= 10: | |
| sustained_rapid_sequences += 1 | |
| # Flag as rapid/bot if: | |
| # - More than 20% of votes are extremely rapid (< 1 second) OR | |
| # - More than 60% of votes are very rapid (< 3 seconds) AND there are sustained sequences OR | |
| # - There are multiple sustained rapid sequences (10+ votes under 5 seconds each) | |
| total_intervals = len(intervals) | |
| extremely_rapid_ratio = extremely_rapid_votes / total_intervals if total_intervals > 0 else 0 | |
| very_rapid_ratio = very_rapid_votes / total_intervals if total_intervals > 0 else 0 | |
| is_rapid = ( | |
| extremely_rapid_ratio > 0.2 or # > 20% extremely rapid | |
| (very_rapid_ratio > 0.6 and sustained_rapid_sequences > 0) or # > 60% very rapid + sustained | |
| sustained_rapid_sequences >= 2 # Multiple sustained rapid sequences | |
| ) | |
| return is_rapid, intervals, avg_interval | |
| def check_user_security_score(user_id): | |
| """ | |
| Calculate a security score for a user based on various factors. | |
| Returns (score, factors) where score is 0-100 (higher = more trustworthy) | |
| """ | |
| if not user_id: | |
| return 0, {"error": "No user ID provided"} | |
| user = User.query.get(user_id) | |
| if not user: | |
| return 0, {"error": "User not found"} | |
| factors = {} | |
| score = 100 # Start with perfect score and deduct points | |
| # Account age factor | |
| if user.join_date: | |
| account_age_days = (datetime.utcnow() - user.join_date).days | |
| factors['account_age_days'] = account_age_days | |
| if account_age_days < 45: | |
| score -= 30 | |
| elif account_age_days < 90: | |
| score -= 15 | |
| elif account_age_days < 180: | |
| score -= 5 | |
| else: | |
| score -= 20 | |
| factors['account_age_days'] = None | |
| # HF account age factor | |
| if user.hf_account_created: | |
| hf_age_days = (datetime.utcnow() - user.hf_account_created).days | |
| factors['hf_account_age_days'] = hf_age_days | |
| if hf_age_days < 30: | |
| score -= 25 # This should be caught by auth, but double-check | |
| elif hf_age_days < 90: | |
| score -= 10 | |
| else: | |
| score -= 15 | |
| factors['hf_account_age_days'] = None | |
| # Voting pattern analysis | |
| is_suspicious, reason, vote_count = detect_suspicious_voting_patterns(user_id) | |
| factors['suspicious_voting'] = is_suspicious | |
| factors['recent_vote_count'] = vote_count | |
| if is_suspicious: | |
| score -= 25 | |
| factors['suspicious_reason'] = reason | |
| # Rapid voting check | |
| is_rapid, intervals, avg_interval = detect_rapid_voting(user_id) | |
| factors['rapid_voting'] = is_rapid | |
| factors['avg_vote_interval'] = avg_interval | |
| if is_rapid: | |
| score -= 20 | |
| # Total vote count (very new users with many votes are suspicious) | |
| total_votes = Vote.query.filter_by(user_id=user_id).count() | |
| factors['total_votes'] = total_votes | |
| if account_age_days and account_age_days < 7 and total_votes > 20: | |
| score -= 15 # New account with many votes | |
| # Model bias detection - check for extreme bias toward any single model | |
| if total_votes >= 5: # Only check if user has enough votes | |
| max_bias_ratio = 0 | |
| most_biased_model = None | |
| # Get all models this user has voted for | |
| user_votes = Vote.query.filter_by(user_id=user_id).all() | |
| model_stats = {} | |
| for vote in user_votes: | |
| chosen_id = vote.model_chosen | |
| rejected_id = vote.model_rejected | |
| # Track appearances and choices | |
| if chosen_id not in model_stats: | |
| model_stats[chosen_id] = {'chosen': 0, 'appeared': 0} | |
| if rejected_id not in model_stats: | |
| model_stats[rejected_id] = {'chosen': 0, 'appeared': 0} | |
| model_stats[chosen_id]['chosen'] += 1 | |
| model_stats[chosen_id]['appeared'] += 1 | |
| model_stats[rejected_id]['appeared'] += 1 | |
| # Find the highest bias ratio | |
| for model_id, stats in model_stats.items(): | |
| if stats['appeared'] >= 5: # Only consider models with enough appearances | |
| bias_ratio = stats['chosen'] / stats['appeared'] | |
| if bias_ratio > max_bias_ratio: | |
| max_bias_ratio = bias_ratio | |
| most_biased_model = model_id | |
| factors['max_bias_ratio'] = max_bias_ratio | |
| factors['most_biased_model_id'] = most_biased_model | |
| # Deduct points based on bias level | |
| if max_bias_ratio >= 0.95: # 95%+ bias | |
| score -= 30 | |
| factors['bias_penalty'] = 'Extreme bias (95%+)' | |
| elif max_bias_ratio >= 0.9: # 90%+ bias | |
| score -= 20 | |
| factors['bias_penalty'] = 'Very high bias (90%+)' | |
| elif max_bias_ratio >= 0.8: # 80%+ bias | |
| score -= 10 | |
| factors['bias_penalty'] = 'High bias (80%+)' | |
| else: | |
| factors['bias_penalty'] = None | |
| else: | |
| factors['max_bias_ratio'] = 0 | |
| factors['bias_penalty'] = None | |
| # Ensure score doesn't go below 0 | |
| score = max(0, score) | |
| factors['final_score'] = score | |
| return score, factors | |
| def is_vote_allowed(user_id, ip_address=None): | |
| """ | |
| Check if a vote should be allowed based on security factors. | |
| Returns (allowed, reason, security_score) | |
| """ | |
| if not user_id: | |
| return False, "User not authenticated", 0 | |
| # Check if user is currently timed out | |
| try: | |
| from models import check_user_timeout | |
| is_timed_out, timeout = check_user_timeout(user_id) | |
| if is_timed_out: | |
| remaining_time = timeout.expires_at - datetime.utcnow() | |
| days_remaining = remaining_time.days | |
| hours_remaining = remaining_time.seconds // 3600 | |
| if days_remaining > 0: | |
| time_str = f"{days_remaining} day(s)" | |
| else: | |
| time_str = f"{hours_remaining} hour(s)" | |
| return False, f"Account temporarily suspended until {timeout.expires_at.strftime('%Y-%m-%d %H:%M')} ({time_str} remaining). Reason: {timeout.reason}", 0 | |
| except ImportError: | |
| # If models import fails, continue with other checks | |
| pass | |
| except Exception as e: | |
| logger.error(f"Error checking user timeout: {str(e)}") | |
| # Check security score | |
| score, factors = check_user_security_score(user_id) | |
| # Very low scores are blocked | |
| if score < 20: | |
| return False, f"Security score too low: {score}/100", score | |
| # Check for recent suspicious activity | |
| if factors.get('suspicious_voting'): | |
| return False, f"Suspicious voting pattern detected: {factors.get('suspicious_reason')}", score | |
| if factors.get('rapid_voting'): | |
| return False, f"Voting too rapidly (avg interval: {factors.get('avg_vote_interval', 0):.1f}s)", score | |
| # Additional IP-based checks could go here | |
| return True, "Vote allowed", score |