import gradio as gr
import glob
import os
import subprocess
import sys
import tempfile
import zipfile

# Demo configuration
DEMO_MODE = True

# File size limit removed - no restrictions
# For multi-language audio, use larger models (medium, large-v2, large-v3) for better accuracy
ALLOWED_MODELS = ["tiny.en", "base.en", "small.en", "medium.en", "medium", "large-v2", "large-v3"]

# Password authentication removed for security


def check_file_size(file_path):
    """Check that the file exists and report its size."""
    if not file_path:
        return False, "No file provided"
    try:
        file_size = os.path.getsize(file_path) / (1024 * 1024)
        return True, f"File size: {file_size:.1f}MB"
    except Exception as e:
        return False, f"Error checking file: {str(e)}"


def run_diarization(audio_file, model, language, enable_stemming, suppress_numerals,
                    batch_size, processing_mode, num_speakers):
    """Run diarize1.py on the uploaded file and collect its output files.

    Returns (zip_path, speaker_1_text, speaker_2_text, status_message).
    """
    if not audio_file:
        return None, "", "", "❌ Please upload an audio file."

    size_ok, size_msg = check_file_size(audio_file)
    if not size_ok:
        return None, "", "", f"❌ {size_msg}"

    # Log file size for monitoring (no restriction)
    print(f"Processing file: {size_msg}")

    try:
        # Prepare command
        cmd = [sys.executable, "diarize1.py", "-a", audio_file]
        cmd.extend(["--whisper-model", model])
        cmd.extend(["--device", "cpu"])
        cmd.extend(["--batch-size", str(batch_size)])
        if language and language != "auto":
            cmd.extend(["--language", language])
        if not enable_stemming:
            cmd.append("--no-stem")
        if suppress_numerals:
            cmd.append("--suppress_numerals")

        # Add speaker separation options
        if processing_mode == "Speaker Separation":
            cmd.extend(["--num-speakers", str(num_speakers)])

        # Run the process with a 30-minute timeout; kill it if it overruns
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        try:
            stdout, stderr = process.communicate(timeout=1800)
        except subprocess.TimeoutExpired:
            process.kill()
            return None, "", "", "❌ Processing timed out after 30 minutes."

        if process.returncode == 0:
            # Look for output files next to the uploaded audio
            audio_dir = os.path.dirname(audio_file)
            audio_name = os.path.splitext(os.path.basename(audio_file))[0]

            output_files = []
            transcript_content = ""
            for ext in ['.txt', '.srt']:
                pattern = f"{audio_name}*{ext}"
                matches = glob.glob(os.path.join(audio_dir, pattern))
                output_files.extend(matches)

            if output_files:
                # Bundle all output files into a ZIP for download
                temp_dir = tempfile.mkdtemp()
                zip_path = os.path.join(temp_dir, "results.zip")
                with zipfile.ZipFile(zip_path, 'w') as zip_file:
                    for file_path in output_files:
                        zip_file.write(file_path, os.path.basename(file_path))
                        if file_path.endswith('.txt'):
                            with open(file_path, 'r', encoding='utf-8') as f:
                                transcript_content = f.read()

                # Parse transcript for speaker separation
                speaker_1_text, speaker_2_text = parse_speakers(transcript_content)

                return (zip_path, speaker_1_text, speaker_2_text,
                        f"✅ Processing complete! Generated {len(output_files)} files.")
            else:
                return None, "", "", "❌ No output files generated."
        else:
            return None, "", "", f"❌ Processing failed: {stderr}"
    except Exception as e:
        return None, "", "", f"❌ Error: {str(e)}"
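
# For reference, a typical command assembled by run_diarization() looks like
# the following (illustrative only; the flags are exactly the ones added
# above, but diarize1.py's full CLI may accept more):
#
#   python diarize1.py -a meeting.wav --whisper-model base.en \
#       --device cpu --batch-size 2 --language en --suppress_numerals
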
def parse_speakers(transcript_content):
    """Parse transcript content and separate by speakers"""
    if not transcript_content:
        return "", ""

    lines = transcript_content.split('\n')
    speaker_1_lines = []
    speaker_2_lines = []

    for line in lines:
        line = line.strip()
        if not line:
            continue

        # Look for speaker labels (common formats)
        if line.startswith('SPEAKER_00') or line.startswith('Speaker 0') or line.startswith('[SPEAKER_00]'):
            speaker_1_lines.append(line)
        elif line.startswith('SPEAKER_01') or line.startswith('Speaker 1') or line.startswith('[SPEAKER_01]'):
            speaker_2_lines.append(line)
        elif 'speaker' in line.lower():
            # No clear label; try to detect the speaker from the content
            if '0' in line or 'one' in line.lower() or 'first' in line.lower():
                speaker_1_lines.append(line)
            elif '1' in line or 'two' in line.lower() or 'second' in line.lower():
                speaker_2_lines.append(line)
            else:
                # Default to speaker 1 if unclear
                speaker_1_lines.append(line)
        else:
            # No speaker indication at all; alternate to keep the two columns balanced
            if len(speaker_1_lines) <= len(speaker_2_lines):
                speaker_1_lines.append(line)
            else:
                speaker_2_lines.append(line)

    speaker_1_text = '\n'.join(speaker_1_lines) if speaker_1_lines else "No content detected for Speaker 1"
    speaker_2_text = '\n'.join(speaker_2_lines) if speaker_2_lines else "No content detected for Speaker 2"
    return speaker_1_text, speaker_2_text


def update_speaker_visibility(mode):
    """Show/hide speaker count based on processing mode"""
    if mode == "Speaker Separation":
        return gr.update(visible=True)
    else:
        return gr.update(visible=False)
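
# Illustration of parse_speakers() on a small two-speaker transcript (the
# exact label format depends on what diarize1.py writes; SPEAKER_00/_01 is
# an assumption matched by the prefixes handled above):
#
#   >>> parse_speakers("SPEAKER_00: Hi there.\nSPEAKER_01: Hello!")
#   ('SPEAKER_00: Hi there.', 'SPEAKER_01: Hello!')
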

# Create interface
with gr.Blocks(title="🎤 Whisper Speaker Diarization Demo") as demo:
    gr.HTML("""
        <div style="text-align: center;">
            <h1>🎤 Whisper Speaker Diarization Demo</h1>
            <p>AI-powered speaker identification and transcription</p>
            <p>Telecom Paris</p>
        </div>
""") gr.HTML("""

        <div style="text-align: center;">
            <h3>🔬 Processing Information</h3>
        </div>

""") with gr.Row(): with gr.Column(scale=2): # Password input removed for security audio_input = gr.Audio(label="📁 Upload Audio File (No size limit)", type="filepath", sources=["upload"]) gr.Markdown("*Supported: MP3, WAV, M4A, FLAC, etc.*") with gr.Row(): model_input = gr.Dropdown(choices=ALLOWED_MODELS, value="base.en", label="🎯 Whisper Model") language_input = gr.Dropdown(choices=["auto", "en", "es", "fr", "de", "it"], value="auto", label="🌍 Language") # Processing mode selection processing_mode = gr.Radio( choices=["Standard Diarization", "Speaker Separation"], value="Standard Diarization", label="🎯 Processing Mode" ) gr.Markdown("*Standard: Traditional diarization | Separation: Pre-separate speakers*") num_speakers = gr.Radio( choices=[2, 3], value=2, label="👥 Number of Speakers", visible=False, info="Select how many speakers are in your audio" ) with gr.Row(): stemming_input = gr.Checkbox(label="🎵 Audio Enhancement", value=False) numerals_input = gr.Checkbox(label="🔢 Suppress Numerals", value=False) batch_size_input = gr.Slider(minimum=1, maximum=4, value=2, step=1, label="⚡ Batch Size") process_btn = gr.Button("🎙️ Start Demo Transcription", variant="primary", size="lg") with gr.Column(scale=1): gr.Markdown(""" ### 📚 How to Use 1. **Upload audio** (any size) 2. **Choose processing mode** 3. **Configure settings** (optional) 4. **Click process** and wait 5. **Download results** ### 🎯 Processing Modes - **Standard**: Traditional speaker diarization - **Speaker Separation**: Pre-separate speakers first ### 🌍 Model Selection - **tiny.en/base.en/small.en**: Fast, English only - **medium.en**: Better accuracy, English only - **medium/large-v2/large-v3**: Best for multi-language audio ### ⚠️ Large File Warning - Large files will take longer to process - Monitor system resources during processing """) download_output = gr.File(label="📦 Download Results", visible=False) # Separate transcript windows for each speaker with gr.Row(visible=False) as transcript_row: with gr.Column(): speaker1_output = gr.Textbox( label="🗣️ Speaker 1 Transcript", lines=15, max_lines=20, show_copy_button=True, container=True, interactive=False ) with gr.Column(): speaker2_output = gr.Textbox( label="🗣️ Speaker 2 Transcript", lines=15, max_lines=20, show_copy_button=True, container=True, interactive=False ) result_output = gr.Textbox(label="📋 Results", lines=5) # Wire up mode visibility processing_mode.change( fn=update_speaker_visibility, inputs=[processing_mode], outputs=[num_speakers] ) def process_wrapper(*args): download_file, speaker1_text, speaker2_text, result_text = run_diarization(*args) has_transcripts = bool(speaker1_text or speaker2_text) return ( download_file, speaker1_text or "", speaker2_text or "", result_text or "", gr.update(visible=download_file is not None), gr.update(visible=has_transcripts) ) process_btn.click( fn=process_wrapper, inputs=[audio_input, model_input, language_input, stemming_input, numerals_input, batch_size_input, processing_mode, num_speakers], outputs=[download_output, speaker1_output, speaker2_output, result_output, download_output, transcript_row] ) if __name__ == "__main__": demo.launch()