Spaces:
Running
Running
| """ | |
| Audio effects for data augmentation. | |
| Several audio effects can be combined into an augmentation chain. | |
| Important note: We assume that the parallelization during training is done using | |
| multi-processing and not multi-threading. Hence, we do not need the | |
| `@sox.sox_context()` decorators as discussed in this | |
| [thread](https://github.com/pseeth/soxbindings/issues/4). | |
| AI Music Technology Group, Sony Group Corporation | |
| AI Speech and Sound Group, Sony Europe | |
| This implementation originally belongs to Sony Group Corporation, | |
| which has been introduced in the work "Automatic music mixing with deep learning and out-of-domain data". | |
| Original repo link: https://github.com/sony/FxNorm-automix | |
| This work modifies a few implementations from the original repo to suit the task. | |
| """ | |
| from itertools import permutations | |
| import logging | |
| import numpy as np | |
| import pymixconsole as pymc | |
| from pymixconsole.parameter import Parameter | |
| from pymixconsole.parameter_list import ParameterList | |
| from pymixconsole.processor import Processor | |
| from random import shuffle | |
| from scipy.signal import oaconvolve | |
| import soxbindings as sox | |
| from typing import List, Optional, Tuple, Union | |
| from numba import jit | |
| # prevent pysox from logging warnings regarding non-opimal timestretch factors | |
| logging.getLogger('sox').setLevel(logging.ERROR) | |
| # Monkey-Patch `Processor` for convenience | |
| # (a) Allow `None` as blocksize if processor can work on variable-length audio | |
| def new_init(self, name, parameters, block_size, sample_rate, dtype='float32'): | |
| """ | |
| Initialize processor. | |
| Args: | |
| self: Reference to object | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| block_size (int): Size of blocks for blockwise processing. | |
| Can also be `None` if full audio can be processed at once. | |
| sample_rate (int): Sample rate of input audio. Use `None` if effect is independent of this value. | |
| dtype (str): data type of samples | |
| """ | |
| self.name = name | |
| self.parameters = parameters | |
| self.block_size = block_size | |
| self.sample_rate = sample_rate | |
| self.dtype = dtype | |
| # (b) make code simpler | |
| def new_update(self, parameter_name): | |
| """ | |
| Update processor after randomization of parameters. | |
| Args: | |
| self: Reference to object. | |
| parameter_name (str): Parameter whose value has changed. | |
| """ | |
| pass | |
| # (c) representation for nice print | |
| def new_repr(self): | |
| """ | |
| Create human-readable representation. | |
| Args: | |
| self: Reference to object. | |
| Returns: | |
| string representation of object. | |
| """ | |
| return f'Processor(name={self.name!r}, parameters={self.parameters!r}' | |
| Processor.__init__ = new_init | |
| Processor.__repr__ = new_repr | |
| Processor.update = new_update | |
| class AugmentationChain: | |
| """Basic audio Fx chain which is used for data augmentation.""" | |
| def __init__(self, | |
| fxs: Optional[List[Tuple[Union[Processor, 'AugmentationChain'], float, bool]]] = [], | |
| shuffle: Optional[bool] = False, | |
| parallel: Optional[bool] = False, | |
| parallel_weight_factor = None, | |
| randomize_param_value=True): | |
| """ | |
| Create augmentation chain from the dictionary `fxs`. | |
| Args: | |
| fxs (list of tuples): First tuple element is an instances of `pymc.processor` or `AugmentationChain` that | |
| we want to use for data augmentation. Second element gives probability that effect should be applied. | |
| Third element defines, whether the processed signal is normalized by the RMS of the input. | |
| shuffle (bool): If `True` then order of Fx are changed whenever chain is applied. | |
| """ | |
| self.fxs = fxs | |
| self.shuffle = shuffle | |
| self.parallel = parallel | |
| self.parallel_weight_factor = parallel_weight_factor | |
| self.randomize_param_value = randomize_param_value | |
| def apply_processor(self, x, processor: Processor, rms_normalize): | |
| """ | |
| Pass audio in `x` through `processor` and output the respective processed audio. | |
| Args: | |
| x (Numpy array): Input audio of shape `n_samples` x `n_channels`. | |
| processor (Processor): Audio effect that we want to apply. | |
| rms_normalize (bool): If `True`, the processed signal is normalized by the RMS of the signal. | |
| Returns: | |
| Numpy array: Processed audio of shape `n_samples` x `n_channels` (same size as `x') | |
| """ | |
| n_samples_input = x.shape[0] | |
| if processor.block_size is None: | |
| y = processor.process(x) | |
| else: | |
| # make sure that n_samples is a multiple of `processor.block_size` | |
| if x.shape[0] % processor.block_size != 0: | |
| n_pad = processor.block_size - x.shape[0] % processor.block_size | |
| x = np.pad(x, ((0, n_pad), (0, 0)), mode='reflective') | |
| y = np.zeros_like(x) | |
| for idx in range(0, x.shape[0], processor.block_size): | |
| y[idx:idx+processor.block_size, :] = processor.process(x[idx:idx+processor.block_size, :]) | |
| if rms_normalize: | |
| # normalize output energy such that it is the same as the input energy | |
| scale = np.sqrt(np.mean(np.square(x)) / np.maximum(1e-7, np.mean(np.square(y)))) | |
| y *= scale | |
| # return audio of same length as x | |
| return y[:n_samples_input, :] | |
| def apply_same_processor(self, x_list, processor: Processor, rms_normalize): | |
| for i in range(len(x_list)): | |
| x_list[i] = self.apply_processor(x_list[i], processor, rms_normalize) | |
| return x_list | |
| def __call__(self, x_list): | |
| """ | |
| Apply the same augmentation chain to audio tracks in list `x_list`. | |
| Args: | |
| x_list (list of Numpy array) : List of audio samples of shape `n_samples` x `n_channels`. | |
| Returns: | |
| y_list (list of Numpy array) : List of processed audio of same shape as `x_list` where the same effects have been applied. | |
| """ | |
| # randomly shuffle effect order if `self.shuffle` is True | |
| if self.shuffle: | |
| shuffle(self.fxs) | |
| # apply effects with probabilities given in `self.fxs` | |
| y_list = x_list.copy() | |
| for fx, p, rms_normalize in self.fxs: | |
| if np.random.rand() < p: | |
| if isinstance(fx, Processor): | |
| # randomize all effect parameters (also calls `update()` for each processor) | |
| if self.randomize_param_value: | |
| fx.randomize() | |
| else: | |
| fx.update(None) | |
| # apply processor | |
| y_list = self.apply_same_processor(y_list, fx, rms_normalize) | |
| else: | |
| y_list = fx(y_list) | |
| if self.parallel: | |
| # weighting factor of input signal in the range of (0.0 ~ 0.5) | |
| weight_in = self.parallel_weight_factor if self.parallel_weight_factor else np.random.rand() / 2. | |
| for i in range(len(y_list)): | |
| y_list[i] = weight_in*x_list[i] + (1-weight_in)*y_list[i] | |
| return y_list | |
| def __repr__(self): | |
| """ | |
| Human-readable representation. | |
| Returns: | |
| string representation of object. | |
| """ | |
| return f'AugmentationChain(fxs={self.fxs!r}, shuffle={self.shuffle!r})' | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% DISTORTION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| def hard_clip(x, threshold_dB, drive): | |
| """ | |
| Hard clip distortion. | |
| Args: | |
| x: input audio | |
| threshold_dB: threshold | |
| drive: drive | |
| Returns: | |
| (Numpy array): distorted audio | |
| """ | |
| drive_linear = np.power(10., drive / 20.).astype(np.float32) | |
| threshold_linear = 10. ** (threshold_dB / 20.) | |
| return np.clip(x * drive_linear, -threshold_linear, threshold_linear) | |
| def overdrive(x, drive, colour, sample_rate): | |
| """ | |
| Overdrive distortion. | |
| Args: | |
| x: input audio | |
| drive: Controls the amount of distortion (dB). | |
| colour: Controls the amount of even harmonic content in the output(dB) | |
| sample_rate: sampling rate | |
| Returns: | |
| (Numpy array): distorted audio | |
| """ | |
| scale = np.max(np.abs(x)) | |
| if scale > 0.9: | |
| clips = True | |
| x = x * (0.9 / scale) | |
| else: | |
| clips = False | |
| tfm = sox.Transformer() | |
| tfm.overdrive(gain_db=drive, colour=colour) | |
| y = tfm.build_array(input_array=x, sample_rate_in=sample_rate).astype(np.float32) | |
| if clips: | |
| y *= scale / 0.9 # rescale output to original scale | |
| return y | |
| def hyperbolic_tangent(x, drive): | |
| """ | |
| Hyperbolic Tanh distortion. | |
| Args: | |
| x: input audio | |
| drive: drive | |
| Returns: | |
| (Numpy array): distorted audio | |
| """ | |
| drive_linear = np.power(10., drive / 20.).astype(np.float32) | |
| return np.tanh(2. * x * drive_linear) | |
| def soft_sine(x, drive): | |
| """ | |
| Soft sine distortion. | |
| Args: | |
| x: input audio | |
| drive: drive | |
| Returns: | |
| (Numpy array): distorted audio | |
| """ | |
| drive_linear = np.power(10., drive / 20.).astype(np.float32) | |
| y = np.clip(x * drive_linear, -np.pi/4.0, np.pi/4.0) | |
| return np.sin(2. * y) | |
| def bit_crusher(x, bits): | |
| """ | |
| Bit crusher distortion. | |
| Args: | |
| x: input audio | |
| bits: bits | |
| Returns: | |
| (Numpy array): distorted audio | |
| """ | |
| return np.rint(x * (2 ** bits)) / (2 ** bits) | |
| class Distortion(Processor): | |
| """ | |
| Distortion processor. | |
| Processor parameters: | |
| mode (str): Currently supports the following five modes: hard_clip, waveshaper, soft_sine, tanh, bit_crusher. | |
| Each mode has different parameters such as threshold, factor, or bits. | |
| threshold (float): threshold | |
| drive (float): drive | |
| factor (float): factor | |
| limit_range (float): limit range | |
| bits (int): bits | |
| """ | |
| def __init__(self, sample_rate, name='Distortion', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| sample_rate (int): sample rate. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name, None, block_size=None, sample_rate=sample_rate) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('mode', 'hard_clip', 'string', | |
| options=['hard_clip', | |
| 'overdrive', | |
| 'soft_sine', | |
| 'tanh', | |
| 'bit_crusher'])) | |
| self.parameters.add(Parameter('threshold', 0.0, 'float', | |
| units='dB', maximum=0.0, minimum=-20.0)) | |
| self.parameters.add(Parameter('drive', 0.0, 'float', | |
| units='dB', maximum=20.0, minimum=0.0)) | |
| self.parameters.add(Parameter('colour', 20.0, 'float', | |
| maximum=100.0, minimum=0.0)) | |
| self.parameters.add(Parameter('bits', 12, 'int', | |
| maximum=12, minimum=8)) | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): distorted audio of size `n_samples x n_channels`. | |
| """ | |
| if self.parameters.mode.value == 'hard_clip': | |
| y = hard_clip(x, self.parameters.threshold.value, self.parameters.drive.value) | |
| elif self.parameters.mode.value == 'overdrive': | |
| y = overdrive(x, self.parameters.drive.value, | |
| self.parameters.colour.value, self.sample_rate) | |
| elif self.parameters.mode.value == 'soft_sine': | |
| y = soft_sine(x, self.parameters.drive.value) | |
| elif self.parameters.mode.value == 'tanh': | |
| y = hyperbolic_tangent(x, self.parameters.drive.value) | |
| elif self.parameters.mode.value == 'bit_crusher': | |
| y = bit_crusher(x, self.parameters.bits.value) | |
| # If the output has low amplitude, (some distortion settigns can "crush" down the amplitude) | |
| # Then it`s normalised to the input's amplitude | |
| x_max = np.max(np.abs(x)) + 1e-8 | |
| o_max = np.max(np.abs(y)) + 1e-8 | |
| if x_max > o_max: | |
| y = y*(x_max/o_max) | |
| return y | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% EQUALISER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class Equaliser(Processor): | |
| """ | |
| Five band parametric equaliser (two shelves and three central bands). | |
| All gains are set in dB values and range from `MIN_GAIN` dB to `MAX_GAIN` dB. | |
| This processor is implemented as cascade of five biquad IIR filters | |
| that are implemented using the infamous cookbook formulae from RBJ. | |
| Processor parameters: | |
| low_shelf_gain (float), low_shelf_freq (float) | |
| first_band_gain (float), first_band_freq (float), first_band_q (float) | |
| second_band_gain (float), second_band_freq (float), second_band_q (float) | |
| third_band_gain (float), third_band_freq (float), third_band_q (float) | |
| original from https://github.com/csteinmetz1/pymixconsole/blob/master/pymixconsole/processors/equaliser.py | |
| """ | |
| def __init__(self, n_channels, | |
| sample_rate, | |
| gain_range=(-15.0, 15.0), | |
| q_range=(0.1, 2.0), | |
| bands=['low_shelf', 'first_band', 'second_band', 'third_band', 'high_shelf'], | |
| hard_clip=False, | |
| name='Equaliser', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| n_channels (int): Number of audio channels. | |
| sample_rate (int): Sample rate of audio. | |
| gain_range (tuple of floats): minimum and maximum gain that can be used. | |
| q_range (tuple of floats): minimum and maximum q value. | |
| hard_clip (bool): Whether we clip to [-1, 1.] after processing. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| self.n_channels = n_channels | |
| MIN_GAIN, MAX_GAIN = gain_range | |
| MIN_Q, MAX_Q = q_range | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| # low shelf parameters ------- | |
| self.parameters.add(Parameter('low_shelf_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) | |
| self.parameters.add(Parameter('low_shelf_freq', 80.0, 'float', minimum=30.0, maximum=200.0)) | |
| # first band parameters ------ | |
| self.parameters.add(Parameter('first_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) | |
| self.parameters.add(Parameter('first_band_freq', 400.0, 'float', minimum=200.0, maximum=1000.0)) | |
| self.parameters.add(Parameter('first_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q)) | |
| # second band parameters ----- | |
| self.parameters.add(Parameter('second_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) | |
| self.parameters.add(Parameter('second_band_freq', 2000.0, 'float', minimum=1000.0, maximum=3000.0)) | |
| self.parameters.add(Parameter('second_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q)) | |
| # third band parameters ------ | |
| self.parameters.add(Parameter('third_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) | |
| self.parameters.add(Parameter('third_band_freq', 4000.0, 'float', minimum=3000.0, maximum=8000.0)) | |
| self.parameters.add(Parameter('third_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q)) | |
| # high shelf parameters ------ | |
| self.parameters.add(Parameter('high_shelf_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN)) | |
| self.parameters.add(Parameter('high_shelf_freq', 8000.0, 'float', minimum=5000.0, maximum=10000.0)) | |
| self.bands = bands | |
| self.filters = self.setup_filters() | |
| self.hard_clip = hard_clip | |
| def setup_filters(self): | |
| """ | |
| Create IIR filters. | |
| Returns: | |
| IIR filters | |
| """ | |
| filters = {} | |
| for band in self.bands: | |
| G = getattr(self.parameters, band + '_gain').value | |
| fc = getattr(self.parameters, band + '_freq').value | |
| rate = self.sample_rate | |
| if band in ['low_shelf', 'high_shelf']: | |
| Q = 0.707 | |
| filter_type = band | |
| else: | |
| Q = getattr(self.parameters, band + '_q').value | |
| filter_type = 'peaking' | |
| filters[band] = pymc.components.iirfilter.IIRfilter(G, Q, fc, rate, filter_type, n_channels=self.n_channels) | |
| return filters | |
| def update_filter(self, band): | |
| """ | |
| Update filters. | |
| Args: | |
| band (str): Band that should be updated. | |
| """ | |
| self.filters[band].G = getattr(self.parameters, band + '_gain').value | |
| self.filters[band].fc = getattr(self.parameters, band + '_freq').value | |
| self.filters[band].rate = self.sample_rate | |
| if band in ['first_band', 'second_band', 'third_band']: | |
| self.filters[band].Q = getattr(self.parameters, band + '_q').value | |
| def update(self, parameter_name=None): | |
| """ | |
| Update processor after randomization of parameters. | |
| Args: | |
| parameter_name (str): Parameter whose value has changed. | |
| """ | |
| if parameter_name is not None: | |
| bands = ['_'.join(parameter_name.split('_')[:2])] | |
| else: | |
| bands = self.bands | |
| for band in bands: | |
| self.update_filter(band) | |
| for _band, iirfilter in self.filters.items(): | |
| iirfilter.reset_state() | |
| def reset_state(self): | |
| """Reset state.""" | |
| for _band, iirfilter in self.filters.items(): | |
| iirfilter.reset_state() | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): equalized audio of size `n_samples x n_channels`. | |
| """ | |
| for _band, iirfilter in self.filters.items(): | |
| iirfilter.reset_state() | |
| x = iirfilter.apply_filter(x) | |
| if self.hard_clip: | |
| x = np.clip(x, -1.0, 1.0) | |
| # make sure that we have float32 as IIR filtering returns float64 | |
| x = x.astype(np.float32) | |
| # make sure that we have two dimensions (if `n_channels == 1`) | |
| if x.ndim == 1: | |
| x = x[:, np.newaxis] | |
| return x | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% COMPRESSOR %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| def compressor_process(x, threshold, attack_time, release_time, ratio, makeup_gain, sample_rate, yL_prev): | |
| """ | |
| Apply compressor. | |
| Args: | |
| x (Numpy array): audio data. | |
| threshold: threshold in dB. | |
| attack_time: attack_time in ms. | |
| release_time: release_time in ms. | |
| ratio: ratio. | |
| makeup_gain: makeup_gain. | |
| sample_rate: sample rate. | |
| yL_prev: internal state of the envelop gain. | |
| Returns: | |
| compressed audio. | |
| """ | |
| M = x.shape[0] | |
| x_g = np.zeros(M) | |
| x_l = np.zeros(M) | |
| y_g = np.zeros(M) | |
| y_l = np.zeros(M) | |
| c = np.zeros(M) | |
| yL_prev = 0. | |
| alpha_attack = np.exp(-1/(0.001 * sample_rate * attack_time)) | |
| alpha_release = np.exp(-1/(0.001 * sample_rate * release_time)) | |
| for i in np.arange(M): | |
| if np.abs(x[i]) < 0.000001: | |
| x_g[i] = -120.0 | |
| else: | |
| x_g[i] = 20 * np.log10(np.abs(x[i])) | |
| if ratio > 1: | |
| if x_g[i] >= threshold: | |
| y_g[i] = threshold + (x_g[i] - threshold) / ratio | |
| else: | |
| y_g[i] = x_g[i] | |
| elif ratio < 1: | |
| if x_g[i] <= threshold: | |
| y_g[i] = threshold + (x_g[i] - threshold) / (1/ratio) | |
| else: | |
| y_g[i] = x_g[i] | |
| x_l[i] = x_g[i] - y_g[i] | |
| if x_l[i] > yL_prev: | |
| y_l[i] = alpha_attack * yL_prev + (1 - alpha_attack) * x_l[i] | |
| else: | |
| y_l[i] = alpha_release * yL_prev + (1 - alpha_release) * x_l[i] | |
| c[i] = np.power(10.0, (makeup_gain - y_l[i]) / 20.0) | |
| yL_prev = y_l[i] | |
| y = x * c | |
| return y, yL_prev | |
| class Compressor(Processor): | |
| """ | |
| Single band stereo dynamic range compressor. | |
| Processor parameters: | |
| threshold (float) | |
| attack_time (float) | |
| release_time (float) | |
| ratio (float) | |
| makeup_gain (float) | |
| """ | |
| def __init__(self, sample_rate, name='Compressor', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| sample_rate (int): Sample rate of input audio. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('threshold', -20.0, 'float', units='dB', minimum=-80.0, maximum=-5.0)) | |
| self.parameters.add(Parameter('attack_time', 2.0, 'float', units='ms', minimum=1., maximum=20.0)) | |
| self.parameters.add(Parameter('release_time', 100.0, 'float', units='ms', minimum=50.0, maximum=500.0)) | |
| self.parameters.add(Parameter('ratio', 4.0, 'float', minimum=4., maximum=40.0)) | |
| # we remove makeup_gain parameter inside the Compressor | |
| # store internal state (for block-wise processing) | |
| self.yL_prev = None | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): compressed audio of size `n_samples x n_channels`. | |
| """ | |
| if self.yL_prev is None: | |
| self.yL_prev = [0.] * x.shape[1] | |
| if not self.parameters.threshold.value == 0.0 or not self.parameters.ratio.value == 1.0: | |
| y = np.zeros_like(x) | |
| for ch in range(x.shape[1]): | |
| y[:, ch], self.yL_prev[ch] = compressor_process(x[:, ch], | |
| self.parameters.threshold.value, | |
| self.parameters.attack_time.value, | |
| self.parameters.release_time.value, | |
| self.parameters.ratio.value, | |
| 0.0, # makeup_gain = 0 | |
| self.sample_rate, | |
| self.yL_prev[ch]) | |
| else: | |
| y = x | |
| return y | |
| def update(self, parameter_name=None): | |
| """ | |
| Update processor after randomization of parameters. | |
| Args: | |
| parameter_name (str): Parameter whose value has changed. | |
| """ | |
| self.yL_prev = None | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%% CONVOLUTIONAL REVERB %%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class ConvolutionalReverb(Processor): | |
| """ | |
| Convolutional Reverb. | |
| Processor parameters: | |
| wet_dry (float): Wet/dry ratio. | |
| decay (float): Applies a fade out to the impulse response. | |
| pre_delay (float): Value in ms. Shifts the IR in time and allows. | |
| A positive value produces a traditional delay between the dry signal and the wet. | |
| A negative delay is, in reality, zero delay, but effectively trims off the start of IR, | |
| so the reverb response begins at a point further in. | |
| """ | |
| def __init__(self, impulse_responses, sample_rate, name='ConvolutionalReverb', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| impulse_responses (list): List with impulse responses created by `common_dataprocessing.create_dataset` | |
| sample_rate (int): Sample rate that we should assume (used for fade-out computation) | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| Raises: | |
| ValueError: if no impulse responses are provided. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| if impulse_responses is None: | |
| raise ValueError('List of impulse responses must be provided for ConvolutionalReverb processor.') | |
| self.impulse_responses = impulse_responses | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.max_ir_num = len(max(impulse_responses, key=len)) | |
| self.parameters.add(Parameter('index', 0, 'int', minimum=0, maximum=len(impulse_responses))) | |
| self.parameters.add(Parameter('index_ir', 0, 'int', minimum=0, maximum=self.max_ir_num)) | |
| self.parameters.add(Parameter('wet', 1.0, 'float', minimum=1.0, maximum=1.0)) | |
| self.parameters.add(Parameter('dry', 0.0, 'float', minimum=0.0, maximum=0.0)) | |
| self.parameters.add(Parameter('decay', 1.0, 'float', minimum=1.0, maximum=1.0)) | |
| self.parameters.add(Parameter('pre_delay', 0, 'int', units='ms', minimum=0, maximum=0)) | |
| def update(self, parameter_name=None): | |
| """ | |
| Update processor after randomization of parameters. | |
| Args: | |
| parameter_name (str): Parameter whose value has changed. | |
| """ | |
| # we sample IR with a uniform random distribution according to RT60 values | |
| chosen_ir_duration = self.impulse_responses[self.parameters.index.value] | |
| chosen_ir_idx = self.parameters.index_ir.value % len(chosen_ir_duration) | |
| self.h = np.copy(chosen_ir_duration[chosen_ir_idx]['impulse_response']()) | |
| # fade out the impulse based on the decay setting (starting from peak value) | |
| if self.parameters.decay.value < 1.: | |
| idx_peak = np.argmax(np.max(np.abs(self.h), axis=1), axis=0) | |
| fstart = np.minimum(self.h.shape[0], | |
| idx_peak + int(self.parameters.decay.value * (self.h.shape[0] - idx_peak))) | |
| fstop = np.minimum(self.h.shape[0], fstart + int(0.020*self.sample_rate)) # constant 20 ms fade out | |
| flen = fstop - fstart | |
| fade = np.arange(1, flen+1, dtype=self.dtype)/flen | |
| fade = np.power(0.1, fade * 5) | |
| self.h[fstart:fstop, :] *= fade[:, np.newaxis] | |
| self.h = self.h[:fstop] | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): reverbed audio of size `n_samples x n_channels`. | |
| """ | |
| # reshape IR to the correct size | |
| n_channels = x.shape[1] | |
| if self.h.shape[1] == 1 and n_channels > 1: | |
| self.h = np.hstack([self.h] * n_channels) # repeat mono IR for multi-channel input | |
| if self.h.shape[1] > 1 and n_channels == 1: | |
| self.h = self.h[:, np.random.randint(self.h.shape[1]), np.newaxis] # randomly choose one IR channel | |
| if self.parameters.wet.value == 0.0: | |
| return x | |
| else: | |
| # perform convolution to get wet signal | |
| y = oaconvolve(x, self.h, mode='full', axes=0) | |
| # cut out wet signal (compensating for the delay that the IR is introducing + predelay) | |
| idx = np.argmax(np.max(np.abs(self.h), axis=1), axis=0) | |
| idx += int(0.001 * np.abs(self.parameters.pre_delay.value) * self.sample_rate) | |
| idx = np.clip(idx, 0, self.h.shape[0]-1) | |
| y = y[idx:idx+x.shape[0], :] | |
| # return weighted sum of dry and wet signal | |
| return self.parameters.dry.value * x + self.parameters.wet.value * y | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%% HAAS EFFECT %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| def haas_process(x, delay, feedback, wet_channel): | |
| """ | |
| Add Haas effect to audio. | |
| Args: | |
| x (Numpy array): input audio. | |
| delay: Delay that we apply to one of the channels (in samples). | |
| feedback: Feedback value. | |
| wet_channel: Which channel we process (`left` or `right`). | |
| Returns: | |
| (Numpy array): Audio with Haas effect. | |
| """ | |
| y = np.copy(x) | |
| if wet_channel == 'left': | |
| y[:, 0] += feedback * np.roll(x[:, 0], delay) | |
| elif wet_channel == 'right': | |
| y[:, 1] += feedback * np.roll(x[:, 1], delay) | |
| return y | |
| class Haas(Processor): | |
| """ | |
| Haas Effect Processor. | |
| Randomly selects one channel and applies a short delay to it. | |
| Processor parameters: | |
| delay (int) | |
| feedback (float) | |
| wet_channel (string) | |
| """ | |
| def __init__(self, sample_rate, delay_range=(-0.040, 0.040), name='Haas', parameters=None, | |
| ): | |
| """ | |
| Initialize processor. | |
| Args: | |
| sample_rate (int): Sample rate of input audio. | |
| delay_range (tuple of floats): minimum/maximum delay for Haas effect. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('delay', int(delay_range[1] * sample_rate), 'int', units='samples', | |
| minimum=int(delay_range[0] * sample_rate), | |
| maximum=int(delay_range[1] * sample_rate))) | |
| self.parameters.add(Parameter('feedback', 0.35, 'float', minimum=0.33, maximum=0.66)) | |
| self.parameters.add(Parameter('wet_channel', 'left', 'string', options=['left', 'right'])) | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): audio with Haas effect of size `n_samples x n_channels`. | |
| """ | |
| assert x.shape[1] == 1 or x.shape[1] == 2, 'Haas effect only works with monaural or stereo audio.' | |
| if x.shape[1] < 2: | |
| x = np.repeat(x, 2, axis=1) | |
| y = haas_process(x, self.parameters.delay.value, | |
| self.parameters.feedback.value, self.parameters.wet_channel.value) | |
| return y | |
| def update(self, parameter_name=None): | |
| """ | |
| Update processor after randomization of parameters. | |
| Args: | |
| parameter_name (str): Parameter whose value has changed. | |
| """ | |
| self.reset_state() | |
| def reset_state(self): | |
| """Reset state.""" | |
| self.read_idx = 0 | |
| self.write_idx = self.parameters.delay.value | |
| self.buffer = np.zeros((65536, 2)) | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PANNER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class Panner(Processor): | |
| """ | |
| Simple stereo panner. | |
| If input is mono, output is stereo. | |
| Original edited from https://github.com/csteinmetz1/pymixconsole/blob/master/pymixconsole/processors/panner.py | |
| """ | |
| def __init__(self, name='Panner', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| # default processor class constructor | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('pan', 0.5, 'float', minimum=0., maximum=1.)) | |
| self.parameters.add(Parameter('pan_law', '-4.5dB', 'string', | |
| options=['-4.5dB', 'linear', 'constant_power'])) | |
| # setup the coefficents based on default params | |
| self.update() | |
| def _calculate_pan_coefficents(self): | |
| """ | |
| Calculate panning coefficients from the chosen pan law. | |
| Based on the set pan law determine the gain value | |
| to apply for the left and right channel to achieve panning effect. | |
| This operates on the assumption that the input channel is mono. | |
| The output data will be stereo at the moment, but could be expanded | |
| to a higher channel count format. | |
| The panning value is in the range [0, 1], where | |
| 0 means the signal is panned completely to the left, and | |
| 1 means the signal is apanned copletely to the right. | |
| Raises: | |
| ValueError: `self.parameters.pan_law` is not supported. | |
| """ | |
| self.gains = np.zeros(2, dtype=self.dtype) | |
| # first scale the linear [0, 1] to [0, pi/2] | |
| theta = self.parameters.pan.value * (np.pi/2) | |
| if self.parameters.pan_law.value == 'linear': | |
| self.gains[0] = ((np.pi/2) - theta) * (2/np.pi) | |
| self.gains[1] = theta * (2/np.pi) | |
| elif self.parameters.pan_law.value == 'constant_power': | |
| self.gains[0] = np.cos(theta) | |
| self.gains[1] = np.sin(theta) | |
| elif self.parameters.pan_law.value == '-4.5dB': | |
| self.gains[0] = np.sqrt(((np.pi/2) - theta) * (2/np.pi) * np.cos(theta)) | |
| self.gains[1] = np.sqrt(theta * (2/np.pi) * np.sin(theta)) | |
| else: | |
| raise ValueError(f'Invalid pan_law {self.parameters.pan_law.value}.') | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): panned audio of size `n_samples x n_channels`. | |
| """ | |
| assert x.shape[1] == 1 or x.shape[1] == 2, 'Panner only works with monaural or stereo audio.' | |
| if x.shape[1] < 2: | |
| x = np.repeat(x, 2, axis=1) | |
| return x * self.gains | |
| def update(self, parameter_name=None): | |
| """ | |
| Update processor after randomization of parameters. | |
| Args: | |
| parameter_name (str): Parameter whose value has changed. | |
| """ | |
| self._calculate_pan_coefficents() | |
| def reset_state(self): | |
| """Reset state.""" | |
| self._output_buffer = np.empty([self.block_size, 2]) | |
| self.update() | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% STEREO IMAGER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class MidSideImager(Processor): | |
| def __init__(self, name='IMAGER', parameters=None): | |
| super().__init__(name, parameters=parameters, block_size=None, sample_rate=None) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| # values of 0.0~1.0 indicate making the signal more centered while 1.0~2.0 means making the signal more wider | |
| self.parameters.add(Parameter("bal", 0.0, "float", processor=self, minimum=0.0, maximum=2.0)) | |
| def process(self, data): | |
| """ | |
| # input shape : [signal length, 2] | |
| ### note! stereo imager won't work if the input signal is a mono signal (left==right) | |
| ### if you want to apply stereo imager to a mono signal, first stereoize it with Haas effects | |
| """ | |
| # to mid-side channels | |
| mid, side = self.lr_to_ms(data[:,0], data[:,1]) | |
| # apply mid-side weights according to energy | |
| mid_e, side_e = np.sum(mid**2), np.sum(side**2) | |
| total_e = mid_e + side_e | |
| # apply weights | |
| max_side_multiplier = np.sqrt(total_e / (side_e + 1e-3)) | |
| # compute current multiply factor | |
| cur_bal = round(getattr(self.parameters, "bal").value, 3) | |
| side_gain = cur_bal if cur_bal <= 1. else max_side_multiplier * (cur_bal-1) | |
| # multiply weighting factor | |
| new_side = side * side_gain | |
| new_side_e = side_e * (side_gain ** 2) | |
| left_mid_e = total_e - new_side_e | |
| mid_gain = np.sqrt(left_mid_e / (mid_e + 1e-3)) | |
| new_mid = mid * mid_gain | |
| # convert back to left-right channels | |
| left, right = self.ms_to_lr(new_mid, new_side) | |
| imaged = np.stack([left, right], 1) | |
| return imaged | |
| # left-right channeled signal to mid-side signal | |
| def lr_to_ms(self, left, right): | |
| mid = left + right | |
| side = left - right | |
| return mid, side | |
| # mid-side channeled signal to left-right signal | |
| def ms_to_lr(self, mid, side): | |
| left = (mid + side) / 2 | |
| right = (mid - side) / 2 | |
| return left, right | |
| def update(self, parameter_name=None): | |
| return parameter_name | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GAIN %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class Gain(Processor): | |
| """ | |
| Gain Processor. | |
| Applies gain in dB and can also randomly inverts polarity. | |
| Processor parameters: | |
| gain (float): Gain that should be applied (dB scale). | |
| invert (bool): If True, then we also invert the waveform. | |
| """ | |
| def __init__(self, name='Gain', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name, parameters=parameters, block_size=None, sample_rate=None) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| # self.parameters.add(Parameter('gain', 1.0, 'float', units='dB', minimum=-12.0, maximum=6.0)) | |
| self.parameters.add(Parameter('gain', 1.0, 'float', units='dB', minimum=-6.0, maximum=9.0)) | |
| self.parameters.add(Parameter('invert', False, 'bool')) | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): gain-augmented audio of size `n_samples x n_channels`. | |
| """ | |
| gain = 10 ** (self.parameters.gain.value / 20.) | |
| if self.parameters.invert.value: | |
| gain = -gain | |
| return gain * x | |
| # %%%%%%%%%%%%%%%%%%%%%%% SIMPLE CHANNEL SWAP %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class SwapChannels(Processor): | |
| """ | |
| Swap channels in multi-channel audio. | |
| Processor parameters: | |
| index (int) Selects the permutation that we are using. | |
| Please note that "no permutation" is one of the permutations in `self.permutations` at index `0`. | |
| """ | |
| def __init__(self, n_channels, name='SwapChannels', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| n_channels (int): Number of channels in audio that we want to process. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None) | |
| self.permutations = tuple(permutations(range(n_channels), n_channels)) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('index', 0, 'int', minimum=0, maximum=len(self.permutations))) | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): channel-swapped audio of size `n_samples x n_channels`. | |
| """ | |
| return x[:, self.permutations[self.parameters.index.value]] | |
| # %%%%%%%%%%%%%%%%%%%%%%% Monauralize %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class Monauralize(Processor): | |
| """ | |
| Monauralizes audio (i.e., removes spatial information). | |
| Process parameters: | |
| seed_channel (int): channel that we use for overwriting the others. | |
| """ | |
| def __init__(self, n_channels, name='Monauralize', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| n_channels (int): Number of channels in audio that we want to process. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('seed_channel', 0, 'int', minimum=0, maximum=n_channels)) | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): monauralized audio of size `n_samples x n_channels`. | |
| """ | |
| return np.tile(x[:, [self.parameters.seed_channel.value]], (1, x.shape[1])) | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PITCH SHIFT %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class PitchShift(Processor): | |
| """ | |
| Simple pitch shifter using SoX and soxbindings (https://github.com/pseeth/soxbindings). | |
| Processor parameters: | |
| steps (float): Pitch shift as positive/negative semitones | |
| quick (bool): If True, this effect will run faster but with lower sound quality. | |
| """ | |
| def __init__(self, sample_rate, fix_length=True, name='PitchShift', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| sample_rate (int): Sample rate of input audio. | |
| fix_length (bool): If True, then output has same length as input. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('steps', 0.0, 'float', minimum=-6., maximum=6.)) | |
| self.parameters.add(Parameter('quick', False, 'bool')) | |
| self.fix_length = fix_length | |
| self.clips = False | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): pitch-shifted audio of size `n_samples x n_channels`. | |
| """ | |
| if self.parameters.steps.value == 0.0: | |
| y = x | |
| else: | |
| scale = np.max(np.abs(x)) | |
| if scale > 0.9: | |
| clips = True | |
| x = x * (0.9 / scale) | |
| else: | |
| clips = False | |
| tfm = sox.Transformer() | |
| tfm.pitch(self.parameters.steps.value, quick=bool(self.parameters.quick.value)) | |
| y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) | |
| if clips: | |
| y *= scale / 0.9 # rescale output to original scale | |
| if self.fix_length: | |
| n_samples_input = x.shape[0] | |
| n_samples_output = y.shape[0] | |
| if n_samples_input < n_samples_output: | |
| idx1 = (n_samples_output - n_samples_input) // 2 | |
| idx2 = idx1 + n_samples_input | |
| y = y[idx1:idx2] | |
| elif n_samples_input > n_samples_output: | |
| n_pad = n_samples_input - n_samples_output | |
| y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0))) | |
| return y | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% TIME STRETCH %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class TimeStretch(Processor): | |
| """ | |
| Simple time stretcher using SoX and soxbindings (https://github.com/pseeth/soxbindings). | |
| Processor parameters: | |
| factor (float): Time stretch factor. | |
| quick (bool): If True, this effect will run faster but with lower sound quality. | |
| stretch_type (str): Algorithm used for stretching (`tempo` or `stretch`). | |
| audio_type (str): Sets which time segments are most optmial when finding | |
| the best overlapping points for time stretching. | |
| """ | |
| def __init__(self, sample_rate, fix_length=True, name='TimeStretch', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| sample_rate (int): Sample rate of input audio. | |
| fix_length (bool): If True, then output has same length as input. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('factor', 1.0, 'float', minimum=1/1.33, maximum=1.33)) | |
| self.parameters.add(Parameter('quick', False, 'bool')) | |
| self.parameters.add(Parameter('stretch_type', 'tempo', 'string', options=['tempo', 'stretch'])) | |
| self.parameters.add(Parameter('audio_type', 'l', 'string', options=['m', 's', 'l'])) | |
| self.fix_length = fix_length | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): time-stretched audio of size `n_samples x n_channels`. | |
| """ | |
| if self.parameters.factor.value == 1.0: | |
| y = x | |
| else: | |
| scale = np.max(np.abs(x)) | |
| if scale > 0.9: | |
| clips = True | |
| x = x * (0.9 / scale) | |
| else: | |
| clips = False | |
| tfm = sox.Transformer() | |
| if self.parameters.stretch_type.value == 'stretch': | |
| tfm.stretch(self.parameters.factor.value) | |
| elif self.parameters.stretch_type.value == 'tempo': | |
| tfm.tempo(self.parameters.factor.value, | |
| audio_type=self.parameters.audio_type.value, | |
| quick=bool(self.parameters.quick.value)) | |
| y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) | |
| if clips: | |
| y *= scale / 0.9 # rescale output to original scale | |
| if self.fix_length: | |
| n_samples_input = x.shape[0] | |
| n_samples_output = y.shape[0] | |
| if n_samples_input < n_samples_output: | |
| idx1 = (n_samples_output - n_samples_input) // 2 | |
| idx2 = idx1 + n_samples_input | |
| y = y[idx1:idx2] | |
| elif n_samples_input > n_samples_output: | |
| n_pad = n_samples_input - n_samples_output | |
| y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0))) | |
| return y | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PLAYBACK SPEED %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class PlaybackSpeed(Processor): | |
| """ | |
| Simple playback speed effect using SoX and soxbindings (https://github.com/pseeth/soxbindings). | |
| Processor parameters: | |
| factor (float): Playback speed factor. | |
| """ | |
| def __init__(self, sample_rate, fix_length=True, name='PlaybackSpeed', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| sample_rate (int): Sample rate of input audio. | |
| fix_length (bool): If True, then output has same length as input. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('factor', 1.0, 'float', minimum=1./1.33, maximum=1.33)) | |
| self.fix_length = fix_length | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): resampled audio of size `n_samples x n_channels`. | |
| """ | |
| if self.parameters.factor.value == 1.0: | |
| y = x | |
| else: | |
| scale = np.max(np.abs(x)) | |
| if scale > 0.9: | |
| clips = True | |
| x = x * (0.9 / scale) | |
| else: | |
| clips = False | |
| tfm = sox.Transformer() | |
| tfm.speed(self.parameters.factor.value) | |
| y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) | |
| if clips: | |
| y *= scale / 0.9 # rescale output to original scale | |
| if self.fix_length: | |
| n_samples_input = x.shape[0] | |
| n_samples_output = y.shape[0] | |
| if n_samples_input < n_samples_output: | |
| idx1 = (n_samples_output - n_samples_input) // 2 | |
| idx2 = idx1 + n_samples_input | |
| y = y[idx1:idx2] | |
| elif n_samples_input > n_samples_output: | |
| n_pad = n_samples_input - n_samples_output | |
| y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0))) | |
| return y | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% BEND %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class Bend(Processor): | |
| """ | |
| Simple bend effect using SoX and soxbindings (https://github.com/pseeth/soxbindings). | |
| Processor parameters: | |
| n_bends (int): Number of segments or intervals to pitch shift | |
| """ | |
| def __init__(self, sample_rate, pitch_range=(-600, 600), fix_length=True, name='Bend', parameters=None): | |
| """ | |
| Initialize processor. | |
| Args: | |
| sample_rate (int): Sample rate of input audio. | |
| pitch_range (tuple of ints): min and max pitch bending ranges in cents | |
| fix_length (bool): If True, then output has same length as input. | |
| name (str): Name of processor. | |
| parameters (parameter_list): Parameters for this processor. | |
| """ | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter('n_bends', 2, 'int', minimum=2, maximum=10)) | |
| self.pitch_range_min, self.pitch_range_max = pitch_range | |
| def process(self, x): | |
| """ | |
| Process audio. | |
| Args: | |
| x (Numpy array): input audio of size `n_samples x n_channels`. | |
| Returns: | |
| (Numpy array): pitch-bended audio of size `n_samples x n_channels`. | |
| """ | |
| n_bends = self.parameters.n_bends.value | |
| max_length = x.shape[0] / self.sample_rate | |
| # Generates random non-overlapping segments | |
| delta = 1. / self.sample_rate | |
| boundaries = np.sort(delta + np.random.rand(n_bends-1) * (max_length - delta)) | |
| start, end = np.zeros(n_bends), np.zeros(n_bends) | |
| start[0] = delta | |
| for i, b in enumerate(boundaries): | |
| end[i] = b | |
| start[i+1] = b | |
| end[-1] = max_length | |
| # randomly sample pitch-shifts in cents | |
| cents = np.random.randint(self.pitch_range_min, self.pitch_range_max+1, n_bends) | |
| # remove segment if cent value is zero or start == end (as SoX does not allow such values) | |
| idx_keep = np.logical_and(cents != 0, start != end) | |
| n_bends, start, end, cents = sum(idx_keep), start[idx_keep], end[idx_keep], cents[idx_keep] | |
| scale = np.max(np.abs(x)) | |
| if scale > 0.9: | |
| clips = True | |
| x = x * (0.9 / scale) | |
| else: | |
| clips = False | |
| tfm = sox.Transformer() | |
| tfm.bend(n_bends=int(n_bends), start_times=list(start), end_times=list(end), cents=list(cents)) | |
| y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32) | |
| if clips: | |
| y *= scale / 0.9 # rescale output to original scale | |
| return y | |
| # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% ALGORITHMIC REVERB %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |
| class AlgorithmicReverb(Processor): | |
| def __init__(self, name="algoreverb", parameters=None, sample_rate=44100, **kwargs): | |
| super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate, **kwargs) | |
| if not parameters: | |
| self.parameters = ParameterList() | |
| self.parameters.add(Parameter("room_size", 0.5, "float", minimum=0.05, maximum=0.85)) | |
| self.parameters.add(Parameter("damping", 0.1, "float", minimum=0.0, maximum=1.0)) | |
| self.parameters.add(Parameter("dry_mix", 0.9, "float", minimum=0.0, maximum=1.0)) | |
| self.parameters.add(Parameter("wet_mix", 0.1, "float", minimum=0.0, maximum=1.0)) | |
| self.parameters.add(Parameter("width", 0.7, "float", minimum=0.0, maximum=1.0)) | |
| # Tuning | |
| self.stereospread = 23 | |
| self.scalegain = 0.2 | |
| def process(self, data): | |
| if data.ndim >= 2: | |
| dataL = data[:,0] | |
| if data.shape[1] == 2: | |
| dataR = data[:,1] | |
| else: | |
| dataR = data[:,0] | |
| else: | |
| dataL = data | |
| dataR = data | |
| output = np.zeros((data.shape[0], 2)) | |
| xL, xR = self.process_filters(dataL.copy(), dataR.copy()) | |
| wet1_g = self.parameters.wet_mix.value * ((self.parameters.width.value/2) + 0.5) | |
| wet2_g = self.parameters.wet_mix.value * ((1-self.parameters.width.value)/2) | |
| dry_g = self.parameters.dry_mix.value | |
| output[:,0] = (wet1_g * xL) + (wet2_g * xR) + (dry_g * dataL) | |
| output[:,1] = (wet1_g * xR) + (wet2_g * xL) + (dry_g * dataR) | |
| return output | |
| def process_filters(self, dataL, dataR): | |
| xL = self.combL1.process(dataL.copy() * self.scalegain) | |
| xL += self.combL2.process(dataL.copy() * self.scalegain) | |
| xL += self.combL3.process(dataL.copy() * self.scalegain) | |
| xL += self.combL4.process(dataL.copy() * self.scalegain) | |
| xL = self.combL5.process(dataL.copy() * self.scalegain) | |
| xL += self.combL6.process(dataL.copy() * self.scalegain) | |
| xL += self.combL7.process(dataL.copy() * self.scalegain) | |
| xL += self.combL8.process(dataL.copy() * self.scalegain) | |
| xR = self.combR1.process(dataR.copy() * self.scalegain) | |
| xR += self.combR2.process(dataR.copy() * self.scalegain) | |
| xR += self.combR3.process(dataR.copy() * self.scalegain) | |
| xR += self.combR4.process(dataR.copy() * self.scalegain) | |
| xR = self.combR5.process(dataR.copy() * self.scalegain) | |
| xR += self.combR6.process(dataR.copy() * self.scalegain) | |
| xR += self.combR7.process(dataR.copy() * self.scalegain) | |
| xR += self.combR8.process(dataR.copy() * self.scalegain) | |
| yL1 = self.allpassL1.process(xL) | |
| yL2 = self.allpassL2.process(yL1) | |
| yL3 = self.allpassL3.process(yL2) | |
| yL4 = self.allpassL4.process(yL3) | |
| yR1 = self.allpassR1.process(xR) | |
| yR2 = self.allpassR2.process(yR1) | |
| yR3 = self.allpassR3.process(yR2) | |
| yR4 = self.allpassR4.process(yR3) | |
| return yL4, yR4 | |
| def update(self, parameter_name): | |
| rs = self.parameters.room_size.value | |
| dp = self.parameters.damping.value | |
| ss = self.stereospread | |
| # initialize allpass and feedback comb-filters | |
| # (with coefficients optimized for fs=44.1kHz) | |
| self.allpassL1 = pymc.components.allpass.Allpass(556, rs, self.block_size) | |
| self.allpassR1 = pymc.components.allpass.Allpass(556+ss, rs, self.block_size) | |
| self.allpassL2 = pymc.components.allpass.Allpass(441, rs, self.block_size) | |
| self.allpassR2 = pymc.components.allpass.Allpass(441+ss, rs, self.block_size) | |
| self.allpassL3 = pymc.components.allpass.Allpass(341, rs, self.block_size) | |
| self.allpassR3 = pymc.components.allpass.Allpass(341+ss, rs, self.block_size) | |
| self.allpassL4 = pymc.components.allpass.Allpass(225, rs, self.block_size) | |
| self.allpassR4 = pymc.components.allpass.Allpass(255+ss, rs, self.block_size) | |
| self.combL1 = pymc.components.comb.Comb(1116, dp, rs, self.block_size) | |
| self.combR1 = pymc.components.comb.Comb(1116+ss, dp, rs, self.block_size) | |
| self.combL2 = pymc.components.comb.Comb(1188, dp, rs, self.block_size) | |
| self.combR2 = pymc.components.comb.Comb(1188+ss, dp, rs, self.block_size) | |
| self.combL3 = pymc.components.comb.Comb(1277, dp, rs, self.block_size) | |
| self.combR3 = pymc.components.comb.Comb(1277+ss, dp, rs, self.block_size) | |
| self.combL4 = pymc.components.comb.Comb(1356, dp, rs, self.block_size) | |
| self.combR4 = pymc.components.comb.Comb(1356+ss, dp, rs, self.block_size) | |
| self.combL5 = pymc.components.comb.Comb(1422, dp, rs, self.block_size) | |
| self.combR5 = pymc.components.comb.Comb(1422+ss, dp, rs, self.block_size) | |
| self.combL6 = pymc.components.comb.Comb(1491, dp, rs, self.block_size) | |
| self.combR6 = pymc.components.comb.Comb(1491+ss, dp, rs, self.block_size) | |
| self.combL7 = pymc.components.comb.Comb(1557, dp, rs, self.block_size) | |
| self.combR7 = pymc.components.comb.Comb(1557+ss, dp, rs, self.block_size) | |
| self.combL8 = pymc.components.comb.Comb(1617, dp, rs, self.block_size) | |
| self.combR8 = pymc.components.comb.Comb(1617+ss, dp, rs, self.block_size) | |