Spaces:
Running
Running
| """Utility caching primitives used across the demo application.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import time | |
| from dataclasses import dataclass | |
| from threading import Lock | |
| from typing import Awaitable, Callable, Dict, Generic, Hashable, Optional, TypeVar | |
| T = TypeVar("T") | |
class CacheUnavailableError(RuntimeError):
    """Raised when a cached resource is temporarily unavailable.

    Attributes:
        retry_in: Seconds callers should wait before retrying; clamped so it
            is never negative (safe to pass straight to a sleep).
    """

    def __init__(self, message: str, retry_in: float):
        super().__init__(message)
        # Clamp negative hints to zero so callers never sleep a negative
        # duration.
        self.retry_in = max(retry_in, 0.0)
@dataclass
class _CacheRecord(Generic[T]):
    """One cache slot: either a fresh value or a remembered failure.

    The ``@dataclass`` decorator is required: both cache classes build
    records with keyword arguments (``_CacheRecord(value=..., ...)``),
    which a plain class with bare annotations would reject with a
    ``TypeError`` — the decorator supplies the generated ``__init__``.
    (``dataclass`` was already imported but never applied.)
    """

    value: Optional[T]  # cached payload; None while in error cooldown
    expires_at: float  # monotonic deadline after which value is stale
    error_until: float  # monotonic deadline until which the failure replays
    error_message: Optional[str]  # message replayed during cooldown
class AsyncTTLCache(Generic[T]):
    """Simple async-aware TTL cache with cooldown on failures.

    Successful loads are cached for ``ttl`` seconds. Failed loads are
    remembered for a cooldown window (at least ``retry_after`` seconds) and
    replayed as :class:`CacheUnavailableError`, so a flapping backend is not
    hammered by every caller. A per-key ``asyncio.Lock`` ensures only one
    coroutine runs the loader for a given key at a time.
    """

    def __init__(self, ttl: float, retry_after: float):
        self.ttl = ttl
        self.retry_after = retry_after
        self._store: Dict[Hashable, _CacheRecord[T]] = {}
        # NOTE(review): per-key locks are never evicted; an unbounded key
        # space grows this dict without limit — confirm keys are bounded.
        self._locks: Dict[Hashable, asyncio.Lock] = {}
        self._global_lock = asyncio.Lock()

    async def get(self, key: Hashable, loader: Callable[[], Awaitable[T]]) -> T:
        """Return the cached value for *key*, invoking *loader* on a miss.

        Raises:
            CacheUnavailableError: when the key is in a failure cooldown,
                or when *loader* just failed (which starts a new cooldown).
        """
        # Optimistic lock-free fast path: asyncio is single-threaded, so a
        # plain dict read is safe here.
        hit = self._fresh_value(key, time.monotonic())
        if hit is not None:
            return hit
        lock = await self._get_lock(key)
        async with lock:
            # Re-check under the lock: another coroutine may have loaded
            # (or failed) while we were waiting.
            now = time.monotonic()
            hit = self._fresh_value(key, now)
            if hit is not None:
                return hit
            try:
                value = await loader()
            except CacheUnavailableError as exc:
                # Honor the loader's retry hint, but never cool down for
                # less than the configured floor.
                cooldown = max(exc.retry_in, self.retry_after)
                message = str(exc) or "Resource unavailable"
                self._remember_failure(key, now + cooldown, message)
                raise CacheUnavailableError(message, cooldown) from exc
            except Exception as exc:  # noqa: BLE001 - surface upstream
                message = str(exc) or "Source request failed"
                self._remember_failure(key, now + self.retry_after, message)
                raise CacheUnavailableError(message, self.retry_after) from exc
            else:
                self._store[key] = _CacheRecord(
                    value=value,
                    expires_at=now + self.ttl,
                    error_until=0.0,
                    error_message=None,
                )
                return value

    def _fresh_value(self, key: Hashable, now: float) -> Optional[T]:
        """Return a fresh cached value, raise during cooldown, else None."""
        record = self._store.get(key)
        if record:
            if record.value is not None and now < record.expires_at:
                return record.value
            if record.error_message and now < record.error_until:
                raise CacheUnavailableError(
                    record.error_message,
                    record.error_until - now,
                )
        return None

    def _remember_failure(self, key: Hashable, until: float, message: str) -> None:
        """Record a failed load so it is replayed until *until*."""
        self._store[key] = _CacheRecord(
            value=None,
            expires_at=0.0,
            error_until=until,
            error_message=message,
        )

    async def _get_lock(self, key: Hashable) -> asyncio.Lock:
        """Get or lazily create the per-key lock (double-checked)."""
        lock = self._locks.get(key)
        if lock is not None:
            return lock
        async with self._global_lock:
            # Re-check: another coroutine may have created it while we
            # awaited the global lock.
            lock = self._locks.get(key)
            if lock is None:
                lock = asyncio.Lock()
                self._locks[key] = lock
            return lock
class TTLCache(Generic[T]):
    """Synchronous TTL cache with cooldown control.

    Successful loads are cached for ``ttl`` seconds. Failed loads are
    remembered for at least ``retry_after`` seconds and replayed as
    :class:`CacheUnavailableError`. A single :class:`threading.Lock`
    serializes loads so concurrent threads do not stampede the source.
    """

    def __init__(self, ttl: float, retry_after: float):
        self.ttl = ttl
        self.retry_after = retry_after
        self._store: Dict[Hashable, _CacheRecord[T]] = {}
        self._lock = Lock()

    def get(self, key: Hashable, loader: Callable[[], T]) -> T:
        """Return the cached value for *key*, invoking *loader* on a miss.

        Raises:
            CacheUnavailableError: when the key is in a failure cooldown,
                or when *loader* just failed (which starts a new cooldown).
        """
        # Optimistic lock-free fast path; a stale read here only means we
        # fall through and re-check under the lock.
        hit = self._fresh_value(key, time.monotonic())
        if hit is not None:
            return hit
        with self._lock:
            # Re-check under the lock: another thread may have loaded (or
            # failed) while we were waiting.
            now = time.monotonic()
            hit = self._fresh_value(key, now)
            if hit is not None:
                return hit
            try:
                value = loader()
            except CacheUnavailableError as exc:
                # Honor the loader's retry hint, but never cool down for
                # less than the configured floor.
                cooldown = max(exc.retry_in, self.retry_after)
                message = str(exc) or "Resource unavailable"
                self._remember_failure(key, now + cooldown, message)
                raise CacheUnavailableError(message, cooldown) from exc
            except Exception as exc:  # noqa: BLE001 - propagate for visibility
                message = str(exc) or "Source request failed"
                self._remember_failure(key, now + self.retry_after, message)
                raise CacheUnavailableError(message, self.retry_after) from exc
            else:
                self._store[key] = _CacheRecord(
                    value=value,
                    expires_at=now + self.ttl,
                    error_until=0.0,
                    error_message=None,
                )
                return value

    def _fresh_value(self, key: Hashable, now: float) -> Optional[T]:
        """Return a fresh cached value, raise during cooldown, else None."""
        record = self._store.get(key)
        if record:
            if record.value is not None and now < record.expires_at:
                return record.value
            if record.error_message and now < record.error_until:
                raise CacheUnavailableError(
                    record.error_message,
                    record.error_until - now,
                )
        return None

    def _remember_failure(self, key: Hashable, until: float, message: str) -> None:
        """Record a failed load so it is replayed until *until*."""
        self._store[key] = _CacheRecord(
            value=None,
            expires_at=0.0,
            error_until=until,
            error_message=message,
        )