File size: 27,928 Bytes
acbd624 674ae41 ad8a59c ba69f38 acbd624 666c2bc ad8a59c ba69f38 ad8a59c ba69f38 ad8a59c ba69f38 ad8a59c ba69f38 ad8a59c ba69f38 ad8a59c 59ce98d acbd624 ba69f38 666c2bc acbd624 666c2bc 59ce98d 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 acbd624 674ae41 666c2bc acbd624 59ce98d 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 ad8a59c 674ae41 49d1318 674ae41 59ce98d acbd624 674ae41 acbd624 666c2bc 59ce98d acbd624 ba69f38 acbd624 674ae41 ba69f38 666c2bc acbd624 674ae41 acbd624 674ae41 ba69f38 a37cabf 674ae41 ba69f38 a37cabf 674ae41 acbd624 a37cabf acbd624 a37cabf acbd624 674ae41 ba69f38 acbd624 a37cabf acbd624 ba69f38 674ae41 acbd624 674ae41 a37cabf 674ae41 ba69f38 acbd624 a37cabf acbd624 a37cabf acbd624 674ae41 acbd624 ad8a59c 666c2bc acbd624 666c2bc acbd624 666c2bc ad8a59c acbd624 666c2bc ad8a59c acbd624 666c2bc acbd624 666c2bc ad8a59c acbd624 674ae41 acbd624 674ae41 acbd624 666c2bc acbd624 674ae41 acbd624 ad8a59c 666c2bc ba69f38 acbd624 666c2bc acbd624 24e5bd0 acbd624 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 |
import gradio as gr
import os
import tempfile
import soundfile as sf
import numpy as np
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc
from huggingface_hub import hf_hub_download
import json
import onnxruntime as ort
import warnings
# Process-wide runtime configuration. These statements execute after the
# imports above have already run.
# Suppress warnings: installs a blanket "ignore" filter for every warning category.
warnings.filterwarnings("ignore")
# Fix for OpenMP duplicate library error
# NOTE(review): this is set after numpy/onnxruntime were imported - confirm it
# still takes effect in the target environment.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
# Force CPU usage for ONNX Runtime to avoid GPU issues
# NOTE(review): presumably '-1' hides all CUDA devices from libraries that
# honor CUDA_VISIBLE_DEVICES - verify.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
class DirectKittenTTS:
    """Direct implementation of KittenTTS using ONNX Runtime.

    Wraps an ONNX inference session and a voices archive, and exposes
    ``generate`` to turn text into an audio sample array. Text is first run
    through a best-effort phonemizer chain and then encoded to a fixed-length
    integer sequence.
    """

    # Every text input is truncated / zero-padded to this many tokens.
    MAX_INPUT_LENGTH = 512
    # Sample rate used only for the fallback tone returned on failure.
    FALLBACK_SAMPLE_RATE = 24000

    def __init__(self, model_path, voices_path):
        """Initialize with direct paths to model and voices files.

        Args:
            model_path: Path of the ONNX model file.
            voices_path: Path of the archive loaded with ``np.load``; its keys
                are used as the voice names.
        """
        self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.voices_data = np.load(voices_path)
        self.voice_list = list(self.voices_data.keys())
        # Lazily created helpers, cached so they are not rebuilt on every call.
        self._g2p = None
        self._tokenizer = None
        print(f"Loaded model with voices: {self.voice_list}")

    @staticmethod
    def _basic_clean(text):
        """Lower-case *text* and drop everything except word chars, whitespace and basic punctuation."""
        return re.sub(r'[^\w\s\.\,\!\?\;\:\-\'\"]', '', text.lower())

    def text_to_phonemes(self, text):
        """Convert text to phonemes with multiple fallback strategies.

        Tries ``g2p_en`` first, then ``phonemizer`` (espeak backend), and
        finally falls back to basic text cleaning.

        Args:
            text: Input text.

        Returns:
            A string: space-joined g2p_en output, the phonemizer output, or
            the cleaned lower-cased text when neither library is usable.
        """
        try:
            # Try to use g2p_en for English phonemization
            try:
                from g2p_en import G2p
                g2p = getattr(self, '_g2p', None)
                if g2p is None:
                    # Build once and reuse on later calls.
                    g2p = self._g2p = G2p()
                # Convert to string of phonemes separated by spaces
                return ' '.join(g2p(text))
            except ImportError:
                print("g2p_en not available, trying phonemizer")
            # Try to use phonemizer with espeak backend
            try:
                from phonemizer import phonemize
                return phonemize(text, backend='espeak', language='en-us')
            except ImportError:
                print("phonemizer not available, using basic cleaning")
            except Exception as e:
                print(f"phonemizer failed: {e}")
            # Fallback to basic cleaning
            return self._basic_clean(text)
        except Exception as e:
            print(f"Error in phoneme conversion: {e}")
            # Last resort: return cleaned text
            return self._basic_clean(text)

    def _get_tokenizer(self):
        """Return the cached ``bert-base-uncased`` tokenizer, or ``None`` if it cannot be loaded.

        The lookup (including a failed one) is remembered so that the import
        and model fetch are attempted at most once per instance.
        """
        cached = getattr(self, '_tokenizer', None)
        if cached is None:
            try:
                from transformers import AutoTokenizer
                cached = AutoTokenizer.from_pretrained("bert-base-uncased")
            except Exception as e:
                print(f"Tokenizer unavailable, using character-level encoding: {e}")
                cached = False  # sentinel: tried and failed
            self._tokenizer = cached
        return None if cached is False else cached

    def _encode(self, phonemes):
        """Encode *phonemes* as a list of exactly ``MAX_INPUT_LENGTH`` integers (zero-padded)."""
        max_length = self.MAX_INPUT_LENGTH
        ids = None
        tokenizer = self._get_tokenizer()
        if tokenizer is not None:
            try:
                ids = list(tokenizer.encode(phonemes, truncation=True, max_length=max_length))
            except Exception as e:
                print(f"Tokenizer failed, using character-level encoding: {e}")
        if ids is None:
            # Fallback to character-level encoding
            ids = [ord(c) for c in phonemes[:max_length]]
        return ids + [0] * (max_length - len(ids))

    def generate(self, text, voice='expr-voice-2-m', speed=1.0):
        """Generate audio from text.

        Args:
            text: Text to synthesize.
            voice: Key into the voices archive; an unknown key falls back to
                the first available voice.
            speed: Speed factor. It is passed to the model when the model
                exposes a ``speed`` input; otherwise the output is resampled
                by linear interpolation to ``len(audio) / speed`` samples.

        Returns:
            A numpy array of audio samples. On any failure a 1-second 440 Hz
            sine wave (amplitude 0.3, 24000 samples) is returned instead of
            raising.
        """
        try:
            # Get voice embedding
            if voice not in self.voices_data:
                print(f"Voice {voice} not found, using first available voice")
                voice = self.voice_list[0]
            voice_embedding = self.voices_data[voice]

            # Convert text to phonemes and encode them for the model
            phonemes = self.text_to_phonemes(text)
            text_input = np.array([self._encode(phonemes)], dtype=np.int64)

            # Map the model's declared inputs to our tensors by name.
            inputs = {}
            speed_fed_to_model = False
            for name in (inp.name for inp in self.session.get_inputs()):
                lowered = name.lower()
                if 'text' in lowered or 'input' in lowered:
                    inputs[name] = text_input
                # 'style': assumed to be the speaker-embedding input name used
                # by some KittenTTS exports - TODO confirm against the model.
                elif any(key in lowered for key in ('voice', 'speaker', 'style')):
                    inputs[name] = voice_embedding.reshape(1, -1)
                elif 'speed' in lowered:
                    inputs[name] = np.array([[speed]], dtype=np.float32)
                    speed_fed_to_model = True

            # Reset model state between generations (best effort; a failure
            # here must never prevent the real inference below).
            try:
                dummy_inputs = {name: np.zeros_like(value) for name, value in inputs.items()}
                self.session.run(None, dummy_inputs)
            except Exception:
                pass

            # Run inference
            outputs = self.session.run(None, inputs)

            # Get audio output (usually the first output) and drop singleton
            # axes; atleast_1d keeps a single-sample result indexable.
            audio = np.asarray(outputs[0])
            if audio.ndim != 1:
                audio = np.atleast_1d(audio.squeeze())

            # Apply speed adjustment only if the model did not receive it,
            # otherwise the factor would be applied twice.
            if speed != 1.0 and not speed_fed_to_model:
                # Simple speed adjustment by resampling
                original_length = len(audio)
                new_length = int(original_length / speed)
                indices = np.linspace(0, original_length - 1, new_length)
                audio = np.interp(indices, np.arange(original_length), audio)
            return audio
        except Exception as e:
            print(f"Error in generate: {e}")
            # Return a simple sine wave as fallback
            duration = 1.0
            sample_rate = self.FALLBACK_SAMPLE_RATE
            t = np.linspace(0, duration, int(sample_rate * duration))
            return np.sin(2 * np.pi * 440 * t) * 0.3
