Spaces:
Paused
Paused
| """ | |
| Router cooldown handlers | |
| - _set_cooldown_deployments: puts a deployment in the cooldown list | |
| - get_cooldown_deployments: returns the list of deployments in the cooldown list | |
| - async_get_cooldown_deployments: ASYNC: returns the list of deployments in the cooldown list | |
| """ | |
| import asyncio | |
| from typing import TYPE_CHECKING, Any, List, Optional, Union | |
| import litellm | |
| from litellm._logging import verbose_router_logger | |
| from litellm.constants import ( | |
| DEFAULT_COOLDOWN_TIME_SECONDS, | |
| DEFAULT_FAILURE_THRESHOLD_PERCENT, | |
| SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD, | |
| ) | |
| from litellm.router_utils.cooldown_callbacks import router_cooldown_event_callback | |
| from .router_callbacks.track_deployment_metrics import ( | |
| get_deployment_failures_for_current_minute, | |
| get_deployment_successes_for_current_minute, | |
| ) | |
| if TYPE_CHECKING: | |
| from opentelemetry.trace import Span as _Span | |
| from litellm.router import Router as _Router | |
| LitellmRouter = _Router | |
| Span = Union[_Span, Any] | |
| else: | |
| LitellmRouter = Any | |
| Span = Any | |
def _is_cooldown_required(
    litellm_router_instance: LitellmRouter,
    model_id: str,
    exception_status: Union[str, int],
    exception_str: Optional[str] = None,
) -> bool:
    """
    Decide whether an exception should trigger a deployment cooldown.

    Parameters:
        litellm_router_instance: router that owns the deployment (unused in
            the decision itself; kept for interface parity with siblings)
        model_id (str): the id of the model in the model list
        exception_status (Union[str, int]): the status of the exception
        exception_str (Optional[str]): stringified exception, used to skip
            cooldowns on litellm-side API connection errors

    Returns:
        bool: True if a cooldown is required, False otherwise.
    """
    try:
        # don't cooldown on litellm api connection errors errors
        ignored_strings = ["APIConnectionError"]
        if exception_str is not None and any(
            ignored in exception_str for ignored in ignored_strings
        ):
            return False

        status = (
            int(exception_status)
            if isinstance(exception_status, str)
            else exception_status
        )
        if 400 <= status < 500:
            # Within 4XX only rate-limit (429), auth (401), timeout (408)
            # and not-found (404) errors warrant a cooldown; do NOT cool
            # down on any other 4XX.
            return status in (429, 401, 408, 404)
        # 5XX and anything else: should cool down.
        return True
    except Exception:
        # Catch all - if any exceptions default to cooling down
        return True
def _should_run_cooldown_logic(
    litellm_router_instance: LitellmRouter,
    deployment: Optional[str],
    exception_status: Union[str, int],
    original_exception: Any,
) -> bool:
    """
    Helper that decides if cooldown logic should be run.

    Parameters:
        litellm_router_instance: router owning the deployment
        deployment (Optional[str]): id of the deployment that errored
        exception_status (Union[str, int]): status of the exception
        original_exception (Any): the raised exception object

    Returns:
        bool: False (do not run cooldown logic) when:
        - deployment is None or its model group can't be found
        - router.disable_cooldowns is True
        - _is_cooldown_required() returns False
        - deployment is in litellm_router_instance.provider_default_deployment_ids
    """
    if (
        deployment is None
        or litellm_router_instance.get_model_group(id=deployment) is None
    ):
        verbose_router_logger.debug(
            "Should Not Run Cooldown Logic: deployment id is none or model group can't be found."
        )
        return False

    if litellm_router_instance.disable_cooldowns:
        verbose_router_logger.debug(
            "Should Not Run Cooldown Logic: disable_cooldowns is True"
        )
        return False

    # (A second `deployment is None` check existed here; it was unreachable
    # after the first guard above and has been removed.)

    if not _is_cooldown_required(
        litellm_router_instance=litellm_router_instance,
        model_id=deployment,
        exception_status=exception_status,
        exception_str=str(original_exception),
    ):
        verbose_router_logger.debug(
            "Should Not Run Cooldown Logic: _is_cooldown_required returned False"
        )
        return False

    if deployment in litellm_router_instance.provider_default_deployment_ids:
        verbose_router_logger.debug(
            "Should Not Run Cooldown Logic: deployment is in provider_default_deployment_ids"
        )
        return False

    return True
