Spaces:
Build error
Build error
| import asyncio | |
| import time | |
| from copy import deepcopy | |
| from logging import LoggerAdapter | |
| import socketio | |
| from openhands.controller.agent import Agent | |
| from openhands.core.config import OpenHandsConfig | |
| from openhands.core.config.condenser_config import ( | |
| BrowserOutputCondenserConfig, | |
| CondenserPipelineConfig, | |
| LLMSummarizingCondenserConfig, | |
| ) | |
| from openhands.core.config.mcp_config import MCPConfig, OpenHandsMCPConfigImpl | |
| from openhands.core.exceptions import MicroagentValidationError | |
| from openhands.core.logger import OpenHandsLoggerAdapter | |
| from openhands.core.schema import AgentState | |
| from openhands.events.action import MessageAction, NullAction | |
| from openhands.events.event import Event, EventSource | |
| from openhands.events.observation import ( | |
| AgentStateChangedObservation, | |
| CmdOutputObservation, | |
| NullObservation, | |
| ) | |
| from openhands.events.observation.agent import RecallObservation | |
| from openhands.events.observation.error import ErrorObservation | |
| from openhands.events.serialization import event_from_dict, event_to_dict | |
| from openhands.events.stream import EventStreamSubscriber | |
| from openhands.llm.llm import LLM | |
| from openhands.server.session.agent_session import AgentSession | |
| from openhands.server.session.conversation_init_data import ConversationInitData | |
| from openhands.storage.data_models.settings import Settings | |
| from openhands.storage.files import FileStore | |
| ROOM_KEY = 'room:{sid}' | |
| class Session: | |
| sid: str | |
| sio: socketio.AsyncServer | None | |
| last_active_ts: int = 0 | |
| is_alive: bool = True | |
| agent_session: AgentSession | |
| loop: asyncio.AbstractEventLoop | |
| config: OpenHandsConfig | |
| file_store: FileStore | |
| user_id: str | None | |
| logger: LoggerAdapter | |
| def __init__( | |
| self, | |
| sid: str, | |
| config: OpenHandsConfig, | |
| file_store: FileStore, | |
| sio: socketio.AsyncServer | None, | |
| user_id: str | None = None, | |
| ): | |
| self.sid = sid | |
| self.sio = sio | |
| self.last_active_ts = int(time.time()) | |
| self.file_store = file_store | |
| self.logger = OpenHandsLoggerAdapter(extra={'session_id': sid}) | |
| self.agent_session = AgentSession( | |
| sid, | |
| file_store, | |
| status_callback=self.queue_status_message, | |
| user_id=user_id, | |
| ) | |
| self.agent_session.event_stream.subscribe( | |
| EventStreamSubscriber.SERVER, self.on_event, self.sid | |
| ) | |
| # Copying this means that when we update variables they are not applied to the shared global configuration! | |
| self.config = deepcopy(config) | |
| self.loop = asyncio.get_event_loop() | |
| self.user_id = user_id | |
| async def close(self) -> None: | |
| if self.sio: | |
| await self.sio.emit( | |
| 'oh_event', | |
| event_to_dict( | |
| AgentStateChangedObservation('', AgentState.STOPPED.value) | |
| ), | |
| to=ROOM_KEY.format(sid=self.sid), | |
| ) | |
| self.is_alive = False | |
| await self.agent_session.close() | |
| async def initialize_agent( | |
| self, | |
| settings: Settings, | |
| initial_message: MessageAction | None, | |
| replay_json: str | None, | |
| ) -> None: | |
| self.agent_session.event_stream.add_event( | |
| AgentStateChangedObservation('', AgentState.LOADING), | |
| EventSource.ENVIRONMENT, | |
| ) | |
| agent_cls = settings.agent or self.config.default_agent | |
| self.config.security.confirmation_mode = ( | |
| self.config.security.confirmation_mode | |
| if settings.confirmation_mode is None | |
| else settings.confirmation_mode | |
| ) | |
| self.config.security.security_analyzer = ( | |
| settings.security_analyzer or self.config.security.security_analyzer | |
| ) | |
| self.config.sandbox.base_container_image = ( | |
| settings.sandbox_base_container_image | |
| or self.config.sandbox.base_container_image | |
| ) | |
| self.config.sandbox.runtime_container_image = ( | |
| settings.sandbox_runtime_container_image | |
| if settings.sandbox_base_container_image | |
| or settings.sandbox_runtime_container_image | |
| else self.config.sandbox.runtime_container_image | |
| ) | |
| max_iterations = settings.max_iterations or self.config.max_iterations | |
| # This is a shallow copy of the default LLM config, so changes here will | |
| # persist if we retrieve the default LLM config again when constructing | |
| # the agent | |
| default_llm_config = self.config.get_llm_config() | |
| default_llm_config.model = settings.llm_model or '' | |
| default_llm_config.api_key = settings.llm_api_key | |
| default_llm_config.base_url = settings.llm_base_url | |
| self.config.search_api_key = settings.search_api_key | |
| # NOTE: this need to happen AFTER the config is updated with the search_api_key | |
| self.config.mcp = settings.mcp_config or MCPConfig( | |
| sse_servers=[], stdio_servers=[] | |
| ) | |
| # Add OpenHands' MCP server by default | |
| openhands_mcp_server, openhands_mcp_stdio_servers = ( | |
| OpenHandsMCPConfigImpl.create_default_mcp_server_config( | |
| self.config.mcp_host, self.config, self.user_id | |
| ) | |
| ) | |
| if openhands_mcp_server: | |
| self.config.mcp.shttp_servers.append(openhands_mcp_server) | |
| self.config.mcp.stdio_servers.extend(openhands_mcp_stdio_servers) | |
| # TODO: override other LLM config & agent config groups (#2075) | |
| llm = self._create_llm(agent_cls) | |
| agent_config = self.config.get_agent_config(agent_cls) | |
| if settings.enable_default_condenser: | |
| # Default condenser chains a condenser that limits browser the total | |
| # size of browser observations with a condenser that limits the size | |
| # of the view given to the LLM. The order matters: with the browser | |
| # output first, the summarizer will only see the most recent browser | |
| # output, which should keep the summarization cost down. | |
| default_condenser_config = CondenserPipelineConfig( | |
| condensers=[ | |
| BrowserOutputCondenserConfig(attention_window=2), | |
| LLMSummarizingCondenserConfig( | |
| llm_config=llm.config, keep_first=4, max_size=140 | |
| ), | |
| ] | |
| ) | |
| self.logger.info( | |
| f'Enabling pipeline condenser with:' | |
| f' browser_output_masking(attention_window=2), ' | |
| f' llm(model="{llm.config.model}", ' | |
| f' base_url="{llm.config.base_url}", ' | |
| f' keep_first=4, max_size=80)' | |
| ) | |
| agent_config.condenser = default_condenser_config | |
| agent = Agent.get_cls(agent_cls)(llm, agent_config) | |
| git_provider_tokens = None | |
| selected_repository = None | |
| selected_branch = None | |
| custom_secrets = None | |
| conversation_instructions = None | |
| if isinstance(settings, ConversationInitData): | |
| git_provider_tokens = settings.git_provider_tokens | |
| selected_repository = settings.selected_repository | |
| selected_branch = settings.selected_branch | |
| custom_secrets = settings.custom_secrets | |
| conversation_instructions = settings.conversation_instructions | |
| try: | |
| await self.agent_session.start( | |
| runtime_name=self.config.runtime, | |
| config=self.config, | |
| agent=agent, | |
| max_iterations=max_iterations, | |
| max_budget_per_task=self.config.max_budget_per_task, | |
| agent_to_llm_config=self.config.get_agent_to_llm_config_map(), | |
| agent_configs=self.config.get_agent_configs(), | |
| git_provider_tokens=git_provider_tokens, | |
| custom_secrets=custom_secrets, | |
| selected_repository=selected_repository, | |
| selected_branch=selected_branch, | |
| initial_message=initial_message, | |
| conversation_instructions=conversation_instructions, | |
| replay_json=replay_json, | |
| ) | |
| except MicroagentValidationError as e: | |
| self.logger.exception(f'Error creating agent_session: {e}') | |
| # For microagent validation errors, provide more helpful information | |
| await self.send_error(f'Failed to create agent session: {str(e)}') | |
| return | |
| except ValueError as e: | |
| self.logger.exception(f'Error creating agent_session: {e}') | |
| error_message = str(e) | |
| # For ValueError related to microagents, provide more helpful information | |
| if 'microagent' in error_message.lower(): | |
| await self.send_error( | |
| f'Failed to create agent session: {error_message}' | |
| ) | |
| else: | |
| # For other ValueErrors, just show the error class | |
| await self.send_error('Failed to create agent session: ValueError') | |
| return | |
| except Exception as e: | |
| self.logger.exception(f'Error creating agent_session: {e}') | |
| # For other errors, just show the error class to avoid exposing sensitive information | |
| await self.send_error( | |
| f'Failed to create agent session: {e.__class__.__name__}' | |
| ) | |
| return | |
| def _create_llm(self, agent_cls: str | None) -> LLM: | |
| """Initialize LLM, extracted for testing.""" | |
| agent_name = agent_cls if agent_cls is not None else 'agent' | |
| return LLM( | |
| config=self.config.get_llm_config_from_agent(agent_name), | |
| retry_listener=self._notify_on_llm_retry, | |
| ) | |
| def _notify_on_llm_retry(self, retries: int, max: int) -> None: | |
| msg_id = 'STATUS$LLM_RETRY' | |
| self.queue_status_message( | |
| 'info', msg_id, f'Retrying LLM request, {retries} / {max}' | |
| ) | |
| def on_event(self, event: Event) -> None: | |
| asyncio.get_event_loop().run_until_complete(self._on_event(event)) | |
| async def _on_event(self, event: Event) -> None: | |
| """Callback function for events that mainly come from the agent. | |
| Event is the base class for any agent action and observation. | |
| Args: | |
| event: The agent event (Observation or Action). | |
| """ | |
| if isinstance(event, NullAction): | |
| return | |
| if isinstance(event, NullObservation): | |
| return | |
| if event.source == EventSource.AGENT: | |
| await self.send(event_to_dict(event)) | |
| elif event.source == EventSource.USER: | |
| await self.send(event_to_dict(event)) | |
| # NOTE: ipython observations are not sent here currently | |
| elif event.source == EventSource.ENVIRONMENT and isinstance( | |
| event, | |
| (CmdOutputObservation, AgentStateChangedObservation, RecallObservation), | |
| ): | |
| # feedback from the environment to agent actions is understood as agent events by the UI | |
| event_dict = event_to_dict(event) | |
| event_dict['source'] = EventSource.AGENT | |
| await self.send(event_dict) | |
| if ( | |
| isinstance(event, AgentStateChangedObservation) | |
| and event.agent_state == AgentState.ERROR | |
| ): | |
| self.logger.info( | |
| 'Agent status error', | |
| extra={'signal': 'agent_status_error'}, | |
| ) | |
| elif isinstance(event, ErrorObservation): | |
| # send error events as agent events to the UI | |
| event_dict = event_to_dict(event) | |
| event_dict['source'] = EventSource.AGENT | |
| await self.send(event_dict) | |
| async def dispatch(self, data: dict) -> None: | |
| event = event_from_dict(data.copy()) | |
| # This checks if the model supports images | |
| if isinstance(event, MessageAction) and event.image_urls: | |
| controller = self.agent_session.controller | |
| if controller: | |
| if controller.agent.llm.config.disable_vision: | |
| await self.send_error( | |
| 'Support for images is disabled for this model, try without an image.' | |
| ) | |
| return | |
| if not controller.agent.llm.vision_is_active(): | |
| await self.send_error( | |
| 'Model does not support image upload, change to a different model or try without an image.' | |
| ) | |
| return | |
| self.agent_session.event_stream.add_event(event, EventSource.USER) | |
| async def send(self, data: dict[str, object]) -> None: | |
| if asyncio.get_running_loop() != self.loop: | |
| self.loop.create_task(self._send(data)) | |
| return | |
| await self._send(data) | |
| async def _send(self, data: dict[str, object]) -> bool: | |
| try: | |
| if not self.is_alive: | |
| return False | |
| if self.sio: | |
| await self.sio.emit('oh_event', data, to=ROOM_KEY.format(sid=self.sid)) | |
| await asyncio.sleep(0.001) # This flushes the data to the client | |
| self.last_active_ts = int(time.time()) | |
| return True | |
| except RuntimeError as e: | |
| self.logger.error(f'Error sending data to websocket: {str(e)}') | |
| self.is_alive = False | |
| return False | |
| async def send_error(self, message: str) -> None: | |
| """Sends an error message to the client.""" | |
| await self.send({'error': True, 'message': message}) | |
| async def _send_status_message(self, msg_type: str, id: str, message: str) -> None: | |
| """Sends a status message to the client.""" | |
| if msg_type == 'error': | |
| agent_session = self.agent_session | |
| controller = self.agent_session.controller | |
| if controller is not None and not agent_session.is_closed(): | |
| await controller.set_agent_state_to(AgentState.ERROR) | |
| self.logger.info( | |
| 'Agent status error', | |
| extra={'signal': 'agent_status_error'}, | |
| ) | |
| await self.send( | |
| {'status_update': True, 'type': msg_type, 'id': id, 'message': message} | |
| ) | |
| def queue_status_message(self, msg_type: str, id: str, message: str) -> None: | |
| """Queues a status message to be sent asynchronously.""" | |
| asyncio.run_coroutine_threadsafe( | |
| self._send_status_message(msg_type, id, message), self.loop | |
| ) | |