import pickle
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

import altair as alt
import pandas as pd

# Average ⬆️ human baseline is 0.897 (source: averaging the human baselines below)
# ARC human baseline is 0.80 (source: https://lab42.global/arc/)
# HellaSwag human baseline is 0.95 (source: https://deepgram.com/learn/hellaswag-llm-benchmark-guide)
# MMLU human baseline is 0.898 (source: https://openreview.net/forum?id=d7KBjmI3GmQ)
# TruthfulQA human baseline is 0.94 (source: https://arxiv.org/pdf/2109.07958.pdf)

# Define the human baselines (as percentages)
HUMAN_BASELINES = {
    "Average ⬆️": 0.897 * 100,
    "ARC": 0.80 * 100,
    "HellaSwag": 0.95 * 100,
    "MMLU": 0.898 * 100,
    "TruthfulQA": 0.94 * 100,
}
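
# Sanity check (an illustrative addition, not part of the original module): the
# "Average ⬆️" baseline is the mean of the four task baselines cited above,
# (0.80 + 0.95 + 0.898 + 0.94) / 4 = 0.897.
assert abs(sum((0.80, 0.95, 0.898, 0.94)) / 4 - 0.897) < 1e-9
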
def to_datetime(model_info: Tuple[str, Any]) -> datetime:
    """
    Converts the lastModified attribute of the object to datetime.

    :param model_info: A tuple containing the name and object.
        The object must have a lastModified attribute
        with a string representing the date and time.
    :return: A datetime object converted from the lastModified attribute of the input object.
    """
    name, obj = model_info
    return datetime.strptime(obj.lastModified, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
def join_model_info_with_results(results_df: pd.DataFrame) -> pd.DataFrame:
    """
    Integrates model information with the results DataFrame by matching 'Model sha'.

    :param results_df: A DataFrame containing results information, including a 'Model sha' column.
    :return: A DataFrame with an updated 'Results Date' column, synchronized with the model information.
    """
    # Load the model info cache from disk; fall back to an empty cache if the file is missing or truncated
    try:
        with open("model_info_cache.pkl", "rb") as f:
            model_info_cache = pickle.load(f)
    except (EOFError, FileNotFoundError):
        model_info_cache = {}

    # Sort the cached (name, object) pairs by their lastModified datetime, newest first
    sorted_dates = sorted(model_info_cache.items(), key=to_datetime, reverse=True)

    # Default every row to the current UTC time; rows matched below are overwritten
    results_df["Results Date"] = datetime.now(timezone.utc)

    # Define the date format string
    date_format = "%Y-%m-%dT%H:%M:%S.%fZ"

    # Iterate over sorted_dates and update the dataframe
    for name, obj in sorted_dates:
        # Convert the lastModified string to a timezone-aware datetime object
        last_modified_datetime = datetime.strptime(obj.lastModified, date_format).replace(tzinfo=timezone.utc)

        # Update the "Results Date" column where "Model sha" equals obj.sha
        results_df.loc[results_df["Model sha"] == obj.sha, "Results Date"] = last_modified_datetime
    return results_df
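
# Illustrative call (a hedged sketch; the toy frame below is made up, while the
# "model_info_cache.pkl" filename and the "Model sha" / "Results Date" columns
# come from the function above):
#
#     results_df = pd.DataFrame({"Model sha": ["abc123"], "Average ⬆️": [55.0]})
#     results_df = join_model_info_with_results(results_df)
#     # Rows whose sha matches a cached model receive its lastModified timestamp;
#     # every other row keeps the current UTC time as its "Results Date".
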
def create_scores_df(results_df: pd.DataFrame) -> pd.DataFrame:
    """
    Generates a DataFrame containing the maximum scores until each result date.

    :param results_df: A DataFrame containing result information including metric scores and result dates.
    :return: A new DataFrame containing the maximum scores until each result date for every metric.
    """
    # Step 1: Ensure 'Results Date' is in datetime format and sort the DataFrame by it
    results_df["Results Date"] = pd.to_datetime(results_df["Results Date"])
    results_df.sort_values(by="Results Date", inplace=True)

    # Step 2: Initialize the scores dictionary
    scores = {
        "Average ⬆️": [],
        "ARC": [],
        "HellaSwag": [],
        "MMLU": [],
        "TruthfulQA": [],
        "Result Date": [],
    }

    # Step 3: Iterate over the rows and track the running maximum for each metric.
    # The frame is already sorted, so dates are non-decreasing and the guard
    # below appends exactly one date per row.
    for _, row in results_df.iterrows():
        date = row["Results Date"]
        for column in scores.keys():
            if column == "Result Date":
                if not scores[column] or scores[column][-1] <= date:
                    scores[column].append(date)
                continue
            current_max = scores[column][-1] if scores[column] else float("-inf")
            scores[column].append(max(current_max, row[column]))

    # Step 4: Convert the dictionary to a DataFrame
    return pd.DataFrame(scores)
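
# Behavior sketch (hypothetical data): each metric keeps a running maximum, so
# a later, lower score never pulls the curve down.
#
#     toy = pd.DataFrame({
#         "Average ⬆️": [50.0, 45.0, 60.0], "ARC": [40.0, 42.0, 41.0],
#         "HellaSwag": [70.0, 69.0, 75.0], "MMLU": [30.0, 35.0, 33.0],
#         "TruthfulQA": [45.0, 44.0, 50.0],
#         "Results Date": pd.to_datetime(["2023-05-01", "2023-06-01", "2023-07-01"]),
#     })
#     create_scores_df(toy)["Average ⬆️"].tolist()  # -> [50.0, 50.0, 60.0]
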
def create_plot_df(scores_df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforms the scores DataFrame into a new format suitable for plotting.

    :param scores_df: A DataFrame containing metric scores and result dates.
    :return: A new DataFrame reshaped for plotting purposes.
    """
    # Metric columns to reshape into long format
    cols = ["Average ⬆️", "ARC", "HellaSwag", "MMLU", "TruthfulQA"]

    # Initialize the list to store DataFrames
    dfs = []

    # Iterate over the cols and create a new long-format DataFrame for each column
    for col in cols:
        d = scores_df[[col, "Result Date"]].copy().reset_index(drop=True)
        d["Metric Name"] = col
        d.rename(columns={col: "Metric Value"}, inplace=True)
        dfs.append(d)

    # Concatenate all the created DataFrames
    concat_df = pd.concat(dfs, ignore_index=True)

    # Sort values by 'Result Date'
    concat_df.sort_values(by="Result Date", inplace=True)
    concat_df.reset_index(drop=True, inplace=True)

    # Drop duplicates based on 'Metric Name' and 'Metric Value' and keep the first (earliest) occurrence
    concat_df.drop_duplicates(subset=["Metric Name", "Metric Value"], keep="first", inplace=True)
    concat_df.reset_index(drop=True, inplace=True)
    return concat_df
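
# Shape sketch (continuing the hypothetical `toy` frame above): the wide scores
# frame becomes long-format rows, one block per metric, and repeated
# (name, value) pairs collapse to their earliest date; for example, the
# running-max plateau [50.0, 50.0, 60.0] keeps only the first 50.0 and the 60.0.
#
#     plot_df = create_plot_df(create_scores_df(toy))
#     plot_df.columns.tolist()  # -> ['Metric Value', 'Result Date', 'Metric Name']
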
def create_metric_plot_obj(df: pd.DataFrame, metrics: List[str], human_baselines: Dict[str, float]) -> alt.LayerChart:
    """
    Creates a visualization of metrics over time compared to human baselines.

    :param df: A DataFrame containing 'Metric Name', 'Metric Value', and 'Result Date' columns.
    :param metrics: A list of metric names to be included in the plot.
    :param human_baselines: A dictionary mapping metric names to their corresponding human baseline values.
    :return: An Altair LayerChart object visualizing the metrics over time.
    """
    # Filter the DataFrame based on the metrics parameter
    df = df[df["Metric Name"].isin(metrics)]

    # Filter the human_baselines dictionary to include only the specified metrics
    filtered_human_baselines = {k: v for k, v in human_baselines.items() if k in metrics}

    # Create a DataFrame from the filtered human baselines
    human_baselines_df = pd.DataFrame(list(filtered_human_baselines.items()), columns=["Metric Name", "Metric Value"])

    # Create the line chart for each metric over time
    base = alt.Chart(df).encode(x="Result Date:T")
    lines = base.mark_line().encode(
        alt.Y("Metric Value:Q", scale=alt.Scale(domain=[0, 100])),
        color="Metric Name:N",
    )

    # Create the rules (horizontal dashed lines) chart for the human baselines
    yrules = (
        alt.Chart(human_baselines_df)
        .mark_rule(strokeDash=[12, 6], size=2)
        .encode(y="Metric Value:Q", color="Metric Name:N")
    )

    # Combine the lines with the yrules and return the layered chart
    return lines + yrules
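
# End-to-end sketch, runnable as a script. The toy results below are made up;
# only HUMAN_BASELINES and the functions above come from this module.
if __name__ == "__main__":
    toy_results = pd.DataFrame(
        {
            "Average ⬆️": [50.0, 58.0, 64.0],
            "ARC": [40.0, 48.0, 55.0],
            "HellaSwag": [70.0, 72.0, 78.0],
            "MMLU": [30.0, 38.0, 45.0],
            "TruthfulQA": [45.0, 47.0, 52.0],
            "Results Date": pd.to_datetime(["2023-05-01", "2023-06-01", "2023-07-01"]),
        }
    )
    chart = create_metric_plot_obj(
        create_plot_df(create_scores_df(toy_results)),
        ["Average ⬆️", "MMLU"],
        HUMAN_BASELINES,
    )
    chart.save("metrics_over_time.html")  # Altair renders the layered chart to a standalone HTML file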