Spaces:
Paused
Paused
| # What is this? | |
| ## On Success events log cost to OpenMeter - https://github.com/BerriAI/litellm/issues/1268 | |
| import json | |
| import os | |
| import httpx | |
| import litellm | |
| from litellm.integrations.custom_logger import CustomLogger | |
| from litellm.llms.custom_httpx.http_handler import ( | |
| HTTPHandler, | |
| get_async_httpx_client, | |
| httpxSpecialProvider, | |
| ) | |
| def get_utc_datetime(): | |
| import datetime as dt | |
| from datetime import datetime | |
| if hasattr(dt, "UTC"): | |
| return datetime.now(dt.UTC) # type: ignore | |
| else: | |
| return datetime.utcnow() # type: ignore | |
| class OpenMeterLogger(CustomLogger): | |
| def __init__(self) -> None: | |
| super().__init__() | |
| self.validate_environment() | |
| self.async_http_handler = get_async_httpx_client( | |
| llm_provider=httpxSpecialProvider.LoggingCallback | |
| ) | |
| self.sync_http_handler = HTTPHandler() | |
| def validate_environment(self): | |
| """ | |
| Expects | |
| OPENMETER_API_ENDPOINT, | |
| OPENMETER_API_KEY, | |
| in the environment | |
| """ | |
| missing_keys = [] | |
| if os.getenv("OPENMETER_API_KEY", None) is None: | |
| missing_keys.append("OPENMETER_API_KEY") | |
| if len(missing_keys) > 0: | |
| raise Exception("Missing keys={} in environment.".format(missing_keys)) | |
| def _common_logic(self, kwargs: dict, response_obj): | |
| call_id = response_obj.get("id", kwargs.get("litellm_call_id")) | |
| dt = get_utc_datetime().isoformat() | |
| cost = kwargs.get("response_cost", None) | |
| model = kwargs.get("model") | |
| usage = {} | |
| if ( | |
| isinstance(response_obj, litellm.ModelResponse) | |
| or isinstance(response_obj, litellm.EmbeddingResponse) | |
| ) and hasattr(response_obj, "usage"): | |
| usage = { | |
| "prompt_tokens": response_obj["usage"].get("prompt_tokens", 0), | |
| "completion_tokens": response_obj["usage"].get("completion_tokens", 0), | |
| "total_tokens": response_obj["usage"].get("total_tokens"), | |
| } | |
| subject = (kwargs.get("user", None),) # end-user passed in via 'user' param | |
| if not subject: | |
| raise Exception("OpenMeter: user is required") | |
| return { | |
| "specversion": "1.0", | |
| "type": os.getenv("OPENMETER_EVENT_TYPE", "litellm_tokens"), | |
| "id": call_id, | |
| "time": dt, | |
| "subject": subject, | |
| "source": "litellm-proxy", | |
| "data": {"model": model, "cost": cost, **usage}, | |
| } | |
| def log_success_event(self, kwargs, response_obj, start_time, end_time): | |
| _url = os.getenv("OPENMETER_API_ENDPOINT", "https://openmeter.cloud") | |
| if _url.endswith("/"): | |
| _url += "api/v1/events" | |
| else: | |
| _url += "/api/v1/events" | |
| api_key = os.getenv("OPENMETER_API_KEY") | |
| _data = self._common_logic(kwargs=kwargs, response_obj=response_obj) | |
| _headers = { | |
| "Content-Type": "application/cloudevents+json", | |
| "Authorization": "Bearer {}".format(api_key), | |
| } | |
| try: | |
| self.sync_http_handler.post( | |
| url=_url, | |
| data=json.dumps(_data), | |
| headers=_headers, | |
| ) | |
| except httpx.HTTPStatusError as e: | |
| raise Exception(f"OpenMeter logging error: {e.response.text}") | |
| except Exception as e: | |
| raise e | |
| async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): | |
| _url = os.getenv("OPENMETER_API_ENDPOINT", "https://openmeter.cloud") | |
| if _url.endswith("/"): | |
| _url += "api/v1/events" | |
| else: | |
| _url += "/api/v1/events" | |
| api_key = os.getenv("OPENMETER_API_KEY") | |
| _data = self._common_logic(kwargs=kwargs, response_obj=response_obj) | |
| _headers = { | |
| "Content-Type": "application/cloudevents+json", | |
| "Authorization": "Bearer {}".format(api_key), | |
| } | |
| try: | |
| await self.async_http_handler.post( | |
| url=_url, | |
| data=json.dumps(_data), | |
| headers=_headers, | |
| ) | |
| except httpx.HTTPStatusError as e: | |
| raise Exception(f"OpenMeter logging error: {e.response.text}") | |
| except Exception as e: | |
| raise e | |