import pandas as pd
import gradio as gr
import requests
import yaml
import os
import re
import asyncio
import aiohttp
from tqdm import tqdm

from get_last_ci_run import get_last_ci_run_id
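

# Walk the transformers docs _toctree.yml to collect the model names listed
# under API > Models > Audio models, normalized to snake_case.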
def get_audio_models():
    url = "https://raw.githubusercontent.com/huggingface/transformers/main/docs/source/en/_toctree.yml"
    response = requests.get(url)
    if response.status_code != 200:
        print("Failed to fetch the YAML file")
        return []
    toctree_content = yaml.safe_load(response.text)
    for section in toctree_content:
        if section.get('title') == 'API':
            for subsection in section.get('sections', []):
                if subsection.get('title') == 'Models':
                    for model_section in subsection.get('sections', []):
                        if model_section.get('title') == 'Audio models':
                            return [
                                audio_model.get('local').split('/')[-1].lower().replace('-', '_')
                                for audio_model in model_section.get('sections', [])
                                if 'local' in audio_model
                            ]
    return []
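

# Fetch every job of a given CI workflow run, parse each audio-model job's
# pytest summary, and aggregate the counts into test_results_by_type.csv.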
def fetch_and_process_ci_results(job_id):
    github_token = os.environ.get('GITHUB_TOKEN')
    if not github_token:
        raise ValueError("GitHub token not found in environment variables")
    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github+json"
    }
    audio_models = get_audio_models()
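    # Audio models with no dedicated CI job (kept for reference; not used below).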
    non_tested_models = [
        "xls_r",
        "speech_to_text_2",
        "mctct",
        "xlsr_wav2vec2",
        "mms"
    ]
    url = f"https://api.github.com/repos/huggingface/transformers/actions/runs/{job_id}/jobs"
    audio_model_jobs = {audio_model: [] for audio_model in audio_models}
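
    # Record the job IDs belonging to each audio model from one page of the jobs API.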
    def process_jobs(jobs_data):
        for job in jobs_data['jobs']:
            if "Model CI" in job['name'] and "models" in job['name']:
                match = re.search(r'models/([^/)]+)', job['name'])
                if match:
                    model_name = match.group(1).lower()
                    if model_name in audio_model_jobs:
                        audio_model_jobs[model_name].append(job['id'])
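
    # Fetch one page of jobs, process it, and return the next page's URL
    # (None once pagination is exhausted).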
    async def fetch_and_process_jobs(session, url):
        async with session.get(url, headers=headers) as response:
            jobs_data = await response.json()
            process_jobs(jobs_data)
            return response.links.get('next', {}).get('url')
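
    # Walk every page of the paginated jobs endpoint.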
    async def fetch_all_jobs():
        async with aiohttp.ClientSession() as session:
            next_url = url
            with tqdm(desc="Fetching jobs", unit="page") as pbar:
                while next_url:
                    next_url = await fetch_and_process_jobs(session, next_url)
                    pbar.update(1)
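
    # Extract failed/passed/skipped counts from a pytest summary line such as
    # "===== 2 failed, 130 passed, 12 skipped, 3 warnings in 42.17s =====".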
    def parse_test_results(text):
        pattern = r'=+ (?:(\d+) failed,?\s*)?(?:(\d+) passed,?\s*)?(?:(\d+) skipped,?\s*)?(?:\d+ warnings?\s*)?in \d+\.\d+s'
        match = re.search(pattern, text)
        if match:
            failed = int(match.group(1)) if match.group(1) else 0
            passed = int(match.group(2)) if match.group(2) else 0
            skipped = int(match.group(3)) if match.group(3) else 0
            return {'failed': failed, 'passed': passed, 'skipped': skipped}
        raise ValueError("Could not find test summary in logs")
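
    # Download one job's logs plus its metadata, and parse the pytest summary.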
    def retrieve_job_logs(job_id, job_name):
        url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}"
        response = requests.get(url, headers=headers)
        logs_url = f"https://api.github.com/repos/huggingface/transformers/actions/jobs/{job_id}/logs"
        logs_response = requests.get(logs_url, headers=headers)
        logs = logs_response.text
        test_summary = parse_test_results(logs)
        test_summary["model"] = job_name
        test_summary["conclusion"] = response.json()['conclusion']
        return test_summary

    # Fetch and process every page of jobs. fetch_all_jobs already starts from
    # the first page, so a separate synchronous request here would process
    # page one twice and double-count its jobs.
    asyncio.run(fetch_all_jobs())

    # Retrieve job logs and process results
    results = []
    for job_name, job_ids in tqdm(audio_model_jobs.items()):
        for job_id in job_ids:
            result = retrieve_job_logs(job_id, job_name)
            results.append(result)

    # Process results into a DataFrame and save to CSV
    df = (pd.DataFrame(results)
          .melt(id_vars=['model', 'conclusion'],
                value_vars=['failed', 'passed', 'skipped'],
                var_name='test_type',
                value_name='number_of_tests')
          .groupby(['model', 'conclusion', 'test_type'])
          .agg({'number_of_tests': 'sum'})
          .reset_index())
    df.to_csv('test_results_by_type.csv', index=False)
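

# Load the aggregated CSV and prepare it for plotting: flag failing models in
# their labels and compute a stable x-axis ordering.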
def load_and_process_data():
    # Load the CSV file
    model_test_results = pd.read_csv('test_results_by_type.csv')

    # Get models with failed tests and their failure counts
    failed_models_counts = model_test_results[
        (model_test_results['test_type'] == 'failed') &
        (model_test_results['number_of_tests'] > 0)
    ].groupby('model')['number_of_tests'].first().to_dict()

    # Add ❌ and the failure count to names of failing models, ✅ to passing ones
    model_test_results['model'] = model_test_results.apply(
        lambda row: f"{row['model']} ❌ ({failed_models_counts[row['model']]})"
        if row['model'] in failed_models_counts
        else f"{row['model']} ✅",
        axis=1
    )

    # Separate failed tests from the rest, then concatenate so failures come first
    failed_tests = model_test_results[model_test_results['test_type'] == 'failed'].sort_values('number_of_tests', ascending=False)
    other_tests = model_test_results[model_test_results['test_type'] != 'failed']
    model_test_results = pd.concat([failed_tests, other_tests])

    # Sort models by success/failure and number of failed tests
    model_order = model_test_results.sort_values(
        by=['conclusion', 'test_type', 'number_of_tests'],
        ascending=[True, False, False]
    )['model'].unique().tolist()

    return model_test_results, model_order, failed_models_counts
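

# Build a stacked bar chart of test outcomes per model.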
def create_bar_plot(model_test_results, model_order, failed_models_counts):
    return gr.BarPlot(
        model_test_results,
        x="model",
        y="number_of_tests",
        color="test_type",  # Color bars by pass/fail/skip status
        color_map={"passed": "#008550", "skipped": "#F0B702", "failed": "#8B1710"},
        title="Test Results by Model",
        x_title=f"Models ({len(failed_models_counts)} failing / {len(model_order)} total)",
        y_title="Number of Tests",
        height=600,
        width=1000,
        x_label_angle=45,  # Rotate x-axis labels by 45 degrees
        x_order=model_order  # Set custom order of x-axis
    )


# Create the Gradio interface
with gr.Blocks() as results_viz:
    gr.Markdown("# Test Results by Model")
    model_test_results, model_order, failed_models_counts = load_and_process_data()
    test_results_plot = create_bar_plot(model_test_results, model_order, failed_models_counts)
    with gr.Row():
        refresh_btn = gr.Button(
            value="Refresh CI Results (~2 min)",
            variant="primary"
        )
        refresh_status = gr.Textbox()
        ci_link = gr.HTML()
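
    # Compare the latest CI run ID with the cached one; refetch only when a new run exists.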
    def check_and_refresh():
        latest_ci_id = str(get_last_ci_run_id())
        try:
            with open("ci_id.txt", "r") as f:
                current_ci_id = f.read().strip()
        except FileNotFoundError:
            current_ci_id = ""

        ci_link_html = f'<a href="https://github.com/huggingface/transformers/actions/runs/{latest_ci_id}" target="_blank">Latest CI Run</a>'
        if latest_ci_id == current_ci_id:
            return "No new CI results available yet.", test_results_plot, ci_link_html
        else:
            fetch_and_process_ci_results(latest_ci_id)
            with open("ci_id.txt", "w") as f:
                f.write(latest_ci_id)
            # Reload the data and rebuild the plot with the new results
            new_model_test_results, new_model_order, new_failed_models_counts = load_and_process_data()
            new_test_results_plot = create_bar_plot(new_model_test_results, new_model_order, new_failed_models_counts)
            return "CI results refreshed successfully!", new_test_results_plot, ci_link_html

    refresh_btn.click(fn=check_and_refresh, outputs=[refresh_status, test_results_plot, ci_link])


if __name__ == "__main__":
    results_viz.launch()