import gradio as gr
import glob
import os
import subprocess
import sys
import tempfile
import zipfile
# Demo configuration
DEMO_MODE = True
# File size limit removed - no restrictions
# For multi-language audio, use larger models (medium, large-v2, large-v3) for better accuracy
ALLOWED_MODELS = ["tiny.en", "base.en", "small.en", "medium.en", "medium", "large-v2", "large-v3"]
# Password authentication removed for security
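# The app shells out to an external script, diarize1.py, expected to sit next to
# this file. The flags below mirror the ones assembled in run_diarization(); an
# illustrative invocation (paths and values are placeholders):
#   python diarize1.py -a input.wav --whisper-model base.en --device cpu --batch-size 2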
def check_file_size(file_path):
"""Check if file exists and get size info"""
if not file_path:
return False, "No file provided"
try:
file_size = os.path.getsize(file_path) / (1024*1024)
return True, f"File size: {file_size:.1f}MB"
except Exception as e:
return False, f"Error checking file: {str(e)}"
def run_diarization(audio_file, model, language, enable_stemming, suppress_numerals, batch_size, processing_mode, num_speakers):
"""Main diarization function"""
    if not audio_file:
        return None, "", "", "❌ Please upload an audio file."
    size_ok, size_msg = check_file_size(audio_file)
    if not size_ok:
        return None, "", "", f"❌ {size_msg}"
# Log file size for monitoring (no restriction)
print(f"Processing file: {size_msg}")
try:
# Prepare command
cmd = [sys.executable, "diarize1.py", "-a", audio_file]
cmd.extend(["--whisper-model", model])
cmd.extend(["--device", "cpu"])
cmd.extend(["--batch-size", str(batch_size)])
if language and language != "auto":
cmd.extend(["--language", language])
if not enable_stemming:
cmd.append("--no-stem")
if suppress_numerals:
cmd.append("--suppress_numerals")
# Add speaker separation options
if processing_mode == "Speaker Separation":
cmd.extend(["--num-speakers", str(num_speakers)])
        # Run the external script; kill it if it exceeds the 30-minute cap,
        # otherwise the child process would keep running after the timeout
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        try:
            stdout, stderr = process.communicate(timeout=1800)
        except subprocess.TimeoutExpired:
            process.kill()
            return None, "", "", "❌ Processing timed out after 30 minutes."
if process.returncode == 0:
# Look for output files
audio_dir = os.path.dirname(audio_file)
audio_name = os.path.splitext(os.path.basename(audio_file))[0]
output_files = []
transcript_content = ""
for ext in ['.txt', '.srt']:
pattern = f"{audio_name}*{ext}"
matches = glob.glob(os.path.join(audio_dir, pattern))
output_files.extend(matches)
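            # For /tmp/talk.wav this picks up e.g. talk.txt and talk.srt, assuming
            # diarize1.py writes its outputs next to the input file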
if output_files:
# Create ZIP
temp_dir = tempfile.mkdtemp()
zip_path = os.path.join(temp_dir, "results.zip")
with zipfile.ZipFile(zip_path, 'w') as zip_file:
for file_path in output_files:
zip_file.write(file_path, os.path.basename(file_path))
if file_path.endswith('.txt'):
with open(file_path, 'r', encoding='utf-8') as f:
transcript_content = f.read()
# Parse transcript for speaker separation
speaker_1_text, speaker_2_text = parse_speakers(transcript_content)
                return zip_path, speaker_1_text, speaker_2_text, f"✅ Processing complete! Generated {len(output_files)} files."
else:
return None, "", "", "❌ No output files generated."
else:
return None, "", "", f"❌ Processing failed: {stderr}"
except Exception as e:
return None, "", "", f"❌ Error: {str(e)}"
def parse_speakers(transcript_content):
"""Parse transcript content and separate by speakers"""
if not transcript_content:
return "", ""
lines = transcript_content.split('\n')
speaker_1_lines = []
speaker_2_lines = []
for line in lines:
line = line.strip()
if not line:
continue
# Look for speaker labels (common formats)
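        # e.g. "SPEAKER_00: Hello there." or "[SPEAKER_01] Hi." (line shapes assumed
        # from typical diarization output; adjust if diarize1.py formats them differently)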
if line.startswith('SPEAKER_00') or line.startswith('Speaker 0') or line.startswith('[SPEAKER_00]'):
speaker_1_lines.append(line)
elif line.startswith('SPEAKER_01') or line.startswith('Speaker 1') or line.startswith('[SPEAKER_01]'):
speaker_2_lines.append(line)
else:
            # No explicit label: fall back to crude keyword matching
            # (SPEAKER_00 maps to Speaker 1, SPEAKER_01 to Speaker 2)
            if 'speaker' in line.lower():
                if '0' in line or 'one' in line.lower() or 'first' in line.lower():
                    speaker_1_lines.append(line)
                elif '1' in line or 'two' in line.lower() or 'second' in line.lower():
                    speaker_2_lines.append(line)
                else:
                    # Default to Speaker 1 if unclear
                    speaker_1_lines.append(line)
            else:
                # No speaker indication at all: alternate lines to keep both panels balanced
if len(speaker_1_lines) <= len(speaker_2_lines):
speaker_1_lines.append(line)
else:
speaker_2_lines.append(line)
speaker_1_text = '\n'.join(speaker_1_lines) if speaker_1_lines else "No content detected for Speaker 1"
speaker_2_text = '\n'.join(speaker_2_lines) if speaker_2_lines else "No content detected for Speaker 2"
return speaker_1_text, speaker_2_text
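# Illustrative: parse_speakers("SPEAKER_00: hi\nSPEAKER_01: hello")
#   -> ("SPEAKER_00: hi", "SPEAKER_01: hello")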
def update_speaker_visibility(mode):
"""Show/hide speaker count based on processing mode"""
    return gr.update(visible=(mode == "Speaker Separation"))
# Create interface
with gr.Blocks(title="🎤 Whisper Speaker Diarization Demo") as demo:
gr.HTML("""
<div style="background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; display: flex; justify-content: space-between; align-items: center;">
<div style="text-align: left;">
        <h1>🎤 Whisper Speaker Diarization Demo</h1>
<p>AI-powered speaker identification and transcription</p>
</div>
<div style="text-align: right;">
<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/8/8e/Telecom_Paris_Logo.svg/512px-Telecom_Paris_Logo.svg.png" alt="Telecom Paris" style="height: 80px; width: auto; filter: brightness(0) invert(1);">
</div>
</div>
""")
gr.HTML("""
<div style="background-color: #fff3cd; border: 1px solid #ffeaa7; border-radius: 5px; padding: 15px; margin: 10px 0;">
        <h3>🔬 Processing Information</h3>
        <ul>
            <li>💻 CPU processing only (slower than GPU)</li>
            <li>📦 No file size limits - process audio of any length</li>
            <li>🌍 For multi-language audio, use larger models (medium, large-v2, large-v3)</li>
            <li>⚡ Larger models provide better accuracy but take longer to process</li>
            <li>⚠️ Very large files may take significant time and memory</li>
</ul>
</div>
""")
with gr.Row():
with gr.Column(scale=2):
# Password input removed for security
            audio_input = gr.Audio(label="📁 Upload Audio File (No size limit)", type="filepath", sources=["upload"])
gr.Markdown("*Supported: MP3, WAV, M4A, FLAC, etc.*")
with gr.Row():
model_input = gr.Dropdown(choices=ALLOWED_MODELS, value="base.en", label="🎯 Whisper Model")
language_input = gr.Dropdown(choices=["auto", "en", "es", "fr", "de", "it"], value="auto", label="🌍 Language")
# Processing mode selection
processing_mode = gr.Radio(
choices=["Standard Diarization", "Speaker Separation"],
value="Standard Diarization",
label="🎯 Processing Mode"
)
gr.Markdown("*Standard: Traditional diarization | Separation: Pre-separate speakers*")
num_speakers = gr.Radio(
choices=[2, 3],
value=2,
                label="👥 Number of Speakers",
visible=False,
info="Select how many speakers are in your audio"
)
with gr.Row():
                stemming_input = gr.Checkbox(label="🎵 Audio Enhancement", value=False)
                numerals_input = gr.Checkbox(label="🔢 Suppress Numerals", value=False)
                batch_size_input = gr.Slider(minimum=1, maximum=4, value=2, step=1, label="⚡ Batch Size")
            process_btn = gr.Button("🎙️ Start Demo Transcription", variant="primary", size="lg")
with gr.Column(scale=1):
gr.Markdown("""
            ### 📚 How to Use
1. **Upload audio** (any size)
2. **Choose processing mode**
3. **Configure settings** (optional)
4. **Click process** and wait
5. **Download results**
### 🎯 Processing Modes
- **Standard**: Traditional speaker diarization
- **Speaker Separation**: Pre-separate speakers first
### 🌍 Model Selection
- **tiny.en/base.en/small.en**: Fast, English only
- **medium.en**: Better accuracy, English only
- **medium/large-v2/large-v3**: Best for multi-language audio
### ⚠️ Large File Warning
- Large files will take longer to process
- Monitor system resources during processing
""")
    download_output = gr.File(label="📦 Download Results", visible=False)
# Separate transcript windows for each speaker
with gr.Row(visible=False) as transcript_row:
with gr.Column():
speaker1_output = gr.Textbox(
                label="🗣️ Speaker 1 Transcript",
lines=15,
max_lines=20,
show_copy_button=True,
container=True,
interactive=False
)
with gr.Column():
speaker2_output = gr.Textbox(
                label="🗣️ Speaker 2 Transcript",
lines=15,
max_lines=20,
show_copy_button=True,
container=True,
interactive=False
)
    result_output = gr.Textbox(label="📋 Results", lines=5)
# Wire up mode visibility
processing_mode.change(
fn=update_speaker_visibility,
inputs=[processing_mode],
outputs=[num_speakers]
)
    def process_wrapper(*args):
        """Run diarization and map its results onto the output components."""
        download_file, speaker1_text, speaker2_text, result_text = run_diarization(*args)
        has_transcripts = bool(speaker1_text or speaker2_text)
        return (
            # Set the file value and toggle its visibility in a single update,
            # so download_output appears only once in the outputs list below
            gr.update(value=download_file, visible=download_file is not None),
            speaker1_text or "",
            speaker2_text or "",
            result_text or "",
            gr.update(visible=has_transcripts),
        )

    process_btn.click(
        fn=process_wrapper,
        inputs=[audio_input, model_input, language_input, stemming_input, numerals_input, batch_size_input, processing_mode, num_speakers],
        outputs=[download_output, speaker1_output, speaker2_output, result_output, transcript_row],
    )
if __name__ == "__main__":
demo.launch()
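    # Depending on the Gradio version, demo.queue() before launch() may be needed
    # to serialize long-running CPU jobs (queuing is on by default in newer versions)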