import gc
import json
import math
import os
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, Union

import torch
from datasets import Dataset
from torch import nn
from transformers import Seq2SeqTrainer, is_torch_tpu_available
from transformers.debug_utils import DebugOption
from transformers.deepspeed import is_deepspeed_zero3_enabled
from transformers.trainer_utils import speed_metrics
from transformers.utils import logging

if is_torch_tpu_available(check_device=False):
    import torch_xla.core.xla_model as xm
    import torch_xla.debug.metrics as met

from utils.decoding import decode

logger = logging.get_logger(__name__)


def _clean_memory():
    gc.collect()
    torch.cuda.empty_cache()


# This custom trainer is based on the trainer defined in
# https://github.com/huggingface/transformers/compare/main...eladsegal:public-transformers:scrolls
class CustomTrainer(Seq2SeqTrainer):
    def __init__(
        self, *args, untokenized_eval_dataset=None, data_args=None, output_dir: Optional[str] = None, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self._untokenized_eval_dataset = untokenized_eval_dataset
        self._max_length = data_args.val_max_target_length
        self._num_beams = data_args.num_beams
        self._output_dir = output_dir
        self._data_args = data_args
        self.mock_predictions_to_assign_zero_metric_score = self.tokenizer.encode(
            "TOO_MANY_INPUT_TOKENS", return_tensors="np"
        )[0]

    def prediction_step(
        self,
        model: nn.Module,
        inputs: Dict[str, Union[torch.Tensor, Any]],
        prediction_loss_only: bool,
        ignore_keys: Optional[List[str]] = None,
    ) -> Tuple[Optional[float], Optional[torch.Tensor], Optional[torch.Tensor]]:
| """ | |
| Perform an evaluation step on `model` using `inputs`. | |
| Subclass and override to inject custom behavior. | |
| Args: | |
| model (`nn.Module`): | |
| The model to evaluate. | |
| inputs (`Dict[str, Union[torch.Tensor, Any]]`): | |
| The inputs and targets of the model. | |
| The dictionary will be unpacked before being fed to the model. Most models expect the targets under the | |
| argument `labels`. Check your model's documentation for all accepted arguments. | |
| prediction_loss_only (`bool`): | |
| Whether or not to ret`urn the loss only. | |
| Return: | |
| Tuple[Optional[float], Optional[torch.Tensor], Optional[torch.Tensor]]: A tuple with the loss, logits and | |
| labels (each being optional). | |
| """ | |
| if not ("labels" in inputs or 'decoder_input_ids' in inputs): | |
| if model.training: | |
| logger.warning('When computing loss, must give labels or decoder_input_ids. ' | |
| 'If you only perform prediction, you can safely ignore this message') | |
| # This is an issue here because the input may be longer than the max-output length of the model, | |
| # and if nothing was given it will shift the input and use it to compute loss (and later discard it). | |
| # This may cause an indexing error when absolute embeddings are used (CUDA device side assert) | |
| inputs['decoder_input_ids'] = inputs['input_ids'][:,:2].clone() # dummy outputs | |

        if not self.args.predict_with_generate or prediction_loss_only:
            return super().prediction_step(
                model, inputs, prediction_loss_only=prediction_loss_only, ignore_keys=ignore_keys
            )

        has_labels = "labels" in inputs
        inputs = self._prepare_inputs(inputs)

        # XXX: adapt synced_gpus for fairscale as well
        gen_kwargs = self._gen_kwargs.copy()
        gen_kwargs["max_length"] = (
            gen_kwargs["max_length"] if gen_kwargs.get("max_length") is not None else self.model.config.max_length
        )
        gen_kwargs["num_beams"] = (
            gen_kwargs["num_beams"] if gen_kwargs.get("num_beams") is not None else self.model.config.num_beams
        )
        default_synced_gpus = True if is_deepspeed_zero3_enabled() else False
        gen_kwargs["synced_gpus"] = (
            gen_kwargs["synced_gpus"] if gen_kwargs.get("synced_gpus") is not None else default_synced_gpus
        )

        if "attention_mask" in inputs:
            gen_kwargs["attention_mask"] = inputs.get("attention_mask", None)
        if "global_attention_mask" in inputs:
            gen_kwargs["global_attention_mask"] = inputs.get("global_attention_mask", None)

        # --------------------- addition compared to the source file --------------------
        if "prefix_length" in inputs:
            gen_kwargs["prefix_length"] = inputs["prefix_length"]
        _clean_memory()
        # ------------------------------------------------------------------------------

        # prepare generation inputs
        # some encoder-decoder models can have varying encoders and thus varying model input names
        if hasattr(self.model, "encoder") and self.model.encoder.main_input_name != self.model.main_input_name:
            generation_inputs = inputs[self.model.encoder.main_input_name]
        else:
            generation_inputs = inputs[self.model.main_input_name]

        # Uri: make sure the cache is used even during mid-training evaluation, where it is disabled in general:
        gen_kwargs["use_cache"] = True

        generated_tokens = self.model.generate(
            generation_inputs,
            **gen_kwargs,
        )
        # --------------------- addition compared to the source file --------------------
        _clean_memory()
        # ------------------------------------------------------------------------------

        # in case the batch is shorter than max length, the output should be padded
        if generated_tokens.shape[-1] < gen_kwargs["max_length"]:
            generated_tokens = self._pad_tensors_to_max_len(generated_tokens, gen_kwargs["max_length"])

        # Changed the order of the if's here because there is no point going through the model
        # if there are no labels to compute the loss on.
        if has_labels:
            with torch.no_grad():
                with self.compute_loss_context_manager():
                    outputs = model(**inputs)
                if self.label_smoother is not None:
                    loss = self.label_smoother(outputs, inputs["labels"]).mean().detach()
                else:
                    loss = (outputs["loss"] if isinstance(outputs, dict) else outputs[0]).mean().detach()
        else:
            loss = None

        if self.args.prediction_loss_only:
            return (loss, None, None)

        if has_labels:
            labels = inputs["labels"]
            if labels.shape[-1] < gen_kwargs["max_length"]:
                labels = self._pad_tensors_to_max_len(labels, gen_kwargs["max_length"])
        else:
            labels = None

        return (loss, generated_tokens, labels)

    def _restart_generator(self):
        if getattr(self, "_is_restart_generator", False):
            self._is_restart_generator = False
            return True
        return False

    def set_restart_generator(self):
        self._is_restart_generator = True

    def _get_train_sampler(self) -> Optional[torch.utils.data.Sampler]:
        sampler = super()._get_train_sampler()
        try:
            if self._restart_generator():
                sampler.generator.manual_seed(self._initial_seed)
            else:
                self._initial_seed = sampler.generator.initial_seed()
        except Exception as e:
            logger.warning(f"Cannot save or set the seed of the generator: {e}")
        return sampler

    def _post_process_function(self, untokenized_eval_dataset, predictions):
        id_to_prediction = {}
        id_to_label_ids = defaultdict(list)
        assert len(untokenized_eval_dataset) == len(self.eval_dataset)

        for i, (instance, not_valid_for_eval) in enumerate(
            zip(untokenized_eval_dataset, self.eval_dataset["not_valid_for_eval"])
        ):
            if not_valid_for_eval:
                id_to_prediction[instance["id"]] = self.mock_predictions_to_assign_zero_metric_score
            else:
                id_to_prediction[instance["id"]] = predictions[i]

            if "outputs" in instance:
                id_to_label_ids[instance["id"]] = instance["outputs"]
            else:
                id_to_label_ids[instance["id"]].append(instance["output"])

        return id_to_prediction, id_to_label_ids

    def evaluate(
        self,
        eval_dataset: Optional[Dataset] = None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "eval",
        untokenized_eval_dataset: Optional[Dataset] = None,
        **gen_kwargs,
    ) -> Dict[str, float]:
        """
        Run evaluation and return metrics.

        The calling script will be responsible for providing a method to compute metrics, as they are task-dependent
        (pass it to the init `compute_metrics` argument).

        You can also subclass and override this method to inject custom behavior.

        Args:
            eval_dataset (`Dataset`, *optional*):
                Pass a dataset if you wish to override `self.eval_dataset`. If it is a [`~datasets.Dataset`], columns
                not accepted by the `model.forward()` method are automatically removed. It must implement the
                `__len__` method.
            ignore_keys (`List[str]`, *optional*):
                A list of keys in the output of your model (if it is a dictionary) that should be ignored when
                gathering predictions.
            metric_key_prefix (`str`, *optional*, defaults to `"eval"`):
                An optional prefix to be used as the metrics key prefix. For example, the metric "bleu" will be named
                "eval_bleu" if the prefix is `"eval"` (default).
            max_length (`int`, *optional*):
                The maximum target length to use when predicting with the generate method.
            num_beams (`int`, *optional*):
                Number of beams for beam search that will be used when predicting with the generate method. 1 means
                no beam search.
            gen_kwargs:
                Additional `generate`-specific kwargs.

        Returns:
            A dictionary containing the evaluation loss and the potential metrics computed from the predictions. The
            dictionary also contains the epoch number which comes from the training state.
        """
        gen_kwargs = gen_kwargs.copy()
        gen_kwargs["max_length"] = (
            gen_kwargs["max_length"] if gen_kwargs.get("max_length") is not None else self.args.generation_max_length
        )
        gen_kwargs["num_beams"] = (
            gen_kwargs["num_beams"] if gen_kwargs.get("num_beams") is not None else self.args.generation_num_beams
        )
        self._gen_kwargs = gen_kwargs

        self._memory_tracker.start()

        eval_dataloader = self.get_eval_dataloader(eval_dataset)
        # ----------------------------------- Added -----------------------------------
        untokenized_eval_dataset = (
            self._untokenized_eval_dataset if untokenized_eval_dataset is None else untokenized_eval_dataset
        )
        compute_metrics = self.compute_metrics
        self.compute_metrics = None
        # ------------------------------------------------------------------------------
        start_time = time.time()

        eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
        try:
            output = eval_loop(
                eval_dataloader,
                description="Evaluation",
                # No point gathering the predictions if there are no metrics, otherwise we defer to
                # self.args.prediction_loss_only
                prediction_loss_only=None,  # MODIFIED since we need the predictions
                ignore_keys=ignore_keys,
                metric_key_prefix=metric_key_prefix,
            )
        finally:
            # ----------------------------------- Added -----------------------------------
            # revert compute_metrics back
            self.compute_metrics = compute_metrics
            # ------------------------------------------------------------------------------

        # ----------------------------------- Added -----------------------------------
        # compute our metrics
        if output.predictions is not None:
            eval_preds = self._post_process_function(untokenized_eval_dataset, output.predictions)

            if self._output_dir is not None and self.is_world_process_zero():
                predictions = decode(eval_preds[0], self.tokenizer, self._data_args)
                output_prediction_file = os.path.join(
                    self._output_dir, f"generated_predictions_eval_{self.state.global_step}.json"
                )
                with open(output_prediction_file, "w") as writer:
                    json.dump(predictions, writer, indent=4)

                output_labels_file = os.path.join(self._output_dir, "eval_labels.json")
                if not os.path.isfile(output_labels_file):
                    with open(output_labels_file, "w") as writer:
                        json.dump(eval_preds[1], writer, indent=4)

            if self.compute_metrics is not None:
                output.metrics.update(self.compute_metrics(*eval_preds))

                # Prefix all keys with metric_key_prefix + '_'
                for key in list(output.metrics.keys()):
                    if not key.startswith(f"{metric_key_prefix}_"):
                        output.metrics[f"{metric_key_prefix}_{key}"] = output.metrics.pop(key)
        # ------------------------------------------------------------------------------

        total_batch_size = self.args.eval_batch_size * self.args.world_size
        output.metrics.update(
            speed_metrics(
                metric_key_prefix,
                start_time,
                num_samples=output.num_samples,
                num_steps=math.ceil(output.num_samples / total_batch_size),
            )
        )

        self.log(output.metrics)

        if DebugOption.TPU_METRICS_DEBUG in self.args.debug:
            # tpu-comment: Logging debug metrics for PyTorch/XLA (compile, execute times, ops, etc.)
            xm.master_print(met.metrics_report())

        self.control = self.callback_handler.on_evaluate(self.args, self.state, self.control, output.metrics)

        self._memory_tracker.stop_and_update_metrics(output.metrics)

        return output.metrics
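

# ------------------------------------------------------------------------------
# Hypothetical usage sketch (an assumption, not part of the original file): how a
# fine-tuning script might wire up CustomTrainer. All names below (model,
# tokenizer, training_args, data_args, the datasets, compute_metrics) are assumed
# to be built by the caller; only the keyword arguments specific to this subclass
# are highlighted.
#
# trainer = CustomTrainer(
#     model=model,
#     args=training_args,                                 # Seq2SeqTrainingArguments, with predict_with_generate=True
#     train_dataset=train_dataset,
#     eval_dataset=eval_dataset,                          # tokenized; expected to carry a "not_valid_for_eval" column
#     untokenized_eval_dataset=untokenized_eval_dataset,  # raw instances with "id" and "output"/"outputs" fields
#     data_args=data_args,                                # must provide val_max_target_length and num_beams
#     output_dir=training_args.output_dir,                # where generated_predictions_eval_<step>.json files go
#     tokenizer=tokenizer,
#     compute_metrics=compute_metrics,                    # called with (id_to_prediction, id_to_label_ids)
# )
# trainer.train()
# metrics = trainer.evaluate()
# ------------------------------------------------------------------------------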