Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Sundew Diabetes Watch — Streamlit App (multilingual, model backends, Sundew v0.6/v0.7 compatible) | |
| - Multilingual UI: English, French, Swahili, Hausa (DeepTranslate API or deep_translator fallback) | |
| - Model backends: Demo(LogReg), XGBoost(.json), PyTorch TorchScript(.pt/.pth), ONNX(.onnx) | |
| - Sundew selective-activation gate with compatibility wrapper across package versions | |
| - Robust timestamp parsing (handles tz-aware input), glucose rate-of-change (ROC) calculation, KPIs, charts, and alerts | |
| - Research prototype — not medical advice | |
| """ | |
| from __future__ import annotations | |
| import math | |
| import os | |
| import inspect | |
| from dataclasses import dataclass | |
| from typing import Dict, Tuple, Optional, Callable | |
| import numpy as np | |
| import pandas as pd | |
| import streamlit as st | |
| # ------------------------------ Sundew import (tolerant) ------------------------------ | |
| try: | |
| from sundew import SundewAlgorithm # provided by sundew-algorithms | |
| _HAS_SUNDEW = True | |
| except Exception: | |
| SundewAlgorithm = None # type: ignore | |
| _HAS_SUNDEW = False | |
| # ------------------------------ Optional model backends ------------------------------ | |
| _HAS_XGB = False | |
| try: | |
| import xgboost as xgb # type: ignore | |
| _HAS_XGB = True | |
| except Exception: | |
| pass | |
| _HAS_TORCH = False | |
| try: | |
| import torch # type: ignore | |
| _HAS_TORCH = True | |
| except Exception: | |
| pass | |
| _HAS_ONNX = False | |
| try: | |
| import onnxruntime as ort # type: ignore | |
| _HAS_ONNX = True | |
| except Exception: | |
| pass | |
| # ------------------------------ Translation utils ------------------------------ | |
| import requests | |
| from deep_translator import GoogleTranslator | |
# DeepTranslate settings, read once from the environment when the module loads.
# Both values are whitespace-stripped; an empty DT_KEY means "no key configured".
_DT_DEFAULT_ENDPOINT = "https://deep-translate1.p.rapidapi.com/language/translate/v2"
DT_KEY = os.environ.get("DEEPTRANSLATE_API_KEY", "").strip()
DT_ENDPOINT = os.environ.get("DEEPTRANSLATE_ENDPOINT", _DT_DEFAULT_ENDPOINT).strip()
def _translate_deeptranslate(text: str, target_lang: str, source_lang: str = "en") -> str:
    """Translate ``text`` via the DeepTranslate (RapidAPI-style) HTTP endpoint.

    This function does not cache anything itself.

    Args:
        text: Text to translate.
        target_lang: Language code sent as the ``target`` field.
        source_lang: Language code sent as the ``source`` field.

    Returns:
        The translated string, or ``text`` unchanged when the response carries
        no usable ``translatedText`` value.

    Raises:
        RuntimeError: If ``DT_KEY`` is empty.
        Exception: Whatever ``requests.post``, ``raise_for_status`` or
            ``json()`` raise (network failure, HTTP error status, invalid body).
    """
    if not DT_KEY:
        raise RuntimeError("Missing DEEPTRANSLATE_API_KEY")
    headers = {
        "content-type": "application/json",
        "X-RapidAPI-Key": DT_KEY,
        "X-RapidAPI-Host": "deep-translate1.p.rapidapi.com",
    }
    payload = {"q": text, "source": source_lang, "target": target_lang}
    r = requests.post(DT_ENDPOINT, json=payload, headers=headers, timeout=10)
    r.raise_for_status()
    data = r.json()

    # Walk data -> translations -> translatedText defensively: any level may be
    # missing/null, and list-shaped values are unwrapped to their first item so
    # an unexpected payload degrades to "return the original text".
    translations = data.get("data") if isinstance(data, dict) else None
    translations = translations.get("translations") if isinstance(translations, dict) else None
    if isinstance(translations, list):
        translations = translations[0] if translations else None
    translated = translations.get("translatedText") if isinstance(translations, dict) else None
    if isinstance(translated, list):
        translated = translated[0] if translated else None
    if isinstance(translated, str) and translated:
        return translated
    return text
| def _translate_fallback(text: str, target_lang: str, source_lang: str = "en") -> str: | |
| """Fallback using deep_translator (Google).""" | |
| try: | |
| return GoogleTranslator(source=source_lang, target=target_lang).translate(text) | |
| except Exception: | |
| return text | |
# In-session translation cache keyed by (text, target_lang, source_lang).
_translation_cache: Dict[Tuple[str, str, str], str] = {}


def tr(text: str, target_lang: str, source_lang: str = "en") -> str:
    """Translate ``text``, preferring DeepTranslate and falling back to Google.

    Results are memoised in ``_translation_cache`` for the life of the process.

    Args:
        text: Text to translate.
        target_lang: Target language code; ``en``/``eng``/``english``
            (case-insensitive) returns ``text`` untouched.
        source_lang: Source language code; part of the cache key so the same
            text translated from different sources is not mixed up.

    Returns:
        The translated string, or ``text`` itself when no translation is
        needed or obtainable.
    """
    # Blank text has nothing to translate; skip the network round-trip.
    if not text or not text.strip():
        return text
    if target_lang.lower() in ("en", "eng", "english"):
        return text

    key = (text, target_lang, source_lang)
    cached = _translation_cache.get(key)
    if cached is not None:
        return cached

    try:
        out = _translate_deeptranslate(text, target_lang, source_lang)
    except Exception:
        # Covers a missing API key as well as network/HTTP failures.
        out = _translate_fallback(text, target_lang, source_lang)
    if not isinstance(out, str):
        # Never cache or return a non-string; callers concatenate the result.
        out = text
    _translation_cache[key] = out
    return out
# Sidebar display name -> language code passed to the translation helper.
# Order matters: the first entry is the selectbox default.
LANGS = dict(
    [
        ("English", "en"),
        ("Français (French)", "fr"),
        ("Kiswahili (Swahili)", "sw"),
        ("Hausa", "ha"),
    ]
)
# ------------------------------ Sundew wrapper (v0.6 + v0.7) ------------------------------
@dataclass
class SundewGate:
    """Selective-activation gate that wraps ``SundewAlgorithm`` when it can.

    ``@dataclass`` supplies the keyword constructor
    (``SundewGate(target_activation=..., temperature=..., mode=...)``) and runs
    ``__post_init__`` after the fields are set.

    If no Sundew engine can be built, or none of its candidate methods works,
    ``decide`` uses a built-in stochastic logistic gate whose threshold is
    nudged towards ``target_activation``.
    """

    # Desired long-run fraction of events that open the gate.
    target_activation: float = 0.25
    # Softness of the fallback logistic gate (smaller = sharper threshold).
    temperature: float = 0.08
    # Mode string forwarded to SundewAlgorithm.
    mode: str = "tuned_v2"

    def __post_init__(self) -> None:
        """Build the Sundew engine if possible and initialise fallback state."""
        self.sd = None
        if _HAS_SUNDEW and SundewAlgorithm is not None:
            self.sd = self._build_engine()
        # fallback gate state (keeps app usable even if Sundew not available)
        self._tau = 0.5  # current threshold
        self._ema = 0.0  # running activation rate
        self._alpha = 0.02  # EMA smoothing

    def _build_engine(self):
        """Try the known SundewAlgorithm construction styles; return None if all fail."""
        cfg = {
            "target_activation": self.target_activation,
            "temperature": self.temperature,
            "mode": self.mode,
        }
        engine = None
        try:
            sig = inspect.signature(SundewAlgorithm)
            if "config" in sig.parameters:
                # 0.7.x style
                try:
                    engine = SundewAlgorithm(config=cfg)
                except TypeError:
                    engine = SundewAlgorithm(cfg)  # positional
            else:
                # 0.6.x style kwargs
                engine = SundewAlgorithm(
                    target_activation=self.target_activation,
                    temperature=self.temperature,
                    mode=self.mode,
                )
        except Exception:
            # Best-effort: fall through to the factory helpers below.
            engine = None
        if engine is None:
            # try factory helpers if constructor failed
            for factory in ("from_config", "create", "build"):
                if hasattr(SundewAlgorithm, factory):
                    try:
                        engine = getattr(SundewAlgorithm, factory)(cfg)
                        break
                    except Exception:
                        continue
        return engine

    def decide(self, score: float) -> bool:
        """Return True when the heavy model should run for this event.

        Args:
            score: Lightweight risk score; clamped into [0, 1] before use.

        Returns:
            The engine's decision coerced to ``bool`` when a Sundew engine
            method succeeds, otherwise the fallback gate's random draw.
        """
        score = float(max(0.0, min(1.0, score)))
        if self.sd is not None:
            for method_name in ("decide", "step", "open"):
                method = getattr(self.sd, method_name, None)
                if method is None:
                    continue
                try:
                    return bool(method(score))
                except Exception:
                    # Incompatible signature or engine error: try the next name.
                    continue
        # fallback stochastic logistic gate targeting activation rate
        z = (score - self._tau) / max(1e-6, self.temperature)
        # Numerically stable sigmoid: only ever exponentiate a non-positive
        # number, so a tiny temperature cannot trigger OverflowError.
        if z >= 0:
            p_open = 1.0 / (1.0 + math.exp(-z))
        else:
            exp_z = math.exp(z)
            p_open = exp_z / (1.0 + exp_z)
        fired = bool(np.random.rand() < p_open)
        self._ema = (1 - self._alpha) * self._ema + self._alpha * (1.0 if fired else 0.0)
        # Move the threshold towards the setting that yields the target rate,
        # keeping it inside [0.05, 0.95].
        self._tau += 0.01 * (self.target_activation - self._ema)
        self._tau = min(0.95, max(0.05, self._tau))
        return fired
# ------------------------------ Risk scoring ------------------------------
def compute_lightweight_score(row: pd.Series) -> float:
    """Heuristic risk proxy in [0, 1] from glucose, rate-of-change, insulin, carbs, heart rate.

    Missing, ``None``, non-numeric or NaN inputs contribute nothing to the
    score. Without this guard ``min(1.0, nan)`` evaluates to ``1.0`` and a
    missing value would be scored as maximum risk.

    Args:
        row: Mapping-like record (anything with ``.get``) that may provide
            ``glucose_mgdl``, ``roc_mgdl_min``, ``insulin_units``, ``carbs_g``
            and ``hr``.

    Returns:
        The combined score clamped to [0.0, 1.0].
    """

    def _read(key: str) -> Optional[float]:
        """Return row[key] as a float, or None when absent/unusable/NaN."""
        try:
            value = float(row.get(key, math.nan))
        except (TypeError, ValueError):
            return None
        return None if math.isnan(value) else value

    g = _read("glucose_mgdl")
    roc = _read("roc_mgdl_min") or 0.0
    insulin = _read("insulin_units") or 0.0
    carbs = _read("carbs_g") or 0.0
    hr = _read("hr") or 0.0

    if g is None:
        # No glucose reading: no out-of-range contribution.
        base = 0.0
    else:
        low_gap = max(0.0, 80 - g)
        high_gap = max(0.0, g - 140)
        base = (low_gap + high_gap) / 120.0  # may exceed 1; clamped at the end
    roc_term = min(1.0, abs(roc) / 3.0)  # 3 mg/dL/min ~ strong trend
    # Insulin weighs fully only while glucose is falling, carbs only while rising.
    insulin_term = min(1.0, insulin / 6.0) * (1.0 if roc < -0.5 else 0.3)
    carbs_term = min(1.0, carbs / 50.0) * (1.0 if roc > 0.5 else 0.3)
    # Heart rate above 100 counts, weighted up when insulin is on board.
    activity_term = min(1.0, max(0.0, hr - 100) / 60.0) * (1.0 if insulin > 0.5 else 0.2)
    score = base + 0.7 * roc_term + 0.5 * insulin_term + 0.4 * carbs_term + 0.3 * activity_term
    return float(max(0.0, min(1.0, score)))
# ------------------------------ Heavy model backends ------------------------------
def build_demo_model(df: pd.DataFrame, horizon_steps: int = 6):
    """Session-trained logistic regression demo model (portable).

    The label for each row is whether glucose ``horizon_steps`` rows later is
    below 70 or above 180.

    Args:
        df: Frame with a ``glucose_mgdl`` column. The feature columns
            ``roc_mgdl_min``, ``insulin_units``, ``carbs_g`` and ``hr`` are
            treated as 0.0 when absent or NaN.
        horizon_steps: How many rows ahead to look for the label. The default
            of 6 corresponds to 30 minutes assuming a 5-minute cadence.

    Returns:
        A callable taking a 2-D feature array and returning the positive-class
        probability for its first row.
    """
    from sklearn.linear_model import LogisticRegression
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    feature_cols = ["glucose_mgdl", "roc_mgdl_min", "insulin_units", "carbs_g", "hr"]
    model = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=1000))])

    tmp = df.copy()
    tmp["future_glucose"] = tmp["glucose_mgdl"].shift(-horizon_steps)
    # Rows whose future glucose is unknown (the trailing rows and sensor gaps)
    # have no real label; comparing NaN would silently mark them negative.
    # Drop them, unless that would leave too little data to fit at all.
    labelled = tmp.dropna(subset=["future_glucose"])
    if len(labelled) >= 2:
        tmp = labelled
    tmp = tmp.copy()
    tmp["label"] = ((tmp["future_glucose"] < 70) | (tmp["future_glucose"] > 180)).astype(int)

    # reindex() adds any missing optional feature column as NaN, which the
    # fillna() below then zeroes.
    X = tmp.reindex(columns=feature_cols).fillna(0.0).values
    y = tmp["label"].values
    if len(np.unique(y)) < 2:
        # ensure fit works even with degenerate labels
        y = np.array([0, 1] * (len(X) // 2 + 1))[: len(X)]
    model.fit(X, y)

    def _predict(Xarr: np.ndarray) -> float:
        """Return the positive-class probability for the first row of ``Xarr``."""
        try:
            return float(model.predict_proba(Xarr)[0, 1])
        except Exception:
            # Fall back to the hard class prediction if probabilities fail.
            return float(model.predict(Xarr)[0])

    return _predict
def load_xgb_predictor(file_bytes: bytes) -> Callable[[np.ndarray], float]:
    """Load an XGBoost classifier from uploaded bytes and wrap it as a predictor.

    The bytes are written to a temporary ``.json`` file for ``load_model``;
    the file is removed once loading finishes, whether it succeeded or not.

    Args:
        file_bytes: Serialized model contents.

    Returns:
        A callable taking a 2-D feature array and returning the positive-class
        probability for its first row.

    Raises:
        RuntimeError: If XGBoost is not installed.
        Exception: Whatever ``load_model`` raises for an unreadable model.
    """
    if not _HAS_XGB:
        raise RuntimeError("XGBoost not installed in this environment.")
    import tempfile

    with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
        f.write(file_bytes)
        path = f.name
    try:
        classifier = xgb.XGBClassifier()
        classifier.load_model(path)
    finally:
        # delete=False is needed so the file can be reopened by name; clean it
        # up ourselves so uploads do not accumulate in the temp directory.
        try:
            os.remove(path)
        except OSError:
            pass

    def _predict(Xarr: np.ndarray) -> float:
        """Return the positive-class probability for the first row of ``Xarr``."""
        return float(classifier.predict_proba(Xarr)[0, 1])

    return _predict
def load_torch_predictor(file_bytes: bytes) -> Callable[[np.ndarray], float]:
    """Load a TorchScript model from uploaded bytes and wrap it as a predictor.

    Args:
        file_bytes: Contents of a TorchScript archive.

    Returns:
        A callable taking a 2-D feature array and returning a probability for
        its first row.

    Raises:
        RuntimeError: If PyTorch is not installed.
        Exception: Whatever ``torch.jit.load`` raises for an unreadable archive.
    """
    if not _HAS_TORCH:
        raise RuntimeError("PyTorch not installed in this environment.")
    import io

    model = torch.jit.load(io.BytesIO(file_bytes), map_location="cpu")
    model.eval()

    def _predict(Xarr: np.ndarray) -> float:
        """Run the model on ``Xarr`` and squash its output into a probability."""
        t = torch.tensor(Xarr, dtype=torch.float32)
        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            out = model(t)
        if out.ndim == 0:
            # Scalar output: promote to 1-D so the indexing below is valid.
            out = out.reshape(1)
        elif out.ndim == 2 and out.shape[1] == 1:
            out = out.squeeze(1)
        # One value per row is treated as a logit (sigmoid); several columns
        # are treated as class logits (softmax, column 1).
        # NOTE(review): a model that already emits probabilities gets squashed
        # a second time here; confirm the expected output convention.
        if out.ndim == 1 or out.shape[1] == 1:
            out = torch.sigmoid(out)
        else:
            out = torch.softmax(out, dim=1)[:, 1]
        return float(out[0].cpu().item())

    return _predict
def load_onnx_predictor(file_bytes: bytes) -> Callable[[np.ndarray], float]:
    """Load an ONNX model from uploaded bytes and wrap it as a predictor.

    The session is created directly from the in-memory bytes, so no temporary
    file is written or left behind.

    Args:
        file_bytes: Serialized ONNX model.

    Returns:
        A callable taking a 2-D feature array and returning a float read from
        the first output of the model for the first row.

    Raises:
        RuntimeError: If onnxruntime is not installed.
        Exception: Whatever ``InferenceSession`` raises for an invalid model.
    """
    if not _HAS_ONNX:
        raise RuntimeError("onnxruntime not installed in this environment.")
    sess = ort.InferenceSession(bytes(file_bytes), providers=["CPUExecutionProvider"])
    input_name = sess.get_inputs()[0].name

    def _predict(Xarr: np.ndarray) -> float:
        """Return column 1 of an (n, 2) output, else the first output value."""
        y = sess.run(None, {input_name: Xarr.astype(np.float32)})[0]
        # NOTE(review): only the first model output is read; confirm it holds
        # probabilities for the models this app is expected to load.
        if y.ndim == 2 and y.shape[1] == 2:
            return float(y[0, 1])
        if y.ndim == 2 and y.shape[1] == 1:
            return float(y[0, 0])
        return float(np.ravel(y)[0])

    return _predict
# ------------------------------ Streamlit UI ------------------------------
st.set_page_config(page_title="Sundew Diabetes Watch", layout="wide")

# Language selector
lang_name = st.sidebar.selectbox("Language / Lugha / Taal / Harshe", list(LANGS.keys()), index=0)
LANG = LANGS[lang_name]


def T(s: str) -> str:
    """Translate English UI text into the language selected in the sidebar."""
    return tr(s, LANG, "en")


st.title("🌿 " + T("Sundew Diabetes Watch"))
st.caption(T("Energy-aware selective activation for diabetes monitoring — research demo (not medical advice)."))

# File upload / controls
left, right = st.columns([2, 1])
with left:
    uploaded = st.file_uploader(
        T("Upload CGM CSV (timestamp, glucose_mgdl, carbs_g, insulin_units, steps, hr)"),
        type=["csv"],
    )
    use_synth = st.checkbox(T("Use synthetic example if no file uploaded"), value=True)
with right:
    target_activation = st.slider(T("Target heavy-activation rate"), 0.05, 0.9, 0.25, 0.01)
    temperature = st.slider(T("Gate temperature"), 0.02, 0.5, 0.08, 0.01)
    mode = st.selectbox(T("Sundew mode"), ["tuned_v2", "conservative", "aggressive", "auto_tuned"], index=0)

# Backend selector with stable internal keys
backend_options = [
    ("demo", T("Demo (Logistic Regression)")),
    ("xgb", "XGBoost"),
    ("torch", "PyTorch"),
    ("onnx", "ONNX"),
]
_backend_labels = dict(backend_options)
# The widget's options are the internal keys; format_func only changes what is
# displayed. The selection therefore never depends on matching a translated
# label and does not reset when the UI language changes.
BACKEND_KEY = st.sidebar.selectbox(
    T("Model backend"),
    [key for key, _ in backend_options],
    index=0,
    format_func=_backend_labels.get,
)
backend_label = _backend_labels[BACKEND_KEY]

model_file = None
if BACKEND_KEY in ("xgb", "torch", "onnx"):
    model_file = st.sidebar.file_uploader(
        T("Upload trained model file"),
        type=["json", "bin", "pt", "pth", "onnx"],
        key="model",
    )
# ------------------------------ Load/synthesize data ------------------------------
if uploaded is not None:
    # Rewind defensively in case the upload buffer was already consumed.
    uploaded.seek(0)
    df = pd.read_csv(uploaded)
else:
    if not use_synth:
        # Tell the user why the page is empty instead of stopping silently.
        st.info(T("Upload a CSV file or enable the synthetic example to continue."))
        st.stop()
    rng = np.random.default_rng(7)  # fixed seed: same synthetic trace every run
    n = 600  # ~50 hours if 5-min cadence
    t0 = pd.Timestamp.now(tz="UTC").floor("min")
    times = pd.date_range(start=t0, periods=n, freq="5min")
    base = 120 + 25 * np.sin(np.linspace(0, 10 * np.pi, n))
    noise = rng.normal(0, 10, n)
    meals = (rng.random(n) < 0.04).astype(float) * rng.normal(45, 15, n).clip(0, 120)
    insulin = (rng.random(n) < 0.03).astype(float) * rng.normal(4, 1.2, n).clip(0, 8)
    steps = rng.integers(0, 150, size=n)
    hr = 70 + (steps > 80) * rng.integers(30, 60, size=n)
    glucose = base + noise + 0.3 * meals - 0.8 * insulin
    df = pd.DataFrame(
        {
            "timestamp": times,
            "glucose_mgdl": np.round(glucose, 1),
            "carbs_g": np.round(meals, 1),
            "insulin_units": np.round(insulin, 1),
            "steps": steps,
            "hr": hr,
        }
    )

# Robust timestamp parsing (handles tz-aware values and strings)
from pandas.api.types import is_datetime64_any_dtype

if "timestamp" not in df.columns:
    st.error(T("CSV must include a 'timestamp' column."))
    st.stop()
if "glucose_mgdl" not in df.columns:
    # Everything downstream indexes this column; fail with a clear message.
    st.error(T("CSV must include a 'glucose_mgdl' column."))
    st.stop()

if not is_datetime64_any_dtype(df["timestamp"]):
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
# Localize if naive
if getattr(df["timestamp"].dt, "tz", None) is None:
    df["timestamp"] = df["timestamp"].dt.tz_localize("UTC")

# Rows whose timestamp could not be parsed (NaT) cannot be ordered or charted.
_bad_ts = df["timestamp"].isna()
if _bad_ts.any():
    st.warning(T("Some rows had unparseable timestamps and were skipped."))
    df = df.loc[~_bad_ts].copy()
if df.empty:
    st.error(T("No usable rows found in the data."))
    st.stop()

# Non-numeric glucose cells become NaN rather than breaking the arithmetic below.
df["glucose_mgdl"] = pd.to_numeric(df["glucose_mgdl"], errors="coerce")
df = df.sort_values("timestamp").reset_index(drop=True)

# Rate-of-change mg/dL per minute
df["dt_min"] = df["timestamp"].diff().dt.total_seconds() / 60.0
df["glucose_prev"] = df["glucose_mgdl"].shift(1)
df["roc_mgdl_min"] = (df["glucose_mgdl"] - df["glucose_prev"]) / df["dt_min"]
# First row and duplicate timestamps give NaN/inf; treat them as "no change".
df["roc_mgdl_min"] = df["roc_mgdl_min"].replace([np.inf, -np.inf], 0.0).fillna(0.0)
# ------------------------------ Heavy predictor selection ------------------------------
predict_proba: Optional[Callable[[np.ndarray], float]] = None
header_note = ""

# backend key -> (loader, success note, failure warning). The strings stay in
# English here and are translated only when shown, so unused messages never
# trigger a translation call.
_file_backends = {
    "xgb": (
        load_xgb_predictor,
        "XGBoost model loaded from file.",
        "Could not load XGBoost model; falling back to Demo.",
    ),
    "torch": (
        load_torch_predictor,
        "PyTorch TorchScript model loaded.",
        "Could not load PyTorch model; falling back to Demo.",
    ),
    "onnx": (
        load_onnx_predictor,
        "ONNX model loaded via onnxruntime.",
        "Could not load ONNX model; falling back to Demo.",
    ),
}

if BACKEND_KEY == "demo":
    predict_proba = build_demo_model(df)
    header_note = T("Demo model trains per session for portability.")
elif BACKEND_KEY in _file_backends and model_file is not None:
    _loader, _ok_note, _fail_note = _file_backends[BACKEND_KEY]
    try:
        # getvalue() returns the whole upload regardless of the read position.
        predict_proba = _loader(model_file.getvalue())
    except Exception as exc:
        # Show the reason instead of discarding it, then keep the app usable.
        st.warning(f"{T(_fail_note)} ({exc})")
        predict_proba = build_demo_model(df)
        header_note = T("Demo model used (no external file).")
    else:
        header_note = T(_ok_note)
else:
    st.warning(T("Selected backend requires a model file. Falling back to Demo."))
    predict_proba = build_demo_model(df)
    header_note = T("Demo model used (no external file).")
st.info(header_note)
# ------------------------------ Gate + streaming loop ------------------------------
# Gate settings come straight from the slider/selectbox controls.
_gate_settings = {
    "target_activation": target_activation,
    "temperature": temperature,
    "mode": mode,
}
gate = SundewGate(**_gate_settings)
def make_X(row: pd.Series) -> np.ndarray:
    """Build the (1, 5) float32 feature matrix the heavy models consume.

    Column order is glucose_mgdl, roc_mgdl_min, insulin_units, carbs_g, hr.
    Missing keys and NaN values become 0.0, matching the ``fillna(0.0)``
    applied to the demo model's training features.

    Args:
        row: Mapping-like record (anything with ``.get``).

    Returns:
        A ``numpy`` array of shape (1, 5) and dtype float32 with no NaN.
    """
    feature_names = ("glucose_mgdl", "roc_mgdl_min", "insulin_units", "carbs_g", "hr")
    features = np.array([[row.get(name, 0.0) for name in feature_names]], dtype=np.float32)
    # A NaN feature would make the model call fail or return NaN.
    features[np.isnan(features)] = 0.0
    return features
# Probability at or above which a heavy-model result raises an alert.
ALERT_THRESHOLD = 0.6

records = []
alerts = []
failed_predictions = 0
for _, row in df.iterrows():
    score = compute_lightweight_score(row)
    open_gate = gate.decide(score)
    decision = "SKIP"
    proba = None
    if open_gate and predict_proba is not None:
        # The gate opened, so the event counts as a heavy activation even if
        # the model call itself fails.
        decision = "RUN"
        try:
            proba = float(predict_proba(make_X(row)))
        except Exception:
            # Keep streaming, but count the failure so it can be reported.
            proba = None
            failed_predictions += 1
        if proba is not None and proba >= ALERT_THRESHOLD:
            alerts.append(
                {
                    "timestamp": row["timestamp"],
                    "glucose": row["glucose_mgdl"],
                    "risk_proba": proba,
                    "note": T("⚠ Elevated 30-min risk — please check CGM and plan carbs/insulin."),
                }
            )
    records.append(
        {
            "timestamp": row["timestamp"],
            "glucose": row["glucose_mgdl"],
            "roc": row["roc_mgdl_min"],
            "score": score,
            "gate": decision,
            "risk_proba": proba,
        }
    )

if failed_predictions:
    # Report model errors once, after the loop, rather than hiding them.
    st.warning(T("The heavy model failed on some events; their risk is left empty.") + f" ({failed_predictions})")
# Naming the columns keeps them present even when there are no records, so the
# lookups below cannot raise KeyError on an empty run.
out = pd.DataFrame(
    records,
    columns=["timestamp", "glucose", "roc", "score", "gate", "risk_proba"],
)
events = len(out)
activations = int((out["gate"] == "RUN").sum())
rate = activations / max(events, 1)  # max() avoids division by zero

c1, c2, c3 = st.columns(3)
c1.metric(T("Events"), f"{events}")
c2.metric(T("Heavy activations"), f"{activations}")
c3.metric(T("Activation rate"), f"{rate:.2%}")

# Index by time once and reuse it for both charts.
_by_time = out.set_index("timestamp")
st.line_chart(_by_time["glucose"], height=220)
st.line_chart(_by_time["score"], height=220)

st.subheader(T("Decisions (tail)"))
st.dataframe(out.tail(50))

st.subheader(T("Alerts"))
if alerts:
    st.dataframe(pd.DataFrame(alerts))
else:
    st.info(T("No high-risk alerts triggered in this window."))
# Footer: report which gate engine is in use and, if installed, its version.
_sundew_ver = "unknown"
try:
    from importlib.metadata import version as _ver

    _sundew_ver = _ver("sundew-algorithms")
except Exception:
    # Package metadata unavailable: keep the "unknown" placeholder.
    pass

if _HAS_SUNDEW:
    engine_txt = f"sundew-algorithms {_sundew_ver}"
else:
    engine_txt = T("fallback gate (install sundew-algorithms)")
st.caption(T("Engine: ") + engine_txt)