QAway-to
Restore original tab layout while keeping cached data
f5bade2
"""Utility caching primitives used across the demo application."""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from threading import Lock
from typing import Awaitable, Callable, Dict, Generic, Hashable, Optional, TypeVar
# Type variable for the value type held by the cache records and caches below.
T = TypeVar("T")
class CacheUnavailableError(RuntimeError):
    """Raised when cached resource is temporarily unavailable.

    Attributes:
        retry_in: Delay in seconds attached to the failure. Negative inputs
            are clamped to ``0.0``.
    """

    def __init__(self, message: str, retry_in: float):
        """Build the error.

        Args:
            message: Human-readable description; becomes ``str(exc)``.
            retry_in: Delay in seconds; values below zero are stored as 0.0.
        """
        super().__init__(message)
        self.retry_in = max(retry_in, 0.0)

    def __reduce__(self):
        """Return reconstruction data that includes ``retry_in``.

        The default exception reduction rebuilds the object from
        ``self.args`` only, i.e. ``cls(message)``, which raises ``TypeError``
        because ``retry_in`` is a required argument. Passing both arguments
        keeps ``copy.copy``, ``copy.deepcopy`` and ``pickle`` working.
        """
        message = self.args[0] if self.args else ""
        # The instance dict is passed as state so extra attributes survive.
        return (type(self), (message, self.retry_in), dict(vars(self)))
@dataclass
class _CacheRecord(Generic[T]):
    """Per-key cache entry holding either a loaded value or a remembered failure."""

    # Loaded value; the caches in this module store None here for failures.
    value: Optional[T]
    # time.monotonic() deadline up to which `value` is served; 0.0 on failure records.
    expires_at: float
    # time.monotonic() deadline up to which the failure is re-raised; 0.0 on success records.
    error_until: float
    # Message re-raised while the failure cooldown is active; None on success records.
    error_message: Optional[str]
class AsyncTTLCache(Generic[T]):
    """Simple async-aware TTL cache with cooldown on failures.

    A successful load is served from memory for ``ttl`` seconds. A failed
    load is remembered for a cooldown period, during which ``get`` re-raises
    ``CacheUnavailableError`` without calling the loader again. Loads are
    guarded by one ``asyncio.Lock`` per key, so concurrent callers for the
    same key trigger a single load while different keys load independently.

    Entries and per-key locks are never evicted.
    """

    # Returned by ``_lookup`` when nothing usable is cached. A dedicated
    # sentinel is required because ``None`` is a legitimate cached value.
    _MISSING = object()

    def __init__(self, ttl: float, retry_after: float):
        """Create an empty cache.

        Args:
            ttl: Seconds a successfully loaded value stays fresh.
            retry_after: Minimum cooldown in seconds after a failed load.
        """
        self.ttl = ttl
        self.retry_after = retry_after
        self._store: Dict[Hashable, _CacheRecord[T]] = {}
        self._locks: Dict[Hashable, asyncio.Lock] = {}

    async def get(self, key: Hashable, loader: Callable[[], Awaitable[T]]) -> T:
        """Return the cached value for ``key``, loading it when needed.

        Args:
            key: Cache key.
            loader: Zero-argument callable returning an awaitable that
                produces the value. It runs while the per-key lock is held,
                so it must not call ``get`` for the same key.

        Returns:
            The cached or freshly loaded value.

        Raises:
            CacheUnavailableError: If the loader fails, or if a previous
                failure for ``key`` is still within its cooldown.
                ``retry_in`` carries the remaining cooldown in seconds.
        """
        cached = self._lookup(key)
        if cached is not self._MISSING:
            return cached  # type: ignore[return-value]

        lock = await self._get_lock(key)
        async with lock:
            # Another task may have completed the load while we waited.
            cached = self._lookup(key)
            if cached is not self._MISSING:
                return cached  # type: ignore[return-value]

            try:
                value = await loader()
            except CacheUnavailableError as exc:
                # Honour the loader's hint, but never go below our own floor.
                cooldown = max(exc.retry_in, self.retry_after)
                message = str(exc) or "Resource unavailable"
                raise self._remember_failure(key, message, cooldown) from exc
            except Exception as exc:  # noqa: BLE001 - surface upstream
                message = str(exc) or "Source request failed"
                raise self._remember_failure(
                    key, message, self.retry_after
                ) from exc

            # The TTL starts when the load completes, so a slow loader does
            # not eat into the freshness window.
            self._store[key] = _CacheRecord(
                value=value,
                expires_at=time.monotonic() + self.ttl,
                error_until=0.0,
                error_message=None,
            )
            return value

    def _lookup(self, key: Hashable) -> object:
        """Return the fresh value for ``key``, or ``_MISSING`` if a load is needed.

        Raises:
            CacheUnavailableError: If a recorded failure is still cooling down.
        """
        record = self._store.get(key)
        if record is None:
            return self._MISSING
        now = time.monotonic()
        if record.error_message is None:
            # Success record: a stored ``None`` counts as a cached value.
            if now < record.expires_at:
                return record.value
        elif now < record.error_until:
            raise CacheUnavailableError(
                record.error_message,
                record.error_until - now,
            )
        return self._MISSING

    def _remember_failure(
        self, key: Hashable, message: str, cooldown: float
    ) -> CacheUnavailableError:
        """Record a failed load and return the error for the caller to raise."""
        # The cooldown is measured from the moment the failure is observed;
        # otherwise a loader that fails slowly (e.g. on a timeout) would
        # store a cooldown that has already partly or fully elapsed.
        self._store[key] = _CacheRecord(
            value=None,
            expires_at=0.0,
            error_until=time.monotonic() + cooldown,
            error_message=message,
        )
        return CacheUnavailableError(message, cooldown)

    async def _get_lock(self, key: Hashable) -> asyncio.Lock:
        """Return the lock guarding loads of ``key``, creating it on first use."""
        # There is no await between the lookup and the insert, so no other
        # task on the event loop can interleave here and no extra lock is
        # needed. Kept as a coroutine so existing ``await`` call sites work.
        lock = self._locks.get(key)
        if lock is None:
            lock = self._locks[key] = asyncio.Lock()
        return lock
