Spaces:
Paused
Paused
| """ | |
| Utils used for slack alerting | |
| """ | |
| import asyncio | |
| from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union | |
| from litellm.proxy._types import AlertType | |
| from litellm.secret_managers.main import get_secret | |
| if TYPE_CHECKING: | |
| from litellm.litellm_core_utils.litellm_logging import Logging as _Logging | |
| Logging = _Logging | |
| else: | |
| Logging = Any | |
| def process_slack_alerting_variables( | |
| alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] | |
| ) -> Optional[Dict[AlertType, Union[List[str], str]]]: | |
| """ | |
| process alert_to_webhook_url | |
| - check if any urls are set as os.environ/SLACK_WEBHOOK_URL_1 read env var and set the correct value | |
| """ | |
| if alert_to_webhook_url is None: | |
| return None | |
| for alert_type, webhook_urls in alert_to_webhook_url.items(): | |
| if isinstance(webhook_urls, list): | |
| _webhook_values: List[str] = [] | |
| for webhook_url in webhook_urls: | |
| if "os.environ/" in webhook_url: | |
| _env_value = get_secret(secret_name=webhook_url) | |
| if not isinstance(_env_value, str): | |
| raise ValueError( | |
| f"Invalid webhook url value for: {webhook_url}. Got type={type(_env_value)}" | |
| ) | |
| _webhook_values.append(_env_value) | |
| else: | |
| _webhook_values.append(webhook_url) | |
| alert_to_webhook_url[alert_type] = _webhook_values | |
| else: | |
| _webhook_value_str: str = webhook_urls | |
| if "os.environ/" in webhook_urls: | |
| _env_value = get_secret(secret_name=webhook_urls) | |
| if not isinstance(_env_value, str): | |
| raise ValueError( | |
| f"Invalid webhook url value for: {webhook_urls}. Got type={type(_env_value)}" | |
| ) | |
| _webhook_value_str = _env_value | |
| else: | |
| _webhook_value_str = webhook_urls | |
| alert_to_webhook_url[alert_type] = _webhook_value_str | |
| return alert_to_webhook_url | |
| async def _add_langfuse_trace_id_to_alert( | |
| request_data: Optional[dict] = None, | |
| ) -> Optional[str]: | |
| """ | |
| Returns langfuse trace url | |
| - check: | |
| -> existing_trace_id | |
| -> trace_id | |
| -> litellm_call_id | |
| """ | |
| # do nothing for now | |
| if ( | |
| request_data is not None | |
| and request_data.get("litellm_logging_obj", None) is not None | |
| ): | |
| trace_id: Optional[str] = None | |
| litellm_logging_obj: Logging = request_data["litellm_logging_obj"] | |
| for _ in range(3): | |
| trace_id = litellm_logging_obj._get_trace_id(service_name="langfuse") | |
| if trace_id is not None: | |
| break | |
| await asyncio.sleep(3) # wait 3s before retrying for trace id | |
| _langfuse_object = litellm_logging_obj._get_callback_object( | |
| service_name="langfuse" | |
| ) | |
| if _langfuse_object is not None: | |
| base_url = _langfuse_object.Langfuse.base_url | |
| return f"{base_url}/trace/{trace_id}" | |
| return None | |