# qtAnswering / app.py
# ikraamkb's picture
# Update app.py
# 28de64c verified
# raw
# history blame
# 4.69 kB
from fastapi import FastAPI, File, UploadFile
import fitz # PyMuPDF for PDF parsing
from tika import parser # Apache Tika for document parsing
import openpyxl
from pptx import Presentation
from PIL import Image
from transformers import pipeline
import gradio as gr
from fastapi.responses import RedirectResponse
import numpy as np
import easyocr
# Initialize FastAPI
app = FastAPI()
# Both Hugging Face pipelines below are constructed at import time, so importing
# this module performs the model loading before anything else in the file runs.
print(f"πŸ”„ Loading models")
# Text-generation pipeline that produces the answers for both the document and
# the image question-answering functions.
doc_qa_pipeline = pipeline("text-generation", model="TinyLlama/TinyLlama-1.1B-Chat-v1.0")
# Image-to-text pipeline used to caption uploaded images.
image_captioning_pipeline = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning")
print("Models loaded")
# Initialize the English EasyOCR reader. It is built eagerly here at import time
# (not lazily) and asks for GPU use via gpu=True.
# NOTE(review): `reader` is not referenced anywhere else in the visible file —
# confirm it is still needed before paying its start-up cost.
reader = easyocr.Reader(["en"], gpu=True)
# Lower-case file extensions (without the dot) accepted by validate_file_type().
ALLOWED_EXTENSIONS = {"pdf", "docx", "pptx", "xlsx"}
def validate_file_type(file: UploadFile):
    """Check that the upload's filename carries an allowed extension.

    Args:
        file: Upload-like object exposing a ``filename`` attribute.

    Returns:
        ``None`` when the extension is in ``ALLOWED_EXTENSIONS``; otherwise a
        user-facing error string.
    """
    # A missing filename is treated as "no extension" rather than crashing.
    filename = getattr(file, "filename", None) or ""
    # rpartition distinguishes "report.pdf" from a bare "pdf": without a dot
    # there is no extension, so a file merely *named* "pdf" is not accepted.
    _, dot, suffix = filename.rpartition(".")
    ext = suffix.lower() if dot else ""
    print(f"πŸ” Validating file type: {ext}")
    if ext not in ALLOWED_EXTENSIONS:
        return f"❌ Unsupported file format: {ext or '(no extension)'}"
    return None
def truncate_text(text, max_tokens=450):
    """Keep at most ``max_tokens`` whitespace-separated words of ``text``.

    The surviving words are re-joined with single spaces, so runs of
    whitespace in the input are collapsed. A log line is printed on every
    call, whether or not anything was actually cut.
    """
    kept_words = text.split()[:max_tokens]
    print(f"βœ‚οΈ Truncated text to {max_tokens} tokens.")
    return " ".join(kept_words)
def extract_text_from_pdf(pdf_file: UploadFile):
    """Extract the plain text of every page of an uploaded PDF.

    Args:
        pdf_file: Upload-like object whose ``.file`` is a readable binary stream.

    Returns:
        The page texts joined by newlines, a "no text" warning string when the
        document contains nothing but whitespace, or an error string if the
        PDF cannot be read. Exceptions are never propagated to the caller.
    """
    try:
        print("πŸ“ Extracting text from PDF...")
        pdf_bytes = pdf_file.file.read()
        # The context manager closes the document even if a page fails to parse.
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            text = "\n".join(page.get_text("text") for page in doc)
        # Blank pages still contribute the joining newlines, so test the
        # stripped text; otherwise a scanned PDF would count as having text.
        return text if text.strip() else "⚠️ No text found."
    except Exception as e:
        return f"❌ Error reading PDF: {str(e)}"