class TTLCache(Generic[T]):
    """Synchronous TTL cache with cooldown control.

    A successful load is served from memory for ``ttl`` seconds. A failed
    load is remembered for a cooldown period, during which ``get`` re-raises
    ``CacheUnavailableError`` without calling the loader again. Loads are
    guarded by one ``threading.Lock`` per key, so concurrent callers for the
    same key trigger a single load while different keys load independently.

    Entries and per-key locks are never evicted.
    """

    # Returned by ``_lookup`` when nothing usable is cached. A dedicated
    # sentinel is required because ``None`` is a legitimate cached value.
    _MISSING = object()

    def __init__(self, ttl: float, retry_after: float):
        """Create an empty cache.

        Args:
            ttl: Seconds a successfully loaded value stays fresh.
            retry_after: Minimum cooldown in seconds after a failed load.
        """
        self.ttl = ttl
        self.retry_after = retry_after
        self._store: Dict[Hashable, _CacheRecord[T]] = {}
        # Guards the lock table only; it is never held while a loader runs.
        self._lock = Lock()
        self._locks: Dict[Hashable, Lock] = {}

    def get(self, key: Hashable, loader: Callable[[], T]) -> T:
        """Return the cached value for ``key``, loading it when needed.

        Args:
            key: Cache key.
            loader: Zero-argument callable producing the value. It runs
                while the per-key lock is held, so it must not call ``get``
                for the same key; other keys are fine.

        Returns:
            The cached or freshly loaded value.

        Raises:
            CacheUnavailableError: If the loader fails, or if a previous
                failure for ``key`` is still within its cooldown.
                ``retry_in`` carries the remaining cooldown in seconds.
        """
        cached = self._lookup(key)
        if cached is not self._MISSING:
            return cached  # type: ignore[return-value]

        with self._lock_for(key):
            # Another thread may have completed the load while we waited.
            cached = self._lookup(key)
            if cached is not self._MISSING:
                return cached  # type: ignore[return-value]

            try:
                value = loader()
            except CacheUnavailableError as exc:
                # Honour the loader's hint, but never go below our own floor.
                cooldown = max(exc.retry_in, self.retry_after)
                message = str(exc) or "Resource unavailable"
                raise self._remember_failure(key, message, cooldown) from exc
            except Exception as exc:  # noqa: BLE001 - propagate for visibility
                message = str(exc) or "Source request failed"
                raise self._remember_failure(
                    key, message, self.retry_after
                ) from exc

            # The TTL starts when the load completes, so a slow loader does
            # not eat into the freshness window.
            self._store[key] = _CacheRecord(
                value=value,
                expires_at=time.monotonic() + self.ttl,
                error_until=0.0,
                error_message=None,
            )
            return value

    def _lookup(self, key: Hashable) -> object:
        """Return the fresh value for ``key``, or ``_MISSING`` if a load is needed.

        Raises:
            CacheUnavailableError: If a recorded failure is still cooling down.
        """
        record = self._store.get(key)
        if record is None:
            return self._MISSING
        now = time.monotonic()
        if record.error_message is None:
            # Success record: a stored ``None`` counts as a cached value.
            if now < record.expires_at:
                return record.value
        elif now < record.error_until:
            raise CacheUnavailableError(
                record.error_message,
                record.error_until - now,
            )
        return self._MISSING

    def _remember_failure(
        self, key: Hashable, message: str, cooldown: float
    ) -> CacheUnavailableError:
        """Record a failed load and return the error for the caller to raise."""
        # The cooldown is measured from the moment the failure is observed;
        # otherwise a loader that fails slowly (e.g. on a timeout) would
        # store a cooldown that has already partly or fully elapsed.
        self._store[key] = _CacheRecord(
            value=None,
            expires_at=0.0,
            error_until=time.monotonic() + cooldown,
            error_message=message,
        )
        return CacheUnavailableError(message, cooldown)

    def _lock_for(self, key: Hashable) -> Lock:
        """Return the lock guarding loads of ``key``, creating it on first use."""
        with self._lock:
            lock = self._locks.get(key)
            if lock is None:
                lock = self._locks[key] = Lock()
            return lock