fahmiaziz98
init
d33d53d
raw
history blame
5.55 kB
import time
from loguru import logger
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from models import RerankRequest, RerankResponse, RerankResult
from core import ModelManager
# Module-level handle to the ModelManager shared by every endpoint below.
# It stays None until lifespan() assigns it during application startup.
model_manager = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's lifetime.

    Startup builds a ``ModelManager`` from ``config.yaml``, publishes it as
    the module-level ``model_manager`` and awaits ``preload_all_models()``.
    Any exception raised on the way is logged and re-raised, so the context
    manager never reaches its ``yield``. Shutdown only logs a message.
    """
    global model_manager

    # ---- startup ----
    logger.info("Starting reranking API...")
    try:
        manager = ModelManager("config.yaml")
        # Publish the handle before preloading, exactly as the endpoints expect
        # to find it in the module namespace.
        model_manager = manager
        await manager.preload_all_models()
        logger.success("Reranking API startup complete!")
    except Exception as exc:
        logger.error(f"Failed to initialize models: {exc}")
        raise

    # Suspended here for as long as the application is running.
    yield

    # ---- shutdown ----
    logger.info("Shutting down reranking API...")
# The FastAPI application object that all route decorators below attach to.
# `description` is a runtime string (Markdown-formatted text handed to FastAPI),
# so its content must not be edited as if it were a comment.
# The "1.0.0" version string is also returned separately by root().
app = FastAPI(
title="Reranking API",
description="""
High-performance API for document reranking using multiple state-of-the-art models.
✅ **Supported Models:**
- **Qwen/Qwen3-Reranker-0.6B**
- **BAAI/bge-reranker-v2-m3**
- **jinaai/jina-reranker-v2-base-multilingual**
🚀 **Features:**
- Multiple reranking models preloaded at startup
- List all available models
- Optional instruction-based reranking (Qwen3)
⚠️ **Warning**: Not for production use!.
""",
version="1.0.0",
# lifespan() creates the ModelManager and preloads models at startup.
lifespan=lifespan
)
@app.post("/rerank", response_model=RerankResponse, tags=["Reranking"])
async def rerank_documents(request: RerankRequest):
    """
    Rerank documents based on relevance to query.

    This endpoint takes a query and list of documents, then returns them
    ranked by relevance using the specified reranking model.

    Args:
        request: RerankRequest containing query, documents, and model info

    Returns:
        RerankResponse with ranked documents, scores, and metadata

    Raises:
        HTTPException: 400 when the query is blank, the document list is empty
            or contains only blank strings, or a ValueError is raised while
            reranking; 500 for any other failure while reranking.

    Example:
    ```json
    {
        "query": "machine learning algorithms",
        "documents": [
            "Deep learning uses neural networks",
            "Weather forecast for tomorrow",
            "Supervised learning with labeled data"
        ],
        "model_id": "jina-reranker-v2"
    }
    ```
    """
    # Strip once and reuse: the same cleaned query is validated, sent to the
    # model and echoed back in the response.
    query = request.query.strip()
    if not query:
        raise HTTPException(400, "Query cannot be empty")
    if not request.documents:
        raise HTTPException(400, "Documents list cannot be empty")

    # Drop blank documents but remember each survivor's position in the
    # original request so results can report it as `index`.
    valid_docs = [
        (position, text)
        for position, text in enumerate(doc.strip() for doc in request.documents)
        if text
    ]
    if not valid_docs:
        raise HTTPException(400, "No valid documents found after filtering empty strings")

    try:
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # (or go negative) when the system clock is adjusted.
        start_time = time.perf_counter()
        model = model_manager.get_model(request.model_id)
        original_indices, documents = zip(*valid_docs)
        scores = model.rerank(
            query=query,
            documents=list(documents),
            instruction=request.instruction,
        )

        results = [
            RerankResult(text=doc, score=score, index=orig_idx)
            for orig_idx, doc, score in zip(original_indices, documents, scores)
        ]
        results.sort(key=lambda result: result.score, reverse=True)

        # A falsy top_k (None or 0) leaves the ranked list untruncated.
        if request.top_k:
            results = results[:request.top_k]

        processing_time = time.perf_counter() - start_time
        logger.info(
            f"Reranked {len(documents)} documents in {processing_time:.3f}s "
            f"using {request.model_id}"
        )
        return RerankResponse(
            results=results,
            query=query,
            model_id=request.model_id,
            processing_time=processing_time,
            # Counts every submitted document, including blank ones that were filtered.
            total_documents=len(request.documents),
            returned_documents=len(results),
        )
    except HTTPException:
        # An HTTP error raised deliberately further down must keep its own
        # status code instead of being rewrapped as a 500 below.
        raise
    except ValueError as e:
        raise HTTPException(400, str(e)) from e
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Reranking failed: {e}")
        raise HTTPException(500, f"Reranking failed: {str(e)}") from e
@app.get("/models", tags=["Models"])
async def list_models():
    """
    List all available reranking models.
    Returns information about all configured models including their
    loading status and capabilities.
    Returns:
    List of model information dictionaries
    """
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description.
    try:
        model_infos = model_manager.list_models()
    except Exception as exc:
        # Any failure from the manager is logged and surfaced as a 500.
        logger.error(f"Failed to list models: {exc}")
        raise HTTPException(500, str(exc))
    return model_infos
@app.get("/health", tags=["Monitoring"])
async def health_check():
    """
    Check API health and model status.
    Returns comprehensive health information including model loading
    status and system metrics.
    Returns:
    Health status dictionary
    """
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description.
    try:
        model_infos = model_manager.list_models()
        # Only entries whose 'loaded' flag is truthy count as available.
        ready = [info for info in model_infos if info['loaded']]
        report = {
            "status": "ok",
            "total_models": len(model_infos),
            "loaded_models": len(ready),
            "available_models": [info['id'] for info in ready],
            "models_info": model_infos
        }
        return report
    except Exception as exc:
        logger.error(f"Health check failed: {exc}")
        # NOTE(review): the failure is reported only in the body, as a plain
        # dict with the route's default status code -- confirm that monitors
        # inspect the "status" field rather than the HTTP status.
        failure = {
            "status": "error",
            "error": str(exc)
        }
        return failure
@app.get("/", tags=["Monitoring"])
async def root():
    # Landing endpoint: returns a static greeting that points at /docs, plus
    # the API version string.
    greeting = "Welcome to Reranking API. Visit /docs for API documentation."
    return {"message": greeting, "version": "1.0.0"}