# Copyright 2023-2024 Xiaomi Corp. (authors: Zengwei Yao,
#                                            Han Zhu,
#                                            Wei Kang)
#
# See ../../LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from abc import ABC, abstractmethod
from functools import reduce
from typing import Dict, List, Optional, Tuple

import jieba
from pypinyin import Style, lazy_pinyin
from pypinyin.contrib.tone_convert import to_finals_tone3, to_initials

from zipvoice.tokenizer.normalizer import ChineseTextNormalizer, EnglishTextNormalizer

try:
    from piper_phonemize import phonemize_espeak
except Exception as ex:
    raise RuntimeError(
        f"{ex}\nPlease run\n"
        "pip install piper_phonemize -f "
        "https://k2-fsa.github.io/icefall/piper_phonemize.html"
    )


class Tokenizer(ABC):
    """Abstract base class for tokenizers, defining the common interface."""

    @abstractmethod
    def texts_to_token_ids(self, texts: List[str]) -> List[List[int]]:
        """Convert a list of texts to a list of token id sequences."""
        raise NotImplementedError

    @abstractmethod
    def texts_to_tokens(self, texts: List[str]) -> List[List[str]]:
        """Convert a list of texts to a list of token sequences."""
        raise NotImplementedError

    @abstractmethod
    def tokens_to_token_ids(self, tokens: List[List[str]]) -> List[List[int]]:
        """Convert a list of token sequences to a list of token id sequences."""
        raise NotImplementedError
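
# Concrete tokenizers below implement texts_to_tokens() (text -> token strings)
# and tokens_to_token_ids() (token strings -> integer ids); texts_to_token_ids()
# is typically just the composition of the two. A hypothetical minimal subclass
# (for illustration only, not part of this module's API) could look like:
#
#   class WhitespaceTokenizer(Tokenizer):
#       def __init__(self, token2id):
#           self.token2id = token2id
#       def texts_to_tokens(self, texts):
#           return [text.split() for text in texts]
#       def tokens_to_token_ids(self, tokens_list):
#           return [[self.token2id[t] for t in tokens] for tokens in tokens_list]
#       def texts_to_token_ids(self, texts):
#           return self.tokens_to_token_ids(self.texts_to_tokens(texts))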


class SimpleTokenizer(Tokenizer):
    """The simplest tokenizer: treats every character as a token,
    without text normalization.
    """

    def __init__(self, token_file: Optional[str] = None):
        """
        Args:
            token_file: the file that contains information that maps tokens
                to ids, which is a text file with '{token}\t{token_id}'
                per line.
        """
        # Parse token file
        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping tokens to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f.readlines():
                info = line.rstrip().split("\t")
                token, id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = id
        self.pad_id = self.token2id["_"]  # padding
        self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        tokens_list = [list(texts[i]) for i in range(len(texts))]
        return tokens_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list
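
# A minimal usage sketch for SimpleTokenizer (the token file path and its
# contents are hypothetical; each line of the file is "{token}\t{token_id}"):
#
#   tokenizer = SimpleTokenizer("tokens.txt")
#   tokenizer.texts_to_tokens(["hi"])      # -> [["h", "i"]]
#   tokenizer.texts_to_token_ids(["hi"])   # characters missing from the token
#                                          # file are skipped with a debug log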


class EspeakTokenizer(Tokenizer):
    """A simple tokenizer using the espeak g2p function."""

    def __init__(self, token_file: Optional[str] = None, lang: str = "en-us"):
        """
        Args:
            token_file: the file that contains information that maps tokens
                to ids, which is a text file with '{token}\t{token_id}'
                per line.
            lang: the language identifier, see
                https://github.com/rhasspy/espeak-ng/blob/master/docs/languages.md
        """
        # Parse token file
        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping tokens to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f.readlines():
                info = line.rstrip().split("\t")
                token, id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = id
        self.pad_id = self.token2id["_"]  # padding
        self.vocab_size = len(self.token2id)
        self.has_tokens = True
        self.lang = lang

    def g2p(self, text: str) -> List[str]:
        try:
            # phonemize_espeak returns one phoneme list per sentence;
            # flatten them into a single token sequence.
            tokens = phonemize_espeak(text, self.lang)
            tokens = reduce(lambda x, y: x + y, tokens)
            return tokens
        except Exception as ex:
            logging.warning(f"Tokenization of {self.lang} texts failed: {ex}")
            return []

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        tokens_list = [self.g2p(texts[i]) for i in range(len(texts))]
        return tokens_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list
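
# A minimal usage sketch for EspeakTokenizer (hypothetical token file; the
# exact phoneme strings depend on the installed espeak-ng voice data):
#
#   tokenizer = EspeakTokenizer("tokens.txt", lang="en-us")
#   tokenizer.texts_to_tokens(["hello world"])   # -> [[...espeak phonemes...]]
#   tokenizer.texts_to_token_ids(["hello world"])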


class EmiliaTokenizer(Tokenizer):
    """Phone tokenizer handling mixed Chinese and English text (Emilia-style data)."""

    def __init__(self, token_file: Optional[str] = None, token_type="phone"):
        """
        Args:
            token_file: the file that contains information that maps tokens
                to ids, which is a text file with '{token}\t{token_id}'
                per line.
        """
        assert (
            token_type == "phone"
        ), f"Only support phone tokenizer for Emilia, but got {token_type}."
        self.english_normalizer = EnglishTextNormalizer()
        self.chinese_normalizer = ChineseTextNormalizer()

        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping tokens to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f.readlines():
                info = line.rstrip().split("\t")
                token, id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = id
        self.pad_id = self.token2id["_"]  # padding
        self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def preprocess_text(
        self,
        text: str,
    ) -> str:
        return self.map_punctuations(text)

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        for i in range(len(texts)):
            # Text normalization
            texts[i] = self.preprocess_text(texts[i])

        phoneme_list = []
        for text in texts:
            # Currently only English and Chinese are handled.
            segments = self.get_segment(text)
            all_phoneme = []
            for seg in segments:
                if seg[1] == "zh":
                    phoneme = self.tokenize_ZH(seg[0])
                elif seg[1] == "en":
                    phoneme = self.tokenize_EN(seg[0])
                elif seg[1] == "pinyin":
                    phoneme = self.tokenize_pinyin(seg[0])
                elif seg[1] == "tag":
                    phoneme = [seg[0]]
                else:
                    logging.warning(
                        "No English or Chinese characters found, "
                        f"skipping segment of unknown language: {seg}"
                    )
                    continue
                all_phoneme += phoneme
            phoneme_list.append(all_phoneme)
        return phoneme_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list

    def tokenize_ZH(self, text: str) -> List[str]:
        try:
            text = self.chinese_normalizer.normalize(text)
            segs = list(jieba.cut(text))
            full = lazy_pinyin(
                segs,
                style=Style.TONE3,
                tone_sandhi=True,
                neutral_tone_with_five=True,
            )
            phones = []
            for x in full:
                # Valid pinyin (in tone3 style) is alphabet + 1 number in [1-5].
                if not (x[0:-1].isalpha() and x[-1] in ("1", "2", "3", "4", "5")):
                    phones.append(x)
                    continue
                else:
                    phones.extend(self.seperate_pinyin(x))
            return phones
        except Exception as ex:
            logging.warning(f"Tokenization of Chinese texts failed: {ex}")
            return []

    def tokenize_EN(self, text: str) -> List[str]:
        try:
            text = self.english_normalizer.normalize(text)
            tokens = phonemize_espeak(text, "en-us")
            tokens = reduce(lambda x, y: x + y, tokens)
            return tokens
        except Exception as ex:
            logging.warning(f"Tokenization of English texts failed: {ex}")
            return []

    def tokenize_pinyin(self, text: str) -> List[str]:
        try:
            assert text.startswith("<") and text.endswith(">")
            text = text.lstrip("<").rstrip(">")
            # Valid pinyin (in tone3 style) is alphabet + 1 number in [1-5].
            if not (text[0:-1].isalpha() and text[-1] in ("1", "2", "3", "4", "5")):
                logging.warning(
                    "Strings enclosed in <> should be pinyin, "
                    f"but got: {text}. Skipping it."
                )
                return []
            else:
                return self.seperate_pinyin(text)
        except Exception as ex:
            logging.warning(f"Tokenizing pinyin failed: {ex}")
            return []

    def seperate_pinyin(self, text: str) -> List[str]:
        """
        Separate a pinyin syllable into its initial and final.
        """
        pinyins = []
        initial = to_initials(text, strict=False)
        # Don't want to share tokens with espeak tokens,
        # so use tone3 style.
        final = to_finals_tone3(
            text,
            strict=False,
            neutral_tone_with_five=True,
        )
        if initial != "":
            # Don't want to share tokens with espeak tokens,
            # so add a '0' after each initial.
            pinyins.append(initial + "0")
        if final != "":
            pinyins.append(final)
        return pinyins
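
    # For example (assuming pypinyin's tone3 conventions), "ni3" would be
    # split into ["n0", "i3"] and "le5" into ["l0", "e5"]; the appended "0"
    # keeps initials disjoint from the espeak phoneme inventory.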

    def map_punctuations(self, text):
        # Map full-width Chinese punctuation to its ASCII counterpart,
        # and normalize ellipsis variants to a single "…".
        text = text.replace(",", ",")
        text = text.replace("。", ".")
        text = text.replace("!", "!")
        text = text.replace("?", "?")
        text = text.replace(";", ";")
        text = text.replace(":", ":")
        text = text.replace("、", ",")
        text = text.replace("‘", "'")
        text = text.replace("“", '"')
        text = text.replace("”", '"')
        text = text.replace("’", "'")
        text = text.replace("⋯", "…")
        text = text.replace("···", "…")
        text = text.replace("・・・", "…")
        text = text.replace("...", "…")
        return text

    def get_segment(self, text: str) -> List[Tuple[str, str]]:
        """
        Split a text into segments based on language types
        (Chinese, English, pinyin, tags, etc.).

        Args:
            text (str): Input text to be segmented.

        Returns:
            List[Tuple[str, str]]: Segmented text parts with their language types.

        Example:
            Input: 我们是小米人,是吗? Yes I think so!霍...啦啦啦
            Output: [('我们是小米人,是吗? ', 'zh'),
                ('Yes I think so!', 'en'), ('霍...啦啦啦', 'zh')]
        """
        # Stores the final segmented parts and their language types
        segments = []
        # Stores the language type of each character in the input text
        types = []
        temp_seg = ""
        temp_lang = ""

        # Each part is a character, or a special string enclosed in <> or [].
        # <> denotes a pinyin string, [] denotes other special strings.
        _part_pattern = re.compile(r"[<[].*?[>\]]|.")
        text = _part_pattern.findall(text)

        for part in text:
            if self.is_chinese(part) or self.is_pinyin(part):
                types.append("zh")
            elif self.is_alphabet(part):
                types.append("en")
            else:
                types.append("other")

        assert len(types) == len(text)

        for i in range(len(types)):
            # find the first char of the seg
            if i == 0:
                temp_seg += text[i]
                temp_lang = types[i]
            else:
                if temp_lang == "other":
                    temp_seg += text[i]
                    temp_lang = types[i]
                else:
                    if types[i] in [temp_lang, "other"]:
                        temp_seg += text[i]
                    else:
                        segments.append((temp_seg, temp_lang))
                        temp_seg = text[i]
                        temp_lang = types[i]

        segments.append((temp_seg, temp_lang))
        # Handle "pinyin" and "tag" types
        segments = self.split_segments(segments)
        return segments

    def split_segments(
        self, segments: List[Tuple[str, str]]
    ) -> List[Tuple[str, str]]:
        """
        Split segments into smaller parts if special strings enclosed by [] or <>
        are found, where <> denotes pinyin strings and [] denotes other special
        strings.

        Args:
            segments (list): A list of tuples where each tuple contains:
                - temp_seg (str): The text segment to be split.
                - temp_lang (str): The language code associated with the segment.

        Returns:
            list: A list of smaller segments.
        """
        result = []
        for temp_seg, temp_lang in segments:
            parts = re.split(r"([<[].*?[>\]])", temp_seg)
            for part in parts:
                if not part:
                    continue
                if self.is_pinyin(part):
                    result.append((part, "pinyin"))
                elif self.is_tag(part):
                    result.append((part, "tag"))
                else:
                    result.append((part, temp_lang))
        return result

    def is_chinese(self, char: str) -> bool:
        return "\u4e00" <= char <= "\u9fa5"

    def is_alphabet(self, char: str) -> bool:
        return ("\u0041" <= char <= "\u005a") or ("\u0061" <= char <= "\u007a")

    def is_pinyin(self, part: str) -> bool:
        return part.startswith("<") and part.endswith(">")

    def is_tag(self, part: str) -> bool:
        return part.startswith("[") and part.endswith("]")
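
# A rough sketch of how EmiliaTokenizer routes mixed input (hypothetical token
# file path; phoneme outputs shown schematically, not verbatim):
#
#   tokenizer = EmiliaTokenizer("tokens.txt")
#   tokenizer.texts_to_tokens(["我们是小米人 yes <le5> [TAG]"])
#   # Chinese runs go through jieba + pypinyin, English runs through espeak,
#   # <...> is treated as a raw pinyin syllable, and [...] is kept as a single
#   # tag token (it must therefore exist in the token file to get an id).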


class DialogTokenizer(EmiliaTokenizer):
    """EmiliaTokenizer variant for two-speaker dialogs marked with [S1]/[S2] tags."""

    def __init__(self, token_file: Optional[str] = None, token_type="phone"):
        super().__init__(token_file=token_file, token_type=token_type)
        if self.has_tokens:
            self.spk_a_id = self.token2id["[S1]"]
            self.spk_b_id = self.token2id["[S2]"]

    def preprocess_text(
        self,
        text: str,
    ) -> str:
        # Remove whitespace around speaker-turn tags, then map punctuation.
        text = re.sub(r"\s*(\[S[12]\])\s*", r"\1", text)
        text = self.map_punctuations(text)
        return text
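
# A minimal sketch of DialogTokenizer usage (hypothetical path); the speaker
# turn tags "[S1]" and "[S2]" must be present in the token file, and
# preprocess_text strips the whitespace around them before tokenization:
#
#   tokenizer = DialogTokenizer("tokens.txt")
#   tokenizer.texts_to_token_ids(["[S1] 你好。 [S2] Hi there!"])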


class LibriTTSTokenizer(Tokenizer):
    def __init__(self, token_file: Optional[str] = None, token_type="char"):
        """
        Args:
            token_type: the type of tokenizer, e.g., bpe, char, phone.
            token_file: the file that contains information that maps tokens
                to ids, which is a text file with '{token}\t{token_id}' per
                line if token_type is char or phone, otherwise it is a
                bpe model file.
        """
        self.type = token_type
        assert token_type in ["bpe", "char", "phone"]
        try:
            import tacotron_cleaner.cleaners
        except Exception as ex:
            raise RuntimeError(
                f"{ex}\nPlease run\npip install espnet_tts_frontend"
            )
        self.normalize = tacotron_cleaner.cleaners.custom_english_cleaners

        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping tokens to ids will fail."
            )
            return
        if token_type == "bpe":
            import sentencepiece as spm

            self.sp = spm.SentencePieceProcessor()
            self.sp.load(token_file)
            self.pad_id = self.sp.piece_to_id("<pad>")
            self.vocab_size = self.sp.get_piece_size()
        else:
            self.token2id: Dict[str, int] = {}
            with open(token_file, "r", encoding="utf-8") as f:
                for line in f.readlines():
                    info = line.rstrip().split("\t")
                    token, id = info[0], int(info[1])
                    assert token not in self.token2id, token
                    self.token2id[token] = id
            self.pad_id = self.token2id["_"]  # padding
            self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        if self.type == "bpe":
            for i in range(len(texts)):
                texts[i] = self.normalize(texts[i])
            return self.sp.encode(texts)
        else:
            return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        for i in range(len(texts)):
            texts[i] = self.normalize(texts[i])
        if self.type == "char":
            tokens_list = [list(texts[i]) for i in range(len(texts))]
        elif self.type == "phone":
            # Flatten the per-sentence phoneme lists returned by espeak.
            tokens_list = [
                reduce(
                    lambda x, y: x + y,
                    phonemize_espeak(texts[i].lower(), "en-us"),
                )
                for i in range(len(texts))
            ]
        elif self.type == "bpe":
            tokens_list = self.sp.encode(texts, out_type=str)
        return tokens_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        assert self.type != "bpe", "BPE tokenizer does not support this function."
        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list
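
# A minimal usage sketch for LibriTTSTokenizer (hypothetical file paths):
#
#   char_tok = LibriTTSTokenizer("tokens.txt", token_type="char")
#   char_tok.texts_to_token_ids(["Hello world"])
#
#   bpe_tok = LibriTTSTokenizer("bpe.model", token_type="bpe")
#   bpe_tok.texts_to_token_ids(["Hello world"])   # encodes with sentencepiece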


if __name__ == "__main__":
    text = (
        "我们是5年小米人,是吗? Yes I think so! "
        "mr king, 5 years, from 2019 to 2024."
        "霍...啦啦啦超过90%的人<le5>...?!9204"
    )
    tokenizer = EmiliaTokenizer()
    tokens = tokenizer.texts_to_tokens([text])
    print(f"tokens: {'|'.join(tokens[0])}")