import os
import shutil

# Redirect all framework caches to /tmp (writable on Hugging Face Spaces) and
# clear any stale contents. This must run before the heavy imports below so
# each library picks up the environment variables.
for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]:
    shutil.rmtree(d, ignore_errors=True)

os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface"
os.environ["TORCH_HOME"] = "/tmp/torch"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

import json
import uuid
import datetime
import numpy as np
import torch
import cv2
import joblib
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from io import BytesIO
from PIL import Image as PILImage
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
import tensorflow as tf
from model_histo import BreastCancerClassifier
from fastapi.staticfiles import StaticFiles
import uvicorn

try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY
    from reportlab.lib.units import inch
    from reportlab.lib.colors import navy, black
    REPORTLAB_AVAILABLE = True
except ImportError:
    REPORTLAB_AVAILABLE = False

from ultralytics import YOLO
from sklearn.preprocessing import MinMaxScaler
from model import MWT as create_model
from augmentations import Augmentations
from huggingface_hub import InferenceClient

# =====================================================
# HUGGING FACE CLIENT SETUP
# =====================================================
HF_MODEL_ID = "BioMistral/BioMistral-7B"
hf_token = os.getenv("HF_TOKEN")

client = None
if hf_token:
    try:
        client = InferenceClient(model=HF_MODEL_ID, token=hf_token)
        print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}")
    except Exception as e:
        print("⚠️ Failed to initialize Hugging Face client:", e)
else:
    print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.")


def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence):
    """Generate a brief medical interpretation using Mistral."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping summary."
    try:
        prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation.

Observed Cell Counts:
- {abnormal_cells} Abnormal Cells
- {normal_cells} Normal Cells

Write a 2-3 sentence professional medical assessment focusing on:
1. Cell count analysis
2. Abnormality ratio ({abnormal_cells/(abnormal_cells + normal_cells)*100:.1f}%)
3. Clinical significance

Use objective, scientific language suitable for a pathology report."""

        # Single (non-streaming) call; request details so generated_text is available.
        response = client.text_generation(
            prompt,
            max_new_tokens=200,
            temperature=0.7,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )

        # Handle different response formats
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()

        # Fallback summary if the response format is unexpected
        ratio = abnormal_cells / (abnormal_cells + normal_cells) * 100 if (abnormal_cells + normal_cells) > 0 else 0
        return f"Analysis shows {abnormal_cells} abnormal cells ({ratio:.1f}%) and {normal_cells} normal cells."
    except Exception:
        # Provide a structured fallback summary instead of an error message.
        # (This also covers the ZeroDivisionError raised while building the
        # prompt when no cells were detected.)
        total = abnormal_cells + normal_cells
        if total == 0:
            return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters."
        ratio = (abnormal_cells / total) * 100
        severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low"
        return f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) among {total} total cells, indicating {severity} abnormality ratio."


def generate_mwt_summary(predicted_label, confidences, avg_confidence):
    """Generate a short MWT-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence professional interpretation suitable for embedding in a diagnostic report.

Result:
- Predicted label: {predicted_label}
- Class probabilities: {json.dumps(confidences)}

Provide guidance on the significance of the result and any suggested next steps in plain, objective language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=120,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_label}."
    except Exception:
        return f"Quantitative result: {predicted_label}."


def generate_cin_summary(predicted_grade, confidences, avg_confidence):
    """Generate a short CIN-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence professional interpretation suitable for a diagnostic report.

Result:
- Predicted grade: {predicted_grade}
- Class probabilities: {json.dumps(confidences)}

Provide a brief statement about clinical significance and suggested next steps (e.g., further colposcopic evaluation) in objective, clinical language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=140,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_grade}."
    except Exception:
        return f"Quantitative result: {predicted_grade}."
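
# Example of the structured fallback above (only reached when the HF call
# itself raises, e.g. on a network error):
#   generate_ai_summary(12, 38, 0.0)
#   -> "Quantitative analysis detected 12 abnormal cells (24.0%) among 50
#       total cells, indicating low abnormality ratio."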
# =====================================================
# FASTAPI SETUP
# =====================================================
app = FastAPI(title="Pathora Medical Diagnostic API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"]  # Allow access to response headers
)

# Use /tmp for outputs in Hugging Face Spaces (writable directory)
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Create image outputs dir
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
os.makedirs(IMAGES_DIR, exist_ok=True)

app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs")

# Mount public sample images from frontend dist (Vite copies public/ to dist/ root)
# Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev)
FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist")
if not os.path.isdir(FRONTEND_DIST_CHECK):
    FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))

for sample_dir in ["cyto", "colpo", "histo"]:
    sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir)
    if os.path.isdir(sample_path):
        app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir)
        print(f"✅ Mounted /{sample_dir} from {sample_path}")
    else:
        print(f"⚠️ Sample directory not found: {sample_path}")

# Verify other static assets (logos, banners) exist in the dist root
for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]:
    static_path = os.path.join(FRONTEND_DIST_CHECK, static_file)
    if os.path.isfile(static_path):
        print(f"✅ Static file available: /{static_file}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# =====================================================
# MODEL LOADS
# =====================================================
print("🔹 Loading YOLO model...")
yolo_model = YOLO("best2.pt")

print("🔹 Loading MWT model...")
mwt_model = create_model(num_classes=2).to(device)
mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device))
mwt_model.eval()
mwt_class_names = ["Negative", "Positive"]

print("🔹 Loading CIN model...")
try:
    clf = joblib.load("logistic_regression_model.pkl")
except Exception as e:
    print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}")
    clf = None

yolo_colposcopy = YOLO("yolo_colposcopy.pt")

# =====================================================
# RESNET FEATURE EXTRACTORS FOR CIN
# =====================================================
def build_resnet(model_name="resnet50"):
    if model_name == "resnet50":
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    elif model_name == "resnet101":
        model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT)
    elif model_name == "resnet152":
        model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT)
    else:
        raise ValueError(f"Unsupported backbone: {model_name}")
    model.eval().to(device)
    return (
        nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool),
        model.layer1,
        model.layer2,
        model.layer3,
        model.layer4,
    )

gap = nn.AdaptiveAvgPool2d((1, 1))
gmp = nn.AdaptiveMaxPool2d((1, 1))

resnet50_blocks = build_resnet("resnet50")
resnet101_blocks = build_resnet("resnet101")
resnet152_blocks = build_resnet("resnet152")

# ImageNet normalization matching the pretrained torchvision weights
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def preprocess_for_mwt(image_np):
    img = cv2.resize(image_np, (224, 224))
    img = Augmentations.Normalization((0, 1))(img)
    img = np.array(img, np.float32)
    # NOTE: the input comes from PIL and is already RGB, so this cvtColor call
    # effectively swaps to BGR; kept as-is on the assumption that it matches
    # the channel order the MWT model was trained with.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.transpose(2, 0, 1)  # HWC -> CHW
    img = np.expand_dims(img, axis=0)
    return torch.Tensor(img)


def extract_cbf_features(blocks, img_t):
    block1, block2, block3, block4, block5 = blocks
    with torch.no_grad():
        f1 = block1(img_t)
        f2 = block2(f1)
        f3 = block3(f2)
        f4 = block4(f3)
        f5 = block5(f4)
        # Global max pooling on the shallow stages, average pooling on the deep ones
        p1 = gmp(f1).view(-1)
        p2 = gmp(f2).view(-1)
        p3 = gap(f3).view(-1)
        p4 = gap(f4).view(-1)
        p5 = gap(f5).view(-1)
    return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy()
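
# For reference: with the standard torchvision ResNets, the five pooled stages
# have 64 + 256 + 512 + 1024 + 2048 = 3904 channels each, so the three
# concatenated backbones yield an 11,712-dimensional feature vector per image.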
# =====================================================
# Model 4: Histopathology Classifier (TensorFlow)
# =====================================================
print("🔹 Attempting to load Breast Cancer Histopathology model...")
try:
    classifier = BreastCancerClassifier(fine_tune=False)

    # Safely handle Hugging Face token auth
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        if classifier.authenticate_huggingface():
            print("✅ Hugging Face authentication successful.")
        else:
            print("⚠️ Warning: Hugging Face authentication failed, using local model only.")
    else:
        print("⚠️ HF_TOKEN not found in environment — skipping authentication.")

    # Load Path Foundation model
    if classifier.load_path_foundation():
        print("✅ Loaded Path Foundation base model.")
    else:
        print("⚠️ Could not load Path Foundation base model, continuing with local weights only.")

    # Load trained histopathology model
    model_path = "histopathology_trained_model.keras"
    if os.path.exists(model_path):
        classifier.model = tf.keras.models.load_model(model_path)
        print(f"✅ Loaded local histopathology model: {model_path}")
    else:
        print(f"⚠️ Model file not found: {model_path}")
except Exception as e:
    classifier = None
    print(f"❌ Error initializing histopathology model: {e}")


def predict_histopathology(image):
    if classifier is None:
        return {"error": "Histopathology model not available."}
    try:
        if image.mode != "RGB":
            image = image.convert("RGB")
        image = image.resize((224, 224))
        img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0)
        embeddings = classifier.extract_embeddings(img_array)
        prediction_proba = classifier.model.predict(embeddings, verbose=0)[0]
        predicted_class = int(np.argmax(prediction_proba))
        class_names = ["Benign", "Malignant"]
        # Return confidence as a dictionary with both class probabilities (like MWT/CIN)
        confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))}
        avg_confidence = float(np.max(prediction_proba)) * 100
        return {
            "model_used": "Histopathology Classifier",
            "prediction": class_names[predicted_class],
            "confidence": confidences,
            "summary": {
                "ai_interpretation": f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue.",
            },
        }
    except Exception as e:
        return {"error": f"Histopathology prediction failed: {e}"}


# =====================================================
# MAIN ENDPOINT
# =====================================================
@app.post("/predict/")
async def predict(model_name: str = Form(...), file: UploadFile = File(...)):
    print(f"Received prediction request - model: {model_name}, file: {file.filename}")

    # Validate model name
    if model_name not in ["yolo", "mwt", "cin", "histopathology"]:
        return JSONResponse(
            content={"error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology"},
            status_code=400
        )

    # Validate and read file
    if not file.filename:
        return JSONResponse(content={"error": "No file provided"}, status_code=400)

    contents = await file.read()
    if len(contents) == 0:
        return JSONResponse(content={"error": "Empty file provided"}, status_code=400)

    # Attempt to open and validate image
    try:
        image = PILImage.open(BytesIO(contents)).convert("RGB")
        image_np = np.array(image)
        if image_np.size == 0:
            raise ValueError("Empty image array")
        print(f"Successfully loaded image, shape: {image_np.shape}")
    except Exception as e:
        return JSONResponse(content={"error": f"Invalid image file: {str(e)}"}, status_code=400)

    if model_name == "yolo":
        results = yolo_model(image)
        detections_json = results[0].to_json()
        detections = json.loads(detections_json)

        abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal")
        normal_cells = sum(1 for d in detections if d["name"] == "normal")
        avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0

        ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence)

        output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg"
        output_path = os.path.join(IMAGES_DIR, output_filename)
        results[0].save(filename=output_path)

        return {
            "model_used": "YOLO Detection",
            "detections": detections,
            "annotated_image_url": f"/outputs/images/{output_filename}",
            "summary": {
                "abnormal_cells": abnormal_cells,
                "normal_cells": normal_cells,
                "ai_interpretation": ai_summary,
            },
        }

    elif model_name == "mwt":
        tensor = preprocess_for_mwt(image_np)
        with torch.no_grad():
            output = mwt_model(tensor.to(device)).cpu()
        probs = torch.softmax(output, dim=1)[0]
        confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)}
        predicted_label = mwt_class_names[int(torch.argmax(probs).item())]
        # Primary confidence for display
        avg_confidence = float(torch.max(probs).item()) * 100

        # Generate a brief AI interpretation using the Mistral client (if available)
        ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence)

        return {
            "model_used": "MWT Classifier",
            "prediction": predicted_label,
            "confidence": confidences,
            "summary": {
                "ai_interpretation": ai_interp,
            },
        }

    elif model_name == "cin":
        if clf is None:
            return JSONResponse(
                content={"error": "CIN classifier not available on server."},
                status_code=503,
            )

        # Decode uploaded image and run colposcopy detector
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False)
        if len(results[0].boxes) == 0:
            return {"error": "No cervix detected"}

        x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy())
        crop = img[y1:y2, x1:x2]
        crop = cv2.resize(crop, (224, 224))
        img_t = transform(crop).unsqueeze(0).to(device)

        # Extract features from multiple ResNet backbones
        f50 = extract_cbf_features(resnet50_blocks, img_t)
        f101 = extract_cbf_features(resnet101_blocks, img_t)
        f152 = extract_cbf_features(resnet152_blocks, img_t)
        features = np.concatenate([f50, f101, f152]).reshape(1, -1)

        # Scale and predict.
        # CAUTION: fitting MinMaxScaler on a single sample collapses every
        # feature to a constant; the scaler fitted at training time should be
        # persisted and loaded here instead.
        X_scaled = MinMaxScaler().fit_transform(features)

        # Ensure classifier supports probability outputs
        try:
            proba = clf.predict_proba(X_scaled)[0]
        except Exception:
            return JSONResponse(
                content={"error": "CIN classifier does not support probability estimates (predict_proba)."},
                status_code=503,
            )

        num_classes = int(len(proba))

        # Handle different classifier output sizes:
        # - If 3 classes: map directly to CIN1/CIN2/CIN3
        # - If 2 classes: apply a conservative heuristic to split High-grade into CIN2/CIN3
        if num_classes == 3:
            classes = ["CIN1", "CIN2", "CIN3"]
            confidences = {classes[i]: float(proba[i]) for i in range(3)}
            predicted_idx = int(np.argmax(proba))
            predicted_label = classes[predicted_idx]
            avg_confidence = float(np.max(proba)) * 100
            mapping_used = "direct_3class"
        elif num_classes == 2:
            # Binary model detected (e.g., Low-grade vs High-grade). Convert to CIN1/CIN2/CIN3.
            # Heuristic:
            # - CIN1 <- low_grade_prob
            # - Split high_grade_prob into CIN2 and CIN3 based on how confident 'high' is:
            #   * high <= 0.6 -> all CIN2
            #   * high >= 0.8 -> all CIN3
            #   * between 0.6 and 0.8 -> interpolate
            low_prob = float(proba[0])
            high_prob = float(proba[1])
            if high_prob <= 0.6:
                cin3_factor = 0.0
            elif high_prob >= 0.8:
                cin3_factor = 1.0
            else:
                cin3_factor = (high_prob - 0.6) / 0.2
            cin1 = low_prob
            cin3 = high_prob * cin3_factor
            cin2 = high_prob - cin3
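            # Worked example of the split: with high_prob = 0.70,
            # cin3_factor = (0.70 - 0.6) / 0.2 = 0.5, giving cin3 = 0.35 and
            # cin2 = 0.35, so a mid-confidence "High" result is shared evenly
            # between CIN2 and CIN3.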
            confidences = {"CIN1": cin1, "CIN2": cin2, "CIN3": cin3}
            # Pick the highest of the mapped three as the primary prediction
            predicted_label = max(confidences.items(), key=lambda x: x[1])[0]
            avg_confidence = float(max(confidences.values())) * 100
            mapping_used = "binary_to_3class_heuristic"
        else:
            return JSONResponse(
                content={
                    "error": "CIN classifier must output 2-class (Low/High) or 3-class probabilities (CIN1, CIN2, CIN3).",
                    "detected_num_classes": num_classes,
                },
                status_code=503,
            )

        # Generate AI interpretation using the Mistral client (if available)
        ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence)

        response = {
            "model_used": "CIN Classifier",
            "prediction": predicted_label,
            "confidence": confidences,
            "summary": {
                "ai_interpretation": ai_interp,
            },
        }

        # If we used the binary->3class heuristic, include a diagnostic field so callers know it was mapped
        # (mapping_used is always bound here; the unexpected-size case returned above)
        if mapping_used == 'binary_to_3class_heuristic':
            response["mapping_used"] = mapping_used
            response["mapping_note"] = (
                "The server mapped a binary Low/High classifier to CIN1/CIN2/CIN3 using a heuristic split. "
                "This is an approximation — for clinical use please supply a native 3-class model."
            )
        return response

    elif model_name == "histopathology":
        return predict_histopathology(image)

    else:
        return JSONResponse(content={"error": "Invalid model name"}, status_code=400)
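
# Example request against this endpoint (assuming the API is served locally on
# port 7860; adjust host/port to your deployment):
#   curl -X POST http://localhost:7860/predict/ \
#        -F "model_name=yolo" \
#        -F "file=@sample_pap_smear.jpg"
# The JSON response for "yolo" includes per-cell detections plus an
# "annotated_image_url" served from the /outputs mount.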
# =====================================================
# ROUTES
# =====================================================
def create_designed_pdf(pdf_path, report_data, analysis_summary_json, annotated_image_path=None):
    doc = SimpleDocTemplate(pdf_path, pagesize=letter, rightMargin=72, leftMargin=72,
                            topMargin=72, bottomMargin=18)
    styles = getSampleStyleSheet()
    story = []

    # NOTE: the sample stylesheet already defines a 'Title' style and
    # StyleSheet1.add() rejects duplicate names, so the custom title style is
    # registered as 'ReportTitle'.
    styles.add(ParagraphStyle(name='ReportTitle', fontSize=20, fontName='Helvetica-Bold', alignment=TA_CENTER, textColor=navy))
    styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold', spaceBefore=10, spaceAfter=6))
    styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12))
    styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold', textColor=navy, spaceBefore=6, spaceAfter=4))

    patient = report_data['patient']
    analysis = report_data.get('analysis', {})

    # Safely parse analysis_summary_json
    try:
        ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {}
    except (json.JSONDecodeError, TypeError):
        ai_summary = {}

    # Determine report type based on model used
    model_used = ai_summary.get('model_used', '')
    if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower():
        report_type = "CYTOLOGY"
        report_title = "Cytology Report"
    elif 'MWT' in model_used or 'mwt' in str(model_used).lower():
        # MWT is a cytology classifier; use a clearer report title for MWT results
        report_type = "CYTOLOGY"
        report_title = "Cytology Analysis Report"
    elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower():
        report_type = "COLPOSCOPY"
        report_title = "Colposcopy Report"
    elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower():
        report_type = "HISTOPATHOLOGY"
        report_title = "Histopathology Report"
    else:
        report_type = "CYTOLOGY"
        report_title = "Medical Analysis Report"

    # Header
    story.append(Paragraph("MANALIFE AI", styles['ReportTitle']))
    story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall']))
    story.append(Spacer(1, 0.3*inch))
    story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading']))
    story.append(Paragraph(report_title, styles['Section']))
    story.append(Spacer(1, 0.2*inch))

    # Report ID and Date
    story.append(Paragraph(f"Report ID: {report_data.get('report_id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Generated: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Patient Information Section
    story.append(Paragraph("Patient Information", styles['Section']))
    story.append(Paragraph(f"Patient ID: {patient.get('id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Exam Date: {patient.get('exam_date', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Physician: {patient.get('physician', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"Facility: {patient.get('facility', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Sample Information Section
    story.append(Paragraph("Sample Information", styles['Section']))
    story.append(Paragraph(f"Specimen Type: {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall']))
    story.append(Paragraph(f"Clinical History: {patient.get('clinical_history', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))
    # AI Analysis Section
    story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section']))
    story.append(Paragraph("System: Manalife AI System — Automated Analysis", styles['NormalSmall']))

    # Add metrics based on report type
    if report_type == "HISTOPATHOLOGY":
        # For histopathology, show Benign/Malignant confidence
        confidence_dict = ai_summary.get('confidence', {})
        if isinstance(confidence_dict, dict):
            benign_conf = confidence_dict.get('Benign', 0) * 100
            malignant_conf = confidence_dict.get('Malignant', 0) * 100
            story.append(Paragraph(f"Benign Confidence: {benign_conf:.2f}%", styles['NormalSmall']))
            story.append(Paragraph(f"Malignant Confidence: {malignant_conf:.2f}%", styles['NormalSmall']))
    elif report_type == "CYTOLOGY":
        # For cytology and MWT, show class confidences if available, otherwise abnormal/normal cell counts
        confidence_dict = ai_summary.get('confidence', {})
        if isinstance(confidence_dict, dict) and confidence_dict:
            for cls, val in confidence_dict.items():
                conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall']))
        else:
            if 'abnormal_cells' in ai_summary:
                story.append(Paragraph(f"Abnormal Cells: {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall']))
            if 'normal_cells' in ai_summary:
                story.append(Paragraph(f"Normal Cells: {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall']))
    else:
        # For CIN/Colposcopy, show class confidences
        confidence_dict = ai_summary.get('confidence', {})
        if isinstance(confidence_dict, dict):
            for cls, val in confidence_dict.items():
                conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall']))

    story.append(Spacer(1, 0.1*inch))
    story.append(Paragraph("AI Interpretation:", styles['NormalSmall']))
    story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # If an annotated image path was provided and exists on disk, embed it
    if annotated_image_path:
        try:
            if os.path.isfile(annotated_image_path):
                story.append(Spacer(1, 0.1*inch))
                # Determine image pixel size and scale to a reasonable width for the PDF
                try:
                    from PIL import Image as PILImageLocal
                    with PILImageLocal.open(annotated_image_path) as im:
                        img_w, img_h = im.size
                except Exception:
                    img_w, img_h = (800, 600)
                # Display width in points (ReportLab uses points; 1 inch = 72 points).
                # Assume 96 DPI for the pixel -> inch conversion.
                display_width_px = max(300, min(img_w, 800))
                width_points = min(5 * inch, (display_width_px / 96.0) * inch)
                img = ReportLabImage(annotated_image_path, width=width_points, kind='proportional')
                story.append(img)
                story.append(Spacer(1, 0.2*inch))
        except Exception as e:
            # Don't fail the whole PDF creation if image embedding fails
            print(f"⚠️ Could not embed annotated image in PDF: {e}")

    # Doctor's Notes
    story.append(Paragraph("Doctor's Notes", styles['Section']))
    story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Recommendations
    story.append(Paragraph("RECOMMENDATIONS", styles['Section']))
    story.append(Paragraph("Continue routine screening as per standard guidelines. Follow up as directed by your physician.", styles['NormalSmall']))
    story.append(Spacer(1, 0.3*inch))

    # Signatures
    story.append(Paragraph("Signatures", styles['Section']))
    story.append(Paragraph("Rajesh Venugopal, Physician", styles['NormalSmall']))
    story.append(Spacer(1, 0.1*inch))
    story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))

    doc.build(story)
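
# Example direct invocation with hypothetical data (requires ReportLab; the
# dict shapes mirror the /predict/ responses, with "ai_interpretation" read
# from the top level of the summary JSON here):
#   create_designed_pdf(
#       "/tmp/outputs/reports/demo/report.pdf",
#       {"report_id": "demo", "patient": {"id": "P001", "exam_date": "2024-01-15"}},
#       '{"model_used": "MWT Classifier", "confidence": {"Negative": 0.2, "Positive": 0.8}}',
#   )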
@app.post("/reports/")
async def generate_report(
    patient_id: str = Form(...),
    exam_date: str = Form(...),
    metadata: str = Form(...),
    notes: str = Form(None),
    analysis_id: str = Form(None),
    analysis_summary: str = Form(None),
    file: UploadFile = File(None),
):
    """Generate a structured medical report from analysis results and metadata."""
    try:
        # Create reports directory if it doesn't exist
        reports_dir = os.path.join(OUTPUT_DIR, "reports")
        os.makedirs(reports_dir, exist_ok=True)

        # Generate unique report ID
        report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}"
        report_dir = os.path.join(reports_dir, report_id)
        os.makedirs(report_dir, exist_ok=True)

        # Parse metadata
        metadata_dict = json.loads(metadata)

        # Get analysis results - assuming stored in memory or retrievable
        # TODO: Implement analysis results storage/retrieval

        # Construct report data
        report_data = {
            "report_id": report_id,
            "generated_at": datetime.datetime.now().isoformat(),
            "patient": {
                "id": patient_id,
                "exam_date": exam_date,
                **metadata_dict
            },
            "analysis": {
                "id": analysis_id,
                # If the analysis_id is actually an annotated image URL, store it for report embedding
                "annotated_image_url": analysis_id,
                # TODO: Add actual analysis results
            },
            "doctor_notes": notes
        }

        # Save report data
        report_json = os.path.join(report_dir, "report.json")
        with open(report_json, "w", encoding="utf-8") as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)

        # We'll create the PDF later (after parsing analysis_summary and resolving
        # any annotated/input image). Initialize pdf_url here.
        pdf_url = None

        # Parse analysis_summary to get AI results
        try:
            ai_summary = json.loads(analysis_summary) if analysis_summary else {}
        except (json.JSONDecodeError, TypeError):
            ai_summary = {}
        # Resolve annotated image: prefer the AI/analysis annotated image; if none,
        # save the uploaded input image (if provided) into the report folder
        # and use that as the embedded image.
        annotated_img = ai_summary.get('annotated_image_url') or report_data.get("analysis", {}).get("annotated_image_url") or ""
        annotated_img_full = ""
        annotated_img_local = None
        if annotated_img:
            # If it's an outputs path served by StaticFiles, map to the local file
            if isinstance(annotated_img, str) and annotated_img.startswith('/outputs/'):
                rel = annotated_img[len('/outputs/'):].lstrip('/')
                annotated_img_local = os.path.join(OUTPUT_DIR, rel)
                annotated_img_full = annotated_img
            else:
                # Keep absolute URLs as-is for HTML
                annotated_img_full = annotated_img if isinstance(annotated_img, str) else ''

        # If no annotated image was provided but an input file was uploaded, save it
        if not annotated_img_full and file is not None and getattr(file, 'filename', None):
            try:
                input_filename = f"input_image{os.path.splitext(file.filename)[1] or '.jpg'}"
                input_path = os.path.join(report_dir, input_filename)
                contents = await file.read()
                with open(input_path, 'wb') as out_f:
                    out_f.write(contents)
                annotated_img_full = f"/outputs/reports/{report_id}/{input_filename}"
                annotated_img_local = input_path
            except Exception as e:
                print(f"⚠️ Failed to save uploaded input image for report: {e}")

        # Ensure annotated_img_full has a leading slash if it's a relative path
        if annotated_img_full and not annotated_img_full.startswith(('http://', 'https://')):
            annotated_img_full = annotated_img_full if annotated_img_full.startswith('/') else '/' + annotated_img_full

        # If we have a local annotated image but it's stored in the shared images folder
        # (e.g. /outputs/images/...), copy it into this report's folder so the HTML/PDF
        # can reference the image relative to the report directory. This also makes the
        # image visible when opening report.html directly from disk (file://).
        try:
            if annotated_img_local:
                annotated_img_local_abs = os.path.abspath(annotated_img_local)
                report_dir_abs = os.path.abspath(report_dir)
                # If the image is not already in the report directory, copy it there
                if not os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs:
                    src_basename = os.path.basename(annotated_img_local_abs)
                    dest_name = f"annotated_{src_basename}"
                    dest_path = os.path.join(report_dir, dest_name)
                    try:
                        shutil.copy2(annotated_img_local_abs, dest_path)
                        annotated_img_local = dest_path
                        annotated_img_full = f"/outputs/reports/{report_id}/{dest_name}"
                    except Exception as e:
                        # If the copy fails, keep using the original annotated_img_full (may be served by StaticFiles)
                        print(f"⚠️ Failed to copy annotated image into report folder: {e}")
        except Exception:
            pass

        # Now attempt to create the PDF (passing the local annotated image path
        # so the PDF writer can embed it). If the annotation is remote or not
        # available, PDF creation will still proceed without the image.
        if REPORTLAB_AVAILABLE:
            try:
                pdf_path = os.path.join(report_dir, "report.pdf")
                create_designed_pdf(pdf_path, report_data, analysis_summary, annotated_img_local)
                pdf_url = f"/outputs/reports/{report_id}/report.pdf"
            except Exception as e:
                print(f"Error creating designed PDF: {e}")
                pdf_url = None

        # Determine report type based on analysis summary or model used
        model_used = ai_summary.get('model_used', '')
        if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower():
            report_type = "Cytology"
            report_title = "Cytology Report"
        elif 'MWT' in model_used or 'mwt' in str(model_used).lower() or 'mwt' in str(analysis_id).lower():
            # MWT is a cytology classifier — use the clearer title
            report_type = "Cytology"
            report_title = "Cytology Analysis Report"
        elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower():
            report_type = "Colposcopy"
            report_title = "Colposcopy Report"
        elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower():
            report_type = "Histopathology"
            report_title = "Histopathology Report"
        else:
            # Default fallback
            report_type = "Cytology"
            report_title = "Medical Analysis Report"

        # Build analysis metrics HTML based on report type. NOTE: the original
        # template bodies were truncated in the source; the rows below are
        # reconstructed from the values this block computes and from the
        # matching sections of the PDF writer.
        if report_type == "Histopathology":
            # For histopathology, show Benign/Malignant confidence from the confidence dict
            confidence_dict = ai_summary.get('confidence', {})
            benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0
            malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0
            analysis_metrics_html = f"""
                <tr><td>Benign Confidence</td><td>{benign_conf:.2f}%</td></tr>
                <tr><td>Malignant Confidence</td><td>{malignant_conf:.2f}%</td></tr>"""
        else:
            # Other report types: render whatever class confidences are present
            confidence_dict = ai_summary.get('confidence', {})
            rows = []
            if isinstance(confidence_dict, dict):
                for cls, val in confidence_dict.items():
                    conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                    rows.append(f"<tr><td>{cls} Confidence</td><td>{conf_pct:.2f}%</td></tr>")
            analysis_metrics_html = "".join(rows)
        # Assemble and write the HTML report. Only fragments of the original
        # template survive; this is a minimal reconstruction that keeps the
        # recovered patient/sample tables, notes, and recommendations.
        generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')
        img_html = f'<img src="{annotated_img_full}" style="max-width:100%" />' if annotated_img_full else ''
        report_html = f"""<html><body>
            <h1>MANALIFE AI</h1>
            <h2>{report_title}</h2>
            <table>{analysis_metrics_html}</table>
            <table>
                <tr><td>Patient ID</td><td>{patient_id}</td></tr>
                <tr><td>Exam Date</td><td>{exam_date}</td></tr>
                <tr><td>Physician</td><td>{metadata_dict.get('physician', 'N/A')}</td></tr>
                <tr><td>Facility</td><td>{metadata_dict.get('facility', 'N/A')}</td></tr>
            </table>
            <table>
                <tr><td>Specimen Type</td><td>{metadata_dict.get('specimen_type', 'N/A')}</td></tr>
                <tr><td>Clinical History</td><td>{metadata_dict.get('clinical_history', 'N/A')}</td></tr>
                <tr><td>Collected</td><td>{exam_date}</td></tr>
                <tr><td>Reported</td><td>{generated_time}</td></tr>
            </table>
            {img_html}
            <p>{notes or 'No additional notes provided.'}</p>
            <p>Continue routine screening as per standard guidelines. Follow up as directed by your physician.</p>
        </body></html>"""
        html_path = os.path.join(report_dir, "report.html")
        with open(html_path, "w", encoding="utf-8") as f:
            f.write(report_html)

        # The response tail was also truncated in the source; return the
        # generated artifact URLs as a minimal, reasonable payload.
        return {
            "report_id": report_id,
            "report_url": f"/outputs/reports/{report_id}/report.html",
            "pdf_url": pdf_url,
            "annotated_image_url": annotated_img_full or None,
        }
    except Exception as e:
        print(f"❌ Report generation failed: {e}")
        return JSONResponse(content={"error": f"Report generation failed: {e}"}, status_code=500)
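
# Local entry point. uvicorn is imported above but never invoked in the source;
# a minimal guard, assuming port 7860 (the Hugging Face Spaces convention):
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)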