def _should_cooldown_deployment(
    litellm_router_instance: LitellmRouter,
    deployment: str,
    exception_status: Union[str, int],
    original_exception: Any,
) -> bool:
    """
    Helper that decides if a deployment should be put in cooldown.

    v2 logic (Current) — used when no allowed-fails value/policy is set:
        cooldown if:
        - got a 429 error from the LLM API (multi-deployment groups only)
        - all requests this minute failed and traffic is at or above
          SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD
        - %fails > DEFAULT_FAILURE_THRESHOLD_PERCENT (multi-deployment only)
        - the status is non-retryable per litellm._should_retry()
          (e.g. 401 Auth error, 404 NotFound)
    v1 logic (Legacy) — allowed fails or allowed-fail policy set:
        cooldown if num fails in this minute > allowed fails

    Returns:
        bool: True if the deployment should be put in cooldown,
        False otherwise.
    """
    ## BASE CASE - single deployment
    model_group = litellm_router_instance.get_model_group(id=deployment)
    is_single_deployment_model_group = False
    if model_group is not None and len(model_group) == 1:
        is_single_deployment_model_group = True

    if (
        litellm_router_instance.allowed_fails_policy is None
        and _is_allowed_fails_set_on_router(
            litellm_router_instance=litellm_router_instance
        )
        is False
    ):
        # -------- v2 logic --------
        num_successes_this_minute = get_deployment_successes_for_current_minute(
            litellm_router_instance=litellm_router_instance, deployment_id=deployment
        )
        num_fails_this_minute = get_deployment_failures_for_current_minute(
            litellm_router_instance=litellm_router_instance, deployment_id=deployment
        )

        total_requests_this_minute = num_successes_this_minute + num_fails_this_minute
        percent_fails = 0.0
        if total_requests_this_minute > 0:
            # reuse the precomputed total instead of re-summing the counters
            percent_fails = num_fails_this_minute / total_requests_this_minute
        verbose_router_logger.debug(
            "percent fails for deployment = %s, percent fails = %s, num successes = %s, num fails = %s",
            deployment,
            percent_fails,
            num_successes_this_minute,
            num_fails_this_minute,
        )

        exception_status_int = cast_exception_status_to_int(exception_status)
        if exception_status_int == 429 and not is_single_deployment_model_group:
            # Cool down 429 Rate Limit Errors (keep single deployments up)
            return True
        elif (
            percent_fails == 1.0
            and total_requests_this_minute
            >= SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD
        ):
            # Cooldown if all requests failed and we have reasonable traffic
            return True
        elif (
            percent_fails > DEFAULT_FAILURE_THRESHOLD_PERCENT
            and not is_single_deployment_model_group  # by default we should avoid cooldowns on single deployment model groups
        ):
            return True
        elif litellm._should_retry(status_code=exception_status_int) is False:
            # non-retryable status (checked by litellm._should_retry)
            return True

        return False
    else:
        # -------- v1 legacy logic --------
        # (an unreachable trailing `return False` after this if/else was removed)
        return should_cooldown_based_on_allowed_fails_policy(
            litellm_router_instance=litellm_router_instance,
            deployment=deployment,
            original_exception=original_exception,
        )
def _set_cooldown_deployments(
    litellm_router_instance: LitellmRouter,
    original_exception: Any,
    exception_status: Union[str, int],
    deployment: Optional[str] = None,
    time_to_cooldown: Optional[float] = None,
) -> bool:
    """
    Add a model to the list of models being cooled down for that minute,
    if it exceeds the allowed fails / minute, or the exception is not one
    that should be immediately retried (e.g. 401).

    Parameters:
        litellm_router_instance: router owning the deployment
        original_exception (Any): exception raised by the deployment
        exception_status (Union[str, int]): status of the exception
        deployment (Optional[str]): id of the deployment to cool down
        time_to_cooldown (Optional[float]): override for cooldown duration

    Returns:
        - True if the deployment was put in cooldown
        - False if the deployment was not put in cooldown
    """
    verbose_router_logger.debug("checks 'should_run_cooldown_logic'")

    if (
        _should_run_cooldown_logic(
            litellm_router_instance, deployment, exception_status, original_exception
        )
        is False
        or deployment is None
    ):
        verbose_router_logger.debug("should_run_cooldown_logic returned False")
        return False

    exception_status_int = cast_exception_status_to_int(exception_status)

    verbose_router_logger.debug(f"Attempting to add {deployment} to cooldown list")
    # NOTE(review): this falls back to a 1-second cooldown, while
    # should_cooldown_based_on_allowed_fails_policy falls back to
    # DEFAULT_COOLDOWN_TIME_SECONDS — confirm the asymmetry is intended.
    cooldown_time = litellm_router_instance.cooldown_time or 1
    if time_to_cooldown is not None:
        cooldown_time = time_to_cooldown

    if _should_cooldown_deployment(
        litellm_router_instance, deployment, exception_status, original_exception
    ):
        litellm_router_instance.cooldown_cache.add_deployment_to_cooldown(
            model_id=deployment,
            original_exception=original_exception,
            exception_status=exception_status_int,
            cooldown_time=cooldown_time,
        )

        # Trigger cooldown callback handler. asyncio.create_task raises
        # RuntimeError when no event loop is running (purely synchronous
        # call path) — don't let that crash the cooldown itself.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            verbose_router_logger.debug(
                "No running event loop - skipping router_cooldown_event_callback"
            )
        else:
            asyncio.create_task(
                router_cooldown_event_callback(
                    litellm_router_instance=litellm_router_instance,
                    deployment_id=deployment,
                    exception_status=exception_status,
                    cooldown_time=cooldown_time,
                )
            )
        return True
    return False
