Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import random | |
| import time | |
| import os | |
| import torch | |
| from pathlib import Path | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import json | |
| import uuid | |
| import edge_tts | |
| import asyncio | |
| import aiofiles | |
| import mimetypes | |
| from typing import List | |
| from PyPDF2 import PdfReader | |
# Checkpoint used for script generation (alternative noted by the author:
# HuggingFaceH4/zephyr-7b-alpha).
MODEL_NAME = "unsloth/gemma-3-1b-pt"

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Half precision on GPU, full precision on CPU.
_model_dtype = torch.float16 if device == "cuda" else torch.float32

# Tokenizer and model are loaded once at import time; the model is switched to
# evaluation mode and moved to the selected device.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=_model_dtype,
)
model = model.eval().to(device)

# Upload size limit, expressed in MB for messages and in bytes for checks.
MAX_FILE_SIZE_MB = 20
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
class PodcastGenerator:
    """Turn a text prompt and/or an uploaded PDF/TXT file into podcast audio.

    Pipeline: build a prompt, generate a JSON script with the module-level
    ``model``/``tokenizer``, synthesize every line with edge-tts, then
    concatenate the clips into a single WAV file.

    The ``api_key`` parameters are accepted for interface compatibility only;
    generation runs on the local model and never uses the key.
    """

    SCRIPT_TIMEOUT_S = 60    # budget for one script generation call
    TTS_TIMEOUT_S = 30       # budget for synthesizing one line
    TOTAL_TIMEOUT_S = 600    # budget for the whole pipeline (10 minutes)
    TTS_BATCH_SIZE = 10      # number of lines synthesized concurrently

    def __init__(self):
        pass

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _file_path(file_obj):
        """Return the filesystem path behind an upload, or None if it has none.

        Accepts either a plain path (str / os.PathLike) or an object exposing
        a ``name`` attribute.
        """
        if isinstance(file_obj, (str, os.PathLike)):
            return os.fspath(file_obj)
        return getattr(file_obj, "name", None)

    def _check_file_size(self, file_obj) -> None:
        """Raise if the upload is larger than MAX_FILE_SIZE_BYTES."""
        file_size = getattr(file_obj, "size", None)
        if file_size is None:
            file_size = os.path.getsize(self._file_path(file_obj))
        if file_size > MAX_FILE_SIZE_BYTES:
            raise Exception(f"File size exceeds the {MAX_FILE_SIZE_MB}MB limit. Please upload a smaller file.")

    @staticmethod
    def _remove_files(paths) -> None:
        """Best-effort removal of temporary files; missing files are ignored."""
        for path in paths:
            try:
                os.remove(path)
            except OSError:
                # Cleanup must never mask the error that triggered it.
                pass

    @staticmethod
    def _pick_voice(speaker, speaker1: str, speaker2: str) -> str:
        """Map a script speaker id to a voice: id 1 -> speaker1, anything else -> speaker2.

        The id comes from model-generated JSON, so ``1``, ``"1"`` and ``1.0``
        are all treated as speaker 1.
        """
        try:
            is_first = int(speaker) == 1
        except (TypeError, ValueError):
            is_first = False
        return speaker1 if is_first else speaker2

    @staticmethod
    def _extract_json(text: str) -> dict:
        """Find and return the first valid podcast script object inside ``text``.

        Model output may surround the JSON with extra prose, so every ``{`` is
        tried as a possible start. A candidate is accepted only if it is an
        object whose ``"podcast"`` value is a non-empty list of objects that
        each carry ``"speaker"`` and ``"line"``.

        Raises:
            ValueError: if no such object is found.
        """
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                entries = candidate.get("podcast")
                if (
                    isinstance(entries, list)
                    and entries
                    and all(
                        isinstance(entry, dict) and "speaker" in entry and "line" in entry
                        for entry in entries
                    )
                ):
                    return candidate
            start = text.find("{", start + 1)
        raise ValueError("the model did not return a valid podcast script in JSON format")

    async def _extract_file_text(self, file_obj) -> str:
        """Return the text content of an uploaded file (PDF pages or raw text)."""
        path = self._file_path(file_obj)
        if path is not None and os.path.splitext(path)[1].lower() == ".pdf":
            self._check_file_size(file_obj)
            reader = PdfReader(path)
            # extract_text() can return None for pages without a text layer.
            return "\n\n".join(page.extract_text() or "" for page in reader.pages)
        raw = await self._read_file_bytes(file_obj)
        if isinstance(raw, bytes):
            return raw.decode(errors="ignore")
        return raw

    # ------------------------------------------------------------------ script

    async def generate_script(self, prompt: str, language: str, api_key: str, file_obj=None, progress=None):
        """Generate the podcast script and return it as a parsed JSON object.

        Args:
            prompt: User text; may be empty when a file is supplied.
            language: Target language name, or "Auto Detect".
            api_key: Unused; kept for interface compatibility.
            file_obj: Optional uploaded file (path or object with ``name``).
            progress: Optional callable ``progress(fraction, message)``.

        Returns:
            A dict with a non-empty ``"podcast"`` list of
            ``{"speaker": ..., "line": ...}`` entries.

        Raises:
            Exception: on oversized files, generation timeout, or when the
                model output contains no valid script.
        """
        example = """
{
    "topic": "AGI",
    "podcast": [
        {
            "speaker": 2,
            "line": "So, AGI, huh? Seems like everyone's talking about it these days."
        },
        {
            "speaker": 1,
            "line": "Yeah, it's definitely having a moment, isn't it?"
        },
        {
            "speaker": 2,
            "line": "It is and for good reason, right? I mean, you've been digging into this stuff, listening to the podcasts and everything. What really stood out to you? What got you hooked?"
        },
        {
            "speaker": 1,
            "line": "It's easy to get lost in the noise, for sure."
        },
        {
            "speaker": 2,
            "line": "Exactly. So how about we try to cut through some of that, shall we?"
        },
        {
            "speaker": 1,
            "line": "Sounds like a plan."
        },
        {
            "speaker": 2,
            "line": "It certainly is and on that note, we'll wrap up this deep dive. Thanks for listening, everyone."
        },
        {
            "speaker": 1,
            "line": "Peace."
        }
    ]
}
"""
        if language == "Auto Detect":
            language_instruction = "- The podcast MUST be in the same language as the user input."
        else:
            language_instruction = f"- The podcast MUST be in {language} language"
        system_prompt = f"""
You are a professional podcast generator. Your task is to generate a professional podcast script based on the user input.
{language_instruction}
- The podcast should have 2 speakers.
- The podcast should be long.
- Do not use names for the speakers.
- The podcast should be interesting, lively, and engaging, and hook the listener from the start.
- The input text might be disorganized or unformatted, originating from sources like PDFs or text files. Ignore any formatting inconsistencies or irrelevant details; your task is to distill the essential points, identify key definitions, and highlight intriguing facts that would be suitable for discussion in a podcast.
- The script must be in JSON format.
Follow this example structure:
{example}
"""
        # Build the user prompt
        if prompt and file_obj:
            user_prompt = f"Please generate a podcast script based on the uploaded file following user input:\n{prompt}"
        elif prompt:
            user_prompt = f"Please generate a podcast script based on the following user input:\n{prompt}"
        else:
            user_prompt = "Please generate a podcast script based on the uploaded file."

        # If a file is provided, extract its text and append it
        if file_obj:
            text = await self._extract_file_text(file_obj)
            user_prompt += f"\n\n―― FILE CONTENT ――\n{text}"

        prompt_text = system_prompt + "\n" + user_prompt

        def hf_generate(full_prompt: str) -> str:
            inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
            outputs = model.generate(
                **inputs,
                max_new_tokens=1024,
                do_sample=True,
                temperature=1.0,
            )
            # For a causal LM, generate() returns prompt + continuation. The
            # prompt itself contains example JSON, so only the newly generated
            # tokens are decoded.
            prompt_length = inputs["input_ids"].shape[-1]
            return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)

        try:
            if progress:
                progress(0.3, "Generating podcast script...")
            # NOTE: on timeout the await is abandoned, but the worker thread
            # cannot be interrupted and keeps running until generate() returns.
            generated_text = await asyncio.wait_for(
                asyncio.to_thread(hf_generate, prompt_text),
                timeout=self.SCRIPT_TIMEOUT_S,
            )
            script = self._extract_json(generated_text)
        except asyncio.TimeoutError as e:
            raise Exception("The script generation request timed out. Please try again later.") from e
        except Exception as e:
            raise Exception(f"Failed to generate podcast script: {e}") from e

        if progress:
            progress(0.4, "Script generated successfully!")
        return script

    async def _read_file_bytes(self, file_obj) -> bytes:
        """Return the content of an upload after enforcing the size limit.

        Objects exposing ``read()`` are read directly; anything else is opened
        by path in binary mode.
        """
        self._check_file_size(file_obj)
        if hasattr(file_obj, 'read'):
            return file_obj.read()
        async with aiofiles.open(self._file_path(file_obj), 'rb') as f:
            return await f.read()

    def _get_mime_type(self, filename: str) -> str:
        """Determine the MIME type from the file extension."""
        ext = os.path.splitext(filename)[1].lower()
        if ext == '.pdf':
            return "application/pdf"
        if ext == '.txt':
            return "text/plain"
        # Fall back to the standard-library detector
        mime_type, _ = mimetypes.guess_type(filename)
        return mime_type or "application/octet-stream"

    # ------------------------------------------------------------------ audio

    async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str:
        """Synthesize one script line and return the path of the temporary clip.

        Raises:
            Exception: if synthesis exceeds the per-line timeout or edge-tts
                fails; the partial clip is removed first.
        """
        voice = self._pick_voice(speaker, speaker1, speaker2)
        speech = edge_tts.Communicate(text, voice)
        # edge-tts writes MP3 data, so the temp file is named accordingly.
        temp_filename = f"temp_{uuid.uuid4()}.mp3"
        try:
            await asyncio.wait_for(speech.save(temp_filename), timeout=self.TTS_TIMEOUT_S)
        except asyncio.TimeoutError as e:
            self._remove_files([temp_filename])
            raise Exception("Text-to-speech generation timed out. Please try with a shorter text.") from e
        except Exception:
            self._remove_files([temp_filename])
            raise
        return temp_filename

    async def combine_audio_files(self, audio_files: List[str], progress=None) -> str:
        """Concatenate the clips in order into one WAV file and return its path.

        The input clips are deleted afterwards, whether or not merging succeeds.
        """
        # The visible import block has no AudioSegment import, so it is
        # imported here to keep this method from failing with NameError.
        from pydub import AudioSegment

        if progress:
            progress(0.9, "Combining audio files...")
        combined_audio = AudioSegment.empty()
        try:
            for audio_file in audio_files:
                combined_audio += AudioSegment.from_file(audio_file)
        finally:
            self._remove_files(audio_files)  # Clean up temporary files
        output_filename = f"output_{uuid.uuid4()}.wav"
        combined_audio.export(output_filename, format="wav")
        if progress:
            progress(1.0, "Podcast generated successfully!")
        return output_filename

    # ------------------------------------------------------------------ pipeline

    async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, api_key: str, file_obj=None, progress=None) -> str:
        """Run the whole pipeline under one overall timeout; return the WAV path.

        Raises:
            Exception: on overall timeout or any failure in a pipeline stage.
        """
        try:
            if progress:
                progress(0.1, "Starting podcast generation...")
            return await asyncio.wait_for(
                self._generate_podcast_internal(input_text, language, speaker1, speaker2, api_key, file_obj, progress),
                timeout=self.TOTAL_TIMEOUT_S,
            )
        except asyncio.TimeoutError as e:
            raise Exception("The podcast generation process timed out. Please try with shorter text or try again later.") from e
        except Exception as e:
            raise Exception(f"Error generating podcast: {str(e)}") from e

    async def _generate_podcast_internal(self, input_text: str, language: str, speaker1: str, speaker2: str, api_key: str, file_obj=None, progress=None) -> str:
        """Generate the script, synthesize it in concurrent batches, merge the clips."""
        if progress:
            progress(0.2, "Generating podcast script...")
        podcast_json = await self.generate_script(input_text, language, api_key, file_obj, progress)
        if progress:
            progress(0.5, "Converting text to speech...")

        script_lines = podcast_json['podcast']
        total_lines = len(script_lines)
        audio_files = []

        # Batches bound how many TTS requests are in flight at once.
        for batch_start in range(0, total_lines, self.TTS_BATCH_SIZE):
            batch = script_lines[batch_start:batch_start + self.TTS_BATCH_SIZE]
            batch_end = batch_start + len(batch)
            batch_results = await asyncio.gather(
                *(self.tts_generate(item['line'], item['speaker'], speaker1, speaker2) for item in batch),
                return_exceptions=True,
            )
            failures = [result for result in batch_results if isinstance(result, BaseException)]
            if failures:
                # Remove earlier clips AND the clips that succeeded in this
                # batch, so no temporary file is left behind.
                produced = [result for result in batch_results if not isinstance(result, BaseException)]
                self._remove_files(audio_files + produced)
                raise Exception(
                    f"Error in batch TTS generation: Error generating speech: {str(failures[0])}"
                ) from failures[0]
            # gather() preserves task order, so clips stay in script order.
            audio_files.extend(batch_results)
            if progress:
                current_progress = 0.5 + (0.4 * (batch_end / total_lines))
                progress(current_progress, f"Processed {batch_end}/{total_lines} speech segments...")

        combined_audio = await self.combine_audio_files(audio_files, progress)
        return combined_audio