class KittenTTSGradio:
    """Gradio-facing wrapper that lazily loads a KittenTTS model and converts text to a WAV file.

    Text is split into sentences, grouped into chunks, synthesized chunk by
    chunk (optionally in a thread pool), concatenated and written to a
    temporary ``.wav`` file.
    """

    # Sample rate used for the output file and for generated silence.
    SAMPLE_RATE = 24000
    # Length of the silence substituted for a chunk that failed to synthesize.
    SILENCE_SECONDS = 0.5

    def __init__(self):
        """Initialize the KittenTTS model and settings (the model itself is loaded on first use)."""
        self.model = None
        self.available_voices = [
            'expr-voice-2-m', 'expr-voice-2-f', 'expr-voice-3-m', 'expr-voice-3-f',
            'expr-voice-4-m', 'expr-voice-4-f', 'expr-voice-5-m', 'expr-voice-5-f'
        ]
        # Limit workers to avoid conflicts
        cpu_count = os.cpu_count()
        self.max_workers = min(4, max(1, cpu_count - 1)) if cpu_count else 2
        self.model_loaded = False

    def ensure_model_loaded(self):
        """Ensure model is loaded before use; propagates whatever ``load_model`` raises."""
        if not self.model_loaded:
            self.load_model()

    def download_and_load_model(self, repo_id):
        """Download model files from *repo_id* and load them directly.

        Reads ``config.json`` from the repo to find the model and voices file
        names, falling back to names guessed from the repo id.

        Returns:
            True when the model was created, False on any failure (the error
            is printed, not raised).
        """
        try:
            print(f"Downloading model files from {repo_id}...")
            # Download config file
            config_path = hf_hub_download(repo_id=repo_id, filename="config.json")
            # Read config to get file names
            with open(config_path, 'r') as f:
                config = json.load(f)
            # Get model filename from config or use defaults
            model_filename = config.get("model_file")
            if not model_filename:
                # Try to guess based on repo name
                if "mini" in repo_id:
                    model_filename = "kitten_tts_mini_v0_1.onnx"
                elif "nano" in repo_id and "0.2" in repo_id:
                    model_filename = "kitten_tts_nano_v0_2.onnx"
                else:
                    model_filename = "kitten_tts_nano_v0_1.onnx"
            # Download model file
            print(f"Downloading model file: {model_filename}")
            model_path = hf_hub_download(repo_id=repo_id, filename=model_filename)
            # Download voices file
            voices_filename = config.get("voices", "voices.npz")
            print(f"Downloading voices file: {voices_filename}")
            voices_path = hf_hub_download(repo_id=repo_id, filename=voices_filename)
            print(f"Files downloaded: {model_path}, {voices_path}")
            # Create our direct ONNX model
            self.model = DirectKittenTTS(model_path, voices_path)
            # Update available voices based on what's actually in the file
            if hasattr(self.model, 'voice_list'):
                self.available_voices = self.model.voice_list
            return True
        except Exception as e:
            print(f"Failed to download and load {repo_id}: {e}")
            return False

    def load_model(self):
        """Load the TTS model with multiple fallback options.

        Tries the ``kittentts`` library first (when importable), then the
        direct ONNX loader for each known repo.

        Raises:
            Exception: when no strategy succeeds; ``model_loaded`` stays False.
        """
        if self.model_loaded:
            return
        try:
            print("Loading KittenTTS model...")
            # First, try to import and use KittenTTS if available
            try:
                from kittentts import KittenTTS
            except ImportError:
                print("KittenTTS library not available, using direct ONNX loading")
            else:
                # Try loading with the library first
                for repo_id in ["KittenML/kitten-tts-mini-0.1", "KittenML/kitten-tts-nano-0.2"]:
                    try:
                        print(f"Trying to load {repo_id} with KittenTTS library...")
                        self.model = KittenTTS(repo_id)
                    except Exception as e:
                        print(f"KittenTTS library could not load {repo_id}: {e}")
                        continue
                    self.model_loaded = True
                    print(f"Successfully loaded {repo_id} with KittenTTS!")
                    return
            # If library loading failed, use our direct implementation
            strategies = [
                ("KittenML/kitten-tts-mini-0.1", "mini"),
                ("KittenML/kitten-tts-nano-0.2", "nano v0.2"),
                ("KittenML/kitten-tts-nano-0.1", "nano v0.1"),
            ]
            for repo_id, name in strategies:
                print(f"Trying to load {name} model directly...")
                if self.download_and_load_model(repo_id):
                    self.model_loaded = True
                    print(f"Successfully loaded {name} model!")
                    return
            # If all strategies failed
            raise RuntimeError("Failed to load any KittenTTS model")
        except Exception as e:
            print(f"Error loading model: {e}")
            self.model_loaded = False
            raise

    def split_into_sentences(self, text):
        """Split text into sentences.

        Whitespace is collapsed, the text is split after ``.``, ``!`` or ``?``
        followed by whitespace, and a trailing ``.`` is appended to any
        sentence lacking terminal punctuation.
        """
        text = re.sub(r'\s+', ' ', text).strip()
        processed_sentences = []
        for sentence in re.split(r'(?<=[.!?])\s+', text):
            sentence = sentence.strip()
            if not sentence:
                continue
            if not sentence.endswith(('.', '!', '?')):
                sentence += '.'
            processed_sentences.append(sentence)
        return processed_sentences

    def group_sentences_into_chunks(self, sentences, chunk_size):
        """Group sentences into space-joined chunks of *chunk_size* sentences.

        *chunk_size* is coerced to an int of at least 1, so a float coming
        from a UI widget does not break ``range``.
        """
        chunk_size = max(1, int(chunk_size))
        return [
            ' '.join(sentences[start:start + chunk_size])
            for start in range(0, len(sentences), chunk_size)
        ]

    def clean_text_for_model(self, text):
        """Clean text for the TTS model.

        Removes characters other than word chars, whitespace and basic
        punctuation, collapses whitespace, and substitutes ``"Hello."`` for
        empty or very short (< 5 chars) results.
        """
        if not text:
            return "Hello."
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\'\"]', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        if len(text) < 5:
            text = "Hello."
        return text

    @staticmethod
    def _basic_fallback_text(text):
        """Return the first five words of *text* as a sentence, or ``"Hello."`` when there are none."""
        basic_text = ' '.join(text.split()[:5])
        if not basic_text:
            return "Hello."
        if not basic_text.endswith(('.', '!', '?')):
            basic_text += '.'
        return basic_text

    def safe_generate_audio(self, text, voice, speed):
        """Generate audio with fallback strategies.

        Tries the text as given, then a cleaned version, then only its first
        five words.

        Raises:
            Exception: if the model is not loaded or every attempt fails.
        """
        self.ensure_model_loaded()
        if self.model is None:
            raise RuntimeError("Model not loaded")
        # Each candidate text is built lazily inside the try so that a failure
        # while preparing it is treated like a failed attempt.
        attempts = (
            ("Original", lambda: text),
            ("Cleaned", lambda: self.clean_text_for_model(text)),
            ("Basic", lambda: self._basic_fallback_text(text)),
        )
        for label, make_text in attempts:
            try:
                return self.model.generate(make_text(), voice=voice, speed=speed)
            except Exception as e:
                print(f"{label} attempt failed: {e}")
        raise RuntimeError("All audio generation attempts failed")

    def _silence(self):
        """Return ``SILENCE_SECONDS`` of zero samples, used in place of a failed chunk."""
        return np.zeros(int(self.SAMPLE_RATE * self.SILENCE_SECONDS))

    def _reset_model_state(self):
        """Best-effort dummy inference on the model's ONNX session, if it has one.

        Feeds zero tensors to every declared input; any error is ignored on
        purpose so that this step can never fail a conversion.
        """
        session = getattr(self.model, 'session', None)
        if session is None:
            return
        try:
            dummy_inputs = {}
            for inp in session.get_inputs():
                lowered = inp.name.lower()
                if 'text' in lowered or 'input' in lowered:
                    dummy_inputs[inp.name] = np.zeros((1, 512), dtype=np.int64)
                else:
                    dummy_inputs[inp.name] = np.zeros((1, 256), dtype=np.float32)
            session.run(None, dummy_inputs)
        except Exception:
            pass

    def process_single_sentence(self, sentence, voice, speed):
        """Synthesize one sentence/chunk, returning short silence instead of raising on failure."""
        try:
            # Clean the sentence
            cleaned_sentence = self.clean_text_for_model(sentence)
            # Add a small delay between processing to avoid potential state issues
            time.sleep(0.1)
            # Generate audio
            audio = self.safe_generate_audio(cleaned_sentence, voice=voice, speed=speed)
            # Explicit garbage collection
            gc.collect()
            return audio
        except Exception as e:
            print(f"Error processing sentence: '{sentence[:30]}...': {e}")
            # Return a short silence as fallback
            return self._silence()

    def convert_text_to_speech(self, text, voice, speed, chunk_size, use_multithreading, progress=gr.Progress()):
        """Main conversion function for Gradio.

        Args:
            text: Text to convert.
            voice: Voice name passed to the model.
            speed: Speed factor passed to the model.
            chunk_size: Number of sentences per synthesized chunk.
            use_multithreading: Process chunks in a thread pool when there is
                more than one chunk.
            progress: Gradio progress callback.

        Returns:
            ``(wav_path, status_message)``.

        Raises:
            gr.Error: when the model cannot be loaded, the text is empty, or
                the conversion fails.
        """
        try:
            self.ensure_model_loaded()
        except Exception as e:
            raise gr.Error(f"Failed to load model: {str(e)}")
        if not text or not text.strip():
            raise gr.Error("Please enter some text to convert.")
        try:
            chunk_size = max(1, int(chunk_size))
            sentences = self.split_into_sentences(text)
            if not sentences:
                raise gr.Error("No valid sentences found in the text.")
            chunks = self.group_sentences_into_chunks(sentences, chunk_size)
            total_chunks = len(chunks)
            total_sentences = len(sentences)
            chunk_label = "chunks" if chunk_size == 1 else f"chunks ({chunk_size} sentences each)"
            progress(0, desc=f"Processing {total_sentences} sentences in {total_chunks} {chunk_label}...")
            # Reset model state before starting
            self._reset_model_state()
            # Create a list to hold results in the correct order
            audio_chunks = [None] * total_chunks
            run_parallel = bool(use_multithreading) and total_chunks > 1
            if run_parallel:
                # Process chunks in parallel with limited workers
                with ThreadPoolExecutor(max_workers=min(self.max_workers, 4)) as executor:
                    future_to_index = {
                        executor.submit(self.process_single_sentence, chunk, voice, speed): i
                        for i, chunk in enumerate(chunks)
                    }
                    # Results arrive in completion order; the index map puts
                    # each one back at its original position.
                    for completed, future in enumerate(as_completed(future_to_index), start=1):
                        index = future_to_index[future]
                        try:
                            audio_chunks[index] = future.result()
                        except Exception as e:
                            print(f"Error processing chunk at index {index}: {e}")
                            # Generate silence for failed chunks
                            audio_chunks[index] = self._silence()
                        else:
                            # Reset model state after each chunk
                            self._reset_model_state()
                        progress(completed / total_chunks,
                                 desc=f"Processed {completed}/{total_chunks} {chunk_label}")
            else:
                # Process chunks sequentially
                for i, chunk in enumerate(chunks):
                    try:
                        audio_chunks[i] = self.process_single_sentence(chunk, voice, speed)
                    except Exception as e:
                        print(f"Error processing chunk at index {i}: {e}")
                        # Generate silence for failed chunks
                        audio_chunks[i] = self._silence()
                    else:
                        # Reset model state after each chunk
                        self._reset_model_state()
                    progress((i + 1) / total_chunks,
                             desc=f"Processed {i + 1}/{total_chunks} {chunk_label}")
            # Check if we have any None values (shouldn't happen with the error handling)
            if any(chunk is None for chunk in audio_chunks):
                print("Warning: Some audio chunks were not generated properly")
                audio_chunks = [self._silence() if chunk is None else chunk for chunk in audio_chunks]
            progress(0.9, desc="Concatenating audio...")
            # Flatten each chunk so differently shaped results can be joined.
            # NOTE(review): assumes mono audio - confirm against the model.
            audio_chunks = [np.ravel(chunk) for chunk in audio_chunks]
            final_audio = audio_chunks[0] if len(audio_chunks) == 1 else np.concatenate(audio_chunks)
            # Reserve a path, close the handle, then let soundfile write to it
            # (writing to a still-open temp file fails on some platforms).
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as output_file:
                output_path = output_file.name
            sf.write(output_path, final_audio, self.SAMPLE_RATE)
            progress(1.0, desc="Complete!")
            gc.collect()
            processing_method = "multithreading" if run_parallel else "sequential"
            chunk_description = f"{chunk_size} sentence(s) per chunk" if chunk_size > 1 else "sentence-by-sentence"
            status_message = (
                f"✅ Successfully converted {total_sentences} sentences ({total_chunks} chunks) "
                f"using {processing_method} processing with {chunk_description}!"
            )
            return output_path, status_message
        except gr.Error:
            # Already a user-facing error; do not wrap it a second time.
            raise
        except Exception as e:
            raise gr.Error(f"Conversion failed: {str(e)}") from e
