Spaces:
Runtime error
Runtime error
File size: 1,801 Bytes
0e02b97 a8ee91f 7bd7366 ecc9e42 c8f7ad8 2bae1d8 a8ee91f 2bae1d8 7bd7366 aee9883 2afcf85 94490a0 59f95d3 7bd7366 c8f7ad8 ecc9e42 aee9883 7a7b1d3 2bae1d8 c8f7ad8 2bae1d8 7bd7366 c8f7ad8 ecc9e42 c8f7ad8 ecc9e42 c8f7ad8 ecc9e42 a8ee91f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
from __future__ import annotations
__all__ = ["execute_terminal", "execute_terminal_async", "set_vm"]
import subprocess
import os
from typing import Optional
import asyncio
from .vm import LinuxVM
_VM: Optional[LinuxVM] = None
def set_vm(vm: LinuxVM | None) -> None:
"""Register the VM instance used for command execution."""
global _VM
_VM = vm
def execute_terminal(command: str) -> str:
"""
Execute a shell command in a Ubuntu terminal.
Use this tool to inspect uploaded documents under ``/data``, fetch web
content with utilities like ``curl`` or ``wget`` and run other commands.
The assistant must call this tool to search the internet whenever unsure
about any detail.
The command is executed with network access enabled. It runs in the
background without a timeout so the assistant can continue responding
while the command executes.
"""
if not command:
return "No command provided."
if _VM:
try:
return _VM.execute(command, detach=True)
except Exception as exc: # pragma: no cover - unforeseen errors
return f"Failed to execute command in VM: {exc}"
try:
subprocess.Popen(
command,
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
env=os.environ.copy(),
)
return "Command started in background."
except Exception as exc: # pragma: no cover - unforeseen errors
return f"Failed to execute command: {exc}"
async def execute_terminal_async(command: str) -> str:
"""Asynchronously execute a shell command."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, execute_terminal, command)
|