# edgellm/backend/api/routes.py
"""
API routes for Edge LLM
"""
from fastapi import APIRouter, HTTPException, Request, UploadFile, File
from fastapi.responses import FileResponse
from typing import List
from ..models import (
    PromptRequest, PromptResponse, ModelInfo, ModelsResponse,
    ModelLoadRequest, ModelUnloadRequest
)
from ..services.model_service import model_service
from ..services.chat_service import chat_service
from ..config import AVAILABLE_MODELS

# Import RAG system
try:
    import sys
    import os
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    from rag_system import get_rag_system
    RAG_AVAILABLE = True
except ImportError as e:
    print(f"RAG system not available: {e}")
    RAG_AVAILABLE = False

# Create API router
router = APIRouter()
@router.get("/")
async def read_index():
"""Serve the React app"""
from ..config import FRONTEND_DIST_DIR
return FileResponse(f'{FRONTEND_DIST_DIR}/index.html')
@router.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "message": "Edge LLM API is running"}
@router.get("/models", response_model=ModelsResponse)
async def get_models():
"""Get available models and their status"""
models = []
for model_name, info in AVAILABLE_MODELS.items():
models.append(ModelInfo(
model_name=model_name,
name=info["name"],
supports_thinking=info["supports_thinking"],
description=info["description"],
size_gb=info["size_gb"],
is_loaded=model_service.is_model_loaded(model_name),
type=info["type"]
))
return ModelsResponse(
models=models,
current_model=model_service.get_current_model() or ""
)
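
# Illustrative response shape for GET /models; the field names mirror
# ModelInfo above, while the concrete values are hypothetical and depend on
# AVAILABLE_MODELS in ..config:
#   {"models": [{"model_name": "...", "name": "...", "supports_thinking": true,
#                "description": "...", "size_gb": 1.2, "is_loaded": false,
#                "type": "..."}],
#    "current_model": ""}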
@router.post("/load-model")
async def load_model(request: ModelLoadRequest):
"""Load a specific model"""
if request.model_name not in AVAILABLE_MODELS:
raise HTTPException(
status_code=400,
detail=f"Model {request.model_name} not available"
)
success = model_service.load_model(request.model_name)
if success:
model_service.set_current_model(request.model_name)
return {
"message": f"Model {request.model_name} loaded successfully",
"current_model": model_service.get_current_model()
}
else:
raise HTTPException(
status_code=500,
detail=f"Failed to load model {request.model_name}"
)
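
# Example call (a sketch; "<model-key>" is a placeholder for a key of
# AVAILABLE_MODELS, which is defined in ..config):
#   curl -X POST http://localhost:8000/load-model \
#        -H "Content-Type: application/json" \
#        -d '{"model_name": "<model-key>"}'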
@router.post("/unload-model")
async def unload_model(request: ModelUnloadRequest):
"""Unload a specific model"""
success = model_service.unload_model(request.model_name)
if success:
return {
"message": f"Model {request.model_name} unloaded successfully",
"current_model": model_service.get_current_model() or ""
}
else:
raise HTTPException(
status_code=404,
detail=f"Model {request.model_name} not found in cache"
)
@router.post("/set-current-model")
async def set_current_model(request: ModelLoadRequest):
"""Set the current active model"""
if not model_service.is_model_loaded(request.model_name):
raise HTTPException(
status_code=400,
detail=f"Model {request.model_name} is not loaded. Please load it first."
)
model_service.set_current_model(request.model_name)
return {
"message": f"Current model set to {request.model_name}",
"current_model": model_service.get_current_model()
}


# RAG System Endpoints

@router.post("/rag/upload")
async def upload_document(files: List[UploadFile] = File(...)):
    """Upload documents to the RAG system"""
    if not RAG_AVAILABLE:
        raise HTTPException(status_code=503, detail="RAG system not available")
    rag_system = get_rag_system()
    results = []
    for file in files:
        try:
            # Read the raw file content
            content = await file.read()
            # Index the document in the RAG store
            result = rag_system.add_document(
                file_content=content,
                filename=file.filename,
                file_type=file.content_type
            )
            results.append({
                "filename": file.filename,
                "success": result["success"],
                "doc_id": result.get("doc_id"),
                "chunks": result.get("chunks"),
                "message": result.get("message"),
                "error": result.get("error")
            })
        except Exception as e:
            results.append({
                "filename": file.filename,
                "success": False,
                "error": str(e)
            })
    return {"results": results}


@router.delete("/rag/documents/{doc_id}")
async def delete_document(doc_id: str):
    """Delete a document from the RAG system"""
    if not RAG_AVAILABLE:
        raise HTTPException(status_code=503, detail="RAG system not available")
    rag_system = get_rag_system()
    result = rag_system.remove_document(doc_id)
    if result["success"]:
        return result
    else:
        raise HTTPException(status_code=404, detail=result["error"])
@router.get("/rag/documents")
async def get_documents():
"""Get information about uploaded documents"""
if not RAG_AVAILABLE:
raise HTTPException(status_code=503, detail="RAG system not available")
rag_system = get_rag_system()
return rag_system.get_documents_info()
@router.post("/rag/search")
async def search_documents(query: str, max_results: int = 3):
"""Search through uploaded documents"""
if not RAG_AVAILABLE:
raise HTTPException(status_code=503, detail="RAG system not available")
rag_system = get_rag_system()
results = rag_system.search_similar(query, k=max_results)
return {"query": query, "results": results}
@router.post("/generate", response_model=PromptResponse)
async def generate_text(request: PromptRequest):
"""Generate text using the loaded model with optional RAG enhancement"""
# Use the model specified in request, or fall back to current model
model_to_use = request.model_name if request.model_name else model_service.get_current_model()
if not model_to_use:
raise HTTPException(
status_code=400,
detail="No model specified. Please load a model first."
)
if not model_service.is_model_loaded(model_to_use):
raise HTTPException(
status_code=400,
detail=f"Model {model_to_use} is not loaded. Please load it first."
)
try:
# Enhanced system prompt with RAG context if available
enhanced_system_prompt = request.system_prompt
# Check if RAG is available and should be used
use_rag = request.use_rag or False
retrieval_count = request.retrieval_count or 3
if RAG_AVAILABLE and use_rag:
rag_system = get_rag_system()
# Get relevant context for the current prompt
context = rag_system.get_context_for_query(request.prompt, max_chunks=retrieval_count)
if context:
# Enhance the system prompt with retrieved context
context_instruction = (
"\n\nAdditional Context from Documents:\n"
"Use the following information to help answer the user's question. "
"If the context is relevant, incorporate it into your response. "
"If the context is not relevant, you can ignore it.\n\n"
f"{context}\n"
"---\n"
)
enhanced_system_prompt = (request.system_prompt or "") + context_instruction
thinking_content, final_content, model_used, supports_thinking = chat_service.generate_response(
prompt=request.prompt,
model_name=model_to_use,
messages=[msg.dict() for msg in request.messages] if request.messages else [],
system_prompt=enhanced_system_prompt,
temperature=request.temperature,
max_new_tokens=request.max_new_tokens
)
return PromptResponse(
thinking_content=thinking_content,
content=final_content,
model_used=model_used,
supports_thinking=supports_thinking
)
except Exception as e:
print(f"Generation error: {e}")
raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")
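
# Illustrative request body; the field names follow PromptRequest above, and
# "<model-key>" is a placeholder for an already-loaded model:
#   curl -X POST http://localhost:8000/generate \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "Summarize the uploaded documents",
#             "model_name": "<model-key>", "use_rag": true, "retrieval_count": 3}'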


# Catch-all route for SPA - must be registered last
@router.get("/{full_path:path}")
async def catch_all(request: Request, full_path: str):
    """
    Catch-all route that serves index.html for any unmatched path,
    enabling client-side routing for the React SPA.
    Static file paths are skipped.
    """
    # Skip static file paths (HTTPException is already imported at module level)
    if full_path.startswith(('assets/', 'images/', 'static/')):
        raise HTTPException(status_code=404, detail="File not found")
    from ..config import FRONTEND_DIST_DIR
    return FileResponse(f'{FRONTEND_DIST_DIR}/index.html')
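
# This router is expected to be mounted on the FastAPI application elsewhere;
# a minimal sketch (the actual app module is not part of this file):
#   from fastapi import FastAPI
#   app = FastAPI()
#   app.include_router(router)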