shukdevdattaEX's picture
Update app.py
f0b9a08 verified
raw
history blame
29.4 kB
import gradio as gr
import base64
import io
import os
from openai import OpenAI
import PyPDF2
from PIL import Image
import speech_recognition as sr
import tempfile
import cv2
import numpy as np
from typing import List, Tuple, Optional
import json
import pydub
from pydub import AudioSegment
from transformers import pipeline
import torch
class MultimodalChatbot:
def __init__(self, api_key: str):
self.client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key,
)
self.model = "google/gemma-2-9b-it:free"
self.conversation_history = []
# Initialize the pipeline for image-text-to-text processing
try:
self.pipe = pipeline(
"image-captioning",
model="Salesforce/blip-image-captioning-base",
device="cpu", # Optimized for CPU in HF Spaces
torch_dtype=torch.float32, # Use float32 for CPU compatibility
)
print("Image captioning pipeline initialized successfully")
except Exception as e:
print(f"Error initializing image captioning pipeline: {e}")
self.pipe = None
def encode_image_to_base64(self, image) -> str:
"""Convert PIL Image or file path to base64 string"""
try:
if isinstance(image, str):
with open(image, "rb") as img_file:
return base64.b64encode(img_file.read()).decode('utf-8')
elif isinstance(image, Image.Image):
buffered = io.BytesIO()
if image.mode == 'RGBA':
image = image.convert('RGB')
image.save(buffered, format="JPEG", quality=85)
return base64.b64encode(buffered.getvalue()).decode('utf-8')
else:
raise ValueError("Invalid image input")
except Exception as e:
return f"Error encoding image: {str(e)}"
def extract_pdf_text(self, pdf_file) -> str:
"""Extract text from PDF file"""
try:
if isinstance(pdf_file, str):
pdf_path = pdf_file
elif hasattr(pdf_file, 'name'):
pdf_path = pdf_file.name
else:
raise ValueError("Invalid PDF file input")
text = ""
with open(pdf_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page_num, page in enumerate(pdf_reader.pages):
page_text = page.extract_text()
if page_text and page_text.strip():
text += f"Page {page_num + 1}:\n{page_text}\n\n"
return text.strip() if text.strip() else "No text could be extracted from this PDF."
except Exception as e:
return f"Error extracting PDF: {str(e)}"
def convert_audio_to_wav(self, audio_file) -> str:
"""Convert audio file to WAV format for speech recognition"""
try:
if isinstance(audio_file, str):
audio_path = audio_file
elif hasattr(audio_file, 'name'):
audio_path = audio_file.name
else:
raise ValueError("Invalid audio file input")
file_ext = os.path.splitext(audio_path)[1].lower()
if file_ext == '.wav':
return audio_path
audio = AudioSegment.from_file(audio_path)
wav_path = tempfile.mktemp(suffix='.wav')
audio.export(wav_path, format="wav", parameters=["-ac", "1", "-ar", "16000"])
return wav_path
except Exception as e:
return f"Error converting audio: {str(e)}"
def transcribe_audio(self, audio_file) -> str:
"""Transcribe audio file to text"""
try:
recognizer = sr.Recognizer()
wav_path = self.convert_audio_to_wav(audio_file)
with sr.AudioFile(wav_path) as source:
recognizer.adjust_for_ambient_noise(source, duration=0.2)
audio_data = recognizer.record(source)
try:
text = recognizer.recognize_google(audio_data)
return text
except sr.UnknownValueError:
return "Could not understand the audio. Please try with clearer audio."
except sr.RequestError as e:
try:
text = recognizer.recognize_sphinx(audio_data)
return text
except:
return f"Speech recognition service error: {str(e)}"
except Exception as e:
return f"Error transcribing audio: {str(e)}"
def extract_video_frame(self, video_file, frame_number=None):
"""Extract a frame from the video"""
try:
if isinstance(video_file, str):
video_path = video_file
elif hasattr(video_file, 'name'):
video_path = video_file.name
else:
raise ValueError("Invalid video file input")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None, "Could not open video file"
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames <= 0:
cap.release()
return None, "Video has no frames"
if frame_number is None:
frame_number = total_frames // 2 # Extract middle frame
if frame_number >= total_frames:
frame_number = total_frames - 1
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = cap.read()
cap.release()
if ret:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return Image.fromarray(frame), f"Extracted frame {frame_number} of {total_frames}"
else:
return None, "Failed to extract frame"
except Exception as e:
return None, f"Error extracting video frame: {str(e)}"
def create_multimodal_message(self,
text_input: str = "",
pdf_file=None,
audio_file=None,
image_file=None,
video_file=None) -> dict:
"""Create a multimodal message for the API"""
content_parts = []
processing_info = []
if text_input:
content_parts.append({"type": "text", "text": text_input})
if pdf_file is not None:
pdf_text = self.extract_pdf_text(pdf_file)
content_parts.append({"type": "text", "text": f"PDF Content:\n{pdf_text}"})
processing_info.append("πŸ“„ PDF processed")
if audio_file is not None:
audio_text = self.transcribe_audio(audio_file)
content_parts.append({"type": "text", "text": f"Audio Transcription:\n{audio_text}"})
processing_info.append("🎀 Audio transcribed")
if image_file is not None and self.pipe is not None:
try:
if isinstance(image_file, str):
image = Image.open(image_file)
else:
image = image_file
# Use BLIP model for image captioning
output = self.pipe(image)
description = output[0]['generated_caption']
if text_input:
content_parts.append({"type": "text", "text": f"Image analysis (based on '{text_input}'): {description}"})
else:
content_parts.append({"type": "text", "text": f"Image analysis: {description}"})
processing_info.append("πŸ–ΌοΈ Image analyzed")
except Exception as e:
content_parts.append({"type": "text", "text": f"Error analyzing image: {str(e)}"})
processing_info.append("πŸ–ΌοΈ Image analysis failed")
elif image_file is not None:
content_parts.append({"type": "text", "text": "Image uploaded. Analysis failed due to model initialization error."})
processing_info.append("πŸ–ΌοΈ Image received (analysis failed)")
if video_file is not None and self.pipe is not None:
frame, frame_info = self.extract_video_frame(video_file)
if frame:
try:
output = self.pipe(frame)
description = output[0]['generated_caption']
if text_input:
content_parts.append({"type": "text", "text": f"Video frame analysis (based on '{text_input}'): {description}. Frame info: {frame_info}. Please describe the video for further assistance."})
else:
content_parts.append({"type": "text", "text": f"Video frame analysis: {description}. Frame info: {frame_info}. Please describe the video for further assistance."})
processing_info.append("πŸŽ₯ Video frame analyzed")
except Exception as e:
content_parts.append({"type": "text", "text": f"Error analyzing video frame: {str(e)}. Frame info: {frame_info}"})
processing_info.append("πŸŽ₯ Video frame analysis failed")
else:
content_parts.append({"type": "text", "text": f"Could not extract frame from video: {frame_info}. Please describe the video."})
processing_info.append("πŸŽ₯ Video processing failed")
elif video_file is not None:
content_parts.append({"type": "text", "text": "Video uploaded. Analysis failed due to model initialization error."})
processing_info.append("πŸŽ₯ Video received (analysis failed)")
return {"role": "user", "content": content_parts}, processing_info
def chat(self,
text_input: str = "",
pdf_file=None,
audio_file=None,
image_file=None,
video_file=None,
history: List[Tuple[str, str]] = None) -> Tuple[List[Tuple[str, str]], str]:
"""Main chat function"""
if history is None:
history = []
try:
user_message_parts = []
if text_input:
user_message_parts.append(f"Text: {text_input}")
if pdf_file:
user_message_parts.append("πŸ“„ PDF uploaded")
if audio_file:
user_message_parts.append("🎀 Audio uploaded")
if image_file:
user_message_parts.append("πŸ–ΌοΈ Image uploaded")
if video_file:
user_message_parts.append("πŸŽ₯ Video uploaded")
user_display = " | ".join(user_message_parts)
user_message, processing_info = self.create_multimodal_message(
text_input, pdf_file, audio_file, image_file, video_file
)
if processing_info:
user_display += f"\n{' | '.join(processing_info)}"
messages = [user_message]
completion = self.client.chat.completions.create(
extra_headers={
"HTTP-Referer": "https://multimodal-chatbot.local",
"X-Title": "Multimodal Chatbot",
},
model=self.model,
messages=messages,
max_tokens=2048,
temperature=0.7
)
bot_response = completion.choices[0].message.content
history.append((user_display, bot_response))
return history, ""
except Exception as e:
error_msg = f"Error: {str(e)}"
history.append((user_display if 'user_display' in locals() else "Error in input", error_msg))
return history, ""
def create_interface():
"""Create the Gradio interface"""
with gr.Blocks(title="Multimodal Chatbot with BLIP and Gemma", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# πŸ€– Multimodal Chatbot with BLIP and Gemma
This chatbot can process multiple types of input:
- **Text**: Regular text messages using Gemma
- **PDF**: Extract and analyze document content
- **Audio**: Transcribe speech to text (supports WAV, MP3, M4A, FLAC)
- **Images**: Upload images for analysis using BLIP
- **Video**: Upload videos for basic frame analysis using BLIP
**Setup**: Enter your OpenRouter API key below to get started
""")
with gr.Row():
with gr.Column():
api_key_input = gr.Textbox(
label="πŸ”‘ OpenRouter API Key",
placeholder="Enter your OpenRouter API key here...",
type="password",
info="Your API key is not stored and only used for this session"
)
api_status = gr.Textbox(
label="Connection Status",
value="❌ API Key not provided",
interactive=False
)
with gr.Tabs():
with gr.TabItem("πŸ’¬ Text Chat"):
with gr.Row():
with gr.Column(scale=1):
text_input = gr.Textbox(
label="πŸ’¬ Text Input",
placeholder="Type your message here...",
lines=5
)
text_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
text_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
text_chatbot = gr.Chatbot(
label="Text Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
with gr.TabItem("πŸ“„ PDF Chat"):
with gr.Row():
with gr.Column(scale=1):
pdf_input = gr.File(
label="πŸ“„ PDF Upload",
file_types=[".pdf"],
type="filepath"
)
pdf_text_input = gr.Textbox(
label="πŸ’¬ Question about PDF",
placeholder="Ask something about the PDF...",
lines=3
)
pdf_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
pdf_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
pdf_chatbot = gr.Chatbot(
label="PDF Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
with gr.TabItem("🎀 Audio Chat"):
with gr.Row():
with gr.Column(scale=1):
audio_input = gr.File(
label="🎀 Audio Upload",
file_types=[".wav", ".mp3", ".m4a", ".flac", ".ogg"],
type="filepath"
)
audio_text_input = gr.Textbox(
label="πŸ’¬ Question about Audio",
placeholder="Ask something about the audio...",
lines=3
)
audio_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
audio_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
audio_chatbot = gr.Chatbot(
label="Audio Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
with gr.TabItem("πŸ–ΌοΈ Image Chat"):
with gr.Row():
with gr.Column(scale=1):
image_input = gr.Image(
label="πŸ–ΌοΈ Image Upload",
type="pil"
)
image_text_input = gr.Textbox(
label="πŸ’¬ Question about Image",
placeholder="Ask something about the image...",
lines=3
)
image_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
image_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
image_chatbot = gr.Chatbot(
label="Image Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
with gr.TabItem("πŸŽ₯ Video Chat"):
with gr.Row():
with gr.Column(scale=1):
video_input = gr.File(
label="πŸŽ₯ Video Upload",
file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
type="filepath"
)
video_text_input = gr.Textbox(
label="πŸ’¬ Question about Video",
placeholder="Ask something about the video...",
lines=3
)
video_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
video_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
video_chatbot = gr.Chatbot(
label="Video Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
with gr.TabItem("🌟 Combined Chat"):
with gr.Row():
with gr.Column(scale=1):
combined_text_input = gr.Textbox(
label="πŸ’¬ Text Input",
placeholder="Type your message here...",
lines=3
)
combined_pdf_input = gr.File(
label="πŸ“„ PDF Upload",
file_types=[".pdf"],
type="filepath"
)
combined_audio_input = gr.File(
label="🎀 Audio Upload",
file_types=[".wav", ".mp3", ".m4a", ".flac", ".ogg"],
type="filepath"
)
combined_image_input = gr.Image(
label="πŸ–ΌοΈ Image Upload",
type="pil"
)
combined_video_input = gr.File(
label="πŸŽ₯ Video Upload",
file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
type="filepath"
)
combined_submit_btn = gr.Button("πŸš€ Send All", variant="primary", size="lg", interactive=False)
combined_clear_btn = gr.Button("πŸ—‘οΈ Clear All", variant="secondary")
with gr.Column(scale=2):
combined_chatbot = gr.Chatbot(
label="Combined Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
def validate_api_key(api_key):
if not api_key or len(api_key.strip()) == 0:
return "❌ API Key not provided", *[gr.update(interactive=False) for _ in range(6)]
try:
test_client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key.strip(),
)
return "βœ… API Key validated successfully", *[gr.update(interactive=True) for _ in range(6)]
except Exception as e:
return f"❌ API Key validation failed: {str(e)}", *[gr.update(interactive=False) for _ in range(6)]
def process_text_input(api_key, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, history=history)
def process_pdf_input(api_key, pdf, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, pdf_file=pdf, history=history)
def process_audio_input(api_key, audio, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, audio_file=audio, history=history)
def process_image_input(api_key, image, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, image_file=image, history=history)
def process_video_input(api_key, video, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, video_file=video, history=history)
def process_combined_input(api_key, text, pdf, audio, image, video, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, pdf_file=pdf, audio_file=audio, image_file=image, video_file=video, history=history)
def clear_chat():
return [], ""
def clear_all_inputs():
return [], "", None, None, None, None
api_key_input.change(
validate_api_key,
inputs=[api_key_input],
outputs=[api_status, text_submit_btn, pdf_submit_btn, audio_submit_btn,
image_submit_btn, video_submit_btn, combined_submit_btn]
)
text_submit_btn.click(
process_text_input,
inputs=[api_key_input, text_input, text_chatbot],
outputs=[text_chatbot, text_input]
)
text_input.submit(
process_text_input,
inputs=[api_key_input, text_input, text_chatbot],
outputs=[text_chatbot, text_input]
)
text_clear_btn.click(clear_chat, outputs=[text_chatbot, text_input])
pdf_submit_btn.click(
process_pdf_input,
inputs=[api_key_input, pdf_input, pdf_text_input, pdf_chatbot],
outputs=[pdf_chatbot, pdf_text_input]
)
pdf_clear_btn.click(lambda: ([], "", None), outputs=[pdf_chatbot, pdf_text_input, pdf_input])
audio_submit_btn.click(
process_audio_input,
inputs=[api_key_input, audio_input, audio_text_input, audio_chatbot],
outputs=[audio_chatbot, audio_text_input]
)
audio_clear_btn.click(lambda: ([], "", None), outputs=[audio_chatbot, audio_text_input, audio_input])
image_submit_btn.click(
process_image_input,
inputs=[api_key_input, image_input, image_text_input, image_chatbot],
outputs=[image_chatbot, image_text_input]
)
image_clear_btn.click(lambda: ([], "", None), outputs=[image_chatbot, image_text_input, image_input])
video_submit_btn.click(
process_video_input,
inputs=[api_key_input, video_input, video_text_input, video_chatbot],
outputs=[video_chatbot, video_text_input]
)
video_clear_btn.click(lambda: ([], "", None), outputs=[video_chatbot, video_text_input, video_input])
combined_submit_btn.click(
process_combined_input,
inputs=[api_key_input, combined_text_input, combined_pdf_input,
combined_audio_input, combined_image_input, combined_video_input, combined_chatbot],
outputs=[combined_chatbot, combined_text_input]
)
combined_clear_btn.click(clear_all_inputs,
outputs=[combined_chatbot, combined_text_input, combined_pdf_input,
combined_audio_input, combined_image_input, combined_video_input])
gr.Markdown("""
### 🎯 How to Use Each Tab:
**πŸ’¬ Text Chat**: Simple text conversations with the AI using Gemma
**πŸ“„ PDF Chat**: Upload a PDF and ask questions about its content
**🎀 Audio Chat**: Upload audio files for transcription and analysis
- Supports: WAV, MP3, M4A, FLAC, OGG formats
- Best results with clear speech and minimal background noise
**πŸ–ΌοΈ Image Chat**: Upload images for analysis using BLIP
- Provide a text prompt to guide the analysis (e.g., "What is in this image?")
**πŸŽ₯ Video Chat**: Upload videos for basic frame analysis using BLIP
- Analysis is based on a single frame; provide a text description for full video context
**🌟 Combined Chat**: Use multiple input types together for comprehensive analysis
### πŸ”‘ Getting an API Key:
1. Go to [OpenRouter.ai](https://openrouter.ai)
2. Sign up for an account
3. Navigate to the API Keys section
4. Create a new API key
5. Copy and paste it in the field above
### ⚠️ Current Limitations:
- Image and video analysis may be slow on CPU in Hugging Face Spaces
- Video analysis is limited to a single frame due to CPU constraints
- Large files may take longer to process
- BLIP model may provide basic captions; detailed video descriptions require additional user input
""")
return demo
if __name__ == "__main__":
required_packages = [
"gradio",
"openai",
"PyPDF2",
"Pillow",
"SpeechRecognition",
"opencv-python",
"numpy",
"pydub",
"transformers",
"torch"
]
print("πŸš€ Multimodal Chatbot with BLIP and Gemma")
print("=" * 50)
print("Required packages:", ", ".join(required_packages))
print("\nπŸ“¦ To install: pip install " + " ".join(required_packages))
print("\n🎀 For audio processing, you may also need:")
print(" - ffmpeg (for audio conversion)")
print(" - sudo apt-get install espeak espeak-data libespeak1 libespeak-dev (for offline speech recognition)")
print("\nπŸ”‘ Get your API key from: https://openrouter.ai")
print("πŸ’‘ Enter your API key in the web interface when it loads")
demo = create_interface()
demo.launch(share=True)