Spaces:
Sleeping
Sleeping
| import collections | |
| import os | |
| import sys | |
| import numpy as np | |
| import torch | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| from torch.utils.data._utils.collate import np_str_obj_array_pattern, default_collate_err_msg_format | |
| plt.switch_backend('agg') | |
| def adjust_learning_rate(optimizer, epoch, args): | |
| # lr = args.learning_rate * (0.2 ** (epoch // 2)) | |
| if args.lradj == 'type1': | |
| lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 1))} | |
| elif args.lradj == 'type2': | |
| lr_adjust = { | |
| 2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6, | |
| 10: 5e-7, 15: 1e-7, 20: 5e-8 | |
| } | |
| if epoch in lr_adjust.keys(): | |
| lr = lr_adjust[epoch] | |
| for param_group in optimizer.param_groups: | |
| param_group['lr'] = lr | |
| print('Updating learning rate to {}'.format(lr)) | |
| class EarlyStopping: | |
| def __init__(self, patience=7, verbose=False, delta=0): | |
| self.patience = patience | |
| self.verbose = verbose | |
| self.counter = 0 | |
| self.best_score = None | |
| self.early_stop = False | |
| self.val_loss_min = np.Inf | |
| self.delta = delta | |
| def __call__(self, val_loss, model, path): | |
| score = -val_loss | |
| if self.best_score is None: | |
| self.best_score = score | |
| self.save_checkpoint(val_loss, model, path) | |
| elif score < self.best_score + self.delta: | |
| self.counter += 1 | |
| print(f'EarlyStopping counter: {self.counter} out of {self.patience}') | |
| if self.counter >= self.patience: | |
| self.early_stop = True | |
| else: | |
| self.best_score = score | |
| self.save_checkpoint(val_loss, model, path) | |
| self.counter = 0 | |
| def save_checkpoint(self, val_loss, model, path): | |
| if self.verbose: | |
| print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...') | |
| torch.save(model.state_dict(), path + '/' + 'checkpoint.pth') | |
| self.val_loss_min = val_loss | |
| class dotdict(dict): | |
| """dot.notation access to dictionary attributes""" | |
| __getattr__ = dict.get | |
| __setattr__ = dict.__setitem__ | |
| __delattr__ = dict.__delitem__ | |
| class StandardScaler(): | |
| def __init__(self, mean, std): | |
| self.mean = mean | |
| self.std = std | |
| def transform(self, data): | |
| return (data - self.mean) / self.std | |
| def inverse_transform(self, data): | |
| return (data * self.std) + self.mean | |
| def visual(true, preds=None, name='./pic/test.pdf'): | |
| """ | |
| Results visualization | |
| """ | |
| plt.figure() | |
| plt.plot(true, label='GroundTruth', linewidth=2) | |
| if preds is not None: | |
| plt.plot(preds, label='Prediction', linewidth=2) | |
| plt.legend() | |
| plt.savefig(name, bbox_inches='tight') | |
| def adjustment(gt, pred): | |
| anomaly_state = False | |
| for i in range(len(gt)): | |
| if gt[i] == 1 and pred[i] == 1 and not anomaly_state: | |
| anomaly_state = True | |
| for j in range(i, 0, -1): | |
| if gt[j] == 0: | |
| break | |
| else: | |
| if pred[j] == 0: | |
| pred[j] = 1 | |
| for j in range(i, len(gt)): | |
| if gt[j] == 0: | |
| break | |
| else: | |
| if pred[j] == 0: | |
| pred[j] = 1 | |
| elif gt[i] == 0: | |
| anomaly_state = False | |
| if anomaly_state: | |
| pred[i] = 1 | |
| return gt, pred | |
| def cal_accuracy(y_pred, y_true): | |
| return np.mean(y_pred == y_true) | |
| def custom_collate(batch): | |
| r"""source: pytorch 1.9.0, only one modification to original code """ | |
| elem = batch[0] | |
| elem_type = type(elem) | |
| if isinstance(elem, torch.Tensor): | |
| out = None | |
| if torch.utils.data.get_worker_info() is not None: | |
| # If we're in a background process, concatenate directly into a | |
| # shared memory tensor to avoid an extra copy | |
| numel = sum([x.numel() for x in batch]) | |
| storage = elem.storage()._new_shared(numel) | |
| out = elem.new(storage) | |
| return torch.stack(batch, 0, out=out) | |
| elif elem_type.__module__ == 'numpy' and elem_type.__name__ != 'str_' \ | |
| and elem_type.__name__ != 'string_': | |
| if elem_type.__name__ == 'ndarray' or elem_type.__name__ == 'memmap': | |
| # array of string classes and object | |
| if np_str_obj_array_pattern.search(elem.dtype.str) is not None: | |
| raise TypeError(default_collate_err_msg_format.format(elem.dtype)) | |
| return custom_collate([torch.as_tensor(b) for b in batch]) | |
| elif elem.shape == (): # scalars | |
| return torch.as_tensor(batch) | |
| elif isinstance(elem, float): | |
| return torch.tensor(batch, dtype=torch.float64) | |
| elif isinstance(elem, int): | |
| return torch.tensor(batch) | |
| elif isinstance(elem, str): | |
| return batch | |
| elif isinstance(elem, collections.abc.Mapping): | |
| return {key: custom_collate([d[key] for d in batch]) for key in elem} | |
| elif isinstance(elem, tuple) and hasattr(elem, '_fields'): # namedtuple | |
| return elem_type(*(custom_collate(samples) for samples in zip(*batch))) | |
| elif isinstance(elem, collections.abc.Sequence): | |
| # check to make sure that the elements in batch have consistent size | |
| it = iter(batch) | |
| elem_size = len(next(it)) | |
| if not all(len(elem) == elem_size for elem in it): | |
| raise RuntimeError('each element in list of batch should be of equal size') | |
| transposed = zip(*batch) | |
| return [custom_collate(samples) for samples in transposed] | |
| raise TypeError(default_collate_err_msg_format.format(elem_type)) | |
| class HiddenPrints: | |
| def __init__(self, rank): | |
| # 如果rank是none,那么就是单机单卡,不需要隐藏打印,将rank设置为0 | |
| if rank is None: | |
| rank = 0 | |
| self.rank = rank | |
| def __enter__(self): | |
| if self.rank == 0: | |
| return | |
| self._original_stdout = sys.stdout | |
| sys.stdout = open(os.devnull, 'w') | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| if self.rank == 0: | |
| return | |
| sys.stdout.close() | |
| sys.stdout = self._original_stdout | |