# app.py
import gradio as gr
import os
import traceback
import asyncio
import time
import concurrent.futures  # For running blocking agent tasks in a separate thread

from dotenv import load_dotenv
from llama_index.core import Settings as LlamaSettings  # Import at top level
from llama_index.llms.anthropic import Anthropic  # Import at top level

from models.task_prompt import TaskPrompt
from manager_agent import ManagerAgent  # Ensure this path is correct

# Load environment variables from .env file
load_dotenv()

# --- Configuration ---
LLM_MODEL = "claude-sonnet-4-20250514"

# --- Global variables ---
current_status = "Ready"
llm_global = None
manager_agent_global = None
# A Settings_global is not needed; LlamaSettings is imported directly above.

# Thread pool executor for running blocking agent tasks
thread_pool_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=os.cpu_count() or 1
)


# --- LlamaIndex LLM initialization ---
def initialize_components():
    global llm_global, manager_agent_global
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        print("\n" + "=" * 60)
        print("āš ļø ERROR: ANTHROPIC_API_KEY not found in environment variables!")
        print("Please set your API key (e.g., in a .env file).")
        print("=" * 60 + "\n")
        return
    try:
        llm_global = Anthropic(model=LLM_MODEL, temperature=0.2, max_tokens=4096)
        LlamaSettings.llm = llm_global  # Use the imported LlamaSettings directly
        print(
            f"Successfully initialized LlamaIndex with Anthropic model: "
            f"{LLM_MODEL} (temperature=0.2)"
        )
        manager_agent_global = ManagerAgent(
            llm_global,
            max_iterations=30,  # Keep this reasonable for testing
            update_callback=update_status_callback,
        )
        print("āœ… ManagerAgent initialized successfully")
    except Exception as e:
        print(f"Error initializing Anthropic LLM or ManagerAgent: {e}")
        traceback.print_exc()


# --- Update callback (called by ManagerAgent) ---
def update_status_callback(message):
    global current_status
    # Called from the ManagerAgent's worker thread (or the ReAct agent's
    # execution context). It only updates the global variable; the Gradio
    # polling side picks up the new value on its next tick.
    current_status = message
    print(f"āœ… UI_STATUS_UPDATE (via callback): {message}")  # Differentiate console log


# --- Status retrieval for Gradio polling ---
def get_current_status_for_ui():
    # Append a timestamp so a polling component always sees a changed value,
    # even when the status text itself has not changed.
    return f"{current_status} (t={time.time():.0f})"


# --- Gradio interface setup ---
def create_gradio_interface():
    if "ANTHROPIC_API_KEY" not in os.environ:
        gr.Warning(
            "ANTHROPIC_API_KEY not found in environment variables! "
            "ALITA may not function correctly."
        )

    with gr.Blocks(theme="soft") as demo:
        gr.Markdown("# ALITA")
        gr.Markdown(
            "ALITA is a self-learning AI agent that can search for information, "
            "analyze data, create tools, and orchestrate complex tasks."
        )

        chatbot_component = gr.Chatbot(
            label="Chat",
            height=500,
            show_label=False,
            type="messages",  # Use the role/content dict format; avoids the 'tuples' deprecation warning
        )
        gr.Markdown(f"Gradio version: {gr.__version__}")
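        # Live status readout: a minimal sketch, not part of the original
        # wiring, that surfaces update_status_callback() messages in the UI.
        # It assumes gr.Timer is available (Gradio >= 4.40); the names
        # status_box and status_timer are illustrative. Remove this block if
        # a status display is not wanted or the Gradio version is older.
        status_box = gr.Textbox(label="Agent status", value="Ready", interactive=False)
        status_timer = gr.Timer(1.0)  # Poll once per second
        status_timer.tick(get_current_status_for_ui, outputs=status_box)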
        with gr.Row():
            message_textbox = gr.Textbox(
                placeholder="Type your message here...",
                scale=7,
                show_label=False,
                container=False,
            )

        gr.Examples(
            examples=[
                "šŸ” Search for information on artificial intelligence",
                "šŸ“Š Analyze trends in the technology market",
                "⚔ Create a script to automate a repetitive task",
                "🌐 Find open-source resources for machine learning",
                "What is the temperature in Paris now?",
            ],
            inputs=message_textbox,
        )

        def add_user_msg(user_input_text, chat_history_list):
            if not user_input_text.strip():
                return gr.update(), chat_history_list
            # 'messages' format: each history entry is a role/content dict.
            chat_history_list.append({"role": "user", "content": user_input_text})
            return gr.update(value=""), chat_history_list

        async def generate_bot_reply(chat_history_list):
            if not chat_history_list or chat_history_list[-1]["role"] != "user":
                yield chat_history_list
                return

            user_message = chat_history_list[-1]["content"]

            if manager_agent_global is None or LlamaSettings.llm is None:
                chat_history_list.append({
                    "role": "assistant",
                    "content": (
                        "āŒ Critical Error: ALITA is not properly initialized. "
                        "Please check server logs and API key."
                    ),
                })
                yield chat_history_list
                return

            try:
                preview = user_message[:100] + ("..." if len(user_message) > 100 else "")
                print(f"\nšŸ¤– GRADIOLOG: Processing user message: '{preview}'")

                # run_task is blocking, so run it in the thread pool to keep
                # the event loop (and therefore the UI) responsive.
                task_prompt = TaskPrompt(text=user_message)
                loop = asyncio.get_running_loop()
                response_text_from_agent = await loop.run_in_executor(
                    thread_pool_executor,
                    manager_agent_global.run_task,
                    task_prompt,
                )

                final_bot_response = response_text_from_agent
                words = final_bot_response.split()
                total_words = len(words)
                accumulated_response_stream = ""

                chat_history_list.append({"role": "assistant", "content": ""})

                if not words:
                    chat_history_list[-1]["content"] = final_bot_response.strip()
                    yield chat_history_list
                else:
                    # Simulated streaming: re-yield the growing reply word by word.
                    for i, word in enumerate(words):
                        accumulated_response_stream += word + " "
                        if i == total_words // 4:
                            print("šŸ”„ Streaming response (25%)...")
                        elif i == total_words // 2:
                            print("šŸ”„ Streaming response (50%)...")
                        elif i == (total_words * 3) // 4:
                            print("šŸ”„ Streaming response (75%)...")
                        if i % 3 == 0 or i == total_words - 1:
                            chat_history_list[-1]["content"] = accumulated_response_stream.strip()
                            yield chat_history_list
                            await asyncio.sleep(0.01)

                    if chat_history_list[-1]["content"] != final_bot_response.strip():
                        chat_history_list[-1]["content"] = final_bot_response.strip()
                        yield chat_history_list

                print("āœ… GRADIOLOG: Task processing and streaming completed.")

            except Exception as e:
                error_message_for_ui = f"āŒ Gradio/Agent Error: {e}"
                print(f"\n🚨 GRADIOLOG: Error in generate_bot_reply: {e}")
                traceback.print_exc()
                if chat_history_list and chat_history_list[-1]["role"] == "assistant":
                    chat_history_list[-1]["content"] = error_message_for_ui
                else:
                    chat_history_list.append(
                        {"role": "assistant", "content": error_message_for_ui}
                    )
                yield chat_history_list

        message_textbox.submit(
            add_user_msg,
            inputs=[message_textbox, chatbot_component],
            outputs=[message_textbox, chatbot_component],
            show_progress="hidden",
        ).then(
            generate_bot_reply,
            inputs=[chatbot_component],
            outputs=[chatbot_component],
            api_name=False,
        )

    return demo
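# --- Expected collaborator interfaces (reference only; defined elsewhere) ---
# A sketch of the shapes this file assumes, inferred from the calls above;
# the real definitions live in models/task_prompt.py and manager_agent.py:
#
#   class TaskPrompt:
#       def __init__(self, text: str): ...
#
#   class ManagerAgent:
#       def __init__(self, llm, max_iterations: int = 30, update_callback=None): ...
#       def run_task(self, task_prompt: "TaskPrompt") -> str:
#           """Blocking call; run_in_executor above keeps the UI responsive."""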
# Initialize LLM and agent components at import time
initialize_components()


# --- Launch the application ---
if __name__ == "__main__":
    print(f"Gradio version: {gr.__version__}")
    print("šŸš€ Starting Gradio ALITA Chat Application...")
    alita_interface = create_gradio_interface()
    try:
        alita_interface.launch(
            share=False,
            server_name="127.0.0.1",
            server_port=7126,
            show_error=True,
            # debug=True  # Can be helpful
        )
    except KeyboardInterrupt:
        print("\nšŸ‘‹ Application stopped by user")
    except Exception as e:
        print(f"\nāŒ Error launching application: {e}")
        traceback.print_exc()
    finally:
        print("Shutting down thread pool executor...")
        thread_pool_executor.shutdown(wait=True)  # Clean up worker threads
        print("āœ… Gradio application stopped.")
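# ---------------------------------------------------------------------------
# Running locally (a minimal sketch; the key value below is a placeholder):
#
#   $ echo 'ANTHROPIC_API_KEY=sk-ant-your-key-here' > .env
#   $ python app.py
#
# The app then serves on http://127.0.0.1:7126 as configured in launch().
# ---------------------------------------------------------------------------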