|
|
import os |
|
|
import re |
|
|
import shutil |
|
|
import traceback |
|
|
import gradio as gr |
|
|
from pathlib import Path |
|
|
from histopath.agent import A1 |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
# Load variables from a .env file into the process environment before
# anything below reads them.
load_dotenv()

# Passcode the UI compares user input against; None when the
# GRADIO_PASSWORD environment variable is not set.
PASSCODE = os.getenv("GRADIO_PASSWORD")

# Module-wide agent instance. Starts as None and is assigned by
# validate_passcode() once a correct passcode has been entered.
agent = None
|
|
|
|
|
|
|
|
def check_for_output_files(output_dir="./output"):
    """Collect generated image and data files from the output directory.

    Only regular files directly inside ``output_dir`` are considered (no
    recursion). Files are classified by their lower-cased suffix; files with
    any other suffix are ignored.

    Args:
        output_dir: Directory to scan. Defaults to ``./output``.

    Returns:
        A ``(images, data_files)`` tuple of lists of path strings. Each list
        is sorted by path so repeated calls report files in a stable order.
        Both lists are empty when ``output_dir`` is missing or is not a
        directory.
    """
    image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}
    data_extensions = {".csv", ".txt", ".json", ".npy"}

    root = Path(output_dir)
    # is_dir() rather than exists(): a plain file at this path would make
    # iterdir() raise NotADirectoryError.
    if not root.is_dir():
        return [], []

    images = []
    data_files = []
    # iterdir() yields entries in arbitrary order; sort so the lists do not
    # reshuffle between successive calls.
    for entry in sorted(root.iterdir()):
        if not entry.is_file():
            continue
        suffix = entry.suffix.lower()
        if suffix in image_extensions:
            images.append(str(entry))
        elif suffix in data_extensions:
            data_files.append(str(entry))

    return images, data_files
|
|
|
|
|
|
|
|
def preview_uploaded_file(uploaded_file):
    """Build the preview outputs for a newly uploaded file.

    Args:
        uploaded_file: ``None``, a filesystem path (``str`` / path-like), or
            an object whose ``.name`` attribute holds the file's path.

    Returns:
        A ``(image_path, file_path, status_text)`` tuple. For files whose
        suffix is a recognised image type the path is returned in the first
        slot and the second is ``None``; for every other file it is the
        other way round and the status text includes the size in KB when the
        file can be stat'ed. With no upload, both paths are ``None``.
    """
    if uploaded_file is None:
        return None, None, "No file uploaded"

    # Accept a bare path as well as a file wrapper exposing `.name`.
    # (A pathlib.Path also has `.name`, but that is only the basename, so
    # path-likes are handled explicitly.)
    if isinstance(uploaded_file, (str, os.PathLike)):
        source = os.fspath(uploaded_file)
    else:
        source = uploaded_file.name

    file_path = Path(source)
    image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"}

    if file_path.suffix.lower() in image_extensions:
        return source, None, f"📷 Previewing: {file_path.name}"

    try:
        size_kb = file_path.stat().st_size / 1024
    except OSError:
        # The file vanished or is unreadable: still report it, without a size.
        return None, source, f"📄 File: {file_path.name}"
    return None, source, f"📄 File: {file_path.name} ({size_kb:.1f} KB)"
|
|
|
|
|
|
|
|
def parse_agent_output(output):
    """Split one raw agent message into its structured parts.

    "Human Message" / "Ai Message" banner lines are removed first. The text
    is then checked, in this order, for an ``<execute>``, ``<observation>``
    or ``<solution>`` block; the first kind found decides the result.

    Returns:
        A dict with keys ``type`` ("code", "observation", "solution" or
        "text"), ``content``, ``code``, ``observation`` and ``thinking``.
        ``thinking`` holds the cleaned text that preceded the matched block
        (``None`` when there is none, and always ``None`` for plain text).
    """
    def _tidy(fragment):
        # Unwrap <think> tags (keeping their inner text), then drop any
        # remaining single-line "=====...=====" banners.
        unwrapped = re.sub(r'<think>(.*?)</think>', r'\1', fragment, flags=re.DOTALL)
        return re.sub(r'={30,}.*?={30,}', '', unwrapped).strip()

    text = re.sub(r'={30,}\s*(Human|Ai)\s+Message\s*={30,}', '', output).strip()

    result = {
        "type": "text",
        "content": text,
        "code": None,
        "observation": None,
        "thinking": None,
    }

    # (pattern, resulting type, dict key that receives the block body)
    tag_table = (
        (r'<execute>(.*?)</execute>', "code", "code"),
        (r'<observation>(.*?)</observation>', "observation", "observation"),
        (r'<solution>(.*?)</solution>', "solution", "content"),
    )
    for pattern, kind, slot in tag_table:
        found = re.search(pattern, text, re.DOTALL)
        if found is None:
            continue
        result["type"] = kind
        result[slot] = found.group(1).strip()
        preamble = _tidy(text[:found.start()].strip())
        result["thinking"] = preamble or None
        return result

    # No tagged block at all: the whole (cleaned) message is plain content.
    result["content"] = _tidy(text)
    return result