def extract_text_with_tika(file: UploadFile):
    """Extract text from an uploaded document using Apache Tika.

    Args:
        file: Upload-like object whose ``.file`` is a readable binary stream.

    Returns:
        The stripped document text, a "no text" warning string when Tika
        yields no content, or an error string if parsing fails. Exceptions
        are never propagated to the caller.
    """
    try:
        print("πŸ“ Extracting text with Tika...")
        parsed = parser.from_buffer(file.file.read())
        # The "content" key can be present with a None value, in which case a
        # dict.get() default is never used; `or ""` covers both that and a
        # missing key so an empty document is not reported as an error.
        content = (parsed.get("content") or "").strip()
        return content or "⚠️ No text found."
    except Exception as e:
        return f"❌ Error reading document: {str(e)}"
def extract_text_from_excel(excel_file: UploadFile):
    """Extract cell values from every sheet of an uploaded Excel workbook.

    Each non-empty row becomes one line, with its non-empty cell values
    separated by single spaces.

    Args:
        excel_file: Upload-like object whose ``.file`` is a readable binary stream.

    Returns:
        The rows joined by newlines, a "no text" warning string when the
        workbook has no values, or an error string if it cannot be read.
        Exceptions are never propagated to the caller.
    """
    wb = None
    try:
        print("πŸ“ Extracting text from Excel...")
        wb = openpyxl.load_workbook(excel_file.file, read_only=True)
        lines = []
        for sheet in wb.worksheets:
            for row in sheet.iter_rows(values_only=True):
                # Empty cells come back as None; skip them so the literal text
                # "None" does not end up in the extracted output.
                cells = [str(value) for value in row if value is not None]
                if cells:
                    lines.append(" ".join(cells))
        return "\n".join(lines) if lines else "⚠️ No text found."
    except Exception as e:
        return f"❌ Error reading Excel: {str(e)}"
    finally:
        # Workbooks opened in read-only mode must be closed explicitly.
        if wb is not None:
            wb.close()
def _as_upload_like(file):
    """Adapt ``file`` to an object exposing ``.filename`` and ``.file``.

    Returns a ``(upload, owned_handle)`` pair. ``owned_handle`` is a file this
    helper opened (the caller must close it), or ``None`` when ``file`` was
    already upload-like. ``upload`` is ``None`` when no usable file was given.
    """
    import os
    from types import SimpleNamespace

    # Already shaped like a FastAPI UploadFile: use it untouched.
    if hasattr(file, "filename") and hasattr(file, "file"):
        return file, None
    # Otherwise accept a filesystem path, or an object whose ``.name`` is one.
    if isinstance(file, (str, os.PathLike)):
        path = os.fspath(file)
    else:
        path = getattr(file, "name", None)
    if not isinstance(path, str) or not path:
        return None, None
    handle = open(path, "rb")
    return SimpleNamespace(filename=os.path.basename(path), file=handle), handle


def answer_question_from_document(file: UploadFile, question: str):
    """Answer ``question`` using the text extracted from an uploaded document.

    Args:
        file: The uploaded document. Besides an UploadFile-like object
            (``.filename`` + ``.file``), a filesystem path or an object with a
            ``.name`` path is accepted, because Gradio's File component hands
            the callback one of those rather than an UploadFile.
        question: The user's question.

    Returns:
        The generated answer, or a user-facing warning/error string when the
        file is missing, unsupported, unreadable, or contains no text.
    """
    print("πŸ“‚ Processing document for QA...")
    try:
        upload, owned_handle = _as_upload_like(file)
    except OSError as e:
        return f"❌ Error opening document: {e}"
    if upload is None:
        return "⚠️ No document provided."

    try:
        validation_error = validate_file_type(upload)
        if validation_error:
            return validation_error
        file_ext = upload.filename.split(".")[-1].lower()
        if file_ext == "pdf":
            text = extract_text_from_pdf(upload)
        elif file_ext in ("docx", "pptx"):
            text = extract_text_with_tika(upload)
        elif file_ext == "xlsx":
            text = extract_text_from_excel(upload)
        else:
            # Defensive: only reachable if validation and dispatch disagree.
            return "❌ Unsupported file format!"
    finally:
        # Only close handles opened here; a caller-owned upload stays open.
        if owned_handle is not None:
            owned_handle.close()

    if not text:
        return "⚠️ No text extracted from the document."
    # The extractors report failures as strings starting with a cross mark or
    # a warning sign. Return those as-is instead of passing an error message
    # to the model as if it were the document's content.
    if text.startswith(("\u274c", "\u26a0")):
        return text

    truncated_text = truncate_text(text)
    print("πŸ€– Generating response...")
    # return_full_text=False keeps only the newly generated text; by default
    # the pipeline prepends the whole prompt (question + context) to it.
    response = doc_qa_pipeline(
        f"Question: {question}\nContext: {truncated_text}",
        return_full_text=False,
    )
    return response[0]["generated_text"]