async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, api_key: str = "", progress=None) -> str:
    """Resolve the selected voices, run the podcast pipeline, return the audio path.

    Args:
        input_text: Topic or source text typed by the user.
        input_file: Optional uploaded file, forwarded to the generator.
        language: Target language name, or "Auto Detect".
        speaker1: UI label of the voice for speaker 1.
        speaker2: UI label of the voice for speaker 2.
        api_key: Forwarded unchanged to the generator; no check is applied
            here (the previous check was unreachable behind a dummy key).
        progress: Optional callable ``progress(fraction, message)``.

    Returns:
        Path of the generated audio file.

    Raises:
        Exception: with a user-facing message for unknown voices, rate
            limits, timeouts, and any other failure.
    """
    start_time = time.time()
    voice_names = {
        "Andrew - English (United States)": "en-US-AndrewMultilingualNeural",
        "Ava - English (United States)": "en-US-AvaMultilingualNeural",
        "Brian - English (United States)": "en-US-BrianMultilingualNeural",
        "Emma - English (United States)": "en-US-EmmaMultilingualNeural",
        "Florian - German (Germany)": "de-DE-FlorianMultilingualNeural",
        "Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural",
        "Remy - French (France)": "fr-FR-RemyMultilingualNeural",
        "Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural",
    }
    # Translate UI labels into edge-tts voice ids, failing with a readable
    # message instead of a bare KeyError.
    for label in (speaker1, speaker2):
        if label not in voice_names:
            raise Exception(f"Error: Unknown voice: {label}")
    speaker1 = voice_names[speaker1]
    speaker2 = voice_names[speaker2]

    try:
        if progress:
            progress(0.05, "Processing input...")
        podcast_generator = PodcastGenerator()
        podcast = await podcast_generator.generate_podcast(input_text, language, speaker1, speaker2, api_key, input_file, progress)
        end_time = time.time()
        print(f"Total podcast generation time: {end_time - start_time:.2f} seconds")
        return podcast
    except Exception as e:
        # Ensure we show a user-friendly error
        error_msg = str(e)
        lowered = error_msg.lower()
        if "rate limit" in lowered:
            raise Exception("Rate limit exceeded. Please try again later or use your own API key.") from e
        # The pipeline's own timeout messages say "timed out", which does not
        # contain "timeout", so both spellings are matched.
        if "timeout" in lowered or "timed out" in lowered:
            raise Exception("The request timed out. This could be due to server load or the length of your input. Please try again with shorter text.") from e
        raise Exception(f"Error: {error_msg}") from e
