import datetime
import json
import os
from pathlib import Path

import datasets
import gradio as gr
import pandas as pd
import requests
from apscheduler.schedulers.background import BackgroundScheduler

# InfoStrings
from content import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    CONTACT_DATASET,
    INTRODUCTION_TEXT,
    LEADERBOARD_PATH,
    OWNER,
    RESULTS_DATASET,
    SCENARIO_LIST,
    SUBMISSION_TEXT,
    TITLE,
)
from gradio_leaderboard import ColumnFilter, Leaderboard, SelectColumns
from huggingface_hub import create_repo, snapshot_download, upload_folder

from utils import api, Experiment, format_log, model_hyperlink, TOKEN

contact_infos = datasets.load_dataset(
    CONTACT_DATASET, token=TOKEN, verification_mode=datasets.VerificationMode.NO_CHECKS
)  # download_mode="force_redownload"


def get_display_name(capability: str) -> str:
    """
    Convert internal capability names to user-friendly display names.

    Args:
        capability: Internal capability name from the benchmark

    Returns:
        User-friendly display name for the leaderboard
    """
    if "noise" in capability:
        return "noise"
    elif "agent2agent" in capability or "a2a" in capability:
        return "A2A"
    else:
        return capability


def cleanup(row) -> dict:
    """
    Transform raw evaluation data into a clean format for the leaderboard display.

    Args:
        row: Raw evaluation result row from the dataset

    Returns:
        Dictionary with cleaned and formatted data for leaderboard display
    """
    result = {}

    # Basic model information
    result["Model"] = row["metadata.model"]
    result["Provider"] = row["metadata.model_provider"]
    result["Total score (%)"] = round(row["statistics.global.macro_success_rate"], 1)

    # Define the order of capability columns for consistent display
    scenario_order = [
        "execution",
        "search",
        "ambiguity",
        "adaptability",
        "time",
        "mini_noise",
        "mini_agent2agent",
    ]

    # Process each capability score with aligned formatting
    for capability in scenario_order:
        if capability in SCENARIO_LIST:
            display_name = get_display_name(capability)

            # Extract score and standard error
            score = row[f"statistics.per_capability.{capability}.success_rate"]
            sem = row[f"statistics.per_capability.{capability}.success_rate_sem"]

            # Format with decimal alignment using non-breaking spaces
            score_str = f"{score:4.1f}".replace(" ", "\u00A0")
            sem_str = f"{sem:.1f}"  # No width formatting for SEM to avoid extra spaces

            result[f"{display_name} (%)"] = f"{score_str} ± {sem_str}"

    # Add metadata fields
    result["Number of runs"] = (
        row["statistics.global.total_runs"] / row["statistics.global.total_scenarios"]
        if row["statistics.global.total_scenarios"] != 0
        else 0
    )
    result["Submitter"] = row["metadata.organisation"]
    result["Submission date"] = row["metadata.timestamp"][:10]

    return result

def get_dataframe_from_results() -> pd.DataFrame:
    """
    Load and process evaluation results from the dataset to create a leaderboard DataFrame.

    Retrieves raw evaluation data, processes it through the cleanup function, and
    returns a sorted DataFrame ready for leaderboard display.

    Returns:
        Pandas DataFrame with processed leaderboard data, sorted by total score.
        Returns an empty DataFrame if no data is available.
    """
    split = "train"

    # Load evaluation results dataset
    try:
        eval_results = datasets.load_dataset(
            RESULTS_DATASET,
            token=TOKEN,
            verification_mode=datasets.VerificationMode.NO_CHECKS,
        )
    except datasets.data_files.EmptyDatasetError:
        eval_results = datasets.DatasetDict()

    # Return empty DataFrame if no data available
    if not eval_results or split not in eval_results or len(eval_results[split]) == 0:
        return pd.DataFrame([])

    results = eval_results[split]
    local_df = results.flatten()

    # Define columns to extract from the raw data
    metadata_columns = [
        "metadata.model",
        "metadata.model_provider",
        "metadata.organisation",
        "metadata.timestamp",
        "metadata.url",
    ]
    global_stats_columns = [
        "statistics.global.macro_success_rate",
        "statistics.global.total_runs",
        "statistics.global.total_scenarios",
    ]

    # Add per-capability statistics columns
    capability_columns = []
    for capability in SCENARIO_LIST:
        capability_columns.extend(
            [
                f"statistics.per_capability.{capability}.success_rate",
                f"statistics.per_capability.{capability}.success_rate_sem",
            ]
        )

    # Combine all required columns
    columns = metadata_columns + global_stats_columns + capability_columns

    # Process the data: select columns, clean up, and remove original columns
    local_df = local_df.select_columns(columns)
    mapped_df = local_df.map(cleanup, batched=False)
    mapped_df = mapped_df.remove_columns(columns)

    # Convert to pandas DataFrame and sort by total score (highest first)
    df = pd.DataFrame(mapped_df)
    df = df.sort_values(by=["Total score (%)"], ascending=False)

    return df


# ATM only one set
eval_dataframe_val = get_dataframe_from_results()


def restart_space():
    api.restart_space(repo_id=LEADERBOARD_PATH, token=TOKEN)


def add_new_eval(
    organisation: str,
    path_to_repository: str,
    profile: gr.OAuthProfile,
    token: gr.OAuthToken,
):
    # ---- USER CHECKS ----
    # Reject profiles created less than 2 months ago
    user_data = requests.get(
        f"https://huggingface.co/api/users/{profile.username}/overview"
    )
    creation_date = json.loads(user_data.content)["createdAt"]
    if datetime.datetime.now() - datetime.datetime.strptime(
        creation_date, "%Y-%m-%dT%H:%M:%S.%fZ"
    ) < datetime.timedelta(days=60):
        raise Exception("This account is not authorized to submit on Gaia2.")

    # Can't submit several times per day
    contact_infos = datasets.load_dataset(
        CONTACT_DATASET,
        token=TOKEN,
        verification_mode=datasets.VerificationMode.NO_CHECKS,
    )
    user_submission_dates = sorted(
        row["date"]
        for row in contact_infos["train"]
        if row["username"] == profile.username
    )
    today = datetime.datetime.today().strftime("%Y-%m-%d")
    if len(user_submission_dates) > 0 and user_submission_dates[-1] == today:
        raise Exception("You already submitted once today, please try again tomorrow.")

    # ---- EXPERIMENT MANAGEMENT ----
    # Download locally with HF hub
    snapshot_path = snapshot_download(
        repo_id=path_to_repository, token=token.token, repo_type="dataset"
    )

    # Test completeness with datasets
    try:
        for scenario in SCENARIO_LIST:
            # Loading what the user provided
            datasets.load_dataset(
                snapshot_path,
                scenario,
                split="test",
                verification_mode=datasets.VerificationMode.NO_CHECKS,
            )
    except Exception as e:
        print(e)
        raise ValueError(
            f"We cannot load the scenario {scenario} for your dataset ({path_to_repository}). "
            "Please make sure the dataset is accessible and all subsets are there."
        )
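    # The submitted snapshot is expected to ship a precomputed "computed_stats.json"
    # file holding the run metadata and aggregated statistics, which is read below.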
    with open(Path(snapshot_path, "computed_stats.json")) as f:
        results = json.load(f)
    model = results["metadata"]["model"]
    results["metadata"]["organisation"] = organisation
    results["metadata"]["url"] = path_to_repository

    try:
        ds = datasets.load_dataset(RESULTS_DATASET, split="train")
    except datasets.data_files.EmptyDatasetError:
        ds = datasets.Dataset.from_dict({})
    if results in ds:
        raise Exception("This precise model and results file was already submitted.")
    ds = ds.add_item(results)
    ds.push_to_hub(RESULTS_DATASET, split="train", private=True)

    experiment = Experiment(path_to_repository, organisation, model)

    # Save a copy of the submission to the hub
    create_repo(
        repo_id=f"{OWNER}/{str(experiment)}",
        repo_type="dataset",
        token=TOKEN,
        private=True,
    )
    upload_folder(
        folder_path=snapshot_path,
        repo_id=f"{OWNER}/{str(experiment)}",
        repo_type="dataset",
        token=TOKEN,
    )
    print(f"Adding new eval: {str(experiment)}")

    # SAVE ALL INFO
    contact_info = {
        "model": experiment.model,
        "path_to_hub": experiment.path_to_hub,
        "path_to_hub_private_copy": f"{OWNER}/{str(experiment)}",
        "organisation": experiment.organisation,
        "date": experiment.cur_date,
        "username": profile.username,
        "mail": getattr(profile, "email", None),
    }
    contact_infos["train"] = contact_infos["train"].add_item(contact_info)
    contact_infos.push_to_hub(CONTACT_DATASET, token=TOKEN)

    return format_log(
        f"Model {model} submitted by {organisation} successfully.\n"
        "Please wait a couple of minutes and refresh the leaderboard to see your score displayed."
    )


def refresh():
    return get_dataframe_from_results()


# Custom CSS for sleek styling
custom_css = """
"""

demo = gr.Blocks(
    # css=custom_css,
    theme=gr.themes.Soft(
        font=[gr.themes.GoogleFont("Roboto"), "Arial", "sans-serif"], primary_hue="blue"
    ),
)
with demo:
    gr.HTML(TITLE)

    with gr.Accordion("About", open=True):
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    # Enhanced leaderboard with custom styling
    with gr.Column(elem_classes="leaderboard-container"):
        # gr.HTML(
        #     """
        #     🏆 Gaia2 Leaderboard Rankings
        #     Click on column headers to sort • Use filters to narrow results
        #     """
        # )
        leaderboard_table_val = Leaderboard(
            value=eval_dataframe_val,
            select_columns=SelectColumns(
                default_selection=[
                    "Model",
                    "Provider",
                    "Total score (%)",
                    "execution (%)",
                    "search (%)",
                    "ambiguity (%)",
                    "adaptability (%)",
                    "time (%)",
                    "noise (%)",
                    "A2A (%)",
                    "Submission date",
                ],
                cant_deselect=[
                    "Model",
                    "Provider",
                    "Total score (%)",
                    "Submission date",
                ],
            ),
            search_columns=["Model", "Provider", "Submitter"],
            filter_columns=[
                "Provider",
                ColumnFilter("Model", type="dropdown", label="🔍 Select Model"),
            ],
        )

    # Enhanced submission section
    with gr.Column(elem_classes="submission-section"):
        gr.HTML(
            """
            🚀 Submit Your Model
            """
        )

        with gr.Accordion("📋 How to submit", open=True):
            gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text")

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                gr.LoginButton(size="lg")
            with gr.Column(scale=2):
                organisation_tbox = gr.Textbox(
                    label="🏢 Organization",
                    placeholder="Enter your organization name",
                    container=True,
                )
            with gr.Column(scale=3):
                dataset_tbox = gr.Textbox(
                    label="📊 Hub Dataset Path",
                    placeholder="username/dataset-name",
                    container=True,
                )
            with gr.Column(scale=1):
                submit_button = gr.Button("Submit", variant="primary", size="lg")
            with gr.Column(scale=1):
                refresh_button = gr.Button(
                    "🔄 Refresh the display", variant="secondary", size="lg"
                )

        submission_result = gr.Markdown()

    with gr.Column():
        gr.HTML(
            """
            Star ARE on GitHub ⭐ 🧑‍🔬 Read the paper 🚀 Try the ARE Demo
            """
        )

    with gr.Column():
        with gr.Accordion("📙 Citation", open=True):
            citation_button = gr.Textbox(
                value=CITATION_BUTTON_TEXT,
                label=CITATION_BUTTON_LABEL,
                elem_id="citation-button",
                show_copy_button=True,
            )

    # Gradio fills the gr.OAuthProfile and gr.OAuthToken arguments of add_new_eval
    # automatically from the session opened via gr.LoginButton, so only the two
    # textboxes need to be listed as inputs here.
    submit_button.click(
        add_new_eval,
        [organisation_tbox, dataset_tbox],
        submission_result,
    )
    refresh_button.click(
        refresh,
        inputs=[],
        outputs=[leaderboard_table_val],
    )

scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=3600)
scheduler.start()

demo.launch(debug=True, server_name="0.0.0.0", server_port=7860)