# import os
# import re
# import shutil
# import traceback
# import gradio as gr
# from pathlib import Path
# from histopath.agent import A1
# from dotenv import load_dotenv
# # Load environment variables
# load_dotenv()
# # Get passcode from environment
# PASSCODE = os.getenv("GRADIO_PASSWORD")
# # Initialize agent (will be created after passcode validation)
# agent = None
# def check_for_output_files():
# """Check for all files in the output directory and return their paths."""
# output_dir = Path("./output")
# if not output_dir.exists():
# return [], []
# image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}
# data_extensions = {".csv", ".txt", ".json", ".npy"}
# images = []
# data_files = []
# for file in output_dir.iterdir():
# if file.is_file():
# if file.suffix.lower() in image_extensions:
# images.append(str(file))
# elif file.suffix.lower() in data_extensions:
# data_files.append(str(file))
# return images, data_files
# def preview_uploaded_file(uploaded_file):
# """Preview the uploaded file - show image or file info."""
# if uploaded_file is None:
# return None, None, "No file uploaded"
# file_path = Path(uploaded_file.name)
# file_ext = file_path.suffix.lower()
# image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"}
# if file_ext in image_extensions:
# # Show image preview
# return uploaded_file.name, None, f"š· Previewing: {file_path.name}"
# else:
# # Show file info
# file_size = Path(uploaded_file.name).stat().st_size / 1024 # KB
# return None, uploaded_file.name, f"š File: {file_path.name} ({file_size:.1f} KB)"
# def parse_agent_output(output):
# """Parse agent output to extract code blocks, observations, and regular text."""
# # Strip out the message divider bars
# output = re.sub(r'={30,}\s*(Human|Ai)\s+Message\s*={30,}', '', output)
# output = output.strip()
# parsed = {
# "type": "text",
# "content": output,
# "code": None,
# "observation": None,
# "thinking": None
# }
# # Check for code execution block
# execute_match = re.search(r'(.*?)', output, re.DOTALL)
# if execute_match:
# parsed["type"] = "code"
# parsed["code"] = execute_match.group(1).strip()
# # Extract text before the code block (thinking/explanation)
# text_before = output[:execute_match.start()].strip()
# # Remove any think tags but keep the content
# text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL)
# text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip()
# parsed["thinking"] = text_before if text_before else None
# return parsed
# # Check for observation block
# observation_match = re.search(r'(.*?)', output, re.DOTALL)
# if observation_match:
# parsed["type"] = "observation"
# parsed["observation"] = observation_match.group(1).strip()
# # Extract text before observation if any
# text_before = output[:observation_match.start()].strip()
# text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL)
# text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip()
# parsed["thinking"] = text_before if text_before else None
# return parsed
# # Check for solution block
# solution_match = re.search(r'(.*?)', output, re.DOTALL)
# if solution_match:
# parsed["type"] = "solution"
# parsed["content"] = solution_match.group(1).strip()
# # Get thinking before solution
# text_before = output[:solution_match.start()].strip()
# text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL)
# text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip()
# parsed["thinking"] = text_before if text_before else None
# return parsed
# # Clean up any remaining tags for display
# cleaned = re.sub(r'(.*?)', r'\1', output, flags=re.DOTALL)
# cleaned = re.sub(r'={30,}.*?={30,}', '', cleaned).strip()
# parsed["content"] = cleaned
# return parsed
# def format_message_for_display(parsed_output):
# """Format parsed output into a readable message for the chatbot."""
# msg_parts = []
# # Add thinking/explanation text first if present
# if parsed_output.get("thinking"):
# msg_parts.append(parsed_output["thinking"])
# if parsed_output["type"] == "code":
# # Add separator if there was thinking text
# if parsed_output.get("thinking"):
# msg_parts.append("\n---\n")
# msg_parts.append("### š» Executing Code\n")
# msg_parts.append(f"```python\n{parsed_output['code']}\n```")
# elif parsed_output["type"] == "observation":
# # Add separator if there was thinking text
# if parsed_output.get("thinking"):
# msg_parts.append("\n---\n")
# msg_parts.append("### š Observation\n")
# msg_parts.append(f"```\n{parsed_output['observation']}\n```")
# elif parsed_output["type"] == "solution":
# # Add separator if there was thinking text
# if parsed_output.get("thinking"):
# msg_parts.append("\n---\n")
# msg_parts.append("### ā
Solution\n")
# msg_parts.append(parsed_output['content'])
# else:
# # For regular text, just add the content if thinking wasn't already set
# if not parsed_output.get("thinking"):
# msg_parts.append(parsed_output["content"])
# return "\n\n".join(msg_parts)
# def process_agent_response(prompt, uploaded_file, chatbot_history):
# """Process the agent response and update chatbot."""
# global agent
# if agent is None:
# chatbot_history.append({
# "role": "assistant",
# "content": "ā ļø Please enter the passcode first to initialize the agent."
# })
# yield chatbot_history, None, None, None, None, "ā ļø Agent not initialized"
# return
# if not prompt.strip() and uploaded_file is None:
# chatbot_history.append({
# "role": "assistant",
# "content": "ā ļø Please provide a prompt or upload a file."
# })
# yield chatbot_history, None, None, None, None, "ā ļø No input provided"
# return
# # Handle file upload
# file_path = None
# file_info = ""
# if uploaded_file is not None:
# try:
# # Create data directory if it doesn't exist
# data_dir = Path("./data")
# data_dir.mkdir(exist_ok=True)
# # Copy uploaded file to data directory
# file_name = Path(uploaded_file.name).name
# file_path = data_dir / file_name
# shutil.copy(uploaded_file.name, file_path)
# file_info = f"\n\nš **Uploaded file:** `{file_path}`\n"
# # Augment prompt with file path
# if prompt.strip():
# prompt = f"{prompt}\n\nUploaded file path: {file_path}"
# else:
# prompt = f"I have uploaded a file at: {file_path}. Please analyze it."
# except Exception as e:
# error_msg = f"ā Error handling file upload: {str(e)}"
# chatbot_history.append({
# "role": "assistant",
# "content": error_msg
# })
# yield chatbot_history, None, None, None, None, error_msg
# return
# # Add user message to chat
# user_message = prompt if not file_info else f"{prompt}{file_info}"
# chatbot_history.append({"role": "user", "content": user_message})
# yield chatbot_history, None, None, None, None, "š Processing..."
# try:
# # Stream agent responses
# step_count = 0
# for step in agent.go_stream(prompt):
# step_count += 1
# output = step.get("output", "")
# if output:
# # Parse the output
# parsed = parse_agent_output(output)
# # Add thinking text as separate message if present
# if parsed.get("thinking"):
# chatbot_history.append({
# "role": "assistant",
# "content": parsed["thinking"]
# })
# # Add the block (code/observation/solution) as separate message if present
# if parsed["type"] == "code" and parsed["code"]:
# chatbot_history.append({
# "role": "assistant",
# "content": f"### š» Executing Code\n\n```python\n{parsed['code']}\n```"
# })
# elif parsed["type"] == "observation" and parsed["observation"]:
# chatbot_history.append({
# "role": "assistant",
# "content": f"### š Observation\n\n```\n{parsed['observation']}\n```"
# })
# elif parsed["type"] == "solution":
# chatbot_history.append({
# "role": "assistant",
# "content": f"### ā
Solution\n\n{parsed['content']}"
# })
# elif parsed["type"] == "text" and parsed["content"]:
# # Only add if we haven't already added it as thinking
# if not parsed.get("thinking"):
# chatbot_history.append({
# "role": "assistant",
# "content": parsed["content"]
# })
# # Check for output files after each step
# images, data_files = check_for_output_files()
# # Create status message
# status = f"š Step {step_count}"
# if parsed["type"] == "code":
# status += " - Executing code..."
# elif parsed["type"] == "observation":
# status += " - Processing results..."
# elif parsed["type"] == "solution":
# status += " - Finalizing solution..."
# yield (
# chatbot_history,
# images if images else None,
# data_files if data_files else None,
# None,
# None,
# status
# )
# # Final check for files
# final_images, final_data = check_for_output_files()
# # Create download links message if files were generated
# if final_images or final_data:
# download_msg = "\n\n---\n\n### š Generated Files Ready for Download\n\n"
# if final_images:
# download_msg += f"**š¼ļø Images ({len(final_images)})** - Available in the **Images** tab ā\n"
# for img_path in final_images:
# img_name = Path(img_path).name
# download_msg += f"- `{img_name}`\n"
# download_msg += "\n"
# if final_data:
# download_msg += f"**š Data Files ({len(final_data)})** - Available in the **Data** tab ā\n"
# for data_path in final_data:
# data_name = Path(data_path).name
# download_msg += f"- `{data_name}`\n"
# download_msg += "\n*Click the download button on each file in the respective tabs above.*"
# # Add download message as separate bubble
# chatbot_history.append({
# "role": "assistant",
# "content": download_msg
# })
# status = "ā
Complete"
# if final_images:
# status += f" | {len(final_images)} image(s)"
# if final_data:
# status += f" | {len(final_data)} data file(s)"
# yield chatbot_history, final_images if final_images else None, final_data if final_data else None, None, None, status
# except Exception as e:
# error_msg = f"ā Error: {str(e)}\n\n```\n{traceback.format_exc()}\n```"
# chatbot_history.append({
# "role": "assistant",
# "content": error_msg
# })
# yield chatbot_history, None, None, None, None, "ā Error occurred"
# def validate_passcode(passcode):
# """Validate the passcode and initialize the agent."""
# global agent
# if passcode == PASSCODE:
# # Initialize agent
# try:
# agent = A1()
# return (
# gr.update(visible=False), # Hide passcode section
# gr.update(visible=True), # Show main interface
# "ā
Access granted! Agent initialized and ready."
# )
# except Exception as e:
# error_trace = traceback.format_exc()
# return (
# gr.update(visible=True),
# gr.update(visible=False),
# f"ā Error initializing agent:\n{str(e)}\n\n{error_trace}"
# )
# else:
# return (
# gr.update(visible=True),
# gr.update(visible=False),
# "ā Invalid passcode. Please try again."
# )
# def clear_chat():
# """Clear the chat history and output files."""
# # Clean up output directory
# output_dir = Path("./output")
# if output_dir.exists():
# shutil.rmtree(output_dir)
# output_dir.mkdir(exist_ok=True)
# # Clean up data directory
# data_dir = Path("./data")
# if data_dir.exists():
# for file in data_dir.iterdir():
# if file.is_file():
# file.unlink()
# return [], None, None, None, None, "šļø Chat cleared"
# # Create Gradio interface with custom theme
# custom_theme = gr.themes.Soft(
# primary_hue="blue",
# secondary_hue="slate",
# spacing_size="sm",
# radius_size="md",
# ).set(
# button_primary_background_fill="*primary_500",
# button_primary_background_fill_hover="*primary_600",
# block_label_text_weight="600",
# block_title_text_weight="600",
# )
# with gr.Blocks(title="HistoPath Agent", theme=custom_theme, css="""
# .gradio-container {
# max-width: 100% !important;
# }
# .main-header {
# text-align: center;
# padding: 1.5rem 0;
# background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
# color: white;
# border-radius: 8px;
# margin-bottom: 1.5rem;
# }
# .main-header h1 {
# margin: 0;
# font-size: 2.2rem;
# font-weight: 700;
# }
# .main-header p {
# margin: 0.5rem 0 0 0;
# opacity: 0.95;
# font-size: 1.1rem;
# }
# .file-upload-box .wrap {
# min-width: 0 !important;
# }
# .file-upload-box .file-name {
# word-break: break-word !important;
# white-space: normal !important;
# overflow-wrap: break-word !important;
# }
# .tab-nav {
# margin-bottom: 0.5rem;
# }
# /* Better styling for code and observation blocks */
# .message.bot pre {
# background-color: #f6f8fa !important;
# border: 1px solid #d0d7de !important;
# border-radius: 6px !important;
# padding: 12px !important;
# margin: 8px 0 !important;
# }
# .message.bot h3 {
# margin-top: 12px !important;
# margin-bottom: 8px !important;
# font-weight: 600 !important;
# }
# .message.bot hr {
# border: none !important;
# border-top: 2px solid #e1e4e8 !important;
# margin: 16px 0 !important;
# }
# """) as demo:
# # Header
# gr.HTML("""
#
#
š¬ HistoPath Agent
#
AI-Powered Histopathology Analysis Assistant
#
# """)
# # Passcode section
# with gr.Group(visible=True) as passcode_section:
# gr.Markdown("### š Authentication Required")
# with gr.Row():
# passcode_input = gr.Textbox(
# label="Passcode",
# type="password",
# placeholder="Enter your passcode...",
# scale=3
# )
# passcode_btn = gr.Button("š Unlock", variant="primary", scale=1, size="lg")
# passcode_status = gr.Textbox(
# label="Status",
# interactive=False,
# lines=2
# )
# # Main interface (hidden initially)
# with gr.Group(visible=False) as main_interface:
# with gr.Row(equal_height=True):
# # Left column - Chat interface
# with gr.Column(scale=3):
# chatbot = gr.Chatbot(
# label="š¬ Conversation",
# height=550,
# # type="messages",
# show_label=True,
# render_markdown=True,
# )
# # Input area
# with gr.Row():
# with gr.Column(scale=7):
# prompt_input = gr.Textbox(
# label="Your Query",
# placeholder="E.g., 'Caption the uploaded whole slide image' or 'Segment cells using instanseg model'",
# lines=2,
# max_lines=5,
# show_label=False,
# )
# with gr.Column(scale=3):
# file_upload = gr.File(
# label="š Upload File",
# file_types=[".svs", ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".csv", ".txt", ".json", ".npy"],
# height=75,
# elem_classes="file-upload-box",
# )
# with gr.Row():
# submit_btn = gr.Button("š Submit", variant="primary", scale=3, size="lg")
# clear_btn = gr.Button("šļø Clear", scale=1, size="lg", variant="secondary")
# status_text = gr.Textbox(
# label="Status",
# interactive=False,
# value="Ready",
# show_label=False,
# container=False,
# )
# # Right column - Outputs
# with gr.Column(scale=2):
# with gr.Tabs():
# with gr.Tab("š„ Input"):
# with gr.Column():
# input_image_preview = gr.Image(
# label="Input Image",
# height=400,
# show_label=False,
# container=True,
# )
# input_file_preview = gr.File(
# label="Input File",
# interactive=False,
# height=100,
# show_label=False,
# container=True,
# )
# input_status = gr.Textbox(
# value="Upload a file to preview",
# show_label=False,
# interactive=False,
# container=False,
# )
# with gr.Tab("š¼ļø Images"):
# output_gallery = gr.Gallery(
# label="Generated Visualizations",
# columns=1,
# height=600,
# object_fit="contain",
# show_label=False,
# show_download_button=True,
# )
# with gr.Tab("š Data"):
# data_files = gr.File(
# label="Generated Data Files",
# file_count="multiple",
# interactive=False,
# height=600,
# show_label=False,
# )
# # Event handlers
# passcode_btn.click(
# fn=validate_passcode,
# inputs=[passcode_input],
# outputs=[passcode_section, main_interface, passcode_status]
# )
# # File upload preview
# file_upload.change(
# fn=preview_uploaded_file,
# inputs=[file_upload],
# outputs=[input_image_preview, input_file_preview, input_status]
# )
# submit_btn.click(
# fn=process_agent_response,
# inputs=[prompt_input, file_upload, chatbot],
# outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text]
# )
# clear_btn.click(
# fn=clear_chat,
# outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text]
# )
# # Allow enter key to submit
# prompt_input.submit(
# fn=process_agent_response,
# inputs=[prompt_input, file_upload, chatbot],
# outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text]
# )
# if __name__ == "__main__":
# # Create necessary directories
# Path("./data").mkdir(exist_ok=True)
# Path("./output").mkdir(exist_ok=True)
# print("=" * 60)
# print("š¬ HistoPath Agent - Gradio Interface")
# print("=" * 60)
# # print(f"Passcode: {PASSCODE}")
# print("Starting server...")
# print("=" * 60)
# # Launch the app
# #demo.launch(
# # server_name="0.0.0.0",
# # server_port=7860, # Let Gradio auto-pick an available port
# # share=True,
# # show_error=True,
# #)
# demo.launch()