| # Gradio UI | |
def generate_podcast_gradio(input_text, input_file, language, speaker1, speaker2, api_key):
    """Synchronous Gradio handler: run ``process_input`` to completion.

    Returns whatever ``process_input`` returns. Any exception is re-raised as
    ``gr.Error`` carrying the same message.
    """

    def _ignore_progress(*_):
        # process_input accepts a progress callback; Gradio's built-in
        # progress bar is used instead, so updates are discarded.
        return None

    try:
        # The uploaded file (or None) is forwarded as-is.
        coroutine = process_input(
            input_text,
            input_file,
            language,
            speaker1,
            speaker2,
            api_key,
            _ignore_progress,
        )
        return asyncio.run(coroutine)
    except Exception as e:
        raise gr.Error(str(e))
def main():
    """Build the Gradio Blocks UI, wire the generate button, and launch the server."""
    # Both speaker dropdowns offer the same voices.
    voice_choices = [
        "Andrew - English (United States)",
        "Ava - English (United States)",
        "Brian - English (United States)",
        "Emma - English (United States)",
        "Florian - German (Germany)",
        "Seraphina - German (Germany)",
        "Remy - French (France)",
        "Vivienne - French (France)",
    ]
    language_choices = [
        "Auto Detect",
        "English",
        "German",
        "French",
        "Spanish",
        "Italian",
        "Dutch",
        "Portuguese",
        "Russian",
        "Chinese",
        "Japanese",
        "Korean",
        "Other",
    ]

    with gr.Blocks(title="PodcastGen 🎙️") as demo:
        gr.Markdown(
            """
            # PodcastGen 🎙️
            Generate a 2-speaker podcast from text or PDF!
            """
        )
        with gr.Row():
            # Left column: source material.
            with gr.Column():
                input_text = gr.Textbox(
                    label="Input Text",
                    lines=10,
                    placeholder="Enter podcast topic or paste text here...",
                    elem_id="input_text",
                )
                input_file = gr.File(
                    label="Or Upload a PDF or TXT file",
                    file_types=[".pdf", ".txt"],
                )
            # Right column: language, voices, optional key.
            with gr.Column():
                language = gr.Dropdown(
                    label="Podcast Language",
                    choices=language_choices,
                    value="Auto Detect",
                )
                speaker1 = gr.Dropdown(
                    label="Speaker 1 Voice",
                    choices=voice_choices,
                    value="Andrew - English (United States)",
                )
                speaker2 = gr.Dropdown(
                    label="Speaker 2 Voice",
                    choices=voice_choices,
                    value="Ava - English (United States)",
                )
                api_key = gr.Textbox(
                    label="Gemini API Key (Optional)",
                    type="password",
                    placeholder="Needed only if you're getting rate limited.",
                )
        generate_btn = gr.Button("Generate Podcast 🎙️", variant="primary")
        output_audio = gr.Audio(
            label="Generated Podcast",
            type="filepath",
            format="wav",
            elem_id="output_audio",
        )
        # Input order must match generate_podcast_gradio's parameters.
        handler_inputs = [input_text, input_file, language, speaker1, speaker2, api_key]
        generate_btn.click(
            fn=generate_podcast_gradio,
            inputs=handler_inputs,
            outputs=output_audio,
            show_progress=True,
        )
    demo.queue()
    demo.launch(server_name="0.0.0.0", debug=True)
# Launch the UI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()