import gradio as gr
import base64
import io
import os
from openai import OpenAI
import PyPDF2
from PIL import Image
import speech_recognition as sr
import cv2
from typing import List, Tuple


class MultimodalChatbot:
    def __init__(self, api_key: str):
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )
        self.model = "google/gemma-3n-e2b-it:free"
        self.conversation_history = []

    def encode_image_to_base64(self, image) -> str:
        """Convert a PIL Image or an image file path to a base64 string."""
        if isinstance(image, str):
            # The input is a file path: read the raw bytes directly.
            with open(image, "rb") as img_file:
                return base64.b64encode(img_file.read()).decode("utf-8")
        else:
            # The input is a PIL Image: serialize it to PNG in memory.
            buffered = io.BytesIO()
            image.save(buffered, format="PNG")
            return base64.b64encode(buffered.getvalue()).decode("utf-8")

    def extract_pdf_text(self, pdf_file) -> str:
        """Extract text from a PDF file."""
        try:
            # Accept either a file-like object (with a .name path) or a path string.
            if hasattr(pdf_file, "name"):
                pdf_path = pdf_file.name
            else:
                pdf_path = pdf_file

            text = ""
            with open(pdf_path, "rb") as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
            return text.strip()
        except Exception as e:
            return f"Error extracting PDF: {str(e)}"
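
    # Note: PyPDF2's extract_text() often returns little or nothing for
    # scanned (image-only) PDFs; handling those would require OCR, which is
    # outside the scope of this script.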

    def transcribe_audio(self, audio_file) -> str:
        """Transcribe an audio file to text."""
        try:
            recognizer = sr.Recognizer()

            if hasattr(audio_file, "name"):
                audio_path = audio_file.name
            else:
                audio_path = audio_file

            # sr.AudioFile natively reads WAV/AIFF/FLAC; other formats such
            # as MP3 or M4A may need to be converted first.
            with sr.AudioFile(audio_path) as source:
                audio_data = recognizer.record(source)
                text = recognizer.recognize_google(audio_data)
                return text
        except Exception as e:
            return f"Error transcribing audio: {str(e)}"
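
    # A minimal conversion sketch for MP3/M4A uploads, assuming the optional
    # pydub package and ffmpeg are available (hypothetical, not wired in):
    #
    #     from pydub import AudioSegment
    #     wav_path = audio_path + ".wav"
    #     AudioSegment.from_file(audio_path).export(wav_path, format="wav")
    #
    # The converted wav_path could then be opened with sr.AudioFile.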

    def process_video(self, video_file) -> List[str]:
        """Extract sample frames from a video and convert them to base64."""
        try:
            if hasattr(video_file, "name"):
                video_path = video_file.name
            else:
                video_path = video_file

            cap = cv2.VideoCapture(video_path)
            frames = []
            frame_count = 0

            # Sample every 30th frame, capped at 10 frames total.
            while len(frames) < 10:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % 30 == 0:
                    # OpenCV returns BGR; convert to RGB for PIL.
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(rgb_frame)
                    base64_frame = self.encode_image_to_base64(pil_image)
                    frames.append(base64_frame)
                frame_count += 1

            cap.release()
            return frames
        except Exception as e:
            return [f"Error processing video: {str(e)}"]
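
    # Design note: at a typical 30 fps, sampling every 30th frame keeps about
    # one frame per second, and the 10-frame cap bounds the request payload.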

    def create_multimodal_message(self,
                                  text_input: str = "",
                                  pdf_file=None,
                                  audio_file=None,
                                  image_file=None,
                                  video_file=None) -> dict:
        """Create a multimodal message for the API."""
        content_parts = []

        # Plain text goes in as-is.
        if text_input:
            content_parts.append({"type": "text", "text": text_input})

        # PDFs are flattened to text before being sent.
        if pdf_file is not None:
            pdf_text = self.extract_pdf_text(pdf_file)
            content_parts.append({
                "type": "text",
                "text": f"PDF Content:\n{pdf_text}"
            })

        # Audio is transcribed and sent as text.
        if audio_file is not None:
            audio_text = self.transcribe_audio(audio_file)
            content_parts.append({
                "type": "text",
                "text": f"Audio Transcription:\n{audio_text}"
            })

        # Images are embedded as base64 data URLs.
        if image_file is not None:
            image_base64 = self.encode_image_to_base64(image_file)
            content_parts.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{image_base64}"
                }
            })

        # Videos are reduced to sampled frames, each sent as an image.
        if video_file is not None:
            video_frames = self.process_video(video_file)
            for frame_base64 in video_frames:
                if not frame_base64.startswith("Error"):
                    content_parts.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{frame_base64}"
                        }
                    })

        return {"role": "user", "content": content_parts}
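
    # The returned dict follows the OpenAI-style content-parts format, e.g.:
    #     {"role": "user", "content": [
    #         {"type": "text", "text": "Describe this image"},
    #         {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}},
    #     ]}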

    def chat(self,
             text_input: str = "",
             pdf_file=None,
             audio_file=None,
             image_file=None,
             video_file=None,
             history: List[Tuple[str, str]] = None) -> Tuple[List[Tuple[str, str]], str]:
        """Main chat function."""
        if history is None:
            history = []

        try:
            # Build a short summary of the inputs for the chat display.
            user_message_parts = []
            if text_input:
                user_message_parts.append(f"Text: {text_input}")
            if pdf_file:
                user_message_parts.append("📄 PDF uploaded")
            if audio_file:
                user_message_parts.append("🎤 Audio uploaded")
            if image_file:
                user_message_parts.append("🖼️ Image uploaded")
            if video_file:
                user_message_parts.append("🎥 Video uploaded")

            user_display = " | ".join(user_message_parts)

            user_message = self.create_multimodal_message(
                text_input, pdf_file, audio_file, image_file, video_file
            )

            messages = [user_message]

            completion = self.client.chat.completions.create(
                extra_headers={
                    "HTTP-Referer": "https://multimodal-chatbot.local",
                    "X-Title": "Multimodal Chatbot",
                },
                model=self.model,
                messages=messages,
                max_tokens=1024,
                temperature=0.7
            )

            bot_response = completion.choices[0].message.content

            history.append((user_display, bot_response))
            return history, ""
        except Exception as e:
            error_msg = f"Error: {str(e)}"
            history.append((user_display if 'user_display' in locals() else "Error in input", error_msg))
            return history, ""
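
    # Note: conversation_history is initialized in __init__ but never sent to
    # the model, so each turn is independent. A multi-turn variant might build
    #     messages = self.conversation_history + [user_message]
    # and append both the user message and the reply after each call.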


def create_interface():
    """Create the Gradio interface."""
    # Read the API key from the environment, with a placeholder fallback.
    api_key = os.getenv("OPENROUTER_API_KEY", "your_api_key_here")
    chatbot = MultimodalChatbot(api_key)

    with gr.Blocks(title="Multimodal Chatbot with Gemma 3n", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🤖 Multimodal Chatbot with Gemma 3n

        This chatbot can process multiple types of input:
        - **Text**: Regular text messages
        - **PDF**: Extract and analyze document content
        - **Audio**: Transcribe speech to text
        - **Images**: Analyze visual content
        - **Video**: Extract frames and analyze video content

        **Setup**: Set your OpenRouter API key as the environment variable `OPENROUTER_API_KEY`.
        """)

        with gr.Row():
            with gr.Column(scale=1):
                text_input = gr.Textbox(
                    label="💬 Text Input",
                    placeholder="Type your message here...",
                    lines=3
                )

                pdf_input = gr.File(
                    label="📄 PDF Upload",
                    file_types=[".pdf"],
                    type="filepath"
                )

                audio_input = gr.File(
                    label="🎤 Audio Upload",
                    file_types=[".wav", ".mp3", ".m4a", ".flac"],
                    type="filepath"
                )

                image_input = gr.Image(
                    label="🖼️ Image Upload",
                    type="pil"
                )

                video_input = gr.File(
                    label="🎥 Video Upload",
                    file_types=[".mp4", ".avi", ".mov", ".mkv"],
                    type="filepath"
                )

                submit_btn = gr.Button("🚀 Send", variant="primary", size="lg")
                clear_btn = gr.Button("🗑️ Clear", variant="secondary")

            with gr.Column(scale=2):
                chatbot_interface = gr.Chatbot(
                    label="Chat History",
                    height=600,
                    bubble_full_width=False
                )

        def process_input(text, pdf, audio, image, video, history):
            return chatbot.chat(text, pdf, audio, image, video, history)

        def clear_all():
            return [], "", None, None, None, None

        submit_btn.click(
            process_input,
            inputs=[text_input, pdf_input, audio_input, image_input, video_input, chatbot_interface],
            outputs=[chatbot_interface, text_input]
        )

        clear_btn.click(
            clear_all,
            outputs=[chatbot_interface, text_input, pdf_input, audio_input, image_input, video_input]
        )

        # Pressing Enter in the text box submits as well.
        text_input.submit(
            process_input,
            inputs=[text_input, pdf_input, audio_input, image_input, video_input, chatbot_interface],
            outputs=[chatbot_interface, text_input]
        )

        gr.Markdown("""
        ### 🎯 Example Usage:
        - Upload a PDF and ask "Summarize this document"
        - Upload an image and ask "What do you see in this image?"
        - Record audio and ask "What did I say?"
        - Upload a video and ask "Describe what's happening"
        - Combine multiple inputs: "Compare this image with the PDF content"
        """)

    return demo


if __name__ == "__main__":
    required_packages = [
        "gradio",
        "openai",
        "PyPDF2",
        "Pillow",
        "SpeechRecognition",
        "opencv-python",
        "numpy"
    ]

    print("Required packages:", ", ".join(required_packages))
    print("\nTo install: pip install " + " ".join(required_packages))
    print("\nDon't forget to set your OPENROUTER_API_KEY environment variable!")

    demo = create_interface()
    demo.launch(
        share=True
    )
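
# Note: share=True asks Gradio to create a temporary public gradio.live URL;
# drop the flag (or pass share=False) to serve only on localhost.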