import gradio as gr
from gradio_leaderboard import Leaderboard, ColumnFilter
import json
import os
import time
import requests
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.errors import HfHubHTTPError
import backoff
from dotenv import load_dotenv
import pandas as pd
import random
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# Load environment variables
load_dotenv()
# =============================================================================
# CONFIGURATION
# =============================================================================
AGENTS_REPO = "SWE-Arena/bot_metadata" # HuggingFace dataset for agent metadata
LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata" # HuggingFace dataset for leaderboard data
LEADERBOARD_COLUMNS = [
("Agent Name", "string"),
("Website", "string"),
("Total Reviews", "number"),
("Merged PRs", "number"),
("Acceptance Rate (%)", "number"),
]
# =============================================================================
# HUGGINGFACE API WRAPPERS WITH BACKOFF
# =============================================================================
def is_rate_limit_error(e):
"""Check if exception is a HuggingFace rate limit error (429)."""
if isinstance(e, HfHubHTTPError):
return e.response.status_code == 429
return False
# Shared backoff policy for HuggingFace Hub calls: retry only on HTTP 429
# rate limits, with exponentially growing waits capped at one hour each.
hf_rate_limit_backoff = backoff.on_exception(
    backoff.expo,
    HfHubHTTPError,
    max_tries=8,
    base=300,
    max_value=3600,
    giveup=lambda e: not is_rate_limit_error(e),
    on_backoff=lambda details: print(
        f"Rate limited. Retrying in {details['wait']/60:.1f} minutes "
        f"({details['wait']:.0f}s) - attempt {details['tries']}/8..."
    )
)

@hf_rate_limit_backoff
def list_repo_files_with_backoff(api, **kwargs):
    """Wrapper for api.list_repo_files() with exponential backoff on rate limits."""
    return api.list_repo_files(**kwargs)

@hf_rate_limit_backoff
def hf_hub_download_with_backoff(**kwargs):
    """Wrapper for hf_hub_download() with exponential backoff on rate limits."""
    return hf_hub_download(**kwargs)
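# Example usage (a minimal sketch; these calls mirror load_agents_from_hf()
# below and are not part of the startup flow):
#
#   api = HfApi()
#   files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
#   path = hf_hub_download_with_backoff(repo_id=AGENTS_REPO, filename=files[0], repo_type="dataset")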
# =============================================================================
# GITHUB API OPERATIONS
# =============================================================================
def request_with_backoff(method, url, *, headers=None, params=None, json_body=None, data=None, max_retries=10, timeout=30):
"""
Perform an HTTP request with exponential backoff and jitter for GitHub API.
Retries on 403/429 (rate limits), 5xx server errors, and transient network exceptions.
Returns the final requests.Response on success or non-retryable status, or None after exhausting retries.
"""
delay = 1.0
for attempt in range(max_retries):
try:
resp = requests.request(
method,
url,
headers=headers or {},
params=params,
json=json_body,
data=data,
timeout=timeout
)
status = resp.status_code
# Success
if 200 <= status < 300:
return resp
# Rate limits or server errors -> retry with backoff
if status in (403, 429) or 500 <= status < 600:
wait = None
# Prefer Retry-After when present
retry_after = resp.headers.get('Retry-After') or resp.headers.get('retry-after')
if retry_after:
try:
wait = float(retry_after)
except Exception:
wait = None
# Fallback to X-RateLimit-Reset when 403/429
if wait is None and status in (403, 429):
reset_hdr = resp.headers.get('X-RateLimit-Reset') or resp.headers.get('x-ratelimit-reset')
if reset_hdr:
try:
reset_timestamp = int(float(reset_hdr))
wait = max(reset_timestamp - time.time() + 2, 1)
except Exception:
wait = None
# Final fallback: exponential backoff with jitter
if wait is None:
wait = delay + random.uniform(0, 0.5)
# Cap individual wait to avoid extreme sleeps
wait = max(1.0, min(wait, 120.0))
print(f"GitHub API {status}. Backing off {wait:.1f}s (attempt {attempt + 1}/{max_retries})...")
time.sleep(wait)
delay = min(delay * 2, 60.0)
continue
# Non-retryable error; return response for caller to handle
return resp
except requests.RequestException as e:
# Network error -> retry with backoff
wait = delay + random.uniform(0, 0.5)
wait = max(1.0, min(wait, 60.0))
print(f"Request error: {e}. Retrying in {wait:.1f}s (attempt {attempt + 1}/{max_retries})...")
time.sleep(wait)
delay = min(delay * 2, 60.0)
print(f"Exceeded max retries for {url}")
return None
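# Example usage (a sketch for local testing; GitHub's /rate_limit endpoint is
# free to call and does not count against the hourly quota):
#
#   resp = request_with_backoff('GET', 'https://api.github.com/rate_limit')
#   if resp is not None and resp.status_code == 200:
#       print(resp.json()['resources']['core']['remaining'])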
def validate_github_username(identifier):
"""Verify that a GitHub identifier exists with backoff-aware requests."""
try:
url = f'https://api.github.com/users/{identifier}'
        response = request_with_backoff('GET', url, max_retries=1)  # single attempt: fail fast during interactive validation
if response is None:
return False, "Validation error: network/rate limit exhausted"
if response.status_code == 200:
return True, "Username is valid"
elif response.status_code == 404:
return False, "GitHub identifier not found"
else:
return False, f"Validation error: HTTP {response.status_code}"
except Exception as e:
return False, f"Validation error: {str(e)}"
# =============================================================================
# HUGGINGFACE DATASET OPERATIONS
# =============================================================================
def load_agents_from_hf():
"""Load all agent metadata JSON files from HuggingFace dataset."""
try:
api = HfApi()
agents = []
# List all files in the repository
files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
# Filter for JSON files only
json_files = [f for f in files if f.endswith('.json')]
# Download and parse each JSON file
for json_file in json_files:
try:
file_path = hf_hub_download_with_backoff(
repo_id=AGENTS_REPO,
filename=json_file,
repo_type="dataset"
)
with open(file_path, 'r') as f:
agent_data = json.load(f)
# Only process agents with status == "public"
if agent_data.get('status') != 'public':
continue
# Extract github_identifier from filename (e.g., "claude[bot].json" -> "claude[bot]")
filename_identifier = json_file.replace('.json', '')
# Add or override github_identifier to match filename
agent_data['github_identifier'] = filename_identifier
agents.append(agent_data)
except Exception as e:
print(f"Warning: Could not load {json_file}: {str(e)}")
continue
print(f"Loaded {len(agents)} agents from HuggingFace")
return agents
except Exception as e:
print(f"Could not load agents from HuggingFace: {str(e)}")
return None
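# Expected shape of an agent metadata file in AGENTS_REPO, inferred from the
# fields read above and written by submit_agent() below (values illustrative):
#
#   claude[bot].json
#   {
#       "name": "Claude",
#       "developer": "Anthropic",
#       "github_identifier": "claude[bot]",
#       "website": "https://example.com",
#       "status": "public"
#   }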
def get_hf_token():
"""Get HuggingFace token from environment variables."""
token = os.getenv('HF_TOKEN')
if not token:
print("Warning: HF_TOKEN not found in environment variables")
return token
def upload_with_retry(api, path_or_fileobj, path_in_repo, repo_id, repo_type, token, max_retries=5):
"""
Upload file to HuggingFace with exponential backoff retry logic.
Args:
api: HfApi instance
path_or_fileobj: Local file path to upload
path_in_repo: Target path in the repository
repo_id: Repository ID
repo_type: Type of repository (e.g., "dataset")
token: HuggingFace token
max_retries: Maximum number of retry attempts
Returns:
True if upload succeeded, raises exception if all retries failed
"""
delay = 2.0 # Initial delay in seconds
for attempt in range(max_retries):
try:
api.upload_file(
path_or_fileobj=path_or_fileobj,
path_in_repo=path_in_repo,
repo_id=repo_id,
repo_type=repo_type,
token=token
)
if attempt > 0:
print(f" Upload succeeded on attempt {attempt + 1}/{max_retries}")
return True
except Exception as e:
if attempt < max_retries - 1:
wait_time = delay + random.uniform(0, 1.0)
print(f" Upload failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
print(f" Retrying in {wait_time:.1f} seconds...")
time.sleep(wait_time)
delay = min(delay * 2, 60.0) # Exponential backoff, max 60s
else:
print(f" Upload failed after {max_retries} attempts: {str(e)}")
raise
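# Example usage (a sketch mirroring save_agent_to_hf() below; the filename is
# hypothetical):
#
#   upload_with_retry(
#       api=HfApi(),
#       path_or_fileobj="claude[bot].json",
#       path_in_repo="claude[bot].json",
#       repo_id=AGENTS_REPO,
#       repo_type="dataset",
#       token=get_hf_token()
#   )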
def save_agent_to_hf(data):
"""Save a new agent to HuggingFace dataset as {identifier}.json in root."""
try:
api = HfApi()
token = get_hf_token()
if not token:
raise Exception("No HuggingFace token found. Please set HF_TOKEN in your Space settings.")
identifier = data['github_identifier']
filename = f"{identifier}.json"
# Save locally first
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
try:
# Upload to HuggingFace (root directory)
upload_with_retry(
api=api,
path_or_fileobj=filename,
path_in_repo=filename,
repo_id=AGENTS_REPO,
repo_type="dataset",
token=token
)
print(f"Saved agent to HuggingFace: {filename}")
return True
finally:
# Always clean up local file, even if upload fails
if os.path.exists(filename):
os.remove(filename)
except Exception as e:
print(f"Error saving agent: {str(e)}")
return False
def load_leaderboard_data_from_hf():
"""
Load leaderboard data and monthly metrics from HuggingFace dataset.
Returns:
dict: Dictionary with 'leaderboard', 'monthly_metrics', and 'last_updated' keys
Returns None if file doesn't exist or error occurs
"""
try:
token = get_hf_token()
filename = "swe-review.json"
# Download file
file_path = hf_hub_download_with_backoff(
repo_id=LEADERBOARD_REPO,
filename=filename,
repo_type="dataset",
token=token
)
# Load JSON data
with open(file_path, 'r') as f:
data = json.load(f)
last_updated = data.get('last_updated', 'Unknown')
print(f"Loaded leaderboard data from HuggingFace (last updated: {last_updated})")
return data
except Exception as e:
print(f"Could not load leaderboard data from HuggingFace: {str(e)}")
return None
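# Expected shape of swe-review.json, inferred from how this module consumes it
# in get_leaderboard_dataframe() and create_monthly_metrics_plot() (keys are
# real; values and the month format are illustrative):
#
#   {
#       "last_updated": "2024-01-01T00:00:00Z",
#       "leaderboard": {
#           "claude[bot]": {
#               "name": "Claude",
#               "website": "https://example.com",
#               "total_reviews": 120,
#               "merged_prs": 90,
#               "acceptance_rate": 75.0
#           }
#       },
#       "monthly_metrics": {
#           "agents": ["Claude"],
#           "months": ["2024-01", "2024-02"],
#           "data": {
#               "Claude": {
#                   "acceptance_rates": [75.0, null],
#                   "total_reviews": [120, 0]
#               }
#           }
#       }
#   }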
# =============================================================================
# UI FUNCTIONS
# =============================================================================
def create_monthly_metrics_plot(top_n=5):
"""
Create a Plotly figure with dual y-axes showing:
- Left y-axis: Acceptance Rate (%) as line curves
- Right y-axis: Total Reviews created as bar charts
Each agent gets a unique color for both their line and bars.
Args:
top_n: Number of top agents to show (default: 5)
"""
# Load from saved dataset
saved_data = load_leaderboard_data_from_hf()
    # Shared placeholder figure for the two no-data cases in this function
    def _empty_metrics_fig():
        fig = go.Figure()
        fig.add_annotation(
            text="No data available for visualization",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=16)
        )
        fig.update_layout(title=None, xaxis_title=None, height=500)
        return fig
    if not saved_data or 'monthly_metrics' not in saved_data:
        return _empty_metrics_fig()
metrics = saved_data['monthly_metrics']
print(f"Loaded monthly metrics from saved dataset")
# Apply top_n filter if specified
if top_n is not None and top_n > 0 and metrics.get('agents'):
# Calculate total reviews for each agent
agent_totals = []
for agent_name in metrics['agents']:
agent_data = metrics['data'].get(agent_name, {})
total_reviews = sum(agent_data.get('total_reviews', []))
agent_totals.append((agent_name, total_reviews))
# Sort by total reviews and take top N
agent_totals.sort(key=lambda x: x[1], reverse=True)
top_agents = [agent_name for agent_name, _ in agent_totals[:top_n]]
# Filter metrics to only include top agents
metrics = {
'agents': top_agents,
'months': metrics['months'],
'data': {agent: metrics['data'][agent] for agent in top_agents if agent in metrics['data']}
}
    if not metrics['agents'] or not metrics['months']:
        return _empty_metrics_fig()
# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Generate unique colors for many agents using HSL color space
def generate_color(index, total):
"""Generate distinct colors using HSL color space for better distribution"""
hue = (index * 360 / total) % 360
saturation = 70 + (index % 3) * 10 # Vary saturation slightly
lightness = 45 + (index % 2) * 10 # Vary lightness slightly
return f'hsl({hue}, {saturation}%, {lightness}%)'
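    # e.g. generate_color(0, 5) -> 'hsl(0.0, 70%, 45%)'; generate_color(1, 5) -> 'hsl(72.0, 80%, 55%)'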
agents = metrics['agents']
months = metrics['months']
data = metrics['data']
# Generate colors for all agents
agent_colors = {agent: generate_color(idx, len(agents)) for idx, agent in enumerate(agents)}
# Add traces for each agent
for idx, agent_name in enumerate(agents):
color = agent_colors[agent_name]
agent_data = data[agent_name]
# Add line trace for acceptance rate (left y-axis)
acceptance_rates = agent_data['acceptance_rates']
# Filter out None values for plotting
x_acceptance = [month for month, rate in zip(months, acceptance_rates) if rate is not None]
y_acceptance = [rate for rate in acceptance_rates if rate is not None]
if x_acceptance and y_acceptance: # Only add trace if there's data
fig.add_trace(
go.Scatter(
x=x_acceptance,
y=y_acceptance,
name=agent_name,
mode='lines+markers',
line=dict(color=color, width=2),
marker=dict(size=8),
legendgroup=agent_name,
showlegend=(top_n is not None and top_n <= 10), # Show legend for top N agents
hovertemplate='<b>Agent: %{fullData.name}</b><br>' +
'Month: %{x}<br>' +
'Acceptance Rate: %{y:.2f}%<br>' +
'<extra></extra>'
),
secondary_y=False
)
# Add bar trace for total reviews (right y-axis)
# Only show bars for months where agent has reviews
x_bars = []
y_bars = []
for month, count in zip(months, agent_data['total_reviews']):
if count > 0: # Only include months with reviews
x_bars.append(month)
y_bars.append(count)
if x_bars and y_bars: # Only add trace if there's data
fig.add_trace(
go.Bar(
x=x_bars,
y=y_bars,
name=agent_name,
marker=dict(color=color, opacity=0.6),
legendgroup=agent_name,
showlegend=False, # Hide duplicate legend entry (already shown in Scatter)
hovertemplate='<b>Agent: %{fullData.name}</b><br>' +
'Month: %{x}<br>' +
'Total Reviews: %{y}<br>' +
'<extra></extra>',
offsetgroup=agent_name # Group bars by agent for proper spacing
),
secondary_y=True
)
# Update axes labels
fig.update_xaxes(title_text=None)
fig.update_yaxes(
title_text="<b>Acceptance Rate (%)</b>",
range=[0, 100],
secondary_y=False,
showticklabels=True,
tickmode='linear',
dtick=10,
showgrid=True
)
fig.update_yaxes(title_text="<b>Total Reviews</b>", secondary_y=True)
# Update layout
show_legend = (top_n is not None and top_n <= 10)
fig.update_layout(
title=None,
hovermode='closest', # Show individual agent info on hover
barmode='group',
height=600,
showlegend=show_legend,
margin=dict(l=50, r=150 if show_legend else 50, t=50, b=50) # More right margin when legend is shown
)
return fig
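# Quick local check (a sketch; requires network access to the dataset):
#
#   create_monthly_metrics_plot(top_n=3).show()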
def get_leaderboard_dataframe():
"""
Load leaderboard from saved dataset and convert to pandas DataFrame for display.
Returns formatted DataFrame sorted by total reviews.
"""
# Load from saved dataset
saved_data = load_leaderboard_data_from_hf()
if not saved_data or 'leaderboard' not in saved_data:
print(f"No leaderboard data available")
# Return empty DataFrame with correct columns if no data
column_names = [col[0] for col in LEADERBOARD_COLUMNS]
return pd.DataFrame(columns=column_names)
cache_dict = saved_data['leaderboard']
print(f"Loaded leaderboard from saved dataset (last updated: {saved_data.get('last_updated', 'Unknown')})")
print(f"Cache dict size: {len(cache_dict)}")
if not cache_dict:
print("WARNING: cache_dict is empty!")
# Return empty DataFrame with correct columns if no data
column_names = [col[0] for col in LEADERBOARD_COLUMNS]
return pd.DataFrame(columns=column_names)
rows = []
filtered_count = 0
for identifier, data in cache_dict.items():
total_reviews = data.get('total_reviews', 0)
print(f" Agent '{identifier}': {total_reviews} reviews")
# Filter out agents with zero total reviews
if total_reviews == 0:
filtered_count += 1
continue
# Only include display-relevant fields
rows.append([
data.get('name', 'Unknown'),
data.get('website', 'N/A'),
total_reviews,
data.get('merged_prs', 0),
data.get('acceptance_rate', 0.0),
])
print(f"Filtered out {filtered_count} agents with 0 reviews")
print(f"Leaderboard will show {len(rows)} agents")
# Create DataFrame
column_names = [col[0] for col in LEADERBOARD_COLUMNS]
df = pd.DataFrame(rows, columns=column_names)
# Ensure numeric types
numeric_cols = ["Total Reviews", "Merged PRs", "Acceptance Rate (%)"]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
# Sort by Total Reviews descending
if "Total Reviews" in df.columns and not df.empty:
df = df.sort_values(by="Total Reviews", ascending=False).reset_index(drop=True)
print(f"Final DataFrame shape: {df.shape}")
print("="*60 + "\n")
return df
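# Quick local check (a sketch):
#
#   print(get_leaderboard_dataframe().head())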
def submit_agent(identifier, agent_name, developer, website):
"""
Submit a new agent to the leaderboard.
Validates input and saves submission.
"""
# Validate required fields
if not identifier or not identifier.strip():
return "ERROR: GitHub identifier is required", get_leaderboard_dataframe()
if not agent_name or not agent_name.strip():
return "ERROR: Agent name is required", get_leaderboard_dataframe()
if not developer or not developer.strip():
return "ERROR: Developer name is required", get_leaderboard_dataframe()
if not website or not website.strip():
return "ERROR: Website URL is required", get_leaderboard_dataframe()
# Clean inputs
identifier = identifier.strip()
agent_name = agent_name.strip()
developer = developer.strip()
website = website.strip()
# Validate GitHub identifier
is_valid, message = validate_github_username(identifier)
if not is_valid:
return f"ERROR: {message}", get_leaderboard_dataframe()
# Check for duplicates by loading agents from HuggingFace
agents = load_agents_from_hf()
if agents:
existing_names = {agent['github_identifier'] for agent in agents}
if identifier in existing_names:
return f"WARNING: Agent with identifier '{identifier}' already exists", get_leaderboard_dataframe()
# Create submission
submission = {
'name': agent_name,
'developer': developer,
'github_identifier': identifier,
'website': website,
'status': 'public'
}
# Save to HuggingFace
if not save_agent_to_hf(submission):
return "ERROR: Failed to save submission", get_leaderboard_dataframe()
    # Return success message - data will be populated by backend updates
    return f"SUCCESS: {agent_name} submitted! Review data will be populated by the backend system.", get_leaderboard_dataframe()
# =============================================================================
# DATA RELOAD FUNCTION
# =============================================================================
def reload_leaderboard_data():
"""
Reload leaderboard data from HuggingFace.
This function is called by the scheduler on a daily basis.
"""
print(f"\n{'='*80}")
print(f"Reloading leaderboard data from HuggingFace...")
print(f"{'='*80}\n")
try:
data = load_leaderboard_data_from_hf()
if data:
print(f"Successfully reloaded leaderboard data")
print(f" Last updated: {data.get('last_updated', 'Unknown')}")
print(f" Agents: {len(data.get('leaderboard', {}))}")
else:
print(f"No data available")
except Exception as e:
print(f"Error reloading leaderboard data: {str(e)}")
print(f"{'='*80}\n")
# =============================================================================
# GRADIO APPLICATION
# =============================================================================
print(f"\nStarting SWE Agent PR Leaderboard")
print(f" Data source: {LEADERBOARD_REPO}")
print(f" Reload frequency: Daily at 12:00 AM UTC\n")
# Start APScheduler for daily data reload at 12:00 AM UTC
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job(
reload_leaderboard_data,
trigger=CronTrigger(hour=0, minute=0), # 12:00 AM UTC daily
id='daily_data_reload',
name='Daily Data Reload',
replace_existing=True
)
scheduler.start()
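# Optional hardening (a sketch, not part of the original flow): stop the
# scheduler cleanly when the process exits, via the stdlib atexit module.
#
#   import atexit
#   atexit.register(lambda: scheduler.shutdown(wait=False))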
print(f"\n{'='*80}")
print(f"Scheduler initialized successfully")
print(f"Reload schedule: Daily at 12:00 AM UTC")
print(f"On startup: Loads cached data from HuggingFace on demand")
print(f"{'='*80}\n")
# Create Gradio interface
with gr.Blocks(title="SWE Agent Review Leaderboard", theme=gr.themes.Soft()) as app:
gr.Markdown("# SWE Agent Review Leaderboard")
gr.Markdown(f"Track and compare GitHub PR review acceptance statistics for SWE agents")
with gr.Tabs():
# Leaderboard Tab
with gr.Tab("Leaderboard"):
gr.Markdown("*Statistics are based on agent review activity tracked by the system*")
leaderboard_table = Leaderboard(
value=pd.DataFrame(columns=[col[0] for col in LEADERBOARD_COLUMNS]), # Empty initially
datatype=LEADERBOARD_COLUMNS,
search_columns=["Agent Name", "Website"],
filter_columns=[
ColumnFilter(
"Acceptance Rate (%)",
min=0,
max=100,
default=[0, 100],
type="slider",
label="Acceptance Rate (%)"
)
]
)
# Load leaderboard data when app starts
app.load(
fn=get_leaderboard_dataframe,
inputs=[],
outputs=[leaderboard_table]
)
# Monthly Metrics Section
gr.Markdown("---") # Divider
gr.Markdown("### Monthly Performance - Top 5 Agents")
gr.Markdown("*Shows acceptance rate trends and review volumes for the most active agents*")
monthly_metrics_plot = gr.Plot(label="Monthly Metrics")
# Load monthly metrics when app starts
app.load(
fn=lambda: create_monthly_metrics_plot(),
inputs=[],
outputs=[monthly_metrics_plot]
)
# Submit Agent Tab
with gr.Tab("Submit Agent"):
gr.Markdown("### Submit Your Agent")
gr.Markdown("Fill in the details below to add your agent to the leaderboard.")
with gr.Row():
with gr.Column():
github_input = gr.Textbox(
label="GitHub Identifier*",
placeholder="Your agent username (e.g., claude[bot])"
)
name_input = gr.Textbox(
label="Agent Name*",
placeholder="Your agent's display name"
)
with gr.Column():
developer_input = gr.Textbox(
label="Developer*",
placeholder="Your developer or team name"
)
website_input = gr.Textbox(
label="Website*",
placeholder="https://your-agent-website.com"
)
submit_button = gr.Button(
"Submit Agent",
variant="primary"
)
submission_status = gr.Textbox(
label="Submission Status",
interactive=False
)
# Event handler
submit_button.click(
fn=submit_agent,
inputs=[github_input, name_input, developer_input, website_input],
outputs=[submission_status, leaderboard_table]
)
# Launch application
if __name__ == "__main__":
app.launch()