| # import os | |
| # import re | |
| # import shutil | |
| # import traceback | |
| # import gradio as gr | |
| # from pathlib import Path | |
| # from histopath.agent import A1 | |
| # from dotenv import load_dotenv | |
| # # Load environment variables | |
| # load_dotenv() | |
| # # Get passcode from environment | |
| # PASSCODE = os.getenv("GRADIO_PASSWORD") | |
| # # Initialize agent (will be created after passcode validation) | |
| # agent = None | |
| # def check_for_output_files(): | |
| # """Check for all files in the output directory and return their paths.""" | |
| # output_dir = Path("./output") | |
| # if not output_dir.exists(): | |
| # return [], [] | |
| # image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"} | |
| # data_extensions = {".csv", ".txt", ".json", ".npy"} | |
| # images = [] | |
| # data_files = [] | |
| # for file in output_dir.iterdir(): | |
| # if file.is_file(): | |
| # if file.suffix.lower() in image_extensions: | |
| # images.append(str(file)) | |
| # elif file.suffix.lower() in data_extensions: | |
| # data_files.append(str(file)) | |
| # return images, data_files | |
| # def preview_uploaded_file(uploaded_file): | |
| # """Preview the uploaded file - show image or file info.""" | |
| # if uploaded_file is None: | |
| # return None, None, "No file uploaded" | |
| # file_path = Path(uploaded_file.name) | |
| # file_ext = file_path.suffix.lower() | |
| # image_extensions = {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"} | |
| # if file_ext in image_extensions: | |
| # # Show image preview | |
| # return uploaded_file.name, None, f"π· Previewing: {file_path.name}" | |
| # else: | |
| # # Show file info | |
| # file_size = Path(uploaded_file.name).stat().st_size / 1024 # KB | |
| # return None, uploaded_file.name, f"π File: {file_path.name} ({file_size:.1f} KB)" | |
| # def parse_agent_output(output): | |
| # """Parse agent output to extract code blocks, observations, and regular text.""" | |
| # # Strip out the message divider bars | |
| # output = re.sub(r'={30,}\s*(Human|Ai)\s+Message\s*={30,}', '', output) | |
| # output = output.strip() | |
| # parsed = { | |
| # "type": "text", | |
| # "content": output, | |
| # "code": None, | |
| # "observation": None, | |
| # "thinking": None | |
| # } | |
| # # Check for code execution block | |
| # execute_match = re.search(r'<execute>(.*?)</execute>', output, re.DOTALL) | |
| # if execute_match: | |
| # parsed["type"] = "code" | |
| # parsed["code"] = execute_match.group(1).strip() | |
| # # Extract text before the code block (thinking/explanation) | |
| # text_before = output[:execute_match.start()].strip() | |
| # # Remove any think tags but keep the content | |
| # text_before = re.sub(r'<think>(.*?)</think>', r'\1', text_before, flags=re.DOTALL) | |
| # text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() | |
| # parsed["thinking"] = text_before if text_before else None | |
| # return parsed | |
| # # Check for observation block | |
| # observation_match = re.search(r'<observation>(.*?)</observation>', output, re.DOTALL) | |
| # if observation_match: | |
| # parsed["type"] = "observation" | |
| # parsed["observation"] = observation_match.group(1).strip() | |
| # # Extract text before observation if any | |
| # text_before = output[:observation_match.start()].strip() | |
| # text_before = re.sub(r'<think>(.*?)</think>', r'\1', text_before, flags=re.DOTALL) | |
| # text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() | |
| # parsed["thinking"] = text_before if text_before else None | |
| # return parsed | |
| # # Check for solution block | |
| # solution_match = re.search(r'<solution>(.*?)</solution>', output, re.DOTALL) | |
| # if solution_match: | |
| # parsed["type"] = "solution" | |
| # parsed["content"] = solution_match.group(1).strip() | |
| # # Get thinking before solution | |
| # text_before = output[:solution_match.start()].strip() | |
| # text_before = re.sub(r'<think>(.*?)</think>', r'\1', text_before, flags=re.DOTALL) | |
| # text_before = re.sub(r'={30,}.*?={30,}', '', text_before).strip() | |
| # parsed["thinking"] = text_before if text_before else None | |
| # return parsed | |
| # # Clean up any remaining tags for display | |
| # cleaned = re.sub(r'<think>(.*?)</think>', r'\1', output, flags=re.DOTALL) | |
| # cleaned = re.sub(r'={30,}.*?={30,}', '', cleaned).strip() | |
| # parsed["content"] = cleaned | |
| # return parsed | |
| # def format_message_for_display(parsed_output): | |
| # """Format parsed output into a readable message for the chatbot.""" | |
| # msg_parts = [] | |
| # # Add thinking/explanation text first if present | |
| # if parsed_output.get("thinking"): | |
| # msg_parts.append(parsed_output["thinking"]) | |
| # if parsed_output["type"] == "code": | |
| # # Add separator if there was thinking text | |
| # if parsed_output.get("thinking"): | |
| # msg_parts.append("\n---\n") | |
| # msg_parts.append("### π» Executing Code\n") | |
| # msg_parts.append(f"```python\n{parsed_output['code']}\n```") | |
| # elif parsed_output["type"] == "observation": | |
| # # Add separator if there was thinking text | |
| # if parsed_output.get("thinking"): | |
| # msg_parts.append("\n---\n") | |
| # msg_parts.append("### π Observation\n") | |
| # msg_parts.append(f"```\n{parsed_output['observation']}\n```") | |
| # elif parsed_output["type"] == "solution": | |
| # # Add separator if there was thinking text | |
| # if parsed_output.get("thinking"): | |
| # msg_parts.append("\n---\n") | |
| # msg_parts.append("### β Solution\n") | |
| # msg_parts.append(parsed_output['content']) | |
| # else: | |
| # # For regular text, just add the content if thinking wasn't already set | |
| # if not parsed_output.get("thinking"): | |
| # msg_parts.append(parsed_output["content"]) | |
| # return "\n\n".join(msg_parts) | |
| # def process_agent_response(prompt, uploaded_file, chatbot_history): | |
| # """Process the agent response and update chatbot.""" | |
| # global agent | |
| # if agent is None: | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": "β οΈ Please enter the passcode first to initialize the agent." | |
| # }) | |
| # yield chatbot_history, None, None, None, None, "β οΈ Agent not initialized" | |
| # return | |
| # if not prompt.strip() and uploaded_file is None: | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": "β οΈ Please provide a prompt or upload a file." | |
| # }) | |
| # yield chatbot_history, None, None, None, None, "β οΈ No input provided" | |
| # return | |
| # # Handle file upload | |
| # file_path = None | |
| # file_info = "" | |
| # if uploaded_file is not None: | |
| # try: | |
| # # Create data directory if it doesn't exist | |
| # data_dir = Path("./data") | |
| # data_dir.mkdir(exist_ok=True) | |
| # # Copy uploaded file to data directory | |
| # file_name = Path(uploaded_file.name).name | |
| # file_path = data_dir / file_name | |
| # shutil.copy(uploaded_file.name, file_path) | |
| # file_info = f"\n\nπ **Uploaded file:** `{file_path}`\n" | |
| # # Augment prompt with file path | |
| # if prompt.strip(): | |
| # prompt = f"{prompt}\n\nUploaded file path: {file_path}" | |
| # else: | |
| # prompt = f"I have uploaded a file at: {file_path}. Please analyze it." | |
| # except Exception as e: | |
| # error_msg = f"β Error handling file upload: {str(e)}" | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": error_msg | |
| # }) | |
| # yield chatbot_history, None, None, None, None, error_msg | |
| # return | |
| # # Add user message to chat | |
| # user_message = prompt if not file_info else f"{prompt}{file_info}" | |
| # chatbot_history.append({"role": "user", "content": user_message}) | |
| # yield chatbot_history, None, None, None, None, "π Processing..." | |
| # try: | |
| # # Stream agent responses | |
| # step_count = 0 | |
| # for step in agent.go_stream(prompt): | |
| # step_count += 1 | |
| # output = step.get("output", "") | |
| # if output: | |
| # # Parse the output | |
| # parsed = parse_agent_output(output) | |
| # # Add thinking text as separate message if present | |
| # if parsed.get("thinking"): | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": parsed["thinking"] | |
| # }) | |
| # # Add the block (code/observation/solution) as separate message if present | |
| # if parsed["type"] == "code" and parsed["code"]: | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": f"### π» Executing Code\n\n```python\n{parsed['code']}\n```" | |
| # }) | |
| # elif parsed["type"] == "observation" and parsed["observation"]: | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": f"### π Observation\n\n```\n{parsed['observation']}\n```" | |
| # }) | |
| # elif parsed["type"] == "solution": | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": f"### β Solution\n\n{parsed['content']}" | |
| # }) | |
| # elif parsed["type"] == "text" and parsed["content"]: | |
| # # Only add if we haven't already added it as thinking | |
| # if not parsed.get("thinking"): | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": parsed["content"] | |
| # }) | |
| # # Check for output files after each step | |
| # images, data_files = check_for_output_files() | |
| # # Create status message | |
| # status = f"π Step {step_count}" | |
| # if parsed["type"] == "code": | |
| # status += " - Executing code..." | |
| # elif parsed["type"] == "observation": | |
| # status += " - Processing results..." | |
| # elif parsed["type"] == "solution": | |
| # status += " - Finalizing solution..." | |
| # yield ( | |
| # chatbot_history, | |
| # images if images else None, | |
| # data_files if data_files else None, | |
| # None, | |
| # None, | |
| # status | |
| # ) | |
| # # Final check for files | |
| # final_images, final_data = check_for_output_files() | |
| # # Create download links message if files were generated | |
| # if final_images or final_data: | |
| # download_msg = "\n\n---\n\n### π Generated Files Ready for Download\n\n" | |
| # if final_images: | |
| # download_msg += f"**πΌοΈ Images ({len(final_images)})** - Available in the **Images** tab β\n" | |
| # for img_path in final_images: | |
| # img_name = Path(img_path).name | |
| # download_msg += f"- `{img_name}`\n" | |
| # download_msg += "\n" | |
| # if final_data: | |
| # download_msg += f"**π Data Files ({len(final_data)})** - Available in the **Data** tab β\n" | |
| # for data_path in final_data: | |
| # data_name = Path(data_path).name | |
| # download_msg += f"- `{data_name}`\n" | |
| # download_msg += "\n*Click the download button on each file in the respective tabs above.*" | |
| # # Add download message as separate bubble | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": download_msg | |
| # }) | |
| # status = "β Complete" | |
| # if final_images: | |
| # status += f" | {len(final_images)} image(s)" | |
| # if final_data: | |
| # status += f" | {len(final_data)} data file(s)" | |
| # yield chatbot_history, final_images if final_images else None, final_data if final_data else None, None, None, status | |
| # except Exception as e: | |
| # error_msg = f"β Error: {str(e)}\n\n```\n{traceback.format_exc()}\n```" | |
| # chatbot_history.append({ | |
| # "role": "assistant", | |
| # "content": error_msg | |
| # }) | |
| # yield chatbot_history, None, None, None, None, "β Error occurred" | |
| # def validate_passcode(passcode): | |
| # """Validate the passcode and initialize the agent.""" | |
| # global agent | |
| # if passcode == PASSCODE: | |
| # # Initialize agent | |
| # try: | |
| # agent = A1() | |
| # return ( | |
| # gr.update(visible=False), # Hide passcode section | |
| # gr.update(visible=True), # Show main interface | |
| # "β Access granted! Agent initialized and ready." | |
| # ) | |
| # except Exception as e: | |
| # error_trace = traceback.format_exc() | |
| # return ( | |
| # gr.update(visible=True), | |
| # gr.update(visible=False), | |
| # f"β Error initializing agent:\n{str(e)}\n\n{error_trace}" | |
| # ) | |
| # else: | |
| # return ( | |
| # gr.update(visible=True), | |
| # gr.update(visible=False), | |
| # "β Invalid passcode. Please try again." | |
| # ) | |
| # def clear_chat(): | |
| # """Clear the chat history and output files.""" | |
| # # Clean up output directory | |
| # output_dir = Path("./output") | |
| # if output_dir.exists(): | |
| # shutil.rmtree(output_dir) | |
| # output_dir.mkdir(exist_ok=True) | |
| # # Clean up data directory | |
| # data_dir = Path("./data") | |
| # if data_dir.exists(): | |
| # for file in data_dir.iterdir(): | |
| # if file.is_file(): | |
| # file.unlink() | |
| # return [], None, None, None, None, "ποΈ Chat cleared" | |
| # # Create Gradio interface with custom theme | |
| # custom_theme = gr.themes.Soft( | |
| # primary_hue="blue", | |
| # secondary_hue="slate", | |
| # spacing_size="sm", | |
| # radius_size="md", | |
| # ).set( | |
| # button_primary_background_fill="*primary_500", | |
| # button_primary_background_fill_hover="*primary_600", | |
| # block_label_text_weight="600", | |
| # block_title_text_weight="600", | |
| # ) | |
| # with gr.Blocks(title="HistoPath Agent", theme=custom_theme, css=""" | |
| # .gradio-container { | |
| # max-width: 100% !important; | |
| # } | |
| # .main-header { | |
| # text-align: center; | |
| # padding: 1.5rem 0; | |
| # background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| # color: white; | |
| # border-radius: 8px; | |
| # margin-bottom: 1.5rem; | |
| # } | |
| # .main-header h1 { | |
| # margin: 0; | |
| # font-size: 2.2rem; | |
| # font-weight: 700; | |
| # } | |
| # .main-header p { | |
| # margin: 0.5rem 0 0 0; | |
| # opacity: 0.95; | |
| # font-size: 1.1rem; | |
| # } | |
| # .file-upload-box .wrap { | |
| # min-width: 0 !important; | |
| # } | |
| # .file-upload-box .file-name { | |
| # word-break: break-word !important; | |
| # white-space: normal !important; | |
| # overflow-wrap: break-word !important; | |
| # } | |
| # .tab-nav { | |
| # margin-bottom: 0.5rem; | |
| # } | |
| # /* Better styling for code and observation blocks */ | |
| # .message.bot pre { | |
| # background-color: #f6f8fa !important; | |
| # border: 1px solid #d0d7de !important; | |
| # border-radius: 6px !important; | |
| # padding: 12px !important; | |
| # margin: 8px 0 !important; | |
| # } | |
| # .message.bot h3 { | |
| # margin-top: 12px !important; | |
| # margin-bottom: 8px !important; | |
| # font-weight: 600 !important; | |
| # } | |
| # .message.bot hr { | |
| # border: none !important; | |
| # border-top: 2px solid #e1e4e8 !important; | |
| # margin: 16px 0 !important; | |
| # } | |
| # """) as demo: | |
| # # Header | |
| # gr.HTML(""" | |
| # <div class="main-header"> | |
| # <h1>π¬ HistoPath Agent</h1> | |
| # <p>AI-Powered Histopathology Analysis Assistant</p> | |
| # </div> | |
| # """) | |
| # # Passcode section | |
| # with gr.Group(visible=True) as passcode_section: | |
| # gr.Markdown("### π Authentication Required") | |
| # with gr.Row(): | |
| # passcode_input = gr.Textbox( | |
| # label="Passcode", | |
| # type="password", | |
| # placeholder="Enter your passcode...", | |
| # scale=3 | |
| # ) | |
| # passcode_btn = gr.Button("π Unlock", variant="primary", scale=1, size="lg") | |
| # passcode_status = gr.Textbox( | |
| # label="Status", | |
| # interactive=False, | |
| # lines=2 | |
| # ) | |
| # # Main interface (hidden initially) | |
| # with gr.Group(visible=False) as main_interface: | |
| # with gr.Row(equal_height=True): | |
| # # Left column - Chat interface | |
| # with gr.Column(scale=3): | |
| # chatbot = gr.Chatbot( | |
| # label="π¬ Conversation", | |
| # height=550, | |
| # # type="messages", | |
| # show_label=True, | |
| # render_markdown=True, | |
| # ) | |
| # # Input area | |
| # with gr.Row(): | |
| # with gr.Column(scale=7): | |
| # prompt_input = gr.Textbox( | |
| # label="Your Query", | |
| # placeholder="E.g., 'Caption the uploaded whole slide image' or 'Segment cells using instanseg model'", | |
| # lines=2, | |
| # max_lines=5, | |
| # show_label=False, | |
| # ) | |
| # with gr.Column(scale=3): | |
| # file_upload = gr.File( | |
| # label="π Upload File", | |
| # file_types=[".svs", ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".csv", ".txt", ".json", ".npy"], | |
| # height=75, | |
| # elem_classes="file-upload-box", | |
| # ) | |
| # with gr.Row(): | |
| # submit_btn = gr.Button("π Submit", variant="primary", scale=3, size="lg") | |
| # clear_btn = gr.Button("ποΈ Clear", scale=1, size="lg", variant="secondary") | |
| # status_text = gr.Textbox( | |
| # label="Status", | |
| # interactive=False, | |
| # value="Ready", | |
| # show_label=False, | |
| # container=False, | |
| # ) | |
| # # Right column - Outputs | |
| # with gr.Column(scale=2): | |
| # with gr.Tabs(): | |
| # with gr.Tab("π₯ Input"): | |
| # with gr.Column(): | |
| # input_image_preview = gr.Image( | |
| # label="Input Image", | |
| # height=400, | |
| # show_label=False, | |
| # container=True, | |
| # ) | |
| # input_file_preview = gr.File( | |
| # label="Input File", | |
| # interactive=False, | |
| # height=100, | |
| # show_label=False, | |
| # container=True, | |
| # ) | |
| # input_status = gr.Textbox( | |
| # value="Upload a file to preview", | |
| # show_label=False, | |
| # interactive=False, | |
| # container=False, | |
| # ) | |
| # with gr.Tab("πΌοΈ Images"): | |
| # output_gallery = gr.Gallery( | |
| # label="Generated Visualizations", | |
| # columns=1, | |
| # height=600, | |
| # object_fit="contain", | |
| # show_label=False, | |
| # show_download_button=True, | |
| # ) | |
| # with gr.Tab("π Data"): | |
| # data_files = gr.File( | |
| # label="Generated Data Files", | |
| # file_count="multiple", | |
| # interactive=False, | |
| # height=600, | |
| # show_label=False, | |
| # ) | |
| # # Event handlers | |
| # passcode_btn.click( | |
| # fn=validate_passcode, | |
| # inputs=[passcode_input], | |
| # outputs=[passcode_section, main_interface, passcode_status] | |
| # ) | |
| # # File upload preview | |
| # file_upload.change( | |
| # fn=preview_uploaded_file, | |
| # inputs=[file_upload], | |
| # outputs=[input_image_preview, input_file_preview, input_status] | |
| # ) | |
| # submit_btn.click( | |
| # fn=process_agent_response, | |
| # inputs=[prompt_input, file_upload, chatbot], | |
| # outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] | |
| # ) | |
| # clear_btn.click( | |
| # fn=clear_chat, | |
| # outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] | |
| # ) | |
| # # Allow enter key to submit | |
| # prompt_input.submit( | |
| # fn=process_agent_response, | |
| # inputs=[prompt_input, file_upload, chatbot], | |
| # outputs=[chatbot, output_gallery, data_files, input_image_preview, input_file_preview, status_text] | |
| # ) | |
| # if __name__ == "__main__": | |
| # # Create necessary directories | |
| # Path("./data").mkdir(exist_ok=True) | |
| # Path("./output").mkdir(exist_ok=True) | |
| # print("=" * 60) | |
| # print("π¬ HistoPath Agent - Gradio Interface") | |
| # print("=" * 60) | |
| # # print(f"Passcode: {PASSCODE}") | |
| # print("Starting server...") | |
| # print("=" * 60) | |
| # # Launch the app | |
| # #demo.launch( | |
| # # server_name="0.0.0.0", | |
| # # server_port=7860, # Let Gradio auto-pick an available port | |
| # # share=True, | |
| # # show_error=True, | |
| # #) | |
| # demo.launch() |