import pandas as pd
import numpy as np
from typing import Tuple

from datasets import load_dataset, Features, Value
from loguru import logger

from about import METRICS, STANDARD_COLS, results_repo_test, results_repo_validation
|
|
def make_user_clickable(name: str) -> str:
    """Render a user name as an HTML link to their Hugging Face profile."""
    link = f'https://huggingface.co/{name}'
    return f'<a target="_blank" href="{link}" style="color: var(--link-text-color); text-decoration: underline; text-decoration-style: dotted;">{name}</a>'
|
|
def make_tag_clickable(tag: str) -> str:
    """Render a URL as a short HTML anchor labelled 'link'."""
    return f'<a target="_blank" href="{tag}" style="color: var(--link-text-color); text-decoration: underline; text-decoration-style: dotted;">link</a>'
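
# Usage sketch (illustrative only; `df` is a hypothetical dataframe with
# 'user' and 'model_report' columns): apply the helpers column-wise before
# rendering to HTML, keeping escape=False so the anchor tags survive.
#
#   df["user"] = df["user"].apply(make_user_clickable)
#   df["model_report"] = df["model_report"].apply(make_tag_clickable)
#   html = df.to_html(escape=False)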
|
|
def fetch_dataset_df():
    """Fetch the latest submission per (Endpoint, user) pair from the results dataset."""
    logger.info("Fetching latest results dataset from Hugging Face Hub...")

    # Build an explicit schema so float/string/bool columns are typed consistently.
    metric_features = {f'mean_{m}': Value('float64') for m in METRICS}
    metric_features.update({f'std_{m}': Value('float64') for m in METRICS})
    other_features = {
        'user': Value('string'),
        'Endpoint': Value('string'),
        'submission_time': Value('string'),
        'model_report': Value('string'),
        'anonymous': Value('bool'),
    }
    feature_schema = Features(metric_features | other_features)

    dset = load_dataset(results_repo_validation,
                        split='train',
                        features=feature_schema,
                        download_mode="force_redownload")
    full_df = dset.to_pandas()

    # Sanity-check that every expected column made it into the dataframe.
    expected_mean_cols = [f"mean_{col}" for col in METRICS]
    expected_std_cols = [f"std_{col}" for col in METRICS]
    expected_all_cols = STANDARD_COLS + expected_mean_cols + expected_std_cols
    assert all(
        col in full_df.columns for col in expected_all_cols
    ), f"Expected columns not found in {full_df.columns}. Missing columns: {set(expected_all_cols) - set(full_df.columns)}"

    # Drop test submissions and rows whose timestamp cannot be parsed.
    df = full_df.copy()
    df = df[df["user"] != "test"].copy()
    df["submission_time"] = pd.to_datetime(df["submission_time"], errors="coerce")
    df = df.dropna(subset=["submission_time"])

    # Keep only the most recent submission per (Endpoint, user) pair.
    latest = (
        df.sort_values("submission_time")
        .drop_duplicates(subset=["Endpoint", "user"], keep="last")
        .sort_values(["Endpoint", "user"])
        .reset_index(drop=True)
    )
    latest.rename(columns={"submission_time": "submission time"}, inplace=True)
    return latest
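
# The keep="last" de-duplication above means: for each (Endpoint, user) pair,
# only the chronologically newest row survives. A toy illustration:
#
#   >>> toy = pd.DataFrame({"Endpoint": ["A", "A"], "user": ["u", "u"],
#   ...                     "submission_time": pd.to_datetime(["2024-01-01", "2024-02-01"])})
#   >>> toy.sort_values("submission_time").drop_duplicates(
#   ...     subset=["Endpoint", "user"], keep="last")["submission_time"].iloc[0]
#   Timestamp('2024-02-01 00:00:00')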
|
|
def clip_and_log_transform(y: np.ndarray):
    """
    Clip to a detection limit and transform to log10 scale.

    Parameters
    ----------
    y : np.ndarray
        The array to be clipped and transformed.

    Returns
    -------
    np.ndarray
        The transformed array, log10(clip(y, 0, inf) + 1).
    """
    # Negative values are clipped to zero before the log transform.
    y = np.clip(y, a_min=0, a_max=None)
    return np.log10(y + 1)
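
# Example values, computed from the formula above:
#
#   >>> clip_and_log_transform(np.array([-5.0, 0.0, 9.0, 99.0]))
#   array([0., 0., 1., 2.])
#
# since log10(0 + 1) = 0, log10(9 + 1) = 1, and log10(99 + 1) = 2.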
|
|
def bootstrap_sampling(size: int, n_samples: int) -> np.ndarray:
    """
    Generate bootstrap samples for a given size and number of samples.

    Parameters
    ----------
    size : int
        The size of the data.
    n_samples : int
        The number of samples to generate.

    Returns
    -------
    np.ndarray
        Array of shape (n_samples, size) with indices drawn with replacement.
    """
    # Fixed seed so every caller draws the same bootstrap indices.
    rng = np.random.default_rng(0)
    return rng.choice(size, size=(n_samples, size), replace=True)
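
# Shape sketch: each row is one bootstrap resample of the indices 0..size-1.
#
#   >>> bootstrap_sampling(size=5, n_samples=3).shape
#   (3, 5)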
|
|
def metrics_per_ep(pred: np.ndarray,
                   true: np.ndarray
                   ) -> Tuple[float, float, float, float, float]:
    """Compute evaluation metrics for a single sample

    Parameters
    ----------
    pred : np.ndarray
        Array with predictions
    true : np.ndarray
        Array with actual values

    Returns
    -------
    Tuple[float, float, float, float, float]
        Resulting metrics: (MAE, RAE, R2, Spearman R, Kendall's Tau)
    """
    from scipy.stats import spearmanr, kendalltau
    from sklearn.metrics import mean_absolute_error, r2_score

    mae = mean_absolute_error(true, pred)
    # RAE: MAE relative to the MAE of always predicting the mean of `true`.
    rae = mae / np.mean(np.abs(true - np.mean(true)))

    # R2 is undefined when the true values are constant.
    if np.nanstd(true) == 0:
        r2 = np.nan
    else:
        r2 = r2_score(true, pred)

    # Rank correlations are meaningless for (near-)constant predictions.
    if np.nanstd(pred) < 0.0001:
        spr = np.nan
        ktau = np.nan
    else:
        spr = spearmanr(true, pred).statistic
        ktau = kendalltau(true, pred).statistic

    return mae, rae, r2, spr, ktau
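
# Worked toy example (values computed from the definitions above): for
# pred = [1, 2, 3] and true = [1, 2, 4], MAE = (0 + 0 + 1) / 3 = 1/3, the mean
# absolute deviation of `true` from its mean (7/3) is (4/3 + 1/3 + 5/3) / 3
# = 10/9, so RAE = (1/3) / (10/9) = 0.3; both rank correlations are 1.0
# because the ordering is preserved.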
|
|
def bootstrap_metrics(pred: np.ndarray,
                      true: np.ndarray,
                      endpoint: str,
                      n_bootstrap_samples: int = 1000
                      ) -> pd.DataFrame:
    """Calculate bootstrap metrics given predicted and true values

    Parameters
    ----------
    pred : np.ndarray
        Predicted endpoints
    true : np.ndarray
        Actual endpoint values
    endpoint : str
        String with endpoint
    n_bootstrap_samples : int, optional
        Number of bootstrap samples, by default 1000

    Returns
    -------
    pd.DataFrame
        Dataframe with estimated metric per bootstrap sample for the given endpoint
    """
    cols = ["Sample", "Endpoint", "Metric", "Value"]
    # Collect one small frame per bootstrap draw and concatenate once at the
    # end; growing a dataframe inside the loop is slow and concatenating onto
    # an empty frame is deprecated in recent pandas.
    frames = []
    for i, indx in enumerate(
        bootstrap_sampling(true.shape[0], n_bootstrap_samples)
    ):
        mae, rae, r2, spr, ktau = metrics_per_ep(pred[indx], true[indx])
        frames.append(pd.DataFrame(
            [
                [i, endpoint, "MAE", mae],
                [i, endpoint, "RAE", rae],
                [i, endpoint, "R2", r2],
                [i, endpoint, "Spearman R", spr],
                [i, endpoint, "Kendall's Tau", ktau],
            ],
            columns=cols
        ))
    return pd.concat(frames) if frames else pd.DataFrame(columns=cols)
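
# Sketch of downstream aggregation (assumed usage, not part of this module's
# public contract; "LogD" is a hypothetical endpoint name): summarise the
# per-sample values into the mean/std pairs consumed by map_metric_to_stats.
#
#   boot = bootstrap_metrics(pred, true, endpoint="LogD")
#   summary = boot.groupby("Metric")["Value"].agg(["mean", "std"])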
|
|
def map_metric_to_stats(df: pd.DataFrame, average=False) -> pd.DataFrame:
    """Map mean and std to 'mean +/- std' string for each metric

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe to modify
    average : bool, optional
        Whether the dataframe contains average info, by default False

    Returns
    -------
    pd.DataFrame
        Modified dataframe
    """
    # Copy METRICS so the module-level list is not mutated below.
    metric_cols = METRICS[:]
    if average:
        metric_cols[1] = "MA-RAE"
    cols_drop = []
    for col in metric_cols:
        mean_col = f"mean_{col}"
        std_col = f"std_{col}"
        # Collapse the numeric pair into a single display string.
        df[col] = df.apply(
            lambda row: f"{row[mean_col]:.2f} +/- {row[std_col]:.2f}",
            axis=1
        )
        cols_drop.extend([mean_col, std_col])
    df = df.drop(columns=cols_drop)
    return df
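
# Toy example (assumes "MAE" is among METRICS, which the bootstrap metrics
# above suggest): a one-row frame with mean_MAE=0.5 and std_MAE=0.1 comes back
# with a single display column and the mean_/std_ columns dropped.
#
#   >>> toy = pd.DataFrame([{f"mean_{m}": 0.5 for m in METRICS}
#   ...                     | {f"std_{m}": 0.1 for m in METRICS}])
#   >>> map_metric_to_stats(toy)["MAE"].iloc[0]
#   '0.50 +/- 0.10'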