from typing import Any, Dict, List, Union
import copy
import json
import time
import traceback

from openai import OpenAI
import tiktoken

from lpm_kernel.api.services.user_llm_config_service import UserLLMConfigService
from lpm_kernel.L0.models import InsighterInput, SummarizerInput
from lpm_kernel.L0.prompt import *
from lpm_kernel.utils import (
    DataType,
    TokenParagraphSplitter,
    TokenTextSplitter,
    cal_upperbound,
    chunk_filter,
    equidistant_filter,
    get_safe_content_turncate,
    get_summarize_title_keywords,
    select_language_desc,
)
from lpm_kernel.configs.logging import get_train_process_logger

logger = get_train_process_logger()

class L0Generator:
    def __init__(self, preferred_language="English"):
        """Initialize L0Generator with language preference.

        Args:
            preferred_language: The language to use for generation, defaults to English.
        """
        self.preferred_language = preferred_language
        # Initialize tokenizer (OpenAI default encoding)
        self._tokenizer = tiktoken.get_encoding("cl100k_base")
        self.lf_prompt_image_parser = insight_image_parser
        self.lf_prompt_image_overview = insight_image_overview
        self.lf_prompt_image_breakdown = insight_image_breakdown
        self.lf_prompt_audio_parser = insight_audio_parser
        self.lf_prompt_audio_overview = insight_audio_overview
        self.lf_prompt_audio_breakdown = insight_audio_breakdown
        self.lf_prompt_doc_overview = insight_doc_overview
        self.lf_prompt_doc_breakdown = insight_doc_breakdown
        self.max_retries_summarize = 2
        self.timeout_summarize = 30
        self.user_llm_config_service = UserLLMConfigService()
        self.user_llm_config = self.user_llm_config_service.get_available_llm()
        if self.user_llm_config is None:
            self.client = None
            self.model_name = None
        else:
            self.client = OpenAI(
                api_key=self.user_llm_config.chat_api_key,
                base_url=self.user_llm_config.chat_endpoint,
            )
            self.model_name = self.user_llm_config.chat_model_name
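
    # Note: when no LLM config is available at construction time, client and
    # model_name stay None; _insighter_doc() and _summarize_title_abstract_keywords()
    # retry get_available_llm() lazily before their first API call.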

    def _insighter_image(
        self, bio: Dict[str, str], content: str, max_retries: int, request_timeout: int, file_content: str
    ) -> tuple[str, str]:
        """Process image content to generate insights.

        Args:
            bio: Dictionary containing user biography information.
            content: Text content related to the image.
            max_retries: Maximum number of API call retries.
            request_timeout: Timeout for API calls in seconds.
            file_content: URL or base64 content of the image.

        Returns:
            Tuple of (summary, title) strings.
        """
        hint_prompt = f"# Hint #\n{content}\n# Instruction #\n"
        language_desc = select_language_desc(self.preferred_language)
        segment_list = [
            self.lf_prompt_image_parser,
            self.lf_prompt_image_overview,
            self.lf_prompt_image_breakdown,
        ]
        messages_list = []
        for i in range(len(segment_list)):
            image_parser_prompt = segment_list[i]
            if "__global_bio__" in image_parser_prompt:
                image_parser_prompt = image_parser_prompt.replace(
                    "__about_me__", bio["about_me"]
                )
                image_parser_prompt = image_parser_prompt.replace(
                    "__global_bio__", bio["global_bio"]
                )
                image_parser_prompt = image_parser_prompt.replace(
                    "__status_bio__", bio["status_bio"]
                )
            # System prompt; the parser stage (i == 0) always runs in English
            language = language_desc if i != 0 else "English"
            messages = [
                {"role": "system", "content": image_parser_prompt},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": hint_prompt
                            + "Here are some images and their hints. Please follow the WorkFlow and do your best. Ensure that your response is in a parseable JSON format."
                            + language,
                        }
                    ],
                },
            ]
            # Attach the image itself (file_content is the image URL)
            new_messages = copy.deepcopy(messages) if i == 0 else messages
            new_messages[-1]["content"].append(
                {
                    "type": "image_url",
                    "image_url": {
                        "url": file_content,
                    },
                }
            )
            messages_list.append(new_messages)
        results = []
        for messages in messages_list:
            # Per-request retry/timeout options go through with_options() in the
            # openai v1 SDK; create() itself does not accept max_retries.
            response = self.client.with_options(
                max_retries=max_retries, timeout=request_timeout
            ).chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=4096,
                temperature=0.0,
                response_format={"type": "json_object"},
            )
            # The model returns a JSON string; parse it before the dict access below
            results.append(json.loads(response.choices[0].message.content))
        try:
            images_intent_list = []
            for image_id in range(len(results) - 2):
                images_intent_list.append(results[image_id]["image"].get("Step 3", ""))
            title = results[-2].get("Title", "")
            opening = results[-2].get("Opening", "")
            insight = results[-1].get("Insight", [])
            insight = "- " + "\n- ".join(insight) if insight else ""
            summary = "\n\n".join([opening, insight])
            return summary, title
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise RuntimeError(f"Unexpected error: {e}")
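
    # For reference, the parsing above assumes responses shaped roughly like
    # (inferred from the field accesses; the actual schema is defined by the
    # prompts in lpm_kernel.L0.prompt):
    #   parser:    {"image": {"Step 3": "<intent>"}}
    #   overview:  {"Title": "...", "Opening": "..."}
    #   breakdown: {"Insight": ["point 1", "point 2", ...]}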

    def _insighter_audio(
        self, bio: str, content: str, max_retries: int, request_timeout: int, file_content: Dict[str, Any]
    ) -> tuple[str, str]:
        """Process audio content to generate insights.

        Args:
            bio: User biography information.
            content: Text content related to the audio.
            max_retries: Maximum number of API call retries.
            request_timeout: Timeout for API calls in seconds.
            file_content: Dictionary containing audio metadata and content.

        Returns:
            Tuple of (insight, title) strings.
        """
        # insighter() passes the bio as a dict; flatten it so the string
        # .replace() substitutions below work either way.
        if isinstance(bio, dict):
            bio = json.dumps(bio, ensure_ascii=False)
        user_info = """# Hint #
"{content}"
# Speech #
"{speech}"
# User Instruction #
'{user_input}'
"""
        user_input = "Here are some speech segments and their hints. Please follow the WorkFlow and do your best. Ensure that your response is in a parseable JSON format. "
        language_desc = select_language_desc(self.preferred_language)
        speech_dict = file_content["metadata"]["audio"].get("segmentList", [])
        speech = ""
        end_point = 0
        # Raise if the transcript is empty
        if not speech_dict:
            raise ValueError("Invalid input: speech must not be empty")
        for segment in speech_dict:
            start_time = int(segment["segmentStartTime"])
            end_time = int(segment["segmentEndTime"])
            segment_content = segment["segmentContent"]
            speech += f"[{start_time}-{end_time}]: {segment_content}\n"
            end_point = end_time
        logger.info(f"length of speech: {end_point}")
        # Split speech over 1200 s into segments of at most roughly 1200 s each
        num_segments = 1
        if end_point > 1200:
            num_segments = max(2, int(round(end_point / 1200.0)))
            segment_duration = end_point / num_segments
            speech_segments = ["" for _ in range(num_segments)]
            for segment in speech_dict:
                start_time = int(segment["segmentStartTime"])
                end_time = int(segment["segmentEndTime"])
                segment_content = segment["segmentContent"]
                # Bucket each transcript segment by its start time
                segment_index = min(
                    num_segments - 1, int(start_time // segment_duration)
                )
                speech_segments[segment_index] += f"[{start_time}-{end_time}]: {segment_content}\n"
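            # Worked example: a 3000 s recording gives
            # num_segments = max(2, round(3000 / 1200)) = 2 and
            # segment_duration = 1500.0, so a segment starting at t = 1499 s
            # lands in bucket 0 and one starting at t = 1500 s in bucket 1;
            # min(num_segments - 1, ...) clamps late segments into the last bucket.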
            user_info_overall = user_info.format(
                content=content, speech=speech, user_input=user_input
            )
            audio_parser_prompt_overview = self.lf_prompt_audio_overview.replace(
                "__bio__", bio
            )
            messages_overall = [
                {"role": "system", "content": audio_parser_prompt_overview},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_info_overall + language_desc}
                    ],
                },
            ]
            message_list = [messages_overall]
            for i in range(num_segments):
                user_info_segment = user_info.format(
                    content=content, speech=speech_segments[i], user_input=user_input
                )
                messages_segment = [
                    {"role": "system", "content": self.lf_prompt_audio_breakdown},
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": user_info_segment + language_desc}
                        ],
                    },
                ]
                message_list.append(messages_segment)
            results = []
            for messages in message_list:
                response = self.client.with_options(
                    max_retries=max_retries, timeout=request_timeout
                ).chat.completions.create(
                    model=self.model_name,
                    messages=messages,
                    max_tokens=4096,
                    temperature=0.0,
                    response_format={"type": "json_object"},
                )
                # Parse the JSON string before the dict access below
                results.append(json.loads(response.choices[0].message.content))
            try:
                title = results[0].get("Title", "")
                overview = results[0].get("Overview", "")
                breakdown = {}
                for res_p in results[1:]:
                    breakdown = {**breakdown, **res_p.get("Breakdown", {})}
                tmpl = "{}\n{}"
                formatted_breakdown = ""
                for subtitle, key_points in breakdown.items():
                    formatted_breakdown += f"\n**{subtitle}**\n"
                    for key_point in key_points:
                        if len(key_point) != 3:
                            raise ValueError(
                                f"Unexpected length of key_point: {key_point}"
                            )
                        # Normalize full-width commas and whitespace before splitting
                        timestamps = (
                            key_point[2].replace("，", ",").replace(" ", "").split(",")
                        )
                        std_timestamps = "".join(
                            f"[_TIMESTAMP_]('{timestamp}')" for timestamp in timestamps
                        )
                        formatted_breakdown += (
                            f"- **{key_point[0]}**: {key_point[1]}{std_timestamps}\n"
                        )
                insight = tmpl.format(overview, formatted_breakdown)
                return insight, title
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                raise RuntimeError(f"Unexpected error: {e}")
        else:
            user_info = user_info.format(
                content=content, speech=speech, user_input=user_input
            )
            prompt_audio_parser = self.lf_prompt_audio_parser.replace("__bio__", bio)
            messages = [
                {"role": "system", "content": prompt_audio_parser},
                {
                    "role": "user",
                    "content": [{"type": "text", "text": user_info + language_desc}],
                },
            ]
            response = self.client.with_options(
                max_retries=max_retries, timeout=request_timeout
            ).chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=4096,
                temperature=0.0,
                response_format={"type": "json_object"},
            )
            # Parse the JSON string before the dict access below
            api_res_dict = json.loads(response.choices[0].message.content)
            try:
                title = api_res_dict.get("Title", "")
                overview = api_res_dict.get("Overview", "")
                breakdown = api_res_dict.get("Breakdown", {})
                tmpl = "{}\n{}"
                formatted_breakdown = ""
                for subtitle, key_points in breakdown.items():
                    formatted_breakdown += f"\n**{subtitle}**\n"
                    for key_point in key_points:
                        if len(key_point) != 3:
                            raise ValueError(
                                f"Unexpected length of key_point: {key_point}"
                            )
                        # Normalize full-width commas and whitespace before splitting
                        timestamps = (
                            key_point[2].replace("，", ",").replace(" ", "").split(",")
                        )
                        std_timestamps = "".join(
                            f"[_TIMESTAMP_]('{timestamp}')" for timestamp in timestamps
                        )
                        formatted_breakdown += (
                            f"- **{key_point[0]}**: {key_point[1]}{std_timestamps}\n"
                        )
                insight = tmpl.format(overview, formatted_breakdown)
                return insight, title
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                raise RuntimeError(f"Unexpected error: {e}")
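
    # The audio parsing above assumes responses shaped roughly like (inferred
    # from the field accesses; the real schema comes from the audio prompts):
    #   {"Title": "...", "Overview": "...",
    #    "Breakdown": {"<subtitle>": [["<point>", "<detail>", "<t1>,<t2>"], ...]}}
    # where the third element of each key_point is a comma-separated list of
    # timestamps rendered as [_TIMESTAMP_]('<t>') markers.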

    def _insighter_doc(
        self,
        bio: Dict[str, str],
        content: str,
        max_retries: int,
        request_timeout: int,
        file_content: Dict[str, Any],
        max_tokens: int = 3000,
        filter=equidistant_filter,
    ) -> tuple[str, str]:
        """Process document content to generate insights.

        Args:
            bio: Dictionary containing user biography information.
            content: Text content or hint about the document.
            max_retries: Maximum number of API call retries.
            request_timeout: Timeout for API calls in seconds.
            file_content: Dictionary containing document content.
            max_tokens: Maximum tokens for generation.
            filter: Function to filter document chunks.

        Returns:
            Tuple of (insight, title) strings.
        """
        user_info = """# Hint #
"{hint}"
# Content #
"{content}"
# User Instruction #
"{user_input}"
"""
        user_input = "Here is some content and its hint. Please follow the WorkFlow and do your best. Ensure that your response is in a parseable JSON format. "
        language_desc = select_language_desc(self.preferred_language)
        segment_list = [self.lf_prompt_doc_overview, self.lf_prompt_doc_breakdown]
        messages_list = []
        for i in range(len(segment_list)):
            doc_parser_prompt = segment_list[i]
            raw_text = doc_parser_prompt + user_input + user_info + language_desc
            upper_bound = cal_upperbound(
                model_limit=7000 + max_tokens,
                generage_limit=max_tokens,
                tolerance=500,
                raw=raw_text,
            )
            # Chunk and truncate
            chunk_size = 512
            chunk_num = upper_bound // chunk_size + 1
            if self.model_name is None:
                # LLM config was unavailable at construction time; retry now
                self.user_llm_config = self.user_llm_config_service.get_available_llm()
                self.client = OpenAI(
                    api_key=self.user_llm_config.chat_api_key,
                    base_url=self.user_llm_config.chat_endpoint,
                )
                self.model_name = self.user_llm_config.chat_model_name
            splitter = TokenTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=0,
                model_name=self.model_name.replace("openai/", ""),
            )
            tmp = file_content.get("content", "")
            # file_content["content"] may be a single string or a list of
            # page/paragraph strings; joining a raw string would splice "\n"
            # between characters, so branch on the type.
            doc_content = "\n".join(tmp) if isinstance(tmp, list) else tmp
            splits = splitter.split_text(doc_content)
            use_content = chunk_filter(
                splits, filter, filtered_chunks_n=chunk_num, separator="\n", spacer="\n"
            )
            doc_content = get_safe_content_turncate(
                use_content, self.model_name.replace("openai/", ""), max_tokens=upper_bound
            )
            user_content = user_info.format(
                hint=content, content=doc_content, user_input=user_input
            )
            if "__global_bio__" in doc_parser_prompt:
                doc_parser_prompt = doc_parser_prompt.replace(
                    "__about_me__", bio["about_me"]
                )
                doc_parser_prompt = doc_parser_prompt.replace(
                    "__global_bio__", bio["global_bio"]
                )
                doc_parser_prompt = doc_parser_prompt.replace(
                    "__status_bio__", bio["status_bio"]
                )
            messages = [
                {"role": "system", "content": doc_parser_prompt},
                {"role": "user", "content": user_content + language_desc},
            ]
            messages_list.append(messages)
        results = []
        for messages in messages_list:
            # Apply the (previously unused) max_retries parameter via with_options
            response = self.client.with_options(
                max_retries=max_retries, timeout=request_timeout
            ).chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=max_tokens,
                temperature=0.0,
                response_format={"type": "json_object"},
            )
            results.append(json.loads(response.choices[0].message.content))
        try:
            title = results[0].get("Title")
            overview = results[0].get("Overview")
            breakdown = results[1].get("Breakdown", {})
            tmpl = "{}\n{}"
            formatted_breakdown = ""
            for subtitle, key_points in breakdown.items():
                formatted_breakdown += f"\n**{subtitle}**\n"
                if not isinstance(key_points, list):
                    raise RuntimeError(
                        f"Unexpected generated result: {json.dumps(breakdown)}"
                    )
                for key_point in key_points:
                    if isinstance(key_point, list) and len(key_point) == 2:
                        formatted_breakdown += f"- **{key_point[0]}**: {key_point[1]}\n"
                    else:
                        raise RuntimeError(
                            f"Unexpected generated result in key_points: {json.dumps(breakdown)}; expected a list of length 2."
                        )
            insight = tmpl.format(overview, formatted_breakdown)
            return insight, title
        except Exception as e:
            logger.error(traceback.format_exc())
            raise RuntimeError(f"Unexpected error: {e}")
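
    # The document parsing above assumes two responses shaped roughly like
    # (inferred from the field accesses; the real schema comes from the doc prompts):
    #   overview:  {"Title": "...", "Overview": "..."}
    #   breakdown: {"Breakdown": {"<subtitle>": [["<point>", "<detail>"], ...]}}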

    def insighter(self, inputs: InsighterInput) -> Dict[str, str]:
        """Generate insights from document inputs.

        Args:
            inputs: Structured input parameters containing file and bio information.

        Returns:
            Dictionary containing title and insight.
        """
        try:
            datatype = DataType(inputs.file_info.data_type)
        except ValueError:
            logger.warning(
                "Unsupported dataType: %s. Processing as DOCUMENT by default",
                inputs.file_info.data_type,
            )
            datatype = DataType.DOCUMENT
        logger.info("input filename=%s", inputs.file_info.filename)
        logger.info(
            "input content=%s (first 100 characters)",
            inputs.file_info.content.strip()[:100],
        )
        bio = {
            "global_bio": inputs.bio_info.global_bio.split("### Conclusion ###")[-1].strip("\n ")
            if inputs.bio_info.global_bio
            else "User has no biography right now",
            # Keep only the activities-overview section of the status bio:
            # text after "** User Activities Overview **" and before
            # "** Physical and mental health status **".
            "status_bio": inputs.bio_info.status_bio.split(
                "** User Activities Overview **"
            )[-1]
            .split("** Physical and mental health status **")[0]
            .strip("\n")
            if inputs.bio_info.status_bio
            else "",
            "about_me": inputs.bio_info.about_me.strip("\n")
            if inputs.bio_info.about_me
            else "",
        }
        text_len = len(self._tokenizer.encode(inputs.file_info.content))
        if text_len > 20 or inputs.file_info.file_content:
            if datatype == DataType.IMAGE:
                insight, title = self._insighter_image(
                    bio=bio,
                    content=inputs.file_info.content,
                    max_retries=self.max_retries_summarize,
                    request_timeout=30,
                    file_content=inputs.file_info.file_content,
                )
            elif datatype == DataType.AUDIO:
                insight, title = self._insighter_audio(
                    bio=bio,
                    content=inputs.file_info.content,
                    max_retries=self.max_retries_summarize,
                    request_timeout=45,
                    file_content=inputs.file_info.file_content,
                )
            else:
                insight, title = self._insighter_doc(
                    bio=bio,
                    content=inputs.file_info.content,
                    max_retries=self.max_retries_summarize,
                    request_timeout=45,
                    file_content=inputs.file_info.file_content,
                )
        else:
            logger.warning("less than 20 tokens, use filename as title")
            title, insight = inputs.file_info.content, inputs.file_info.content
            if inputs.file_info.filename:
                logger.info("use filename as title")
                title = inputs.file_info.filename
        logger.info(
            "Insighter: title=%s, summary=%s",
            title,
            insight,
        )
        return {
            "title": title,
            "insight": insight,
        }

    def __serial_summary_filter(
        self, summaries: List[str], chunks_list: List[List[str]], separator: str = "", filtered_chunks_n: int = 6
    ) -> List[str]:
        """Filter and combine summaries with relevant chunks.

        Args:
            summaries: List of summary strings.
            chunks_list: List of lists containing text chunks.
            separator: String used to join chunks and summaries.
            filtered_chunks_n: Maximum number of chunks to combine per round.

        Returns:
            List of combined content strings.
        """
        # Skip a document once its chunks are exhausted; otherwise combine its
        # running summary with the next few chunks.
        use_contents = []
        for summary, chunks in zip(summaries, chunks_list):
            if len(chunks) > filtered_chunks_n - 1:
                # More chunks remain than fit in one round: take the next
                # filtered_chunks_n - 1 of them (5 by default)
                use_content = separator.join([summary, *chunks[: filtered_chunks_n - 1]])
            elif len(chunks) > 0:
                # Final round for this document: take all remaining chunks
                use_content = separator.join([summary, *chunks])
            else:
                # No chunks left: summarization is done, skip to save resources
                continue
            use_contents.append(use_content)
        return use_contents
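
    # Worked example of the serial flow this helper drives: a document split
    # into 12 chunks seeds the running summary with chunk 0, then needs
    # int((12 + 4) / 5) = 3 rounds: summary + chunks 1-5, then 6-10, then a
    # final round over chunk 11; each round's abstract becomes the next
    # round's running summary.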

    def _summarize_title_abstract_keywords(
        self,
        content: Union[str, List[str]],
        filename: str,
        file_type: str,
        request_timeout: int,
        max_retries: int,
        preferred_language: str,
        filter=equidistant_filter,
    ) -> Union[tuple[str, str, List[str]], List[tuple[str, str, List[str]]]]:
        """Generate title, abstract and keywords from content.

        Args:
            content: String or list of strings to summarize.
            filename: Name of the file being summarized.
            file_type: Type of file (document, image, audio, etc.).
            request_timeout: Timeout for API calls in seconds.
            max_retries: Maximum number of API call retries.
            preferred_language: Language to use for generation.
            filter: Function to filter content chunks.

        Returns:
            Single tuple or list of tuples containing (title, summary, keywords).
        """
        upper_limit = 8192
        filtered_chunks_n = 14
        max_tokens = 512
        inputs = [content] if isinstance(content, str) else content
        filename = filename or ""
        filename_desc = f"Filename: {filename}\n" if filename else ""
        def get_text_generate(_requests):
            language_desc = ""
            prompt = NOTE_SUMMARY_PROMPT.replace("{language_desc}", language_desc)
            messages = [
                [
                    {"role": "user", "content": prompt.format(**_request)},
                    {
                        "role": "system",
                        "content": f"""User Preferred Language: {preferred_language}; use this language to generate the title and summary.
Don't start the summary with sentences like "This document", "This text" or "This article"; describe the content directly.""",
                    },
                ]
                for _request in _requests
            ]
            logger.info("generate inputs: %s", _requests)
            responses = [
                self.client.with_options(
                    max_retries=max_retries, timeout=request_timeout
                ).chat.completions.create(
                    model=self.model_name,
                    messages=msg,
                    max_tokens=max_tokens,
                    temperature=0.0,
                )
                for msg in messages
            ]
            return responses
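
        # Each element of _requests fills NOTE_SUMMARY_PROMPT, e.g.
        # {"content": "<chunk text>", "file_type": "<datatype value>",
        #  "filename_desc": "Filename: report.pdf\n"}
        # ("report.pdf" is an illustrative placeholder, not from the source).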
        splitter = TokenParagraphSplitter(chunk_size=512, chunk_overlap=0)
        if filter is self.__serial_summary_filter:
            # Serial fine-grained full-text summary
            chunks_list = [splitter.split_text(each) for each in inputs]
            # Maximum number of summary rounds needed: K rounds can cover a
            # document with 5K + 1 chunks
            max_summary_times = int(
                (max([len(chunks) for chunks in chunks_list]) + 4) / 5
            )
            results = [() for _ in range(len(inputs))]
            # Seed each running summary with the first chunk
            # (empty string if the document produced no chunks)
            summaries = [chunks[0] if len(chunks) > 0 else "" for chunks in chunks_list]
            # When a document has exactly 1 chunk, keep [""] so it still gets
            # one summary round; 0 chunks means no summary is needed at all
            chunks_list = [
                [] if len(chunks) == 0 else ([""] if len(chunks) == 1 else chunks[1:])
                for chunks in chunks_list
            ]
            for i in range(max_summary_times):
                use_contents = self.__serial_summary_filter(summaries, chunks_list)
                requests = [
                    {
                        "content": use_content,
                        "file_type": file_type,
                        "filename_desc": filename_desc,
                    }
                    for use_content in use_contents
                ]
                responses = get_text_generate(requests)
                tmp_results = get_summarize_title_keywords(responses)
                # tmp_results only covers documents that participated in this
                # round, so track a separate cursor across doc_ids (resetting
                # it inside the loop would always read tmp_results[0])
                index = 0
                for doc_id, chunks in enumerate(chunks_list):
                    if len(chunks) > 0:
                        # Update result (title, abstract, keywords)
                        results[doc_id] = tmp_results[index]
                        # The new abstract becomes the running summary
                        summaries[doc_id] = tmp_results[index][1]
                        # Drop the chunks consumed this round
                        chunks_list[doc_id] = chunks_list[doc_id][5:]
                        index += 1
        else:
            requests = []
            for each in inputs:
                splits = splitter.split_text(each)
                # Sampling-based full-text summary: keep the beginning and the
                # end (the end often carries signatures and contact details,
                # which reduces model hallucination) and skip the middle. An
                # extra chunk is kept at the end so a short final chunk does
                # not starve the model of information.
                use_content = chunk_filter(
                    splits,
                    filter,
                    filtered_chunks_n=filtered_chunks_n,
                    separator="\n",
                    spacer="\n……\n……\n……\n",
                )
                if self.model_name is None:
                    # LLM config was unavailable at construction time; retry now
                    self.user_llm_config = self.user_llm_config_service.get_available_llm()
                    self.client = OpenAI(
                        api_key=self.user_llm_config.chat_api_key,
                        base_url=self.user_llm_config.chat_endpoint,
                    )
                    self.model_name = self.user_llm_config.chat_model_name
                requests.append(
                    {
                        "content": get_safe_content_turncate(
                            use_content,
                            self.model_name.replace("openai/", ""),
                            max_tokens=upper_limit,
                        ),
                        "file_type": file_type,
                        "filename_desc": filename_desc,
                    }
                )
            responses = get_text_generate(requests)
            results = get_summarize_title_keywords(responses)
        logger.debug("results: %s", results)
        return results[0] if isinstance(content, str) else results

    def summarizer(self, inputs: SummarizerInput) -> Dict[str, Any]:
        """Generate summary from document inputs.

        Args:
            inputs: Structured input parameters containing file information and insight.

        Returns:
            Dictionary containing title, summary and keywords.
        """
        bottom_summary_len = 200
        datatype = inputs.file_info.data_type
        filename = inputs.file_info.filename
        md = inputs.file_info.content  # hint
        inner_content = inputs.file_info.file_content.get("content") or ""
        if isinstance(inner_content, list):
            inner_content = "\n".join(inner_content)
        insight = inputs.insight
        md = md + "\n" + inner_content
        md = f"insight: {insight}\ncontent: {md}"
        try:
            datatype = DataType(datatype)
        except ValueError:
            logger.warning("Unsupported dataType: %s. Processing as DOCUMENT by default", datatype)
            datatype = DataType.DOCUMENT
        logger.info("input filename=%s", filename)
        logger.info("input content=%s (first 100 characters)", md.strip()[:100])
        t0 = time.time()
        # Fallback summary: the first bottom_summary_len tokens of the insight
        bottom_summary = self._tokenizer.decode(
            self._tokenizer.encode(insight)[:bottom_summary_len]
        )
        if len(self._tokenizer.encode(md)) > 20:
            title, summary, keywords = self._summarize_title_abstract_keywords(
                md,
                filename=filename,
                file_type=datatype.value,
                request_timeout=self.timeout_summarize,
                max_retries=self.max_retries_summarize,
                preferred_language=self.preferred_language,
            )
            if not (title or summary or keywords):
                logger.warning("summary failed, use insight as summary")
                title, summary, keywords = filename, bottom_summary, []
            if filename:
                title = filename
        else:
            logger.warning("less than 20 tokens, use filename as title")
            title, summary, keywords = md, md, []
            if filename:
                title = filename
        t1 = time.time()
        logger.info(
            "MarkdownChunkAPI summarize_title_abstract_keywords(): time spent %.2f seconds, title=%s, summary=%s",
            t1 - t0,
            title,
            summary,
        )
        return {"title": title, "summary": summary, "keywords": keywords}
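

# Minimal usage sketch (assumptions flagged inline). The exact constructor
# signatures of InsighterInput / SummarizerInput live in lpm_kernel.L0.models
# and are not shown here, so their construction is left as comments.
if __name__ == "__main__":
    generator = L0Generator(preferred_language="English")
    # inputs = InsighterInput(file_info=..., bio_info=...)  # hypothetical construction
    # result = generator.insighter(inputs)
    # print(result["title"])
    # print(result["insight"])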