|
|
import gradio as gr |
|
|
import os |
|
|
import tempfile |
|
|
import subprocess |
|
|
import time |
|
|
from pathlib import Path |
|
|
import zipfile |
|
|
import io |
|
|
import sys |
|
|
|
|
|
|
|
|
# Demo-deployment flag. NOTE(review): not referenced anywhere in this file —
# confirm whether another module reads it or it can be removed.
DEMO_MODE = True

# Whisper model names exposed in the UI dropdown. The ".en" variants are
# English-only; "medium"/"large-*" support multiple languages.
ALLOWED_MODELS = ["tiny.en", "base.en", "small.en", "medium.en", "medium", "large-v2", "large-v3"]
|
|
|
|
|
|
|
|
|
|
|
def check_file_size(file_path):
    """Validate that a path was supplied and report its size.

    Returns a (ok, message) pair: ok is True with a human-readable size
    string on success, False with an error description otherwise.
    """
    if not file_path:
        return False, "No file provided"

    try:
        size_mb = os.path.getsize(file_path) / (1024 * 1024)
    except Exception as e:
        return False, f"Error checking file: {str(e)}"

    return True, f"File size: {size_mb:.1f}MB"
|
|
|
|
|
def run_diarization(audio_file, model, language, enable_stemming, suppress_numerals, batch_size, processing_mode, num_speakers):
    """Run the external diarization script on an uploaded audio file.

    Invokes ``diarize1.py`` as a subprocess, zips any ``<name>*.txt`` /
    ``<name>*.srt`` files it produced next to the input audio, and splits the
    transcript per speaker.

    Returns:
        A 4-tuple ``(zip_path_or_None, speaker_1_text, speaker_2_text,
        status_message)``. BUGFIX: the early-exit paths previously returned
        3-tuples, which broke the 4-way unpacking in the UI wrapper.
    """
    if not audio_file:
        return None, "", "", "β Please upload an audio file."

    size_ok, size_msg = check_file_size(audio_file)
    if not size_ok:
        return None, "", "", f"β {size_msg}"

    print(f"Processing file: {size_msg}")

    try:
        # Build the CLI invocation; processing is pinned to CPU.
        cmd = [
            sys.executable, "diarize1.py",
            "-a", audio_file,
            "--whisper-model", model,
            "--device", "cpu",
            "--batch-size", str(batch_size),
        ]

        if language and language != "auto":
            cmd.extend(["--language", language])

        if not enable_stemming:
            cmd.append("--no-stem")

        if suppress_numerals:
            cmd.append("--suppress_numerals")

        if processing_mode == "Speaker Separation":
            cmd.extend(["--num-speakers", str(num_speakers)])

        # subprocess.run kills the child on timeout, unlike the previous
        # Popen/communicate pair which leaked the process when
        # TimeoutExpired propagated into the generic except below.
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800)

        if result.returncode != 0:
            return None, "", "", f"β Processing failed: {result.stderr}"

        audio_dir = os.path.dirname(audio_file)
        audio_name = os.path.splitext(os.path.basename(audio_file))[0]

        # Collect the transcript artifacts written next to the input audio.
        output_files = []
        transcript_content = ""
        for ext in ('.txt', '.srt'):
            output_files.extend(
                str(p) for p in Path(audio_dir).glob(f"{audio_name}*{ext}")
            )

        if not output_files:
            return None, "", "", "β No output files generated."

        # Bundle every artifact into a single downloadable zip; remember the
        # (last) .txt content so it can be split per speaker below.
        temp_dir = tempfile.mkdtemp()
        zip_path = os.path.join(temp_dir, "results.zip")
        with zipfile.ZipFile(zip_path, 'w') as zip_file:
            for file_path in output_files:
                zip_file.write(file_path, os.path.basename(file_path))
                if file_path.endswith('.txt'):
                    with open(file_path, 'r', encoding='utf-8') as f:
                        transcript_content = f.read()

        speaker_1_text, speaker_2_text = parse_speakers(transcript_content)
        return zip_path, speaker_1_text, speaker_2_text, f"β Processing complete! Generated {len(output_files)} files."

    except Exception as e:
        return None, "", "", f"β Error: {str(e)}"
