Spaces:
Running
Running
| import os | |
| import shutil | |
| for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]: | |
| shutil.rmtree(d, ignore_errors=True) | |
| os.environ["HF_HOME"] = "/tmp/huggingface" | |
| os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface" | |
| os.environ["TORCH_HOME"] = "/tmp/torch" | |
| os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" | |
| os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" | |
| import json | |
| import uuid | |
| import datetime | |
| import numpy as np | |
| import torch | |
| import cv2 | |
| import joblib | |
| import torch.nn as nn | |
| import torchvision.transforms as transforms | |
| import torchvision.models as models | |
| from io import BytesIO | |
| from PIL import Image as PILImage | |
| from fastapi import FastAPI, File, UploadFile, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, FileResponse | |
| import tensorflow as tf | |
| from model_histo import BreastCancerClassifier | |
| from fastapi.staticfiles import StaticFiles | |
| import uvicorn | |
| try: | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY | |
| from reportlab.lib.units import inch | |
| from reportlab.lib.colors import navy, black | |
| REPORTLAB_AVAILABLE = True | |
| except ImportError: | |
| REPORTLAB_AVAILABLE = False | |
| from ultralytics import YOLO | |
| from sklearn.preprocessing import MinMaxScaler | |
| from model import MWT as create_model | |
| from augmentations import Augmentations | |
| from huggingface_hub import InferenceClient | |
| # ===================================================== | |
| # SETUP TEMP DIRS AND ENV | |
| # ===================================================== | |
| for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch"]: | |
| shutil.rmtree(d, ignore_errors=True) | |
| os.environ["HF_HOME"] = "/tmp/huggingface" | |
| os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface" | |
| os.environ["TORCH_HOME"] = "/tmp/torch" | |
| os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" | |
| os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" | |
| # ===================================================== | |
| # HUGGING FACE CLIENT SETUP | |
| # ===================================================== | |
| HF_MODEL_ID = "BioMistral/BioMistral-7B" | |
| hf_token = os.getenv("HF_TOKEN") | |
| client = None | |
| if hf_token: | |
| try: | |
| client = InferenceClient(model=HF_MODEL_ID, token=hf_token) | |
| print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}") | |
| except Exception as e: | |
| print("⚠️ Failed to initialize Hugging Face client:", e) | |
| else: | |
| print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.") | |
| def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence): | |
| """Generate a brief medical interpretation using Mistral.""" | |
| if not client: | |
| return "⚠️ Hugging Face client not initialized — skipping summary." | |
| try: | |
| prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation. | |
| Observed Cell Counts: | |
| - {abnormal_cells} Abnormal Cells | |
| - {normal_cells} Normal Cells | |
| Write a 2-3 sentence professional medical assessment focusing on: | |
| 1. Cell count analysis | |
| 2. Abnormality ratio ({abnormal_cells/(abnormal_cells + normal_cells)*100:.1f}%) | |
| 3. Clinical significance | |
| Use objective, scientific language suitable for a pathology report.""" | |
| # Use streaming to avoid StopIteration | |
| response = client.text_generation( | |
| prompt, | |
| max_new_tokens=200, | |
| temperature=0.7, | |
| stream=False, | |
| details=True, | |
| stop_sequences=["\n\n", "###"] | |
| ) | |
| # Handle different response formats | |
| if hasattr(response, 'generated_text'): | |
| return response.generated_text.strip() | |
| elif isinstance(response, dict): | |
| return response.get('generated_text', '').strip() | |
| elif isinstance(response, str): | |
| return response.strip() | |
| # Fallback summary if response format is unexpected | |
| ratio = abnormal_cells / (abnormal_cells + normal_cells) * 100 if (abnormal_cells + normal_cells) > 0 else 0 | |
| return f"Analysis shows {abnormal_cells} abnormal cells ({ratio:.1f}%) and {normal_cells} normal cells." | |
| except Exception as e: | |
| # Provide a structured fallback summary instead of error message | |
| total = abnormal_cells + normal_cells | |
| if total == 0: | |
| return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters." | |
| ratio = (abnormal_cells / total) * 100 | |
| severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low" | |
| return f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) among {total} total cells, indicating {severity} abnormality ratio." | |
| def generate_mwt_summary(predicted_label, confidences, avg_confidence): | |
| """Generate a short MWT-specific interpretation using the HF client when available.""" | |
| if not client: | |
| return "⚠️ Hugging Face client not initialized — skipping AI interpretation." | |
| try: | |
| prompt = f""" | |
| You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence professional interpretation suitable for embedding in a diagnostic report. | |
| Result: | |
| - Predicted label: {predicted_label} | |
| - Class probabilities: {json.dumps(confidences)} | |
| Provide guidance on the significance of the result and any suggested next steps in plain, objective language. | |
| """ | |
| response = client.text_generation( | |
| prompt, | |
| max_new_tokens=120, | |
| temperature=0.2, | |
| stream=False, | |
| details=True, | |
| stop_sequences=["\n\n", "###"] | |
| ) | |
| if hasattr(response, 'generated_text'): | |
| return response.generated_text.strip() | |
| elif isinstance(response, dict): | |
| return response.get('generated_text', '').strip() | |
| elif isinstance(response, str): | |
| return response.strip() | |
| return f"Result: {predicted_label}." | |
| except Exception as e: | |
| return f"Quantitative result: {predicted_label}." | |
| def generate_cin_summary(predicted_grade, confidences, avg_confidence): | |
| """Generate a short CIN-specific interpretation using the HF client when available.""" | |
| if not client: | |
| return "⚠️ Hugging Face client not initialized — skipping AI interpretation." | |
| try: | |
| prompt = f""" | |
| You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence professional interpretation suitable for a diagnostic report. | |
| Result: | |
| - Predicted grade: {predicted_grade} | |
| - Class probabilities: {json.dumps(confidences)} | |
| Provide a brief statement about clinical significance and suggested next steps (e.g., further colposcopic evaluation) in objective, clinical language. | |
| """ | |
| response = client.text_generation( | |
| prompt, | |
| max_new_tokens=140, | |
| temperature=0.2, | |
| stream=False, | |
| details=True, | |
| stop_sequences=["\n\n", "###"] | |
| ) | |
| if hasattr(response, 'generated_text'): | |
| return response.generated_text.strip() | |
| elif isinstance(response, dict): | |
| return response.get('generated_text', '').strip() | |
| elif isinstance(response, str): | |
| return response.strip() | |
| return f"Result: {predicted_grade}." | |
| except Exception: | |
| return f"Quantitative result: {predicted_grade}." | |
| # ===================================================== | |
| # FASTAPI SETUP | |
| # ===================================================== | |
| app = FastAPI(title="Pathora Medical Diagnostic API") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| expose_headers=["*"] # Allow access to response headers | |
| ) | |
| # Use /tmp for outputs in Hugging Face Spaces (writable directory) | |
| OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs") | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| # Create image outputs dir | |
| IMAGES_DIR = os.path.join(OUTPUT_DIR, "images") | |
| os.makedirs(IMAGES_DIR, exist_ok=True) | |
| app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs") | |
| # Mount public sample images from frontend dist (Vite copies public/ to dist/ root) | |
| # Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev) | |
| FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist") | |
| if not os.path.isdir(FRONTEND_DIST_CHECK): | |
| FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist")) | |
| for sample_dir in ["cyto", "colpo", "histo"]: | |
| sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir) | |
| if os.path.isdir(sample_path): | |
| app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir) | |
| print(f"✅ Mounted /{sample_dir} from {sample_path}") | |
| else: | |
| print(f"⚠️ Sample directory not found: {sample_path}") | |
| # Mount other static assets (logos, banners) from dist root | |
| for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]: | |
| static_path = os.path.join(FRONTEND_DIST_CHECK, static_file) | |
| if os.path.isfile(static_path): | |
| print(f"✅ Static file available: /{static_file}") | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| # ===================================================== | |
| # MODEL LOADS | |
| # ===================================================== | |
| print("🔹 Loading YOLO model...") | |
| yolo_model = YOLO("best2.pt") | |
| print("🔹 Loading MWT model...") | |
| mwt_model = create_model(num_classes=2).to(device) | |
| mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device)) | |
| mwt_model.eval() | |
| mwt_class_names = ["Negative", "Positive"] | |
| print("🔹 Loading CIN model...") | |
| try: | |
| clf = joblib.load("logistic_regression_model.pkl") | |
| except Exception as e: | |
| print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}") | |
| clf = None | |
| yolo_colposcopy = YOLO("yolo_colposcopy.pt") | |
| # ===================================================== | |
| # RESNET FEATURE EXTRACTORS FOR CIN | |
| # ===================================================== | |
| def build_resnet(model_name="resnet50"): | |
| if model_name == "resnet50": | |
| model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT) | |
| elif model_name == "resnet101": | |
| model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT) | |
| elif model_name == "resnet152": | |
| model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT) | |
| model.eval().to(device) | |
| return ( | |
| nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool), | |
| model.layer1, model.layer2, model.layer3, model.layer4, | |
| ) | |
| gap = nn.AdaptiveAvgPool2d((1, 1)) | |
| gmp = nn.AdaptiveMaxPool2d((1, 1)) | |
| resnet50_blocks = build_resnet("resnet50") | |
| resnet101_blocks = build_resnet("resnet101") | |
| resnet152_blocks = build_resnet("resnet152") | |
| transform = transforms.Compose([ | |
| transforms.ToPILImage(), | |
| transforms.Resize((224, 224)), | |
| transforms.ToTensor(), | |
| transforms.Normalize(mean=[0.485, 0.456, 0.406], | |
| std=[0.229, 0.224, 0.225]), | |
| ]) | |
| def preprocess_for_mwt(image_np): | |
| img = cv2.resize(image_np, (224, 224)) | |
| img = Augmentations.Normalization((0, 1))(img) | |
| img = np.array(img, np.float32) | |
| img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| img = img.transpose(2, 0, 1) | |
| img = np.expand_dims(img, axis=0) | |
| return torch.Tensor(img) | |
| def extract_cbf_features(blocks, img_t): | |
| block1, block2, block3, block4, block5 = blocks | |
| with torch.no_grad(): | |
| f1 = block1(img_t) | |
| f2 = block2(f1) | |
| f3 = block3(f2) | |
| f4 = block4(f3) | |
| f5 = block5(f4) | |
| p1 = gmp(f1).view(-1) | |
| p2 = gmp(f2).view(-1) | |
| p3 = gap(f3).view(-1) | |
| p4 = gap(f4).view(-1) | |
| p5 = gap(f5).view(-1) | |
| return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy() | |
| # ===================================================== | |
| # Model 4: Histopathology Classifier (TensorFlow) | |
| # ===================================================== | |
| print("🔹 Attempting to load Breast Cancer Histopathology model...") | |
| try: | |
| classifier = BreastCancerClassifier(fine_tune=False) | |
| # Safely handle Hugging Face token auth | |
| hf_token = os.getenv("HF_TOKEN") | |
| if hf_token: | |
| if classifier.authenticate_huggingface(): | |
| print("✅ Hugging Face authentication successful.") | |
| else: | |
| print("⚠️ Warning: Hugging Face authentication failed, using local model only.") | |
| else: | |
| print("⚠️ HF_TOKEN not found in environment — skipping authentication.") | |
| # Load Path Foundation model | |
| if classifier.load_path_foundation(): | |
| print("✅ Loaded Path Foundation base model.") | |
| else: | |
| print("⚠️ Could not load Path Foundation base model, continuing with local weights only.") | |
| # Load trained histopathology model | |
| model_path = "histopathology_trained_model.keras" | |
| if os.path.exists(model_path): | |
| classifier.model = tf.keras.models.load_model(model_path) | |
| print(f"✅ Loaded local histopathology model: {model_path}") | |
| else: | |
| print(f"⚠️ Model file not found: {model_path}") | |
| except Exception as e: | |
| classifier = None | |
| print(f"❌ Error initializing histopathology model: {e}") | |
| def predict_histopathology(image): | |
| if classifier is None: | |
| return {"error": "Histopathology model not available."} | |
| try: | |
| if image.mode != "RGB": | |
| image = image.convert("RGB") | |
| image = image.resize((224, 224)) | |
| img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0) | |
| embeddings = classifier.extract_embeddings(img_array) | |
| prediction_proba = classifier.model.predict(embeddings, verbose=0)[0] | |
| predicted_class = int(np.argmax(prediction_proba)) | |
| class_names = ["Benign", "Malignant"] | |
| # Return confidence as dictionary with both class probabilities (like MWT/CIN) | |
| confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))} | |
| avg_confidence = float(np.max(prediction_proba)) * 100 | |
| return { | |
| "model_used": "Histopathology Classifier", | |
| "prediction": class_names[predicted_class], | |
| "confidence": confidences, | |
| "summary": { | |
| "ai_interpretation": f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue.", | |
| }, | |
| } | |
| except Exception as e: | |
| return {"error": f"Histopathology prediction failed: {e}"} | |
| # ===================================================== | |
| # MAIN ENDPOINT | |
| # ===================================================== | |
| async def predict(model_name: str = Form(...), file: UploadFile = File(...)): | |
| print(f"Received prediction request - model: {model_name}, file: {file.filename}") | |
| # Validate model name | |
| if model_name not in ["yolo", "mwt", "cin", "histopathology"]: | |
| return JSONResponse( | |
| content={ | |
| "error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology" | |
| }, | |
| status_code=400 | |
| ) | |
| # Validate and read file | |
| if not file.filename: | |
| return JSONResponse( | |
| content={"error": "No file provided"}, | |
| status_code=400 | |
| ) | |
| contents = await file.read() | |
| if len(contents) == 0: | |
| return JSONResponse( | |
| content={"error": "Empty file provided"}, | |
| status_code=400 | |
| ) | |
| # Attempt to open and validate image | |
| try: | |
| image = PILImage.open(BytesIO(contents)).convert("RGB") | |
| image_np = np.array(image) | |
| if image_np.size == 0: | |
| raise ValueError("Empty image array") | |
| print(f"Successfully loaded image, shape: {image_np.shape}") | |
| except Exception as e: | |
| return JSONResponse( | |
| content={"error": f"Invalid image file: {str(e)}"}, | |
| status_code=400 | |
| ) | |
| if model_name == "yolo": | |
| results = yolo_model(image) | |
| detections_json = results[0].to_json() | |
| detections = json.loads(detections_json) | |
| abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal") | |
| normal_cells = sum(1 for d in detections if d["name"] == "normal") | |
| avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0 | |
| ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence) | |
| output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg" | |
| output_path = os.path.join(IMAGES_DIR, output_filename) | |
| results[0].save(filename=output_path) | |
| return { | |
| "model_used": "YOLO Detection", | |
| "detections": detections, | |
| "annotated_image_url": f"/outputs/images/{output_filename}", | |
| "summary": { | |
| "abnormal_cells": abnormal_cells, | |
| "normal_cells": normal_cells, | |
| "ai_interpretation": ai_summary, | |
| }, | |
| } | |
| elif model_name == "mwt": | |
| tensor = preprocess_for_mwt(image_np) | |
| with torch.no_grad(): | |
| output = mwt_model(tensor.to(device)).cpu() | |
| probs = torch.softmax(output, dim=1)[0] | |
| confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)} | |
| predicted_label = mwt_class_names[int(torch.argmax(probs).item())] | |
| # Average / primary confidence for display | |
| avg_confidence = float(torch.max(probs).item()) * 100 | |
| # Generate a brief AI interpretation using the Mistral client (if available) | |
| ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence) | |
| return { | |
| "model_used": "MWT Classifier", | |
| "prediction": predicted_label, | |
| "confidence": confidences, | |
| "summary": { | |
| "ai_interpretation": ai_interp, | |
| }, | |
| } | |
| elif model_name == "cin": | |
| if clf is None: | |
| return JSONResponse( | |
| content={"error": "CIN classifier not available on server."}, | |
| status_code=503, | |
| ) | |
| # Decode uploaded image and run colposcopy detector | |
| nparr = np.frombuffer(contents, np.uint8) | |
| img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
| results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False) | |
| if len(results[0].boxes) == 0: | |
| return {"error": "No cervix detected"} | |
| x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy()) | |
| crop = img[y1:y2, x1:x2] | |
| crop = cv2.resize(crop, (224, 224)) | |
| img_t = transform(crop).unsqueeze(0).to(device) | |
| # Extract features from multiple ResNet backbones | |
| f50 = extract_cbf_features(resnet50_blocks, img_t) | |
| f101 = extract_cbf_features(resnet101_blocks, img_t) | |
| f152 = extract_cbf_features(resnet152_blocks, img_t) | |
| features = np.concatenate([f50, f101, f152]).reshape(1, -1) | |
| # Scale and predict | |
| X_scaled = MinMaxScaler().fit_transform(features) | |
| # Ensure classifier supports probability outputs | |
| try: | |
| proba = clf.predict_proba(X_scaled)[0] | |
| except Exception as e: | |
| return JSONResponse( | |
| content={"error": "CIN classifier does not support probability estimates (predict_proba)."}, | |
| status_code=503, | |
| ) | |
| num_classes = int(len(proba)) | |
| # Handle different classifier output sizes: | |
| # - If 3 classes: map directly to CIN1/CIN2/CIN3 | |
| # - If 2 classes: apply a conservative heuristic to split High-grade into CIN2/CIN3 | |
| if num_classes == 3: | |
| classes = ["CIN1", "CIN2", "CIN3"] | |
| confidences = {classes[i]: float(proba[i]) for i in range(3)} | |
| predicted_idx = int(np.argmax(proba)) | |
| predicted_label = classes[predicted_idx] | |
| avg_confidence = float(np.max(proba)) * 100 | |
| mapping_used = "direct_3class" | |
| elif num_classes == 2: | |
| # Binary model detected (e.g., Low-grade vs High-grade). We'll convert to CIN1/CIN2/CIN3 | |
| # Heuristic: | |
| # - CIN1 <- low_grade_prob | |
| # - Split high_grade_prob into CIN2 and CIN3 based on how confident 'high' is. | |
| # * If high <= 0.6 -> mostly CIN2 | |
| # * If high >= 0.8 -> mostly CIN3 | |
| # * Between 0.6 and 0.8 -> interpolate | |
| low_prob = float(proba[0]) | |
| high_prob = float(proba[1]) | |
| if high_prob <= 0.6: | |
| cin3_factor = 0.0 | |
| elif high_prob >= 0.8: | |
| cin3_factor = 1.0 | |
| else: | |
| cin3_factor = (high_prob - 0.6) / 0.2 | |
| cin1 = low_prob | |
| cin3 = high_prob * cin3_factor | |
| cin2 = high_prob - cin3 | |
| confidences = {"CIN1": cin1, "CIN2": cin2, "CIN3": cin3} | |
| # pick highest of the mapped three as primary prediction | |
| predicted_label = max(confidences.items(), key=lambda x: x[1])[0] | |
| avg_confidence = float(max(confidences.values())) * 100 | |
| mapping_used = "binary_to_3class_heuristic" | |
| else: | |
| return JSONResponse( | |
| content={ | |
| "error": "CIN classifier must output 2-class (Low/High) or 3-class probabilities (CIN1, CIN2, CIN3).", | |
| "detected_num_classes": num_classes, | |
| }, | |
| status_code=503, | |
| ) | |
| # Generate AI interpretation using Mistral client (if available) | |
| ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence) | |
| response = { | |
| "model_used": "CIN Classifier", | |
| "prediction": predicted_label, | |
| "confidence": confidences, | |
| "summary": { | |
| "ai_interpretation": ai_interp, | |
| }, | |
| } | |
| # If we used the binary->3class heuristic, include a diagnostic field so callers know it was mapped | |
| if 'mapping_used' in locals() and mapping_used == 'binary_to_3class_heuristic': | |
| response["mapping_used"] = mapping_used | |
| response["mapping_note"] = ( | |
| "The server mapped a binary Low/High classifier to CIN1/CIN2/CIN3 using a heuristic split. " | |
| "This is an approximation — for clinical use please supply a native 3-class model." | |
| ) | |
| return response | |
| elif model_name == "histopathology": | |
| result = predict_histopathology(image) | |
| return result | |
| else: | |
| return JSONResponse(content={"error": "Invalid model name"}, status_code=400) | |
| # ===================================================== | |
| # ROUTES | |
| # ===================================================== | |
| def create_designed_pdf(pdf_path, report_data, analysis_summary_json, annotated_image_path=None): | |
| doc = SimpleDocTemplate(pdf_path, pagesize=letter, | |
| rightMargin=72, leftMargin=72, | |
| topMargin=72, bottomMargin=18) | |
| styles = getSampleStyleSheet() | |
| story = [] | |
| styles.add(ParagraphStyle(name='Title', fontSize=20, fontName='Helvetica-Bold', alignment=TA_CENTER, textColor=navy)) | |
| styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold', spaceBefore=10, spaceAfter=6)) | |
| styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12)) | |
| styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold', textColor=navy, spaceBefore=6, spaceAfter=4)) | |
| patient = report_data['patient'] | |
| analysis = report_data.get('analysis', {}) | |
| # Safely parse analysis_summary_json | |
| try: | |
| ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {} | |
| except (json.JSONDecodeError, TypeError): | |
| ai_summary = {} | |
| # Determine report type based on model used | |
| model_used = ai_summary.get('model_used', '') | |
| if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower(): | |
| report_type = "CYTOLOGY" | |
| report_title = "Cytology Report" | |
| elif 'MWT' in model_used or 'mwt' in str(model_used).lower(): | |
| # MWT is a cytology classifier; use a clearer report title for MWT results | |
| report_type = "CYTOLOGY" | |
| report_title = "Cytology Analysis Report" | |
| elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower(): | |
| report_type = "COLPOSCOPY" | |
| report_title = "Colposcopy Report" | |
| elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower(): | |
| report_type = "HISTOPATHOLOGY" | |
| report_title = "Histopathology Report" | |
| else: | |
| report_type = "CYTOLOGY" | |
| report_title = "Medical Analysis Report" | |
| # Header | |
| story.append(Paragraph("MANALIFE AI", styles['Title'])) | |
| story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.3*inch)) | |
| story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading'])) | |
| story.append(Paragraph(report_title, styles['Section'])) | |
| story.append(Spacer(1, 0.2*inch)) | |
| # Report ID and Date | |
| story.append(Paragraph(f"<b>Report ID:</b> {report_data.get('report_id', 'N/A')}", styles['NormalSmall'])) | |
| story.append(Paragraph(f"<b>Generated:</b> {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.2*inch)) | |
| # Patient Information Section | |
| story.append(Paragraph("Patient Information", styles['Section'])) | |
| story.append(Paragraph(f"<b>Patient ID:</b> {patient.get('id', 'N/A')}", styles['NormalSmall'])) | |
| story.append(Paragraph(f"<b>Exam Date:</b> {patient.get('exam_date', 'N/A')}", styles['NormalSmall'])) | |
| story.append(Paragraph(f"<b>Physician:</b> {patient.get('physician', 'N/A')}", styles['NormalSmall'])) | |
| story.append(Paragraph(f"<b>Facility:</b> {patient.get('facility', 'N/A')}", styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.2*inch)) | |
| # Sample Information Section | |
| story.append(Paragraph("Sample Information", styles['Section'])) | |
| story.append(Paragraph(f"<b>Specimen Type:</b> {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall'])) | |
| story.append(Paragraph(f"<b>Clinical History:</b> {patient.get('clinical_history', 'N/A')}", styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.2*inch)) | |
| # AI Analysis Section | |
| story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section'])) | |
| story.append(Paragraph("<b>System:</b> Manalife AI System — Automated Analysis", styles['NormalSmall'])) | |
| # Add metrics based on report type | |
| if report_type == "HISTOPATHOLOGY": | |
| # For histopathology, show Benign/Malignant confidence | |
| confidence_dict = ai_summary.get('confidence', {}) | |
| if isinstance(confidence_dict, dict): | |
| benign_conf = confidence_dict.get('Benign', 0) * 100 | |
| malignant_conf = confidence_dict.get('Malignant', 0) * 100 | |
| story.append(Paragraph(f"<b>Benign Confidence:</b> {benign_conf:.2f}%", styles['NormalSmall'])) | |
| story.append(Paragraph(f"<b>Malignant Confidence:</b> {malignant_conf:.2f}%", styles['NormalSmall'])) | |
| elif report_type == "CYTOLOGY": | |
| # For cytology and MWT, show class confidences if available, otherwise show abnormal/normal cells | |
| confidence_dict = ai_summary.get('confidence', {}) | |
| if isinstance(confidence_dict, dict) and confidence_dict: | |
| for cls, val in confidence_dict.items(): | |
| conf_pct = val * 100 if isinstance(val, (int, float)) else 0 | |
| story.append(Paragraph(f"<b>{cls} Confidence:</b> {conf_pct:.2f}%", styles['NormalSmall'])) | |
| else: | |
| if 'abnormal_cells' in ai_summary: | |
| story.append(Paragraph(f"<b>Abnormal Cells:</b> {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall'])) | |
| if 'normal_cells' in ai_summary: | |
| story.append(Paragraph(f"<b>Normal Cells:</b> {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall'])) | |
| else: | |
| # For CIN/Colposcopy, show class confidences | |
| confidence_dict = ai_summary.get('confidence', {}) | |
| if isinstance(confidence_dict, dict): | |
| for cls, val in confidence_dict.items(): | |
| conf_pct = val * 100 if isinstance(val, (int, float)) else 0 | |
| story.append(Paragraph(f"<b>{cls} Confidence:</b> {conf_pct:.2f}%", styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.1*inch)) | |
| story.append(Paragraph("<b>AI Interpretation:</b>", styles['NormalSmall'])) | |
| story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.2*inch)) | |
| # If an annotated image path was provided and exists on disk, embed it | |
| if annotated_image_path: | |
| try: | |
| if os.path.isfile(annotated_image_path): | |
| story.append(Spacer(1, 0.1*inch)) | |
| # Determine image pixel size and scale to a reasonable width for PDF | |
| try: | |
| from PIL import Image as PILImageLocal | |
| with PILImageLocal.open(annotated_image_path) as im: | |
| img_w, img_h = im.size | |
| except Exception: | |
| img_w, img_h = (800, 600) | |
| # Display width in points (ReportLab uses points; 1 inch = 72 points). Assume 96 DPI for pixel->inch. | |
| display_width_px = max(300, min(img_w, 800)) | |
| width_points = min(5 * inch, (display_width_px / 96.0) * inch) | |
| img = ReportLabImage(annotated_image_path, width=width_points, kind='proportional') | |
| story.append(img) | |
| story.append(Spacer(1, 0.2*inch)) | |
| except Exception as e: | |
| # Don't fail the whole PDF creation if image embedding fails | |
| print(f"⚠️ Could not embed annotated image in PDF: {e}") | |
| # Doctor's Notes | |
| story.append(Paragraph("Doctor's Notes", styles['Section'])) | |
| story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.2*inch)) | |
| # Recommendations | |
| story.append(Paragraph("RECOMMENDATIONS", styles['Section'])) | |
| story.append(Paragraph("Continue routine screening as per standard guidelines. Follow up as directed by your physician.", styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.3*inch)) | |
| # Signatures | |
| story.append(Paragraph("Signatures", styles['Section'])) | |
| story.append(Paragraph("Rajesh Venugopal, Physician", styles['NormalSmall'])) | |
| #story.append(Paragraph("", styles['NormalSmall'])) | |
| story.append(Spacer(1, 0.1*inch)) | |
| story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall'])) | |
| doc.build(story) | |
| async def generate_report( | |
| patient_id: str = Form(...), | |
| exam_date: str = Form(...), | |
| metadata: str = Form(...), | |
| notes: str = Form(None), | |
| analysis_id: str = Form(None), | |
| analysis_summary: str = Form(None), | |
| file: UploadFile = File(None), | |
| ): | |
| """Generate a structured medical report from analysis results and metadata.""" | |
| try: | |
| # Create reports directory if it doesn't exist | |
| reports_dir = os.path.join(OUTPUT_DIR, "reports") | |
| os.makedirs(reports_dir, exist_ok=True) | |
| # Generate unique report ID | |
| report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}" | |
| report_dir = os.path.join(reports_dir, report_id) | |
| os.makedirs(report_dir, exist_ok=True) | |
| # Parse metadata | |
| metadata_dict = json.loads(metadata) | |
| # Get analysis results - assuming stored in memory or retrievable | |
| # TODO: Implement analysis results storage/retrieval | |
| # Construct report data | |
| report_data = { | |
| "report_id": report_id, | |
| "generated_at": datetime.datetime.now().isoformat(), | |
| "patient": { | |
| "id": patient_id, | |
| "exam_date": exam_date, | |
| **metadata_dict | |
| }, | |
| "analysis": { | |
| "id": analysis_id, | |
| # If the analysis_id is actually an annotated image URL, store it for report embedding | |
| "annotated_image_url": analysis_id, | |
| # TODO: Add actual analysis results | |
| }, | |
| "doctor_notes": notes | |
| } | |
| # Save report data | |
| report_json = os.path.join(report_dir, "report.json") | |
| with open(report_json, "w", encoding="utf-8") as f: | |
| json.dump(report_data, f, indent=2, ensure_ascii=False) | |
| # We'll create PDF later (after parsing analysis_summary and resolving | |
| # any annotated/input image). Initialize pdf_url here. | |
| pdf_url = None | |
| # Parse analysis_summary to get AI results | |
| try: | |
| ai_summary = json.loads(analysis_summary) if analysis_summary else {} | |
| except (json.JSONDecodeError, TypeError): | |
| ai_summary = {} | |
| # Resolve annotated image: prefer AI/analysis annotated image; if none, | |
| # save the uploaded input image (if provided) into the report folder | |
| # and use that as the embedded image. | |
| annotated_img = ai_summary.get('annotated_image_url') or report_data.get("analysis", {}).get("annotated_image_url") or "" | |
| annotated_img_full = "" | |
| annotated_img_local = None | |
| if annotated_img: | |
| # If it's an outputs path served by StaticFiles, map to local file | |
| if isinstance(annotated_img, str) and annotated_img.startswith('/outputs/'): | |
| rel = annotated_img[len('/outputs/'):].lstrip('/') | |
| annotated_img_local = os.path.join(OUTPUT_DIR, rel) | |
| annotated_img_full = annotated_img | |
| else: | |
| # keep absolute URLs as-is for HTML | |
| annotated_img_full = annotated_img if isinstance(annotated_img, str) else '' | |
| # If no annotated image provided, but an input file was uploaded, save it | |
| if not annotated_img_full and file is not None and getattr(file, 'filename', None): | |
| try: | |
| input_filename = f"input_image{os.path.splitext(file.filename)[1] or '.jpg'}" | |
| input_path = os.path.join(report_dir, input_filename) | |
| contents = await file.read() | |
| with open(input_path, 'wb') as out_f: | |
| out_f.write(contents) | |
| annotated_img_full = f"/outputs/reports/{report_id}/{input_filename}" | |
| annotated_img_local = input_path | |
| except Exception as e: | |
| print(f"⚠️ Failed to save uploaded input image for report: {e}") | |
| # Ensure annotated_img_full has a leading slash if it's a relative path | |
| if annotated_img_full and not annotated_img_full.startswith(('http://', 'https://')): | |
| annotated_img_full = annotated_img_full if annotated_img_full.startswith('/') else '/' + annotated_img_full | |
| # If we have a local annotated image but it's stored in the shared images folder | |
| # (e.g. /outputs/images/...), copy it into this report's folder so the HTML/PDF | |
| # can reference the image relative to the report directory. This also makes the | |
| # image visible when opening report.html directly from disk (file://). | |
| try: | |
| if annotated_img_local: | |
| annotated_img_local_abs = os.path.abspath(annotated_img_local) | |
| report_dir_abs = os.path.abspath(report_dir) | |
| # If the image is not already in the report directory, copy it there | |
| if not os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs: | |
| src_basename = os.path.basename(annotated_img_local_abs) | |
| dest_name = f"annotated_{src_basename}" | |
| dest_path = os.path.join(report_dir, dest_name) | |
| try: | |
| shutil.copy2(annotated_img_local_abs, dest_path) | |
| annotated_img_local = dest_path | |
| annotated_img_full = f"/outputs/reports/{report_id}/{dest_name}" | |
| except Exception as e: | |
| # If copy fails, keep using the original annotated_img_full (may be served by StaticFiles) | |
| print(f"⚠️ Failed to copy annotated image into report folder: {e}") | |
| except Exception: | |
| pass | |
| # Now attempt to create the PDF (passing the local annotated image path | |
| # so the PDF writer can embed it). If annotation is remote or not | |
| # available, PDF creation will still proceed without the image. | |
| if REPORTLAB_AVAILABLE: | |
| try: | |
| pdf_path = os.path.join(report_dir, "report.pdf") | |
| create_designed_pdf(pdf_path, report_data, analysis_summary, annotated_img_local) | |
| pdf_url = f"/outputs/reports/{report_id}/report.pdf" | |
| except Exception as e: | |
| print(f"Error creating designed PDF: {e}") | |
| pdf_url = None | |
| # Determine report type based on analysis summary or model used | |
| model_used = ai_summary.get('model_used', '') | |
| if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower(): | |
| report_type = "Cytology" | |
| report_title = "Cytology Report" | |
| elif 'MWT' in model_used or 'mwt' in str(model_used).lower() or 'mwt' in str(analysis_id).lower(): | |
| # MWT is a cytology classifier — use clearer title | |
| report_type = "Cytology" | |
| report_title = "Cytology Analysis Report" | |
| elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower(): | |
| report_type = "Colposcopy" | |
| report_title = "Colposcopy Report" | |
| elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower(): | |
| report_type = "Histopathology" | |
| report_title = "Histopathology Report" | |
| else: | |
| # Default fallback | |
| report_type = "Cytology" | |
| report_title = "Medical Analysis Report" | |
| # Build analysis metrics HTML based on report type | |
| if report_type == "Histopathology": | |
| # For histopathology, show Benign/Malignant confidence from the confidence dict | |
| confidence_dict = ai_summary.get('confidence', {}) | |
| benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0 | |
| malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0 | |
| analysis_metrics_html = f""" | |
| <tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr> | |
| <tr><th>Benign Confidence</th><td>{benign_conf:.2f}%</td></tr> | |
| <tr><th>Malignant Confidence</th><td>{malignant_conf:.2f}%</td></tr> | |
| """ | |
| elif report_type == "Cytology": | |
| # For cytology (YOLO) or MWT, show class confidences if provided, else abnormal/normal counts | |
| confidence_dict = ai_summary.get('confidence', {}) | |
| if isinstance(confidence_dict, dict) and confidence_dict: | |
| confidence_rows = "" | |
| for cls, val in confidence_dict.items(): | |
| conf_pct = val * 100 if isinstance(val, (int, float)) else 0 | |
| confidence_rows += f"<tr><th>{cls} Confidence</th><td>{conf_pct:.2f}%</td></tr>\n " | |
| analysis_metrics_html = f""" | |
| <tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr> | |
| {confidence_rows} | |
| """ | |
| else: | |
| analysis_metrics_html = f""" | |
| <tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr> | |
| <tr><th>Abnormal Cells</th><td>{ai_summary.get('abnormal_cells', 'N/A')}</td></tr> | |
| <tr><th>Normal Cells</th><td>{ai_summary.get('normal_cells', 'N/A')}</td></tr> | |
| """ | |
| else: | |
| # For CIN/Colposcopy or other models, show generic confidence | |
| confidence_dict = ai_summary.get('confidence', {}) | |
| confidence_rows = "" | |
| if isinstance(confidence_dict, dict): | |
| for cls, val in confidence_dict.items(): | |
| conf_pct = val * 100 if isinstance(val, (int, float)) else 0 | |
| confidence_rows += f"<tr><th>{cls} Confidence</th><td>{conf_pct:.2f}%</td></tr>\n " | |
| analysis_metrics_html = f""" | |
| <tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr> | |
| {confidence_rows} | |
| """ | |
| # Build final HTML including download links and embedded annotated image | |
| report_html = os.path.join(report_dir, "report.html") | |
| json_url = f"/outputs/reports/{report_id}/report.json" | |
| html_url = f"/outputs/reports/{report_id}/report.html" | |
| # annotated_img_full was computed earlier; ensure it's defined and set | |
| # annotated_img (used by the HTML template conditional) accordingly. | |
| if 'annotated_img_full' not in locals() or not annotated_img_full: | |
| annotated_img_full = '' | |
| annotated_img = annotated_img_full | |
| # Compute a display path for the HTML. Prefer a relative filename when the | |
| # annotated image is copied into the same report folder. This makes the | |
| # HTML work when opened directly from disk (or via HF file viewer). | |
| annotated_img_display = annotated_img_full | |
| try: | |
| if annotated_img_local: | |
| annotated_img_local_abs = os.path.abspath(annotated_img_local) | |
| report_dir_abs = os.path.abspath(report_dir) | |
| # If the annotated image resides in the report folder, reference by basename | |
| if os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs: | |
| annotated_img_display = os.path.basename(annotated_img_local_abs) | |
| else: | |
| # If annotated image is inside the outputs/reports/<report_id>/ path but not same | |
| # absolute path (edge cases), make it relative to the report dir | |
| prefix = f"/outputs/reports/{report_id}/" | |
| if isinstance(annotated_img_full, str) and annotated_img_full.startswith(prefix): | |
| annotated_img_display = annotated_img_full[len(prefix):] | |
| except Exception: | |
| annotated_img_display = annotated_img_full | |
| download_pdf_btn = f'<a href="{pdf_url}" download style="text-decoration:none"><button class="btn-secondary">Download PDF</button></a>' if pdf_url else '' | |
| # Format generated time | |
| generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p') | |
| html_content = f"""<!doctype html> | |
| <html lang="en"> | |
| <head> | |
| <meta charset="utf-8" /> | |
| <meta name="viewport" content="width=device-width,initial-scale=1" /> | |
| <title>{report_title} — Manalife AI</title> | |
| <style> | |
| :root{{--bg:#f8fafc;--card:#ffffff;--muted:#6b7280;--accent:#0f172a}} | |
| body{{font-family:Inter,ui-sans-serif,system-ui,-apple-system,"Segoe UI",Roboto,"Helvetica Neue",Arial;margin:0;background:var(--bg);color:var(--accent);line-height:1.45}} | |
| .container{{max-width:900px;margin:36px auto;padding:20px}} | |
| header{{display:flex;align-items:center;gap:16px}} | |
| .brand{{display:flex;flex-direction:column}} | |
| h1{{margin:0;font-size:20px}} | |
| .sub{{color:var(--muted);font-size:13px}} | |
| .card{{background:var(--card);box-shadow:0 6px 18px rgba(15,23,42,0.06);border-radius:12px;padding:20px;margin-top:18px}} | |
| .grid{{display:grid;grid-template-columns:1fr 1fr;gap:12px}} | |
| .section-title{{font-weight:600;margin-top:8px}} | |
| table{{width:100%;border-collapse:collapse;margin-top:8px}} | |
| td,th{{padding:8px;border-bottom:1px dashed #e6e9ef;text-align:left;font-size:14px}} | |
| .full{{grid-column:1/-1}} | |
| .muted{{color:var(--muted);font-size:13px}} | |
| .footer{{margin-top:20px;font-size:13px;color:var(--muted)}} | |
| .pill{{background:#eef2ff;color:#1e3a8a;padding:6px 10px;border-radius:999px;font-weight:600;font-size:13px}} | |
| @media (max-width:700px){{.grid{{grid-template-columns:1fr}}}} | |
| .signatures{{display:flex;gap:20px;flex-wrap:wrap;margin-top:12px}} | |
| .sig{{background:#fbfbfd;border:1px solid #f0f1f5;padding:10px;border-radius:8px;min-width:180px}} | |
| .annotated-image{{max-width:100%;height:auto;border-radius:8px;margin-top:12px;border:1px solid #e6e9ef}} | |
| .btn-primary{{padding:10px 14px;border-radius:8px;border:1px solid #2563eb;background:#2563eb;color:white;font-weight:700;cursor:pointer}} | |
| .btn-secondary{{padding:10px 14px;border-radius:8px;border:1px solid #e6eefc;background:#eef2ff;font-weight:700;cursor:pointer}} | |
| .actions-bar{{margin-top:12px;display:flex;gap:8px;flex-wrap:wrap}} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container"> | |
| <header> | |
| <div> | |
| <!-- Use the static logo from frontend public/ (copied to dist by Vite) --> | |
| <img src="/manalife_LOGO.jpg" alt="Manalife Logo" width="64" height="64"> | |
| </div> | |
| <div class="brand"> | |
| <h1>MANALIFE AI — Medical Analysis</h1> | |
| <div class="sub">Advanced cytological colposcopy and histopathology reporting</div> | |
| <div class="muted">[email protected] • +1 (555) 123-4567</div> | |
| </div> | |
| </header> | |
| <div class="card"> | |
| <div style="display:flex;justify-content:space-between;align-items:center;gap:12px;flex-wrap:wrap"> | |
| <div> | |
| <div class="muted">MEDICAL ANALYSIS REPORT OF {report_type.upper()}</div> | |
| <h2 style="margin:6px 0 0 0">{report_title}</h2> | |
| </div> | |
| <div style="text-align:right"> | |
| <div class="pill">Report ID: {report_id}</div> | |
| <div class="muted" style="margin-top:6px">Generated: {generated_time}</div> | |
| </div> | |
| </div> | |
| <hr style="border:none;border-top:1px solid #eef2f6;margin:16px 0"> | |
| <div class="grid"> | |
| <div> | |
| <div class="section-title">Patient Information</div> | |
| <table> | |
| <tr><th>Patient ID</th><td>{patient_id}</td></tr> | |
| <tr><th>Exam Date</th><td>{exam_date}</td></tr> | |
| <tr><th>Physician</th><td>{metadata_dict.get('physician', 'N/A')}</td></tr> | |
| <tr><th>Facility</th><td>{metadata_dict.get('facility', 'N/A')}</td></tr> | |
| </table> | |
| </div> | |
| <div> | |
| <div class="section-title">Sample Information</div> | |
| <table> | |
| <tr><th>Specimen Type</th><td>{metadata_dict.get('specimen_type', 'N/A')}</td></tr> | |
| <tr><th>Clinical History</th><td>{metadata_dict.get('clinical_history', 'N/A')}</td></tr> | |
| <tr><th>Collected</th><td>{exam_date}</td></tr> | |
| <tr><th>Reported</th><td>{generated_time}</td></tr> | |
| </table> | |
| </div> | |
| <div class="full"> | |
| <div class="section-title">AI-Assisted Analysis</div> | |
| <table> | |
| {analysis_metrics_html} | |
| </table> | |
| <div style="margin-top:12px;padding:12px;background:#f8fafc;border-radius:8px;border-left:4px solid #2563eb"> | |
| <div style="font-weight:600;margin-bottom:6px">AI Interpretation:</div> | |
| <div class="muted">{ai_summary.get('ai_interpretation', 'No AI interpretation available.')}</div> | |
| </div> | |
| </div> | |
| {'<div class="full"><div class="section-title">Annotated Analysis Image</div><img src="' + annotated_img_display + '" class="annotated-image" alt="Annotated Analysis Result" /></div>' if annotated_img else ''} | |
| <div class="full"> | |
| <div class="section-title">Doctor\'s Notes</div> | |
| <p class="muted">{notes or 'No additional notes provided.'}</p> | |
| </div> | |
| <div class="full"> | |
| <div class="section-title">Recommendations</div> | |
| <p class="muted">Continue routine screening as per standard guidelines. Follow up as directed by your physician.</p> | |
| </div> | |
| <div class="full"> | |
| <div class="section-title">Signatures</div> | |
| <div class="signatures"> | |
| <div class="sig"> | |
| <div style="font-weight:700">Rajesh Venugopal</div> | |
| <div class="muted">Physician</div> | |
| </div> | |
| </div> | |
| </div> | |
| </div> | |
| <div class="footer"> | |
| <div>AI System: Manalife AI — Automated Analysis</div> | |
| <div style="margin-top:6px">Report generated: {report_data['generated_at']}</div> | |
| </div> | |
| </div> | |
| <div class="actions-bar"> | |
| {download_pdf_btn} | |
| <button class="btn-secondary" onclick="window.print()">Print Report</button> | |
| </div> | |
| </div> | |
| </body> | |
| </html>""" | |
| with open(report_html, "w", encoding="utf-8") as f: | |
| f.write(html_content) | |
| # Update report.json to include the resolved annotated image url so callers can find it | |
| try: | |
| report_data['analysis']['annotated_image_url'] = annotated_img_full or '' | |
| with open(report_json, 'w', encoding='utf-8') as f: | |
| json.dump(report_data, f, indent=2, ensure_ascii=False) | |
| except Exception as e: | |
| print(f"⚠️ Failed to update report.json with annotated image url: {e}") | |
| return { | |
| "report_id": report_id, | |
| "json_url": json_url, | |
| "html_url": html_url, | |
| "pdf_url": pdf_url, | |
| } | |
| except Exception as e: | |
| return JSONResponse( | |
| content={"error": f"Failed to generate report: {str(e)}"}, | |
| status_code=500 | |
| ) | |
| async def get_report(report_id: str): | |
| """Fetch a generated report by ID.""" | |
| report_dir = os.path.join(OUTPUT_DIR, "reports", report_id) | |
| report_json = os.path.join(report_dir, "report.json") | |
| if not os.path.exists(report_json): | |
| return JSONResponse( | |
| content={"error": "Report not found"}, | |
| status_code=404 | |
| ) | |
| with open(report_json, "r") as f: | |
| report_data = json.load(f) | |
| return report_data | |
| async def list_reports(patient_id: str = None): | |
| """List all generated reports, optionally filtered by patient ID.""" | |
| reports_dir = os.path.join(OUTPUT_DIR, "reports") | |
| if not os.path.exists(reports_dir): | |
| return {"reports": []} | |
| reports = [] | |
| for report_id in os.listdir(reports_dir): | |
| report_json = os.path.join(reports_dir, report_id, "report.json") | |
| if os.path.exists(report_json): | |
| with open(report_json, "r") as f: | |
| report_data = json.load(f) | |
| if not patient_id or report_data["patient"]["id"] == patient_id: | |
| reports.append({ | |
| "report_id": report_id, | |
| "patient_id": report_data["patient"]["id"], | |
| "exam_date": report_data["patient"]["exam_date"], | |
| "generated_at": report_data["generated_at"] | |
| }) | |
| return {"reports": sorted(reports, key=lambda r: r["generated_at"], reverse=True)} | |
| def get_models(): | |
| return {"available_models": ["yolo", "mwt", "cin", "histopathology"]} | |
| def health(): | |
| return {"message": "Pathora Medical Diagnostic API is running!"} | |
| # FRONTEND | |
| # ===================================================== | |
| # Serve frontend only if it has been built; avoid startup failure when dist/ is missing. | |
| FRONTEND_DIST = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist")) | |
| # Check if frontend/dist exists in /app (Docker), otherwise check relative to script location | |
| if not os.path.isdir(FRONTEND_DIST): | |
| # Fallback for Docker: frontend is copied to ./frontend/dist during build | |
| FRONTEND_DIST = os.path.join(os.path.dirname(__file__), "frontend/dist") | |
| ASSETS_DIR = os.path.join(FRONTEND_DIST, "assets") | |
| if os.path.isdir(ASSETS_DIR): | |
| app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets") | |
| else: | |
| print("ℹ️ Frontend assets directory not found — skipping /assets mount.") | |
| async def serve_frontend(): | |
| index_path = os.path.join(FRONTEND_DIST, "index.html") | |
| if os.path.isfile(index_path): | |
| return FileResponse(index_path) | |
| return JSONResponse({"message": "Backend is running. Frontend build not found."}) | |
| async def serve_static_files(file_path: str): | |
| """Serve static files from frontend dist (images, logos, etc.)""" | |
| # Skip API routes | |
| if file_path.startswith(("predict", "reports", "models", "health", "outputs", "assets", "cyto", "colpo", "histo")): | |
| return JSONResponse({"error": "Not found"}, status_code=404) | |
| # Try to serve file from dist root | |
| static_file = os.path.join(FRONTEND_DIST, file_path) | |
| if os.path.isfile(static_file): | |
| return FileResponse(static_file) | |
| # Fallback to index.html for client-side routing | |
| index_path = os.path.join(FRONTEND_DIST, "index.html") | |
| if os.path.isfile(index_path): | |
| return FileResponse(index_path) | |
| return JSONResponse({"error": "Not found"}, status_code=404) | |
| if __name__ == "__main__": | |
| # Use PORT provided by the environment (Hugging Face Spaces sets PORT=7860) | |
| port = int(os.environ.get("PORT", 7860)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |