# HistoPath / app.py
# ryanDing26
# Please work
# fddca78
"""
SIMPLIFIED VERSION - No monkey patching, just clean type hints
"""
import os
import re
import shutil
import traceback
import gradio as gr
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional
from histopath.agent import A1
from dotenv import load_dotenv
# Pull in variables from a .env file when one exists in the working directory.
if Path(".env").exists():
    load_dotenv()

# Passcode that unlocks the UI; "TESTING" is used when GRADIO_PASSWORD is unset.
PASSCODE = os.environ.get("GRADIO_PASSWORD", "TESTING")

# Module-wide agent handle; stays None until validate_pass() constructs it.
agent = None
def check_for_output_files():
    """Collect the generated files found directly inside ``./output``.

    Returns:
        An ``(images, data)`` tuple of path-string lists, each sorted by path.
        Images are regular files ending in .png/.jpg/.jpeg/.svg/.tif/.tiff;
        data files end in .csv/.txt/.json/.npy. Suffix matching ignores case.
        Both lists are empty when ``./output`` is not a directory.
    """
    image_suffixes = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}
    data_suffixes = {".csv", ".txt", ".json", ".npy"}

    output_dir = Path("./output")
    if not output_dir.is_dir():
        return [], []

    images, data = [], []
    # One directory pass, sorted so the result order does not depend on the
    # order in which the filesystem happens to list entries.
    for entry in sorted(output_dir.iterdir()):
        if not entry.is_file():
            # A sub-directory named e.g. "plots.png" is not a result file.
            continue
        suffix = entry.suffix.lower()
        if suffix in image_suffixes:
            images.append(str(entry))
        elif suffix in data_suffixes:
            data.append(str(entry))
    return images, data
def preview_file(file):
    """Build the values shown in the "Input" tab for an uploaded file.

    Args:
        file: ``None``, a filesystem path (``str`` or ``os.PathLike``), or an
            object that exposes the path as ``.name``.

    Returns:
        ``(image_path, file_path, status)``. Uploads with an image-like
        suffix (.png/.jpg/.jpeg/.svg/.tif/.tiff/.svs, case-insensitive) fill
        ``image_path``; anything else fills ``file_path`` and the status
        reports its size in KB. The unused slot is ``None``.

    Raises:
        OSError: If a non-image path cannot be stat'ed (e.g. it is missing).
    """
    if file is None:
        return None, None, "No file"

    # Path objects also have a ``.name`` (the basename), so they must be
    # converted with fspath rather than read through the attribute.
    if isinstance(file, os.PathLike):
        location = os.fspath(file)
    else:
        location = getattr(file, "name", file)

    path = Path(location)
    previewable = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"}
    if path.suffix.lower() in previewable:
        return location, None, f"Preview: {path.name}"

    size_kb = path.stat().st_size / 1024
    return None, location, f"File: {path.name} ({size_kb:.1f} KB)"
# A run of 30+ "=" up to (and including) the next run of 30+ "=".
_BANNER_RE = re.compile(r'={30,}.*?={30,}', re.DOTALL)
_THINK_RE = re.compile(r'<think>(.*?)</think>', re.DOTALL)
# (pattern, value for result["type"], result key that receives the tag body).
# Order is the matching priority: the first tag found wins.
_TAG_RULES = (
    (re.compile(r'<execute>(.*?)</execute>', re.DOTALL), "code", "code"),
    (re.compile(r'<observation>(.*?)</observation>', re.DOTALL), "obs", "obs"),
    (re.compile(r'<solution>(.*?)</solution>', re.DOTALL), "solution", "content"),
)


def _unwrap_think(fragment):
    """Drop ``<think>``/``</think>`` markers but keep the text between them."""
    return _THINK_RE.sub(r'\1', fragment)


def parse_output(text):
    """Split one raw agent message into its tagged parts.

    Banner spans (``=====…=====``) are removed first. The message is then
    classified by the first of ``<execute>``, ``<observation>``,
    ``<solution>`` that it contains, checked in that order.

    Args:
        text: Raw message text; ``None`` is treated as an empty string.

    Returns:
        A dict with keys ``type``, ``content``, ``code``, ``obs``, ``think``:

        * ``type`` is ``"code"``, ``"obs"``, ``"solution"`` or ``"text"``.
        * ``code`` / ``obs`` hold the stripped tag body for those types.
        * ``content`` is the solution body for ``"solution"``, the message
          with think markers removed for ``"text"``, and otherwise the
          banner-free message unchanged.
        * ``think`` is the text preceding the matched tag (think markers
          removed), or ``None`` when there is none or no tag matched.
    """
    cleaned = _BANNER_RE.sub('', text or "").strip()
    result = {"type": "text", "content": cleaned, "code": None, "obs": None, "think": None}

    for pattern, kind, slot in _TAG_RULES:
        hit = pattern.search(cleaned)
        if hit is None:
            continue
        result["type"] = kind
        result[slot] = hit.group(1).strip()
        # Whatever precedes the tag is the model's reasoning for this step.
        preamble = _unwrap_think(cleaned[:hit.start()].strip())
        result["think"] = preamble or None
        return result

    result["content"] = _unwrap_think(cleaned)
    return result
