import numpy as np
from tqdm import tqdm


def jitter(x, sigma=0.03):
    # https://arxiv.org/pdf/1706.00527.pdf
    return x + np.random.normal(loc=0., scale=sigma, size=x.shape)
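
# Usage sketch (illustrative, not part of the original module): jitter adds
# i.i.d. Gaussian noise to every value. The shapes below are hypothetical.
#   x = np.random.randn(4, 100, 3)    # (batch, time_steps, channels)
#   x_noisy = jitter(x, sigma=0.03)   # same shape, x + N(0, 0.03**2) noise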


def scaling(x, sigma=0.1):
    # https://arxiv.org/pdf/1706.00527.pdf
    factor = np.random.normal(loc=1., scale=sigma, size=(x.shape[0], x.shape[2]))
    return np.multiply(x, factor[:, np.newaxis, :])
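
# Usage sketch (illustrative): scaling draws one factor per (sample, channel)
# and applies it across all time steps, so each channel is rescaled as a whole.
#   x_scaled = scaling(np.random.randn(4, 100, 3), sigma=0.1)  # factors ~ N(1, 0.1**2)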


def rotation(x):
    x = np.array(x)
    flip = np.random.choice([-1, 1], size=(x.shape[0], x.shape[2]))
    rotate_axis = np.arange(x.shape[2])
    np.random.shuffle(rotate_axis)
    return flip[:, np.newaxis, :] * x[:, :, rotate_axis]
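
# Usage sketch (illustrative): rotation flips the sign of random channels
# (per sample) and shuffles channel order; note the channel permutation is
# drawn once and shared by the whole batch.
#   x_rot = rotation(np.random.randn(4, 100, 3))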


def permutation(x, max_segments=5, seg_mode="equal"):
    orig_steps = np.arange(x.shape[1])
    num_segs = np.random.randint(1, max_segments, size=(x.shape[0]))
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        if num_segs[i] > 1:
            if seg_mode == "random":
                split_points = np.random.choice(x.shape[1] - 2, num_segs[i] - 1, replace=False)
                split_points.sort()
                splits = np.split(orig_steps, split_points)
            else:
                splits = np.array_split(orig_steps, num_segs[i])
            # Shuffle the order of the segments (each segment keeps its own
            # internal ordering), then flatten back into one index array.
            # Shuffling indices avoids calling np.random.permutation on a
            # ragged list of arrays, which newer NumPy versions reject.
            order = np.random.permutation(len(splits))
            warp = np.concatenate([splits[j] for j in order])
            ret[i] = pat[warp]
        else:
            ret[i] = pat
    return ret
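
# Usage sketch (illustrative): the series is cut into a random number of
# segments (1 to max_segments - 1, since randint excludes the high end) whose
# order is shuffled, breaking long-range order while keeping local structure.
#   x_perm = permutation(np.random.randn(4, 100, 3), max_segments=5)
#   x_perm_rand = permutation(np.random.randn(4, 100, 3), seg_mode="random")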


def magnitude_warp(x, sigma=0.2, knot=4):
    from scipy.interpolate import CubicSpline
    orig_steps = np.arange(x.shape[1])
    random_warps = np.random.normal(loc=1.0, scale=sigma, size=(x.shape[0], knot + 2, x.shape[2]))
    warp_steps = (np.ones((x.shape[2], 1)) * (np.linspace(0, x.shape[1] - 1., num=knot + 2))).T
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        warper = np.array(
            [CubicSpline(warp_steps[:, dim], random_warps[i, :, dim])(orig_steps) for dim in range(x.shape[2])]).T
        ret[i] = pat * warper
    return ret
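
# Usage sketch (illustrative): a smooth random curve (cubic spline through
# knot + 2 points centered on 1.0) multiplies each channel, i.e. a
# time-varying version of scaling().
#   x_mw = magnitude_warp(np.random.randn(4, 100, 3), sigma=0.2, knot=4)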


def time_warp(x, sigma=0.2, knot=4):
    from scipy.interpolate import CubicSpline
    orig_steps = np.arange(x.shape[1])
    random_warps = np.random.normal(loc=1.0, scale=sigma, size=(x.shape[0], knot + 2, x.shape[2]))
    warp_steps = (np.ones((x.shape[2], 1)) * (np.linspace(0, x.shape[1] - 1., num=knot + 2))).T
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        for dim in range(x.shape[2]):
            # Renamed from `time_warp` so the local warp curve does not shadow
            # the function itself.
            warped_time = CubicSpline(warp_steps[:, dim], warp_steps[:, dim] * random_warps[i, :, dim])(orig_steps)
            scale = (x.shape[1] - 1) / warped_time[-1]
            ret[i, :, dim] = np.interp(orig_steps, np.clip(scale * warped_time, 0, x.shape[1] - 1), pat[:, dim]).T
    return ret
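
# Usage sketch (illustrative): here the random spline distorts the time axis
# itself, locally speeding up or slowing down the series before resampling
# back to the original length.
#   x_tw = time_warp(np.random.randn(4, 100, 3), sigma=0.2, knot=4)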


def window_slice(x, reduce_ratio=0.9):
    # https://halshs.archives-ouvertes.fr/halshs-01357973/document
    target_len = np.ceil(reduce_ratio * x.shape[1]).astype(int)
    if target_len >= x.shape[1]:
        return x
    starts = np.random.randint(low=0, high=x.shape[1] - target_len, size=(x.shape[0])).astype(int)
    ends = (target_len + starts).astype(int)
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        for dim in range(x.shape[2]):
            # The interpolation grid ends at target_len - 1 (the last valid
            # index of the crop) so it stays inside the sampled window.
            ret[i, :, dim] = np.interp(np.linspace(0, target_len - 1, num=x.shape[1]), np.arange(target_len),
                                       pat[starts[i]:ends[i], dim]).T
    return ret
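
# Usage sketch (illustrative): a random crop of reduce_ratio * length is
# stretched back to the full length, analogous to crop-and-resize for images.
#   x_ws = window_slice(np.random.randn(4, 100, 3), reduce_ratio=0.9)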


def window_warp(x, window_ratio=0.1, scales=[0.5, 2.]):
    # https://halshs.archives-ouvertes.fr/halshs-01357973/document
    warp_scales = np.random.choice(scales, x.shape[0])
    warp_size = np.ceil(window_ratio * x.shape[1]).astype(int)
    window_steps = np.arange(warp_size)
    window_starts = np.random.randint(low=1, high=x.shape[1] - warp_size - 1, size=(x.shape[0])).astype(int)
    window_ends = (window_starts + warp_size).astype(int)
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        for dim in range(x.shape[2]):
            start_seg = pat[:window_starts[i], dim]
            window_seg = np.interp(np.linspace(0, warp_size - 1, num=int(warp_size * warp_scales[i])), window_steps,
                                   pat[window_starts[i]:window_ends[i], dim])
            end_seg = pat[window_ends[i]:, dim]
            warped = np.concatenate((start_seg, window_seg, end_seg))
            ret[i, :, dim] = np.interp(np.arange(x.shape[1]), np.linspace(0, x.shape[1] - 1., num=warped.size),
                                       warped).T
    return ret
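
# Usage sketch (illustrative): one random window (10% of the series by
# default) is sped up or slowed down by a factor drawn from `scales`, then the
# whole series is resampled back to the original length.
#   x_ww = window_warp(np.random.randn(4, 100, 3), window_ratio=0.1, scales=[0.5, 2.])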


def spawner(x, labels, sigma=0.05, verbose=0):
    # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6983028/
    # use verbose=-1 to turn off warnings
    # use verbose=1 to print out figures
    import utils.dtw as dtw
    random_points = np.random.randint(low=1, high=x.shape[1] - 1, size=x.shape[0])
    window = np.ceil(x.shape[1] / 10.).astype(int)
    orig_steps = np.arange(x.shape[1])
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    ret = np.zeros_like(x)
    # for i, pat in enumerate(tqdm(x)):  # optional progress bar
    for i, pat in enumerate(x):
        # guarantees that the same pattern isn't selected
        choices = np.delete(np.arange(x.shape[0]), i)
        # keep only patterns of the same class; index back into `choices` so
        # the result refers to positions in x, not in the reduced array
        choices = choices[l[choices] == l[i]]
        if choices.size > 0:
            random_sample = x[np.random.choice(choices)]
            # SPAWNER splits the path into two randomly
            path1 = dtw.dtw(pat[:random_points[i]], random_sample[:random_points[i]], dtw.RETURN_PATH,
                            slope_constraint="symmetric", window=window)
            path2 = dtw.dtw(pat[random_points[i]:], random_sample[random_points[i]:], dtw.RETURN_PATH,
                            slope_constraint="symmetric", window=window)
            combined = np.concatenate((np.vstack(path1), np.vstack(path2 + random_points[i])), axis=1)
            if verbose:
                dtw_value, cost, DTW_map, path = dtw.dtw(pat, random_sample, return_flag=dtw.RETURN_ALL,
                                                         slope_constraint="symmetric", window=window)
                dtw.draw_graph1d(cost, DTW_map, path, pat, random_sample)
                dtw.draw_graph1d(cost, DTW_map, combined, pat, random_sample)
            mean = np.mean([pat[combined[0]], random_sample[combined[1]]], axis=0)
            for dim in range(x.shape[2]):
                ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=mean.shape[0]),
                                           mean[:, dim]).T
        else:
            if verbose > -1:
                print("There is only one pattern of class {}, skipping pattern average".format(l[i]))
            ret[i, :] = pat
    return jitter(ret, sigma=sigma)
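
# Usage sketch (illustrative; requires the repo's utils.dtw module): SPAWNER
# averages each pattern with a random same-class pattern along a DTW
# alignment, then adds jitter. Labels may be integer or one-hot; the values
# below are hypothetical.
#   y = np.random.randint(0, 2, size=8)
#   x_sp = spawner(np.random.randn(8, 100, 3), y, sigma=0.05)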


def wdba(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True, verbose=0):
    # https://ieeexplore.ieee.org/document/8215569
    # use verbose=-1 to turn off warnings
    # slope_constraint is for DTW. "symmetric" or "asymmetric"
    x = np.array(x)
    import utils.dtw as dtw
    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    ret = np.zeros_like(x)
    # for i in tqdm(range(ret.shape[0])):  # optional progress bar
    for i in range(ret.shape[0]):
        # get all patterns of the same class as i (including i itself)
        choices = np.where(l == l[i])[0]
        if choices.size > 0:
            # pick random intra-class patterns
            k = min(choices.size, batch_size)
            random_prototypes = x[np.random.choice(choices, k, replace=False)]
            # calculate DTW between all pairs
            dtw_matrix = np.zeros((k, k))
            for p, prototype in enumerate(random_prototypes):
                for s, sample in enumerate(random_prototypes):
                    if p == s:
                        dtw_matrix[p, s] = 0.
                    else:
                        dtw_matrix[p, s] = dtw.dtw(prototype, sample, dtw.RETURN_VALUE,
                                                   slope_constraint=slope_constraint, window=window)
            # the medoid minimizes the summed DTW distance to the others
            medoid_id = np.argmin(np.sum(dtw_matrix, axis=1))
            nearest_order = np.argsort(dtw_matrix[medoid_id])
            medoid_pattern = random_prototypes[medoid_id]
            # start weighted DBA
            average_pattern = np.zeros_like(medoid_pattern)
            weighted_sums = np.zeros((medoid_pattern.shape[0]))
            for nid in nearest_order:
                if nid == medoid_id or dtw_matrix[medoid_id, nearest_order[1]] == 0.:
                    average_pattern += medoid_pattern
                    weighted_sums += np.ones_like(weighted_sums)
                else:
                    path = dtw.dtw(medoid_pattern, random_prototypes[nid], dtw.RETURN_PATH,
                                   slope_constraint=slope_constraint, window=window)
                    dtw_value = dtw_matrix[medoid_id, nid]
                    warped = random_prototypes[nid, path[1]]
                    # exponential weight: 0.5 at the distance of the nearest neighbor
                    weight = np.exp(np.log(0.5) * dtw_value / dtw_matrix[medoid_id, nearest_order[1]])
                    average_pattern[path[0]] += weight * warped
                    weighted_sums[path[0]] += weight
            ret[i, :] = average_pattern / weighted_sums[:, np.newaxis]
        else:
            if verbose > -1:
                print("There is only one pattern of class {}, skipping pattern average".format(l[i]))
            ret[i, :] = x[i]
    return ret
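
# Usage sketch (illustrative; requires utils.dtw): weighted DBA averages up to
# batch_size same-class patterns around their DTW medoid, with weights
# decaying exponentially in DTW distance to the medoid.
#   x_dba = wdba(np.random.randn(12, 100, 3), np.random.randint(0, 2, size=12), batch_size=6)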


# Proposed
def random_guided_warp(x, labels, slope_constraint="symmetric", use_window=True, dtw_type="normal", verbose=0):
    # use verbose=-1 to turn off warnings
    # slope_constraint is for DTW. "symmetric" or "asymmetric"
    # dtw_type is for shapeDTW or DTW. "normal" or "shape"
    import utils.dtw as dtw
    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    ret = np.zeros_like(x)
    # for i, pat in enumerate(tqdm(x)):  # optional progress bar
    for i, pat in enumerate(x):
        # guarantees that the same pattern isn't selected
        choices = np.delete(np.arange(x.shape[0]), i)
        # keep only patterns of the same class; index back into `choices` so
        # the result refers to positions in x
        choices = choices[l[choices] == l[i]]
        if choices.size > 0:
            # pick a random intra-class pattern
            random_prototype = x[np.random.choice(choices)]
            if dtw_type == "shape":
                path = dtw.shape_dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint,
                                     window=window)
            else:
                path = dtw.dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window)
            # Time warp
            warped = pat[path[1]]
            for dim in range(x.shape[2]):
                ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]),
                                           warped[:, dim]).T
        else:
            if verbose > -1:
                print("There is only one pattern of class {}, skipping timewarping".format(l[i]))
            ret[i, :] = pat
    return ret
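
# Usage sketch (illustrative; requires utils.dtw): the pattern is time-warped
# along its DTW alignment with a random same-class prototype, so the warping
# is guided by real intra-class variation rather than a random spline.
#   x_rgw = random_guided_warp(np.random.randn(8, 100, 3), np.random.randint(0, 2, size=8))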


def random_guided_warp_shape(x, labels, slope_constraint="symmetric", use_window=True):
    return random_guided_warp(x, labels, slope_constraint, use_window, dtw_type="shape")


def discriminative_guided_warp(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True,
                               dtw_type="normal", use_variable_slice=True, verbose=0):
    # use verbose=-1 to turn off warnings
    # slope_constraint is for DTW. "symmetric" or "asymmetric"
    # dtw_type is for shapeDTW or DTW. "normal" or "shape"
    import utils.dtw as dtw
    if use_window:
        window = np.ceil(x.shape[1] / 10.).astype(int)
    else:
        window = None
    orig_steps = np.arange(x.shape[1])
    l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels
    positive_batch = np.ceil(batch_size / 2).astype(int)
    negative_batch = np.floor(batch_size / 2).astype(int)
    ret = np.zeros_like(x)
    warp_amount = np.zeros(x.shape[0])
    # for i, pat in enumerate(tqdm(x)):  # optional progress bar
    for i, pat in enumerate(x):
        # guarantees that the same pattern isn't selected
        choices = np.delete(np.arange(x.shape[0]), i)
        # split the rest of the batch by class; index back into `choices` so
        # both arrays refer to positions in x
        positive = choices[l[choices] == l[i]]
        negative = choices[l[choices] != l[i]]
        if positive.size > 0 and negative.size > 0:
            pos_k = min(positive.size, positive_batch)
            neg_k = min(negative.size, negative_batch)
            positive_prototypes = x[np.random.choice(positive, pos_k, replace=False)]
            negative_prototypes = x[np.random.choice(negative, neg_k, replace=False)]
            # vector embedding and nearest prototype in one
            pos_aves = np.zeros((pos_k))
            neg_aves = np.zeros((pos_k))
            if dtw_type == "shape":
                for p, pos_prot in enumerate(positive_prototypes):
                    for ps, pos_samp in enumerate(positive_prototypes):
                        if p != ps:
                            pos_aves[p] += (1. / (pos_k - 1.)) * dtw.shape_dtw(pos_prot, pos_samp, dtw.RETURN_VALUE,
                                                                               slope_constraint=slope_constraint,
                                                                               window=window)
                    for ns, neg_samp in enumerate(negative_prototypes):
                        neg_aves[p] += (1. / neg_k) * dtw.shape_dtw(pos_prot, neg_samp, dtw.RETURN_VALUE,
                                                                    slope_constraint=slope_constraint, window=window)
                selected_id = np.argmax(neg_aves - pos_aves)
                path = dtw.shape_dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH,
                                     slope_constraint=slope_constraint, window=window)
            else:
                for p, pos_prot in enumerate(positive_prototypes):
                    for ps, pos_samp in enumerate(positive_prototypes):
                        if p != ps:
                            pos_aves[p] += (1. / (pos_k - 1.)) * dtw.dtw(pos_prot, pos_samp, dtw.RETURN_VALUE,
                                                                         slope_constraint=slope_constraint,
                                                                         window=window)
                    for ns, neg_samp in enumerate(negative_prototypes):
                        neg_aves[p] += (1. / neg_k) * dtw.dtw(pos_prot, neg_samp, dtw.RETURN_VALUE,
                                                              slope_constraint=slope_constraint, window=window)
                selected_id = np.argmax(neg_aves - pos_aves)
                path = dtw.dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH,
                               slope_constraint=slope_constraint, window=window)
            # Time warp
            warped = pat[path[1]]
            warp_path_interp = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]), path[1])
            warp_amount[i] = np.sum(np.abs(orig_steps - warp_path_interp))
            for dim in range(x.shape[2]):
                ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]),
                                           warped[:, dim]).T
        else:
            if verbose > -1:
                print("There is only one pattern of class {}".format(l[i]))
            ret[i, :] = pat
            warp_amount[i] = 0.
    if use_variable_slice:
        max_warp = np.max(warp_amount)
        if max_warp == 0:
            # unchanged
            ret = window_slice(ret, reduce_ratio=0.9)
        else:
            for i, pat in enumerate(ret):
                # Variable Slicing: patterns that were warped less get sliced more
                ret[i] = window_slice(pat[np.newaxis, :, :], reduce_ratio=0.9 + 0.1 * warp_amount[i] / max_warp)[0]
    return ret
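
# Usage sketch (illustrative; requires utils.dtw): the guide prototype is the
# positive example that best separates the classes (largest mean DTW gap to
# the negatives), and patterns that were warped less are sliced more.
#   x_dgw = discriminative_guided_warp(np.random.randn(8, 100, 3), np.random.randint(0, 2, size=8))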


def discriminative_guided_warp_shape(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True):
    return discriminative_guided_warp(x, labels, batch_size, slope_constraint, use_window, dtw_type="shape")


def run_augmentation(x, y, args):
    print("Augmenting %s" % args.data)
    np.random.seed(args.seed)
    x_aug = x
    y_aug = y
    if args.augmentation_ratio > 0:
        augmentation_tags = "%d" % args.augmentation_ratio
        for n in range(args.augmentation_ratio):
            x_temp, augmentation_tags = augment(x, y, args)
            x_aug = np.append(x_aug, x_temp, axis=0)
            y_aug = np.append(y_aug, y, axis=0)
            print("Round %d: %s done" % (n, augmentation_tags))
        if args.extra_tag:
            augmentation_tags += "_" + args.extra_tag
    else:
        augmentation_tags = args.extra_tag
    return x_aug, y_aug, augmentation_tags


def run_augmentation_single(x, y, args):
    # print("Augmenting %s" % args.data)
    np.random.seed(args.seed)
    x_aug = x
    y_aug = y
    if len(x.shape) < 3:
        # Augmenting the entire series: treat the input data as one big batch.
        # Before: (sequence_length, num_channels)
        # After: (1, sequence_length, num_channels)
        # Note: 'sequence_length' here is the length of the entire series.
        x_input = x[np.newaxis, :]
    elif len(x.shape) == 3:
        # Augmenting a batch of series: keep the current dimensions
        # (batch_size, sequence_length, num_channels)
        x_input = x
    else:
        raise ValueError("Input must be (batch_size, sequence_length, num_channels) dimensional")
    if args.augmentation_ratio > 0:
        augmentation_tags = "%d" % args.augmentation_ratio
        for n in range(args.augmentation_ratio):
            x_aug, augmentation_tags = augment(x_input, y, args)
            # print("Round %d: %s done" % (n, augmentation_tags))
        if args.extra_tag:
            augmentation_tags += "_" + args.extra_tag
    else:
        augmentation_tags = args.extra_tag
    if len(x.shape) < 3:
        # Squeeze back to two dimensions in the whole-series scenario
        x_aug = x_aug.squeeze(0)
    return x_aug, y_aug, augmentation_tags


def augment(x, y, args):
    import utils.augmentation as aug
    augmentation_tags = ""
    if args.jitter:
        x = aug.jitter(x)
        augmentation_tags += "_jitter"
    if args.scaling:
        x = aug.scaling(x)
        augmentation_tags += "_scaling"
    if args.rotation:
        x = aug.rotation(x)
        augmentation_tags += "_rotation"
    if args.permutation:
        x = aug.permutation(x)
        augmentation_tags += "_permutation"
    if args.randompermutation:
        x = aug.permutation(x, seg_mode="random")
        augmentation_tags += "_randomperm"
    if args.magwarp:
        x = aug.magnitude_warp(x)
        augmentation_tags += "_magwarp"
    if args.timewarp:
        x = aug.time_warp(x)
        augmentation_tags += "_timewarp"
    if args.windowslice:
        x = aug.window_slice(x)
        augmentation_tags += "_windowslice"
    if args.windowwarp:
        x = aug.window_warp(x)
        augmentation_tags += "_windowwarp"
    if args.spawner:
        x = aug.spawner(x, y)
        augmentation_tags += "_spawner"
    if args.dtwwarp:
        x = aug.random_guided_warp(x, y)
        augmentation_tags += "_rgw"
    if args.shapedtwwarp:
        x = aug.random_guided_warp_shape(x, y)
        augmentation_tags += "_rgws"
    if args.wdba:
        x = aug.wdba(x, y)
        augmentation_tags += "_wdba"
    if args.discdtw:
        x = aug.discriminative_guided_warp(x, y)
        augmentation_tags += "_dgw"
    if args.discsdtw:
        x = aug.discriminative_guided_warp_shape(x, y)
        augmentation_tags += "_dgws"
    return x, augmentation_tags
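
# End-to-end sketch (illustrative): `args` is normally an argparse.Namespace
# built by the training script; the flag names below mirror the attributes
# read in augment(), and the values are hypothetical.
#   from types import SimpleNamespace
#   flags = dict(jitter=True, scaling=True, rotation=False, permutation=False,
#                randompermutation=False, magwarp=False, timewarp=False,
#                windowslice=False, windowwarp=False, spawner=False,
#                dtwwarp=False, shapedtwwarp=False, wdba=False,
#                discdtw=False, discsdtw=False)
#   args = SimpleNamespace(seed=0, augmentation_ratio=1, extra_tag="", **flags)
#   x = np.random.randn(16, 96, 3)        # (batch, time_steps, channels)
#   y = np.random.randint(0, 2, size=16)
#   x_aug, y_aug, tags = run_augmentation_single(x, y, args)  # tags like "1_jitter_scaling"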