Spaces:
Paused
Paused
| """ | |
| What is this? | |
| Logging Pass-Through Endpoints | |
| """ | |
| """ | |
| 1. Create pass-through endpoints so that any request to LITELLM_BASE_URL/langfuse/<endpoint> is forwarded to LANGFUSE_BASE_URL/<endpoint> | |
| """ | |
| import base64 | |
| import os | |
| from base64 import b64encode | |
| from typing import Optional | |
| import httpx | |
| from fastapi import APIRouter, Request, Response | |
| import litellm | |
| from litellm.proxy._types import * | |
| from litellm.proxy.auth.user_api_key_auth import user_api_key_auth | |
| from litellm.proxy.litellm_pre_call_utils import _get_dynamic_logging_metadata | |
| from litellm.proxy.pass_through_endpoints.pass_through_endpoints import ( | |
| create_pass_through_route, | |
| ) | |
# Router object for this module's endpoints. No route decorator is visible in
# this section -- presumably handlers are registered on it elsewhere; TODO confirm.
| router = APIRouter() | |
# Module-level placeholder initialised to None; it is not referenced anywhere
# in the visible code.
# NOTE(review): the name suggests a leftover from Vertex pass-through code --
# confirm nothing imports it before removing.
| default_vertex_config = None | |
def create_request_copy(request: Request):
    """
    Snapshot the basic attributes of an incoming request as a plain dict.

    Args:
        request: The incoming request; its ``method``, ``url``, ``headers``,
            ``cookies`` and ``query_params`` attributes are read.

    Returns:
        A dict with the keys ``"method"``, ``"url"`` (stringified),
        ``"headers"``, ``"cookies"`` and ``"query_params"``. Every mapping
        in the result is a fresh ``dict``, so mutating the snapshot never
        touches the original request.
    """
    return {
        "method": request.method,
        "url": str(request.url),
        "headers": dict(request.headers),
        # Copy cookies like the other mappings; returning the request's own
        # cookie mapping would let edits to the "copy" leak into the request.
        "cookies": dict(request.cookies),
        "query_params": dict(request.query_params),
    }
def _extract_litellm_api_key(authorization_header: str) -> str:
    """
    Pull the LiteLLM API key out of an HTTP Basic ``Authorization`` value.

    The header is expected to carry ``base64("<public_key>:<secret_key>")``;
    the secret-key half is treated as the LiteLLM API key.

    Args:
        authorization_header: Raw ``Authorization`` header value, with or
            without the leading ``Basic `` scheme.

    Returns:
        Everything after the first ``:`` of the decoded credentials.

    Raises:
        ValueError: If the header is empty, is not valid base64 / UTF-8, or
            the decoded credentials contain no ``:`` separator.
    """
    credentials = authorization_header.strip()
    # Only strip the scheme when it is a real prefix (scheme names are
    # case-insensitive); never remove "Basic " from the middle of the value.
    if credentials[:6].lower() == "basic ":
        credentials = credentials[6:].strip()
    if not credentials:
        raise ValueError("missing Basic credentials")

    # binascii.Error and UnicodeDecodeError are both ValueError subclasses,
    # so malformed input surfaces as the documented exception type.
    decoded = base64.b64decode(credentials).decode("utf-8")

    # Split on the FIRST colon only: the secret itself may contain colons.
    _public_key, separator, secret_key = decoded.partition(":")
    if not separator:
        raise ValueError("credentials are not in '<public_key>:<secret_key>' form")
    return secret_key