|
|
|
|
|
|
|
|
def format_message_for_display(parsed_output):
    """Render a parsed agent message as one Markdown string.

    Args:
        parsed_output: Dict with a ``type`` key ("code", "observation",
            "solution" or anything else for plain text) plus the matching
            ``code`` / ``observation`` / ``content`` entry and an optional
            ``thinking`` entry.

    Returns:
        The message parts joined by blank lines. Any ``thinking`` text comes
        first; for code, observation and solution messages it is followed by
        a horizontal rule, a section header and the body. For plain text the
        ``content`` is emitted only when there is no ``thinking`` text.
    """
    msg_parts = []

    thinking = parsed_output.get("thinking")
    if thinking:
        msg_parts.append(thinking)

    kind = parsed_output["type"]
    if kind == "code":
        section = [
            "### 💻 Executing Code\n",
            f"```python\n{parsed_output['code']}\n```",
        ]
    elif kind == "observation":
        section = [
            "### 📊 Observation\n",
            f"```\n{parsed_output['observation']}\n```",
        ]
    elif kind == "solution":
        # Header kept on a single line: a raw line break inside the literal
        # is a syntax error.
        section = ["### ✅ Solution\n", parsed_output['content']]
    else:
        section = None

    if section is not None:
        if thinking:
            # Visual separator between the reasoning and the section.
            msg_parts.append("\n---\n")
        msg_parts.extend(section)
    elif not thinking:
        msg_parts.append(parsed_output["content"])

    return "\n\n".join(msg_parts)
