import sys
import os
import glob
import json

import fire
import tqdm
from multiprocessing import Pool

sys.path.insert(0, "../../")
from data.tokenizer import TextTokenizer, tokenize_text
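# Expected layout under {root}/{sub_root} (inferred from the defaults in main()):
#   manifest_for_codec/*.txt  tab-separated manifests; the first field of each
#                             line is an audio path relative to the audio folder
#   audio/                    transcripts stored alongside the audio as .txt files
#   phoneme/                  phonemized output, mirroring the audio tree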
def write_jsonl(data, fn):
    """Writes a list of records to `fn`, one JSON object per line."""
    with open(fn, "w") as file:
        for entry in data:
            file.write(json.dumps(entry, ensure_ascii=False) + "\n")


def read_jsonl(file_path):
    """Reads a JSONL file and returns its records as a list of dicts."""
    cur_data = []
    with open(file_path, "r", encoding="utf-8-sig") as file:
        for line in file:
            cur_data.append(json.loads(line.strip()))
    return cur_data
def phonemize_and_save(text, fn, text_tokenizer):
    """Phonemizes the text, saves the result to `fn`, and returns the set of phonemes."""
    phn = tokenize_text(text_tokenizer, text)
    os.makedirs(os.path.dirname(fn), exist_ok=True)
    with open(fn, "w") as f:
        f.write(" ".join(phn))
    return set(phn)
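# Hypothetical usage: phonemize_and_save("hello world", "/tmp/phoneme/a.txt", tok)
# writes the space-joined phoneme string to /tmp/phoneme/a.txt and returns the
# set of distinct phoneme symbols, which main() accumulates into the vocabulary.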
def process_item(item, root, sub_root, audio_folder, phn_folder, audio_ext, text_ext, phn_ext, text_tokenizer):
    """Worker function to process a single manifest item."""
    # item[0] is the audio path relative to the audio folder; the transcript is
    # expected at the same relative path with the text extension.
    text_path = os.path.join(root, sub_root, audio_folder, item[0].replace(audio_ext, text_ext))
    if not os.path.exists(text_path):
        return {"missing_text": text_path, "success": False, "cur_phn_set": set()}
    with open(text_path, "r") as f:
        text = " ".join(line.strip() for line in f)
    phn_path = os.path.join(root, sub_root, phn_folder, item[0].replace(audio_ext, phn_ext))
    cur_phn_set = phonemize_and_save(text, phn_path, text_tokenizer)
    return {"missing_text": None, "success": True, "cur_phn_set": cur_phn_set}
def process_item_star(args):
    """Unpacks a task tuple for `process_item`, since `imap_unordered` passes a single argument."""
    return process_item(*args)
def main(
    root="/data/scratch/pyp/datasets/emilia",
    sub_root="preprocessed",
    manifest_folder="manifest_for_codec",
    audio_folder="audio",
    phn_folder="phoneme",
    audio_ext=".mp3",
    text_ext=".txt",
    phn_ext=".txt",
    num_workers=8,
    parallel=False,  # set True to use the multiprocessing path below
):
| """Main function to process phoneme generation in parallel.""" | |
| # # Initialize the tokenizer | |
| text_tokenizer = TextTokenizer() | |
| all_fns = glob.glob(f"{root}/{sub_root}/{manifest_folder}/*.txt") | |
| print(f"found {len(all_fns)} manifest files") | |
| print(f"{all_fns[:3]=}") | |
| data = [] | |
| for fn in all_fns: | |
| with open(fn, "r") as f: | |
| data += [line.strip().split("\t") for line in f] | |
| vocab = set() | |
    if parallel:
        ################## parallel processing ##################
        # NOTE: each task tuple includes text_tokenizer, so TextTokenizer must
        # be picklable for this path to work across worker processes.
        tasks = [
            (
                item,
                root,
                sub_root,
                audio_folder,
                phn_folder,
                audio_ext,
                text_ext,
                phn_ext,
                text_tokenizer,
            )
            for item in data
        ]
        # Parallel processing with progress monitoring
        results = []
        with Pool(num_workers) as pool:
            for result in tqdm.tqdm(
                pool.imap_unordered(process_item_star, tasks),
                total=len(tasks),
                desc="Processing items",
            ):
                results.append(result)
        missing_text = [result["missing_text"] for result in results if not result["success"]]
        for result in results:
            if result["success"]:
                vocab.update(result["cur_phn_set"])
    else:
        ################## sequential processing ##################
        missing_text = []
        for item in tqdm.tqdm(data, desc="Processing items"):
            text_path = os.path.join(root, sub_root, audio_folder, item[0].replace(audio_ext, text_ext))
            if not os.path.exists(text_path):
                missing_text.append(text_path)
                continue
            try:
                with open(text_path, "r") as f:
                    text = " ".join(line.strip() for line in f)
            except (OSError, UnicodeDecodeError) as e:
                print(f"Error reading {text_path}: {e}")
                continue
            phn_path = os.path.join(root, sub_root, phn_folder, item[0].replace(audio_ext, phn_ext))
            cur_phn_set = phonemize_and_save(text, phn_path, text_tokenizer)
            vocab.update(cur_phn_set)
    # Save the sorted phoneme vocabulary, one symbol per line
    vocab = sorted(vocab)
    with open(os.path.join(root, sub_root, "vocab.txt"), "w") as f:
        f.write("\n".join(vocab))
    # Report missing transcript files
    print(f"Missing text files: {len(missing_text)}")
    if missing_text:
        print("Some missing files:", missing_text[:10])  # print the first 10 as an example
if __name__ == "__main__":
    fire.Fire(main)
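# Example invocations (python-fire exposes main()'s keyword arguments as CLI
# flags; the script name and paths here are illustrative, not from the source):
#   python phonemize_emilia.py --root=/data/scratch/pyp/datasets/emilia
#   python phonemize_emilia.py --parallel=True --num_workers=16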