def format_display(parsed):
    """Render a parsed agent message as Markdown for the chat panel.

    Args:
        parsed: Mapping with a ``"type"`` key plus the text pieces
            ``"think"``, ``"code"``, ``"obs"`` and ``"content"`` (the shape
            produced by ``parse_output``). Types other than ``"code"``,
            ``"obs"`` and ``"solution"`` are treated as plain text.

    Returns:
        The Markdown string, pieces joined by blank lines. For the three
        tagged types: optional think text, a ``---`` rule when think text is
        present, a heading, then the body (fenced for code and output). For
        plain text: the think text if present, otherwise the content.
    """
    # type -> (heading, key holding the body, fence language; None = no fence)
    sections = {
        "code": ("### 💻 Code\n", "code", "python"),
        "obs": ("### 📊 Output\n", "obs", ""),
        "solution": ("### ✅ Solution\n", "content", None),
    }

    think = parsed.get("think")
    parts = [think] if think else []

    section = sections.get(parsed["type"])
    if section is None:
        # Plain text: content is only shown when no think text was emitted.
        if not think:
            parts.append(parsed["content"])
        return "\n\n".join(parts)

    heading, key, fence = section
    if think:
        parts.append("\n---\n")
    parts.append(heading)
    body = parsed[key]
    parts.append(body if fence is None else f"```{fence}\n{body}\n```")
    return "\n\n".join(parts)
# CRITICAL: Simple types only
def process_query(prompt: str, file: Any, history: List[Dict[str, str]]):
    """Run one chat turn through the agent, yielding UI updates as it goes.

    Args:
        prompt: Text typed by the user; ``None`` is treated as empty.
        file: Optional upload: a path (``str`` / ``os.PathLike``) or an
            object exposing the path as ``.name``. It is copied into
            ``./data`` and its new location is appended to the prompt.
        history: Chat transcript as ``{"role", "content"}`` dicts; ``None``
            starts a new one. The list is extended in place.

    Yields:
        ``(history, images, data_files, None, None, status)`` tuples, one per
        UI refresh: after the user message is recorded, after each agent step
        that produces displayable text, and once at the end with the files
        found by ``check_for_output_files``. Failures are reported as an
        assistant message plus an error status instead of being raised.
    """
    if history is None:
        history = []
    prompt = prompt or ""

    # The agent only exists after a successful passcode check.
    if agent is None:
        history.append({"role": "assistant", "content": "⚠️ Enter passcode first"})
        yield history, None, None, None, None, "Not initialized"
        return

    if not prompt.strip() and file is None:
        history.append({"role": "assistant", "content": "⚠️ Provide prompt or file"})
        yield history, None, None, None, None, "No input"
        return

    if file is not None:
        try:
            # Path objects have a ``.name`` too (the basename), so convert
            # them with fspath instead of reading the attribute.
            if isinstance(file, os.PathLike):
                source = os.fspath(file)
            else:
                source = getattr(file, "name", file)
            upload_dir = Path("./data")
            upload_dir.mkdir(exist_ok=True)
            fpath = upload_dir / Path(source).name
            shutil.copy(source, fpath)
            prompt = f"{prompt}\n\nFile: {fpath}" if prompt.strip() else f"File at: {fpath}"
        except Exception as e:
            history.append({"role": "assistant", "content": f"❌ File error: {e}"})
            yield history, None, None, None, None, str(e)
            return

    history.append({"role": "user", "content": prompt})
    yield history, None, None, None, None, "Processing..."

    try:
        steps = 0
        for step in agent.go_stream(prompt):
            steps += 1
            out = step.get("output") or ""
            if not out.strip():
                continue
            rendered = format_display(parse_output(out))
            if rendered.strip():
                history.append({"role": "assistant", "content": rendered})
                # Show each step as soon as it arrives rather than after
                # the whole run has finished.
                yield history, None, None, None, None, f"Processing... (step {steps})"

        imgs, data = check_for_output_files()
        yield history, imgs, data, None, None, f"✅ Done ({steps} steps)"
    except Exception as e:
        err = f"❌ Error:\n```\n{traceback.format_exc()}\n```"
        history.append({"role": "assistant", "content": err})
        yield history, None, None, None, None, str(e)
