import os
import shutil

# =====================================================
# SETUP TEMP DIRS AND ENV
# =====================================================
# Clear stale caches and point every library at writable /tmp paths
# (required on Hugging Face Spaces, where $HOME may be read-only).
# This must run before the heavy imports below read these variables.
for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]:
    shutil.rmtree(d, ignore_errors=True)
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface"
os.environ["TORCH_HOME"] = "/tmp/torch"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

import json
import uuid
import datetime
import numpy as np
import torch
import cv2
import joblib
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from io import BytesIO
from PIL import Image as PILImage
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
import tensorflow as tf
from model_histo import BreastCancerClassifier
import uvicorn

try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY
    from reportlab.lib.units import inch
    from reportlab.lib.colors import navy, black
    REPORTLAB_AVAILABLE = True
except ImportError:
    REPORTLAB_AVAILABLE = False

from ultralytics import YOLO
from sklearn.preprocessing import MinMaxScaler
from model import MWT as create_model
from augmentations import Augmentations
from huggingface_hub import InferenceClient

# =====================================================
# HUGGING FACE CLIENT SETUP
# =====================================================
HF_MODEL_ID = "BioMistral/BioMistral-7B"
hf_token = os.getenv("HF_TOKEN")
client = None
if hf_token:
    try:
        client = InferenceClient(model=HF_MODEL_ID, token=hf_token)
        print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}")
    except Exception as e:
        print("⚠️ Failed to initialize Hugging Face client:", e)
else:
    print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.")
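# Illustrative only: to enable AI-generated summaries when running locally,
# export a Hugging Face access token before starting the server, e.g.
#   export HF_TOKEN=hf_...your_token...
# Without it, every generate_*_summary() below returns its deterministic fallback.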
def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence):
    """Generate a brief medical interpretation using BioMistral."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping summary."
    total = abnormal_cells + normal_cells
    if total == 0:
        # Guard against a division by zero in the prompt below.
        return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters."
    try:
        prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation.

Observed Cell Counts:
- {abnormal_cells} Abnormal Cells
- {normal_cells} Normal Cells

Write a 2-3 sentence professional medical assessment focusing on:
1. Cell count analysis
2. Abnormality ratio ({abnormal_cells / total * 100:.1f}%)
3. Clinical significance

Use objective, scientific language suitable for a pathology report."""

        # Single (non-streaming) generation call; details=True returns a
        # response object rather than a bare string.
        response = client.text_generation(
            prompt,
            max_new_tokens=200,
            temperature=0.7,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )

        # Handle different response formats
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()

        # Fallback summary if the response format is unexpected
        ratio = abnormal_cells / total * 100
        return f"Analysis shows {abnormal_cells} abnormal cells ({ratio:.1f}%) and {normal_cells} normal cells."
    except Exception:
        # Provide a structured fallback summary instead of an error message
        ratio = (abnormal_cells / total) * 100
        severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low"
        return (f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) "
                f"among {total} total cells, indicating a {severity} abnormality ratio.")
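# Worked example of the deterministic fallback above (no API call involved),
# assuming hypothetical counts of 12 abnormal and 38 normal cells:
#   total = 50, ratio = 12 / 50 * 100 = 24.0 -> severity "low"
#   -> "Quantitative analysis detected 12 abnormal cells (24.0%) among 50 total
#       cells, indicating a low abnormality ratio."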
def generate_mwt_summary(predicted_label, confidences, avg_confidence):
    """Generate a short MWT-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence
professional interpretation suitable for embedding in a diagnostic report.

Result:
- Predicted label: {predicted_label}
- Class probabilities: {json.dumps(confidences)}

Provide guidance on the significance of the result and any suggested next steps in plain, objective language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=120,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_label}."
    except Exception:
        return f"Quantitative result: {predicted_label}."


def generate_cin_summary(predicted_grade, confidences, avg_confidence):
    """Generate a short CIN-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence
professional interpretation suitable for a diagnostic report.

Result:
- Predicted grade: {predicted_grade}
- Class probabilities: {json.dumps(confidences)}

Provide a brief statement about clinical significance and suggested next steps
(e.g., further colposcopic evaluation) in objective, clinical language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=140,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_grade}."
    except Exception:
        return f"Quantitative result: {predicted_grade}."


# =====================================================
# FASTAPI SETUP
# =====================================================
app = FastAPI(title="Pathora Medical Diagnostic API")
app.add_middleware(
    CORSMiddleware,
    # NOTE: the "*" entry already matches every origin, so the explicit
    # localhost entries are redundant; browsers will not send credentialed
    # requests against a wildcard origin.
    allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"]  # Allow access to response headers
)

# Use /tmp for outputs in Hugging Face Spaces (writable directory)
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Create image outputs dir
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
os.makedirs(IMAGES_DIR, exist_ok=True)

app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs")

# Mount public sample images from frontend dist (Vite copies public/ to dist/ root).
# Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev).
FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist")
if not os.path.isdir(FRONTEND_DIST_CHECK):
    FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))

for sample_dir in ["cyto", "colpo", "histo"]:
    sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir)
    if os.path.isdir(sample_path):
        app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir)
        print(f"✅ Mounted /{sample_dir} from {sample_path}")
    else:
        print(f"⚠️ Sample directory not found: {sample_path}")

# Check other static assets (logos, banners) in the dist root; these are
# served by the catch-all route at the bottom of this file.
for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]:
    static_path = os.path.join(FRONTEND_DIST_CHECK, static_file)
    if os.path.isfile(static_path):
        print(f"✅ Static file available: /{static_file}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# =====================================================
# MODEL LOADS
# =====================================================
print("🔹 Loading YOLO model...")
yolo_model = YOLO("best2.pt")

print("🔹 Loading MWT model...")
mwt_model = create_model(num_classes=2).to(device)
mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device))
mwt_model.eval()
mwt_class_names = ["Negative", "Positive"]

print("🔹 Loading CIN model...")
try:
    clf = joblib.load("logistic_regression_model.pkl")
except Exception as e:
    print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}")
    clf = None

yolo_colposcopy = YOLO("yolo_colposcopy.pt")

# =====================================================
# RESNET FEATURE EXTRACTORS FOR CIN
# =====================================================
def build_resnet(model_name="resnet50"):
    if model_name == "resnet50":
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    elif model_name == "resnet101":
        model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT)
    elif model_name == "resnet152":
        model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT)
    else:
        raise ValueError(f"Unsupported model_name: {model_name}")
    model.eval().to(device)
    return (
        nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool),
        model.layer1,
        model.layer2,
        model.layer3,
        model.layer4,
    )

gap = nn.AdaptiveAvgPool2d((1, 1))
gmp = nn.AdaptiveMaxPool2d((1, 1))

resnet50_blocks = build_resnet("resnet50")
resnet101_blocks = build_resnet("resnet101")
resnet152_blocks = build_resnet("resnet152")

transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


def preprocess_for_mwt(image_np):
    img = cv2.resize(image_np, (224, 224))
    img = Augmentations.Normalization((0, 1))(img)
    img = np.array(img, np.float32)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.transpose(2, 0, 1)  # HWC -> CHW
    img = np.expand_dims(img, axis=0)
    return torch.Tensor(img)


def extract_cbf_features(blocks, img_t):
    block1, block2, block3, block4, block5 = blocks
    with torch.no_grad():
        f1 = block1(img_t)
        f2 = block2(f1)
        f3 = block3(f2)
        f4 = block4(f3)
        f5 = block5(f4)
        p1 = gmp(f1).view(-1)
        p2 = gmp(f2).view(-1)
        p3 = gap(f3).view(-1)
        p4 = gap(f4).view(-1)
        p5 = gap(f5).view(-1)
    return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy()
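# Feature-size sanity check (derived from the torchvision ResNet architecture,
# which these three backbones share): the pooled stages yield
# 64 + 256 + 512 + 1024 + 2048 = 3904 values per backbone, so concatenating
# ResNet-50/101/152 features in the CIN branch below gives a
# 3 * 3904 = 11712-dimensional vector per image.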
# =====================================================
# Model 4: Histopathology Classifier (TensorFlow)
# =====================================================
print("🔹 Attempting to load Breast Cancer Histopathology model...")
try:
    classifier = BreastCancerClassifier(fine_tune=False)

    # Safely handle Hugging Face token auth
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        if classifier.authenticate_huggingface():
            print("✅ Hugging Face authentication successful.")
        else:
            print("⚠️ Warning: Hugging Face authentication failed, using local model only.")
    else:
        print("⚠️ HF_TOKEN not found in environment — skipping authentication.")

    # Load Path Foundation model
    if classifier.load_path_foundation():
        print("✅ Loaded Path Foundation base model.")
    else:
        print("⚠️ Could not load Path Foundation base model, continuing with local weights only.")

    # Load trained histopathology model
    model_path = "histopathology_trained_model.keras"
    if os.path.exists(model_path):
        classifier.model = tf.keras.models.load_model(model_path)
        print(f"✅ Loaded local histopathology model: {model_path}")
    else:
        print(f"⚠️ Model file not found: {model_path}")
except Exception as e:
    classifier = None
    print(f"❌ Error initializing histopathology model: {e}")


def predict_histopathology(image):
    if classifier is None:
        return {"error": "Histopathology model not available."}
    try:
        if image.mode != "RGB":
            image = image.convert("RGB")
        image = image.resize((224, 224))
        img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0)

        embeddings = classifier.extract_embeddings(img_array)
        prediction_proba = classifier.model.predict(embeddings, verbose=0)[0]
        predicted_class = int(np.argmax(prediction_proba))
        class_names = ["Benign", "Malignant"]

        # Return confidence as a dictionary with both class probabilities (like MWT/CIN)
        confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))}
        avg_confidence = float(np.max(prediction_proba)) * 100

        return {
            "model_used": "Histopathology Classifier",
            "prediction": class_names[predicted_class],
            "confidence": confidences,
            "summary": {
                "ai_interpretation": f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue.",
            },
        }
    except Exception as e:
        return {"error": f"Histopathology prediction failed: {e}"}
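# Minimal local usage sketch (assumes the trained .keras file is present and
# "tile.png" is a hypothetical histopathology image on disk):
#   from PIL import Image
#   result = predict_histopathology(Image.open("tile.png"))
#   print(result.get("prediction"), result.get("confidence"))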
# =====================================================
# MAIN ENDPOINT
# =====================================================
@app.post("/predict/")
async def predict(model_name: str = Form(...), file: UploadFile = File(...)):
    print(f"Received prediction request - model: {model_name}, file: {file.filename}")

    # Validate model name
    if model_name not in ["yolo", "mwt", "cin", "histopathology"]:
        return JSONResponse(
            content={"error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology"},
            status_code=400
        )

    # Validate and read file
    if not file.filename:
        return JSONResponse(content={"error": "No file provided"}, status_code=400)
    contents = await file.read()
    if len(contents) == 0:
        return JSONResponse(content={"error": "Empty file provided"}, status_code=400)

    # Attempt to open and validate image
    try:
        image = PILImage.open(BytesIO(contents)).convert("RGB")
        image_np = np.array(image)
        if image_np.size == 0:
            raise ValueError("Empty image array")
        print(f"Successfully loaded image, shape: {image_np.shape}")
    except Exception as e:
        return JSONResponse(content={"error": f"Invalid image file: {str(e)}"}, status_code=400)

    if model_name == "yolo":
        results = yolo_model(image)
        detections_json = results[0].to_json()
        detections = json.loads(detections_json)

        abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal")
        normal_cells = sum(1 for d in detections if d["name"] == "normal")
        avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0

        ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence)

        output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg"
        output_path = os.path.join(IMAGES_DIR, output_filename)
        results[0].save(filename=output_path)

        return {
            "model_used": "YOLO Detection",
            "detections": detections,
            "annotated_image_url": f"/outputs/images/{output_filename}",
            "summary": {
                "abnormal_cells": abnormal_cells,
                "normal_cells": normal_cells,
                "ai_interpretation": ai_summary,
            },
        }

    elif model_name == "mwt":
        tensor = preprocess_for_mwt(image_np)
        with torch.no_grad():
            output = mwt_model(tensor.to(device)).cpu()
        probs = torch.softmax(output, dim=1)[0]
        confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)}
        predicted_label = mwt_class_names[int(torch.argmax(probs).item())]
        # Primary confidence for display
        avg_confidence = float(torch.max(probs).item()) * 100
        # Generate a brief AI interpretation using the BioMistral client (if available)
        ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence)

        return {
            "model_used": "MWT Classifier",
            "prediction": predicted_label,
            "confidence": confidences,
            "summary": {
                "ai_interpretation": ai_interp,
            },
        }

    elif model_name == "cin":
        if clf is None:
            return JSONResponse(
                content={"error": "CIN classifier not available on server."},
                status_code=503,
            )

        # Decode uploaded image and run colposcopy detector
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False)
        if len(results[0].boxes) == 0:
            return {"error": "No cervix detected"}

        x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy())
        crop = img[y1:y2, x1:x2]
        crop = cv2.resize(crop, (224, 224))
        img_t = transform(crop).unsqueeze(0).to(device)

        # Extract features from multiple ResNet backbones
        f50 = extract_cbf_features(resnet50_blocks, img_t)
        f101 = extract_cbf_features(resnet101_blocks, img_t)
        f152 = extract_cbf_features(resnet152_blocks, img_t)
        features = np.concatenate([f50, f101, f152]).reshape(1, -1)

        # Scale and predict.
        # CAUTION: fitting MinMaxScaler on a single sample makes every feature's
        # min equal its max, so the scaled vector is constant; a scaler fitted on
        # the training data should be persisted and loaded here instead.
        X_scaled = MinMaxScaler().fit_transform(features)

        # Ensure classifier supports probability outputs
        try:
            proba = clf.predict_proba(X_scaled)[0]
        except Exception:
            return JSONResponse(
                content={"error": "CIN classifier does not support probability estimates (predict_proba)."},
                status_code=503,
            )

        num_classes = int(len(proba))
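        # Worked example of the binary -> 3-class split below (illustrative
        # numbers only): with proba = [0.3, 0.7], high_prob = 0.7 falls between
        # the 0.6 and 0.8 thresholds, so cin3_factor = (0.7 - 0.6) / 0.2 = 0.5,
        # giving CIN1 = 0.30, CIN3 = 0.7 * 0.5 = 0.35, CIN2 = 0.7 - 0.35 = 0.35.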
        # Handle different classifier output sizes:
        # - If 3 classes: map directly to CIN1/CIN2/CIN3
        # - If 2 classes: apply a conservative heuristic to split High-grade into CIN2/CIN3
        if num_classes == 3:
            classes = ["CIN1", "CIN2", "CIN3"]
            confidences = {classes[i]: float(proba[i]) for i in range(3)}
            predicted_idx = int(np.argmax(proba))
            predicted_label = classes[predicted_idx]
            avg_confidence = float(np.max(proba)) * 100
            mapping_used = "direct_3class"
        elif num_classes == 2:
            # Binary model detected (e.g., Low-grade vs High-grade). Convert to CIN1/CIN2/CIN3.
            # Heuristic:
            # - CIN1 <- low_grade_prob
            # - Split high_grade_prob into CIN2 and CIN3 based on how confident 'high' is:
            #   * if high <= 0.6 -> mostly CIN2
            #   * if high >= 0.8 -> mostly CIN3
            #   * between 0.6 and 0.8 -> interpolate
            low_prob = float(proba[0])
            high_prob = float(proba[1])
            if high_prob <= 0.6:
                cin3_factor = 0.0
            elif high_prob >= 0.8:
                cin3_factor = 1.0
            else:
                cin3_factor = (high_prob - 0.6) / 0.2
            cin1 = low_prob
            cin3 = high_prob * cin3_factor
            cin2 = high_prob - cin3
            confidences = {"CIN1": cin1, "CIN2": cin2, "CIN3": cin3}
            # Pick the highest of the mapped three as the primary prediction
            predicted_label = max(confidences.items(), key=lambda x: x[1])[0]
            avg_confidence = float(max(confidences.values())) * 100
            mapping_used = "binary_to_3class_heuristic"
        else:
            return JSONResponse(
                content={
                    "error": "CIN classifier must output 2-class (Low/High) or 3-class probabilities (CIN1, CIN2, CIN3).",
                    "detected_num_classes": num_classes,
                },
                status_code=503,
            )

        # Generate AI interpretation using the BioMistral client (if available)
        ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence)

        response = {
            "model_used": "CIN Classifier",
            "prediction": predicted_label,
            "confidence": confidences,
            "summary": {
                "ai_interpretation": ai_interp,
            },
        }

        # If we used the binary->3class heuristic, include a diagnostic field so callers know it was mapped
        if mapping_used == "binary_to_3class_heuristic":
            response["mapping_used"] = mapping_used
            response["mapping_note"] = (
                "The server mapped a binary Low/High classifier to CIN1/CIN2/CIN3 using a heuristic split. "
                "This is an approximation — for clinical use please supply a native 3-class model."
            )
        return response

    elif model_name == "histopathology":
        result = predict_histopathology(image)
        return result

    else:
        # Unreachable: model_name was validated at the top of this handler.
        return JSONResponse(content={"error": "Invalid model name"}, status_code=400)
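# Illustrative client call for the endpoint above (assumes a local server on
# port 7860 and a hypothetical sample image "smear.jpg"):
#   import requests
#   with open("smear.jpg", "rb") as f:
#       r = requests.post(
#           "http://localhost:7860/predict/",
#           data={"model_name": "yolo"},
#           files={"file": ("smear.jpg", f, "image/jpeg")},
#       )
#   print(r.json()["summary"]["ai_interpretation"])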
# =====================================================
# ROUTES
# =====================================================
def create_designed_pdf(pdf_path, report_data, analysis_summary_json, annotated_image_path=None):
    doc = SimpleDocTemplate(pdf_path, pagesize=letter,
                            rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18)
    styles = getSampleStyleSheet()
    story = []

    # getSampleStyleSheet() already defines a 'Title' style and StyleSheet1.add()
    # raises on duplicate names, so the custom title style uses a distinct name.
    styles.add(ParagraphStyle(name='ReportTitle', fontSize=20, fontName='Helvetica-Bold',
                              alignment=TA_CENTER, textColor=navy))
    styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold',
                              spaceBefore=10, spaceAfter=6))
    styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12))
    styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold',
                              textColor=navy, spaceBefore=6, spaceAfter=4))

    patient = report_data['patient']
    analysis = report_data.get('analysis', {})

    # Safely parse analysis_summary_json
    try:
        ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {}
    except (json.JSONDecodeError, TypeError):
        ai_summary = {}

    # Determine report type based on model used
    model_used = ai_summary.get('model_used', '')
    if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower():
        report_type = "CYTOLOGY"
        report_title = "Cytology Report"
    elif 'MWT' in model_used or 'mwt' in str(model_used).lower():
        # MWT is a cytology classifier; use a clearer report title for MWT results
        report_type = "CYTOLOGY"
        report_title = "Cytology Analysis Report"
    elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower():
        report_type = "COLPOSCOPY"
        report_title = "Colposcopy Report"
    elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower():
        report_type = "HISTOPATHOLOGY"
        report_title = "Histopathology Report"
    else:
        report_type = "CYTOLOGY"
        report_title = "Medical Analysis Report"

    # Header
    story.append(Paragraph("MANALIFE AI", styles['ReportTitle']))
    story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall']))
    story.append(Spacer(1, 0.3 * inch))
    story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading']))
    story.append(Paragraph(report_title, styles['Section']))
    story.append(Spacer(1, 0.2 * inch))

    # Report ID and Date
    story.append(Paragraph(f"Report ID: {report_data.get('report_id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Generated: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2 * inch))

    # Patient Information Section
    story.append(Paragraph("Patient Information", styles['Section']))
    story.append(Paragraph(f"Patient ID: {patient.get('id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Exam Date: {patient.get('exam_date', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Physician: {patient.get('physician', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Facility: {patient.get('facility', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2 * inch))

    # Sample Information Section
    story.append(Paragraph("Sample Information", styles['Section']))
    story.append(Paragraph(f"Specimen Type: {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall']))
    story.append(Paragraph(f"Clinical History: {patient.get('clinical_history', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2 * inch))

    # AI Analysis Section
    story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section']))
    story.append(Paragraph("System: Manalife AI System — Automated Analysis", styles['NormalSmall']))

    # Add metrics based on report type
    confidence_dict = ai_summary.get('confidence', {})
    if report_type == "HISTOPATHOLOGY":
        # For histopathology, show Benign/Malignant confidence
        if isinstance(confidence_dict, dict):
            benign_conf = confidence_dict.get('Benign', 0) * 100
            malignant_conf = confidence_dict.get('Malignant', 0) * 100
            story.append(Paragraph(f"Benign Confidence: {benign_conf:.2f}%", styles['NormalSmall']))
            story.append(Paragraph(f"Malignant Confidence: {malignant_conf:.2f}%", styles['NormalSmall']))
    elif report_type == "CYTOLOGY":
        # For cytology and MWT, show class confidences if available, otherwise abnormal/normal cell counts
        if isinstance(confidence_dict, dict) and confidence_dict:
            for cls, val in confidence_dict.items():
                conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall']))
        else:
            if 'abnormal_cells' in ai_summary:
                story.append(Paragraph(f"Abnormal Cells: {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall']))
            if 'normal_cells' in ai_summary:
                story.append(Paragraph(f"Normal Cells: {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall']))
    else:
        # For CIN/Colposcopy, show class confidences
        if isinstance(confidence_dict, dict):
            for cls, val in confidence_dict.items():
                conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall']))

    story.append(Spacer(1, 0.1 * inch))
    story.append(Paragraph("AI Interpretation:", styles['NormalSmall']))
    story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall']))
    story.append(Spacer(1, 0.2 * inch))

    # If an annotated image path was provided and exists on disk, embed it
    if annotated_image_path:
        try:
            if os.path.isfile(annotated_image_path):
                story.append(Spacer(1, 0.1 * inch))
                # Determine image pixel size and scale to a reasonable width for the PDF
                try:
                    with PILImage.open(annotated_image_path) as im:
                        img_w, img_h = im.size
                except Exception:
                    img_w, img_h = (800, 600)
                # Display width in points (ReportLab uses points; 1 inch = 72 points).
                # Assume 96 DPI for the pixel -> inch conversion.
                display_width_px = max(300, min(img_w, 800))
                width_points = min(5 * inch, (display_width_px / 96.0) * inch)
                img = ReportLabImage(annotated_image_path, width=width_points, kind='proportional')
                story.append(img)
                story.append(Spacer(1, 0.2 * inch))
        except Exception as e:
            # Don't fail the whole PDF creation if image embedding fails
            print(f"⚠️ Could not embed annotated image in PDF: {e}")

    # Doctor's Notes
    story.append(Paragraph("Doctor's Notes", styles['Section']))
    story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall']))
    story.append(Spacer(1, 0.2 * inch))

    # Recommendations
    story.append(Paragraph("RECOMMENDATIONS", styles['Section']))
    story.append(Paragraph("Continue routine screening as per standard guidelines. "
                           "Follow up as directed by your physician.", styles['NormalSmall']))
    story.append(Spacer(1, 0.3 * inch))

    # Signatures
    story.append(Paragraph("Signatures", styles['Section']))
    story.append(Paragraph("Rajesh Venugopal, Physician", styles['NormalSmall']))
    story.append(Spacer(1, 0.1 * inch))
    story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))

    doc.build(story)
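# Illustrative request for the report endpoint below (all field values are
# hypothetical; metadata and analysis_summary are JSON-encoded form fields):
#   curl -X POST http://localhost:7860/reports/ \
#     -F "patient_id=P-0001" \
#     -F "exam_date=2025-01-15" \
#     -F 'metadata={"physician": "Dr. Example", "facility": "Example Clinic"}' \
#     -F 'analysis_summary={"model_used": "MWT Classifier", "ai_interpretation": "No abnormality detected."}'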
@app.post("/reports/")
async def generate_report(
    patient_id: str = Form(...),
    exam_date: str = Form(...),
    metadata: str = Form(...),
    notes: str = Form(None),
    analysis_id: str = Form(None),
    analysis_summary: str = Form(None),
    file: UploadFile = File(None),
):
    """Generate a structured medical report from analysis results and metadata."""
    try:
        # Create reports directory if it doesn't exist
        reports_dir = os.path.join(OUTPUT_DIR, "reports")
        os.makedirs(reports_dir, exist_ok=True)

        # Generate unique report ID
        report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}"
        report_dir = os.path.join(reports_dir, report_id)
        os.makedirs(report_dir, exist_ok=True)

        # Parse metadata
        metadata_dict = json.loads(metadata)

        # Get analysis results - assuming stored in memory or retrievable
        # TODO: Implement analysis results storage/retrieval

        # Construct report data
        report_data = {
            "report_id": report_id,
            "generated_at": datetime.datetime.now().isoformat(),
            "patient": {
                "id": patient_id,
                "exam_date": exam_date,
                **metadata_dict
            },
            "analysis": {
                "id": analysis_id,
                # If the analysis_id is actually an annotated image URL, store it for report embedding
                "annotated_image_url": analysis_id,
                # TODO: Add actual analysis results
            },
            "doctor_notes": notes
        }

        # Save report data
        report_json = os.path.join(report_dir, "report.json")
        with open(report_json, "w", encoding="utf-8") as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)

        # The PDF is created later (after parsing analysis_summary and resolving
        # any annotated/input image). Initialize pdf_url here.
        pdf_url = None

        # Parse analysis_summary to get AI results
        try:
            ai_summary = json.loads(analysis_summary) if analysis_summary else {}
        except (json.JSONDecodeError, TypeError):
            ai_summary = {}

        # Resolve annotated image: prefer the AI/analysis annotated image; if none,
        # save the uploaded input image (if provided) into the report folder
        # and use that as the embedded image.
        annotated_img = (ai_summary.get('annotated_image_url')
                         or report_data.get("analysis", {}).get("annotated_image_url")
                         or "")
        annotated_img_full = ""
        annotated_img_local = None
        if annotated_img:
            # If it's an outputs path served by StaticFiles, map it to a local file
            if isinstance(annotated_img, str) and annotated_img.startswith('/outputs/'):
                rel = annotated_img[len('/outputs/'):].lstrip('/')
                annotated_img_local = os.path.join(OUTPUT_DIR, rel)
                annotated_img_full = annotated_img
            else:
                # Keep absolute URLs as-is for the HTML
                annotated_img_full = annotated_img if isinstance(annotated_img, str) else ''

        # If no annotated image was provided but an input file was uploaded, save it
        if not annotated_img_full and file is not None and getattr(file, 'filename', None):
            try:
                input_filename = f"input_image{os.path.splitext(file.filename)[1] or '.jpg'}"
                input_path = os.path.join(report_dir, input_filename)
                contents = await file.read()
                with open(input_path, 'wb') as out_f:
                    out_f.write(contents)
                annotated_img_full = f"/outputs/reports/{report_id}/{input_filename}"
                annotated_img_local = input_path
            except Exception as e:
                print(f"⚠️ Failed to save uploaded input image for report: {e}")

        # Ensure annotated_img_full has a leading slash if it's a relative path
        if annotated_img_full and not annotated_img_full.startswith(('http://', 'https://')):
            annotated_img_full = annotated_img_full if annotated_img_full.startswith('/') else '/' + annotated_img_full

        # If we have a local annotated image but it's stored in the shared images folder
        # (e.g. /outputs/images/...), copy it into this report's folder so the HTML/PDF
        # can reference the image relative to the report directory. This also makes the
        # image visible when opening report.html directly from disk (file://).
        try:
            if annotated_img_local:
                annotated_img_local_abs = os.path.abspath(annotated_img_local)
                report_dir_abs = os.path.abspath(report_dir)
                # If the image is not already in the report directory, copy it there
                if not os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs:
                    src_basename = os.path.basename(annotated_img_local_abs)
                    dest_name = f"annotated_{src_basename}"
                    dest_path = os.path.join(report_dir, dest_name)
                    try:
                        shutil.copy2(annotated_img_local_abs, dest_path)
                        annotated_img_local = dest_path
                        annotated_img_full = f"/outputs/reports/{report_id}/{dest_name}"
                    except Exception as e:
                        # If the copy fails, keep the original annotated_img_full (may be served by StaticFiles)
                        print(f"⚠️ Failed to copy annotated image into report folder: {e}")
        except Exception:
            pass

        # Now attempt to create the PDF (passing the local annotated image path so
        # the PDF writer can embed it). If the annotation is remote or unavailable,
        # PDF creation still proceeds without the image.
        if REPORTLAB_AVAILABLE:
            try:
                pdf_path = os.path.join(report_dir, "report.pdf")
                create_designed_pdf(pdf_path, report_data, analysis_summary, annotated_img_local)
                pdf_url = f"/outputs/reports/{report_id}/report.pdf"
            except Exception as e:
                print(f"Error creating designed PDF: {e}")
                pdf_url = None

        # Determine report type based on analysis summary or model used
        model_used = ai_summary.get('model_used', '')
        if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower():
            report_type = "Cytology"
            report_title = "Cytology Report"
        elif 'MWT' in model_used or 'mwt' in str(model_used).lower() or 'mwt' in str(analysis_id).lower():
            # MWT is a cytology classifier — use a clearer title
            report_type = "Cytology"
            report_title = "Cytology Analysis Report"
        elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower():
            report_type = "Colposcopy"
            report_title = "Colposcopy Report"
        elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower():
            report_type = "Histopathology"
            report_title = "Histopathology Report"
        else:
            # Default fallback
            report_type = "Cytology"
            report_title = "Medical Analysis Report"

        # Build analysis metrics (HTML table rows) based on report type
        confidence_dict = ai_summary.get('confidence', {})
        if report_type == "Histopathology":
            # For histopathology, show Benign/Malignant confidence from the confidence dict
            benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0
            malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0
            analysis_metrics_html = f"""
            <tr><td>System</td><td>Manalife AI System — Automated Analysis</td></tr>
            <tr><td>Benign Confidence</td><td>{benign_conf:.2f}%</td></tr>
            <tr><td>Malignant Confidence</td><td>{malignant_conf:.2f}%</td></tr>
            """
        elif report_type == "Cytology":
            # For cytology (YOLO) or MWT, show class confidences if provided, else abnormal/normal counts
            if isinstance(confidence_dict, dict) and confidence_dict:
                confidence_rows = ""
                for cls, val in confidence_dict.items():
                    conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                    confidence_rows += f"<tr><td>{cls} Confidence</td><td>{conf_pct:.2f}%</td></tr>\n"
                analysis_metrics_html = f"""
                <tr><td>System</td><td>Manalife AI System — Automated Analysis</td></tr>
                {confidence_rows}
                """
            else:
                analysis_metrics_html = f"""
                <tr><td>System</td><td>Manalife AI System — Automated Analysis</td></tr>
                <tr><td>Abnormal Cells</td><td>{ai_summary.get('abnormal_cells', 'N/A')}</td></tr>
                <tr><td>Normal Cells</td><td>{ai_summary.get('normal_cells', 'N/A')}</td></tr>
                """
        else:
            # For CIN/Colposcopy or other models, show generic class confidences
            confidence_rows = ""
            if isinstance(confidence_dict, dict):
                for cls, val in confidence_dict.items():
                    conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                    confidence_rows += f"<tr><td>{cls} Confidence</td><td>{conf_pct:.2f}%</td></tr>\n"
            analysis_metrics_html = f"""
            <tr><td>System</td><td>Manalife AI System — Automated Analysis</td></tr>
            {confidence_rows}
            """

        # Build the final HTML including download links and the embedded annotated image
        report_html = os.path.join(report_dir, "report.html")
        json_url = f"/outputs/reports/{report_id}/report.json"
        html_url = f"/outputs/reports/{report_id}/report.html"

        # annotated_img_full was computed earlier; ensure it's defined and set
        # annotated_img (used by the HTML template conditional) accordingly.
        if 'annotated_img_full' not in locals() or not annotated_img_full:
            annotated_img_full = ''
        annotated_img = annotated_img_full

        # Compute a display path for the HTML. Prefer a relative filename when the
        # annotated image has been copied into the same report folder; this makes the
        # HTML work when opened directly from disk (or via the HF file viewer).
        annotated_img_display = annotated_img_full
        try:
            if annotated_img_local:
                annotated_img_local_abs = os.path.abspath(annotated_img_local)
                report_dir_abs = os.path.abspath(report_dir)
                # If the annotated image resides in the report folder, reference it by basename
                if os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs:
                    annotated_img_display = os.path.basename(annotated_img_local_abs)
                else:
                    # If the annotated image is inside the outputs/reports/<id>/ path but not the same
                    # absolute path (edge cases), make it relative to the report dir
                    prefix = f"/outputs/reports/{report_id}/"
                    if isinstance(annotated_img_full, str) and annotated_img_full.startswith(prefix):
                        annotated_img_display = annotated_img_full[len(prefix):]
        except Exception:
            annotated_img_display = annotated_img_full

        # Simple anchor, rendered only when a PDF was generated
        download_pdf_btn = f'<a class="btn" href="{pdf_url}" download>Download PDF</a>' if pdf_url else ''

        # Pre-render the optional annotated-image section (avoids nesting
        # f-strings inside the template below)
        annotated_section = ''
        if annotated_img:
            annotated_section = (
                '<h2>Annotated Analysis Image</h2>\n'
                f'<img src="{annotated_img_display}" alt="Annotated Analysis Result" />'
            )

        # Format generated time
        generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')

        # Minimal inline-styled HTML report template
        html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>{report_title} — Manalife AI</title>
<style>
  body {{ font-family: Arial, Helvetica, sans-serif; color: #222; max-width: 840px; margin: 2rem auto; }}
  h1, h2 {{ color: #001f5b; }}
  table {{ border-collapse: collapse; width: 100%; }}
  td {{ border: 1px solid #ddd; padding: 6px 10px; }}
  img {{ max-width: 100%; }}
  .btn {{ display: inline-block; padding: 8px 14px; background: #001f5b; color: #fff; text-decoration: none; }}
</style>
</head>
<body>
<header>
  <!-- assumed logo path; the file is served from the frontend dist root -->
  <img src="/manalife_LOGO.jpg" alt="Manalife Logo" class="logo" />
  <h1>MANALIFE AI — Medical Analysis</h1>
  <p>Advanced cytology, colposcopy and histopathology reporting</p>
  <p>contact@manalife.ai • +1 (555) 123-4567</p>
</header>

<h2>MEDICAL ANALYSIS REPORT OF {report_type.upper()}</h2>
<h3>{report_title}</h3>
<p>Report ID: {report_id}<br />
Generated: {generated_time}</p>

<h2>Patient Information</h2>
<table>
  <tr><td>Patient ID</td><td>{patient_id}</td></tr>
  <tr><td>Exam Date</td><td>{exam_date}</td></tr>
  <tr><td>Physician</td><td>{metadata_dict.get('physician', 'N/A')}</td></tr>
  <tr><td>Facility</td><td>{metadata_dict.get('facility', 'N/A')}</td></tr>
</table>

<h2>Sample Information</h2>
<table>
  <tr><td>Specimen Type</td><td>{metadata_dict.get('specimen_type', 'N/A')}</td></tr>
  <tr><td>Clinical History</td><td>{metadata_dict.get('clinical_history', 'N/A')}</td></tr>
  <tr><td>Collected</td><td>{exam_date}</td></tr>
  <tr><td>Reported</td><td>{generated_time}</td></tr>
</table>

<h2>AI-Assisted Analysis</h2>
<table>
{analysis_metrics_html}
</table>
<p><strong>AI Interpretation:</strong></p>
<p>{ai_summary.get('ai_interpretation', 'No AI interpretation available.')}</p>

{annotated_section}

<h2>Doctor's Notes</h2>
<p>{notes or 'No additional notes provided.'}</p>

<h2>Recommendations</h2>
<p>Continue routine screening as per standard guidelines. Follow up as directed by your physician.</p>

<h2>Signatures</h2>
<p>Rajesh Venugopal<br />Physician</p>

{download_pdf_btn}
""" with open(report_html, "w", encoding="utf-8") as f: f.write(html_content) # Update report.json to include the resolved annotated image url so callers can find it try: report_data['analysis']['annotated_image_url'] = annotated_img_full or '' with open(report_json, 'w', encoding='utf-8') as f: json.dump(report_data, f, indent=2, ensure_ascii=False) except Exception as e: print(f"⚠️ Failed to update report.json with annotated image url: {e}") return { "report_id": report_id, "json_url": json_url, "html_url": html_url, "pdf_url": pdf_url, } except Exception as e: return JSONResponse( content={"error": f"Failed to generate report: {str(e)}"}, status_code=500 ) @app.get("/reports/{report_id}") async def get_report(report_id: str): """Fetch a generated report by ID.""" report_dir = os.path.join(OUTPUT_DIR, "reports", report_id) report_json = os.path.join(report_dir, "report.json") if not os.path.exists(report_json): return JSONResponse( content={"error": "Report not found"}, status_code=404 ) with open(report_json, "r") as f: report_data = json.load(f) return report_data @app.get("/reports") async def list_reports(patient_id: str = None): """List all generated reports, optionally filtered by patient ID.""" reports_dir = os.path.join(OUTPUT_DIR, "reports") if not os.path.exists(reports_dir): return {"reports": []} reports = [] for report_id in os.listdir(reports_dir): report_json = os.path.join(reports_dir, report_id, "report.json") if os.path.exists(report_json): with open(report_json, "r") as f: report_data = json.load(f) if not patient_id or report_data["patient"]["id"] == patient_id: reports.append({ "report_id": report_id, "patient_id": report_data["patient"]["id"], "exam_date": report_data["patient"]["exam_date"], "generated_at": report_data["generated_at"] }) return {"reports": sorted(reports, key=lambda r: r["generated_at"], reverse=True)} @app.get("/models") def get_models(): return {"available_models": ["yolo", "mwt", "cin", "histopathology"]} @app.get("/health") def health(): return {"message": "Pathora Medical Diagnostic API is running!"} # FRONTEND # ===================================================== # Serve frontend only if it has been built; avoid startup failure when dist/ is missing. FRONTEND_DIST = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist")) # Check if frontend/dist exists in /app (Docker), otherwise check relative to script location if not os.path.isdir(FRONTEND_DIST): # Fallback for Docker: frontend is copied to ./frontend/dist during build FRONTEND_DIST = os.path.join(os.path.dirname(__file__), "frontend/dist") ASSETS_DIR = os.path.join(FRONTEND_DIST, "assets") if os.path.isdir(ASSETS_DIR): app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets") else: print("ℹ️ Frontend assets directory not found — skipping /assets mount.") @app.get("/") async def serve_frontend(): index_path = os.path.join(FRONTEND_DIST, "index.html") if os.path.isfile(index_path): return FileResponse(index_path) return JSONResponse({"message": "Backend is running. 
# =====================================================
# FRONTEND
# =====================================================
# Serve the frontend only if it has been built; avoid startup failure when dist/ is missing.
FRONTEND_DIST = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))
# Check ../frontend/dist (local dev) first, then fall back to ./frontend/dist
# (the frontend is copied there during the Docker build).
if not os.path.isdir(FRONTEND_DIST):
    FRONTEND_DIST = os.path.join(os.path.dirname(__file__), "frontend/dist")

ASSETS_DIR = os.path.join(FRONTEND_DIST, "assets")
if os.path.isdir(ASSETS_DIR):
    app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
else:
    print("ℹ️ Frontend assets directory not found — skipping /assets mount.")


@app.get("/")
async def serve_frontend():
    index_path = os.path.join(FRONTEND_DIST, "index.html")
    if os.path.isfile(index_path):
        return FileResponse(index_path)
    return JSONResponse({"message": "Backend is running. Frontend build not found."})


@app.get("/{file_path:path}")
async def serve_static_files(file_path: str):
    """Serve static files from the frontend dist (images, logos, etc.)."""
    # Skip API routes
    if file_path.startswith(("predict", "reports", "models", "health", "outputs", "assets", "cyto", "colpo", "histo")):
        return JSONResponse({"error": "Not found"}, status_code=404)
    # Try to serve the file from the dist root
    static_file = os.path.join(FRONTEND_DIST, file_path)
    if os.path.isfile(static_file):
        return FileResponse(static_file)
    # Fall back to index.html for client-side routing
    index_path = os.path.join(FRONTEND_DIST, "index.html")
    if os.path.isfile(index_path):
        return FileResponse(index_path)
    return JSONResponse({"error": "Not found"}, status_code=404)


if __name__ == "__main__":
    # Use the PORT provided by the environment (Hugging Face Spaces sets PORT=7860)
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)