|
|
""" |
|
|
SIMPLIFIED VERSION - No monkey patching, just clean type hints |
|
|
""" |
|
|
import os |
|
|
import re |
|
|
import shutil |
|
|
import traceback |
|
|
import gradio as gr |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Any, Tuple, Optional |
|
|
from histopath.agent import A1 |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
if os.path.exists(".env"): |
|
|
load_dotenv() |
|
|
|
|
|
PASSCODE = os.getenv("GRADIO_PASSWORD", "TESTING") |
|
|
agent = None |
|
|
|
|
|
|
|
|
def check_for_output_files(): |
|
|
"""Check output directory for files.""" |
|
|
output_dir = Path("./output") |
|
|
if not output_dir.exists(): |
|
|
return [], [] |
|
|
|
|
|
images = [str(f) for f in output_dir.glob("*") if f.suffix.lower() in {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}] |
|
|
data = [str(f) for f in output_dir.glob("*") if f.suffix.lower() in {".csv", ".txt", ".json", ".npy"}] |
|
|
|
|
|
return images, data |
|
|
|
|
|
|
|
|
def preview_file(file): |
|
|
"""Preview uploaded file.""" |
|
|
if file is None: |
|
|
return None, None, "No file" |
|
|
|
|
|
path = Path(file.name) |
|
|
if path.suffix.lower() in {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"}: |
|
|
return file.name, None, f"Preview: {path.name}" |
|
|
else: |
|
|
size = path.stat().st_size / 1024 |
|
|
return None, file.name, f"File: {path.name} ({size:.1f} KB)" |
|
|
|
|
|
|
|
|
def parse_output(text): |
|
|
"""Parse agent output.""" |
|
|
text = re.sub(r'={30,}.*?={30,}', '', text, flags=re.DOTALL).strip() |
|
|
|
|
|
result = {"type": "text", "content": text, "code": None, "obs": None, "think": None} |
|
|
|
|
|
code_match = re.search(r'<execute>(.*?)</execute>', text, re.DOTALL) |
|
|
if code_match: |
|
|
result["type"] = "code" |
|
|
result["code"] = code_match.group(1).strip() |
|
|
before = text[:code_match.start()].strip() |
|
|
before = re.sub(r'<think>(.*?)</think>', r'\1', before, flags=re.DOTALL) |
|
|
result["think"] = before or None |
|
|
return result |
|
|
|
|
|
obs_match = re.search(r'<observation>(.*?)</observation>', text, re.DOTALL) |
|
|
if obs_match: |
|
|
result["type"] = "obs" |
|
|
result["obs"] = obs_match.group(1).strip() |
|
|
before = text[:obs_match.start()].strip() |
|
|
before = re.sub(r'<think>(.*?)</think>', r'\1', before, flags=re.DOTALL) |
|
|
result["think"] = before or None |
|
|
return result |
|
|
|
|
|
sol_match = re.search(r'<solution>(.*?)</solution>', text, re.DOTALL) |
|
|
if sol_match: |
|
|
result["type"] = "solution" |
|
|
result["content"] = sol_match.group(1).strip() |
|
|
before = text[:sol_match.start()].strip() |
|
|
before = re.sub(r'<think>(.*?)</think>', r'\1', before, flags=re.DOTALL) |
|
|
result["think"] = before or None |
|
|
return result |
|
|
|
|
|
result["content"] = re.sub(r'<think>(.*?)</think>', r'\1', text, flags=re.DOTALL) |
|
|
return result |
|
|
|
|
|
|
|
|
def format_display(parsed): |
|
|
"""Format for display.""" |
|
|
parts = [] |
|
|
|
|
|
if parsed.get("think"): |
|
|
parts.append(parsed["think"]) |
|
|
|
|
|
if parsed["type"] == "code": |
|
|
if parsed.get("think"): |
|
|
parts.append("\n---\n") |
|
|
parts.append("### π» Code\n") |
|
|
parts.append(f"```python\n{parsed['code']}\n```") |
|
|
elif parsed["type"] == "obs": |
|
|
if parsed.get("think"): |
|
|
parts.append("\n---\n") |
|
|
parts.append("### π Output\n") |
|
|
parts.append(f"```\n{parsed['obs']}\n```") |
|
|
elif parsed["type"] == "solution": |
|
|
if parsed.get("think"): |
|
|
parts.append("\n---\n") |
|
|
parts.append("### β
Solution\n") |
|
|
parts.append(parsed['content']) |
|
|
else: |
|
|
if not parsed.get("think"): |
|
|
parts.append(parsed["content"]) |
|
|
|
|
|
return "\n\n".join(parts) |
|
|
|
|
|
|
|
|
|
|
|
def process_query(prompt: str, file: Any, history: List[Dict[str, str]]): |
|
|
"""Process user query - SIMPLE TYPES ONLY.""" |
|
|
global agent |
|
|
|
|
|
|
|
|
if history is None: |
|
|
history = [] |
|
|
|
|
|
|
|
|
if agent is None: |
|
|
history.append({"role": "assistant", "content": "β οΈ Enter passcode first"}) |
|
|
yield history, None, None, None, None, "Not initialized" |
|
|
return |
|
|
|
|
|
|
|
|
if not prompt.strip() and file is None: |
|
|
history.append({"role": "assistant", "content": "β οΈ Provide prompt or file"}) |
|
|
yield history, None, None, None, None, "No input" |
|
|
return |
|
|
|
|
|
|
|
|
if file is not None: |
|
|
try: |
|
|
Path("./data").mkdir(exist_ok=True) |
|
|
fname = Path(file.name).name |
|
|
fpath = Path("./data") / fname |
|
|
shutil.copy(file.name, fpath) |
|
|
prompt = f"{prompt}\n\nFile: {fpath}" if prompt.strip() else f"File at: {fpath}" |
|
|
except Exception as e: |
|
|
history.append({"role": "assistant", "content": f"β File error: {e}"}) |
|
|
yield history, None, None, None, None, str(e) |
|
|
return |
|
|
|
|
|
|
|
|
history.append({"role": "user", "content": prompt}) |
|
|
yield history, None, None, None, None, "Processing..." |
|
|
|
|
|
|
|
|
try: |
|
|
outputs = [] |
|
|
for step in agent.go_stream(prompt): |
|
|
outputs.append(step.get("output", "")) |
|
|
|
|
|
|
|
|
for out in outputs: |
|
|
if not out.strip(): |
|
|
continue |
|
|
parsed = parse_output(out) |
|
|
msg = format_display(parsed) |
|
|
if msg.strip(): |
|
|
history.append({"role": "assistant", "content": msg}) |
|
|
|
|
|
|
|
|
imgs, data = check_for_output_files() |
|
|
yield history, imgs, data, None, None, f"β
Done ({len(outputs)} steps)" |
|
|
|
|
|
except Exception as e: |
|
|
err = f"β Error:\n```\n{traceback.format_exc()}\n```" |
|
|
history.append({"role": "assistant", "content": err}) |
|
|
yield history, None, None, None, None, str(e) |
|
|
|
|
|
|
|
|
def validate_pass(pwd: str): |
|
|
"""Validate passcode.""" |
|
|
global agent |
|
|
|
|
|
if pwd == PASSCODE: |
|
|
try: |
|
|
agent = A1( |
|
|
path="./data", |
|
|
llm="claude-sonnet-4-20250514", |
|
|
source="Anthropic", |
|
|
use_tool_retriever=True, |
|
|
timeout_seconds=600 |
|
|
) |
|
|
return gr.update(visible=False), gr.update(visible=True), "β
Authenticated" |
|
|
except Exception as e: |
|
|
return gr.update(visible=True), gr.update(visible=False), f"β Init error: {e}" |
|
|
else: |
|
|
return gr.update(visible=True), gr.update(visible=False), "β Invalid passcode" |
|
|
|
|
|
|
|
|
def clear_all(): |
|
|
"""Clear everything.""" |
|
|
out_dir = Path("./output") |
|
|
if out_dir.exists(): |
|
|
for f in out_dir.iterdir(): |
|
|
if f.is_file(): |
|
|
f.unlink() |
|
|
return [], None, None, None, None, "Cleared" |
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title="HistoPath") as demo: |
|
|
gr.HTML("<h1 style='text-align:center'>π¬ HistoPath Agent</h1>") |
|
|
|
|
|
with gr.Group(visible=True) as pass_section: |
|
|
gr.Markdown("### π Enter Passcode") |
|
|
with gr.Row(): |
|
|
pass_input = gr.Textbox(label="Passcode", type="password", scale=3) |
|
|
pass_btn = gr.Button("Unlock", variant="primary", scale=1) |
|
|
pass_status = gr.Textbox(label="Status", interactive=False) |
|
|
|
|
|
with gr.Group(visible=False) as main_section: |
|
|
with gr.Row(): |
|
|
with gr.Column(scale=3): |
|
|
chat = gr.Chatbot(type="messages", height=500) |
|
|
with gr.Row(): |
|
|
msg = gr.Textbox(label="Query", placeholder="Enter query...", scale=4) |
|
|
upload = gr.File(label="Upload", scale=1) |
|
|
with gr.Row(): |
|
|
send = gr.Button("Send", variant="primary", scale=2) |
|
|
clear = gr.Button("Clear", scale=1) |
|
|
status = gr.Textbox(value="Ready", interactive=False, show_label=False) |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
with gr.Tabs(): |
|
|
with gr.Tab("Input"): |
|
|
in_img = gr.Image(height=300) |
|
|
in_file = gr.File(interactive=False) |
|
|
in_stat = gr.Textbox(value="No file", interactive=False, show_label=False) |
|
|
with gr.Tab("Images"): |
|
|
out_imgs = gr.Gallery(height=500) |
|
|
with gr.Tab("Data"): |
|
|
out_data = gr.File(file_count="multiple", interactive=False) |
|
|
|
|
|
|
|
|
pass_btn.click(validate_pass, [pass_input], [pass_section, main_section, pass_status]) |
|
|
upload.change(preview_file, [upload], [in_img, in_file, in_stat]) |
|
|
send.click(process_query, [msg, upload, chat], [chat, out_imgs, out_data, in_img, in_file, status]) |
|
|
clear.click(clear_all, None, [chat, out_imgs, out_data, in_img, in_file, status]) |
|
|
msg.submit(process_query, [msg, upload, chat], [chat, out_imgs, out_data, in_img, in_file, status]) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
Path("./data").mkdir(exist_ok=True) |
|
|
Path("./output").mkdir(exist_ok=True) |
|
|
|
|
|
print("=" * 50) |
|
|
print("π¬ HistoPath Agent - Simplified") |
|
|
print("=" * 50) |
|
|
|
|
|
demo.launch() |