import gradio as gr
import glob
import io
import os
import subprocess
import sys
import tempfile
import time
import zipfile
from pathlib import Path

# Demo configuration
DEMO_MODE = True

# File size limit removed - no restrictions

# For multi-language audio, use larger models (medium, large-v2, large-v3) for better accuracy
ALLOWED_MODELS = ["tiny.en", "base.en", "small.en", "medium.en", "medium", "large-v2", "large-v3"]

# Password authentication removed


def check_file_size(file_path):
    """Check that the file exists and return a human-readable size string."""
    if not file_path:
        return False, "No file provided"
    try:
        file_size = os.path.getsize(file_path) / (1024 * 1024)
        return True, f"File size: {file_size:.1f}MB"
    except Exception as e:
        return False, f"Error checking file: {str(e)}"


def run_diarization(audio_file, model, language, enable_stemming, suppress_numerals,
                    batch_size, processing_mode, num_speakers):
    """Run diarize1.py on the uploaded audio.

    Returns a 4-tuple: (zip_path, speaker_1_text, speaker_2_text, status_message).
    """
    if not audio_file:
        return None, "", "", "❌ Please upload an audio file."

    size_ok, size_msg = check_file_size(audio_file)
    if not size_ok:
        return None, "", "", f"❌ {size_msg}"

    # Log file size for monitoring (no restriction)
    print(f"Processing file: {size_msg}")

    try:
        # Prepare command
        cmd = [sys.executable, "diarize1.py", "-a", audio_file]
        cmd.extend(["--whisper-model", model])
        cmd.extend(["--device", "cpu"])
        cmd.extend(["--batch-size", str(batch_size)])
        if language and language != "auto":
            cmd.extend(["--language", language])
        if not enable_stemming:
            cmd.append("--no-stem")
        if suppress_numerals:
            cmd.append("--suppress_numerals")

        # Add speaker separation options
        if processing_mode == "Speaker Separation":
            cmd.extend(["--num-speakers", str(num_speakers)])

        # Run the process with a 30-minute timeout, killing it if exceeded
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        try:
            stdout, stderr = process.communicate(timeout=1800)
        except subprocess.TimeoutExpired:
            process.kill()
            return None, "", "", "❌ Processing timed out after 30 minutes."

        if process.returncode == 0:
            # Look for output files next to the input audio
            audio_dir = os.path.dirname(audio_file)
            audio_name = os.path.splitext(os.path.basename(audio_file))[0]
            output_files = []
            transcript_content = ""
            for ext in ('.txt', '.srt'):
                pattern = f"{audio_name}*{ext}"
                output_files.extend(glob.glob(os.path.join(audio_dir, pattern)))

            if not output_files:
                return None, "", "", "❌ No output files generated."

            # Bundle the results into a ZIP and keep the plain-text transcript
            temp_dir = tempfile.mkdtemp()
            zip_path = os.path.join(temp_dir, "results.zip")
            with zipfile.ZipFile(zip_path, 'w') as zip_file:
                for file_path in output_files:
                    zip_file.write(file_path, os.path.basename(file_path))
                    if file_path.endswith('.txt'):
                        with open(file_path, 'r', encoding='utf-8') as f:
                            transcript_content = f.read()

            # Parse transcript for speaker separation
            speaker_1_text, speaker_2_text = parse_speakers(transcript_content)

            return zip_path, speaker_1_text, speaker_2_text, f"✅ Processing complete! Generated {len(output_files)} files."
        else:
            return None, "", "", f"❌ Processing failed: {stderr}"
    except Exception as e:
        return None, "", "", f"❌ Error: {str(e)}"


def parse_speakers(transcript_content):
    """Split transcript content into separate text blocks for Speaker 1 and Speaker 2."""
    if not transcript_content:
        return "", ""

    speaker_1_lines = []
    speaker_2_lines = []

    for line in transcript_content.split('\n'):
        line = line.strip()
        if not line:
            continue

        # Look for speaker labels (common formats)
        if line.startswith(('SPEAKER_00', 'Speaker 0', '[SPEAKER_00]')):
            speaker_1_lines.append(line)
        elif line.startswith(('SPEAKER_01', 'Speaker 1', '[SPEAKER_01]')):
            speaker_2_lines.append(line)
        elif 'speaker' in line.lower():
            # No standard label; guess the speaker from the wording
            if '0' in line or 'one' in line.lower() or 'first' in line.lower():
                speaker_1_lines.append(line)
            elif '1' in line or 'two' in line.lower() or 'second' in line.lower():
                speaker_2_lines.append(line)
            else:
                # Default to speaker 1 if unclear
                speaker_1_lines.append(line)
        else:
            # No speaker indication at all: alternate, assigning the line to
            # whichever speaker currently has fewer lines
            if len(speaker_1_lines) <= len(speaker_2_lines):
                speaker_1_lines.append(line)
            else:
                speaker_2_lines.append(line)

    speaker_1_text = '\n'.join(speaker_1_lines) if speaker_1_lines else "No content detected for Speaker 1"
    speaker_2_text = '\n'.join(speaker_2_lines) if speaker_2_lines else "No content detected for Speaker 2"
    return speaker_1_text, speaker_2_text


def update_speaker_visibility(mode):
    """Show or hide the speaker-count control based on the processing mode."""
    return gr.update(visible=(mode == "Speaker Separation"))
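
# For reference, a typical subprocess invocation assembled by run_diarization
# renders to something like the shell command below. The flag spellings are
# taken from the code above (the script is actually launched via
# sys.executable, and the exact flags depend on the UI options chosen;
# the values shown here are illustrative):
#
#   python diarize1.py -a input.wav --whisper-model medium --device cpu \
#       --batch-size 8 --language en --no-stem --suppress_numerals \
#       --num-speakers 2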
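
# Minimal manual sanity check for parse_speakers (a sketch, not part of the
# app; the PARSE_SPEAKERS_DEMO environment variable is an assumption used
# only to keep this from running during normal startup):
if os.environ.get("PARSE_SPEAKERS_DEMO"):
    _sample = (
        "SPEAKER_00: Hello, thanks for calling.\n"
        "SPEAKER_01: Hi, I have a question about my order."
    )
    _s1, _s2 = parse_speakers(_sample)
    print("Speaker 1:\n" + _s1)
    print("Speaker 2:\n" + _s2)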
# Create interface
with gr.Blocks(title="🎤 Whisper Speaker Diarization Demo") as demo:
    gr.HTML("""
    AI-powered speaker identification and transcription