|
|
|
|
|
|
|
|
def process_agent_response(prompt, uploaded_file, chatbot_history):
    """Run the agent on a prompt and stream its progress to the UI.

    This is a generator. Every yielded value is a 6-tuple
    ``(chatbot_history, images, data_files, None, None, status)``: the
    updated chat messages, the current output image paths and data file
    paths (``None`` when there are none), two slots that are always ``None``
    and a short status string.

    Args:
        prompt: User query text; ``None`` is treated as empty.
        uploaded_file: ``None``, a path, or an object whose ``.name`` is the
            path of the uploaded file. The file is copied into ``./data``
            and its new path is appended to the prompt.
        chatbot_history: List of ``{"role", "content"}`` message dicts. It is
            extended in place; ``None`` is treated as an empty list.

    Yields:
        One tuple for each early-exit error, one after the user message is
        recorded, one per agent step that produced output, and a final one
        when the run completes or fails.
    """
    global agent

    def _assistant(content):
        # Chat entries use the "messages" format: role + content.
        return {"role": "assistant", "content": content}

    prompt = prompt or ""
    if chatbot_history is None:
        chatbot_history = []

    if agent is None:
        chatbot_history.append(
            _assistant("⚠️ Please enter the passcode first to initialize the agent.")
        )
        yield chatbot_history, None, None, None, None, "⚠️ Agent not initialized"
        return

    if not prompt.strip() and uploaded_file is None:
        chatbot_history.append(
            _assistant("⚠️ Please provide a prompt or upload a file.")
        )
        yield chatbot_history, None, None, None, None, "⚠️ No input provided"
        return

    # Copy the upload into ./data and tell the agent where to find it.
    file_info = ""
    if uploaded_file is not None:
        try:
            if isinstance(uploaded_file, (str, os.PathLike)):
                source = os.fspath(uploaded_file)
            else:
                source = uploaded_file.name

            data_dir = Path("./data")
            data_dir.mkdir(exist_ok=True)

            file_path = data_dir / Path(source).name
            shutil.copy(source, file_path)

            file_info = f"\n\n📎 **Uploaded file:** `{file_path}`\n"

            if prompt.strip():
                prompt = f"{prompt}\n\nUploaded file path: {file_path}"
            else:
                prompt = f"I have uploaded a file at: {file_path}. Please analyze it."

        except Exception as e:
            error_msg = f"❌ Error handling file upload: {str(e)}"
            chatbot_history.append(_assistant(error_msg))
            yield chatbot_history, None, None, None, None, error_msg
            return

    user_message = prompt if not file_info else f"{prompt}{file_info}"
    chatbot_history.append({"role": "user", "content": user_message})
    yield chatbot_history, None, None, None, None, "🔄 Processing..."

    try:
        step_count = 0
        for step in agent.go_stream(prompt):
            step_count += 1
            output = step.get("output", "")
            if not output:
                # Nothing to show for this step; `parsed` would be unbound.
                continue

            parsed = parse_agent_output(output)
            kind = parsed["type"]

            # Reasoning text gets its own chat bubble, ahead of the result.
            if parsed.get("thinking"):
                chatbot_history.append(_assistant(parsed["thinking"]))

            if kind == "code" and parsed["code"]:
                chatbot_history.append(_assistant(
                    f"### 💻 Executing Code\n\n```python\n{parsed['code']}\n```"
                ))
            elif kind == "observation" and parsed["observation"]:
                chatbot_history.append(_assistant(
                    f"### 📊 Observation\n\n```\n{parsed['observation']}\n```"
                ))
            elif kind == "solution":
                chatbot_history.append(_assistant(
                    f"### ✅ Solution\n\n{parsed['content']}"
                ))
            elif kind == "text" and parsed["content"]:
                if not parsed.get("thinking"):
                    chatbot_history.append(_assistant(parsed["content"]))

            # Surface any files the agent has written so far.
            images, data_files = check_for_output_files()

            status = f"🔄 Step {step_count}"
            if kind == "code":
                status += " - Executing code..."
            elif kind == "observation":
                status += " - Processing results..."
            elif kind == "solution":
                status += " - Finalizing solution..."

            yield (
                chatbot_history,
                images if images else None,
                data_files if data_files else None,
                None,
                None,
                status,
            )

        final_images, final_data = check_for_output_files()

        if final_images or final_data:
            pieces = ["\n\n---\n\n### 📁 Generated Files Ready for Download\n\n"]

            if final_images:
                pieces.append(
                    f"**🖼️ Images ({len(final_images)})** - Available in the **Images** tab →\n"
                )
                pieces.extend(f"- `{Path(p).name}`\n" for p in final_images)
                pieces.append("\n")

            if final_data:
                pieces.append(
                    f"**📊 Data Files ({len(final_data)})** - Available in the **Data** tab →\n"
                )
                pieces.extend(f"- `{Path(p).name}`\n" for p in final_data)

            pieces.append(
                "\n*Click the download button on each file in the respective tabs above.*"
            )
            chatbot_history.append(_assistant("".join(pieces)))

        status = "✅ Complete"
        if final_images:
            status += f" | {len(final_images)} image(s)"
        if final_data:
            status += f" | {len(final_data)} data file(s)"

        yield (
            chatbot_history,
            final_images if final_images else None,
            final_data if final_data else None,
            None,
            None,
            status,
        )

    except Exception as e:
        # Top-level boundary for the agent run: show the failure in the chat
        # instead of letting the generator die silently.
        error_msg = f"❌ Error: {str(e)}\n\n```\n{traceback.format_exc()}\n```"
        chatbot_history.append(_assistant(error_msg))
        yield chatbot_history, None, None, None, None, "❌ Error occurred"