def answer_question_from_image(image, question: str):
    """Answer ``question`` about an image via an automatically generated caption.

    The image is captioned first, and the caption is then used as the context
    for the text-generation model.

    Args:
        image: A NumPy array (converted to a PIL image) or any other input the
            captioning pipeline accepts. ``None`` means no image was supplied.
        question: The user's question.

    Returns:
        The generated answer, or a user-facing warning/error string.
        Exceptions are never propagated to the caller.
    """
    # Report a missing image clearly instead of as an opaque pipeline error.
    if image is None:
        return "⚠️ No image provided."
    try:
        print("🎨 Converting image for processing...")
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)  # Convert NumPy array to PIL Image
        print("🎨 Generating caption for image...")
        caption = image_captioning_pipeline(image)[0]["generated_text"]
        print("πŸ€– Answering question based on caption...")
        # return_full_text=False keeps only the newly generated text; by
        # default the pipeline prepends the whole prompt to it.
        response = doc_qa_pipeline(
            f"Question: {question}\nContext: {caption}",
            return_full_text=False,
        )
        return response[0]["generated_text"]
    except Exception as e:
        return f"❌ Error processing image: {str(e)}"
# Gradio form for document QA: an uploaded file plus a question go to
# answer_question_from_document, and the returned string is shown as text.
doc_interface = gr.Interface(
fn=answer_question_from_document,
inputs=[gr.File(label="πŸ“‚ Upload Document"), gr.Textbox(label="πŸ’¬ Ask a Question")],
outputs="text",
title="πŸ“„ AI Document Question Answering"
)
# Gradio form for image QA: an uploaded image plus a question go to
# answer_question_from_image, and the returned string is shown as text.
img_interface = gr.Interface(
fn=answer_question_from_image,
inputs=[gr.Image(label="🎨 Upload Image"), gr.Textbox(label="πŸ’¬ Ask a Question")],
outputs="text",
title="🎨 AI Image Question Answering"
)
# Combine both forms into one tabbed UI; tab labels are given in the same order
# as the interfaces.
demo = gr.TabbedInterface([doc_interface, img_interface], ["πŸ“„ Document QA", "🎨 Image QA"])
# Serve the Gradio UI from the FastAPI app under a sub-path so that "/" has a
# real destination to redirect to. It is mounted below "/" (not at "/") so the
# explicit root route defined next is not shadowed and cannot point at itself.
app = gr.mount_gradio_app(app, demo, path="/gradio")


@app.get("/")
def home():
    """Redirect the site root to the mounted Gradio UI.

    The previous target was "/" itself, which made clients follow an endless
    redirect loop.
    """
    return RedirectResponse(url="/gradio")


if __name__ == "__main__":
    import uvicorn

    # NOTE(review): demo.launch() blocks the main thread by default, so the
    # uvicorn server below only starts once the Gradio server has stopped.
    # The original order is kept because the deployment may rely on Gradio's
    # own server; confirm which of the two servers is meant to be the
    # entry point and drop the other.
    demo.launch()
    uvicorn.run(app, host="0.0.0.0", port=8000)