Spaces:
Paused
Paused
| """ | |
| Custom Logger that handles batching logic | |
| Use this if you want your logs to be stored in memory and flushed periodically. | |
| """ | |
| import asyncio | |
| import time | |
| from typing import List, Optional | |
| import litellm | |
| from litellm._logging import verbose_logger | |
| from litellm.integrations.custom_logger import CustomLogger | |
class CustomBatchLogger(CustomLogger):
    """
    Logger base class that keeps events in an in-memory queue and flushes
    them in batches.

    `flush_queue` passes control to `async_send_batch` and then empties
    `log_queue`. The `async_send_batch` defined here is a no-op, so it is
    presumably meant to be overridden by subclasses. Nothing in this class
    appends to `log_queue` or starts `periodic_flush`; both are left to the
    subclass / caller.
    """

    def __init__(
        self,
        flush_lock: Optional[asyncio.Lock] = None,
        batch_size: Optional[int] = None,
        flush_interval: Optional[int] = None,
        **kwargs,
    ) -> None:
        """
        Set up the in-memory queue and the batching parameters.

        Args:
            flush_lock (Optional[asyncio.Lock], optional): Lock held while the
                queue is flushed. Defaults to None, in which case
                `flush_queue` returns without doing anything.
            batch_size (Optional[int], optional): Stored as `self.batch_size`.
                A falsy value (None or 0) falls back to
                `litellm.DEFAULT_BATCH_SIZE`.
            flush_interval (Optional[int], optional): Seconds `periodic_flush`
                sleeps between flushes. A falsy value (None or 0) falls back
                to `litellm.DEFAULT_FLUSH_INTERVAL_SECONDS`.
            **kwargs: Forwarded unchanged to `CustomLogger.__init__`.
        """
        self.flush_lock = flush_lock
        self.log_queue: List = []
        self.last_flush_time = time.time()

        # Explicit arguments win; anything falsy defers to the litellm defaults.
        self.flush_interval = (
            flush_interval
            if flush_interval
            else litellm.DEFAULT_FLUSH_INTERVAL_SECONDS
        )
        self.batch_size: int = (
            batch_size if batch_size else litellm.DEFAULT_BATCH_SIZE
        )

        super().__init__(**kwargs)

    async def periodic_flush(self):
        """
        Loop forever: sleep `self.flush_interval` seconds, then flush the queue.

        There is no exit condition and no exception handling here, so the
        loop ends only if the coroutine is cancelled or `flush_queue` raises.
        """
        while True:
            await asyncio.sleep(self.flush_interval)
            message = (
                f"CustomLogger periodic flush after {self.flush_interval} seconds"
            )
            verbose_logger.debug(message)
            await self.flush_queue()

    async def flush_queue(self):
        """
        Send the queued events via `async_send_batch` and empty the queue.

        Does nothing when no `flush_lock` was configured or when the queue is
        empty. On a successful send the queue is cleared and
        `last_flush_time` is updated; if `async_send_batch` raises, neither
        happens and the exception propagates to the caller.
        """
        lock = self.flush_lock
        if lock is None:
            return

        async with lock:
            if not self.log_queue:
                return

            verbose_logger.debug(
                "CustomLogger: Flushing batch of %s events", len(self.log_queue)
            )
            await self.async_send_batch()
            # NOTE(review): clear() empties whatever is in the queue *after*
            # the send returns - anything appended while the send was awaited
            # would be dropped too. Confirm producers cannot append here.
            self.log_queue.clear()
            self.last_flush_time = time.time()

    async def async_send_batch(self, *args, **kwargs):
        """
        Deliver the queued events. No-op in this base class.
        """