| """ | |
| Evaluator objects for different evaluation types. | |
| """ | |
| import logging | |
| import random | |
| from abc import ABC, abstractmethod | |
| import heapq | |
| from collections import defaultdict | |
| import pytrec_eval | |
| import numpy as np | |
| import sklearn.cluster | |
| import torch | |
| from scipy.stats import pearsonr | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| average_precision_score, | |
| classification_report, | |
| f1_score, | |
| precision_score, | |
| recall_score, | |
| label_ranking_average_precision_score, | |
| ) | |
| from sklearn.metrics.cluster import v_measure_score | |
| from sklearn.metrics.pairwise import ( | |
| paired_cosine_distances, | |
| paired_euclidean_distances, | |
| paired_manhattan_distances, | |
| ) | |
| from sklearn.multioutput import MultiOutputRegressor | |
| from sklearn.preprocessing import MultiLabelBinarizer | |
| from typing import Dict, List, Tuple | |
| from .eval_utils import ( | |
| cos_sim, | |
| dot_score, | |
| mrr, | |
| recall_cap, | |
| hole, | |
| confidence_scores, | |
| nAUC, | |
| top_k_accuracy, | |
| ) | |
class Evaluator(ABC):
    """Base class for all evaluators.

    Extend this class and implement __call__ for custom evaluators.
    """

    def __init__(self, seed=42, **kwargs):
        self.seed = seed
        random.seed(self.seed)
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)
        torch.cuda.manual_seed_all(self.seed)

    @abstractmethod
    def __call__(self, model):
        """Run the evaluation and return a dict of scores.

        Parameters
        ----------
        model:
            the model to evaluate
        """


logger = logging.getLogger(__name__)
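# Usage sketch (illustrative only, not part of the original module): a custom
# evaluator subclasses `Evaluator` and implements `__call__`, returning a dict
# of scores. The metric and class name below are hypothetical.
#
#   class MeanNormEvaluator(Evaluator):
#       def __init__(self, embeds, **kwargs):
#           super().__init__(**kwargs)
#           self.embeds = embeds
#
#       def __call__(self):
#           norms = np.linalg.norm(np.asarray(self.embeds), axis=1)
#           return {"mean_l2_norm": float(norms.mean())}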
class logRegClassificationEvaluator(Evaluator):
    def __init__(
        self,
        embeds_train,
        y_train,
        embeds_test,
        y_test,
        max_iter=1000,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.embeds_train = embeds_train
        self.y_train = y_train
        self.embeds_test = embeds_test
        self.y_test = y_test
        self.max_iter = max_iter

    def __call__(self):
        scores = {}
        clf = LogisticRegression(
            random_state=self.seed,
            n_jobs=-1,
            max_iter=self.max_iter,
            verbose=1 if logger.isEnabledFor(logging.DEBUG) else 0,
        )
        logger.info(f"Converting {len(self.embeds_train)} train embeddings to an array...")
        X_train = np.asarray(self.embeds_train)
        logger.info(f"Converting {len(self.embeds_test)} test embeddings to an array...")
        X_test = np.asarray(self.embeds_test)
        logger.info("Fitting logistic regression classifier...")
        clf.fit(X_train, self.y_train)
        logger.info("Evaluating...")
        y_pred = clf.predict(X_test)
        accuracy = accuracy_score(self.y_test, y_pred)
        f1 = f1_score(self.y_test, y_pred, average="macro")
        scores["accuracy"] = accuracy
        scores["f1"] = f1
        # Average precision is only reported for binary classification.
        if len(np.unique(self.y_train)) == 2:
            ap = average_precision_score(self.y_test, y_pred)
            scores["ap"] = ap
        return scores
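# Usage sketch (synthetic data, illustrative only): fit a logistic-regression
# probe on precomputed embeddings and read back the scores dict.
#
#   rng = np.random.default_rng(0)
#   evaluator = logRegClassificationEvaluator(
#       embeds_train=rng.normal(size=(200, 64)),
#       y_train=rng.integers(0, 2, size=200),
#       embeds_test=rng.normal(size=(50, 64)),
#       y_test=rng.integers(0, 2, size=50),
#   )
#   scores = evaluator()  # {"accuracy": ..., "f1": ..., "ap": ...} in the binary case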
class ClusteringEvaluator(Evaluator):
    def __init__(
        self,
        embeds,
        labels,
        clustering_batch_size=500,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.embeds = embeds
        self.labels = labels
        self.clustering_batch_size = clustering_batch_size

    def __call__(self):
        logger.info(f"Converting {len(self.embeds)} embeddings to an array...")
        corpus_embeddings = np.asarray(self.embeds)
        logger.info("Fitting Mini-Batch K-Means model...")
        clustering_model = sklearn.cluster.MiniBatchKMeans(
            n_clusters=len(set(self.labels)),
            batch_size=self.clustering_batch_size,
            n_init="auto",
        )
        clustering_model.fit(corpus_embeddings)
        cluster_assignment = clustering_model.labels_
        logger.info("Evaluating...")
        v_measure = v_measure_score(self.labels, cluster_assignment)
        return {"v_measure": v_measure}
class PairClassificationEvaluator(Evaluator):
    """Evaluate embedding pairs by how well a thresholded similarity score separates
    similar from dissimilar pairs.

    Cosine similarity, dot product, Euclidean distance, and Manhattan distance are
    each scored; for every one, the best accuracy and F1 thresholds and the average
    precision (AP) are reported, plus the maximum AP over all of them ("top_ap").
    The labels need to be 0 for dissimilar pairs and 1 for similar pairs.

    :param embeds1: The first column of embeds
    :param embeds2: The second column of embeds
    :param labels: labels[i] is the label for the pair (embeds1[i], embeds2[i]). Must be 0 or 1
    """

    def __init__(self, embeds1, embeds2, labels, **kwargs):
        super().__init__(**kwargs)
        self.embeds1 = embeds1
        self.embeds2 = embeds2
        self.labels = labels

        assert len(self.embeds1) == len(self.embeds2)
        assert len(self.embeds1) == len(self.labels)
        for label in labels:
            assert label == 0 or label == 1

    def __call__(self):
        scores = self.compute_metrics()
        # Compute the max of Average Precision (AP) over all distance metrics.
        top_ap_score = max(score for k, score in scores.items() if k.endswith("_ap"))
        scores["top_ap"] = top_ap_score
        return scores
    def compute_metrics(self):
        embeddings1 = np.array(self.embeds1)
        embeddings2 = np.array(self.embeds2)

        logger.info("Computing similarity distances...")
        cosine_scores = 1 - paired_cosine_distances(embeddings1, embeddings2)
        manhattan_distances = paired_manhattan_distances(embeddings1, embeddings2)
        euclidean_distances = paired_euclidean_distances(embeddings1, embeddings2)
        dot_scores = [
            np.dot(embeddings1[i], embeddings2[i]) for i in range(len(embeddings1))
        ]

        logger.info("Computing metrics...")
        labels = np.asarray(self.labels)
        output_scores = {}
        for short_name, name, scores, reverse in [
            ["cos_sim", "Cosine-Similarity", cosine_scores, True],
            ["manhattan", "Manhattan-Distance", manhattan_distances, False],
            ["euclidean", "Euclidean-Distance", euclidean_distances, False],
            ["dot", "Dot-Product", dot_scores, True],
        ]:
            metrics = self._compute_metrics(scores, labels, reverse)
            metrics = {short_name + "_" + k: v for k, v in metrics.items()}
            output_scores.update(metrics)
        return output_scores
    @staticmethod
    def _compute_metrics(scores, labels, high_score_more_similar):
        """Compute the metrics for the given scores and labels.

        Args:
            scores (`np.ndarray` of shape (n_pairs, )): The similarity/dissimilarity scores for the pairs.
            labels (`np.ndarray` of shape (n_pairs, )): The labels for the pairs.
            high_score_more_similar (`bool`): If true, then the higher the score, the more similar the pairs are.

        Returns:
            `dict`: The metrics for the given scores and labels.
        """
        acc, acc_threshold = PairClassificationEvaluator.find_best_acc_and_threshold(
            scores, labels, high_score_more_similar
        )
        f1, precision, recall, f1_threshold = (
            PairClassificationEvaluator.find_best_f1_and_threshold(
                scores, labels, high_score_more_similar
            )
        )
        ap = PairClassificationEvaluator.ap_score(
            scores, labels, high_score_more_similar
        )
        return {
            "accuracy": acc,
            "accuracy_threshold": acc_threshold,
            "f1": f1,
            "f1_threshold": f1_threshold,
            "precision": precision,
            "recall": recall,
            "ap": ap,
        }

    @staticmethod
    def find_best_acc_and_threshold(scores, labels, high_score_more_similar: bool):
        """Sweep candidate thresholds and return the best accuracy and its threshold."""
        assert len(scores) == len(labels)
        rows = list(zip(scores, labels))
        rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)

        max_acc = 0
        best_threshold = -1
        positive_so_far = 0
        remaining_negatives = sum(np.array(labels) == 0)
        for i in range(len(rows) - 1):
            score, label = rows[i]
            if label == 1:
                positive_so_far += 1
            else:
                remaining_negatives -= 1
            acc = (positive_so_far + remaining_negatives) / len(labels)
            if acc > max_acc:
                max_acc = acc
                best_threshold = (rows[i][0] + rows[i + 1][0]) / 2
        return max_acc, best_threshold
    @staticmethod
    def find_best_f1_and_threshold(scores, labels, high_score_more_similar: bool):
        """Sweep candidate thresholds and return the best F1, precision, recall, and threshold."""
        assert len(scores) == len(labels)

        scores = np.asarray(scores)
        labels = np.asarray(labels)

        rows = list(zip(scores, labels))
        rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)

        best_f1 = best_precision = best_recall = 0
        threshold = 0
        nextract = 0
        ncorrect = 0
        total_num_duplicates = sum(labels)

        for i in range(len(rows) - 1):
            score, label = rows[i]
            nextract += 1
            if label == 1:
                ncorrect += 1
            if ncorrect > 0:
                precision = ncorrect / nextract
                recall = ncorrect / total_num_duplicates
                f1 = 2 * precision * recall / (precision + recall)
                if f1 > best_f1:
                    best_f1 = f1
                    best_precision = precision
                    best_recall = recall
                    threshold = (rows[i][0] + rows[i + 1][0]) / 2
        return best_f1, best_precision, best_recall, threshold

    @staticmethod
    def ap_score(scores, labels, high_score_more_similar: bool):
        return average_precision_score(
            labels, scores * (1 if high_score_more_similar else -1)
        )
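# Usage sketch (synthetic data, illustrative only): score pairs of embeddings
# with 0/1 labels. For each similarity/distance the best decision threshold is
# searched on the data itself, and the maximum AP across metrics is "top_ap".
#
#   rng = np.random.default_rng(0)
#   e1 = rng.normal(size=(100, 16))
#   e2 = rng.normal(size=(100, 16))
#   pair_labels = rng.integers(0, 2, size=100).tolist()
#   scores = PairClassificationEvaluator(e1, e2, pair_labels)()
#   # e.g. scores["cos_sim_ap"], scores["euclidean_f1"], scores["top_ap"]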
class MultiClassMultiOutputLogRegClassificationEvaluator(Evaluator):
    def __init__(
        self,
        embeds_train,
        y_train,
        embeds_test,
        y_test,
        max_iter=1000,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.embeds_train = embeds_train
        self.y_train = y_train
        self.embeds_test = embeds_test
        self.y_test = y_test
        self.max_iter = max_iter

    def __call__(self):
        scores = {}
        mlb = MultiLabelBinarizer()
        # Fit the binarizer on all classes seen in y_train and y_test.
        class_labels = list(self.y_train) + list(self.y_test)
        labels = [class_label.split(", ") for class_label in class_labels]
        mlb.fit(labels)
        train_labels = [class_label.split(", ") for class_label in self.y_train]
        test_labels = [class_label.split(", ") for class_label in self.y_test]
        y_train = mlb.transform(train_labels)
        y_test = mlb.transform(test_labels)
        clf = MultiOutputRegressor(
            LogisticRegression(
                random_state=self.seed, solver="lbfgs", max_iter=self.max_iter
            )
        ).fit(self.embeds_train, y_train)
        y_pred = clf.predict(self.embeds_test)
        results_dict = classification_report(y_test, y_pred, output_dict=True)
        assert isinstance(
            results_dict, dict
        ), "Should always be true since `output_dict=True` is passed to sklearn.metrics.classification_report"
        scores["precision"] = results_dict["macro avg"]["precision"]
        scores["recall"] = results_dict["macro avg"]["recall"]
        scores["f1"] = results_dict["macro avg"]["f1-score"]
        scores["accuracy"] = accuracy_score(y_test, y_pred)
        return scores
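# Usage sketch (synthetic data, illustrative only): labels are comma-separated
# strings (e.g. "kinase, membrane") that are binarized with MultiLabelBinarizer
# before fitting one LogisticRegression per output column.
#
#   rng = np.random.default_rng(0)
#   train_y = ["a, b", "a", "b, c", "c"] * 25
#   test_y = ["a", "b, c", "a, b", "c"] * 5
#   evaluator = MultiClassMultiOutputLogRegClassificationEvaluator(
#       rng.normal(size=(100, 32)), train_y, rng.normal(size=(20, 32)), test_y
#   )
#   print(evaluator())  # macro precision/recall/f1 plus subset accuracy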
class MultiClassMultiOutputKNNClassificationEvaluator(Evaluator):
    def __init__(
        self,
        embeds_train,
        y_train,
        embeds_test,
        y_test,
        n_neighbors=5,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.embeds_train = embeds_train
        self.y_train = y_train
        self.embeds_test = embeds_test
        self.y_test = y_test
        self.n_neighbors = n_neighbors

    def __call__(self):
        scores = {}
        mlb = MultiLabelBinarizer()
        class_labels = list(self.y_train) + list(self.y_test)
        labels = [class_label.split(", ") for class_label in class_labels]
        mlb.fit(labels)
        train_labels = [class_label.split(", ") for class_label in self.y_train]
        test_labels = [class_label.split(", ") for class_label in self.y_test]
        y_train = mlb.transform(train_labels)
        y_test = mlb.transform(test_labels)
        clf = sklearn.neighbors.KNeighborsClassifier(
            n_neighbors=self.n_neighbors, metric="cosine"
        )
        logger.info("Fitting KNN classifier...")
        clf.fit(self.embeds_train, y_train)
        logger.info("Evaluating...")
        y_pred = clf.predict(self.embeds_test)
        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average="macro")
        precision = precision_score(y_test, y_pred, average="macro")
        recall = recall_score(y_test, y_pred, average="macro")
        lrap = label_ranking_average_precision_score(y_test, y_pred)
        scores["f1"] = f1
        scores["accuracy"] = accuracy
        scores["precision"] = precision
        scores["recall"] = recall
        scores["lrap"] = lrap
        return scores
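# Usage sketch (synthetic data, illustrative only): same multi-label string
# format as above, classified with a cosine-distance k-NN model instead of
# logistic regression.
#
#   rng = np.random.default_rng(0)
#   evaluator = MultiClassMultiOutputKNNClassificationEvaluator(
#       rng.normal(size=(100, 32)), ["a, b", "b", "a", "b"] * 25,
#       rng.normal(size=(20, 32)), ["a", "a, b", "b", "b"] * 5,
#       n_neighbors=3,
#   )
#   print(evaluator())  # accuracy, macro f1/precision/recall, and LRAP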
class BiGeneMiningEvaluator(Evaluator):
    """BiGene Mining Evaluator, analogous to the Bitext Mining Evaluator
    https://github.com/embeddings-benchmark/mteb/blob/main/mteb/evaluation/evaluators/BitextMiningEvaluator.py.

    If top_k > 1, then recall@k is also computed.
    """

    def __init__(self, embeds1, embeds2, top_k=1, **kwargs):
        super().__init__(**kwargs)
        self.n = len(embeds1)
        self.embeds1 = np.array(embeds1)
        self.embeds2 = np.array(embeds2)
        self.gold = list(zip(range(self.n), range(self.n)))
        self.top_k = top_k

    def __call__(self):
        scores = self.compute_metrics()
        return scores

    def compute_metrics(self):
        logger.info(f"Finding nearest neighbors... with top_k={self.top_k}")
        nearest_neighbors = self._similarity_search(
            self.embeds1, self.embeds2, top_k=self.top_k
        )

        # Compute errors
        logger.info("Computing metrics...")
        labels = []
        predictions = []
        # Get predictions and labels for top_k=1.
        for i, x in enumerate(nearest_neighbors):
            j = x[0]["corpus_id"]
            predictions.append(j)
            labels.append(self.gold[i][1])

        scores = {
            "precision": precision_score(
                labels, predictions, zero_division=0, average="weighted"
            ),
            "recall": recall_score(
                labels, predictions, zero_division=0, average="weighted"
            ),
            "f1": f1_score(labels, predictions, zero_division=0, average="weighted"),
            "accuracy": accuracy_score(labels, predictions),
        }
        if self.top_k > 1:
            # Compute recall@k.
            top_k_preds = []
            for i, x in enumerate(nearest_neighbors):
                top_k_preds.append([pred["corpus_id"] for pred in x])
            top_k_recall = [
                self.gold[i][1] in top_k_pred
                for i, top_k_pred in enumerate(top_k_preds)
            ]
            scores[f"recall_at_{self.top_k}"] = sum(top_k_recall) / len(top_k_recall)
        return scores
    def _similarity_search(
        self,
        query_embeddings,
        corpus_embeddings,
        query_chunk_size=100,
        corpus_chunk_size=500000,
        top_k=1,
        score_function=cos_sim,
    ):
        """Perform a similarity search between a list of query embeddings and a list of corpus embeddings.

        It can be used for Information Retrieval / Semantic Search for corpora up to about 1 million entries.

        :param query_embeddings: A 2 dimensional tensor with the query embeddings.
        :param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings.
        :param query_chunk_size: Process this many queries simultaneously. Increasing that value increases the speed, but requires more memory.
        :param corpus_chunk_size: Scan the corpus in chunks of this many entries (500k by default). Increasing that value increases the speed, but requires more memory.
        :param top_k: Retrieve top k matching entries.
        :param score_function: Function for computing scores. By default, cosine similarity.
        :return: Returns a list with one entry for each query. Each entry is a list of dictionaries with the keys 'corpus_id' and 'score', sorted by decreasing similarity scores.
        """
        query_embeddings = torch.from_numpy(query_embeddings)
        corpus_embeddings = torch.from_numpy(corpus_embeddings)
        if len(query_embeddings.shape) == 1:
            query_embeddings = query_embeddings.unsqueeze(0)
        if len(corpus_embeddings.shape) == 1:
            corpus_embeddings = corpus_embeddings.unsqueeze(0)

        # Check that corpus and queries are on the same device
        if corpus_embeddings.device != query_embeddings.device:
            query_embeddings = query_embeddings.to(corpus_embeddings.device)

        queries_result_list = [[] for _ in range(len(query_embeddings))]

        for query_start_idx in range(0, len(query_embeddings), query_chunk_size):
            # Iterate over chunks of the corpus
            for corpus_start_idx in range(0, len(corpus_embeddings), corpus_chunk_size):
                # Compute similarities
                cos_scores = score_function(
                    query_embeddings[
                        query_start_idx : query_start_idx + query_chunk_size
                    ],
                    corpus_embeddings[
                        corpus_start_idx : corpus_start_idx + corpus_chunk_size
                    ],
                )

                # Get top-k scores
                cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(
                    cos_scores,
                    min(top_k, len(cos_scores[0])),
                    dim=1,
                    largest=True,
                    sorted=False,
                )
                cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist()
                cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist()

                for query_itr in range(len(cos_scores)):
                    for sub_corpus_id, score in zip(
                        cos_scores_top_k_idx[query_itr],
                        cos_scores_top_k_values[query_itr],
                    ):
                        corpus_id = corpus_start_idx + sub_corpus_id
                        query_id = query_start_idx + query_itr
                        queries_result_list[query_id].append(
                            {"corpus_id": corpus_id, "score": score}
                        )

        # Sort and strip to top_k results
        for idx in range(len(queries_result_list)):
            queries_result_list[idx] = sorted(
                queries_result_list[idx], key=lambda x: x["score"], reverse=True
            )
            queries_result_list[idx] = queries_result_list[idx][0:top_k]

        return queries_result_list
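# Usage sketch (synthetic data, illustrative only): embeds1[i] and embeds2[i]
# are treated as a gold-aligned pair, so mining accuracy is how often the
# nearest neighbour of embeds1[i] among embeds2 is index i. With top_k > 1,
# recall@k is also reported.
#
#   rng = np.random.default_rng(0)
#   e1 = rng.normal(size=(50, 16)).astype(np.float32)
#   e2 = e1 + 0.01 * rng.normal(size=(50, 16)).astype(np.float32)
#   print(BiGeneMiningEvaluator(e1, e2, top_k=5)())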
class EDSEvaluator(Evaluator):
    """Evolutionary Distance Similarity Evaluator, analogous to the Semantic Textual Similarity Evaluator.

    Adapted from https://github.com/embeddings-benchmark/mteb/blob/main/mteb/evaluation/evaluators/STSEvaluator.py
    """

    def __init__(self, embeds1, embeds2, gold_scores, **kwargs):
        super().__init__(**kwargs)
        self.embeds1 = embeds1
        self.embeds2 = embeds2
        self.gold_scores = gold_scores

    def __call__(self):
        embeddings1 = np.array(self.embeds1)
        embeddings2 = np.array(self.embeds2)
        logger.info("Evaluating...")
        # Pairwise embedding distances, each correlated against the gold distances.
        cosine_scores = paired_cosine_distances(embeddings1, embeddings2)
        manhattan_distances = paired_manhattan_distances(embeddings1, embeddings2)
        euclidean_distances = paired_euclidean_distances(embeddings1, embeddings2)

        cosine_pearson, _ = pearsonr(self.gold_scores, cosine_scores)
        manhattan_pearson, _ = pearsonr(self.gold_scores, manhattan_distances)
        euclidean_pearson, _ = pearsonr(self.gold_scores, euclidean_distances)

        top_corr = max(
            cosine_pearson,
            manhattan_pearson,
            euclidean_pearson,
        )
        return {
            "cos_sim": cosine_pearson,
            "manhattan": manhattan_pearson,
            "euclidean": euclidean_pearson,
            "top_corr": top_corr,
        }
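# Usage sketch (synthetic data, illustrative only): gold_scores are continuous
# target distances (e.g. evolutionary distances); each embedding distance is
# correlated against them with Pearson r, and the best correlation is "top_corr".
#
#   rng = np.random.default_rng(0)
#   e1 = rng.normal(size=(100, 16))
#   e2 = rng.normal(size=(100, 16))
#   gold = rng.random(size=100).tolist()
#   print(EDSEvaluator(e1, e2, gold)())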
class RetrievalEvaluator(Evaluator):
    """Adapted from
    https://github.com/embeddings-benchmark/mteb/blob/main/mteb/evaluation/evaluators/RetrievalEvaluator.py
    """

    def __init__(
        self,
        corpus_embeds,
        query_embeds,
        corpus_ids,
        query_ids,
        qrels: Dict[str, Dict[str, int]],
        k_values: List[int] = [5, 10, 50],
        score_function: str = "cos_sim",
        corpus_chunk_size: int = 50000,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.corpus_embeds = corpus_embeds
        self.query_embeds = query_embeds
        self.corpus_ids = corpus_ids
        self.query_ids = query_ids
        self.qrels = qrels
        self.k_values = k_values
        self.top_k = max(k_values) if "top_k" not in kwargs else kwargs["top_k"]
        self.score_function = score_function
        self.score_functions = {
            "cos_sim": cos_sim,
            "dot": dot_score,
        }
        self.corpus_chunk_size = corpus_chunk_size

    def __call__(self):
        results = self.search(
            self.corpus_embeds,
            self.query_embeds,
            self.corpus_ids,
            self.query_ids,
            self.top_k,
            self.score_function,
        )
        ndcg, _map, recall, precision, naucs = self.evaluate(
            self.qrels, results, self.k_values
        )
        mrr, naucs_mrr = self.evaluate_custom(self.qrels, results, self.k_values, "mrr")
        scores = {
            **{f"ndcg_at_{k.split('@')[1]}": v for (k, v) in ndcg.items()},
            **{f"map_at_{k.split('@')[1]}": v for (k, v) in _map.items()},
            **{f"recall_at_{k.split('@')[1]}": v for (k, v) in recall.items()},
            **{f"precision_at_{k.split('@')[1]}": v for (k, v) in precision.items()},
            **{f"mrr_at_{k.split('@')[1]}": v for (k, v) in mrr.items()},
            **{
                k.replace("@", "_at_").replace("_P", "_precision").lower(): v
                for k, v in naucs.items()
            },
            **{
                k.replace("@", "_at_").replace("_P", "_precision").lower(): v
                for k, v in naucs_mrr.items()
            },
        }
        return scores
    def search(
        self,
        corpus_embeds,
        query_embeds,
        corpus_ids,
        query_ids,
        top_k: int,
        score_function: str,
        return_sorted: bool = False,
        **kwargs,
    ) -> Dict[str, Dict[str, float]]:
        # Run semantic search of the query embeddings against the corpus embeddings
        # and return a ranked list of corpus ids (with scores) for each query.
        if score_function not in self.score_functions:
            raise ValueError(
                f"score function: {score_function} must be either (cos_sim) for cosine similarity or (dot) for dot product"
            )

        # Make query embeds and corpus embeds torch tensors.
        query_embeds = torch.from_numpy(query_embeds)
        corpus_embeds = torch.from_numpy(corpus_embeds)

        itr = range(0, len(corpus_embeds), self.corpus_chunk_size)

        results = defaultdict(dict)
        # Keep only the top-k docs for each query
        result_heaps = defaultdict(list)
        for batch_num, corpus_start_idx in enumerate(itr):
            logger.info("Searching Batch {}/{}...".format(batch_num + 1, len(itr)))
            corpus_end_idx = min(
                corpus_start_idx + self.corpus_chunk_size, len(corpus_ids)
            )
            sub_corpus_embeds = corpus_embeds[corpus_start_idx:corpus_end_idx]

            # Compute similarities using either cosine-similarity or dot product
            cos_scores = self.score_functions[score_function](
                query_embeds, sub_corpus_embeds
            )
            cos_scores[torch.isnan(cos_scores)] = -1

            # Get top-k values
            cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(
                cos_scores,
                min(
                    top_k + 1,
                    len(cos_scores[1]) if len(cos_scores) > 1 else len(cos_scores[-1]),
                ),
                dim=1,
                largest=True,
                sorted=return_sorted,
            )
            cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist()
            cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist()

            for query_itr in range(len(query_embeds)):
                query_id = query_ids[query_itr]
                for sub_corpus_id, score in zip(
                    cos_scores_top_k_idx[query_itr], cos_scores_top_k_values[query_itr]
                ):
                    corpus_id = corpus_ids[corpus_start_idx + sub_corpus_id]
                    if corpus_id != query_id:
                        if len(result_heaps[query_id]) < top_k:
                            # Push item on the heap
                            heapq.heappush(result_heaps[query_id], (score, corpus_id))
                        else:
                            # If item is larger than the smallest in the heap, push it on the heap then pop the smallest element
                            heapq.heappushpop(
                                result_heaps[query_id], (score, corpus_id)
                            )

        for qid in result_heaps:
            for score, corpus_id in result_heaps[qid]:
                results[qid][corpus_id] = score

        return results
    @staticmethod
    def evaluate(
        qrels: Dict[str, Dict[str, int]],
        results: Dict[str, Dict[str, float]],
        k_values: List[int],
        ignore_identical_ids: bool = True,
    ) -> Tuple[
        Dict[str, float],
        Dict[str, float],
        Dict[str, float],
        Dict[str, float],
        Dict[str, float],
    ]:
        if ignore_identical_ids:
            logger.info(
                "For evaluation, identical query and document ids are ignored (default); explicitly set ``ignore_identical_ids=False`` to keep them."
            )
            popped = []
            for qid, rels in results.items():
                for pid in list(rels):
                    if qid == pid:
                        results[qid].pop(pid)
                        popped.append(pid)

        all_ndcgs, all_aps, all_recalls, all_precisions = {}, {}, {}, {}

        for k in k_values:
            all_ndcgs[f"NDCG@{k}"] = []
            all_aps[f"MAP@{k}"] = []
            all_recalls[f"Recall@{k}"] = []
            all_precisions[f"P@{k}"] = []

        map_string = "map_cut." + ",".join([str(k) for k in k_values])
        ndcg_string = "ndcg_cut." + ",".join([str(k) for k in k_values])
        recall_string = "recall." + ",".join([str(k) for k in k_values])
        precision_string = "P." + ",".join([str(k) for k in k_values])
        evaluator = pytrec_eval.RelevanceEvaluator(
            qrels, {map_string, ndcg_string, recall_string, precision_string}
        )
        scores = evaluator.evaluate(results)

        for query_id in scores.keys():
            for k in k_values:
                all_ndcgs[f"NDCG@{k}"].append(scores[query_id]["ndcg_cut_" + str(k)])
                all_aps[f"MAP@{k}"].append(scores[query_id]["map_cut_" + str(k)])
                all_recalls[f"Recall@{k}"].append(scores[query_id]["recall_" + str(k)])
                all_precisions[f"P@{k}"].append(scores[query_id]["P_" + str(k)])

        ndcg, _map, recall, precision = (
            all_ndcgs.copy(),
            all_aps.copy(),
            all_recalls.copy(),
            all_precisions.copy(),
        )

        for k in k_values:
            ndcg[f"NDCG@{k}"] = round(sum(ndcg[f"NDCG@{k}"]) / len(scores), 5)
            _map[f"MAP@{k}"] = round(sum(_map[f"MAP@{k}"]) / len(scores), 5)
            recall[f"Recall@{k}"] = round(sum(recall[f"Recall@{k}"]) / len(scores), 5)
            precision[f"P@{k}"] = round(sum(precision[f"P@{k}"]) / len(scores), 5)

        naucs = RetrievalEvaluator.evaluate_abstention(
            results, {**all_ndcgs, **all_aps, **all_recalls, **all_precisions}
        )
        return ndcg, _map, recall, precision, naucs
    @staticmethod
    def evaluate_abstention(
        results: Dict[str, Dict[str, float]],
        metric_scores: Dict[str, List[float]],
    ) -> Dict[str, float]:
        """Computes normalized Area Under the Curve on a set of evaluated instances as presented in the paper https://arxiv.org/abs/2402.12997"""
        all_sim_scores = [list(results[qid].values()) for qid in list(results.keys())]
        all_conf_scores = [
            confidence_scores(sim_scores) for sim_scores in all_sim_scores
        ]
        conf_fcts = list(all_conf_scores[0].keys())
        all_conf_scores = {
            fct: np.array([x[fct] for x in all_conf_scores]) for fct in conf_fcts
        }
        metric_scores = {k: np.array(v) for k, v in metric_scores.items()}
        naucs = {}

        for metric_name, scores in metric_scores.items():
            for fct, conf_scores in all_conf_scores.items():
                naucs[f"nAUC_{metric_name}_{fct}"] = nAUC(conf_scores, scores)

        return naucs
    @staticmethod
    def evaluate_custom(
        qrels: Dict[str, Dict[str, int]],
        results: Dict[str, Dict[str, float]],
        k_values: List[int],
        metric: str,
        output_type: str = "all",
    ) -> Tuple[Dict[str, float], Dict[str, float]]:
        if metric.lower() in ["mrr", "mrr@k", "mrr_cut"]:
            metric_scores = mrr(qrels, results, k_values, output_type)
        elif metric.lower() in ["recall_cap", "r_cap", "r_cap@k"]:
            metric_scores = recall_cap(qrels, results, k_values, output_type)
        elif metric.lower() in ["hole", "hole@k"]:
            metric_scores = hole(qrels, results, k_values, output_type)
        elif metric.lower() in [
            "acc",
            "top_k_acc",
            "accuracy",
            "accuracy@k",
            "top_k_accuracy",
        ]:
            metric_scores = top_k_accuracy(qrels, results, k_values, output_type)
        else:
            raise ValueError(f"Unsupported custom metric: {metric}")

        naucs = RetrievalEvaluator.evaluate_abstention(results, metric_scores)
        metric_scores_avg = {k: sum(v) / len(v) for k, v in metric_scores.items()}

        return metric_scores_avg, naucs
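# Usage sketch (synthetic data, illustrative only; assumes the eval_utils helpers
# behave as used above): qrels maps query ids to relevant corpus ids with
# relevance > 0. Scores come back as ndcg/map/recall/precision/mrr at each k,
# plus nAUC abstention scores.
#
#   rng = np.random.default_rng(0)
#   corpus_ids = [f"d{i}" for i in range(100)]
#   query_ids = [f"q{i}" for i in range(10)]
#   corpus_embeds = rng.normal(size=(100, 32)).astype(np.float32)
#   query_embeds = corpus_embeds[:10] + 0.01 * rng.normal(size=(10, 32)).astype(np.float32)
#   qrels = {f"q{i}": {f"d{i}": 1} for i in range(10)}
#   evaluator = RetrievalEvaluator(
#       corpus_embeds, query_embeds, corpus_ids, query_ids, qrels, k_values=[5, 10]
#   )
#   print(evaluator())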