# import os # import re # import shutil # import traceback # import gradio as gr # from pathlib import Path # from histopath.agent import A1 # from dotenv import load_dotenv # # Load environment variables # load_dotenv() # # Get passcode from environment # PASSCODE = os.getenv("GRADIO_PASSWORD") # # Initialize agent (will be created after passcode validation) # agent = None # def check_for_output_files(): # """Check for all files in the output directory and return their paths.""" # output_dir = Path("./output") # if not output_dir.exists(): # return [], [] # image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"} # data_extensions = {".csv", ".txt", ".json", ".npy"} # images = [] # data_files = [] # for file in output_dir.iterdir(): # if file.is_file(): # if file.suffix.lower() in image_extensions: # images.append(str(file)) # elif file.suffix.lower() in data_extensions: # data_files.append(str(file)) # return images, data_files # def preview_uploaded_file(uploaded_file): # """Preview the uploaded file - show image or file info.""" # if uploaded_file is None: # return None, None, "No file uploaded" # file_path = Path(uploaded_file.name) # file_ext = file_path.suffix.lower() # image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"} # if file_ext in image_extensions: # # Show image preview # return uploaded_file.name, None, f"šŸ“· Previewing: {file_path.name}" # else: # # Show file info # file_size = Path(uploaded_file.name).stat().st_size / 1024 # KB # return None, uploaded_file.name, f"šŸ“„ File: {file_path.name} ({file_size:.1f} KB)" # def parse_agent_output(output): # """Parse agent output to extract code blocks, observations, and regular text.""" # # Strip out the message divider bars # output = re.sub(r'={30,}\s*(Human|Ai)\s+Message\s*={30,}', '', output) # output = output.strip() # parsed = { # "type": "text", # "content": output, # "code": None, # "observation": None, # "thinking": None # } # # Check for code execution block # execute_match = re.search(r'(.*?)', output, re.DOTALL) # if execute_match: # parsed["type"] = "code" # parsed["code"] = execute_match.group(1).strip() # # Extract text before the code block (thinking/explanation) # text_before = output[:execute_match.start()].strip() # # Remove any think tags but keep the content # text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL) # text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() # parsed["thinking"] = text_before if text_before else None # return parsed # # Check for observation block # observation_match = re.search(r'(.*?)', output, re.DOTALL) # if observation_match: # parsed["type"] = "observation" # parsed["observation"] = observation_match.group(1).strip() # # Extract text before observation if any # text_before = output[:observation_match.start()].strip() # text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL) # text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() # parsed["thinking"] = text_before if text_before else None # return parsed # # Check for solution block # solution_match = re.search(r'(.*?)', output, re.DOTALL) # if solution_match: # parsed["type"] = "solution" # parsed["content"] = solution_match.group(1).strip() # # Get thinking before solution # text_before = output[:solution_match.start()].strip() # text_before = re.sub(r'(.*?)', r'\1', text_before, flags=re.DOTALL) # text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() # parsed["thinking"] = text_before if text_before else None # return parsed # # Clean up any remaining tags for display # cleaned = re.sub(r'(.*?)', r'\1', output, flags=re.DOTALL) # cleaned = re.sub(r'={30,}.*?={30,}', '', cleaned).strip() # parsed["content"] = cleaned # return parsed # def format_message_for_display(parsed_output): # """Format parsed output into a readable message for the chatbot.""" # msg_parts = [] # # Add thinking/explanation text first if present # if parsed_output.get("thinking"): # msg_parts.append(parsed_output["thinking"]) # if parsed_output["type"] == "code": # # Add separator if there was thinking text # if parsed_output.get("thinking"): # msg_parts.append("\n---\n") # msg_parts.append("### šŸ’» Executing Code\n") # msg_parts.append(f"```python\n{parsed_output['code']}\n```") # elif parsed_output["type"] == "observation": # # Add separator if there was thinking text # if parsed_output.get("thinking"): # msg_parts.append("\n---\n") # msg_parts.append("### šŸ“Š Observation\n") # msg_parts.append(f"```\n{parsed_output['observation']}\n```") # elif parsed_output["type"] == "solution": # # Add separator if there was thinking text # if parsed_output.get("thinking"): # msg_parts.append("\n---\n") # msg_parts.append("### āœ… Solution\n") # msg_parts.append(parsed_output['content']) # else: # # For regular text, just add the content if thinking wasn't already set # if not parsed_output.get("thinking"): # msg_parts.append(parsed_output["content"]) # return "\n\n".join(msg_parts) # def process_agent_response(prompt, uploaded_file, chatbot_history): # """Process the agent response and update chatbot.""" # global agent # if agent is None: # chatbot_history.append({ # "role": "assistant", # "content": "āš ļø Please enter the passcode first to initialize the agent." # }) # yield chatbot_history, None, None, None, None, "āš ļø Agent not initialized" # return # if not prompt.strip() and uploaded_file is None: # chatbot_history.append({ # "role": "assistant", # "content": "āš ļø Please provide a prompt or upload a file." # }) # yield chatbot_history, None, None, None, None, "āš ļø No input provided" # return # # Handle file upload # file_path = None # file_info = "" # if uploaded_file is not None: # try: # # Create data directory if it doesn't exist # data_dir = Path("./data") # data_dir.mkdir(exist_ok=True) # # Copy uploaded file to data directory # file_name = Path(uploaded_file.name).name # file_path = data_dir / file_name # shutil.copy(uploaded_file.name, file_path) # file_info = f"\n\nšŸ“Ž **Uploaded file:** `{file_path}`\n" # # Augment prompt with file path # if prompt.strip(): # prompt = f"{prompt}\n\nUploaded file path: {file_path}" # else: # prompt = f"I have uploaded a file at: {file_path}. Please analyze it." # except Exception as e: # error_msg = f"āŒ Error handling file upload: {str(e)}" # chatbot_history.append({ # "role": "assistant", # "content": error_msg # }) # yield chatbot_history, None, None, None, None, error_msg # return # # Add user message to chat # user_message = prompt if not file_info else f"{prompt}{file_info}" # chatbot_history.append({"role": "user", "content": user_message}) # yield chatbot_history, None, None, None, None, "šŸ”„ Processing..." # try: # # Stream agent responses # step_count = 0 # for step in agent.go_stream(prompt): # step_count += 1 # output = step.get("output", "") # if output: # # Parse the output # parsed = parse_agent_output(output) # # Add thinking text as separate message if present # if parsed.get("thinking"): # chatbot_history.append({ # "role": "assistant", # "content": parsed["thinking"] # }) # # Add the block (code/observation/solution) as separate message if present # if parsed["type"] == "code" and parsed["code"]: # chatbot_history.append({ # "role": "assistant", # "content": f"### šŸ’» Executing Code\n\n```python\n{parsed['code']}\n```" # }) # elif parsed["type"] == "observation" and parsed["observation"]: # chatbot_history.append({ # "role": "assistant", # "content": f"### šŸ“Š Observation\n\n```\n{parsed['observation']}\n```" # }) # elif parsed["type"] == "solution": # chatbot_history.append({ # "role": "assistant", # "content": f"### āœ… Solution\n\n{parsed['content']}" # }) # elif parsed["type"] == "text" and parsed["content"]: # # Only add if we haven't already added it as thinking # if not parsed.get("thinking"): # chatbot_history.append({ # "role": "assistant", # "content": parsed["content"] # }) # # Check for output files after each step # images, data_files = check_for_output_files() # # Create status message # status = f"šŸ”„ Step {step_count}" # if parsed["type"] == "code": # status += " - Executing code..." # elif parsed["type"] == "observation": # status += " - Processing results..." # elif parsed["type"] == "solution": # status += " - Finalizing solution..." # yield ( # chatbot_history, # images if images else None, # data_files if data_files else None, # None, # None, # status # ) # # Final check for files # final_images, final_data = check_for_output_files() # # Create download links message if files were generated # if final_images or final_data: # download_msg = "\n\n---\n\n### šŸ“ Generated Files Ready for Download\n\n" # if final_images: # download_msg += f"**šŸ–¼ļø Images ({len(final_images)})** - Available in the **Images** tab →\n" # for img_path in final_images: # img_name = Path(img_path).name # download_msg += f"- `{img_name}`\n" # download_msg += "\n" # if final_data: # download_msg += f"**šŸ“„ Data Files ({len(final_data)})** - Available in the **Data** tab →\n" # for data_path in final_data: # data_name = Path(data_path).name # download_msg += f"- `{data_name}`\n" # download_msg += "\n*Click the download button on each file in the respective tabs above.*" # # Add download message as separate bubble # chatbot_history.append({ # "role": "assistant", # "content": download_msg # }) # status = "āœ… Complete" # if final_images: # status += f" | {len(final_images)} image(s)" # if final_data: # status += f" | {len(final_data)} data file(s)" # yield chatbot_history, final_images if final_images else None, final_data if final_data else None, None, None, status # except Exception as e: # error_msg = f"āŒ Error: {str(e)}\n\n```\n{traceback.format_exc()}\n```" # chatbot_history.append({ # "role": "assistant", # "content": error_msg # }) # yield chatbot_history, None, None, None, None, "āŒ Error occurred" # def validate_passcode(passcode): # """Validate the passcode and initialize the agent.""" # global agent # if passcode == PASSCODE: # # Initialize agent # try: # agent = A1() # return ( # gr.update(visible=False), # Hide passcode section # gr.update(visible=True), # Show main interface # "āœ… Access granted! Agent initialized and ready." # ) # except Exception as e: # error_trace = traceback.format_exc() # return ( # gr.update(visible=True), # gr.update(visible=False), # f"āŒ Error initializing agent:\n{str(e)}\n\n{error_trace}" # ) # else: # return ( # gr.update(visible=True), # gr.update(visible=False), # "āŒ Invalid passcode. Please try again." # ) # def clear_chat(): # """Clear the chat history and output files.""" # # Clean up output directory # output_dir = Path("./output") # if output_dir.exists(): # shutil.rmtree(output_dir) # output_dir.mkdir(exist_ok=True) # # Clean up data directory # data_dir = Path("./data") # if data_dir.exists(): # for file in data_dir.iterdir(): # if file.is_file(): # file.unlink() # return [], None, None, None, None, "šŸ—‘ļø Chat cleared" # # Create Gradio interface with custom theme # custom_theme = gr.themes.Soft( # primary_hue="blue", # secondary_hue="slate", # spacing_size="sm", # radius_size="md", # ).set( # button_primary_background_fill="*primary_500", # button_primary_background_fill_hover="*primary_600", # block_label_text_weight="600", # block_title_text_weight="600", # ) # with gr.Blocks(title="HistoPath Agent", theme=custom_theme, css=""" # .gradio-container { # max-width: 100% !important; # } # .main-header { # text-align: center; # padding: 1.5rem 0; # background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); # color: white; # border-radius: 8px; # margin-bottom: 1.5rem; # } # .main-header h1 { # margin: 0; # font-size: 2.2rem; # font-weight: 700; # } # .main-header p { # margin: 0.5rem 0 0 0; # opacity: 0.95; # font-size: 1.1rem; # } # .file-upload-box .wrap { # min-width: 0 !important; # } # .file-upload-box .file-name { # word-break: break-word !important; # white-space: normal !important; # overflow-wrap: break-word !important; # } # .tab-nav { # margin-bottom: 0.5rem; # } # /* Better styling for code and observation blocks */ # .message.bot pre { # background-color: #f6f8fa !important; # border: 1px solid #d0d7de !important; # border-radius: 6px !important; # padding: 12px !important; # margin: 8px 0 !important; # } # .message.bot h3 { # margin-top: 12px !important; # margin-bottom: 8px !important; # font-weight: 600 !important; # } # .message.bot hr { # border: none !important; # border-top: 2px solid #e1e4e8 !important; # margin: 16px 0 !important; # } # """) as demo: # # Header # gr.HTML(""" #
#

