# flake8: noqa E501
import json
import re
from typing import Dict

from builder_prompt import BuilderPromptGenerator
from builder_prompt_zh import ZhBuilderPromptGenerator
from config_utils import parse_configuration
from help_tools import LogoGeneratorTool, config_conversion
from modelscope_agent import prompt_generator_register
from modelscope_agent.agent import AgentExecutor
from modelscope_agent.agent_types import AgentType
from modelscope_agent.llm import LLMFactory
from modelscope_agent.prompt import MessagesGenerator
from modelscope_agent.utils.logger import agent_logger as logger
prompts = {
    'BuilderPromptGenerator': BuilderPromptGenerator,
    'ZhBuilderPromptGenerator': ZhBuilderPromptGenerator,
}
prompt_generator_register(prompts)

SYSTEM = 'You are a helpful assistant.'
LOGO_TOOL_NAME = 'logo_designer'

ANSWER = 'Answer'
CONFIG = 'Config'
ASSISTANT_PROMPT = """{}: <answer>\n{}: <config>\nRichConfig: <rich_config>""".format(
    ANSWER, CONFIG)
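# ASSISTANT_PROMPT renders to:
#     'Answer: <answer>\nConfig: <config>\nRichConfig: <rich_config>'
# i.e. the three-part layout that stream_run() parses back out below.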

UPDATING_CONFIG_STEP = '🚀Updating Config...'
CONFIG_UPDATED_STEP = '✅Config Updated!'
UPDATING_LOGO_STEP = '🚀Updating Logo...'
LOGO_UPDATED_STEP = '✅Logo Updated!'

def init_builder_chatbot_agent(uuid_str):
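    """Assemble the builder chatbot agent for one session.

    Args:
        uuid_str: session id used to look up the builder and model config.
    """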
    # build model
    builder_cfg, model_cfg, _, _, _, _ = parse_configuration(uuid_str)

    # additional tool
    additional_tool_list = {LOGO_TOOL_NAME: LogoGeneratorTool()}
    tool_cfg = {LOGO_TOOL_NAME: {'is_remote_tool': True}}

    # build llm
    logger.info(
        uuid=uuid_str, message=f'using builder model {builder_cfg.model}')
    llm = LLMFactory.build_llm(builder_cfg.model, model_cfg)
    llm.set_agent_type(AgentType.Messages)

    # pick the prompt generator that matches the configured language
    prompt_generator = 'BuilderPromptGenerator'
    language = builder_cfg.get('language', 'en')
    if language == 'zh':
        prompt_generator = 'ZhBuilderPromptGenerator'

    # build agent
    agent = BuilderChatbotAgent(
        llm,
        tool_cfg,
        agent_type=AgentType.Messages,
        additional_tool_list=additional_tool_list,
        prompt_generator=prompt_generator,
        uuid=uuid_str)
    agent.set_available_tools([LOGO_TOOL_NAME])
    return agent
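
# Example usage (a sketch; assumes a builder config has been written for this
# uuid and the configured model is reachable):
#
#     agent = init_builder_chatbot_agent('local_user')
#     for frame in agent.stream_run(
#             'Build a bot that writes poems', print_info=True,
#             uuid_str='local_user'):
#         print(frame)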


class BuilderChatbotAgent(AgentExecutor):
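    """Chatbot that walks the user through building an agent: it streams the
    LLM answer, parses the Config/RichConfig blocks back out of it, and calls
    the logo_designer tool when a logo_prompt is produced."""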

    def __init__(self, llm, tool_cfg, agent_type, additional_tool_list,
                 **kwargs):
        super().__init__(
            llm,
            tool_cfg,
            agent_type=agent_type,
            additional_tool_list=additional_tool_list,
            tool_retrieval=False,
            **kwargs)

        # used to reconstruct assistant message when builder config is updated
        self._last_assistant_structured_response = {}

    def stream_run(self,
                   task: str,
                   remote: bool = True,
                   print_info: bool = False,
                   append_files: list = [],
                   uuid_str: str = '') -> Dict:
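        """Run one build round, yielding frames for the UI: incremental
        answer text, step flags, and tool/config exec results."""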
        # retrieve tools
        tool_list = self.retrieve_tools(task)
        self.prompt_generator.init_prompt(task, tool_list, [])
        function_list = []

        llm_result, exec_result = '', ''
        idx = 0

        while True:
            idx += 1
            llm_artifacts = self.prompt_generator.generate(
                llm_result, exec_result)
            if print_info:
                logger.info(
                    uuid=uuid_str,
                    message=f'LLM inputs in round {idx}',
                    content={'llm_artifacts': llm_artifacts})

            llm_result = ''
            try:
                parser_obj = AnswerParser()
                for s in self.llm.stream_generate(llm_artifacts=llm_artifacts):
                    llm_result += s
                    answer, finish = parser_obj.parse_answer(llm_result)
                    if answer == '':
                        continue
                    result = {'llm_text': answer}
                    if finish:
                        result.update({'step': UPDATING_CONFIG_STEP})
                    yield result
                if print_info:
                    logger.info(
                        uuid=uuid_str,
                        message=f'LLM output in round {idx}',
                        content={'llm_result': llm_result})
            except Exception as e:
                logger.error(uuid=uuid_str, error=str(e))
                yield {'error': 'llm result is not valid'}

            try:
                # keep the plain-text Answer so the assistant message can be
                # rebuilt later in update_config_to_history(); the layout
                # follows ASSISTANT_PROMPT above
                re_pattern_answer = re.compile(
                    pattern=r'Answer: ([\s\S]+)\nConfig:')
                res_answer = re_pattern_answer.search(llm_result)
                self._last_assistant_structured_response['answer_str'] = (
                    res_answer.group(1).strip() if res_answer else '')

                re_pattern_config = re.compile(
                    pattern=r'Config: ([\s\S]+)\nRichConfig')
                res = re_pattern_config.search(llm_result)
                if res is None:
                    return
                config = res.group(1).strip()
                self._last_assistant_structured_response['config_str'] = config

                rich_config = llm_result[llm_result.rfind('RichConfig:')
                                         + len('RichConfig:'):].strip()
                try:
                    answer = json.loads(rich_config)
                except Exception:
                    logger.error(uuid=uuid_str, error='parse RichConfig error')
                    return
                self._last_assistant_structured_response[
                    'rich_config_dict'] = answer
                builder_cfg = config_conversion(answer, uuid_str=uuid_str)
                yield {'exec_result': {'result': builder_cfg}}
                yield {'step': CONFIG_UPDATED_STEP}
            except ValueError as e:
                logger.error(uuid=uuid_str, error=str(e))
                yield {'error': 'content=[{}]'.format(llm_result)}
                return

            # record the llm_result in the prompt history
            _ = self.prompt_generator.generate(
                {
                    'role': 'assistant',
                    'content': llm_result
                }, '')

            messages = self.prompt_generator.history
            if 'logo_prompt' in answer and len(messages) > 4 and (
                    answer['logo_prompt'] not in messages[-3]['content']):
                # draw logo
                yield {'step': UPDATING_LOGO_STEP}
                params = {
                    'user_requirement': answer['logo_prompt'],
                    'uuid_str': uuid_str
                }

                tool = self.tool_list[LOGO_TOOL_NAME]
                try:
                    exec_result = tool(**params, remote=remote)
                    yield {'exec_result': exec_result}
                    yield {'step': LOGO_UPDATED_STEP}
                    return
                except Exception as e:
                    exec_result = f'Action call error: {LOGO_TOOL_NAME}: {params}. \n Error message: {e}'
                    yield {'error': exec_result}
                    self.prompt_generator.reset()
                    return
            else:
                return

    def update_config_to_history(self, config: Dict):
        """Update the last assistant message when the user edits the builder
        configuration in the UI.

        Args:
            config: configuration info read from the builder config file
        """
        if len(
                self.prompt_generator.history
        ) > 0 and self.prompt_generator.history[-1]['role'] == 'assistant':
            answer = self._last_assistant_structured_response['answer_str']
            simple_config = self._last_assistant_structured_response[
                'config_str']

            rich_config_dict = {
                k: config[k]
                for k in ['name', 'description', 'prompt_recommend']
            }
            rich_config_dict[
                'logo_prompt'] = self._last_assistant_structured_response[
                    'rich_config_dict']['logo_prompt']
            rich_config_dict['instructions'] = config['instruction'].split(';')
            rich_config = json.dumps(rich_config_dict, ensure_ascii=False)

            new_content = ASSISTANT_PROMPT.replace('<answer>', answer).replace(
                '<config>', simple_config).replace('<rich_config>',
                                                   rich_config)
            self.prompt_generator.history[-1]['content'] = new_content


def beauty_output(response: str, step_result: str):
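    """Fold a step flag into the streamed response text: a trailing
    in-progress flag is replaced, anything else is appended."""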
    flag_list = [
        CONFIG_UPDATED_STEP, UPDATING_CONFIG_STEP, LOGO_UPDATED_STEP,
        UPDATING_LOGO_STEP
    ]
    if step_result in flag_list:
        end_str = ''
        for item in flag_list:
            if response.endswith(item):
                end_str = item
        if end_str == '':
            response = f'{response}\n{step_result}'
        elif end_str in [CONFIG_UPDATED_STEP, LOGO_UPDATED_STEP]:
            response = f'{response}\n{step_result}'
        else:
            response = response[:-len('\n' + end_str)]
            response = f'{response}\n{step_result}'
    return response
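
# For example, a trailing in-progress flag is swapped for its finished
# counterpart, while finished flags simply accumulate:
#
#     beauty_output('hi\n🚀Updating Config...', CONFIG_UPDATED_STEP)
#     # -> 'hi\n✅Config Updated!'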


class AnswerParser(object):
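    """Incrementally extract the text that follows 'Answer: ' from a growing
    LLM stream, returning only the new characters on each call."""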

    def __init__(self):
        self._history = ''

    def parse_answer(self, llm_result: str):
        finish = False
        answer_prompt = ANSWER + ': '

        if len(llm_result) >= len(answer_prompt):
            start_pos = llm_result.find(answer_prompt)
            end_pos = llm_result.find(f'\n{CONFIG}')
            if start_pos >= 0:
                if end_pos > start_pos:
                    result = llm_result[start_pos + len(answer_prompt):end_pos]
                    finish = True
                else:
                    result = llm_result[start_pos + len(answer_prompt):]
            else:
                result = llm_result
        else:
            result = ''

        new_result = result[len(self._history):]
        self._history = result
        return new_result, finish
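
# Minimal sketch of the incremental parse (assumes the model emits the
# ASSISTANT_PROMPT layout defined above):
#
#     parser = AnswerParser()
#     parser.parse_answer('Answer: Hi')                    # -> ('Hi', False)
#     parser.parse_answer('Answer: Hi there\nConfig: {}')  # -> (' there', True)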