"""Client helpers for the external portfolio analytics API.""" import re from typing import Any, Dict, List, Optional import httpx from config import ( CACHE_RETRY_SECONDS, CACHE_TTL_SECONDS, DEBUG, EXTERNAL_API_URL, REQUEST_TIMEOUT, ) from infrastructure.cache import CacheUnavailableError, TTLCache # === UUID detection === UUID_PATTERN = re.compile( r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}" ) def extract_portfolio_id(text: str) -> Optional[str]: """Extract a portfolio UUID from text or URL.""" match = UUID_PATTERN.search(text or "") return match.group(0) if match else None async def _get_json(url: str) -> Dict[str, Any]: """Generic helper to send GET request and parse JSON.""" if DEBUG: print(f"[DEBUG] Requesting URL: {url}") async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client: r = await client.get(url, headers={"User-Agent": "Mozilla/5.0", "Accept": "application/json"}) r.raise_for_status() return r.json() def _parse_metrics(payload: Dict[str, Any]) -> Dict[str, Any]: extended = payload.get("data", {}).get("extended", {}) result: Dict[str, Any] = {} for k, v in extended.items(): if isinstance(v, (int, float)): if k in {"cagr", "alphaRatio", "volatility", "maxDD"}: result[k] = v * 100 else: result[k] = v return result async def fetch_metrics_async(portfolio_id: str) -> Optional[Dict[str, Any]]: """Fetch portfolio metrics (extended data) asynchronously.""" url = f"{EXTERNAL_API_URL}/portfolio/get?portfolioId={portfolio_id}&extended=1" try: data = await _get_json(url) result = _parse_metrics(data) if DEBUG: print(f"[DEBUG] Metrics fetched for {portfolio_id}: {result}") return result except Exception as e: if DEBUG: print(f"[ERROR] fetch_metrics_async: {e}") return None def _get_json_sync(url: str) -> Dict[str, Any]: """Synchronous helper mirroring :func:`_get_json`.""" if DEBUG: print(f"[DEBUG] Requesting URL (sync): {url}") with httpx.Client(timeout=REQUEST_TIMEOUT) as client: r = client.get(url, headers={"User-Agent": "Mozilla/5.0", "Accept": "application/json"}) r.raise_for_status() return r.json() def fetch_metrics(portfolio_id: str) -> Optional[Dict[str, Any]]: """Synchronous helper to fetch metrics for caching loaders.""" url = f"{EXTERNAL_API_URL}/portfolio/get?portfolioId={portfolio_id}&extended=1" try: data = _get_json_sync(url) result = _parse_metrics(data) if DEBUG: print(f"[DEBUG] Metrics fetched (sync) for {portfolio_id}: {result}") return result except Exception as e: if DEBUG: print(f"[ERROR] fetch_metrics: {e}") return None _metrics_cache = TTLCache(CACHE_TTL_SECONDS, CACHE_RETRY_SECONDS) def fetch_metrics_cached(portfolio_id: str) -> Dict[str, Any]: """Cached variant with cooldown on upstream failures.""" def _loader() -> Dict[str, Any]: data = fetch_metrics(portfolio_id) if not data: raise CacheUnavailableError( "Metrics temporarily unavailable from upstream API.", CACHE_RETRY_SECONDS, ) return data return _metrics_cache.get(portfolio_id, _loader) async def fetch_absolute_pnl_async(portfolio_id: str) -> Optional[List[Dict[str, Any]]]: """Fetch absolutePnL daily data.""" url = f"{EXTERNAL_API_URL}/portfolio/get?portfolioId={portfolio_id}&extended=1&step=day" try: data = await _get_json(url) pnl = data.get("data", {}).get("extended", {}).get("absolutePnL", []) if not isinstance(pnl, list): if DEBUG: print("[ERROR] absolutePnL is not a list or missing") return None return pnl except Exception as e: if DEBUG: print(f"[ERROR] fetch_absolute_pnl_async: {e}") return None