import asyncio
import collections.abc
import datetime
import json
import threading
import time
import traceback
import uuid
from typing import Any, Callable, Dict, List, Optional, Union, cast

import httpx
from pydantic import BaseModel

import litellm
from litellm import verbose_logger
from litellm.litellm_core_utils.redact_messages import LiteLLMLoggingObject
from litellm.litellm_core_utils.thread_pool_executor import executor
from litellm.types.llms.openai import ChatCompletionChunk
from litellm.types.router import GenericLiteLLMParams
from litellm.types.utils import Delta
from litellm.types.utils import GenericStreamingChunk as GChunk
from litellm.types.utils import (
    ModelResponse,
    ModelResponseStream,
    StreamingChoices,
    Usage,
)

from ..exceptions import OpenAIError
from .core_helpers import map_finish_reason, process_response_headers
from .exception_mapping_utils import exception_type
from .llm_response_utils.get_api_base import get_api_base
from .rules import Rules


def is_async_iterable(obj: Any) -> bool:
    """
    Check if an object is an async iterable (can be used with 'async for').

    Args:
        obj: Any Python object to check

    Returns:
        bool: True if the object is async iterable, False otherwise
    """
    return isinstance(obj, collections.abc.AsyncIterable)
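
# Illustrative sketch (not executed): how is_async_iterable distinguishes async
# iterables from plain sync ones. The name `agen` below is hypothetical.
#
#   >>> async def agen():
#   ...     yield 1
#   >>> is_async_iterable(agen())   # async generator object
#   True
#   >>> is_async_iterable([1, 2])   # a list is only sync-iterable
#   False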


def print_verbose(print_statement):
    try:
        if litellm.set_verbose:
            print(print_statement)  # noqa
    except Exception:
        pass


class CustomStreamWrapper:
    def __init__(
        self,
        completion_stream,
        model,
        logging_obj: Any,
        custom_llm_provider: Optional[str] = None,
        stream_options=None,
        make_call: Optional[Callable] = None,
        _response_headers: Optional[dict] = None,
    ):
        self.model = model
        self.make_call = make_call
        self.custom_llm_provider = custom_llm_provider
        self.logging_obj: LiteLLMLoggingObject = logging_obj
        self.completion_stream = completion_stream
        self.sent_first_chunk = False
        self.sent_last_chunk = False

        litellm_params: GenericLiteLLMParams = GenericLiteLLMParams(
            **self.logging_obj.model_call_details.get("litellm_params", {})
        )
        self.merge_reasoning_content_in_choices: bool = (
            litellm_params.merge_reasoning_content_in_choices or False
        )
        self.sent_first_thinking_block = False
        self.sent_last_thinking_block = False
        self.thinking_content = ""

        self.system_fingerprint: Optional[str] = None
        self.received_finish_reason: Optional[str] = None
        self.intermittent_finish_reason: Optional[
            str
        ] = None  # finish reasons that show up mid-stream
        self.special_tokens = [
            "<|assistant|>",
            "<|system|>",
            "<|user|>",
            "<s>",
            "</s>",
            "<|im_end|>",
            "<|im_start|>",
        ]
        self.holding_chunk = ""
        self.complete_response = ""
        self.response_uptil_now = ""
        _model_info: Dict = litellm_params.model_info or {}

        _api_base = get_api_base(
            model=model or "",
            optional_params=self.logging_obj.model_call_details.get(
                "litellm_params", {}
            ),
        )
        self._hidden_params = {
            "model_id": (_model_info.get("id", None)),
            "api_base": _api_base,
        }  # returned as x-litellm-model-id response header in proxy

        self._hidden_params["additional_headers"] = process_response_headers(
            _response_headers or {}
        )  # GUARANTEE OPENAI HEADERS IN RESPONSE

        self._response_headers = _response_headers
        self.response_id: Optional[str] = None
        self.logging_loop = None
        self.rules = Rules()
        self.stream_options = stream_options or getattr(
            logging_obj, "stream_options", None
        )
        self.messages = getattr(logging_obj, "messages", None)
        self.sent_stream_usage = False
        self.send_stream_usage = (
            True if self.check_send_stream_usage(self.stream_options) else False
        )
        self.tool_call = False
        self.chunks: List = (
            []
        )  # keep track of the returned chunks - used for calculating the input/output tokens for stream options
        self.is_function_call = self.check_is_function_call(logging_obj=logging_obj)

    def __iter__(self):
        return self

    def __aiter__(self):
        return self

    def check_send_stream_usage(self, stream_options: Optional[dict]):
        return (
            stream_options is not None
            and stream_options.get("include_usage", False) is True
        )
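
    # Illustrative sketch (not executed): usage is only streamed back when the caller
    # explicitly opts in via stream_options. `wrapper` is a hypothetical
    # CustomStreamWrapper instance.
    #
    #   >>> wrapper.check_send_stream_usage({"include_usage": True})
    #   True
    #   >>> wrapper.check_send_stream_usage({"include_usage": False})
    #   False
    #   >>> wrapper.check_send_stream_usage(None)
    #   False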

    def check_is_function_call(self, logging_obj) -> bool:
        if hasattr(logging_obj, "optional_params") and isinstance(
            logging_obj.optional_params, dict
        ):
            if (
                "litellm_param_is_function_call" in logging_obj.optional_params
                and logging_obj.optional_params["litellm_param_is_function_call"]
                is True
            ):
                return True

        return False

    def process_chunk(self, chunk: str):
        """
        NLP Cloud streaming returns the entire response so far with each chunk.
        Process this and return only the new delta.
        """
        try:
            chunk = chunk.strip()
            self.complete_response = self.complete_response.strip()

            if chunk.startswith(self.complete_response):
                # Strip the previously returned text, so only the new delta is returned
                chunk = chunk[len(self.complete_response) :]

            self.complete_response += chunk
            return chunk
        except Exception as e:
            raise e
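
    # Illustrative sketch (not executed), assuming `wrapper` is a hypothetical
    # CustomStreamWrapper whose previous cumulative response was "Hello":
    #
    #   >>> wrapper.complete_response = "Hello"
    #   >>> wrapper.process_chunk("Hello world")
    #   ' world'
    #   >>> wrapper.complete_response
    #   'Hello world'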

    def safety_checker(self) -> None:
        """
        Fixes - https://github.com/BerriAI/litellm/issues/5158

        If the model enters a loop and starts repeating the same chunk, break out
        of the loop and raise an InternalServerError - this allows for retries.

        Raises - InternalServerError, if the LLM enters an infinite loop while streaming
        """
        if len(self.chunks) >= litellm.REPEATED_STREAMING_CHUNK_LIMIT:
            # Get the last n chunks
            last_chunks = self.chunks[-litellm.REPEATED_STREAMING_CHUNK_LIMIT :]

            # Extract the relevant content from the chunks
            last_contents = [chunk.choices[0].delta.content for chunk in last_chunks]

            # Check if all extracted contents are identical
            if all(content == last_contents[0] for content in last_contents):
                if (
                    last_contents[0] is not None
                    and isinstance(last_contents[0], str)
                    and len(last_contents[0]) > 2
                ):  # ignore empty content - https://github.com/BerriAI/litellm/issues/5158#issuecomment-2287156946
                    # All last n chunks are identical
                    raise litellm.InternalServerError(
                        message="The model is repeating the same chunk = {}.".format(
                            last_contents[0]
                        ),
                        model="",
                        llm_provider="",
                    )

    def check_special_tokens(self, chunk: str, finish_reason: Optional[str]):
        """
        Output parse <s> / </s> special tokens for sagemaker + hf streaming.
        """
        hold = False
        if self.custom_llm_provider != "sagemaker":
            return hold, chunk

        if finish_reason:
            for token in self.special_tokens:
                if token in chunk:
                    chunk = chunk.replace(token, "")
            return hold, chunk

        if self.sent_first_chunk is True:
            return hold, chunk

        curr_chunk = self.holding_chunk + chunk
        curr_chunk = curr_chunk.strip()

        for token in self.special_tokens:
            if len(curr_chunk) < len(token) and curr_chunk in token:
                hold = True
                self.holding_chunk = curr_chunk
            elif len(curr_chunk) >= len(token):
                if token in curr_chunk:
                    self.holding_chunk = curr_chunk.replace(token, "")
                    hold = True
            else:
                pass

        if hold is False:  # reset
            self.holding_chunk = ""
        return hold, curr_chunk
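
    # Illustrative sketch (not executed): special-token filtering only applies when the
    # provider is "sagemaker". A partial match such as "<s" is held back until the next
    # chunk shows whether it really is a special token. `wrapper` is a hypothetical instance.
    #
    #   >>> wrapper.custom_llm_provider = "sagemaker"
    #   >>> wrapper.check_special_tokens("<s", finish_reason=None)
    #   (True, '<s')
    #   >>> wrapper.holding_chunk
    #   '<s'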
| def handle_predibase_chunk(self, chunk): | |
| try: | |
| if not isinstance(chunk, str): | |
| chunk = chunk.decode( | |
| "utf-8" | |
| ) # DO NOT REMOVE this: This is required for HF inference API + Streaming | |
| text = "" | |
| is_finished = False | |
| finish_reason = "" | |
| print_verbose(f"chunk: {chunk}") | |
| if chunk.startswith("data:"): | |
| data_json = json.loads(chunk[5:]) | |
| print_verbose(f"data json: {data_json}") | |
| if "token" in data_json and "text" in data_json["token"]: | |
| text = data_json["token"]["text"] | |
| if data_json.get("details", False) and data_json["details"].get( | |
| "finish_reason", False | |
| ): | |
| is_finished = True | |
| finish_reason = data_json["details"]["finish_reason"] | |
| elif data_json.get( | |
| "generated_text", False | |
| ): # if full generated text exists, then stream is complete | |
| text = "" # don't return the final bos token | |
| is_finished = True | |
| finish_reason = "stop" | |
| elif data_json.get("error", False): | |
| raise Exception(data_json.get("error")) | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| elif "error" in chunk: | |
| raise ValueError(chunk) | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception as e: | |
| raise e | |
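
    # Illustrative sketch (not executed): Predibase streams TGI-style SSE lines. A token
    # line yields its text; a line carrying "generated_text" marks completion.
    # `wrapper` is a hypothetical CustomStreamWrapper instance.
    #
    #   >>> wrapper.handle_predibase_chunk('data: {"token": {"text": "Hello"}}')
    #   {'text': 'Hello', 'is_finished': False, 'finish_reason': ''}
    #   >>> wrapper.handle_predibase_chunk('data: {"generated_text": "Hello world"}')
    #   {'text': '', 'is_finished': True, 'finish_reason': 'stop'}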
| def handle_ai21_chunk(self, chunk): # fake streaming | |
| chunk = chunk.decode("utf-8") | |
| data_json = json.loads(chunk) | |
| try: | |
| text = data_json["completions"][0]["data"]["text"] | |
| is_finished = True | |
| finish_reason = "stop" | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception: | |
| raise ValueError(f"Unable to parse response. Original response: {chunk}") | |
| def handle_maritalk_chunk(self, chunk): # fake streaming | |
| chunk = chunk.decode("utf-8") | |
| data_json = json.loads(chunk) | |
| try: | |
| text = data_json["answer"] | |
| is_finished = True | |
| finish_reason = "stop" | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception: | |
| raise ValueError(f"Unable to parse response. Original response: {chunk}") | |
| def handle_nlp_cloud_chunk(self, chunk): | |
| text = "" | |
| is_finished = False | |
| finish_reason = "" | |
| try: | |
| if "dolphin" in self.model: | |
| chunk = self.process_chunk(chunk=chunk) | |
| else: | |
| data_json = json.loads(chunk) | |
| chunk = data_json["generated_text"] | |
| text = chunk | |
| if "[DONE]" in text: | |
| text = text.replace("[DONE]", "") | |
| is_finished = True | |
| finish_reason = "stop" | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception: | |
| raise ValueError(f"Unable to parse response. Original response: {chunk}") | |
| def handle_aleph_alpha_chunk(self, chunk): | |
| chunk = chunk.decode("utf-8") | |
| data_json = json.loads(chunk) | |
| try: | |
| text = data_json["completions"][0]["completion"] | |
| is_finished = True | |
| finish_reason = "stop" | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception: | |
| raise ValueError(f"Unable to parse response. Original response: {chunk}") | |
| def handle_azure_chunk(self, chunk): | |
| is_finished = False | |
| finish_reason = "" | |
| text = "" | |
| print_verbose(f"chunk: {chunk}") | |
| if "data: [DONE]" in chunk: | |
| text = "" | |
| is_finished = True | |
| finish_reason = "stop" | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| elif chunk.startswith("data:"): | |
| data_json = json.loads(chunk[5:]) # chunk.startswith("data:"): | |
| try: | |
| if len(data_json["choices"]) > 0: | |
| delta = data_json["choices"][0]["delta"] | |
| text = "" if delta is None else delta.get("content", "") | |
| if data_json["choices"][0].get("finish_reason", None): | |
| is_finished = True | |
| finish_reason = data_json["choices"][0]["finish_reason"] | |
| print_verbose( | |
| f"text: {text}; is_finished: {is_finished}; finish_reason: {finish_reason}" | |
| ) | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception: | |
| raise ValueError( | |
| f"Unable to parse response. Original response: {chunk}" | |
| ) | |
| elif "error" in chunk: | |
| raise ValueError(f"Unable to parse response. Original response: {chunk}") | |
| else: | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
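
    # Illustrative sketch (not executed): the Azure SSE terminator is surfaced as a final
    # empty chunk with finish_reason "stop". `wrapper` is a hypothetical instance.
    #
    #   >>> wrapper.handle_azure_chunk("data: [DONE]")
    #   {'text': '', 'is_finished': True, 'finish_reason': 'stop'}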
| def handle_replicate_chunk(self, chunk): | |
| try: | |
| text = "" | |
| is_finished = False | |
| finish_reason = "" | |
| if "output" in chunk: | |
| text = chunk["output"] | |
| if "status" in chunk: | |
| if chunk["status"] == "succeeded": | |
| is_finished = True | |
| finish_reason = "stop" | |
| elif chunk.get("error", None): | |
| raise Exception(chunk["error"]) | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception: | |
| raise ValueError(f"Unable to parse response. Original response: {chunk}") | |
| def handle_openai_chat_completion_chunk(self, chunk): | |
| try: | |
| print_verbose(f"\nRaw OpenAI Chunk\n{chunk}\n") | |
| str_line = chunk | |
| text = "" | |
| is_finished = False | |
| finish_reason = None | |
| logprobs = None | |
| usage = None | |
| if str_line and str_line.choices and len(str_line.choices) > 0: | |
| if ( | |
| str_line.choices[0].delta is not None | |
| and str_line.choices[0].delta.content is not None | |
| ): | |
| text = str_line.choices[0].delta.content | |
| else: # function/tool calling chunk - when content is None. in this case we just return the original chunk from openai | |
| pass | |
| if str_line.choices[0].finish_reason: | |
| is_finished = True | |
| finish_reason = str_line.choices[0].finish_reason | |
| # checking for logprobs | |
| if ( | |
| hasattr(str_line.choices[0], "logprobs") | |
| and str_line.choices[0].logprobs is not None | |
| ): | |
| logprobs = str_line.choices[0].logprobs | |
| else: | |
| logprobs = None | |
| usage = getattr(str_line, "usage", None) | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| "logprobs": logprobs, | |
| "original_chunk": str_line, | |
| "usage": usage, | |
| } | |
| except Exception as e: | |
| raise e | |
| def handle_azure_text_completion_chunk(self, chunk): | |
| try: | |
| print_verbose(f"\nRaw OpenAI Chunk\n{chunk}\n") | |
| text = "" | |
| is_finished = False | |
| finish_reason = None | |
| choices = getattr(chunk, "choices", []) | |
| if len(choices) > 0: | |
| text = choices[0].text | |
| if choices[0].finish_reason is not None: | |
| is_finished = True | |
| finish_reason = choices[0].finish_reason | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| except Exception as e: | |
| raise e | |
| def handle_openai_text_completion_chunk(self, chunk): | |
| try: | |
| print_verbose(f"\nRaw OpenAI Chunk\n{chunk}\n") | |
| text = "" | |
| is_finished = False | |
| finish_reason = None | |
| usage = None | |
| choices = getattr(chunk, "choices", []) | |
| if len(choices) > 0: | |
| text = choices[0].text | |
| if choices[0].finish_reason is not None: | |
| is_finished = True | |
| finish_reason = choices[0].finish_reason | |
| usage = getattr(chunk, "usage", None) | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| "usage": usage, | |
| } | |
| except Exception as e: | |
| raise e | |
| def handle_baseten_chunk(self, chunk): | |
| try: | |
| chunk = chunk.decode("utf-8") | |
| if len(chunk) > 0: | |
| if chunk.startswith("data:"): | |
| data_json = json.loads(chunk[5:]) | |
| if "token" in data_json and "text" in data_json["token"]: | |
| return data_json["token"]["text"] | |
| else: | |
| return "" | |
| data_json = json.loads(chunk) | |
| if "model_output" in data_json: | |
| if ( | |
| isinstance(data_json["model_output"], dict) | |
| and "data" in data_json["model_output"] | |
| and isinstance(data_json["model_output"]["data"], list) | |
| ): | |
| return data_json["model_output"]["data"][0] | |
| elif isinstance(data_json["model_output"], str): | |
| return data_json["model_output"] | |
| elif "completion" in data_json and isinstance( | |
| data_json["completion"], str | |
| ): | |
| return data_json["completion"] | |
| else: | |
| raise ValueError( | |
| f"Unable to parse response. Original response: {chunk}" | |
| ) | |
| else: | |
| return "" | |
| else: | |
| return "" | |
        except Exception as e:
            verbose_logger.exception(
                "litellm.CustomStreamWrapper.handle_baseten_chunk(): Exception occurred - {}".format(
                    str(e)
                )
            )
            return ""
| def handle_ollama_chat_stream(self, chunk): | |
| # for ollama_chat/ provider | |
| try: | |
| if isinstance(chunk, dict): | |
| json_chunk = chunk | |
| else: | |
| json_chunk = json.loads(chunk) | |
| if "error" in json_chunk: | |
| raise Exception(f"Ollama Error - {json_chunk}") | |
| text = "" | |
| is_finished = False | |
| finish_reason = None | |
| if json_chunk["done"] is True: | |
| text = "" | |
| is_finished = True | |
| finish_reason = "stop" | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| elif "message" in json_chunk: | |
| print_verbose(f"delta content: {json_chunk}") | |
| text = json_chunk["message"]["content"] | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| } | |
| else: | |
| raise Exception(f"Ollama Error - {json_chunk}") | |
| except Exception as e: | |
| raise e | |
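
    # Illustrative sketch (not executed): ollama_chat streams newline-delimited JSON;
    # content arrives under "message" and the final chunk sets "done": true.
    # `wrapper` is a hypothetical CustomStreamWrapper instance.
    #
    #   >>> wrapper.handle_ollama_chat_stream({"done": False, "message": {"content": "Hi"}})
    #   {'text': 'Hi', 'is_finished': False, 'finish_reason': None}
    #   >>> wrapper.handle_ollama_chat_stream({"done": True})
    #   {'text': '', 'is_finished': True, 'finish_reason': 'stop'}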
| def handle_triton_stream(self, chunk): | |
| try: | |
| if isinstance(chunk, dict): | |
| parsed_response = chunk | |
| elif isinstance(chunk, (str, bytes)): | |
| if isinstance(chunk, bytes): | |
| chunk = chunk.decode("utf-8") | |
| if "text_output" in chunk: | |
| response = ( | |
| CustomStreamWrapper._strip_sse_data_from_chunk(chunk) or "" | |
| ) | |
| response = response.strip() | |
| parsed_response = json.loads(response) | |
| else: | |
| return { | |
| "text": "", | |
| "is_finished": False, | |
| "prompt_tokens": 0, | |
| "completion_tokens": 0, | |
| } | |
| else: | |
| print_verbose(f"chunk: {chunk} (Type: {type(chunk)})") | |
| raise ValueError( | |
| f"Unable to parse response. Original response: {chunk}" | |
| ) | |
| text = parsed_response.get("text_output", "") | |
| finish_reason = parsed_response.get("stop_reason") | |
| is_finished = parsed_response.get("is_finished", False) | |
| return { | |
| "text": text, | |
| "is_finished": is_finished, | |
| "finish_reason": finish_reason, | |
| "prompt_tokens": parsed_response.get("input_token_count", 0), | |
| "completion_tokens": parsed_response.get("generated_token_count", 0), | |
| } | |
| return {"text": "", "is_finished": False} | |
| except Exception as e: | |
| raise e | |
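
    # Illustrative sketch (not executed): Triton chunks arrive as SSE lines whose JSON
    # payload carries "text_output" plus token counts. `wrapper` is a hypothetical instance.
    #
    #   >>> wrapper.handle_triton_stream('data: {"text_output": "Hi", "is_finished": false}')
    #   {'text': 'Hi', 'is_finished': False, 'finish_reason': None, 'prompt_tokens': 0, 'completion_tokens': 0}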
| def model_response_creator( | |
| self, chunk: Optional[dict] = None, hidden_params: Optional[dict] = None | |
| ): | |
| _model = self.model | |
| _received_llm_provider = self.custom_llm_provider | |
| _logging_obj_llm_provider = self.logging_obj.model_call_details.get("custom_llm_provider", None) # type: ignore | |
| if ( | |
| _received_llm_provider == "openai" | |
| and _received_llm_provider != _logging_obj_llm_provider | |
| ): | |
| _model = "{}/{}".format(_logging_obj_llm_provider, _model) | |
| if chunk is None: | |
| chunk = {} | |
| else: | |
| # pop model keyword | |
| chunk.pop("model", None) | |
| chunk_dict = {} | |
| for key, value in chunk.items(): | |
| if key != "stream": | |
| chunk_dict[key] = value | |
| args = { | |
| "model": _model, | |
| "stream_options": self.stream_options, | |
| **chunk_dict, | |
| } | |
| model_response = ModelResponseStream(**args) | |
| if self.response_id is not None: | |
| model_response.id = self.response_id | |
| else: | |
| self.response_id = model_response.id # type: ignore | |
| if self.system_fingerprint is not None: | |
| model_response.system_fingerprint = self.system_fingerprint | |
| if hidden_params is not None: | |
| model_response._hidden_params = hidden_params | |
| model_response._hidden_params["custom_llm_provider"] = _logging_obj_llm_provider | |
| model_response._hidden_params["created_at"] = time.time() | |
| model_response._hidden_params = { | |
| **model_response._hidden_params, | |
| **self._hidden_params, | |
| } | |
| if ( | |
| len(model_response.choices) > 0 | |
| and getattr(model_response.choices[0], "delta") is not None | |
| ): | |
| # do nothing, if object instantiated | |
| pass | |
| else: | |
| model_response.choices = [StreamingChoices(finish_reason=None)] | |
| return model_response | |
| def is_delta_empty(self, delta: Delta) -> bool: | |
| is_empty = True | |
| if delta.content: | |
| is_empty = False | |
| elif delta.tool_calls is not None: | |
| is_empty = False | |
| elif delta.function_call is not None: | |
| is_empty = False | |
| return is_empty | |
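
    # Illustrative sketch (not executed): a delta counts as empty only when it carries no
    # content, tool_calls, or function_call. Assumes Delta defaults these fields to None.
    #
    #   >>> wrapper.is_delta_empty(Delta(content="hi"))     # hypothetical `wrapper` instance
    #   False
    #   >>> wrapper.is_delta_empty(Delta(content=None))
    #   True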
| def set_model_id( | |
| self, id: str, model_response: ModelResponseStream | |
| ) -> ModelResponseStream: | |
| """ | |
| Set the model id and response id to the given id. | |
| Ensure model id is always the same across all chunks. | |
| If first chunk sent + id set, use that id for all chunks. | |
| """ | |
| if self.response_id is None: | |
| self.response_id = id | |
| if self.response_id is not None and isinstance(self.response_id, str): | |
| model_response.id = self.response_id | |
| return model_response | |
| def copy_model_response_level_provider_specific_fields( | |
| self, | |
| original_chunk: Union[ModelResponseStream, ChatCompletionChunk], | |
| model_response: ModelResponseStream, | |
| ) -> ModelResponseStream: | |
| """ | |
| Copy provider_specific_fields from original_chunk to model_response. | |
| """ | |
| provider_specific_fields = getattr( | |
| original_chunk, "provider_specific_fields", None | |
| ) | |
| if provider_specific_fields is not None: | |
| model_response.provider_specific_fields = provider_specific_fields | |
| for k, v in provider_specific_fields.items(): | |
| setattr(model_response, k, v) | |
| return model_response | |
| def is_chunk_non_empty( | |
| self, | |
| completion_obj: Dict[str, Any], | |
| model_response: ModelResponseStream, | |
| response_obj: Dict[str, Any], | |
| ) -> bool: | |
| if ( | |
| "content" in completion_obj | |
| and ( | |
| isinstance(completion_obj["content"], str) | |
| and len(completion_obj["content"]) > 0 | |
| ) | |
| or ( | |
| "tool_calls" in completion_obj | |
| and completion_obj["tool_calls"] is not None | |
| and len(completion_obj["tool_calls"]) > 0 | |
| ) | |
| or ( | |
| "function_call" in completion_obj | |
| and completion_obj["function_call"] is not None | |
| ) | |
| or ( | |
| "reasoning_content" in model_response.choices[0].delta | |
| and model_response.choices[0].delta.reasoning_content is not None | |
| ) | |
| or (model_response.choices[0].delta.provider_specific_fields is not None) | |
| or ( | |
| "provider_specific_fields" in model_response | |
| and model_response.choices[0].delta.provider_specific_fields is not None | |
| ) | |
| or ( | |
| "provider_specific_fields" in response_obj | |
| and response_obj["provider_specific_fields"] is not None | |
| ) | |
| or ( | |
| "annotations" in model_response.choices[0].delta | |
| and model_response.choices[0].delta.annotations is not None | |
| ) | |
| ): | |
| return True | |
| else: | |
| return False | |
| def return_processed_chunk_logic( # noqa | |
| self, | |
| completion_obj: Dict[str, Any], | |
| model_response: ModelResponseStream, | |
| response_obj: Dict[str, Any], | |
| ): | |
| print_verbose( | |
| f"completion_obj: {completion_obj}, model_response.choices[0]: {model_response.choices[0]}, response_obj: {response_obj}" | |
| ) | |
| is_chunk_non_empty = self.is_chunk_non_empty( | |
| completion_obj, model_response, response_obj | |
| ) | |
| if ( | |
| is_chunk_non_empty | |
| ): # cannot set content of an OpenAI Object to be an empty string | |
| self.safety_checker() | |
| hold, model_response_str = self.check_special_tokens( | |
| chunk=completion_obj["content"], | |
| finish_reason=model_response.choices[0].finish_reason, | |
| ) # filter out bos/eos tokens from openai-compatible hf endpoints | |
| print_verbose(f"hold - {hold}, model_response_str - {model_response_str}") | |
| if hold is False: | |
| ## check if openai/azure chunk | |
| original_chunk = response_obj.get("original_chunk", None) | |
| if original_chunk: | |
| if len(original_chunk.choices) > 0: | |
| choices = [] | |
| for choice in original_chunk.choices: | |
| try: | |
| if isinstance(choice, BaseModel): | |
| choice_json = choice.model_dump() # type: ignore | |
| choice_json.pop( | |
| "finish_reason", None | |
| ) # for mistral etc. which return a value in their last chunk (not-openai compatible). | |
| print_verbose(f"choice_json: {choice_json}") | |
| choices.append(StreamingChoices(**choice_json)) | |
| except Exception: | |
| choices.append(StreamingChoices()) | |
| print_verbose(f"choices in streaming: {choices}") | |
| setattr(model_response, "choices", choices) | |
| else: | |
| return | |
| model_response.system_fingerprint = ( | |
| original_chunk.system_fingerprint | |
| ) | |
| setattr( | |
| model_response, | |
| "citations", | |
| getattr(original_chunk, "citations", None), | |
| ) | |
| print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}") | |
| if self.sent_first_chunk is False: | |
| model_response.choices[0].delta["role"] = "assistant" | |
| self.sent_first_chunk = True | |
| elif self.sent_first_chunk is True and hasattr( | |
| model_response.choices[0].delta, "role" | |
| ): | |
| _initial_delta = model_response.choices[0].delta.model_dump() | |
| _initial_delta.pop("role", None) | |
| model_response.choices[0].delta = Delta(**_initial_delta) | |
| verbose_logger.debug( | |
| f"model_response.choices[0].delta: {model_response.choices[0].delta}" | |
| ) | |
| else: | |
| ## else | |
| completion_obj["content"] = model_response_str | |
| if self.sent_first_chunk is False: | |
| completion_obj["role"] = "assistant" | |
| self.sent_first_chunk = True | |
| if response_obj.get("provider_specific_fields") is not None: | |
| completion_obj["provider_specific_fields"] = response_obj[ | |
| "provider_specific_fields" | |
| ] | |
| model_response.choices[0].delta = Delta(**completion_obj) | |
| _index: Optional[int] = completion_obj.get("index") | |
| if _index is not None: | |
| model_response.choices[0].index = _index | |
| self._optional_combine_thinking_block_in_choices( | |
| model_response=model_response | |
| ) | |
| print_verbose(f"returning model_response: {model_response}") | |
| return model_response | |
| else: | |
| return | |
| elif self.received_finish_reason is not None: | |
| if self.sent_last_chunk is True: | |
| # Bedrock returns the guardrail trace in the last chunk - we want to return this here | |
| if self.custom_llm_provider == "bedrock" and "trace" in model_response: | |
| return model_response | |
| # Default - return StopIteration | |
| if hasattr(model_response, "usage"): | |
| self.chunks.append(model_response) | |
| raise StopIteration | |
| # flush any remaining holding chunk | |
| if len(self.holding_chunk) > 0: | |
| if model_response.choices[0].delta.content is None: | |
| model_response.choices[0].delta.content = self.holding_chunk | |
| else: | |
| model_response.choices[0].delta.content = ( | |
| self.holding_chunk + model_response.choices[0].delta.content | |
| ) | |
| self.holding_chunk = "" | |
| # if delta is None | |
| _is_delta_empty = self.is_delta_empty(delta=model_response.choices[0].delta) | |
| if _is_delta_empty: | |
| model_response.choices[0].delta = Delta( | |
| content=None | |
| ) # ensure empty delta chunk returned | |
| # get any function call arguments | |
| model_response.choices[0].finish_reason = map_finish_reason( | |
| finish_reason=self.received_finish_reason | |
| ) # ensure consistent output to openai | |
| self.sent_last_chunk = True | |
| return model_response | |
| elif ( | |
| model_response.choices[0].delta.tool_calls is not None | |
| or model_response.choices[0].delta.function_call is not None | |
| ): | |
| if self.sent_first_chunk is False: | |
| model_response.choices[0].delta["role"] = "assistant" | |
| self.sent_first_chunk = True | |
| return model_response | |
| elif ( | |
| len(model_response.choices) > 0 | |
| and hasattr(model_response.choices[0].delta, "audio") | |
| and model_response.choices[0].delta.audio is not None | |
| ): | |
| return model_response | |
| else: | |
| if hasattr(model_response, "usage"): | |
| self.chunks.append(model_response) | |
| return | |
    def _optional_combine_thinking_block_in_choices(
        self, model_response: ModelResponseStream
    ) -> None:
        """
        UIs like OpenWebUI expect to get one chunk with <think>...</think> tags in the chunk content.

        Updates the model_response object in place, wrapping reasoning_content in the content field with <think>...</think> tags.

        Enabled when `merge_reasoning_content_in_choices=True` is passed in the request params.
        """
| if self.merge_reasoning_content_in_choices is True: | |
| reasoning_content = getattr( | |
| model_response.choices[0].delta, "reasoning_content", None | |
| ) | |
| if reasoning_content: | |
| if self.sent_first_thinking_block is False: | |
| model_response.choices[0].delta.content += ( | |
| "<think>" + reasoning_content | |
| ) | |
| self.sent_first_thinking_block = True | |
| elif ( | |
| self.sent_first_thinking_block is True | |
| and hasattr(model_response.choices[0].delta, "reasoning_content") | |
| and model_response.choices[0].delta.reasoning_content | |
| ): | |
| model_response.choices[0].delta.content = reasoning_content | |
| elif ( | |
| self.sent_first_thinking_block is True | |
| and not self.sent_last_thinking_block | |
| and model_response.choices[0].delta.content | |
| ): | |
| model_response.choices[0].delta.content = ( | |
| "</think>" + model_response.choices[0].delta.content | |
| ) | |
| self.sent_last_thinking_block = True | |
| if hasattr(model_response.choices[0].delta, "reasoning_content"): | |
| del model_response.choices[0].delta.reasoning_content | |
| return | |
| def chunk_creator(self, chunk: Any): # type: ignore # noqa: PLR0915 | |
| model_response = self.model_response_creator() | |
| response_obj: Dict[str, Any] = {} | |
| try: | |
| # return this for all models | |
| completion_obj: Dict[str, Any] = {"content": ""} | |
| if ( | |
| isinstance(chunk, dict) | |
| and generic_chunk_has_all_required_fields( | |
| chunk=chunk | |
| ) # check if chunk is a generic streaming chunk | |
| ) or ( | |
| self.custom_llm_provider | |
| and self.custom_llm_provider in litellm._custom_providers | |
| ): | |
| if self.received_finish_reason is not None: | |
| if "provider_specific_fields" not in chunk: | |
| raise StopIteration | |
| anthropic_response_obj: GChunk = cast(GChunk, chunk) | |
| completion_obj["content"] = anthropic_response_obj["text"] | |
| if anthropic_response_obj["is_finished"]: | |
| self.received_finish_reason = anthropic_response_obj[ | |
| "finish_reason" | |
| ] | |
| if anthropic_response_obj["finish_reason"]: | |
| self.intermittent_finish_reason = anthropic_response_obj[ | |
| "finish_reason" | |
| ] | |
| if anthropic_response_obj["usage"] is not None: | |
| model_response.usage = litellm.Usage( | |
| **anthropic_response_obj["usage"] | |
| ) | |
| if ( | |
| "tool_use" in anthropic_response_obj | |
| and anthropic_response_obj["tool_use"] is not None | |
| ): | |
| completion_obj["tool_calls"] = [anthropic_response_obj["tool_use"]] | |
| if ( | |
| "provider_specific_fields" in anthropic_response_obj | |
| and anthropic_response_obj["provider_specific_fields"] is not None | |
| ): | |
| for key, value in anthropic_response_obj[ | |
| "provider_specific_fields" | |
| ].items(): | |
| setattr(model_response, key, value) | |
| response_obj = cast(Dict[str, Any], anthropic_response_obj) | |
| elif self.model == "replicate" or self.custom_llm_provider == "replicate": | |
| response_obj = self.handle_replicate_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif self.custom_llm_provider and self.custom_llm_provider == "predibase": | |
| response_obj = self.handle_predibase_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif ( | |
| self.custom_llm_provider and self.custom_llm_provider == "baseten" | |
| ): # baseten doesn't provide streaming | |
| completion_obj["content"] = self.handle_baseten_chunk(chunk) | |
| elif ( | |
| self.custom_llm_provider and self.custom_llm_provider == "ai21" | |
| ): # ai21 doesn't provide streaming | |
| response_obj = self.handle_ai21_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif self.custom_llm_provider and self.custom_llm_provider == "maritalk": | |
| response_obj = self.handle_maritalk_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif self.custom_llm_provider and self.custom_llm_provider == "vllm": | |
| completion_obj["content"] = chunk[0].outputs[0].text | |
| elif ( | |
| self.custom_llm_provider and self.custom_llm_provider == "aleph_alpha" | |
| ): # aleph alpha doesn't provide streaming | |
| response_obj = self.handle_aleph_alpha_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif self.custom_llm_provider == "nlp_cloud": | |
| try: | |
| response_obj = self.handle_nlp_cloud_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| except Exception as e: | |
| if self.received_finish_reason: | |
| raise e | |
| else: | |
| if self.sent_first_chunk is False: | |
| raise Exception("An unknown error occurred with the stream") | |
| self.received_finish_reason = "stop" | |
| elif self.custom_llm_provider == "vertex_ai": | |
| import proto # type: ignore | |
| if hasattr(chunk, "candidates") is True: | |
| try: | |
| try: | |
| completion_obj["content"] = chunk.text | |
| except Exception as e: | |
| original_exception = e | |
| if "Part has no text." in str(e): | |
| ## check for function calling | |
| function_call = ( | |
| chunk.candidates[0].content.parts[0].function_call | |
| ) | |
| args_dict = {} | |
| # Check if it's a RepeatedComposite instance | |
| for key, val in function_call.args.items(): | |
| if isinstance( | |
| val, | |
| proto.marshal.collections.repeated.RepeatedComposite, | |
| ): | |
| # If so, convert to list | |
| args_dict[key] = [v for v in val] | |
| else: | |
| args_dict[key] = val | |
| try: | |
| args_str = json.dumps(args_dict) | |
| except Exception as e: | |
| raise e | |
| _delta_obj = litellm.utils.Delta( | |
| content=None, | |
| tool_calls=[ | |
| { | |
| "id": f"call_{str(uuid.uuid4())}", | |
| "function": { | |
| "arguments": args_str, | |
| "name": function_call.name, | |
| }, | |
| "type": "function", | |
| } | |
| ], | |
| ) | |
| _streaming_response = StreamingChoices(delta=_delta_obj) | |
| _model_response = ModelResponse(stream=True) | |
| _model_response.choices = [_streaming_response] | |
| response_obj = {"original_chunk": _model_response} | |
| else: | |
| raise original_exception | |
| if ( | |
| hasattr(chunk.candidates[0], "finish_reason") | |
| and chunk.candidates[0].finish_reason.name | |
| != "FINISH_REASON_UNSPECIFIED" | |
| ): # every non-final chunk in vertex ai has this | |
| self.received_finish_reason = chunk.candidates[ | |
| 0 | |
| ].finish_reason.name | |
| except Exception: | |
| if chunk.candidates[0].finish_reason.name == "SAFETY": | |
| raise Exception( | |
| f"The response was blocked by VertexAI. {str(chunk)}" | |
| ) | |
| else: | |
| completion_obj["content"] = str(chunk) | |
| elif self.custom_llm_provider == "petals": | |
| if len(self.completion_stream) == 0: | |
| if self.received_finish_reason is not None: | |
| raise StopIteration | |
| else: | |
| self.received_finish_reason = "stop" | |
| chunk_size = 30 | |
| new_chunk = self.completion_stream[:chunk_size] | |
| completion_obj["content"] = new_chunk | |
| self.completion_stream = self.completion_stream[chunk_size:] | |
| elif self.custom_llm_provider == "palm": | |
| # fake streaming | |
| response_obj = {} | |
| if len(self.completion_stream) == 0: | |
| if self.received_finish_reason is not None: | |
| raise StopIteration | |
| else: | |
| self.received_finish_reason = "stop" | |
| chunk_size = 30 | |
| new_chunk = self.completion_stream[:chunk_size] | |
| completion_obj["content"] = new_chunk | |
| self.completion_stream = self.completion_stream[chunk_size:] | |
| elif self.custom_llm_provider == "ollama_chat": | |
| response_obj = self.handle_ollama_chat_stream(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| print_verbose(f"completion obj content: {completion_obj['content']}") | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif self.custom_llm_provider == "triton": | |
| response_obj = self.handle_triton_stream(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| print_verbose(f"completion obj content: {completion_obj['content']}") | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif self.custom_llm_provider == "text-completion-openai": | |
| response_obj = self.handle_openai_text_completion_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| print_verbose(f"completion obj content: {completion_obj['content']}") | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| if response_obj["usage"] is not None: | |
| model_response.usage = litellm.Usage( | |
| prompt_tokens=response_obj["usage"].prompt_tokens, | |
| completion_tokens=response_obj["usage"].completion_tokens, | |
| total_tokens=response_obj["usage"].total_tokens, | |
| ) | |
| elif self.custom_llm_provider == "text-completion-codestral": | |
| response_obj = cast( | |
| Dict[str, Any], | |
| litellm.CodestralTextCompletionConfig()._chunk_parser(chunk), | |
| ) | |
| completion_obj["content"] = response_obj["text"] | |
| print_verbose(f"completion obj content: {completion_obj['content']}") | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
                if response_obj.get("usage") is not None:
| model_response.usage = litellm.Usage( | |
| prompt_tokens=response_obj["usage"].prompt_tokens, | |
| completion_tokens=response_obj["usage"].completion_tokens, | |
| total_tokens=response_obj["usage"].total_tokens, | |
| ) | |
| elif self.custom_llm_provider == "azure_text": | |
| response_obj = self.handle_azure_text_completion_chunk(chunk) | |
| completion_obj["content"] = response_obj["text"] | |
| print_verbose(f"completion obj content: {completion_obj['content']}") | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| elif self.custom_llm_provider == "cached_response": | |
| response_obj = { | |
| "text": chunk.choices[0].delta.content, | |
| "is_finished": True, | |
| "finish_reason": chunk.choices[0].finish_reason, | |
| "original_chunk": chunk, | |
| "tool_calls": ( | |
| chunk.choices[0].delta.tool_calls | |
| if hasattr(chunk.choices[0].delta, "tool_calls") | |
| else None | |
| ), | |
| } | |
| completion_obj["content"] = response_obj["text"] | |
| if response_obj["tool_calls"] is not None: | |
| completion_obj["tool_calls"] = response_obj["tool_calls"] | |
| print_verbose(f"completion obj content: {completion_obj['content']}") | |
| if hasattr(chunk, "id"): | |
| model_response.id = chunk.id | |
| self.response_id = chunk.id | |
| if hasattr(chunk, "system_fingerprint"): | |
| self.system_fingerprint = chunk.system_fingerprint | |
| if response_obj["is_finished"]: | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| else: # openai / azure chat model | |
| if self.custom_llm_provider == "azure": | |
| if isinstance(chunk, BaseModel) and hasattr(chunk, "model"): | |
                        # for azure, we need to pass the model from the original chunk
| self.model = chunk.model | |
| response_obj = self.handle_openai_chat_completion_chunk(chunk) | |
| if response_obj is None: | |
| return | |
| completion_obj["content"] = response_obj["text"] | |
| print_verbose(f"completion obj content: {completion_obj['content']}") | |
| if response_obj["is_finished"]: | |
| if response_obj["finish_reason"] == "error": | |
| raise Exception( | |
| "{} raised a streaming error - finish_reason: error, no content string given. Received Chunk={}".format( | |
| self.custom_llm_provider, response_obj | |
| ) | |
| ) | |
| self.received_finish_reason = response_obj["finish_reason"] | |
| if response_obj.get("original_chunk", None) is not None: | |
| if hasattr(response_obj["original_chunk"], "id"): | |
| model_response = self.set_model_id( | |
| response_obj["original_chunk"].id, model_response | |
| ) | |
| if hasattr(response_obj["original_chunk"], "system_fingerprint"): | |
| model_response.system_fingerprint = response_obj[ | |
| "original_chunk" | |
| ].system_fingerprint | |
| self.system_fingerprint = response_obj[ | |
| "original_chunk" | |
| ].system_fingerprint | |
| if response_obj["logprobs"] is not None: | |
| model_response.choices[0].logprobs = response_obj["logprobs"] | |
| if response_obj["usage"] is not None: | |
| if isinstance(response_obj["usage"], dict): | |
| setattr( | |
| model_response, | |
| "usage", | |
| litellm.Usage( | |
| prompt_tokens=response_obj["usage"].get( | |
| "prompt_tokens", None | |
| ) | |
| or None, | |
| completion_tokens=response_obj["usage"].get( | |
| "completion_tokens", None | |
| ) | |
| or None, | |
| total_tokens=response_obj["usage"].get( | |
| "total_tokens", None | |
| ) | |
| or None, | |
| ), | |
| ) | |
| elif isinstance(response_obj["usage"], BaseModel): | |
| setattr( | |
| model_response, | |
| "usage", | |
| litellm.Usage(**response_obj["usage"].model_dump()), | |
| ) | |
| model_response.model = self.model | |
| print_verbose( | |
| f"model_response finish reason 3: {self.received_finish_reason}; response_obj={response_obj}" | |
| ) | |
| ## FUNCTION CALL PARSING | |
| if ( | |
| response_obj is not None | |
| and response_obj.get("original_chunk", None) is not None | |
| ): # function / tool calling branch - only set for openai/azure compatible endpoints | |
| # enter this branch when no content has been passed in response | |
| original_chunk = response_obj.get("original_chunk", None) | |
| if hasattr(original_chunk, "id"): | |
| model_response = self.set_model_id( | |
| original_chunk.id, model_response | |
| ) | |
| if hasattr(original_chunk, "provider_specific_fields"): | |
| model_response = ( | |
| self.copy_model_response_level_provider_specific_fields( | |
| original_chunk, model_response | |
| ) | |
| ) | |
| if original_chunk.choices and len(original_chunk.choices) > 0: | |
| delta = original_chunk.choices[0].delta | |
| if delta is not None and ( | |
| delta.function_call is not None or delta.tool_calls is not None | |
| ): | |
| try: | |
| model_response.system_fingerprint = ( | |
| original_chunk.system_fingerprint | |
| ) | |
| ## AZURE - check if arguments is not None | |
| if ( | |
| original_chunk.choices[0].delta.function_call | |
| is not None | |
| ): | |
| if ( | |
| getattr( | |
| original_chunk.choices[0].delta.function_call, | |
| "arguments", | |
| ) | |
| is None | |
| ): | |
| original_chunk.choices[ | |
| 0 | |
| ].delta.function_call.arguments = "" | |
| elif original_chunk.choices[0].delta.tool_calls is not None: | |
| if isinstance( | |
| original_chunk.choices[0].delta.tool_calls, list | |
| ): | |
| for t in original_chunk.choices[0].delta.tool_calls: | |
                                        if hasattr(t, "function") and hasattr(
                                            t.function, "arguments"
                                        ):
| if ( | |
| getattr( | |
| t.function, | |
| "arguments", | |
| ) | |
| is None | |
| ): | |
| t.function.arguments = "" | |
| _json_delta = delta.model_dump() | |
| print_verbose(f"_json_delta: {_json_delta}") | |
| if "role" not in _json_delta or _json_delta["role"] is None: | |
| _json_delta[ | |
| "role" | |
| ] = "assistant" # mistral's api returns role as None | |
| if "tool_calls" in _json_delta and isinstance( | |
| _json_delta["tool_calls"], list | |
| ): | |
| for tool in _json_delta["tool_calls"]: | |
| if ( | |
| isinstance(tool, dict) | |
| and "function" in tool | |
| and isinstance(tool["function"], dict) | |
| and ("type" not in tool or tool["type"] is None) | |
| ): | |
| # if function returned but type set to None - mistral's api returns type: None | |
| tool["type"] = "function" | |
| model_response.choices[0].delta = Delta(**_json_delta) | |
                        except Exception as e:
                            verbose_logger.exception(
                                "litellm.CustomStreamWrapper.chunk_creator(): Exception occurred - {}".format(
                                    str(e)
                                )
                            )
                            model_response.choices[0].delta = Delta()
| elif ( | |
| delta is not None and getattr(delta, "audio", None) is not None | |
| ): | |
| model_response.choices[0].delta.audio = delta.audio | |
| else: | |
| try: | |
| delta = ( | |
| dict() | |
| if original_chunk.choices[0].delta is None | |
| else dict(original_chunk.choices[0].delta) | |
| ) | |
| print_verbose(f"original delta: {delta}") | |
| model_response.choices[0].delta = Delta(**delta) | |
| print_verbose( | |
| f"new delta: {model_response.choices[0].delta}" | |
| ) | |
| except Exception: | |
| model_response.choices[0].delta = Delta() | |
| else: | |
| if ( | |
| self.stream_options is not None | |
| and self.stream_options["include_usage"] is True | |
| ): | |
| return model_response | |
| return | |
| print_verbose( | |
| f"model_response.choices[0].delta: {model_response.choices[0].delta}; completion_obj: {completion_obj}" | |
| ) | |
| print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}") | |
| ## CHECK FOR TOOL USE | |
| if "tool_calls" in completion_obj and len(completion_obj["tool_calls"]) > 0: | |
| if self.is_function_call is True: # user passed in 'functions' param | |
| completion_obj["function_call"] = completion_obj["tool_calls"][0][ | |
| "function" | |
| ] | |
| completion_obj["tool_calls"] = None | |
| self.tool_call = True | |
| ## RETURN ARG | |
| return self.return_processed_chunk_logic( | |
| completion_obj=completion_obj, | |
| model_response=model_response, # type: ignore | |
| response_obj=response_obj, | |
| ) | |
| except StopIteration: | |
| raise StopIteration | |
| except Exception as e: | |
| traceback.format_exc() | |
| setattr(e, "message", str(e)) | |
| raise exception_type( | |
| model=self.model, | |
| custom_llm_provider=self.custom_llm_provider, | |
| original_exception=e, | |
| ) | |
| def set_logging_event_loop(self, loop): | |
| """ | |
| import litellm, asyncio | |
| loop = asyncio.get_event_loop() # 👈 gets the current event loop | |
| response = litellm.completion(.., stream=True) | |
| response.set_logging_event_loop(loop=loop) # 👈 enables async_success callbacks for sync logging | |
| for chunk in response: | |
| ... | |
| """ | |
| self.logging_loop = loop | |
| def cache_streaming_response(self, processed_chunk, cache_hit: bool): | |
| """ | |
| Caches the streaming response | |
| """ | |
| if not cache_hit and self.logging_obj._llm_caching_handler is not None: | |
| self.logging_obj._llm_caching_handler._sync_add_streaming_response_to_cache( | |
| processed_chunk | |
| ) | |
| async def async_cache_streaming_response(self, processed_chunk, cache_hit: bool): | |
| """ | |
| Caches the streaming response | |
| """ | |
| if not cache_hit and self.logging_obj._llm_caching_handler is not None: | |
| await self.logging_obj._llm_caching_handler._add_streaming_response_to_cache( | |
| processed_chunk | |
| ) | |
| def run_success_logging_and_cache_storage(self, processed_chunk, cache_hit: bool): | |
| """ | |
| Runs success logging in a thread and adds the response to the cache | |
| """ | |
| if litellm.disable_streaming_logging is True: | |
| """ | |
| [NOT RECOMMENDED] | |
| Set this via `litellm.disable_streaming_logging = True`. | |
| Disables streaming logging. | |
| """ | |
| return | |
| ## ASYNC LOGGING | |
| # Create an event loop for the new thread | |
| if self.logging_loop is not None: | |
| future = asyncio.run_coroutine_threadsafe( | |
| self.logging_obj.async_success_handler( | |
| processed_chunk, None, None, cache_hit | |
| ), | |
| loop=self.logging_loop, | |
| ) | |
| future.result() | |
| else: | |
| asyncio.run( | |
| self.logging_obj.async_success_handler( | |
| processed_chunk, None, None, cache_hit | |
| ) | |
| ) | |
| ## SYNC LOGGING | |
| self.logging_obj.success_handler(processed_chunk, None, None, cache_hit) | |
| def finish_reason_handler(self): | |
| model_response = self.model_response_creator() | |
| _finish_reason = self.received_finish_reason or self.intermittent_finish_reason | |
| if _finish_reason is not None: | |
| model_response.choices[0].finish_reason = _finish_reason | |
| else: | |
| model_response.choices[0].finish_reason = "stop" | |
| ## if tool use | |
| if ( | |
| model_response.choices[0].finish_reason == "stop" and self.tool_call | |
| ): # don't overwrite for other - potential error finish reasons | |
| model_response.choices[0].finish_reason = "tool_calls" | |
| return model_response | |
| def __next__(self): # noqa: PLR0915 | |
| cache_hit = False | |
| if ( | |
| self.custom_llm_provider is not None | |
| and self.custom_llm_provider == "cached_response" | |
| ): | |
| cache_hit = True | |
| try: | |
| if self.completion_stream is None: | |
| self.fetch_sync_stream() | |
| while True: | |
| if ( | |
| isinstance(self.completion_stream, str) | |
| or isinstance(self.completion_stream, bytes) | |
| or isinstance(self.completion_stream, ModelResponse) | |
| ): | |
| chunk = self.completion_stream | |
| else: | |
| chunk = next(self.completion_stream) | |
| if chunk is not None and chunk != b"": | |
| print_verbose( | |
| f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}; custom_llm_provider: {self.custom_llm_provider}" | |
| ) | |
| response: Optional[ModelResponseStream] = self.chunk_creator( | |
| chunk=chunk | |
| ) | |
| print_verbose(f"PROCESSED CHUNK POST CHUNK CREATOR: {response}") | |
| if response is None: | |
| continue | |
| if self.logging_obj.completion_start_time is None: | |
| self.logging_obj._update_completion_start_time( | |
| completion_start_time=datetime.datetime.now() | |
| ) | |
| ## LOGGING | |
| executor.submit( | |
| self.run_success_logging_and_cache_storage, | |
| response, | |
| cache_hit, | |
| ) # log response | |
| choice = response.choices[0] | |
| if isinstance(choice, StreamingChoices): | |
| self.response_uptil_now += choice.delta.get("content", "") or "" | |
| else: | |
| self.response_uptil_now += "" | |
| self.rules.post_call_rules( | |
| input=self.response_uptil_now, model=self.model | |
| ) | |
| # HANDLE STREAM OPTIONS | |
| self.chunks.append(response) | |
| if hasattr( | |
| response, "usage" | |
| ): # remove usage from chunk, only send on final chunk | |
| # Convert the object to a dictionary | |
| obj_dict = response.dict() | |
| # Remove an attribute (e.g., 'attr2') | |
| if "usage" in obj_dict: | |
| del obj_dict["usage"] | |
| # Create a new object without the removed attribute | |
| response = self.model_response_creator( | |
| chunk=obj_dict, hidden_params=response._hidden_params | |
| ) | |
| # add usage as hidden param | |
| if self.sent_last_chunk is True and self.stream_options is None: | |
| usage = calculate_total_usage(chunks=self.chunks) | |
| response._hidden_params["usage"] = usage | |
| # RETURN RESULT | |
| return response | |
| except StopIteration: | |
| if self.sent_last_chunk is True: | |
| complete_streaming_response = litellm.stream_chunk_builder( | |
| chunks=self.chunks, messages=self.messages | |
| ) | |
| response = self.model_response_creator() | |
| if complete_streaming_response is not None: | |
| setattr( | |
| response, | |
| "usage", | |
| getattr(complete_streaming_response, "usage"), | |
| ) | |
| self.cache_streaming_response( | |
| processed_chunk=complete_streaming_response.model_copy( | |
| deep=True | |
| ), | |
| cache_hit=cache_hit, | |
| ) | |
| executor.submit( | |
| self.logging_obj.success_handler, | |
| complete_streaming_response.model_copy(deep=True), | |
| None, | |
| None, | |
| cache_hit, | |
| ) | |
| else: | |
| executor.submit( | |
| self.logging_obj.success_handler, | |
| response, | |
| None, | |
| None, | |
| cache_hit, | |
| ) | |
| if self.sent_stream_usage is False and self.send_stream_usage is True: | |
| self.sent_stream_usage = True | |
| return response | |
| raise # Re-raise StopIteration | |
| else: | |
| self.sent_last_chunk = True | |
| processed_chunk = self.finish_reason_handler() | |
| if self.stream_options is None: # add usage as hidden param | |
| usage = calculate_total_usage(chunks=self.chunks) | |
| processed_chunk._hidden_params["usage"] = usage | |
| ## LOGGING | |
| executor.submit( | |
| self.run_success_logging_and_cache_storage, | |
| processed_chunk, | |
| cache_hit, | |
| ) # log response | |
| return processed_chunk | |
| except Exception as e: | |
| traceback_exception = traceback.format_exc() | |
| # LOG FAILURE - handle streaming failure logging in the _next_ object, remove `handle_failure` once it's deprecated | |
| threading.Thread( | |
| target=self.logging_obj.failure_handler, args=(e, traceback_exception) | |
| ).start() | |
| if isinstance(e, OpenAIError): | |
| raise e | |
| else: | |
| raise exception_type( | |
| model=self.model, | |
| original_exception=e, | |
| custom_llm_provider=self.custom_llm_provider, | |
| ) | |
| def fetch_sync_stream(self): | |
| if self.completion_stream is None and self.make_call is not None: | |
| # Call make_call to get the completion stream | |
| self.completion_stream = self.make_call(client=litellm.module_level_client) | |
| self._stream_iter = self.completion_stream.__iter__() | |
| return self.completion_stream | |
| async def fetch_stream(self): | |
| if self.completion_stream is None and self.make_call is not None: | |
| # Call make_call to get the completion stream | |
| self.completion_stream = await self.make_call( | |
| client=litellm.module_level_aclient | |
| ) | |
| self._stream_iter = self.completion_stream.__aiter__() | |
| return self.completion_stream | |
| async def __anext__(self): # noqa: PLR0915 | |
| cache_hit = False | |
| if ( | |
| self.custom_llm_provider is not None | |
| and self.custom_llm_provider == "cached_response" | |
| ): | |
| cache_hit = True | |
| try: | |
| if self.completion_stream is None: | |
| await self.fetch_stream() | |
| if is_async_iterable(self.completion_stream): | |
| async for chunk in self.completion_stream: | |
| if chunk == "None" or chunk is None: | |
| raise Exception | |
| elif ( | |
| self.custom_llm_provider == "gemini" | |
| and hasattr(chunk, "parts") | |
| and len(chunk.parts) == 0 | |
| ): | |
| continue | |
                    # chunk_creator() does logging / stream chunk building. We need to let it know it's
                    # being called in an async function, so we don't double-add chunks.
                    # __anext__ also calls async_success_handler, which does logging
| print_verbose(f"PROCESSED ASYNC CHUNK PRE CHUNK CREATOR: {chunk}") | |
| processed_chunk: Optional[ModelResponseStream] = self.chunk_creator( | |
| chunk=chunk | |
| ) | |
| print_verbose( | |
| f"PROCESSED ASYNC CHUNK POST CHUNK CREATOR: {processed_chunk}" | |
| ) | |
| if processed_chunk is None: | |
| continue | |
| if self.logging_obj.completion_start_time is None: | |
| self.logging_obj._update_completion_start_time( | |
| completion_start_time=datetime.datetime.now() | |
| ) | |
| choice = processed_chunk.choices[0] | |
| if isinstance(choice, StreamingChoices): | |
| self.response_uptil_now += choice.delta.get("content", "") or "" | |
| else: | |
| self.response_uptil_now += "" | |
| self.rules.post_call_rules( | |
| input=self.response_uptil_now, model=self.model | |
| ) | |
| self.chunks.append(processed_chunk) | |
| if hasattr( | |
| processed_chunk, "usage" | |
| ): # remove usage from chunk, only send on final chunk | |
| # Convert the object to a dictionary | |
| obj_dict = processed_chunk.dict() | |
| # Remove an attribute (e.g., 'attr2') | |
| if "usage" in obj_dict: | |
| del obj_dict["usage"] | |
| # Create a new object without the removed attribute | |
| processed_chunk = self.model_response_creator(chunk=obj_dict) | |
| print_verbose(f"final returned processed chunk: {processed_chunk}") | |
| return processed_chunk | |
| raise StopAsyncIteration | |
| else: # temporary patch for non-aiohttp async calls | |
| # example - boto3 bedrock llms | |
| while True: | |
| if isinstance(self.completion_stream, str) or isinstance( | |
| self.completion_stream, bytes | |
| ): | |
| chunk = self.completion_stream | |
| else: | |
| chunk = next(self.completion_stream) | |
| if chunk is not None and chunk != b"": | |
| print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}") | |
| processed_chunk: Optional[ | |
| ModelResponseStream | |
| ] = self.chunk_creator(chunk=chunk) | |
| print_verbose( | |
| f"PROCESSED CHUNK POST CHUNK CREATOR: {processed_chunk}" | |
| ) | |
| if processed_chunk is None: | |
| continue | |
| choice = processed_chunk.choices[0] | |
| if isinstance(choice, StreamingChoices): | |
| self.response_uptil_now += ( | |
| choice.delta.get("content", "") or "" | |
| ) | |
| else: | |
| self.response_uptil_now += "" | |
| self.rules.post_call_rules( | |
| input=self.response_uptil_now, model=self.model | |
| ) | |
| # RETURN RESULT | |
| self.chunks.append(processed_chunk) | |
| return processed_chunk | |
| except (StopAsyncIteration, StopIteration): | |
| if self.sent_last_chunk is True: | |
| # log the final chunk with accurate streaming values | |
| complete_streaming_response = litellm.stream_chunk_builder( | |
| chunks=self.chunks, messages=self.messages | |
| ) | |
| response = self.model_response_creator() | |
| if complete_streaming_response is not None: | |
| setattr( | |
| response, | |
| "usage", | |
| getattr(complete_streaming_response, "usage"), | |
| ) | |
| asyncio.create_task( | |
| self.async_cache_streaming_response( | |
| processed_chunk=complete_streaming_response.model_copy( | |
| deep=True | |
| ), | |
| cache_hit=cache_hit, | |
| ) | |
| ) | |
| if self.sent_stream_usage is False and self.send_stream_usage is True: | |
| self.sent_stream_usage = True | |
| return response | |
| asyncio.create_task( | |
| self.logging_obj.async_success_handler( | |
| complete_streaming_response, | |
| cache_hit=cache_hit, | |
| start_time=None, | |
| end_time=None, | |
| ) | |
| ) | |
| executor.submit( | |
| self.logging_obj.success_handler, | |
| complete_streaming_response, | |
| cache_hit=cache_hit, | |
| start_time=None, | |
| end_time=None, | |
| ) | |
                raise StopAsyncIteration  # Re-raise StopAsyncIteration
| else: | |
| self.sent_last_chunk = True | |
| processed_chunk = self.finish_reason_handler() | |
| return processed_chunk | |
        except httpx.TimeoutException as e:  # if httpx read timeout error occurs
| traceback_exception = traceback.format_exc() | |
| ## ADD DEBUG INFORMATION - E.G. LITELLM REQUEST TIMEOUT | |
| traceback_exception += "\nLiteLLM Default Request Timeout - {}".format( | |
| litellm.request_timeout | |
| ) | |
| if self.logging_obj is not None: | |
| ## LOGGING | |
| threading.Thread( | |
| target=self.logging_obj.failure_handler, | |
| args=(e, traceback_exception), | |
| ).start() # log response | |
| # Handle any exceptions that might occur during streaming | |
| asyncio.create_task( | |
| self.logging_obj.async_failure_handler(e, traceback_exception) | |
| ) | |
| raise e | |
| except Exception as e: | |
| traceback_exception = traceback.format_exc() | |
| if self.logging_obj is not None: | |
| ## LOGGING | |
| threading.Thread( | |
| target=self.logging_obj.failure_handler, | |
| args=(e, traceback_exception), | |
| ).start() # log response | |
| # Handle any exceptions that might occur during streaming | |
| asyncio.create_task( | |
| self.logging_obj.async_failure_handler(e, traceback_exception) # type: ignore | |
| ) | |
| ## Map to OpenAI Exception | |
| raise exception_type( | |
| model=self.model, | |
| custom_llm_provider=self.custom_llm_provider, | |
| original_exception=e, | |
| completion_kwargs={}, | |
| extra_kwargs={}, | |
| ) | |
    @staticmethod
    def _strip_sse_data_from_chunk(chunk: Optional[str]) -> Optional[str]:
        """
        Strips the 'data: ' prefix from Server-Sent Events (SSE) chunks.

        Some providers, like Sagemaker, send the prefix as `data:` (no trailing space), so both forms need to be handled.

        SSE messages are prefixed with 'data: ', which is part of the protocol rather than
        the actual content from the LLM. This method removes that prefix and returns the actual content.

        Args:
            chunk: The SSE chunk that may contain the 'data: ' prefix (string or None)

        Returns:
            The chunk with the 'data: ' prefix removed, or the original chunk
            if no prefix was found. Returns None if the input is None.

        See the OpenAI Python reference for this: https://github.com/openai/openai-python/blob/041bf5a8ec54da19aad0169671793c2078bd6173/openai/api_requestor.py#L100
        """
        if chunk is None:
            return None

        if isinstance(chunk, str):
            # OpenAI sends `data: `
            if chunk.startswith("data: "):
                # Strip the prefix and any leading whitespace that might follow it
                _length_of_sse_data_prefix = len("data: ")
                return chunk[_length_of_sse_data_prefix:]
            elif chunk.startswith("data:"):
                # Sagemaker sends `data:`, no trailing whitespace
                _length_of_sse_data_prefix = len("data:")
                return chunk[_length_of_sse_data_prefix:]
        return chunk
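
    # Illustrative sketch (not executed): both the OpenAI-style "data: " prefix and the
    # Sagemaker-style "data:" prefix are stripped; anything else passes through untouched.
    #
    #   >>> CustomStreamWrapper._strip_sse_data_from_chunk("data: hello")
    #   'hello'
    #   >>> CustomStreamWrapper._strip_sse_data_from_chunk("data:hello")
    #   'hello'
    #   >>> CustomStreamWrapper._strip_sse_data_from_chunk("hello")
    #   'hello'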


def calculate_total_usage(chunks: List[ModelResponse]) -> Usage:
    """Assume the most recent usage chunk has the total usage up to that point."""
    prompt_tokens: int = 0
    completion_tokens: int = 0
    for chunk in chunks:
        if "usage" in chunk:
            if "prompt_tokens" in chunk["usage"]:
                prompt_tokens = chunk["usage"].get("prompt_tokens", 0) or 0
            if "completion_tokens" in chunk["usage"]:
                completion_tokens = chunk["usage"].get("completion_tokens", 0) or 0

    returned_usage_chunk = Usage(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
    )

    return returned_usage_chunk


def generic_chunk_has_all_required_fields(chunk: dict) -> bool:
    """
    Checks whether the provided chunk dictionary looks like a GenericStreamingChunk,
    i.e. whether every key in the chunk is a valid GenericStreamingChunk field.

    :param chunk: The dictionary to check.
    :return: True if all keys are recognized GenericStreamingChunk fields, False otherwise.
    """
    _all_fields = GChunk.__annotations__

    decision = all(key in _all_fields for key in chunk)
    return decision
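
# Illustrative sketch (not executed): the check passes only when every key in the chunk
# is a recognized GenericStreamingChunk field.
#
#   >>> generic_chunk_has_all_required_fields({"text": "hi", "is_finished": False, "finish_reason": ""})
#   True
#   >>> generic_chunk_has_all_required_fields({"choices": []})
#   False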