|
|
|
|
|
def parse_speakers(transcript_content):
    """Split a diarized transcript into two per-speaker text blocks.

    Lines with a recognized SPEAKER_00/SPEAKER_01 prefix are routed directly;
    other lines mentioning "speaker" are routed by keyword heuristics, and
    fully unlabeled lines are balanced across the two buckets.
    """
    if not transcript_content:
        return "", ""

    first_bucket = []
    second_bucket = []
    first_prefixes = ('SPEAKER_00', 'Speaker 0', '[SPEAKER_00]')
    second_prefixes = ('SPEAKER_01', 'Speaker 1', '[SPEAKER_01]')

    for raw_line in transcript_content.split('\n'):
        text = raw_line.strip()
        if not text:
            continue

        lowered = text.lower()

        if text.startswith(first_prefixes):
            first_bucket.append(text)
        elif text.startswith(second_prefixes):
            second_bucket.append(text)
        elif 'speaker' in lowered:
            # Heuristic routing when no exact label prefix matched.
            if '0' in text or 'one' in lowered or 'first' in lowered:
                first_bucket.append(text)
            elif '1' in text or 'two' in lowered or 'second' in lowered:
                second_bucket.append(text)
            else:
                # Ambiguous speaker mention: default to the first bucket.
                first_bucket.append(text)
        elif len(first_bucket) <= len(second_bucket):
            # Unlabeled line: assign to whichever bucket is currently shorter.
            first_bucket.append(text)
        else:
            second_bucket.append(text)

    speaker_1_text = '\n'.join(first_bucket) if first_bucket else "No content detected for Speaker 1"
    speaker_2_text = '\n'.join(second_bucket) if second_bucket else "No content detected for Speaker 2"

    return speaker_1_text, speaker_2_text
