import json
import os
from typing import Any, Callable, Dict, Optional, Union

import httpx
from openai import AsyncAzureOpenAI, AzureOpenAI

import litellm
from litellm._logging import verbose_logger
from litellm.caching.caching import DualCache
from litellm.llms.base_llm.chat.transformation import BaseLLMException
from litellm.llms.openai.common_utils import BaseOpenAILLM
from litellm.secret_managers.get_azure_ad_token_provider import (
    get_azure_ad_token_provider,
)
from litellm.secret_managers.main import get_secret_str

azure_ad_cache = DualCache()


class AzureOpenAIError(BaseLLMException):
    def __init__(
        self,
        status_code,
        message,
        request: Optional[httpx.Request] = None,
        response: Optional[httpx.Response] = None,
        headers: Optional[Union[httpx.Headers, dict]] = None,
        body: Optional[dict] = None,
    ):
        super().__init__(
            status_code=status_code,
            message=message,
            request=request,
            response=response,
            headers=headers,
            body=body,
        )


def process_azure_headers(headers: Union[httpx.Headers, dict]) -> dict:
    openai_headers = {}
    if "x-ratelimit-limit-requests" in headers:
        openai_headers["x-ratelimit-limit-requests"] = headers[
            "x-ratelimit-limit-requests"
        ]
    if "x-ratelimit-remaining-requests" in headers:
        openai_headers["x-ratelimit-remaining-requests"] = headers[
            "x-ratelimit-remaining-requests"
        ]
    if "x-ratelimit-limit-tokens" in headers:
        openai_headers["x-ratelimit-limit-tokens"] = headers["x-ratelimit-limit-tokens"]
    if "x-ratelimit-remaining-tokens" in headers:
        openai_headers["x-ratelimit-remaining-tokens"] = headers[
            "x-ratelimit-remaining-tokens"
        ]
    llm_response_headers = {
        "{}-{}".format("llm_provider", k): v for k, v in headers.items()
    }

    return {**llm_response_headers, **openai_headers}
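

# A minimal usage sketch (illustrative values only): the mapping above forwards the
# OpenAI-style rate-limit headers unchanged and also re-exposes every response header
# under an "llm_provider-" prefix.
#
#   process_azure_headers({"x-ratelimit-remaining-requests": "99", "x-ms-region": "eastus"})
#   # -> {
#   #      "llm_provider-x-ratelimit-remaining-requests": "99",
#   #      "llm_provider-x-ms-region": "eastus",
#   #      "x-ratelimit-remaining-requests": "99",
#   #    }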


def get_azure_ad_token_from_entra_id(
    tenant_id: str,
    client_id: str,
    client_secret: str,
    scope: str = "https://cognitiveservices.azure.com/.default",
) -> Callable[[], str]:
    """
    Get Azure AD token provider from `client_id`, `client_secret`, and `tenant_id`

    Args:
        tenant_id: str
        client_id: str
        client_secret: str
        scope: str

    Returns:
        callable that returns a bearer token.
    """
    from azure.identity import ClientSecretCredential, get_bearer_token_provider

    verbose_logger.debug("Getting Azure AD Token from Entra ID")

    if tenant_id.startswith("os.environ/"):
        _tenant_id = get_secret_str(tenant_id)
    else:
        _tenant_id = tenant_id

    if client_id.startswith("os.environ/"):
        _client_id = get_secret_str(client_id)
    else:
        _client_id = client_id

    if client_secret.startswith("os.environ/"):
        _client_secret = get_secret_str(client_secret)
    else:
        _client_secret = client_secret

    verbose_logger.debug(
        "tenant_id %s, client_id %s, client_secret %s",
        _tenant_id,
        _client_id,
        _client_secret,
    )
    if _tenant_id is None or _client_id is None or _client_secret is None:
        raise ValueError("tenant_id, client_id, and client_secret must be provided")
    credential = ClientSecretCredential(_tenant_id, _client_id, _client_secret)

    verbose_logger.debug("credential %s", credential)

    token_provider = get_bearer_token_provider(credential, scope)

    verbose_logger.debug("token_provider %s", token_provider)

    return token_provider
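

# Illustrative sketch (placeholder values, not real credentials): the returned
# provider is a zero-argument callable that yields a fresh bearer token on each
# call, so it can be invoked lazily right before a request. "os.environ/..."
# references are resolved through get_secret_str, as shown above.
#
#   token_provider = get_azure_ad_token_from_entra_id(
#       tenant_id="os.environ/AZURE_TENANT_ID",
#       client_id="os.environ/AZURE_CLIENT_ID",
#       client_secret="os.environ/AZURE_CLIENT_SECRET",
#   )
#   bearer_token = token_provider()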


def get_azure_ad_token_from_username_password(
    client_id: str,
    azure_username: str,
    azure_password: str,
    scope: str = "https://cognitiveservices.azure.com/.default",
) -> Callable[[], str]:
    """
    Get Azure AD token provider from `client_id`, `azure_username`, and `azure_password`

    Args:
        client_id: str
        azure_username: str
        azure_password: str
        scope: str

    Returns:
        callable that returns a bearer token.
    """
    from azure.identity import UsernamePasswordCredential, get_bearer_token_provider

    verbose_logger.debug(
        "client_id %s, azure_username %s, azure_password %s",
        client_id,
        azure_username,
        azure_password,
    )
    credential = UsernamePasswordCredential(
        client_id=client_id,
        username=azure_username,
        password=azure_password,
    )

    verbose_logger.debug("credential %s", credential)

    token_provider = get_bearer_token_provider(credential, scope)

    verbose_logger.debug("token_provider %s", token_provider)

    return token_provider
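

# Illustrative sketch (placeholder values): the provider returned here can be handed
# straight to the Azure OpenAI SDK as `azure_ad_token_provider`, which is what
# `initialize_azure_sdk_client` below does.
#
#   provider = get_azure_ad_token_from_username_password(
#       client_id="my-client-id",
#       azure_username="user@example.com",
#       azure_password="...",
#   )
#   client = AzureOpenAI(
#       azure_endpoint="https://my-resource.openai.azure.com",
#       api_version="2024-02-01",  # placeholder API version
#       azure_ad_token_provider=provider,
#   )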


def get_azure_ad_token_from_oidc(
    azure_ad_token: str,
    azure_client_id: Optional[str],
    azure_tenant_id: Optional[str],
) -> str:
    """
    Get Azure AD token from OIDC token

    Args:
        azure_ad_token: str
        azure_client_id: Optional[str]
        azure_tenant_id: Optional[str]

    Returns:
        `azure_ad_token_access_token` - str
    """
    azure_authority_host = os.getenv(
        "AZURE_AUTHORITY_HOST", "https://login.microsoftonline.com"
    )
    azure_client_id = azure_client_id or os.getenv("AZURE_CLIENT_ID")
    azure_tenant_id = azure_tenant_id or os.getenv("AZURE_TENANT_ID")

    if azure_client_id is None or azure_tenant_id is None:
        raise AzureOpenAIError(
            status_code=422,
            message="AZURE_CLIENT_ID and AZURE_TENANT_ID must be set",
        )

    oidc_token = get_secret_str(azure_ad_token)

    if oidc_token is None:
        raise AzureOpenAIError(
            status_code=401,
            message="OIDC token could not be retrieved from secret manager.",
        )

    azure_ad_token_cache_key = json.dumps(
        {
            "azure_client_id": azure_client_id,
            "azure_tenant_id": azure_tenant_id,
            "azure_authority_host": azure_authority_host,
            "oidc_token": oidc_token,
        }
    )

    azure_ad_token_access_token = azure_ad_cache.get_cache(azure_ad_token_cache_key)
    if azure_ad_token_access_token is not None:
        return azure_ad_token_access_token

    client = litellm.module_level_client
    req_token = client.post(
        f"{azure_authority_host}/{azure_tenant_id}/oauth2/v2.0/token",
        data={
            "client_id": azure_client_id,
            "grant_type": "client_credentials",
            "scope": "https://cognitiveservices.azure.com/.default",
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": oidc_token,
        },
    )

    if req_token.status_code != 200:
        raise AzureOpenAIError(
            status_code=req_token.status_code,
            message=req_token.text,
        )

    azure_ad_token_json = req_token.json()
    azure_ad_token_access_token = azure_ad_token_json.get("access_token", None)
    azure_ad_token_expires_in = azure_ad_token_json.get("expires_in", None)

    if azure_ad_token_access_token is None:
        raise AzureOpenAIError(
            status_code=422, message="Azure AD Token access_token not returned"
        )

    if azure_ad_token_expires_in is None:
        raise AzureOpenAIError(
            status_code=422, message="Azure AD Token expires_in not returned"
        )

    azure_ad_cache.set_cache(
        key=azure_ad_token_cache_key,
        value=azure_ad_token_access_token,
        ttl=azure_ad_token_expires_in,
    )

    return azure_ad_token_access_token
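

# Illustrative sketch: `azure_ad_token` is expected to be an "oidc/..." reference that
# `get_secret_str` resolves to a federated OIDC JWT (e.g. from a CI provider). The
# function exchanges it for an Azure AD access token via the client-credentials grant
# and caches the result in `azure_ad_cache` for `expires_in` seconds.
#
#   access_token = get_azure_ad_token_from_oidc(
#       azure_ad_token="oidc/github/...",  # placeholder secret reference
#       azure_client_id=None,              # falls back to AZURE_CLIENT_ID
#       azure_tenant_id=None,              # falls back to AZURE_TENANT_ID
#   )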


def select_azure_base_url_or_endpoint(azure_client_params: dict):
    azure_endpoint = azure_client_params.get("azure_endpoint", None)
    if azure_endpoint is not None:
        # see: https://github.com/openai/openai-python/blob/3d61ed42aba652b547029095a7eb269ad4e1e957/src/openai/lib/azure.py#L192
        if "/openai/deployments" in azure_endpoint:
            # this is a base_url, not an azure_endpoint
            azure_client_params["base_url"] = azure_endpoint
            azure_client_params.pop("azure_endpoint")

    return azure_client_params
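

# Illustrative sketch (placeholder resource and deployment names): an endpoint that
# already contains "/openai/deployments" is treated as a full base_url and moved to
# the `base_url` key instead of `azure_endpoint`.
#
#   select_azure_base_url_or_endpoint(
#       {"azure_endpoint": "https://my-resource.openai.azure.com/openai/deployments/gpt-4o"}
#   )
#   # -> {"base_url": "https://my-resource.openai.azure.com/openai/deployments/gpt-4o"}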


class BaseAzureLLM(BaseOpenAILLM):
    def get_azure_openai_client(
        self,
        api_key: Optional[str],
        api_base: Optional[str],
        api_version: Optional[str] = None,
        client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None,
        litellm_params: Optional[dict] = None,
        _is_async: bool = False,
        model: Optional[str] = None,
    ) -> Optional[Union[AzureOpenAI, AsyncAzureOpenAI]]:
        openai_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None
        client_initialization_params: dict = locals()
        if client is None:
            cached_client = self.get_cached_openai_client(
                client_initialization_params=client_initialization_params,
                client_type="azure",
            )
            if cached_client:
                if isinstance(cached_client, AzureOpenAI) or isinstance(
                    cached_client, AsyncAzureOpenAI
                ):
                    return cached_client

            azure_client_params = self.initialize_azure_sdk_client(
                litellm_params=litellm_params or {},
                api_key=api_key,
                api_base=api_base,
                model_name=model,
                api_version=api_version,
                is_async=_is_async,
            )
            if _is_async is True:
                openai_client = AsyncAzureOpenAI(**azure_client_params)
            else:
                openai_client = AzureOpenAI(**azure_client_params)  # type: ignore
        else:
            openai_client = client
            if api_version is not None and isinstance(
                openai_client._custom_query, dict
            ):
                # set api_version to version passed by user
                openai_client._custom_query.setdefault("api-version", api_version)

        # save client in in-memory cache
        self.set_cached_openai_client(
            openai_client=openai_client,
            client_initialization_params=client_initialization_params,
            client_type="azure",
        )
        return openai_client

    def initialize_azure_sdk_client(
        self,
        litellm_params: dict,
        api_key: Optional[str],
        api_base: Optional[str],
        model_name: Optional[str],
        api_version: Optional[str],
        is_async: bool,
    ) -> dict:
        azure_ad_token_provider: Optional[Callable[[], str]] = None
        # If api_key is set, it takes priority over Azure AD auth
        azure_ad_token = litellm_params.get("azure_ad_token")
        tenant_id = litellm_params.get("tenant_id", os.getenv("AZURE_TENANT_ID"))
        client_id = litellm_params.get("client_id", os.getenv("AZURE_CLIENT_ID"))
        client_secret = litellm_params.get(
            "client_secret", os.getenv("AZURE_CLIENT_SECRET")
        )
        azure_username = litellm_params.get(
            "azure_username", os.getenv("AZURE_USERNAME")
        )
        azure_password = litellm_params.get(
            "azure_password", os.getenv("AZURE_PASSWORD")
        )
        max_retries = litellm_params.get("max_retries")
        timeout = litellm_params.get("timeout")
        if not api_key and tenant_id and client_id and client_secret:
            verbose_logger.debug(
                "Using Azure AD Token Provider from Entra ID for Azure Auth"
            )
            azure_ad_token_provider = get_azure_ad_token_from_entra_id(
                tenant_id=tenant_id,
                client_id=client_id,
                client_secret=client_secret,
            )
        if azure_username and azure_password and client_id:
            verbose_logger.debug("Using Azure Username and Password for Azure Auth")
            azure_ad_token_provider = get_azure_ad_token_from_username_password(
                azure_username=azure_username,
                azure_password=azure_password,
                client_id=client_id,
            )

        if azure_ad_token is not None and azure_ad_token.startswith("oidc/"):
            verbose_logger.debug("Using Azure OIDC Token for Azure Auth")
            azure_ad_token = get_azure_ad_token_from_oidc(
                azure_ad_token=azure_ad_token,
                azure_client_id=client_id,
                azure_tenant_id=tenant_id,
            )
        elif (
            not api_key
            and azure_ad_token_provider is None
            and litellm.enable_azure_ad_token_refresh is True
        ):
            verbose_logger.debug(
                "Using Azure AD token provider based on Service Principal with Secret workflow for Azure Auth"
            )
            try:
                azure_ad_token_provider = get_azure_ad_token_provider()
            except ValueError:
                verbose_logger.debug("Azure AD Token Provider could not be used.")
        if api_version is None:
            api_version = os.getenv(
                "AZURE_API_VERSION", litellm.AZURE_DEFAULT_API_VERSION
            )

        _api_key = api_key
        if _api_key is not None and isinstance(_api_key, str):
            # only show the first 8 chars of the api_key in logs
            _api_key = _api_key[:8] + "*" * 15
        verbose_logger.debug(
            f"Initializing Azure OpenAI Client for {model_name}, Api Base: {str(api_base)}, Api Key:{_api_key}"
        )
        azure_client_params = {
            "api_key": api_key,
            "azure_endpoint": api_base,
            "api_version": api_version,
            "azure_ad_token": azure_ad_token,
            "azure_ad_token_provider": azure_ad_token_provider,
        }
        # init http client + SSL verification settings
        if is_async is True:
            azure_client_params["http_client"] = self._get_async_http_client()
        else:
            azure_client_params["http_client"] = self._get_sync_http_client()

        if max_retries is not None:
            azure_client_params["max_retries"] = max_retries
        if timeout is not None:
            azure_client_params["timeout"] = timeout

        if azure_ad_token_provider is not None:
            azure_client_params["azure_ad_token_provider"] = azure_ad_token_provider
        # this decides if we should set azure_endpoint or base_url on the Azure OpenAI Client
        # required to support GPT-4 vision enhancements, since base_url needs to be set on the Azure OpenAI Client
        azure_client_params = select_azure_base_url_or_endpoint(
            azure_client_params=azure_client_params
        )

        return azure_client_params

    def _init_azure_client_for_cloudflare_ai_gateway(
        self,
        api_base: str,
        model: str,
        api_version: str,
        max_retries: int,
        timeout: Union[float, httpx.Timeout],
        litellm_params: dict,
        api_key: Optional[str],
        azure_ad_token: Optional[str],
        azure_ad_token_provider: Optional[Callable[[], str]],
        acompletion: bool,
        client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None,
    ) -> Union[AzureOpenAI, AsyncAzureOpenAI]:
        ## build base url - assume api base includes resource name
        tenant_id = litellm_params.get("tenant_id", os.getenv("AZURE_TENANT_ID"))
        client_id = litellm_params.get("client_id", os.getenv("AZURE_CLIENT_ID"))
        if client is None:
            if not api_base.endswith("/"):
                api_base += "/"
            api_base += f"{model}"

            azure_client_params: Dict[str, Any] = {
                "api_version": api_version,
                "base_url": f"{api_base}",
                "http_client": litellm.client_session,
                "max_retries": max_retries,
                "timeout": timeout,
            }
            if api_key is not None:
                azure_client_params["api_key"] = api_key
            elif azure_ad_token is not None:
                if azure_ad_token.startswith("oidc/"):
                    azure_ad_token = get_azure_ad_token_from_oidc(
                        azure_ad_token=azure_ad_token,
                        azure_client_id=client_id,
                        azure_tenant_id=tenant_id,
                    )
                azure_client_params["azure_ad_token"] = azure_ad_token
            if azure_ad_token_provider is not None:
                azure_client_params["azure_ad_token_provider"] = azure_ad_token_provider

            if acompletion is True:
                client = AsyncAzureOpenAI(**azure_client_params)  # type: ignore
            else:
                client = AzureOpenAI(**azure_client_params)  # type: ignore

        return client
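

# Illustrative sketch (endpoint, key, and model names are placeholders): a typical
# call path builds or reuses a cached Azure OpenAI client via the helper above; the
# client parameters (auth, http client, retries, timeout) come from
# initialize_azure_sdk_client.
#
#   azure_llm = BaseAzureLLM()
#   client = azure_llm.get_azure_openai_client(
#       api_key=os.getenv("AZURE_API_KEY"),
#       api_base="https://my-resource.openai.azure.com",
#       api_version="2024-02-01",
#       litellm_params={"timeout": 30, "max_retries": 2},
#       _is_async=False,
#       model="gpt-4o",
#   )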