Spaces:
Paused
Paused
| # What is this? | |
| ## On Success events log cost to Lago - https://github.com/BerriAI/litellm/issues/3639 | |
| import json | |
| import os | |
| import uuid | |
| from typing import Literal, Optional | |
| import httpx | |
| import litellm | |
| from litellm._logging import verbose_logger | |
| from litellm.integrations.custom_logger import CustomLogger | |
| from litellm.llms.custom_httpx.http_handler import ( | |
| HTTPHandler, | |
| get_async_httpx_client, | |
| httpxSpecialProvider, | |
| ) | |
def get_utc_datetime():
    """Return the current time as a timezone-aware UTC ``datetime``.

    The previous implementation returned an aware value when ``datetime.UTC``
    existed (Python 3.11+) and a naive value from the deprecated
    ``datetime.utcnow()`` otherwise, so the result type depended on the
    interpreter version. ``timezone.utc`` is available on every supported
    Python 3 release and is the same object as ``datetime.UTC`` where the
    latter exists, so the result is now aware everywhere.
    """
    # Kept as a function-local import, as in the original helper.
    from datetime import datetime, timezone

    return datetime.now(timezone.utc)
class LagoLogger(CustomLogger):
    """Success callback that posts the cost of each call to Lago as an event.

    Configuration is read from the environment:

    * ``LAGO_API_BASE`` (required) - base URL; ``api/v1/events`` is appended.
    * ``LAGO_API_KEY`` (required) - sent as a ``Bearer`` token.
    * ``LAGO_API_EVENT_CODE`` (required) - sent as the event ``code``.
    * ``LAGO_API_CHARGE_BY`` (optional) - one of ``end_user_id`` (default),
      ``user_id`` or ``team_id``; selects which id is billed.
    """

    def __init__(self) -> None:
        super().__init__()
        # Fail at construction time when required settings are missing.
        self.validate_environment()
        self.async_http_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.sync_http_handler = HTTPHandler()

    def validate_environment(self):
        """
        Expects
        LAGO_API_BASE,
        LAGO_API_KEY,
        LAGO_API_EVENT_CODE,

        Optional:
        LAGO_API_CHARGE_BY

        in the environment

        Raises:
            Exception: if any of the required variables is unset.
        """
        required_keys = ("LAGO_API_KEY", "LAGO_API_BASE", "LAGO_API_EVENT_CODE")
        missing_keys = [key for key in required_keys if os.getenv(key) is None]
        if missing_keys:
            raise Exception("Missing keys={} in environment.".format(missing_keys))

    def _common_logic(self, kwargs: dict, response_obj) -> dict:
        """Build the Lago event payload for one successful call.

        Args:
            kwargs: Logging kwargs; reads ``response_cost``, ``model`` and
                ``litellm_params`` (its ``metadata`` and
                ``proxy_server_request`` entries).
            response_obj: The call's response. Token usage is copied only when
                it is a ``litellm.ModelResponse`` or
                ``litellm.EmbeddingResponse`` with a ``usage`` attribute.

        Returns:
            ``{"event": {...}}`` ready to be JSON-encoded and posted.

        Raises:
            Exception: if ``LAGO_API_CHARGE_BY`` has an unsupported value, or
                the id selected by it is not present on the request.
        """
        cost = kwargs.get("response_cost", None)
        model = kwargs.get("model")

        usage = {}
        if isinstance(
            response_obj, (litellm.ModelResponse, litellm.EmbeddingResponse)
        ) and hasattr(response_obj, "usage"):
            response_usage = response_obj["usage"]
            usage = {
                "prompt_tokens": response_usage.get("prompt_tokens", 0),
                "completion_tokens": response_usage.get("completion_tokens", 0),
                "total_tokens": response_usage.get("total_tokens"),
            }

        # Every level may be missing or explicitly None; fall back to empty
        # dicts so a missing id surfaces as the descriptive error below rather
        # than a KeyError/AttributeError.
        litellm_params = kwargs.get("litellm_params", {}) or {}
        proxy_server_request = litellm_params.get("proxy_server_request") or {}
        request_body = proxy_server_request.get("body") or {}
        metadata = litellm_params.get("metadata") or {}

        end_user_id = request_body.get("user", None)
        user_id = metadata.get("user_api_key_user_id", None)
        team_id = metadata.get("user_api_key_team_id", None)

        charge_by: Literal["end_user_id", "team_id", "user_id"] = "end_user_id"
        configured_charge_by = os.getenv("LAGO_API_CHARGE_BY", None)
        if configured_charge_by is not None:
            if configured_charge_by not in ("end_user_id", "user_id", "team_id"):
                raise Exception("invalid LAGO_API_CHARGE_BY set")
            charge_by = configured_charge_by  # type: ignore

        id_by_charge_mode = {
            "end_user_id": end_user_id,
            "team_id": team_id,
            "user_id": user_id,
        }
        external_customer_id: Optional[str] = id_by_charge_mode[charge_by]

        if external_customer_id is None:
            raise Exception(
                "External Customer ID is not set. Charge_by={}. User_id={}. End_user_id={}. Team_id={}".format(
                    charge_by, user_id, end_user_id, team_id
                )
            )

        returned_val = {
            "event": {
                "transaction_id": str(uuid.uuid4()),
                "external_subscription_id": external_customer_id,
                "code": os.getenv("LAGO_API_EVENT_CODE"),
                "properties": {"model": model, "response_cost": cost, **usage},
            }
        }

        verbose_logger.debug(
            "\033[91mLogged Lago Object:\n{}\033[0m\n".format(returned_val)
        )
        return returned_val

    def _build_request(self, kwargs, response_obj) -> tuple:
        """Return ``(url, payload, headers)`` for the events POST.

        Shared by the sync and async callbacks so both send the same request.
        """
        _url = os.getenv("LAGO_API_BASE")
        if _url is None:
            # Explicit check instead of ``assert`` so it still runs under
            # ``python -O``; AssertionError is kept so existing handlers see
            # the same exception type as before.
            raise AssertionError(
                "LAGO_API_BASE missing or not set correctly. LAGO_API_BASE={}".format(
                    _url
                )
            )
        if _url.endswith("/"):
            _url += "api/v1/events"
        else:
            _url += "/api/v1/events"

        api_key = os.getenv("LAGO_API_KEY")
        _data = self._common_logic(kwargs=kwargs, response_obj=response_obj)
        _headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer {}".format(api_key),
        }
        return _url, _data, _headers

    @staticmethod
    def _log_error_response(error, response=None) -> None:
        """Debug-log the HTTP error body, if one can be found.

        Prefers the ``response`` attached to the exception and falls back to
        the response object the caller already holds.
        """
        error_response = getattr(error, "response", None)
        if error_response is None:
            error_response = response
        if error_response is not None and hasattr(error_response, "text"):
            verbose_logger.debug(f"\nError Message: {error_response.text}")

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Synchronously post the Lago event for a successful call.

        Any failure (payload construction or HTTP error) is re-raised after
        the error body, when available, has been debug-logged.
        """
        _url, _data, _headers = self._build_request(kwargs, response_obj)

        try:
            response = self.sync_http_handler.post(
                url=_url,
                data=json.dumps(_data),
                headers=_headers,
            )
            response.raise_for_status()
        except Exception as e:
            self._log_error_response(e)
            raise

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Asynchronously post the Lago event for a successful call.

        Any failure (payload construction or HTTP error) is re-raised after
        the error body, when available, has been debug-logged.
        """
        verbose_logger.debug("ENTERS LAGO CALLBACK")
        _url, _data, _headers = self._build_request(kwargs, response_obj)

        response: Optional[httpx.Response] = None
        try:
            response = await self.async_http_handler.post(
                url=_url,
                data=json.dumps(_data),
                headers=_headers,
            )
            response.raise_for_status()
            verbose_logger.debug(f"Logged Lago Object: {response.text}")
        except Exception as e:
            self._log_error_response(e, response)
            raise