| """ | |
| score functions from: https://hearbenchmark.com/hear-tasks.html | |
| """ | |
| import json | |
| from collections import ChainMap | |
| from pathlib import Path | |
| from typing import Dict, Optional, Tuple, Union, List, Any | |
| import more_itertools | |
| import numpy as np | |
| import sed_eval | |
| import torch | |
| from dcase_util.containers import MetaDataContainer | |
| from scipy.ndimage import median_filter | |
| from sklearn.model_selection import ParameterGrid | |
| from tqdm import tqdm | |


def validate_score_return_type(ret: Union[Tuple[Tuple[str, float], ...], float]):
    """
    Valid return types for the metric are:
      - tuple(tuple(string: name of the subtype, float: the value)): This is the
        case with sed_eval metrics. They can return (("f_measure", value),
        ("precision", value), ...), depending on the scores the metric is
        supposed to return. This is set as the `scores` attribute on the metric.
      - float: Standard metric behaviour.

    The downstream prediction pipeline is able to handle both of these types.
    For the tuple return type, the value of the first entry in the tuple is
    used as the optimisation criterion wherever one is required. For instance,
    if the return is (("f_measure", value), ("precision", value)), the value
    corresponding to f_measure is used (e.g. in early stopping, if this metric
    is the primary score for the task).
    """
    if isinstance(ret, tuple):
        assert all(
            type(s) == tuple and type(s[0]) == str and type(s[1]) == float for s in ret
        ), (
            "If the return type of the score is a tuple, all the elements "
            "in the tuple should be tuples of type (string, float)"
        )
    elif isinstance(ret, float):
        pass
    else:
        raise ValueError(
            f"Return type {type(ret)} is unexpected. Return type of "
            "the score function should either be a "
            "tuple(tuple) or float."
        )
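

# A minimal illustration (with made-up values) of the two return types this
# validator accepts, and one it rejects:
#
#     validate_score_return_type(0.85)                         # plain float: OK
#     validate_score_return_type(
#         (("f_measure", 0.72), ("precision", 0.81))
#     )                                                        # tuple of (str, float): OK
#     validate_score_return_type("0.85")                       # raises ValueError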


class ScoreFunction:
    """
    A simple abstract base class for score functions
    """

    # TODO: Remove label_to_idx?
    def __init__(
        self,
        label_to_idx: Dict[str, int],
        name: Optional[str] = None,
        maximize: bool = True,
    ):
        """
        :param label_to_idx: Map from label string to integer index.
        :param name: Override the name of this scoring function.
        :param maximize: Maximize this score? (Otherwise, it's a loss or energy
            we want to minimize, and I guess technically isn't a score.)
        """
        self.label_to_idx = label_to_idx
        if name:
            self.name = name
        self.maximize = maximize

    def __call__(self, *args, **kwargs) -> Union[Tuple[Tuple[str, float], ...], float]:
        """
        Calls the compute function of the metric and, after validating the
        output, returns the metric score.
        """
        ret = self._compute(*args, **kwargs)
        validate_score_return_type(ret)
        return ret

    def _compute(
        self, predictions: Any, targets: Any, **kwargs
    ) -> Union[Tuple[Tuple[str, float], ...], float]:
        """
        Compute the score based on the predictions and targets.
        This is a private function; the metric should be used as a functor by
        calling the `__call__` method, which calls this and also validates the
        return type.
        """
        raise NotImplementedError("Inheriting classes must implement this function")

    def __str__(self):
        return self.name
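

# A sketch (not part of the benchmark code) of what a concrete subclass looks
# like; `Top1Accuracy` and its body are hypothetical and only illustrate the
# `_compute` contract validated above:
#
#     class Top1Accuracy(ScoreFunction):
#         name = "top1_acc"
#
#         def _compute(self, predictions, targets, **kwargs) -> float:
#             # predictions: (n, n_classes) torch tensor of scores;
#             # targets: (n,) tensor of class indices
#             return float((predictions.argmax(dim=1) == targets).float().mean())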


class SoundEventScore(ScoreFunction):
    """
    Scores for sound event detection tasks using sed_eval
    """

    # Score class must be defined in inheriting classes
    score_class: sed_eval.sound_event.SoundEventMetrics = None

    def __init__(
        self,
        label_to_idx: Dict[str, int],
        scores: Tuple[str],
        params: Dict = None,
        name: Optional[str] = None,
        maximize: bool = True,
    ):
        """
        :param scores: Scores to use, from the list of overall SED eval scores.
            The first score in the tuple will be the primary score for this metric.
        :param params: Parameters to pass to the scoring function,
            see inheriting children for details.
        """
        if params is None:
            params = {}
        super().__init__(label_to_idx=label_to_idx, name=name, maximize=maximize)
        self.scores = scores
        self.params = params
        assert self.score_class is not None

    def _compute(
        self, predictions: Dict, targets: Dict, **kwargs
    ) -> Tuple[Tuple[str, float], ...]:
        # Containers of events for sed_eval
        reference_event_list = self.sed_eval_event_container(targets)
        estimated_event_list = self.sed_eval_event_container(predictions)

        # This will break in Python < 3.6 if the dict order is not the
        # insertion order, I think. I'm a little worried about this line.
        scores = self.score_class(
            event_label_list=list(self.label_to_idx.keys()), **self.params
        )

        for filename in predictions:
            scores.evaluate(
                reference_event_list=reference_event_list.filter(filename=filename),
                estimated_event_list=estimated_event_list.filter(filename=filename),
            )

        # results_overall_metrics returns a pretty large nested selection of scores,
        # with dicts of scores keyed on the type of score, like f_measure,
        # error_rate, accuracy
        nested_overall_scores: Dict[
            str, Dict[str, float]
        ] = scores.results_overall_metrics()
        # Open up nested overall scores
        overall_scores: Dict[str, float] = dict(
            ChainMap(*nested_overall_scores.values())
        )
        # Return the required scores as tuples. The scores are returned in the
        # order they are passed in the `scores` argument.
        return tuple([(score, overall_scores[score]) for score in self.scores])

    @staticmethod
    def sed_eval_event_container(
        x: Dict[str, List[Dict[str, Any]]]
    ) -> MetaDataContainer:
        # Reformat event list for sed_eval
        reference_events = []
        for filename, event_list in x.items():
            for event in event_list:
                reference_events.append(
                    {
                        # Convert from ms to seconds for sed_eval
                        "event_label": str(event["label"]),
                        "event_onset": event["start"] / 1000.0,
                        "event_offset": event["end"] / 1000.0,
                        "file": filename,
                    }
                )
        return MetaDataContainer(reference_events)


class EventBasedScore(SoundEventScore):
    """
    event-based scores - the ground truth and system output are compared at
    event instance level;

    See https://tut-arg.github.io/sed_eval/generated/sed_eval.sound_event.EventBasedMetrics.html # noqa: E501
    for params.
    """

    score_class = sed_eval.sound_event.EventBasedMetrics


class SegmentBasedScore(SoundEventScore):
    """
    segment-based scores - the ground truth and system output are compared in a
    fixed time grid; sound events are marked as active or inactive in each segment;

    See https://tut-arg.github.io/sed_eval/sound_event.html#sed_eval.sound_event.SegmentBasedMetrics # noqa: E501
    for params.
    """

    score_class = sed_eval.sound_event.SegmentBasedMetrics
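

# A sketch of how these score classes are typically used; the labels, the
# `t_collar` value, and the event dicts below are hypothetical and only show
# the expected input shape (event lists keyed on filename, times in ms):
#
#     score = EventBasedScore(
#         label_to_idx={"dog": 0, "siren": 1},
#         scores=("f_measure", "precision", "recall"),
#         params={"t_collar": 0.2},
#     )
#     targets = {"clip1.wav": [{"label": "dog", "start": 0.0, "end": 350.0}]}
#     predictions = {"clip1.wav": [{"label": "dog", "start": 20.0, "end": 400.0}]}
#     score(predictions, targets)
#     # -> (("f_measure", ...), ("precision", ...), ("recall", ...))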


def get_events_for_all_files(
    predictions: torch.Tensor,
    filenames: List[str],
    timestamps: torch.Tensor,
    idx_to_label: Dict[int, str],
    postprocessing_grid: Dict[str, List[float]],
    postprocessing: Optional[Tuple[Tuple[str, Any], ...]] = None,
) -> Dict[Tuple[Tuple[str, Any], ...], Dict[str, List[Dict[str, Union[str, float]]]]]:
    """
    Produces lists of events from a set of frame-based label probabilities.
    The input prediction tensor may contain frame predictions from a set of
    different files concatenated together. `filenames` and `timestamps` give,
    for each frame in the predictions tensor, the file it came from and its
    timestamp. We split the predictions into separate tensors based on the
    filename and compute events on each of those individually.

    If no postprocessing is specified (during training), we try a variety of
    ways of postprocessing the predictions into events, from the
    postprocessing_grid, including median filtering and minimum event length.
    If postprocessing is specified (during test, chosen at the best validation
    epoch), we use this postprocessing.

    Args:
        predictions: a tensor of frame-based multi-label predictions.
        filenames: a list of filenames where each entry corresponds
            to a frame in the predictions tensor.
        timestamps: a tensor of timestamps where each entry corresponds
            to a frame in the predictions tensor.
        idx_to_label: Index to label mapping.
        postprocessing_grid: Grid of postprocessing parameters to search over
            when no postprocessing is specified.
        postprocessing: See above.

    Returns:
        A dictionary from postprocessing params to a dictionary of lists of
        events keyed on the filename slug. The event list is of dicts of the
        following format:
            {"label": str, "start": float ms, "end": float ms}
    """
    # This probably could be more efficient if we make the assumption that
    # timestamps are in sorted order. But this makes sure of it.
    assert predictions.shape[0] == len(filenames)
    assert predictions.shape[0] == len(timestamps)
    event_files: Dict[str, Dict[float, torch.Tensor]] = {}
    for i, (filename, timestamp) in enumerate(zip(filenames, timestamps)):
        slug = Path(filename).name

        # Key on the slug to be consistent with the ground truth
        if slug not in event_files:
            event_files[slug] = {}

        # Save the predictions for the file keyed on the timestamp
        event_files[slug][float(timestamp)] = predictions[i]

    # Create events for all the different files. Store all the events as a dictionary
    # with the same format as the ground truth from the luigi pipeline.
    # Ex) { slug -> [{"label" : "woof", "start": 0.0, "end": 2.32}, ...], ...}
    event_dict: Dict[
        Tuple[Tuple[str, Any], ...], Dict[str, List[Dict[str, Union[float, str]]]]
    ] = {}
    if postprocessing:
        postprocess = postprocessing
        event_dict[postprocess] = {}
        for slug, timestamp_predictions in event_files.items():
            event_dict[postprocess][slug] = create_events_from_prediction(
                timestamp_predictions, idx_to_label, **dict(postprocess)
            )
    else:
        postprocessing_confs = list(ParameterGrid(postprocessing_grid))
        for postprocess_dict in tqdm(postprocessing_confs):
            postprocess = tuple(postprocess_dict.items())
            event_dict[postprocess] = {}
            for slug, timestamp_predictions in event_files.items():
                event_dict[postprocess][slug] = create_events_from_prediction(
                    timestamp_predictions, idx_to_label, **postprocess_dict
                )

    return event_dict
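

# A sketch of the grid search driven by `postprocessing_grid`; the keys match
# the keyword arguments of `create_events_from_prediction`, but the specific
# values (and the predictions/filenames/timestamps tensors) are hypothetical:
#
#     grid = {"median_filter_ms": [150.0, 250.0], "min_duration": [60.0, 125.0]}
#     event_dict = get_events_for_all_files(
#         predictions, filenames, timestamps, idx_to_label, grid
#     )
#     # event_dict keys look like (("median_filter_ms", 150.0), ("min_duration", 60.0)),
#     # each mapping to {slug -> [{"label": ..., "start": ..., "end": ...}, ...]}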


def create_events_from_prediction(
    prediction_dict: Dict[float, torch.Tensor],
    idx_to_label: Dict[int, str],
    threshold: float = 0.5,
    median_filter_ms: float = 150,
    min_duration: float = 60.0,
) -> List[Dict[str, Union[float, str]]]:
    """
    Takes a set of prediction tensors keyed on timestamps and generates events.
    (This is for one particular audio scene.)
    We convert the prediction tensor to a binary label based on the threshold
    value. Any events occurring at adjacent timestamps are considered to be
    part of the same event. This loops through and creates events for each
    label class.
    We optionally apply median filtering to predictions.
    We disregard events that are shorter than min_duration milliseconds.

    Args:
        prediction_dict: A dictionary of predictions keyed on timestamp
            {timestamp -> prediction}. The prediction is a tensor of label
            probabilities.
        idx_to_label: Index to label mapping.
        threshold: Threshold for determining whether to apply a label.
        median_filter_ms: Width of the median filter in milliseconds applied
            to the frame predictions (0 disables filtering).
        min_duration: the minimum duration in milliseconds for an
            event to be included.

    Returns:
        A list of dicts with keys "label", "start", and "end"
    """
    # Make sure the timestamps are in the correct order
    timestamps = np.array(sorted(prediction_dict.keys()))

    # Create a sorted numpy matrix of frame-level predictions for this file. We convert
    # to a numpy array here before applying a median filter.
    predictions = np.stack(
        [prediction_dict[t].detach().cpu().numpy() for t in timestamps]
    )

    # Optionally apply a median filter here to smooth out events.
    ts_diff = np.mean(np.diff(timestamps))
    if median_filter_ms:
        filter_width = int(round(median_filter_ms / ts_diff))
        if filter_width:
            predictions = median_filter(predictions, size=(filter_width, 1))

    # Convert probabilities to binary vectors based on threshold
    predictions = (predictions > threshold).astype(np.int8)

    events = []
    for label in range(predictions.shape[1]):
        for group in more_itertools.consecutive_groups(
            np.where(predictions[:, label])[0]
        ):
            grouptuple = tuple(group)
            assert (
                tuple(sorted(grouptuple)) == grouptuple
            ), f"{sorted(grouptuple)} != {grouptuple}"
            startidx, endidx = (grouptuple[0], grouptuple[-1])
            start = timestamps[startidx]
            end = timestamps[endidx]
            # Add the event if it is at least the minimum duration
            if end - start >= min_duration:
                events.append(
                    {"label": idx_to_label[label], "start": start, "end": end}
                )

    # This is just for pretty output, not really necessary
    events.sort(key=lambda k: k["start"])
    return events
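

# A toy illustration of the event construction (hypothetical numbers): with
# timestamps every 50 ms and label 0 above threshold on three consecutive
# frames, the grouping yields a single event spanning those frames:
#
#     prediction_dict = {
#         0.0: torch.tensor([0.9, 0.1]),
#         50.0: torch.tensor([0.8, 0.2]),
#         100.0: torch.tensor([0.7, 0.1]),
#         150.0: torch.tensor([0.1, 0.1]),
#     }
#     create_events_from_prediction(
#         prediction_dict, {0: "dog", 1: "siren"}, median_filter_ms=0
#     )
#     # -> [{"label": "dog", "start": 0.0, "end": 100.0}]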


def combine_target_events(split_names: List[str], task_path):
    """
    This combines the target events from the list of splits and
    returns the combined target events. This is useful when combining
    multiple folds of data to create the training or validation
    dataloader. For example, in k-fold, the training data-loader
    might be made from the first 4/5 folds, and calling this function
    with [fold00, fold01, fold02, fold03] will return the
    aggregated target events across all the folds.
    """
    combined_target_events: Dict = {}
    for split_name in split_names:
        target_events = json.load(
            task_path.joinpath(f"{split_name}.json").open()
        )
        common_keys = set(combined_target_events.keys()).intersection(
            target_events.keys()
        )
        assert len(common_keys) == 0, (
            "Target events from one split should not override "
            "target events from another. This is very unlikely as the "
            "target_event is keyed on the files which are distinct for "
            "each split"
        )
        combined_target_events.update(target_events)
    return combined_target_events
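

# A sketch of combining folds for a k-fold training split; the fold names and
# task directory below are hypothetical and depend on how the task data was
# generated on disk:
#
#     task_path = Path("tasks/my_sed_task")
#     train_target_events = combine_target_events(
#         ["fold00", "fold01", "fold02", "fold03"], task_path
#     )
#     # {slug -> [{"label": ..., "start": ..., "end": ...}, ...]} across all folds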