import ast
import enum
import importlib
import json
import os
import pickle
import subprocess
import tempfile
import traceback
import zipfile
from typing import Any, ClassVar
from urllib.parse import urljoin

import pandas as pd
import requests
import tqdm
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages.base import get_msg_title_repr
from langchain_core.tools import StructuredTool
from langchain_core.utils.interactive_env import is_interactive_env
from pydantic import BaseModel, Field, ValidationError


def run_bash_script(script: str) -> str:
    """Run a Bash script using subprocess.

    Args:
        script: Bash script to run

    Returns:
        Output of the Bash script

    Example:
        ```
        # Example of a complex Bash script
        script = '''
        #!/bin/bash

        # Define variables
        DATA_DIR="/path/to/data"
        OUTPUT_FILE="results.txt"

        # Create output directory if it doesn't exist
        mkdir -p $(dirname $OUTPUT_FILE)

        # Loop through files
        for file in $DATA_DIR/*.txt; do
            echo "Processing $file..."
            # Count lines in each file
            line_count=$(wc -l < $file)
            echo "$file: $line_count lines" >> $OUTPUT_FILE
        done

        echo "Processing complete. Results saved to $OUTPUT_FILE"
        '''
        result = run_bash_script(script)
        print(result)
        ```

    """
    try:
        script = script.strip()
        if not script:
            return "Error: Empty script"

        # Write the script to a temporary file, keeping the shebang on the first
        # line and injecting "set -e" right after it so the script aborts on the
        # first failing command.
        with tempfile.NamedTemporaryFile(suffix=".sh", mode="w", delete=False) as f:
            lines = script.splitlines()
            if not lines[0].startswith("#!/"):
                lines.insert(0, "#!/bin/bash")
            if "set -e" not in script:
                lines.insert(1, "set -e")
            f.write("\n".join(lines) + "\n")
            temp_file = f.name

        # Make the temporary script executable.
        os.chmod(temp_file, 0o755)

        # Run the script with the caller's environment and working directory.
        env = os.environ.copy()
        cwd = os.getcwd()

        result = subprocess.run(
            [temp_file],
            capture_output=True,
            text=True,
            check=False,
            env=env,
            cwd=cwd,
        )

        os.unlink(temp_file)

        if result.returncode != 0:
            traceback.print_stack()
            print(result)
            return f"Error running Bash script (exit code {result.returncode}):\n{result.stderr}"
        else:
            return result.stdout
    except Exception as e:
        traceback.print_exc()
        return f"Error running Bash script: {str(e)}"


def run_cli_command(command: str) -> str:
    """Run a CLI command using subprocess.

    Args:
        command: CLI command to run

    Returns:
        Output of the CLI command

    """
    try:
        command = command.strip()
        if not command:
            return "Error: Empty command"

        # Split the command into arguments safely (handles quoting).
        import shlex

        args = shlex.split(command)

        result = subprocess.run(args, capture_output=True, text=True, check=False)

        if result.returncode != 0:
            return f"Error running command '{command}':\n{result.stderr}"
        else:
            return result.stdout
    except Exception as e:
        return f"Error running command '{command}': {str(e)}"


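# Hedged usage sketch (not part of the original module): a minimal round trip
# through run_cli_command. The command and its output are illustrative and
# platform-dependent.
def _example_run_cli_command():
    output = run_cli_command("echo hello")
    return output.strip()  # expected: "hello" on a typical POSIX system

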
def run_with_timeout(func, args=None, kwargs=None, timeout=600):
    """Run a function with a timeout using threading instead of multiprocessing.

    This allows variables to persist in the global namespace between function calls.
    Returns the function result or a timeout error message.
    """
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}

    import ctypes
    import queue
    import threading

    result_queue = queue.Queue()

    def thread_func(func, args, kwargs, result_queue):
        """Function to run in a separate thread."""
        try:
            result = func(*args, **kwargs)
            result_queue.put(("success", result))
        except Exception as e:
            result_queue.put(("error", str(e)))

    thread = threading.Thread(target=thread_func, args=(func, args, kwargs, result_queue))
    thread.daemon = True
    thread.start()

    # Wait for the thread to finish or time out.
    thread.join(timeout)

    if thread.is_alive():
        print(f"TIMEOUT: Code execution timed out after {timeout} seconds")

        # Best-effort attempt to stop the runaway thread by asynchronously raising
        # SystemExit in it (CPython cannot forcibly kill threads).
        try:
            thread_id = thread.ident
            if thread_id:
                res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), ctypes.py_object(SystemExit))
                if res > 1:
                    # More than one thread was affected; undo the async exception.
                    ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), None)
        except Exception as e:
            print(f"Error trying to terminate thread: {e}")

        return f"ERROR: Code execution timed out after {timeout} seconds. Please try with simpler inputs or break your task into smaller steps."

    # Retrieve the result produced by the worker thread, if any.
    try:
        status, result = result_queue.get(block=False)
        return result if status == "success" else f"Error in execution: {result}"
    except queue.Empty:
        return "Error: Execution completed but no result was returned"


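# Hedged usage sketch (not part of the original module): guards a deliberately
# slow function with run_with_timeout. The delay and timeout values are
# illustrative only.
def _example_run_with_timeout():
    import time

    def slow_add(a, b, delay=1):
        time.sleep(delay)
        return a + b

    # Finishes within the timeout and returns 3; with timeout=0.1 it would
    # instead return the timeout error string.
    return run_with_timeout(slow_add, args=[1, 2], kwargs={"delay": 1}, timeout=5)

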
class api_schema(BaseModel):
    """API schema specification."""

    api_schema: str | None = Field(description="The API schema as a dictionary")


def function_to_api_schema(function_string, llm):
    prompt = """
    Based on the code snippet below, help me write an API docstring in the following format:

    {{'name': 'get_gene_set_enrichment',
    'description': 'Given a list of genes, identify a pathway that is enriched for this gene set. Return a list of pathway name, p-value, z-scores.',
    'required_parameters': [{{'name': 'genes',
        'type': 'List[str]',
        'description': 'List of gene symbols to analyze',
        'default': None}}],
    'optional_parameters': [{{'name': 'top_k',
        'type': 'int',
        'description': 'Top K pathways to return',
        'default': 10}}, {{'name': 'database',
        'type': 'str',
        'description': 'Name of the database to use for enrichment analysis',
        'default': "gene_ontology"}}]}}

    Strictly follow the input from the function - don't create fake optional parameters.
    For variables without default values, set them as None, not null.
    For variables with boolean values, use capitalized True or False, not true or false.
    Do not add any return type in the docstring.
    Be as clear and succinct as possible in the descriptions. Please do not make it overly verbose.
    Here is the code snippet:
    {code}
    """
    llm = llm.with_structured_output(api_schema)

    api = None
    for _ in range(7):
        try:
            api = llm.invoke(prompt.format(code=function_string)).model_dump()["api_schema"]
            return ast.literal_eval(api)
        except Exception as e:
            print("API string:", api)
            print("Error parsing the API string:", e)
            continue

    return "Error: Could not parse the API schema"


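# Hedged usage sketch (not part of the original module): function_to_api_schema
# needs a LangChain chat model that supports structured output. The Anthropic
# model below mirrors the one used elsewhere in this file and is only
# illustrative; any compatible chat model should work.
def _example_function_to_api_schema():
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-3-5-sonnet-20240620")
    source = "def add(a: int, b: int) -> int:\n    return a + b\n"
    return function_to_api_schema(source, llm)  # dict with name/description/parameters

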
def get_all_functions_from_file(file_path):
    with open(file_path) as file:
        file_content = file.read()

    # Parse the file content into an AST.
    tree = ast.parse(file_content)

    functions = []

    # Collect the source of every public top-level function definition.
    for node in tree.body:
        if isinstance(node, ast.FunctionDef):
            # Skip private functions.
            if node.name.startswith("_"):
                continue

            start_line = node.lineno - 1  # AST line numbers are 1-based
            end_line = node.end_lineno
            func_code = file_content.splitlines()[start_line:end_line]
            functions.append("\n".join(func_code))

    return functions


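# Hedged usage sketch (not part of the original module): extracts the source of
# every public top-level function from this very file; any readable Python file
# path works the same way.
def _example_get_all_functions_from_file():
    return get_all_functions_from_file(__file__)

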
def write_python_code(request: str):
    from langchain_anthropic import ChatAnthropic
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate

    model = ChatAnthropic(model="claude-3-5-sonnet-20240620")
    template = """Write some python code to solve the user's problem.

    Return only python code in Markdown format, e.g.:

    ```python
    ....
    ```"""
    prompt = ChatPromptTemplate.from_messages([("system", template), ("human", "{input}")])

    def _sanitize_output(text: str):
        # Extract the body of the first ```python ... ``` block.
        _, after = text.split("```python")
        return after.split("```")[0]

    chain = prompt | model | StrOutputParser() | _sanitize_output
    return chain.invoke({"input": "write a code that " + request})


def execute_graphql_query(
    query: str,
    variables: dict,
    api_address: str = "https://api.genetics.opentargets.org/graphql",
) -> dict:
    """Executes a GraphQL query with variables and returns the data as a dictionary."""
    headers = {"Content-Type": "application/json"}
    response = requests.post(api_address, json={"query": query, "variables": variables}, headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(response.text)
        response.raise_for_status()


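# Hedged usage sketch (not part of the original module): the query below is a
# plausible but unverified example -- consult the Open Targets Genetics GraphQL
# schema for the exact field names. It only illustrates how query, variables,
# and the default endpoint fit together.
def _example_execute_graphql_query():
    query = """
    query geneInfo($geneId: String!) {
      geneInfo(geneId: $geneId) {
        id
        symbol
      }
    }
    """
    return execute_graphql_query(query, variables={"geneId": "ENSG00000169174"})

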
def get_tool_decorated_functions(relative_path):
    import importlib.util

    # Resolve the target file relative to this module's directory.
    current_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(current_dir, relative_path)

    with open(file_path) as file:
        tree = ast.parse(file.read(), filename=file_path)

    # Find functions decorated with @tool or @tool(...).
    tool_function_names = []
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            for decorator in node.decorator_list:
                if (isinstance(decorator, ast.Name) and decorator.id == "tool") or (
                    isinstance(decorator, ast.Call)
                    and isinstance(decorator.func, ast.Name)
                    and decorator.func.id == "tool"
                ):
                    tool_function_names.append(node.name)

    # Import the module dynamically and collect the decorated functions.
    package_path = os.path.relpath(file_path, start=current_dir)
    module_name = package_path.replace(os.path.sep, ".").rsplit(".", 1)[0]

    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    tool_functions = [getattr(module, name) for name in tool_function_names]

    return tool_functions


def load_pickle(file):
    with open(file, "rb") as f:
        return pickle.load(f)


def pretty_print(message, printout=True):
    if isinstance(message, tuple):
        title = message
    elif isinstance(message.content, list):
        title = get_msg_title_repr(message.type.title().upper() + " Message", bold=is_interactive_env())
        if message.name is not None:
            title += f"\nName: {message.name}"

        for i in message.content:
            if i["type"] == "text":
                title += f"\n{i['text']}\n"
            elif i["type"] == "tool_use":
                title += f"\nTool: {i['name']}"
                title += f"\nInput: {i['input']}"
        if printout:
            print(f"{title}")
    else:
        title = get_msg_title_repr(message.type.title() + " Message", bold=is_interactive_env())
        if message.name is not None:
            title += f"\nName: {message.name}"
        title += f"\n\n{message.content}"
        if printout:
            print(f"{title}")
    return title


class CustomBaseModel(BaseModel):
    api_schema: ClassVar[dict] = None

    model_config = {"arbitrary_types_allowed": True}

    @classmethod
    def set_api_schema(cls, schema: dict):
        cls.api_schema = schema

    @classmethod
    def model_validate(cls, obj):
        try:
            return super().model_validate(obj)
        except (ValidationError, AttributeError) as e:
            if not cls.api_schema:
                raise e

            error_msg = "Required Parameters:\n"
            for param in cls.api_schema["required_parameters"]:
                error_msg += f"- {param['name']} ({param['type']}): {param['description']}\n"

            error_msg += "\nErrors:\n"
            for err in e.errors():
                field = err["loc"][0] if err["loc"] else "input"
                error_msg += f"- {field}: {err['msg']}\n"

            if not obj:
                error_msg += "\nNo input provided"
            else:
                error_msg += "\nProvided Input:\n"
                for key, value in obj.items():
                    error_msg += f"- {key}: {value}\n"

            missing_params = {param["name"] for param in cls.api_schema["required_parameters"]} - set(obj.keys())
            if missing_params:
                error_msg += "\nMissing Parameters:\n"
                for param in missing_params:
                    error_msg += f"- {param}\n"

            # Re-raise as a single ValidationError carrying the readable summary.
            raise ValidationError.from_exception_data(
                title="Validation Error",
                line_errors=[
                    {
                        "type": "value_error",
                        "loc": ("input",),
                        "input": obj,
                        "ctx": {
                            "error": error_msg,
                        },
                    }
                ],
            ) from None


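# Hedged usage sketch (not part of the original module): shows how CustomBaseModel
# turns a validation failure into an error message listing the required
# parameters from the attached API schema. The schema and field are illustrative.
def _example_custom_base_model():
    class _Input(CustomBaseModel):
        gene: str = Field(description="Gene symbol")

    _Input.set_api_schema(
        {"required_parameters": [{"name": "gene", "type": "str", "description": "Gene symbol", "default": None}]}
    )
    try:
        _Input.model_validate({})  # missing the required "gene" field
    except ValidationError as e:
        return str(e)  # message includes the required-parameter listing
    return None

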
def safe_execute_decorator(func):
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            return str(e)

    return wrapper


def api_schema_to_langchain_tool(api_schema, mode="generated_tool", module_name=None):
    if mode == "generated_tool":
        module = importlib.import_module("histopath.tool.generated_tool." + api_schema["tool_name"] + ".api")
    elif mode == "custom_tool":
        module = importlib.import_module(module_name)

    api_function = getattr(module, api_schema["name"])
    api_function = safe_execute_decorator(api_function)

    # Map schema type names to Python types.
    type_mapping = {
        "string": str,
        "integer": int,
        "boolean": bool,
        "pandas": pd.DataFrame,
        "str": str,
        "int": int,
        "bool": bool,
        "List[str]": list[str],
        "List[int]": list[int],
        "Dict": dict,
        "Any": Any,
    }

    # Build field annotations for the tool's input model.
    annotations = {}
    for param in api_schema["required_parameters"]:
        param_type = param["type"]
        if param_type in type_mapping:
            annotations[param["name"]] = type_mapping[param_type]
        else:
            # Fall back to evaluating the type expression; default to Any on failure.
            try:
                annotations[param["name"]] = eval(param_type)
            except (NameError, SyntaxError):
                annotations[param["name"]] = Any

    fields = {param["name"]: Field(description=param["description"]) for param in api_schema["required_parameters"]}

    ApiInput = type("Input", (CustomBaseModel,), {"__annotations__": annotations, **fields})
    ApiInput.set_api_schema(api_schema)

    api_tool = StructuredTool.from_function(
        func=api_function,
        name=api_schema["name"],
        description=api_schema["description"],
        args_schema=ApiInput,
        return_direct=True,
    )

    return api_tool


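# Hedged usage sketch (not part of the original module): wraps a function from a
# hypothetical module into a LangChain StructuredTool. "my_package.my_tools" and
# the schema below are placeholders; in practice the schema would come from
# function_to_api_schema.
def _example_api_schema_to_langchain_tool():
    schema = {
        "name": "add",
        "description": "Add two integers.",
        "required_parameters": [
            {"name": "a", "type": "int", "description": "First addend", "default": None},
            {"name": "b", "type": "int", "description": "Second addend", "default": None},
        ],
        "optional_parameters": [],
    }
    # "my_package.my_tools" is a hypothetical module exposing an "add" function.
    return api_schema_to_langchain_tool(schema, mode="custom_tool", module_name="my_package.my_tools")

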
class ID(enum.Enum):
    ENTREZ = "Entrez"
    ENSEMBL = "Ensembl without version"
    ENSEMBL_W_VERSION = "Ensembl with version"


def save_pkl(f, filename):
    with open(filename, "wb") as file:
        pickle.dump(f, file)


def load_pkl(filename):
    with open(filename, "rb") as file:
        return pickle.load(file)


_TEXT_COLOR_MAPPING = {
    "blue": "36;1",
    "yellow": "33;1",
    "pink": "38;5;200",
    "green": "32;1",
    "red": "31;1",
}


def color_print(text, color="blue"):
    color_str = _TEXT_COLOR_MAPPING[color]
    print(f"\u001b[{color_str}m\033[1;3m{text}\u001b[0m")


class PromptLogger(BaseCallbackHandler):
    def on_chat_model_start(self, serialized, messages, **kwargs):
        for message in messages[0]:
            color_print(message.pretty_repr(), color="green")


class NodeLogger(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        for generations in response.generations:
            for generation in generations:
                generated_text = generation.message.content
                color_print(generated_text, color="yellow")

    def on_agent_action(self, action, **kwargs):
        color_print(action.log, color="pink")

    def on_agent_finish(self, finish, **kwargs):
        color_print(finish, color="red")

    def on_tool_start(self, serialized, input_str, **kwargs):
        tool_name = serialized.get("name")
        color_print(f"Calling {tool_name} with inputs: {input_str}", color="pink")

    def on_tool_end(self, output, **kwargs):
        output = str(output)
        color_print(output, color="blue")


def check_or_create_path(path=None):
    if path is None:
        path = os.path.join(os.getcwd(), "tmp_directory")

    if not os.path.exists(path):
        os.makedirs(path)
        print(f"Directory created at: {path}")
    else:
        print(f"Directory already exists at: {path}")

    return path


def langchain_to_gradio_message(message):
    if isinstance(message.content, list):
        gradio_messages = []
        for item in message.content:
            gradio_message = {
                "role": "user" if message.type == "human" else "assistant",
                "content": "",
                "metadata": {},
            }

            if item["type"] == "text":
                item["text"] = item["text"].replace("<think>", "\n")
                item["text"] = item["text"].replace("</think>", "\n")
                gradio_message["content"] += f"{item['text']}\n"
                gradio_messages.append(gradio_message)
            elif item["type"] == "tool_use":
                if item["name"] == "run_python_repl":
                    gradio_message["metadata"]["title"] = "🛠️ Writing code..."
                    gradio_message["metadata"]["log"] = "Executing Code block..."
                    gradio_message["content"] = f"##### Code: \n ```python \n {item['input']['command']} \n``` \n"
                else:
                    gradio_message["metadata"]["title"] = f"🛠️ Used tool ```{item['name']}```"
                    to_print = ";".join([i + ": " + str(j) for i, j in item["input"].items()])
                    gradio_message["metadata"]["log"] = f"🔍 Input -- {to_print}\n"
                gradio_message["metadata"]["status"] = "pending"
                gradio_messages.append(gradio_message)

    else:
        gradio_message = {
            "role": "user" if message.type == "human" else "assistant",
            "content": "",
            "metadata": {},
        }
        print(message)
        content = message.content
        content = content.replace("<think>", "\n")
        content = content.replace("</think>", "\n")
        content = content.replace("<solution>", "\n")
        content = content.replace("</solution>", "\n")

        gradio_message["content"] = content
        gradio_messages = [gradio_message]
    return gradio_messages


def parse_hpo_obo(file_path):
    """Parse the HPO OBO file and create a dictionary mapping HP IDs to phenotype descriptions.

    Args:
        file_path (str): Path to the HPO OBO file.

    Returns:
        dict: A dictionary where keys are HP IDs and values are phenotype descriptions.

    """
    hp_dict = {}
    current_id = None
    current_name = None

    with open(file_path) as file:
        for line in file:
            line = line.strip()
            if line.startswith("[Term]"):
                # Store the previous term before starting a new one.
                if current_id and current_name:
                    hp_dict[current_id] = current_name
                current_id = None
                current_name = None
            elif line.startswith("id: HP:"):
                current_id = line.split(": ")[1]
            elif line.startswith("name:"):
                current_name = line.split(": ", 1)[1]

    # Add the last term in the file.
    if current_id and current_name:
        hp_dict[current_id] = current_name

    return hp_dict


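# Hedged usage sketch (not part of the original module): writes a tiny OBO
# snippet to a temporary file so parse_hpo_obo can be exercised without
# downloading the full HPO ontology. The two terms are real HPO entries used
# only for illustration.
def _example_parse_hpo_obo():
    import textwrap

    obo = textwrap.dedent(
        """\
        [Term]
        id: HP:0000001
        name: All

        [Term]
        id: HP:0000118
        name: Phenotypic abnormality
        """
    )
    with tempfile.NamedTemporaryFile("w", suffix=".obo", delete=False) as f:
        f.write(obo)
        path = f.name
    try:
        return parse_hpo_obo(path)  # {'HP:0000001': 'All', 'HP:0000118': 'Phenotypic abnormality'}
    finally:
        os.unlink(path)

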
def textify_api_dict(api_dict):
    """Convert a nested API dictionary to a nicely formatted string."""
    lines = []
    for category, methods in api_dict.items():
        lines.append(f"Import file: {category}")
        lines.append("=" * (len("Import file: ") + len(category)))
        for method in methods:
            lines.append(f"Method: {method.get('name', 'N/A')}")
            lines.append(f"  Description: {method.get('description', 'No description provided.')}")

            req_params = method.get("required_parameters", [])
            if req_params:
                lines.append("  Required Parameters:")
                for param in req_params:
                    param_name = param.get("name", "N/A")
                    param_type = param.get("type", "N/A")
                    param_desc = param.get("description", "No description")
                    param_default = param.get("default", "None")
                    lines.append(f"    - {param_name} ({param_type}): {param_desc} [Default: {param_default}]")

            opt_params = method.get("optional_parameters", [])
            if opt_params:
                lines.append("  Optional Parameters:")
                for param in opt_params:
                    param_name = param.get("name", "N/A")
                    param_type = param.get("type", "N/A")
                    param_desc = param.get("description", "No description")
                    param_default = param.get("default", "None")
                    lines.append(f"    - {param_name} ({param_type}): {param_desc} [Default: {param_default}]")

            lines.append("")
        lines.append("")

    return "\n".join(lines)


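# Hedged usage sketch (not part of the original module): formats a minimal API
# dictionary of the same shape read_module2api returns. The single entry is
# illustrative only.
def _example_textify_api_dict():
    api_dict = {
        "histopath.tool.support_tools": [
            {
                "name": "run_python_repl",
                "description": "Execute a Python snippet and return its output.",
                "required_parameters": [
                    {"name": "command", "type": "str", "description": "Code to execute", "default": None}
                ],
                "optional_parameters": [],
            }
        ]
    }
    return textify_api_dict(api_dict)

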
def read_module2api():
    fields = [
        "support_tools",
        "pathology",
    ]

    module2api = {}
    for field in fields:
        module_name = f"histopath.tool.tool_description.{field}"
        module = importlib.import_module(module_name)
        module2api[f"histopath.tool.{field}"] = module.description
    return module2api