def validate_pass(pwd: str):
    """Check the passcode and, on success, create the shared agent.

    Args:
        pwd: Passcode typed by the user; ``None`` is treated as empty.

    Returns:
        Three values for the outputs wired to this handler: a visibility
        update for the passcode panel, one for the main panel, and a status
        message. The main panel is revealed only when the passcode matches
        ``PASSCODE`` and the agent was constructed without error.
    """
    global agent
    import hmac  # local import: only this handler needs it

    # compare_digest does not short-circuit on the first differing byte the
    # way == does; bytes are used because it rejects non-ASCII str inputs.
    supplied = (pwd or "").encode("utf-8")
    if not hmac.compare_digest(supplied, PASSCODE.encode("utf-8")):
        return gr.update(visible=True), gr.update(visible=False), "❌ Invalid passcode"

    try:
        agent = A1(
            path="./data",
            llm="claude-sonnet-4-20250514",
            source="Anthropic",
            use_tool_retriever=True,
            timeout_seconds=600
        )
    except Exception as e:
        # Keep the gate closed and surface the reason in the status box.
        return gr.update(visible=True), gr.update(visible=False), f"❌ Init error: {e}"

    return gr.update(visible=False), gr.update(visible=True), "✅ Authenticated"
def clear_all():
    """Delete the files directly inside ``./output`` and reset the UI.

    Sub-directories are left alone. Returns the six values for the outputs
    wired to the Clear button: an empty chat history, four ``None``
    placeholders and the status text ``"Cleared"``.
    """
    reset_values = ([], None, None, None, None, "Cleared")

    target = Path("./output")
    if not target.exists():
        return reset_values

    # filter() is lazy, so each entry is checked right before it is removed.
    for entry in filter(Path.is_file, target.iterdir()):
        entry.unlink()
    return reset_values
# UI
with gr.Blocks(title="HistoPath") as demo:
    gr.HTML("<h1 style='text-align:center'>🔬 HistoPath Agent</h1>")

    # Passcode gate: the only panel visible until validate_pass succeeds.
    with gr.Group(visible=True) as pass_section:
        gr.Markdown("### 🔐 Enter Passcode")
        with gr.Row():
            pass_input = gr.Textbox(label="Passcode", type="password", scale=3)
            pass_btn = gr.Button("Unlock", variant="primary", scale=1)
        pass_status = gr.Textbox(label="Status", interactive=False)

    # Main workspace, hidden until the passcode is accepted.
    with gr.Group(visible=False) as main_section:
        with gr.Row():
            # Left column: chat transcript, query box, upload and controls.
            with gr.Column(scale=3):
                chat = gr.Chatbot(type="messages", height=500)
                with gr.Row():
                    msg = gr.Textbox(label="Query", placeholder="Enter query...", scale=4)
                    upload = gr.File(label="Upload", scale=1)
                with gr.Row():
                    send = gr.Button("Send", variant="primary", scale=2)
                    clear = gr.Button("Clear", scale=1)
                status = gr.Textbox(value="Ready", interactive=False, show_label=False)
            # Right column: preview of the upload and the agent's results.
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.Tab("Input"):
                        in_img = gr.Image(height=300)
                        in_file = gr.File(interactive=False)
                        in_stat = gr.Textbox(value="No file", interactive=False, show_label=False)
                    with gr.Tab("Images"):
                        out_imgs = gr.Gallery(height=500)
                    with gr.Tab("Data"):
                        out_data = gr.File(file_count="multiple", interactive=False)

    # Events
    unlock_outputs = [pass_section, main_section, pass_status]
    pass_btn.click(validate_pass, [pass_input], unlock_outputs)
    # Enter in the passcode box unlocks too, mirroring msg.submit below.
    pass_input.submit(validate_pass, [pass_input], unlock_outputs)

    upload.change(preview_file, [upload], [in_img, in_file, in_stat])

    # Send, Enter-in-query and Clear all refresh the same six components.
    query_inputs = [msg, upload, chat]
    panel_outputs = [chat, out_imgs, out_data, in_img, in_file, status]
    send.click(process_query, query_inputs, panel_outputs)
    clear.click(clear_all, None, panel_outputs)
    msg.submit(process_query, query_inputs, panel_outputs)
if __name__ == "__main__":
    # ./data receives uploads and ./output is scanned for results, so make
    # sure both exist before the server starts.
    for folder in ("./data", "./output"):
        Path(folder).mkdir(exist_ok=True)

    banner = "=" * 50
    print(banner)
    print("🔬 HistoPath Agent - Simplified")
    print(banner)
    demo.launch()