Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| # Copyright 2019 Tomoki Hayashi | |
| # MIT License (https://opensource.org/licenses/MIT) | |
| """Perform preprocessing and raw feature extraction.""" | |
| import argparse | |
| import logging | |
| import os | |
| import librosa | |
| import numpy as np | |
| import soundfile as sf | |
| import yaml | |
| from tqdm import tqdm | |
| from parallel_wavegan.datasets import AudioDataset | |
| from parallel_wavegan.datasets import AudioSCPDataset | |
| from parallel_wavegan.utils import write_hdf5 | |
def logmelfilterbank(
    audio,
    sampling_rate,
    fft_size=1024,
    hop_size=256,
    win_length=None,
    window="hann",
    num_mels=80,
    fmin=None,
    fmax=None,
    eps=1e-10,
    log_base=10.0,
):
    """Compute log-Mel filterbank feature.

    Args:
        audio (ndarray): Audio signal (T,).
        sampling_rate (int): Sampling rate.
        fft_size (int): FFT size.
        hop_size (int): Hop size.
        win_length (int): Window length. If set to None, it will be the same as fft_size.
        window (str): Window function type.
        num_mels (int): Number of mel basis.
        fmin (int): Minimum frequency in mel basis calculation.
            If set to None, 0 is used.
        fmax (int): Maximum frequency in mel basis calculation.
            If set to None, sampling_rate / 2 is used.
        eps (float): Epsilon value to avoid inf in log calculation.
        log_base (float): Log base. If set to None, use np.log.
            Only None, 10.0 and 2.0 are supported.

    Returns:
        ndarray: Log Mel filterbank feature (#frames, num_mels).

    Raises:
        ValueError: If log_base is not one of None, 10.0 or 2.0.

    """
    # Resolve the log function up front so that an unsupported base is
    # rejected before any (comparatively expensive) spectral analysis runs.
    if log_base is None:
        log_fn = np.log
    elif log_base == 10.0:
        log_fn = np.log10
    elif log_base == 2.0:
        log_fn = np.log2
    else:
        raise ValueError(f"{log_base} is not supported.")

    # get amplitude spectrogram
    x_stft = librosa.stft(
        audio,
        n_fft=fft_size,
        hop_length=hop_size,
        win_length=win_length,
        window=window,
        pad_mode="reflect",
    )
    spc = np.abs(x_stft).T  # (#frames, #bins)

    # get mel basis
    fmin = 0 if fmin is None else fmin
    fmax = sampling_rate / 2 if fmax is None else fmax
    # NOTE: pass everything by keyword. Recent librosa releases accept these
    # parameters as keyword-only; keywords also work on older releases.
    mel_basis = librosa.filters.mel(
        sr=sampling_rate,
        n_fft=fft_size,
        n_mels=num_mels,
        fmin=fmin,
        fmax=fmax,
    )

    # floor at eps so the logarithm never sees zero
    mel = np.maximum(eps, np.dot(spc, mel_basis.T))
    return log_fn(mel)
def main():
    """Run preprocessing process.

    Parses the command line, loads the YAML configuration (command-line
    values override same-named config entries), iterates over the audio
    dataset and, for every utterance, dumps the (optionally trimmed and
    gain-scaled) waveform together with its log-Mel feature into
    ``--dumpdir``.

    Config keys read here: ``sampling_rate``, ``trim_silence``,
    ``trim_threshold_in_db``, ``trim_frame_size``, ``trim_hop_size``,
    ``hop_size``, ``fft_size``, ``win_length``, ``window``, ``num_mels``,
    ``fmin``, ``fmax``, ``global_gain_scale``, ``format`` and the optional
    ``sampling_rate_for_feats`` and ``log_base``.

    Raises:
        ValueError: If neither or both of --rootdir / --wav-scp are given,
            or if config["format"] is neither "hdf5" nor "npy".
        AssertionError: If an utterance is multi-channel, exceeds the
            [-1, 1] amplitude range, or has an unexpected sampling rate.

    """
    parser = argparse.ArgumentParser(
        description="Preprocess audio and then extract features (See detail in parallel_wavegan/bin/preprocess.py)."
    )
    parser.add_argument(
        "--wav-scp",
        "--scp",
        default=None,
        type=str,
        help="kaldi-style wav.scp file. you need to specify either scp or rootdir.",
    )
    parser.add_argument(
        "--segments",
        default=None,
        type=str,
        help="kaldi-style segments file. if use, you must to specify both scp and segments.",
    )
    parser.add_argument(
        "--rootdir",
        default=None,
        type=str,
        help="directory including wav files. you need to specify either scp or rootdir.",
    )
    parser.add_argument(
        "--dumpdir",
        type=str,
        required=True,
        help="directory to dump feature files.",
    )
    parser.add_argument(
        "--config",
        type=str,
        required=True,
        help="yaml format configuration file.",
    )
    parser.add_argument(
        "--verbose",
        type=int,
        default=1,
        help="logging level. higher is more logging. (default=1)",
    )
    args = parser.parse_args()

    # set logger
    log_format = "%(asctime)s (%(module)s:%(lineno)d) %(levelname)s: %(message)s"
    if args.verbose > 1:
        logging.basicConfig(level=logging.DEBUG, format=log_format)
    elif args.verbose > 0:
        logging.basicConfig(level=logging.INFO, format=log_format)
    else:
        logging.basicConfig(level=logging.WARN, format=log_format)
        logging.warning("Skip DEBUG/INFO messages")

    # load config
    # NOTE(review): yaml.Loader can construct arbitrary Python objects, so
    # only trusted config files should be passed here. Consider
    # yaml.safe_load if the configs never rely on python-specific tags.
    with open(args.config) as f:
        config = yaml.load(f, Loader=yaml.Loader)
    config.update(vars(args))

    # check arguments
    if (args.wav_scp is not None and args.rootdir is not None) or (
        args.wav_scp is None and args.rootdir is None
    ):
        raise ValueError("Please specify either --rootdir or --wav-scp.")

    # get dataset
    if args.rootdir is not None:
        dataset = AudioDataset(
            args.rootdir,
            "*.wav",
            audio_load_fn=sf.read,
            return_utt_id=True,
        )
    else:
        dataset = AudioSCPDataset(
            args.wav_scp,
            segments=args.segments,
            return_utt_id=True,
            return_sampling_rate=True,
        )

    # make sure the dump directory exists (exist_ok makes a prior check redundant)
    os.makedirs(args.dumpdir, exist_ok=True)

    # process each data
    for utt_id, (audio, fs) in tqdm(dataset):
        # check
        assert len(audio.shape) == 1, f"{utt_id} seems to be multi-channel signal."
        assert (
            np.abs(audio).max() <= 1.0
        ), f"{utt_id} seems to be different from 16 bit PCM."
        assert (
            fs == config["sampling_rate"]
        ), f"{utt_id} seems to have a different sampling rate."

        # trim silence
        if config["trim_silence"]:
            audio, _ = librosa.effects.trim(
                audio,
                top_db=config["trim_threshold_in_db"],
                frame_length=config["trim_frame_size"],
                hop_length=config["trim_hop_size"],
            )

        if "sampling_rate_for_feats" not in config:
            x = audio
            sampling_rate = config["sampling_rate"]
            hop_size = config["hop_size"]
        else:
            # NOTE(kan-bayashi): this procedure enables to train the model with different
            #   sampling rate for feature and audio, e.g., training with mel extracted
            #   using 16 kHz audio and 24 kHz audio as a target waveform
            # The rates are passed by keyword: recent librosa releases accept
            # them as keyword-only, and keywords also work on older releases.
            x = librosa.resample(
                audio,
                orig_sr=fs,
                target_sr=config["sampling_rate_for_feats"],
            )
            sampling_rate = config["sampling_rate_for_feats"]
            assert (
                config["hop_size"] * config["sampling_rate_for_feats"] % fs == 0
            ), "hop_size must be int value. please check sampling_rate_for_feats is correct."
            # hop size rescaled to the feature sampling rate
            hop_size = config["hop_size"] * config["sampling_rate_for_feats"] // fs

        # extract feature
        mel = logmelfilterbank(
            x,
            sampling_rate=sampling_rate,
            hop_size=hop_size,
            fft_size=config["fft_size"],
            win_length=config["win_length"],
            window=config["window"],
            num_mels=config["num_mels"],
            fmin=config["fmin"],
            fmax=config["fmax"],
            # keep compatibility
            log_base=config.get("log_base", 10.0),
        )

        # make sure the audio length and feature length are matched
        audio = np.pad(audio, (0, config["fft_size"]), mode="reflect")
        audio = audio[: len(mel) * config["hop_size"]]
        assert len(mel) * config["hop_size"] == len(audio)

        # apply global gain
        if config["global_gain_scale"] > 0.0:
            audio *= config["global_gain_scale"]
        if np.abs(audio).max() >= 1.0:
            # the utterance is skipped (not dumped) when it would clip
            logging.warning(
                f"{utt_id} causes clipping. "
                f"it is better to re-consider global gain scale."
            )
            continue

        # save
        if config["format"] == "hdf5":
            # waveform and feature go into the same .h5 file under separate keys
            write_hdf5(
                os.path.join(args.dumpdir, f"{utt_id}.h5"),
                "wave",
                audio.astype(np.float32),
            )
            write_hdf5(
                os.path.join(args.dumpdir, f"{utt_id}.h5"),
                "feats",
                mel.astype(np.float32),
            )
        elif config["format"] == "npy":
            np.save(
                os.path.join(args.dumpdir, f"{utt_id}-wave.npy"),
                audio.astype(np.float32),
                allow_pickle=False,
            )
            np.save(
                os.path.join(args.dumpdir, f"{utt_id}-feats.npy"),
                mel.astype(np.float32),
                allow_pickle=False,
            )
        else:
            raise ValueError("support only hdf5 or npy format.")
# Run the preprocessing pipeline only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()