|
|
|
|
|
def update_speaker_visibility(mode):
    """Show the speaker-count control only in Speaker Separation mode."""
    return gr.update(visible=(mode == "Speaker Separation"))
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI definition: input widgets, the processing callback, and the
# output panes are all wired together inside this Blocks context.
# ---------------------------------------------------------------------------
with gr.Blocks(title="π€ Whisper Speaker Diarization Demo") as demo:

    # Header banner (title + institution logo).
    gr.HTML("""
    <div style="background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; display: flex; justify-content: space-between; align-items: center;">
        <div style="text-align: left;">
            <h1>π€ Whisper Speaker Diarization Demo</h1>
            <p>AI-powered speaker identification and transcription</p>
        </div>
        <div style="text-align: right;">
            <img src="https://upload.wikimedia.org/wikipedia/commons/thumb/8/8e/Telecom_Paris_Logo.svg/512px-Telecom_Paris_Logo.svg.png" alt="Telecom Paris" style="height: 80px; width: auto; filter: brightness(0) invert(1);">
        </div>
    </div>
    """)

    # Static informational panel about processing characteristics.
    gr.HTML("""
    <div style="background-color: #fff3cd; border: 1px solid #ffeaa7; border-radius: 5px; padding: 15px; margin: 10px 0;">
        <h3>π¬ Processing Information</h3>
        <ul>
            <li>π» CPU processing only (slower than GPU)</li>
            <li>π¦ No file size limits - process any audio length</li>
            <li>π For multi-language audio, use larger models (medium, large-v2, large-v3)</li>
            <li>β‘ Larger models provide better accuracy but take longer to process</li>
            <li>β οΈ Very large files may take significant time and memory</li>
        </ul>
    </div>
    """)

    with gr.Row():
        # Left column: all user-configurable inputs.
        with gr.Column(scale=2):
            audio_input = gr.Audio(label="π Upload Audio File (No size limit)", type="filepath", sources=["upload"])
            gr.Markdown("*Supported: MP3, WAV, M4A, FLAC, etc.*")

            with gr.Row():
                # Model choices are restricted to the ALLOWED_MODELS constant.
                model_input = gr.Dropdown(choices=ALLOWED_MODELS, value="base.en", label="π― Whisper Model")
                language_input = gr.Dropdown(choices=["auto", "en", "es", "fr", "de", "it"], value="auto", label="π Language")

            processing_mode = gr.Radio(
                choices=["Standard Diarization", "Speaker Separation"],
                value="Standard Diarization",
                label="π― Processing Mode"
            )
            gr.Markdown("*Standard: Traditional diarization | Separation: Pre-separate speakers*")

            # Hidden by default; update_speaker_visibility reveals it when
            # "Speaker Separation" mode is selected (see .change() below).
            num_speakers = gr.Radio(
                choices=[2, 3],
                value=2,
                label="π₯ Number of Speakers",
                visible=False,
                info="Select how many speakers are in your audio"
            )

            with gr.Row():
                stemming_input = gr.Checkbox(label="π΅ Audio Enhancement", value=False)
                numerals_input = gr.Checkbox(label="π’ Suppress Numerals", value=False)

            batch_size_input = gr.Slider(minimum=1, maximum=4, value=2, step=1, label="β‘ Batch Size")

            process_btn = gr.Button("ποΈ Start Demo Transcription", variant="primary", size="lg")

        # Right column: static usage instructions.
        with gr.Column(scale=1):
            gr.Markdown("""
            ### π How to Use
            1. **Upload audio** (any size)
            2. **Choose processing mode**
            3. **Configure settings** (optional)
            4. **Click process** and wait
            5. **Download results**

            ### π― Processing Modes
            - **Standard**: Traditional speaker diarization
            - **Speaker Separation**: Pre-separate speakers first

            ### π Model Selection
            - **tiny.en/base.en/small.en**: Fast, English only
            - **medium.en**: Better accuracy, English only
            - **medium/large-v2/large-v3**: Best for multi-language audio

            ### β οΈ Large File Warning
            - Large files will take longer to process
            - Monitor system resources during processing
            """)

    # Hidden until a result zip is produced.
    download_output = gr.File(label="π¦ Download Results", visible=False)

    # Per-speaker transcript panes; the whole row stays hidden until a
    # transcript is available.
    with gr.Row(visible=False) as transcript_row:
        with gr.Column():
            speaker1_output = gr.Textbox(
                label="π£οΈ Speaker 1 Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True,
                container=True,
                interactive=False
            )
        with gr.Column():
            speaker2_output = gr.Textbox(
                label="π£οΈ Speaker 2 Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True,
                container=True,
                interactive=False
            )

    result_output = gr.Textbox(label="π Results", lines=5)

    # Toggle the speaker-count radio when the processing mode changes.
    processing_mode.change(
        fn=update_speaker_visibility,
        inputs=[processing_mode],
        outputs=[num_speakers]
    )

    def process_wrapper(*args):
        """Adapt run_diarization's result into the six outputs wired below.

        Expects run_diarization to yield a 4-tuple; also emits visibility
        updates for the download widget and the transcript row.
        """
        download_file, speaker1_text, speaker2_text, result_text = run_diarization(*args)
        has_transcripts = bool(speaker1_text or speaker2_text)
        return (
            download_file,
            speaker1_text or "",
            speaker2_text or "",
            result_text or "",
            gr.update(visible=download_file is not None),
            gr.update(visible=has_transcripts)
        )

    # NOTE(review): download_output appears twice in `outputs` (once receiving
    # the file value, once a visibility update) — confirm the installed Gradio
    # version accepts duplicate output components for one event handler.
    process_btn.click(
        fn=process_wrapper,
        inputs=[audio_input, model_input, language_input, stemming_input, numerals_input, batch_size_input, processing_mode, num_speakers],
        outputs=[download_output, speaker1_output, speaker2_output, result_output, download_output, transcript_row]
    )
|
|
|
|
|
# Launch the Gradio server only when executed as a script (not on import).
if __name__ == "__main__":
    demo.launch()