# Initialize the app
# Constructing KittenTTSGradio only sets defaults (model is None, model_loaded
# is False); the model is loaded later through ensure_model_loaded().
print("Initializing KittenTTS app...")
app = KittenTTSGradio()
print("App initialized, model will load on first use")
# Create Gradio interface
def create_interface():
    """Build and return the Gradio Blocks UI for the module-level ``app``.

    The UI has a text box (optionally filled from an uploaded .txt file),
    voice / speed / chunk-size / multithreading controls, a convert button
    wired to ``app.convert_text_to_speech``, and audio + status outputs.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    # Markdown bodies are kept flush-left so the rendered text does not
    # depend on how the component treats leading indentation.
    with gr.Blocks(title="KittenTTS - Text to Speech") as demo:
        gr.Markdown("""
# 🎙️ KittenTTS Text-to-Speech Converter
Convert text to natural-sounding speech using KittenTTS - a lightweight TTS model that runs on CPU.
**Note:** First conversion will download and load the model (~170MB for mini, ~25MB for nano).
If you encounter issues, please try refreshing the page.
""")
        with gr.Row():
            with gr.Column(scale=2):
                text_input = gr.Textbox(
                    label="Text to Convert",
                    placeholder="Enter your text here or upload a file...",
                    lines=10,
                    max_lines=20,
                    value=""
                )
                with gr.Row():
                    file_upload = gr.File(
                        label="Or Upload Text File",
                        file_types=[".txt"],
                        type="filepath"
                    )
                    clear_btn = gr.Button("Clear Text", size="sm")

                def load_file(file_path):
                    """Return the uploaded file's text (first 50000 chars plus a marker if longer)."""
                    if not file_path:
                        return ""
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    except (OSError, UnicodeError) as e:
                        return f"Error loading file: {str(e)}"
                    if len(content) > 50000:
                        return content[:50000] + "\n\n... (truncated for display)"
                    return content

                def clear_text():
                    """Return an empty string to reset the text box."""
                    return ""

                file_upload.change(fn=load_file, inputs=[file_upload], outputs=[text_input])
                clear_btn.click(fn=clear_text, inputs=[], outputs=[text_input])
            with gr.Column(scale=1):
                voice_dropdown = gr.Dropdown(
                    choices=app.available_voices,
                    value=app.available_voices[0],
                    label="Voice Selection",
                    info="Choose the voice for speech synthesis"
                )
                speed_slider = gr.Slider(
                    minimum=0.5,
                    maximum=2.0,
                    value=1.0,
                    step=0.1,
                    label="Speech Speed",
                    info="Adjust the speed of speech (1.0 = normal)"
                )
                chunk_size_slider = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=1,
                    step=1,
                    label="Sentences per Chunk",
                    info="Group sentences together (1 = best quality, higher = faster processing)"
                )
                multithread_checkbox = gr.Checkbox(
                    value=True,
                    label=f"Enable Multithreading ({app.max_workers} workers)",
                    info="Process multiple chunks in parallel"
                )
                convert_btn = gr.Button(
                    "🎤 Convert to Speech",
                    variant="primary",
                    size="lg"
                )
        with gr.Row():
            with gr.Column():
                audio_output = gr.Audio(
                    label="Generated Audio",
                    type="filepath",
                    autoplay=False
                )
                status_output = gr.Markdown(
                    value="Ready to convert text to speech."
                )
        gr.Examples(
            examples=[
                ["Hello! This is a test of the KittenTTS system. It can convert text to natural sounding speech."],
                ["The quick brown fox jumps over the lazy dog. This sentence contains every letter of the alphabet."],
                ["Welcome to our presentation. Today we'll discuss artificial intelligence. Let's begin with the basics."]
            ],
            inputs=text_input,
            label="Example Texts"
        )
        convert_btn.click(
            fn=app.convert_text_to_speech,
            inputs=[text_input, voice_dropdown, speed_slider, chunk_size_slider, multithread_checkbox],
            outputs=[audio_output, status_output]
        )
        gr.Markdown("""
---
### ⚙️ Chunk Size Guide:
- **1 sentence**: Best quality, natural pauses (recommended for short texts)
- **2-3 sentences**: Good balance of speed and quality
- **5+ sentences**: Faster processing for long texts (may sound more continuous)
### 🎭 Available Voices:
- **expr-voice-2-m/f**: Expressive male/female voices
- **expr-voice-3-m/f**: Natural male/female voices
- **expr-voice-4-m/f**: Clear male/female voices
- **expr-voice-5-m/f**: Warm male/female voices
### 📝 Notes:
- For best quality with longer texts, use chunk size 1
- The model uses phoneme conversion for more natural speech
- First use will download the model (may take a moment)
""")
    return demo
# Create and launch the interface
# The Blocks object is built at import time so that `demo` is available as a
# module attribute; the server is only started when run as a script.
print("Creating Gradio interface...")
demo = create_interface()
if __name__ == "__main__":
    print("Launching app...")
    # Queue at most 5 pending requests.
    demo.queue(max_size=5)
    demo.launch(
        share=False,
        show_error=True,
        server_name="0.0.0.0",
        server_port=7860
    )