Spaces:
Running
Running
| from collections.abc import Callable | |
| from functools import wraps | |
| from time import sleep | |
| import asyncio | |
| def async_tasks(*tasks): | |
| async def gather(*t): | |
| t = [await _ for _ in t] | |
| return await asyncio.gather(*t) | |
| loop = asyncio.new_event_loop() | |
| results = loop.run_until_complete(gather(*tasks)) | |
| loop.stop() | |
| loop.close() | |
| return results | |
| def retry_on_status( | |
| num_retries: int = 3, | |
| backoff_factor: float = 0.5, | |
| max_backoff: float | None = None, | |
| retry_statuses: tuple[int, ...] = (501, 503) | |
| ): | |
| """ | |
| Retry decorator for functions making httpx requests. | |
| Retries on specific HTTP status codes with exponential backoff. | |
| Args: | |
| num_retries (int): Max number of retries. | |
| backoff_factor (float): Multiplier for delay (e.g., 0.5, 1, etc.). | |
| max_backoff (float, optional): Cap on the backoff delay in seconds. | |
| retry_statuses (tuple): HTTP status codes to retry on. | |
| """ | |
| def decorator(func: Callable): | |
| if asyncio.iscoroutinefunction(func): | |
| # Async version | |
| async def async_wrapper(*args, **kwargs): | |
| for attempt in range(num_retries + 1): | |
| response = await func(*args, **kwargs) | |
| if response.status_code not in retry_statuses: | |
| return response | |
| if attempt < num_retries: | |
| delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf')) | |
| await asyncio.sleep(delay) | |
| return response | |
| return async_wrapper | |
| # Sync version | |
| def sync_wrapper(*args, **kwargs): | |
| for attempt in range(num_retries + 1): | |
| response = func(*args, **kwargs) | |
| if response.status_code not in retry_statuses: | |
| return response | |
| if attempt < num_retries: | |
| delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf')) | |
| sleep(delay) | |
| return response | |
| return sync_wrapper | |
| return decorator | |