|
|
|
|
|
|
|
|
def validate_passcode(passcode):
    """Check the submitted passcode and, if correct, create the agent.

    Access is refused when no passcode is configured (``PASSCODE`` unset or
    empty), so a missing GRADIO_PASSWORD can never match an empty or null
    submission. The comparison uses ``hmac.compare_digest``.

    Args:
        passcode: Text submitted by the user; non-string values are treated
            as an empty string.

    Returns:
        A 3-tuple ``(passcode_section_update, main_interface_update,
        status_message)``. On success the passcode section is hidden and the
        main interface shown; on a wrong passcode or an agent
        initialisation error the visibility stays as it was and the message
        explains why.
    """
    global agent
    import hmac  # function-scope import: only this check needs it

    supplied = passcode if isinstance(passcode, str) else ""
    authorized = bool(PASSCODE) and hmac.compare_digest(
        supplied.encode("utf-8"), PASSCODE.encode("utf-8")
    )

    if not authorized:
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            "❌ Invalid passcode. Please try again.",
        )

    try:
        agent = A1()
    except Exception as e:
        error_trace = traceback.format_exc()
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            f"❌ Error initializing agent:\n{str(e)}\n\n{error_trace}",
        )

    return (
        gr.update(visible=False),
        gr.update(visible=True),
        "✅ Access granted! Agent initialized and ready.",
    )
|
|
|
|
|
|
|
|
def clear_chat():
    """Reset the chat and delete generated and uploaded files.

    ``./output`` is removed recursively and recreated empty. In ``./data``
    only regular files directly inside the directory are deleted;
    sub-directories are left alone.

    Returns:
        A 6-tuple: an empty chat history, four ``None`` values and the
        status text.
    """
    output_dir = Path("./output")
    if output_dir.exists():
        shutil.rmtree(output_dir)
    output_dir.mkdir(exist_ok=True)

    data_dir = Path("./data")
    if data_dir.exists():
        for entry in data_dir.iterdir():
            if entry.is_file():
                entry.unlink()

    return [], None, None, None, None, "🗑️ Chat cleared"