async def langfuse_proxy_route(
    endpoint: str,
    request: Request,
    fastapi_response: Response,
):
    """
    Call Langfuse via LiteLLM proxy. Works with Langfuse SDK.

    [Docs](https://docs.litellm.ai/docs/pass_through/langfuse)

    The caller authenticates with HTTP Basic credentials whose secret-key
    half is a LiteLLM API key. After that key is validated, the request is
    forwarded to Langfuse using the Langfuse credentials resolved for the
    key (dynamic callback settings first, configured secrets otherwise).

    Args:
        endpoint: Path requested by the client; it is re-used as the path on
            the Langfuse host.
        request: Incoming request; its ``Authorization`` header and query
            parameters are read.
        fastapi_response: Response object handed to the pass-through handler.

    Returns:
        Whatever the dynamically created pass-through handler returns.

    Raises:
        HTTPException: 401 when the ``Authorization`` header is missing or
            cannot be parsed as Basic credentials.
    """
    from fastapi import HTTPException

    from litellm.proxy.proxy_server import proxy_config

    ## READ THE LITELLM API KEY FROM THE BASIC-AUTH HEADER (sent as the secret key)
    try:
        api_key = _extract_litellm_api_key(request.headers.get("Authorization") or "")
    except ValueError as err:
        # Malformed client input is an authentication failure, not a server error.
        raise HTTPException(
            status_code=401,
            detail="Invalid or missing Basic Authorization header. Expected base64('<public_key>:<litellm_api_key>').",
        ) from err

    user_api_key_dict = await user_api_key_auth(
        request=request, api_key="Bearer {}".format(api_key)
    )

    callback_settings_obj: Optional[
        TeamCallbackMetadata
    ] = _get_dynamic_logging_metadata(
        user_api_key_dict=user_api_key_dict, proxy_config=proxy_config
    )

    ## RESOLVE PER-KEY LANGFUSE SETTINGS (absent entries stay None)
    callback_vars = (
        callback_settings_obj.callback_vars
        if callback_settings_obj is not None
        else None
    ) or {}
    dynamic_langfuse_public_key: Optional[str] = callback_vars.get(
        "langfuse_public_key"
    )
    dynamic_langfuse_secret_key: Optional[str] = callback_vars.get(
        "langfuse_secret_key"
    )
    dynamic_langfuse_host: Optional[str] = callback_vars.get("langfuse_host")

    # The trailing literal also covers LANGFUSE_HOST being set to an empty string.
    base_target_url: str = (
        dynamic_langfuse_host
        or os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
        or "https://cloud.langfuse.com"
    )
    if not base_target_url.startswith(("http://", "https://")):
        # add http:// if unset, assume communicating over private network - e.g. render
        base_target_url = "http://" + base_target_url

    encoded_endpoint = httpx.URL(endpoint).path

    # Ensure endpoint starts with '/' for proper URL construction
    if not encoded_endpoint.startswith("/"):
        encoded_endpoint = "/" + encoded_endpoint

    # Construct the full target URL using httpx
    base_url = httpx.URL(base_target_url)
    updated_url = base_url.copy_with(path=encoded_endpoint)

    ## BUILD THE LANGFUSE BASIC-AUTH HEADER (dynamic keys win over configured secrets)
    langfuse_public_key = dynamic_langfuse_public_key or litellm.utils.get_secret(
        secret_name="LANGFUSE_PUBLIC_KEY"
    )
    langfuse_secret_key = dynamic_langfuse_secret_key or litellm.utils.get_secret(
        secret_name="LANGFUSE_SECRET_KEY"
    )
    # NOTE(review): if either key resolves to None, the literal text "None" is
    # encoded and forwarded upstream -- consider failing fast here instead.
    langfuse_combined_key = "Basic " + b64encode(
        f"{langfuse_public_key}:{langfuse_secret_key}".encode("utf-8")
    ).decode("ascii")

    ## CREATE PASS-THROUGH
    endpoint_func = create_pass_through_route(
        endpoint=endpoint,
        target=str(updated_url),
        custom_headers={"Authorization": langfuse_combined_key},
    )  # dynamically construct pass-through endpoint based on incoming path
    received_value = await endpoint_func(
        request,
        fastapi_response,
        user_api_key_dict,
        query_params=dict(request.query_params),  # type: ignore
    )

    return received_value