async def _async_get_cooldown_deployments(
    litellm_router_instance: LitellmRouter,
    parent_otel_span: Optional[Span],
) -> List[str]:
    """
    Async implementation of '_get_cooldown_deployments'.

    Returns the ids of deployments currently in cooldown.
    """
    all_model_ids = litellm_router_instance.get_model_ids()
    active_cooldowns = (
        await litellm_router_instance.cooldown_cache.async_get_active_cooldowns(
            model_ids=all_model_ids,
            parent_otel_span=parent_otel_span,
        )
    )

    # cache returns (model_id, cooldown_details) tuples; extract the ids
    deployment_ids: List[str] = []
    if (
        isinstance(active_cooldowns, list)
        and active_cooldowns
        and isinstance(active_cooldowns[0], tuple)
    ):
        deployment_ids = [entry[0] for entry in active_cooldowns]

    verbose_router_logger.debug(f"retrieve cooldown models: {active_cooldowns}")
    return deployment_ids
async def _async_get_cooldown_deployments_with_debug_info(
    litellm_router_instance: LitellmRouter,
    parent_otel_span: Optional[Span],
) -> List[tuple]:
    """
    Async variant of '_get_cooldown_deployments' that returns the raw
    (model_id, cooldown_details) tuples instead of just the ids.
    """
    all_model_ids = litellm_router_instance.get_model_ids()
    active_cooldowns = (
        await litellm_router_instance.cooldown_cache.async_get_active_cooldowns(
            model_ids=all_model_ids, parent_otel_span=parent_otel_span
        )
    )
    verbose_router_logger.debug(f"retrieve cooldown models: {active_cooldowns}")
    return active_cooldowns
def _get_cooldown_deployments(
    litellm_router_instance: LitellmRouter, parent_otel_span: Optional[Span]
) -> List[str]:
    """
    Get the list of models being cooled down for this minute.
    """
    # ----------------------
    # Return cooldown models
    # ----------------------
    all_model_ids = litellm_router_instance.get_model_ids()
    active_cooldowns = litellm_router_instance.cooldown_cache.get_active_cooldowns(
        model_ids=all_model_ids, parent_otel_span=parent_otel_span
    )

    # cache returns (model_id, cooldown_details) tuples; extract the ids
    if (
        isinstance(active_cooldowns, list)
        and active_cooldowns
        and isinstance(active_cooldowns[0], tuple)
    ):
        return [entry[0] for entry in active_cooldowns]
    return []
def should_cooldown_based_on_allowed_fails_policy(
    litellm_router_instance: LitellmRouter,
    deployment: str,
    original_exception: Any,
) -> bool:
    """
    Check if fails are within the allowed limit and update the number of fails.

    Returns:
        - True if fails exceed the allowed limit (should cooldown)
        - False if fails are within the allowed limit (should not cooldown)
    """
    allowed_fails = (
        litellm_router_instance.get_allowed_fails_from_policy(
            exception=original_exception,
        )
        or litellm_router_instance.allowed_fails
    )
    cooldown_time = (
        litellm_router_instance.cooldown_time or DEFAULT_COOLDOWN_TIME_SECONDS
    )

    current_fails = litellm_router_instance.failed_calls.get_cache(key=deployment) or 0
    updated_fails = current_fails + 1

    if updated_fails > allowed_fails:
        return True

    # still within budget: persist the incremented count for this window
    litellm_router_instance.failed_calls.set_cache(
        key=deployment, value=updated_fails, ttl=cooldown_time
    )
    return False
def _is_allowed_fails_set_on_router(
    litellm_router_instance: LitellmRouter,
) -> bool:
    """
    Check if Router.allowed_fails is set or is a non-default value.

    Returns:
        - True if Router.allowed_fails is set and differs from the
          litellm module-level default
        - False if Router.allowed_fails is None or is the default value
    """
    router_allowed_fails = litellm_router_instance.allowed_fails
    return (
        router_allowed_fails is not None
        and router_allowed_fails != litellm.allowed_fails
    )
def cast_exception_status_to_int(exception_status: Union[str, int]) -> int:
    """
    Normalize an exception status to an int.

    Strings that cannot be parsed as integers default to 500.
    """
    if not isinstance(exception_status, str):
        return exception_status
    try:
        return int(exception_status)
    except Exception:
        verbose_router_logger.debug(
            f"Unable to cast exception status to int {exception_status}. Defaulting to status=500."
        )
        return 500