šŸ”¬ HistoPath Agent

#

AI-Powered Histopathology Analysis Assistant

#
# """) # # Passcode section # with gr.Group(visible=True) as passcode_section: # gr.Markdown("### šŸ” Authentication Required") # with gr.Row(): # passcode_input = gr.Textbox( # label="Passcode", # type="password", # placeholder="Enter your passcode...", # scale=3 # ) # passcode_btn = gr.Button("šŸ”“ Unlock", variant="primary", scale=1, size="lg") # passcode_status = gr.Textbox( # label="Status", # interactive=False, # lines=2 # ) # # Main interface (hidden initially) # with gr.Group(visible=False) as main_interface: # with gr.Row(equal_height=True): # # Left column - Chat interface # with gr.Column(scale=3): # chatbot = gr.Chatbot( # label="šŸ’¬ Conversation", # height=550, # # type="messages", # show_label=True, # render_markdown=True, # ) # # Input area # with gr.Row(): # with gr.Column(scale=7): # prompt_input = gr.Textbox( # label="Your Query", # placeholder="E.g., 'Caption the uploaded whole slide image' or 'Segment cells using instanseg model'", # lines=2, # max_lines=5, # show_label=False, # ) # with gr.Column(scale=3): # file_upload = gr.File( # label="šŸ“Ž Upload File", # file_types=[".svs", ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".csv", ".txt", ".json", ".npy"], # height=75, # elem_classes="file-upload-box", # ) # with gr.Row(): # submit_btn = gr.Button("šŸš€ Submit", variant="primary", scale=3, size="lg") # clear_btn = gr.Button("šŸ—‘ļø Clear", scale=1, size="lg", variant="secondary") # status_text = gr.Textbox( # label="Status", # interactive=False, # value="Ready", # show_label=False, # container=False, # ) # # Right column - Outputs # with gr.Column(scale=2): # with gr.Tabs(): # with gr.Tab("šŸ“„ Input"): # with gr.Column(): # input_image_preview = gr.Image( # label="Input Image", # height=400, # show_label=False, # container=True, # ) # input_file_preview = gr.File( # label="Input File", # interactive=False, # height=100, # show_label=False, # container=True, # ) # input_status = gr.Textbox( # value="Upload a file to preview", # show_label=False, # interactive=False, # container=False, # ) # with gr.Tab("šŸ–¼ļø Images"): # output_gallery = gr.Gallery( # label="Generated Visualizations", # columns=1, # height=600, # object_fit="contain", # show_label=False, # show_download_button=True, # ) # with gr.Tab("šŸ“„ Data"): # data_files = gr.File( # label="Generated Data Files", # file_count="multiple", # interactive=False, # height=600, # show_label=False, # ) # # Event handlers # passcode_btn.click( # fn=validate_passcode, # inputs=[passcode_input], # outputs=[passcode_section, main_interface, passcode_status] # ) # # File upload preview # file_upload.change( # fn=preview_uploaded_file, # inputs=[file_upload], # outputs=[input_image_preview, input_file_preview, input_status] # ) # submit_btn.click( # fn=process_agent_response, # inputs=[prompt_input, file_upload, chatbot], # outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] # ) # clear_btn.click( # fn=clear_chat, # outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] # ) # # Allow enter key to submit # prompt_input.submit( # fn=process_agent_response, # inputs=[prompt_input, file_upload, chatbot], # outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] # ) # if __name__ == "__main__": # # Create necessary directories # Path("./data").mkdir(exist_ok=True) # Path("./output").mkdir(exist_ok=True) # print("=" * 60) # print("šŸ”¬ HistoPath Agent - Gradio Interface") # print("=" * 60) # # print(f"Passcode: {PASSCODE}") # print("Starting server...") # print("=" * 60) # # Launch the app # #demo.launch( # # server_name="0.0.0.0", # # server_port=7860, # Let Gradio auto-pick an available port # # share=True, # # show_error=True, # #) # demo.launch()