"""Utility caching primitives used across the demo application.""" from __future__ import annotations import asyncio import time from dataclasses import dataclass from threading import Lock from typing import Awaitable, Callable, Dict, Generic, Hashable, Optional, TypeVar T = TypeVar("T") class CacheUnavailableError(RuntimeError): """Raised when cached resource is temporarily unavailable.""" def __init__(self, message: str, retry_in: float): super().__init__(message) self.retry_in = max(retry_in, 0.0) @dataclass class _CacheRecord(Generic[T]): value: Optional[T] expires_at: float error_until: float error_message: Optional[str] class AsyncTTLCache(Generic[T]): """Simple async-aware TTL cache with cooldown on failures.""" def __init__(self, ttl: float, retry_after: float): self.ttl = ttl self.retry_after = retry_after self._store: Dict[Hashable, _CacheRecord[T]] = {} self._locks: Dict[Hashable, asyncio.Lock] = {} self._global_lock = asyncio.Lock() async def get(self, key: Hashable, loader: Callable[[], Awaitable[T]]) -> T: now = time.monotonic() record = self._store.get(key) if record: if record.value is not None and now < record.expires_at: return record.value if record.error_message and now < record.error_until: raise CacheUnavailableError( record.error_message, record.error_until - now, ) lock = await self._get_lock(key) async with lock: now = time.monotonic() record = self._store.get(key) if record: if record.value is not None and now < record.expires_at: return record.value if record.error_message and now < record.error_until: raise CacheUnavailableError( record.error_message, record.error_until - now, ) try: value = await loader() except CacheUnavailableError as exc: cooldown = max(exc.retry_in, self.retry_after) message = str(exc) or "Resource unavailable" self._store[key] = _CacheRecord( value=None, expires_at=0.0, error_until=now + cooldown, error_message=message, ) raise CacheUnavailableError(message, cooldown) from exc except Exception as exc: # noqa: BLE001 - surface upstream message = str(exc) or "Source request failed" self._store[key] = _CacheRecord( value=None, expires_at=0.0, error_until=now + self.retry_after, error_message=message, ) raise CacheUnavailableError(message, self.retry_after) from exc else: self._store[key] = _CacheRecord( value=value, expires_at=now + self.ttl, error_until=0.0, error_message=None, ) return value async def _get_lock(self, key: Hashable) -> asyncio.Lock: lock = self._locks.get(key) if lock is not None: return lock async with self._global_lock: lock = self._locks.get(key) if lock is None: lock = asyncio.Lock() self._locks[key] = lock return lock class TTLCache(Generic[T]): """Synchronous TTL cache with cooldown control.""" def __init__(self, ttl: float, retry_after: float): self.ttl = ttl self.retry_after = retry_after self._store: Dict[Hashable, _CacheRecord[T]] = {} self._lock = Lock() def get(self, key: Hashable, loader: Callable[[], T]) -> T: now = time.monotonic() record = self._store.get(key) if record: if record.value is not None and now < record.expires_at: return record.value if record.error_message and now < record.error_until: raise CacheUnavailableError( record.error_message, record.error_until - now, ) with self._lock: now = time.monotonic() record = self._store.get(key) if record: if record.value is not None and now < record.expires_at: return record.value if record.error_message and now < record.error_until: raise CacheUnavailableError( record.error_message, record.error_until - now, ) try: value = loader() except CacheUnavailableError as exc: cooldown = max(exc.retry_in, self.retry_after) message = str(exc) or "Resource unavailable" self._store[key] = _CacheRecord( value=None, expires_at=0.0, error_until=now + cooldown, error_message=message, ) raise CacheUnavailableError(message, cooldown) from exc except Exception as exc: # noqa: BLE001 - propagate for visibility message = str(exc) or "Source request failed" self._store[key] = _CacheRecord( value=None, expires_at=0.0, error_until=now + self.retry_after, error_message=message, ) raise CacheUnavailableError(message, self.retry_after) from exc else: self._store[key] = _CacheRecord( value=value, expires_at=now + self.ttl, error_until=0.0, error_message=None, ) return value