|
|
|
|
|
|
|
|
|
|
|
# Base look: Gradio's "Soft" theme with a blue/slate palette and compact
# spacing.
custom_theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="slate",
    spacing_size="sm",
    radius_size="md",
)
# Fine-tune individual theme variables on top of the base theme.
custom_theme = custom_theme.set(
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
    block_label_text_weight="600",
    block_title_text_weight="600",
)
|
|
|
|
|
# Build the Gradio UI: a passcode gate shown first, and the main interface
# (chat on the left, input/output tabs on the right) revealed by
# validate_passcode().
with gr.Blocks(title="HistoPath Agent", theme=custom_theme, css="""
    .gradio-container {
        max-width: 100% !important;
    }
    .main-header {
        text-align: center;
        padding: 1.5rem 0;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        border-radius: 8px;
        margin-bottom: 1.5rem;
    }
    .main-header h1 {
        margin: 0;
        font-size: 2.2rem;
        font-weight: 700;
    }
    .main-header p {
        margin: 0.5rem 0 0 0;
        opacity: 0.95;
        font-size: 1.1rem;
    }
    .file-upload-box .wrap {
        min-width: 0 !important;
    }
    .file-upload-box .file-name {
        word-break: break-word !important;
        white-space: normal !important;
        overflow-wrap: break-word !important;
    }
    .tab-nav {
        margin-bottom: 0.5rem;
    }
    /* Better styling for code and observation blocks */
    .message.bot pre {
        background-color: #f6f8fa !important;
        border: 1px solid #d0d7de !important;
        border-radius: 6px !important;
        padding: 12px !important;
        margin: 8px 0 !important;
    }
    .message.bot h3 {
        margin-top: 12px !important;
        margin-bottom: 8px !important;
        font-weight: 600 !important;
    }
    .message.bot hr {
        border: none !important;
        border-top: 2px solid #e1e4e8 !important;
        margin: 16px 0 !important;
    }
""") as demo:

    # Page header banner.
    gr.HTML("""
        <div class="main-header">
            <h1>🔬 HistoPath Agent</h1>
            <p>AI-Powered Histopathology Analysis Assistant</p>
        </div>
    """)

    # --- Passcode gate (visible until the correct passcode is entered) ---
    with gr.Group(visible=True) as passcode_section:
        gr.Markdown("### 🔐 Authentication Required")

        with gr.Row():
            passcode_input = gr.Textbox(
                label="Passcode",
                type="password",
                placeholder="Enter your passcode...",
                scale=3
            )
            passcode_btn = gr.Button("🔓 Unlock", variant="primary", scale=1, size="lg")

        passcode_status = gr.Textbox(
            label="Status",
            interactive=False,
            lines=2
        )

    # --- Main interface (hidden until unlocked) ---
    with gr.Group(visible=False) as main_interface:
        with gr.Row(equal_height=True):

            # Left column: conversation, query box, upload and controls.
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="💬 Conversation",
                    height=550,
                    type="messages",
                    show_label=True,
                    # NOTE(review): avatar_images is documented as taking
                    # image file paths or URLs; confirm an emoji string is
                    # accepted by the installed Gradio version.
                    avatar_images=(None, "🤖"),
                    render_markdown=True,
                )

                with gr.Row():
                    with gr.Column(scale=7):
                        prompt_input = gr.Textbox(
                            label="Your Query",
                            placeholder="E.g., 'Caption the uploaded whole slide image' or 'Segment cells using instanseg model'",
                            lines=2,
                            max_lines=5,
                            show_label=False,
                        )
                    with gr.Column(scale=3):
                        file_upload = gr.File(
                            label="📎 Upload File",
                            file_types=[".svs", ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".csv", ".txt", ".json", ".npy"],
                            height=75,
                            elem_classes="file-upload-box",
                        )

                with gr.Row():
                    submit_btn = gr.Button("🚀 Submit", variant="primary", scale=3, size="lg")
                    clear_btn = gr.Button("🗑️ Clear", scale=1, size="lg", variant="secondary")

                status_text = gr.Textbox(
                    label="Status",
                    interactive=False,
                    value="Ready",
                    show_label=False,
                    container=False,
                )

            # Right column: input preview and generated outputs.
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.Tab("📥 Input"):
                        with gr.Column():
                            input_image_preview = gr.Image(
                                label="Input Image",
                                height=400,
                                show_label=False,
                                container=True,
                            )
                            input_file_preview = gr.File(
                                label="Input File",
                                interactive=False,
                                height=100,
                                show_label=False,
                                container=True,
                            )
                            input_status = gr.Textbox(
                                value="Upload a file to preview",
                                show_label=False,
                                interactive=False,
                                container=False,
                            )

                    with gr.Tab("🖼️ Images"):
                        output_gallery = gr.Gallery(
                            label="Generated Visualizations",
                            columns=1,
                            height=600,
                            object_fit="contain",
                            show_label=False,
                            show_download_button=True,
                        )

                    with gr.Tab("📊 Data"):
                        data_files = gr.File(
                            label="Generated Data Files",
                            file_count="multiple",
                            interactive=False,
                            height=600,
                            show_label=False,
                        )

    # --- Event wiring ---

    # Unlock: toggles the two groups and reports the result.
    passcode_btn.click(
        fn=validate_passcode,
        inputs=[passcode_input],
        outputs=[passcode_section, main_interface, passcode_status]
    )

    # Show a preview whenever the uploaded file changes.
    file_upload.change(
        fn=preview_uploaded_file,
        inputs=[file_upload],
        outputs=[input_image_preview, input_file_preview, input_status]
    )

    # Submit button and Enter in the query box both run the agent.
    submit_btn.click(
        fn=process_agent_response,
        inputs=[prompt_input, file_upload, chatbot],
        outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text]
    )

    clear_btn.click(
        fn=clear_chat,
        outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text]
    )

    prompt_input.submit(
        fn=process_agent_response,
        inputs=[prompt_input, file_upload, chatbot],
        outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text]
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Make sure the working directories used by the app exist.
    Path("./data").mkdir(exist_ok=True)
    Path("./output").mkdir(exist_ok=True)

    print("=" * 60)
    print("🔬 HistoPath Agent - Gradio Interface")
    print("=" * 60)
    # Report only whether a passcode is configured; never echo the secret
    # itself to stdout or logs.
    if PASSCODE:
        print("Passcode: configured")
    else:
        print("Passcode: NOT configured (set GRADIO_PASSWORD)")
    print("Starting server...")
    print("=" * 60)

    # Listen on all interfaces; server_port=None leaves the port choice to
    # Gradio.
    demo.launch(
        server_name="0.0.0.0",
        server_port=None,
        share=False,
        show_error=True,
    )