Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, Form, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import HTMLResponse, JSONResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| import shutil, os | |
| from tempfile import gettempdir | |
| app = FastAPI() | |
| # β CORS to allow frontend access | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # β Static assets | |
| app.mount("/resources", StaticFiles(directory="resources"), name="resources") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # β Jinja2 Templates | |
| templates = Jinja2Templates(directory="templates") | |
| # β Serve Homepage | |
| async def serve_home(request: Request): | |
| return templates.TemplateResponse("home.html", {"request": request}) | |
| # β Predict endpoint (handles image + document) | |
| async def predict(question: str = Form(...), file: UploadFile = Form(...)): | |
| try: | |
| temp_path = f"temp_{file.filename}" | |
| with open(temp_path, "wb") as f: | |
| shutil.copyfileobj(file.file, f) | |
| is_image = file.content_type.startswith("image/") | |
| if is_image: | |
| from appImage import answer_question_from_image | |
| from PIL import Image | |
| image = Image.open(temp_path).convert("RGB") | |
| answer, audio_path = answer_question_from_image(image, question) | |
| else: | |
| from app import answer_question_from_doc | |
| class NamedFile: | |
| def __init__(self, name): self.filename = name | |
| def read(self): return open(self.filename, "rb").read() | |
| answer, audio_path = answer_question_from_doc(NamedFile(temp_path), question) | |
| os.remove(temp_path) | |
| if audio_path and os.path.exists(audio_path): | |
| return JSONResponse({ | |
| "answer": answer, | |
| "audio": f"/audio/{os.path.basename(audio_path)}" | |
| }) | |
| else: | |
| return JSONResponse({"answer": answer}) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| # β Serve audio | |
| async def get_audio(filename: str): | |
| filepath = os.path.join(gettempdir(), filename) | |
| return FileResponse(filepath, media_type="audio/mpeg") | |