import os
import shutil

# Clear stale caches and point every cache/config env var at /tmp *before*
# torch, ultralytics, or huggingface_hub are imported, so those libraries
# pick up writable locations (the default home dirs are read-only on Spaces).
for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]:
    shutil.rmtree(d, ignore_errors=True)
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface"
os.environ["TORCH_HOME"] = "/tmp/torch"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
import json
import uuid
import datetime
import numpy as np
import torch
import cv2
import joblib
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from io import BytesIO
from PIL import Image as PILImage
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
import tensorflow as tf
from model_histo import BreastCancerClassifier
from fastapi.staticfiles import StaticFiles
import uvicorn
try:
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY
from reportlab.lib.units import inch
from reportlab.lib.colors import navy, black
REPORTLAB_AVAILABLE = True
except ImportError:
REPORTLAB_AVAILABLE = False
from ultralytics import YOLO
from sklearn.preprocessing import MinMaxScaler
from model import MWT as create_model
from augmentations import Augmentations
from huggingface_hub import InferenceClient
# =====================================================
# HUGGING FACE CLIENT SETUP
# =====================================================
HF_MODEL_ID = "BioMistral/BioMistral-7B"
hf_token = os.getenv("HF_TOKEN")
client = None
if hf_token:
try:
client = InferenceClient(model=HF_MODEL_ID, token=hf_token)
print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}")
except Exception as e:
print("⚠️ Failed to initialize Hugging Face client:", e)
else:
print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.")
def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence):
"""Generate a brief medical interpretation using Mistral."""
if not client:
return "⚠️ Hugging Face client not initialized — skipping summary."
try:
prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation.
Observed Cell Counts:
- {abnormal_cells} Abnormal Cells
- {normal_cells} Normal Cells
Write a 2-3 sentence professional medical assessment focusing on:
1. Cell count analysis
2. Abnormality ratio ({abnormal_cells/(abnormal_cells + normal_cells)*100:.1f}%)
3. Clinical significance
Use objective, scientific language suitable for a pathology report."""
        # Non-streaming request with details so the response carries generated_text
response = client.text_generation(
prompt,
max_new_tokens=200,
temperature=0.7,
stream=False,
details=True,
stop_sequences=["\n\n", "###"]
)
# Handle different response formats
if hasattr(response, 'generated_text'):
return response.generated_text.strip()
elif isinstance(response, dict):
return response.get('generated_text', '').strip()
elif isinstance(response, str):
return response.strip()
# Fallback summary if response format is unexpected
ratio = abnormal_cells / (abnormal_cells + normal_cells) * 100 if (abnormal_cells + normal_cells) > 0 else 0
return f"Analysis shows {abnormal_cells} abnormal cells ({ratio:.1f}%) and {normal_cells} normal cells."
except Exception as e:
# Provide a structured fallback summary instead of error message
total = abnormal_cells + normal_cells
if total == 0:
return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters."
ratio = (abnormal_cells / total) * 100
severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low"
return f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) among {total} total cells, indicating {severity} abnormality ratio."
def generate_mwt_summary(predicted_label, confidences, avg_confidence):
"""Generate a short MWT-specific interpretation using the HF client when available."""
if not client:
return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
try:
prompt = f"""
You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence professional interpretation suitable for embedding in a diagnostic report.
Result:
- Predicted label: {predicted_label}
- Class probabilities: {json.dumps(confidences)}
Provide guidance on the significance of the result and any suggested next steps in plain, objective language.
"""
response = client.text_generation(
prompt,
max_new_tokens=120,
temperature=0.2,
stream=False,
details=True,
stop_sequences=["\n\n", "###"]
)
if hasattr(response, 'generated_text'):
return response.generated_text.strip()
elif isinstance(response, dict):
return response.get('generated_text', '').strip()
elif isinstance(response, str):
return response.strip()
return f"Result: {predicted_label}."
except Exception as e:
return f"Quantitative result: {predicted_label}."
def generate_cin_summary(predicted_grade, confidences, avg_confidence):
"""Generate a short CIN-specific interpretation using the HF client when available."""
if not client:
return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
try:
prompt = f"""
You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence professional interpretation suitable for a diagnostic report.
Result:
- Predicted grade: {predicted_grade}
- Class probabilities: {json.dumps(confidences)}
Provide a brief statement about clinical significance and suggested next steps (e.g., further colposcopic evaluation) in objective, clinical language.
"""
response = client.text_generation(
prompt,
max_new_tokens=140,
temperature=0.2,
stream=False,
details=True,
stop_sequences=["\n\n", "###"]
)
if hasattr(response, 'generated_text'):
return response.generated_text.strip()
elif isinstance(response, dict):
return response.get('generated_text', '').strip()
elif isinstance(response, str):
return response.strip()
return f"Result: {predicted_grade}."
except Exception:
return f"Quantitative result: {predicted_grade}."
# =====================================================
# FASTAPI SETUP
# =====================================================
app = FastAPI(title="Pathora Medical Diagnostic API")
app.add_middleware(
    CORSMiddleware,
    # NOTE: including "*" makes Starlette allow all origins and ignore the
    # explicit entries; browsers also reject wildcard origins on credentialed
    # requests, so list explicit origins if allow_credentials must be honored.
    allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],  # Allow access to response headers
)
# Use /tmp for outputs in Hugging Face Spaces (writable directory)
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Create image outputs dir
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
os.makedirs(IMAGES_DIR, exist_ok=True)
app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs")
# Mount public sample images from frontend dist (Vite copies public/ to dist/ root)
# Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev)
FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist")
if not os.path.isdir(FRONTEND_DIST_CHECK):
FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))
for sample_dir in ["cyto", "colpo", "histo"]:
sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir)
if os.path.isdir(sample_path):
app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir)
print(f"✅ Mounted /{sample_dir} from {sample_path}")
else:
print(f"⚠️ Sample directory not found: {sample_path}")
# Mount other static assets (logos, banners) from dist root
for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]:
static_path = os.path.join(FRONTEND_DIST_CHECK, static_file)
if os.path.isfile(static_path):
print(f"✅ Static file available: /{static_file}")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# =====================================================
# MODEL LOADS
# =====================================================
print("🔹 Loading YOLO model...")
yolo_model = YOLO("best2.pt")
print("🔹 Loading MWT model...")
mwt_model = create_model(num_classes=2).to(device)
mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device))
mwt_model.eval()
mwt_class_names = ["Negative", "Positive"]
print("🔹 Loading CIN model...")
try:
clf = joblib.load("logistic_regression_model.pkl")
except Exception as e:
print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}")
clf = None
yolo_colposcopy = YOLO("yolo_colposcopy.pt")
# =====================================================
# RESNET FEATURE EXTRACTORS FOR CIN
# =====================================================
def build_resnet(model_name="resnet50"):
    if model_name == "resnet50":
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    elif model_name == "resnet101":
        model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT)
    elif model_name == "resnet152":
        model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT)
    else:
        raise ValueError(f"Unsupported backbone: {model_name}")
    model.eval().to(device)
    # Split the backbone into five blocks so intermediate feature maps
    # can be pooled independently.
    return (
        nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool),
        model.layer1, model.layer2, model.layer3, model.layer4,
    )
gap = nn.AdaptiveAvgPool2d((1, 1))
gmp = nn.AdaptiveMaxPool2d((1, 1))
resnet50_blocks = build_resnet("resnet50")
resnet101_blocks = build_resnet("resnet101")
resnet152_blocks = build_resnet("resnet152")
transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
def preprocess_for_mwt(image_np):
img = cv2.resize(image_np, (224, 224))
img = Augmentations.Normalization((0, 1))(img)
img = np.array(img, np.float32)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.transpose(2, 0, 1)
img = np.expand_dims(img, axis=0)
return torch.Tensor(img)
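# preprocess_for_mwt returns a (1, 3, 224, 224) float tensor: batch, RGB channels, H, W.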
def extract_cbf_features(blocks, img_t):
block1, block2, block3, block4, block5 = blocks
with torch.no_grad():
f1 = block1(img_t)
f2 = block2(f1)
f3 = block3(f2)
f4 = block4(f3)
f5 = block5(f4)
p1 = gmp(f1).view(-1)
p2 = gmp(f2).view(-1)
p3 = gap(f3).view(-1)
p4 = gap(f4).view(-1)
p5 = gap(f5).view(-1)
return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy()
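# For ResNet-50/101/152 the five pooled blocks have 64, 256, 512, 1024 and
# 2048 channels, i.e. 3904 features per backbone; concatenating the three
# backbones below yields an 11712-dimensional vector per image.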
# =====================================================
# Model 4: Histopathology Classifier (TensorFlow)
# =====================================================
print("🔹 Attempting to load Breast Cancer Histopathology model...")
try:
classifier = BreastCancerClassifier(fine_tune=False)
# Safely handle Hugging Face token auth
hf_token = os.getenv("HF_TOKEN")
if hf_token:
if classifier.authenticate_huggingface():
print("✅ Hugging Face authentication successful.")
else:
print("⚠️ Warning: Hugging Face authentication failed, using local model only.")
else:
print("⚠️ HF_TOKEN not found in environment — skipping authentication.")
# Load Path Foundation model
if classifier.load_path_foundation():
print("✅ Loaded Path Foundation base model.")
else:
print("⚠️ Could not load Path Foundation base model, continuing with local weights only.")
# Load trained histopathology model
model_path = "histopathology_trained_model.keras"
if os.path.exists(model_path):
classifier.model = tf.keras.models.load_model(model_path)
print(f"✅ Loaded local histopathology model: {model_path}")
else:
print(f"⚠️ Model file not found: {model_path}")
except Exception as e:
classifier = None
print(f"❌ Error initializing histopathology model: {e}")
def predict_histopathology(image):
if classifier is None:
return {"error": "Histopathology model not available."}
try:
if image.mode != "RGB":
image = image.convert("RGB")
image = image.resize((224, 224))
img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0)
embeddings = classifier.extract_embeddings(img_array)
prediction_proba = classifier.model.predict(embeddings, verbose=0)[0]
predicted_class = int(np.argmax(prediction_proba))
class_names = ["Benign", "Malignant"]
# Return confidence as dictionary with both class probabilities (like MWT/CIN)
confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))}
avg_confidence = float(np.max(prediction_proba)) * 100
return {
"model_used": "Histopathology Classifier",
"prediction": class_names[predicted_class],
"confidence": confidences,
"summary": {
"ai_interpretation": f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue.",
},
}
except Exception as e:
return {"error": f"Histopathology prediction failed: {e}"}
# =====================================================
# MAIN ENDPOINT
# =====================================================
@app.post("/predict/")
async def predict(model_name: str = Form(...), file: UploadFile = File(...)):
print(f"Received prediction request - model: {model_name}, file: {file.filename}")
# Validate model name
if model_name not in ["yolo", "mwt", "cin", "histopathology"]:
return JSONResponse(
content={
"error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology"
},
status_code=400
)
# Validate and read file
if not file.filename:
return JSONResponse(
content={"error": "No file provided"},
status_code=400
)
contents = await file.read()
if len(contents) == 0:
return JSONResponse(
content={"error": "Empty file provided"},
status_code=400
)
# Attempt to open and validate image
try:
image = PILImage.open(BytesIO(contents)).convert("RGB")
image_np = np.array(image)
if image_np.size == 0:
raise ValueError("Empty image array")
print(f"Successfully loaded image, shape: {image_np.shape}")
except Exception as e:
return JSONResponse(
content={"error": f"Invalid image file: {str(e)}"},
status_code=400
)
if model_name == "yolo":
results = yolo_model(image)
detections_json = results[0].to_json()
detections = json.loads(detections_json)
abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal")
normal_cells = sum(1 for d in detections if d["name"] == "normal")
avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0
ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence)
output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg"
output_path = os.path.join(IMAGES_DIR, output_filename)
results[0].save(filename=output_path)
return {
"model_used": "YOLO Detection",
"detections": detections,
"annotated_image_url": f"/outputs/images/{output_filename}",
"summary": {
"abnormal_cells": abnormal_cells,
"normal_cells": normal_cells,
"ai_interpretation": ai_summary,
},
}
elif model_name == "mwt":
tensor = preprocess_for_mwt(image_np)
with torch.no_grad():
output = mwt_model(tensor.to(device)).cpu()
probs = torch.softmax(output, dim=1)[0]
confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)}
predicted_label = mwt_class_names[int(torch.argmax(probs).item())]
# Average / primary confidence for display
avg_confidence = float(torch.max(probs).item()) * 100
        # Generate a brief AI interpretation using the BioMistral client (if available)
ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence)
return {
"model_used": "MWT Classifier",
"prediction": predicted_label,
"confidence": confidences,
"summary": {
"ai_interpretation": ai_interp,
},
}
elif model_name == "cin":
if clf is None:
return JSONResponse(
content={"error": "CIN classifier not available on server."},
status_code=503,
)
        # Decode uploaded image and run colposcopy detector
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            return JSONResponse(content={"error": "Could not decode image for CIN analysis."}, status_code=400)
results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False)
if len(results[0].boxes) == 0:
return {"error": "No cervix detected"}
x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy())
crop = img[y1:y2, x1:x2]
crop = cv2.resize(crop, (224, 224))
img_t = transform(crop).unsqueeze(0).to(device)
# Extract features from multiple ResNet backbones
f50 = extract_cbf_features(resnet50_blocks, img_t)
f101 = extract_cbf_features(resnet101_blocks, img_t)
f152 = extract_cbf_features(resnet152_blocks, img_t)
features = np.concatenate([f50, f101, f152]).reshape(1, -1)
        # Scale and predict.
        # CAUTION: fitting MinMaxScaler on this single sample makes min == max
        # for every column, so sklearn maps every feature to 0 and the
        # classifier sees a constant input. The scaler fitted on the training
        # set should be persisted (e.g. via joblib) and reused here instead.
        X_scaled = MinMaxScaler().fit_transform(features)
# Ensure classifier supports probability outputs
try:
proba = clf.predict_proba(X_scaled)[0]
except Exception as e:
return JSONResponse(
content={"error": "CIN classifier does not support probability estimates (predict_proba)."},
status_code=503,
)
num_classes = int(len(proba))
# Handle different classifier output sizes:
# - If 3 classes: map directly to CIN1/CIN2/CIN3
# - If 2 classes: apply a conservative heuristic to split High-grade into CIN2/CIN3
if num_classes == 3:
classes = ["CIN1", "CIN2", "CIN3"]
confidences = {classes[i]: float(proba[i]) for i in range(3)}
predicted_idx = int(np.argmax(proba))
predicted_label = classes[predicted_idx]
avg_confidence = float(np.max(proba)) * 100
mapping_used = "direct_3class"
elif num_classes == 2:
# Binary model detected (e.g., Low-grade vs High-grade). We'll convert to CIN1/CIN2/CIN3
# Heuristic:
# - CIN1 <- low_grade_prob
# - Split high_grade_prob into CIN2 and CIN3 based on how confident 'high' is.
# * If high <= 0.6 -> mostly CIN2
# * If high >= 0.8 -> mostly CIN3
# * Between 0.6 and 0.8 -> interpolate
low_prob = float(proba[0])
high_prob = float(proba[1])
if high_prob <= 0.6:
cin3_factor = 0.0
elif high_prob >= 0.8:
cin3_factor = 1.0
else:
cin3_factor = (high_prob - 0.6) / 0.2
cin1 = low_prob
cin3 = high_prob * cin3_factor
cin2 = high_prob - cin3
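            # Worked example: low=0.30, high=0.70 -> cin3_factor=(0.70-0.60)/0.2=0.5,
            # so cin3 = 0.70*0.5 = 0.35, cin2 = 0.70-0.35 = 0.35, cin1 = 0.30.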
confidences = {"CIN1": cin1, "CIN2": cin2, "CIN3": cin3}
# pick highest of the mapped three as primary prediction
predicted_label = max(confidences.items(), key=lambda x: x[1])[0]
avg_confidence = float(max(confidences.values())) * 100
mapping_used = "binary_to_3class_heuristic"
else:
return JSONResponse(
content={
"error": "CIN classifier must output 2-class (Low/High) or 3-class probabilities (CIN1, CIN2, CIN3).",
"detected_num_classes": num_classes,
},
status_code=503,
)
        # Generate AI interpretation using the BioMistral client (if available)
ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence)
response = {
"model_used": "CIN Classifier",
"prediction": predicted_label,
"confidence": confidences,
"summary": {
"ai_interpretation": ai_interp,
},
}
        # If the binary->3-class heuristic was used, include a diagnostic field
        # so callers know the probabilities were mapped rather than native.
        if mapping_used == 'binary_to_3class_heuristic':
response["mapping_used"] = mapping_used
response["mapping_note"] = (
"The server mapped a binary Low/High classifier to CIN1/CIN2/CIN3 using a heuristic split. "
"This is an approximation — for clinical use please supply a native 3-class model."
)
return response
elif model_name == "histopathology":
result = predict_histopathology(image)
return result
else:
return JSONResponse(content={"error": "Invalid model name"}, status_code=400)
# =====================================================
# REPORT GENERATION & ROUTES
# =====================================================
def create_designed_pdf(pdf_path, report_data, analysis_summary_json, annotated_image_path=None):
doc = SimpleDocTemplate(pdf_path, pagesize=letter,
rightMargin=72, leftMargin=72,
topMargin=72, bottomMargin=18)
styles = getSampleStyleSheet()
story = []
    # getSampleStyleSheet() already defines a 'Title' style and StyleSheet1.add()
    # raises KeyError on duplicate names, so register the custom title style
    # under 'ReportTitle' instead.
    styles.add(ParagraphStyle(name='ReportTitle', fontSize=20, fontName='Helvetica-Bold', alignment=TA_CENTER, textColor=navy))
    styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold', spaceBefore=10, spaceAfter=6))
    styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12))
    styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold', textColor=navy, spaceBefore=6, spaceAfter=4))
patient = report_data['patient']
analysis = report_data.get('analysis', {})
# Safely parse analysis_summary_json
try:
ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {}
except (json.JSONDecodeError, TypeError):
ai_summary = {}
# Determine report type based on model used
model_used = ai_summary.get('model_used', '')
if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower():
report_type = "CYTOLOGY"
report_title = "Cytology Report"
elif 'MWT' in model_used or 'mwt' in str(model_used).lower():
# MWT is a cytology classifier; use a clearer report title for MWT results
report_type = "CYTOLOGY"
report_title = "Cytology Analysis Report"
elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower():
report_type = "COLPOSCOPY"
report_title = "Colposcopy Report"
elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower():
report_type = "HISTOPATHOLOGY"
report_title = "Histopathology Report"
else:
report_type = "CYTOLOGY"
report_title = "Medical Analysis Report"
# Header
story.append(Paragraph("MANALIFE AI", styles['Title']))
story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall']))
story.append(Spacer(1, 0.3*inch))
story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading']))
story.append(Paragraph(report_title, styles['Section']))
story.append(Spacer(1, 0.2*inch))
# Report ID and Date
story.append(Paragraph(f"<b>Report ID:</b> {report_data.get('report_id', 'N/A')}", styles['NormalSmall']))
story.append(Paragraph(f"<b>Generated:</b> {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))
story.append(Spacer(1, 0.2*inch))
# Patient Information Section
story.append(Paragraph("Patient Information", styles['Section']))
story.append(Paragraph(f"<b>Patient ID:</b> {patient.get('id', 'N/A')}", styles['NormalSmall']))
story.append(Paragraph(f"<b>Exam Date:</b> {patient.get('exam_date', 'N/A')}", styles['NormalSmall']))
story.append(Paragraph(f"<b>Physician:</b> {patient.get('physician', 'N/A')}", styles['NormalSmall']))
story.append(Paragraph(f"<b>Facility:</b> {patient.get('facility', 'N/A')}", styles['NormalSmall']))
story.append(Spacer(1, 0.2*inch))
# Sample Information Section
story.append(Paragraph("Sample Information", styles['Section']))
story.append(Paragraph(f"<b>Specimen Type:</b> {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall']))
story.append(Paragraph(f"<b>Clinical History:</b> {patient.get('clinical_history', 'N/A')}", styles['NormalSmall']))
story.append(Spacer(1, 0.2*inch))
# AI Analysis Section
story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section']))
story.append(Paragraph("<b>System:</b> Manalife AI System — Automated Analysis", styles['NormalSmall']))
# Add metrics based on report type
if report_type == "HISTOPATHOLOGY":
# For histopathology, show Benign/Malignant confidence
confidence_dict = ai_summary.get('confidence', {})
if isinstance(confidence_dict, dict):
benign_conf = confidence_dict.get('Benign', 0) * 100
malignant_conf = confidence_dict.get('Malignant', 0) * 100
story.append(Paragraph(f"<b>Benign Confidence:</b> {benign_conf:.2f}%", styles['NormalSmall']))
story.append(Paragraph(f"<b>Malignant Confidence:</b> {malignant_conf:.2f}%", styles['NormalSmall']))
elif report_type == "CYTOLOGY":
# For cytology and MWT, show class confidences if available, otherwise show abnormal/normal cells
confidence_dict = ai_summary.get('confidence', {})
if isinstance(confidence_dict, dict) and confidence_dict:
for cls, val in confidence_dict.items():
conf_pct = val * 100 if isinstance(val, (int, float)) else 0
story.append(Paragraph(f"<b>{cls} Confidence:</b> {conf_pct:.2f}%", styles['NormalSmall']))
else:
if 'abnormal_cells' in ai_summary:
story.append(Paragraph(f"<b>Abnormal Cells:</b> {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall']))
if 'normal_cells' in ai_summary:
story.append(Paragraph(f"<b>Normal Cells:</b> {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall']))
else:
# For CIN/Colposcopy, show class confidences
confidence_dict = ai_summary.get('confidence', {})
if isinstance(confidence_dict, dict):
for cls, val in confidence_dict.items():
conf_pct = val * 100 if isinstance(val, (int, float)) else 0
story.append(Paragraph(f"<b>{cls} Confidence:</b> {conf_pct:.2f}%", styles['NormalSmall']))
story.append(Spacer(1, 0.1*inch))
story.append(Paragraph("<b>AI Interpretation:</b>", styles['NormalSmall']))
story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall']))
story.append(Spacer(1, 0.2*inch))
# If an annotated image path was provided and exists on disk, embed it
if annotated_image_path:
try:
if os.path.isfile(annotated_image_path):
story.append(Spacer(1, 0.1*inch))
# Determine image pixel size and scale to a reasonable width for PDF
                try:
                    # PILImage is already imported at module level; reuse it here.
                    with PILImage.open(annotated_image_path) as im:
                        img_w, img_h = im.size
                except Exception:
                    img_w, img_h = (800, 600)
# Display width in points (ReportLab uses points; 1 inch = 72 points). Assume 96 DPI for pixel->inch.
display_width_px = max(300, min(img_w, 800))
width_points = min(5 * inch, (display_width_px / 96.0) * inch)
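                # e.g. a 640 px wide image: display_width_px = 640, so
                # width_points = min(5*72, (640/96)*72) = min(360, 480) = 360 pt (5 in).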
img = ReportLabImage(annotated_image_path, width=width_points, kind='proportional')
story.append(img)
story.append(Spacer(1, 0.2*inch))
except Exception as e:
# Don't fail the whole PDF creation if image embedding fails
print(f"⚠️ Could not embed annotated image in PDF: {e}")
# Doctor's Notes
story.append(Paragraph("Doctor's Notes", styles['Section']))
story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall']))
story.append(Spacer(1, 0.2*inch))
# Recommendations
story.append(Paragraph("RECOMMENDATIONS", styles['Section']))
story.append(Paragraph("Continue routine screening as per standard guidelines. Follow up as directed by your physician.", styles['NormalSmall']))
story.append(Spacer(1, 0.3*inch))
# Signatures
story.append(Paragraph("Signatures", styles['Section']))
story.append(Paragraph("Rajesh Venugopal, Physician", styles['NormalSmall']))
#story.append(Paragraph("", styles['NormalSmall']))
story.append(Spacer(1, 0.1*inch))
story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))
doc.build(story)
@app.post("/reports/")
async def generate_report(
patient_id: str = Form(...),
exam_date: str = Form(...),
metadata: str = Form(...),
notes: str = Form(None),
analysis_id: str = Form(None),
analysis_summary: str = Form(None),
file: UploadFile = File(None),
):
"""Generate a structured medical report from analysis results and metadata."""
try:
# Create reports directory if it doesn't exist
reports_dir = os.path.join(OUTPUT_DIR, "reports")
os.makedirs(reports_dir, exist_ok=True)
# Generate unique report ID
report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}"
report_dir = os.path.join(reports_dir, report_id)
os.makedirs(report_dir, exist_ok=True)
# Parse metadata
metadata_dict = json.loads(metadata)
# Get analysis results - assuming stored in memory or retrievable
# TODO: Implement analysis results storage/retrieval
# Construct report data
report_data = {
"report_id": report_id,
"generated_at": datetime.datetime.now().isoformat(),
"patient": {
"id": patient_id,
"exam_date": exam_date,
**metadata_dict
},
"analysis": {
"id": analysis_id,
# If the analysis_id is actually an annotated image URL, store it for report embedding
"annotated_image_url": analysis_id,
# TODO: Add actual analysis results
},
"doctor_notes": notes
}
# Save report data
report_json = os.path.join(report_dir, "report.json")
with open(report_json, "w", encoding="utf-8") as f:
json.dump(report_data, f, indent=2, ensure_ascii=False)
# We'll create PDF later (after parsing analysis_summary and resolving
# any annotated/input image). Initialize pdf_url here.
pdf_url = None
# Parse analysis_summary to get AI results
try:
ai_summary = json.loads(analysis_summary) if analysis_summary else {}
except (json.JSONDecodeError, TypeError):
ai_summary = {}
# Resolve annotated image: prefer AI/analysis annotated image; if none,
# save the uploaded input image (if provided) into the report folder
# and use that as the embedded image.
annotated_img = ai_summary.get('annotated_image_url') or report_data.get("analysis", {}).get("annotated_image_url") or ""
annotated_img_full = ""
annotated_img_local = None
if annotated_img:
# If it's an outputs path served by StaticFiles, map to local file
if isinstance(annotated_img, str) and annotated_img.startswith('/outputs/'):
rel = annotated_img[len('/outputs/'):].lstrip('/')
annotated_img_local = os.path.join(OUTPUT_DIR, rel)
annotated_img_full = annotated_img
else:
# keep absolute URLs as-is for HTML
annotated_img_full = annotated_img if isinstance(annotated_img, str) else ''
# If no annotated image provided, but an input file was uploaded, save it
if not annotated_img_full and file is not None and getattr(file, 'filename', None):
try:
input_filename = f"input_image{os.path.splitext(file.filename)[1] or '.jpg'}"
input_path = os.path.join(report_dir, input_filename)
contents = await file.read()
with open(input_path, 'wb') as out_f:
out_f.write(contents)
annotated_img_full = f"/outputs/reports/{report_id}/{input_filename}"
annotated_img_local = input_path
except Exception as e:
print(f"⚠️ Failed to save uploaded input image for report: {e}")
# Ensure annotated_img_full has a leading slash if it's a relative path
if annotated_img_full and not annotated_img_full.startswith(('http://', 'https://')):
annotated_img_full = annotated_img_full if annotated_img_full.startswith('/') else '/' + annotated_img_full
# If we have a local annotated image but it's stored in the shared images folder
# (e.g. /outputs/images/...), copy it into this report's folder so the HTML/PDF
# can reference the image relative to the report directory. This also makes the
# image visible when opening report.html directly from disk (file://).
try:
if annotated_img_local:
annotated_img_local_abs = os.path.abspath(annotated_img_local)
report_dir_abs = os.path.abspath(report_dir)
                # If the image is not already inside the report directory, copy it there
                if os.path.commonpath([annotated_img_local_abs, report_dir_abs]) != report_dir_abs:
src_basename = os.path.basename(annotated_img_local_abs)
dest_name = f"annotated_{src_basename}"
dest_path = os.path.join(report_dir, dest_name)
try:
shutil.copy2(annotated_img_local_abs, dest_path)
annotated_img_local = dest_path
annotated_img_full = f"/outputs/reports/{report_id}/{dest_name}"
except Exception as e:
# If copy fails, keep using the original annotated_img_full (may be served by StaticFiles)
print(f"⚠️ Failed to copy annotated image into report folder: {e}")
except Exception:
pass
# Now attempt to create the PDF (passing the local annotated image path
# so the PDF writer can embed it). If annotation is remote or not
# available, PDF creation will still proceed without the image.
if REPORTLAB_AVAILABLE:
try:
pdf_path = os.path.join(report_dir, "report.pdf")
create_designed_pdf(pdf_path, report_data, analysis_summary, annotated_img_local)
pdf_url = f"/outputs/reports/{report_id}/report.pdf"
except Exception as e:
print(f"Error creating designed PDF: {e}")
pdf_url = None
# Determine report type based on analysis summary or model used
model_used = ai_summary.get('model_used', '')
if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower():
report_type = "Cytology"
report_title = "Cytology Report"
elif 'MWT' in model_used or 'mwt' in str(model_used).lower() or 'mwt' in str(analysis_id).lower():
# MWT is a cytology classifier — use clearer title
report_type = "Cytology"
report_title = "Cytology Analysis Report"
elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower():
report_type = "Colposcopy"
report_title = "Colposcopy Report"
elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower():
report_type = "Histopathology"
report_title = "Histopathology Report"
else:
# Default fallback
report_type = "Cytology"
report_title = "Medical Analysis Report"
# Build analysis metrics HTML based on report type
if report_type == "Histopathology":
# For histopathology, show Benign/Malignant confidence from the confidence dict
confidence_dict = ai_summary.get('confidence', {})
benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0
malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0
analysis_metrics_html = f"""
<tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr>
<tr><th>Benign Confidence</th><td>{benign_conf:.2f}%</td></tr>
<tr><th>Malignant Confidence</th><td>{malignant_conf:.2f}%</td></tr>
"""
elif report_type == "Cytology":
# For cytology (YOLO) or MWT, show class confidences if provided, else abnormal/normal counts
confidence_dict = ai_summary.get('confidence', {})
if isinstance(confidence_dict, dict) and confidence_dict:
confidence_rows = ""
for cls, val in confidence_dict.items():
conf_pct = val * 100 if isinstance(val, (int, float)) else 0
confidence_rows += f"<tr><th>{cls} Confidence</th><td>{conf_pct:.2f}%</td></tr>\n "
analysis_metrics_html = f"""
<tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr>
{confidence_rows}
"""
else:
analysis_metrics_html = f"""
<tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr>
<tr><th>Abnormal Cells</th><td>{ai_summary.get('abnormal_cells', 'N/A')}</td></tr>
<tr><th>Normal Cells</th><td>{ai_summary.get('normal_cells', 'N/A')}</td></tr>
"""
else:
# For CIN/Colposcopy or other models, show generic confidence
confidence_dict = ai_summary.get('confidence', {})
confidence_rows = ""
if isinstance(confidence_dict, dict):
for cls, val in confidence_dict.items():
conf_pct = val * 100 if isinstance(val, (int, float)) else 0
confidence_rows += f"<tr><th>{cls} Confidence</th><td>{conf_pct:.2f}%</td></tr>\n "
analysis_metrics_html = f"""
<tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr>
{confidence_rows}
"""
# Build final HTML including download links and embedded annotated image
report_html = os.path.join(report_dir, "report.html")
json_url = f"/outputs/reports/{report_id}/report.json"
html_url = f"/outputs/reports/{report_id}/report.html"
# annotated_img_full was computed earlier; ensure it's defined and set
# annotated_img (used by the HTML template conditional) accordingly.
if 'annotated_img_full' not in locals() or not annotated_img_full:
annotated_img_full = ''
annotated_img = annotated_img_full
# Compute a display path for the HTML. Prefer a relative filename when the
# annotated image is copied into the same report folder. This makes the
# HTML work when opened directly from disk (or via HF file viewer).
annotated_img_display = annotated_img_full
try:
if annotated_img_local:
annotated_img_local_abs = os.path.abspath(annotated_img_local)
report_dir_abs = os.path.abspath(report_dir)
# If the annotated image resides in the report folder, reference by basename
if os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs:
annotated_img_display = os.path.basename(annotated_img_local_abs)
else:
# If annotated image is inside the outputs/reports/<report_id>/ path but not same
# absolute path (edge cases), make it relative to the report dir
prefix = f"/outputs/reports/{report_id}/"
if isinstance(annotated_img_full, str) and annotated_img_full.startswith(prefix):
annotated_img_display = annotated_img_full[len(prefix):]
except Exception:
annotated_img_display = annotated_img_full
download_pdf_btn = f'<a href="{pdf_url}" download style="text-decoration:none"><button class="btn-secondary">Download PDF</button></a>' if pdf_url else ''
# Format generated time
generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')
html_content = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<title>{report_title} — Manalife AI</title>
<style>
:root{{--bg:#f8fafc;--card:#ffffff;--muted:#6b7280;--accent:#0f172a}}
body{{font-family:Inter,ui-sans-serif,system-ui,-apple-system,"Segoe UI",Roboto,"Helvetica Neue",Arial;margin:0;background:var(--bg);color:var(--accent);line-height:1.45}}
.container{{max-width:900px;margin:36px auto;padding:20px}}
header{{display:flex;align-items:center;gap:16px}}
.brand{{display:flex;flex-direction:column}}
h1{{margin:0;font-size:20px}}
.sub{{color:var(--muted);font-size:13px}}
.card{{background:var(--card);box-shadow:0 6px 18px rgba(15,23,42,0.06);border-radius:12px;padding:20px;margin-top:18px}}
.grid{{display:grid;grid-template-columns:1fr 1fr;gap:12px}}
.section-title{{font-weight:600;margin-top:8px}}
table{{width:100%;border-collapse:collapse;margin-top:8px}}
td,th{{padding:8px;border-bottom:1px dashed #e6e9ef;text-align:left;font-size:14px}}
.full{{grid-column:1/-1}}
.muted{{color:var(--muted);font-size:13px}}
.footer{{margin-top:20px;font-size:13px;color:var(--muted)}}
.pill{{background:#eef2ff;color:#1e3a8a;padding:6px 10px;border-radius:999px;font-weight:600;font-size:13px}}
@media (max-width:700px){{.grid{{grid-template-columns:1fr}}}}
.signatures{{display:flex;gap:20px;flex-wrap:wrap;margin-top:12px}}
.sig{{background:#fbfbfd;border:1px solid #f0f1f5;padding:10px;border-radius:8px;min-width:180px}}
.annotated-image{{max-width:100%;height:auto;border-radius:8px;margin-top:12px;border:1px solid #e6e9ef}}
.btn-primary{{padding:10px 14px;border-radius:8px;border:1px solid #2563eb;background:#2563eb;color:white;font-weight:700;cursor:pointer}}
.btn-secondary{{padding:10px 14px;border-radius:8px;border:1px solid #e6eefc;background:#eef2ff;font-weight:700;cursor:pointer}}
.actions-bar{{margin-top:12px;display:flex;gap:8px;flex-wrap:wrap}}
</style>
</head>
<body>
<div class="container">
<header>
<div>
<!-- Use the static logo from frontend public/ (copied to dist by Vite) -->
<img src="/manalife_LOGO.jpg" alt="Manalife Logo" width="64" height="64">
</div>
<div class="brand">
<h1>MANALIFE AI — Medical Analysis</h1>
<div class="sub">Advanced cytological colposcopy and histopathology reporting</div>
<div class="muted">[email protected] • +1 (555) 123-4567</div>
</div>
</header>
<div class="card">
<div style="display:flex;justify-content:space-between;align-items:center;gap:12px;flex-wrap:wrap">
<div>
<div class="muted">MEDICAL ANALYSIS REPORT OF {report_type.upper()}</div>
<h2 style="margin:6px 0 0 0">{report_title}</h2>
</div>
<div style="text-align:right">
<div class="pill">Report ID: {report_id}</div>
<div class="muted" style="margin-top:6px">Generated: {generated_time}</div>
</div>
</div>
<hr style="border:none;border-top:1px solid #eef2f6;margin:16px 0">
<div class="grid">
<div>
<div class="section-title">Patient Information</div>
<table>
<tr><th>Patient ID</th><td>{patient_id}</td></tr>
<tr><th>Exam Date</th><td>{exam_date}</td></tr>
<tr><th>Physician</th><td>{metadata_dict.get('physician', 'N/A')}</td></tr>
<tr><th>Facility</th><td>{metadata_dict.get('facility', 'N/A')}</td></tr>
</table>
</div>
<div>
<div class="section-title">Sample Information</div>
<table>
<tr><th>Specimen Type</th><td>{metadata_dict.get('specimen_type', 'N/A')}</td></tr>
<tr><th>Clinical History</th><td>{metadata_dict.get('clinical_history', 'N/A')}</td></tr>
<tr><th>Collected</th><td>{exam_date}</td></tr>
<tr><th>Reported</th><td>{generated_time}</td></tr>
</table>
</div>
<div class="full">
<div class="section-title">AI-Assisted Analysis</div>
<table>
{analysis_metrics_html}
</table>
<div style="margin-top:12px;padding:12px;background:#f8fafc;border-radius:8px;border-left:4px solid #2563eb">
<div style="font-weight:600;margin-bottom:6px">AI Interpretation:</div>
<div class="muted">{ai_summary.get('ai_interpretation', 'No AI interpretation available.')}</div>
</div>
</div>
{'<div class="full"><div class="section-title">Annotated Analysis Image</div><img src="' + annotated_img_display + '" class="annotated-image" alt="Annotated Analysis Result" /></div>' if annotated_img else ''}
<div class="full">
<div class="section-title">Doctor\'s Notes</div>
<p class="muted">{notes or 'No additional notes provided.'}</p>
</div>
<div class="full">
<div class="section-title">Recommendations</div>
<p class="muted">Continue routine screening as per standard guidelines. Follow up as directed by your physician.</p>
</div>
<div class="full">
<div class="section-title">Signatures</div>
<div class="signatures">
<div class="sig">
<div style="font-weight:700">Rajesh Venugopal</div>
<div class="muted">Physician</div>
</div>
</div>
</div>
</div>
<div class="footer">
<div>AI System: Manalife AI — Automated Analysis</div>
<div style="margin-top:6px">Report generated: {report_data['generated_at']}</div>
</div>
</div>
<div class="actions-bar">
{download_pdf_btn}
<button class="btn-secondary" onclick="window.print()">Print Report</button>
</div>
</div>
</body>
</html>"""
with open(report_html, "w", encoding="utf-8") as f:
f.write(html_content)
# Update report.json to include the resolved annotated image url so callers can find it
try:
report_data['analysis']['annotated_image_url'] = annotated_img_full or ''
with open(report_json, 'w', encoding='utf-8') as f:
json.dump(report_data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"⚠️ Failed to update report.json with annotated image url: {e}")
return {
"report_id": report_id,
"json_url": json_url,
"html_url": html_url,
"pdf_url": pdf_url,
}
except Exception as e:
return JSONResponse(
content={"error": f"Failed to generate report: {str(e)}"},
status_code=500
)
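
# Example report-generation request (hypothetical values; "metadata" must be a
# JSON string and "analysis_summary" is the JSON returned by /predict/):
#   curl -X POST http://localhost:7860/reports/ \
#        -F "patient_id=P001" -F "exam_date=2024-01-15" \
#        -F 'metadata={"physician": "Dr. Example", "facility": "Demo Clinic"}' \
#        -F 'analysis_summary={"model_used": "YOLO Detection"}'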
@app.get("/reports/{report_id}")
async def get_report(report_id: str):
"""Fetch a generated report by ID."""
report_dir = os.path.join(OUTPUT_DIR, "reports", report_id)
report_json = os.path.join(report_dir, "report.json")
if not os.path.exists(report_json):
return JSONResponse(
content={"error": "Report not found"},
status_code=404
)
with open(report_json, "r") as f:
report_data = json.load(f)
return report_data
@app.get("/reports")
async def list_reports(patient_id: str = None):
"""List all generated reports, optionally filtered by patient ID."""
reports_dir = os.path.join(OUTPUT_DIR, "reports")
if not os.path.exists(reports_dir):
return {"reports": []}
reports = []
for report_id in os.listdir(reports_dir):
report_json = os.path.join(reports_dir, report_id, "report.json")
if os.path.exists(report_json):
with open(report_json, "r") as f:
report_data = json.load(f)
if not patient_id or report_data["patient"]["id"] == patient_id:
reports.append({
"report_id": report_id,
"patient_id": report_data["patient"]["id"],
"exam_date": report_data["patient"]["exam_date"],
"generated_at": report_data["generated_at"]
})
return {"reports": sorted(reports, key=lambda r: r["generated_at"], reverse=True)}
@app.get("/models")
def get_models():
return {"available_models": ["yolo", "mwt", "cin", "histopathology"]}
@app.get("/health")
def health():
return {"message": "Pathora Medical Diagnostic API is running!"}
# =====================================================
# FRONTEND
# =====================================================
# Serve frontend only if it has been built; avoid startup failure when dist/ is missing.
FRONTEND_DIST = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))
# Check if frontend/dist exists in /app (Docker), otherwise check relative to script location
if not os.path.isdir(FRONTEND_DIST):
# Fallback for Docker: frontend is copied to ./frontend/dist during build
FRONTEND_DIST = os.path.join(os.path.dirname(__file__), "frontend/dist")
ASSETS_DIR = os.path.join(FRONTEND_DIST, "assets")
if os.path.isdir(ASSETS_DIR):
app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
else:
print("ℹ️ Frontend assets directory not found — skipping /assets mount.")
@app.get("/")
async def serve_frontend():
index_path = os.path.join(FRONTEND_DIST, "index.html")
if os.path.isfile(index_path):
return FileResponse(index_path)
return JSONResponse({"message": "Backend is running. Frontend build not found."})
@app.get("/{file_path:path}")
async def serve_static_files(file_path: str):
"""Serve static files from frontend dist (images, logos, etc.)"""
# Skip API routes
if file_path.startswith(("predict", "reports", "models", "health", "outputs", "assets", "cyto", "colpo", "histo")):
return JSONResponse({"error": "Not found"}, status_code=404)
# Try to serve file from dist root
static_file = os.path.join(FRONTEND_DIST, file_path)
if os.path.isfile(static_file):
return FileResponse(static_file)
# Fallback to index.html for client-side routing
index_path = os.path.join(FRONTEND_DIST, "index.html")
if os.path.isfile(index_path):
return FileResponse(index_path)
return JSONResponse({"error": "Not found"}, status_code=404)
if __name__ == "__main__":
# Use PORT provided by the environment (Hugging Face Spaces sets PORT=7860)
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)