"""
DataEngEval - Hugging Face Spaces App
Main application for the Hugging Face Space deployment.
"""
import gradio as gr
import pandas as pd
import os
import time
from typing import List, Dict, Any, Optional
import sys

# Add src to path for imports
sys.path.append('src')

from evaluator import evaluator, DatasetManager
from models_registry import models_registry
from scoring import scoring_engine
from utils.config_loader import config_loader


class LeaderboardManager:
    """Manages the leaderboard persistence and display."""

    def __init__(self):
        self.config = config_loader.get_leaderboard_config()
        self.leaderboard_path = self.config.path
        self.leaderboard = self._load_leaderboard()

    def _load_leaderboard(self) -> pd.DataFrame:
        """Load existing leaderboard or create new one."""
        if os.path.exists(self.leaderboard_path):
            try:
                return pd.read_parquet(self.leaderboard_path)
            except Exception as e:
                print(f"Error loading leaderboard: {e}")
        # Create empty leaderboard using config
        return pd.DataFrame(columns=self.config.columns)

    def add_result(self, result: Dict[str, Any]):
        """Add a new result to the leaderboard."""
        new_row = pd.DataFrame([result])
        self.leaderboard = pd.concat([self.leaderboard, new_row], ignore_index=True)
        self._save_leaderboard()

    def _save_leaderboard(self):
        """Save leaderboard to parquet file."""
        try:
            self.leaderboard.to_parquet(self.leaderboard_path, index=False)
        except Exception as e:
            print(f"Error saving leaderboard: {e}")

    def get_leaderboard(self) -> pd.DataFrame:
        """Get the current leaderboard."""
        return self.leaderboard.copy()

    def get_top_results(self, n: Optional[int] = None) -> pd.DataFrame:
        """Get top N results by composite score, aggregated by model."""
        if self.leaderboard.empty:
            return self.leaderboard
        if n is None:
            n = self.config.top_results

        # Group by model and calculate averages
        numeric_columns = ['composite_score', 'correctness_exact', 'result_match_f1', 'exec_success', 'latency_ms']

        # Calculate averages for numeric columns, keeping provider info
        model_aggregated = self.leaderboard.groupby(['model_name', 'provider'])[numeric_columns].mean().reset_index()

        # Create combined model name with provider
        model_aggregated['model_display'] = model_aggregated['model_name'] + ' (' + model_aggregated['provider'] + ')'

        # Sort by composite score (descending) to get proper ranking
        model_aggregated = model_aggregated.sort_values('composite_score', ascending=False).reset_index(drop=True)

        # Take top N results
        top_results = model_aggregated.head(n).copy()

        # Add ranking column (1-based ranking)
        top_results.insert(0, 'rank', range(1, len(top_results) + 1))

        # Reorder columns according to configuration
        leaderboard_config = config_loader.get_leaderboard_config()
        column_mapping = {
            'Rank': 'rank',
            'Model': 'model_display',
            'Composite Score': 'composite_score',
            'Correctness': 'correctness_exact',
            'Result F1': 'result_match_f1',
            'Exec Success': 'exec_success',
            'Latency': 'latency_ms',
            'Dataset': 'dataset_name',
            'Case ID': 'case_id',
            'Question': 'question',
            'Reference SQL': 'reference_sql',
            'Generated SQL': 'candidate_sql',
            'Dialect OK': 'dialect_ok'
        }

        # Select and reorder columns
        ordered_columns = []
        for header in leaderboard_config.results_table_headers:
            if header in column_mapping and column_mapping[header] in top_results.columns:
                ordered_columns.append(column_mapping[header])
        return top_results[ordered_columns]


# Global instances
leaderboard_manager = LeaderboardManager()
dataset_manager = DatasetManager()


def load_prompt_template(dialect: str) -> str:
    """Load prompt template for a specific dialect."""
    prompts_config = config_loader.get_prompts_config()

    # Get template file path from config
    template_path = prompts_config.files.get(dialect.lower())
    if template_path and os.path.exists(template_path):
        with open(template_path, 'r') as f:
            return f.read()
    else:
        # Use fallback template from config
        return prompts_config.fallback.format(dialect=dialect)


def get_available_datasets() -> List[str]:
    """Get list of available datasets."""
    # Get all available datasets
    all_datasets = dataset_manager.get_datasets()
    print(f"All available datasets: {list(all_datasets.keys())}")

    # Filter to only show visible datasets from config
    visible_datasets = config_loader.get_visible_datasets()
    print(f"Visible datasets from config: {visible_datasets}")

    # Return only datasets that are both available and visible
    result = [name for name in all_datasets.keys() if name in visible_datasets]
    print(f"Final available datasets: {result}")
    return result


def get_available_models() -> List[str]:
    """Get list of available models."""
    models = models_registry.get_models()
    return [model.name for model in models]


def get_available_dialects() -> List[str]:
    """Get list of available SQL dialects."""
    return config_loader.get_dialects()


def handle_model_selection(selected_models: List[str]) -> List[str]:
    """Handle model selection including 'Select All' functionality."""
    if not selected_models:
        return []
    # If "Select All" is selected, return all available models
    if "Select All" in selected_models:
        return get_available_models()
    # Otherwise, return the selected models (excluding "Select All" if present)
    return [model for model in selected_models if model != "Select All"]


def get_cases_for_dataset(dataset_name: str) -> List[str]:
    """Get list of cases for a dataset."""
    if not dataset_name:
        return []
    try:
        print(f"Loading cases for dataset: {dataset_name}")

        # Check if dataset exists
        dataset = dataset_manager.get_dataset(dataset_name)
        if not dataset:
            print(f"Dataset {dataset_name} not found!")
            print(f"Available datasets: {list(dataset_manager.get_datasets().keys())}")
            return []

        print(f"Dataset found: {dataset.name}")
        print(f"Cases path: {dataset.cases_path}")

        cases = dataset_manager.load_cases(dataset_name)
        print(f"Loaded {len(cases)} cases")
        for i, case in enumerate(cases):
            print(f"  Case {i+1}: {case.id} - {case.question[:50]}...")
        return [f"{case.id}: {case.question[:50]}..." for case in cases]
    except Exception as e:
        print(f"Error loading cases for {dataset_name}: {e}")
        import traceback
        traceback.print_exc()
        return []


def run_evaluation(dataset_name: str, dialect: str, case_selection: str,
                   selected_models: List[str]) -> tuple:
    """Run evaluation for the selected models on a case."""
    if not all([dataset_name, dialect, case_selection, selected_models]):
        return "Please select all required options.", None, None, None

    # Handle model selection (including "Select All" functionality)
    selected_models = handle_model_selection(selected_models)
    if not selected_models:
        return "Please select at least one model to evaluate.", None, None, None

    # Get environment config
    env_config = config_loader.get_environment_config()
    has_hf_token = bool(os.getenv(env_config["hf_token_env"]))
    if not has_hf_token:
        print("🏠 No HF_TOKEN detected, using mock mode for demo purposes")

    # Extract the case ID from the dropdown selection ("<id>: <question>...")
    case_id = case_selection.split(":")[0] if ":" in case_selection else case_selection

    # Load prompt template
    prompt_template = load_prompt_template(dialect)

    # Get metrics config for formatting
    metrics_config = config_loader.get_metrics_config()
    formatting = metrics_config.formatting

    results = []
    detailed_results = []

    for model_name in selected_models:
        try:
            print(f"Evaluating {model_name} on {dataset_name}/{case_id} ({dialect})")
            result = evaluator.evaluate_model_on_case(
                model_name, dataset_name, case_id, dialect, prompt_template
            )

            # Add to leaderboard
            leaderboard_manager.add_result(result)

            # Format for display using config
            results.append([
                len(results) + 1,  # Rank (1-based)
                f"{model_name} ({result['provider']})",  # Include provider in model name
                formatting["composite_score"].format(result['composite_score']),
                formatting["correctness_exact"].format(result['correctness_exact']),
                formatting["result_match_f1"].format(result['result_match_f1']),
                formatting["exec_success"].format(result['exec_success']),
                formatting["latency_ms"].format(result['latency_ms']),
                result['dataset_name'],
                result['case_id'],
                result['question'][:100] + "..." if len(result['question']) > 100 else result['question'],
                result['reference_sql'][:100] + "..." if len(result['reference_sql']) > 100 else result['reference_sql'],
                result['candidate_sql'][:100] + "..." if len(result['candidate_sql']) > 100 else result['candidate_sql'],
                formatting["dialect_ok"].format(result['dialect_ok'])
            ])

            detailed_results.append(f"""
**Model: {model_name}**
- **Question:** {result['question']}
- **Reference SQL:**
```sql
{result['reference_sql']}
```
- **Generated SQL:**
```sql
{result['candidate_sql']}
```
- **Composite Score:** {formatting["composite_score"].format(result['composite_score'])}
- **Correctness (Exact):** {formatting["correctness_exact"].format(result['correctness_exact'])}
- **Execution Success:** {formatting["exec_success"].format(result['exec_success'])}
- **Result Match F1:** {formatting["result_match_f1"].format(result['result_match_f1'])}
- **Latency:** {formatting["latency_ms"].format(result['latency_ms'])}
- **Dialect Compliance:** {formatting["dialect_ok"].format(result['dialect_ok'])}

---
""")
        except Exception as e:
            error_msg = f"Error evaluating {model_name}: {str(e)}"
            print(error_msg)
            results.append([len(results) + 1, model_name, "ERROR", "ERROR", "ERROR", "ERROR", "ERROR", "ERROR", "ERROR", "ERROR", "ERROR", "ERROR", "ERROR"])
            detailed_results.append(f"**Error with {model_name}:** {error_msg}\n\n---\n")

    # Create results DataFrame using config
    leaderboard_config = config_loader.get_leaderboard_config()
    results_df = pd.DataFrame(results, columns=leaderboard_config.results_table_headers)

    # Get updated leaderboard
    leaderboard_df = leaderboard_manager.get_top_results(20)

    return (
        f"Evaluation completed! Processed {len(selected_models)} models.",
        results_df,
        "\n".join(detailed_results),
        leaderboard_df
    )


def get_leaderboard_display() -> pd.DataFrame:
    """Get the current leaderboard for display."""
    leaderboard_config = config_loader.get_leaderboard_config()
    leaderboard_data = leaderboard_manager.get_top_results(leaderboard_config.top_results)
    # The get_top_results method already filters columns according to configuration.
    # This ensures consistency with the Results table in the Evaluate tab.
    return leaderboard_data


# Create Gradio interface
def create_interface():
    """Create the Gradio interface."""
    # Get app configuration
    app_config = config_loader.get_app_config()
    ui_config = config_loader.get_ui_config()

    with gr.Blocks(title=app_config.title, theme=getattr(gr.themes, app_config.theme.capitalize())()) as app:
        gr.Markdown(f"""
# {app_config.title}

{app_config.description}

Select a dataset, dialect, and test case, then choose models to evaluate. Results are automatically added to the public leaderboard.

**Note**: This Hugging Face Space uses remote inference - no heavy models are downloaded locally!
""")

        with gr.Row():
            with gr.Column(scale=10):
                pass  # Empty column for spacing
            with gr.Column(scale=1):
                refresh_button = gr.Button(
                    ui_config["buttons"]["refresh"]["text"],
                    variant=ui_config["buttons"]["refresh"]["variant"],
                    size=ui_config["buttons"]["refresh"]["size"]
                )

        with gr.Tabs():
            # Evaluation Tab
            with gr.Tab(ui_config["tabs"][0]["label"]):
                with gr.Row():
                    with gr.Column(scale=1):
                        dataset_dropdown = gr.Dropdown(
                            choices=get_available_datasets(),
                            label=ui_config["inputs"]["dataset"]["label"],
                            value=get_available_datasets()[0] if get_available_datasets() else None
                        )
                        dialect_dropdown = gr.Dropdown(
                            choices=get_available_dialects(),
                            label=ui_config["inputs"]["dialect"]["label"],
                            value=ui_config["inputs"]["dialect"]["default"]
                        )

                        # Initialize cases for default dataset
                        default_dataset = get_available_datasets()[0] if get_available_datasets() else None
                        initial_cases = []
                        if default_dataset:
                            print(f"Initializing cases for default dataset: {default_dataset}")
                            initial_cases = get_cases_for_dataset(default_dataset)
                            print(f"Initialized {len(initial_cases)} cases")

                        case_dropdown = gr.Dropdown(
                            choices=initial_cases,
                            label=ui_config["inputs"]["case"]["label"],
                            interactive=True,
                            value=initial_cases[0] if initial_cases else None
                        )
                        models_checkbox = gr.CheckboxGroup(
                            choices=["Select All"] + get_available_models(),
                            label=ui_config["inputs"]["models"]["label"],
                            value=[]
                        )
                        run_button = gr.Button(
                            ui_config["buttons"]["run_evaluation"]["text"],
                            variant=ui_config["buttons"]["run_evaluation"]["variant"]
                        )
                    with gr.Column(scale=2):
                        status_output = gr.Textbox(label=ui_config["outputs"]["status"]["label"], interactive=False)
                        results_table = gr.Dataframe(
                            label=ui_config["outputs"]["results"]["label"],
                            headers=ui_config["outputs"]["results"]["headers"],
                            interactive=False
                        )
                        detailed_results = gr.Markdown(label=ui_config["outputs"]["detailed"]["label"])

                # Event handlers
                def update_cases(dataset_name):
                    print(f"update_cases called with dataset_name: {dataset_name}")
                    cases = get_cases_for_dataset(dataset_name)
                    print(f"update_cases returning {len(cases)} cases")
                    return gr.Dropdown(choices=cases, value=cases[0] if cases else None)

                dataset_dropdown.change(
                    fn=update_cases,
                    inputs=[dataset_dropdown],
                    outputs=[case_dropdown]
                )

                run_button.click(
                    fn=run_evaluation,
                    inputs=[dataset_dropdown, dialect_dropdown, case_dropdown, models_checkbox],
                    outputs=[status_output, results_table, detailed_results, gr.State()]
                )

            # Leaderboard Tab
            with gr.Tab(ui_config["tabs"][1]["label"]):
                # Get leaderboard data with same column filtering as Results table
                leaderboard_data = get_leaderboard_display()
                leaderboard_table = gr.Dataframe(
                    label=ui_config["outputs"]["leaderboard"]["label"],
                    interactive=False,
                    value=leaderboard_data,
                    headers=ui_config["outputs"]["results"]["headers"]
                )

            # Info Tab
            with gr.Tab(ui_config["tabs"][2]["label"]):
                gr.Markdown("""
## About DataEngEval

This platform evaluates natural language to SQL generation across multiple dialects and datasets using Hugging Face Spaces.

### Features
- **Multi-dialect support**: Presto, BigQuery, Snowflake
- **Config-driven models**: Add new models by editing `config/models.yaml`
- **Multiple datasets**: NYC Taxi (with more coming)
- **Comprehensive metrics**: Correctness, execution success, result matching, latency
- **Public leaderboard**: Track performance across models and datasets
- **Remote inference**: No heavy model downloads - uses the Hugging Face Inference API

### Adding New Models
1. Edit `config/models.yaml`
2. Add your model configuration with provider, model_id, and parameters
3. Supported providers: `huggingface`

### Adding New Datasets
1. Create a new folder under `tasks/`
2. Add `schema.sql`, `loader.py`, and `cases.yaml`
3. The loader should create a DuckDB database with sample data
4. Cases should include questions and reference SQL for each dialect

### Scoring
The composite score combines:
- **Correctness (40%)**: Exact match with reference results
- **Execution Success (25%)**: SQL executes without errors
- **Result Match F1 (15%)**: Partial credit for similar results
- **Dialect Compliance (10%)**: Proper SQL transpilation
- **Readability (5%)**: SQL structure and formatting
- **Latency (5%)**: Response time (normalized)
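
In rough terms, these weights combine as a weighted sum of the per-metric scores (an illustrative sketch only; the component names below are shorthand for the metrics above, and the exact formula and latency normalization are implemented by the scoring engine):

```
composite_score ≈ 0.40 * correctness + 0.25 * exec_success + 0.15 * result_f1
                + 0.10 * dialect_ok + 0.05 * readability + 0.05 * latency_score
```

Here `latency_score` stands for the normalized latency term, where faster responses score higher.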

### Hugging Face Spaces Deployment
This app is optimized for Hugging Face Spaces:
- Uses remote inference via the Hugging Face Inference API
- No local model downloads required
- Lightweight dependencies
- Automatic deployment from Git

### Environment Variables
- `HF_TOKEN`: Hugging Face API token (optional - if not set, uses mock mode)
- `MOCK_MODE`: Set to "true" to force mock mode
""")

        # Add refresh button click event
        refresh_button.click(
            fn=get_leaderboard_display,
            outputs=[leaderboard_table]
        )

    return app


if __name__ == "__main__":
    app = create_interface()
    app_config = config_loader.get_app_config()
    app.launch(
        server_name=app_config.server_host,
        server_port=app_config.server_port,
        share=app_config.server_share
    )