# app.py
import gradio as gr
import os
import traceback
import asyncio
from dotenv import load_dotenv
from models.task_prompt import TaskPrompt
import time
from llama_index.core import Settings as LlamaSettings # Import at top level
from llama_index.llms.anthropic import Anthropic # Import at top level
from manager_agent import ManagerAgent # Ensure this path is correct
import concurrent.futures # For running blocking code in a separate thread
# Load environment variables from .env file
load_dotenv()
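# Expected .env contents (example; the key value below is a placeholder):
#   ANTHROPIC_API_KEY=sk-ant-...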
# --- Configuration ---
LLM_MODEL = "claude-sonnet-4-20250514"
# --- Global variables ---
current_status = "Ready"
llm_global = None
manager_agent_global = None
# Settings_global is not strictly needed as a global if LlamaSettings is imported directly
# Thread pool executor for running blocking agent tasks
thread_pool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count() or 1)
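# ManagerAgent.run_task is synchronous; generate_bot_reply below hands it to this
# pool via loop.run_in_executor so the asyncio event loop (and the UI) stays responsive.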
# --- LlamaIndex LLM Initialization ---
def initialize_components():
    global llm_global, manager_agent_global
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        print("\n" + "=" * 60)
        print("⚠️ ERROR: ANTHROPIC_API_KEY not found in environment variables!")
        print("Please set your API key (e.g., in a .env file).")
        print("=" * 60 + "\n")
        return
    try:
        llm_global = Anthropic(
            model=LLM_MODEL,
            temperature=0.2,
            max_tokens=4096
        )
        LlamaSettings.llm = llm_global  # Use the imported LlamaSettings directly
        print(f"Successfully initialized LlamaIndex with Anthropic model: {LLM_MODEL} (temperature=0.2)")
        manager_agent_global = ManagerAgent(
            llm_global,
            max_iterations=30,  # Keep this reasonable for testing
            update_callback=update_status_callback
        )
        print("✅ ManagerAgent initialized successfully")
    except Exception as e:
        print(f"Error initializing Anthropic LLM or ManagerAgent: {e}")
        traceback.print_exc()
# --- Update callback function (called by ManagerAgent) ---
def update_status_callback(message):
    global current_status
    # This may be called from the ManagerAgent's worker thread or from the ReAct
    # agent's execution context; it updates the global that the Gradio polling
    # thread picks up.
    current_status = message
    print(f"✅ UI_STATUS_UPDATE (via callback): {message}")  # Differentiate console log
# --- Status retrieval function for Gradio polling ---
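# The invisible timestamp span makes every polled value unique, so Gradio
# re-renders the status component even when the status text itself is unchanged.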
def get_current_status_for_ui():
    global current_status
    timestamp = time.time()
    return f"{current_status}<span style='display:none;'>{timestamp}</span>"
# --- Gradio Interface Setup ---
def create_gradio_interface():
    if "ANTHROPIC_API_KEY" not in os.environ:
        gr.Warning("ANTHROPIC_API_KEY not found in environment variables! ALITA may not function correctly.")

    with gr.Blocks(theme="soft") as demo:
        gr.Markdown("# ALITA")
        gr.Markdown("ALITA is a self-learning AI agent that can search for information, analyze data, create tools, and orchestrate complex tasks.")
        chatbot_component = gr.Chatbot(
            label="Chat",
            height=500,
            show_label=False,
            type='messages'  # Explicitly set type to 'messages' to avoid the 'tuples' deprecation warning
        )
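        # With type='messages', chatbot history is a list of dicts like
        # {"role": "user" | "assistant", "content": "..."}; the handlers below use that format.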
gr.Markdown("Gradio version: " + gr.__version__ + " (Chatbot type defaults to 'tuples' for older versions. Consider `type='messages'` for newer Gradio if issues persist with chat display).")
        with gr.Row():
            message_textbox = gr.Textbox(
                placeholder="Type your message here...",
                scale=7,
                show_label=False,
                container=False
            )
        gr.Examples(
            examples=[
                "🔍 Search for information on artificial intelligence",
                "📊 Analyze trends in the technology market",
                "⚡ Create a script to automate a repetitive task",
                "🌐 Find open-source resources for machine learning",
                "What is the temperature in Paris now?"
            ],
            inputs=message_textbox,
        )
        def add_user_msg(user_input_text, chat_history_list):
            if not user_input_text.strip():
                return gr.update(), chat_history_list
            chat_history_list.append({"role": "user", "content": user_input_text})
            return gr.update(value=""), chat_history_list
        async def generate_bot_reply(chat_history_list):
            # Expect the pending user message as the last history entry.
            if not chat_history_list or chat_history_list[-1]["role"] != "user":
                yield chat_history_list
                return
            user_message = chat_history_list[-1]["content"]
            if manager_agent_global is None or LlamaSettings.llm is None:
                chat_history_list.append({
                    "role": "assistant",
                    "content": "❌ Critical Error: ALITA is not properly initialized. Please check server logs and API key.",
                })
                yield chat_history_list
                return
            try:
                print(f"\n🤖 GRADIOLOG: Processing user message: '{user_message[:100]}{'...' if len(user_message) > 100 else ''}'")
                task_prompt = TaskPrompt(text=user_message)
                # Run the blocking agent call in the thread pool so the event loop stays free.
                loop = asyncio.get_running_loop()
                response_text_from_agent = await loop.run_in_executor(
                    thread_pool_executor,
                    manager_agent_global.run_task,
                    task_prompt
                )
                final_bot_response = response_text_from_agent.strip()
                words = final_bot_response.split()
                total_words = len(words)
                chat_history_list.append({"role": "assistant", "content": ""})
                if not words:
                    chat_history_list[-1]["content"] = final_bot_response
                    yield chat_history_list
                else:
                    accumulated_response_stream = ""
                    for i, word in enumerate(words):
                        accumulated_response_stream += word + " "
                        # Log coarse streaming progress to the console.
                        if i == total_words // 4:
                            print("🔄 Streaming response (25%)...")
                        elif i == total_words // 2:
                            print("🔄 Streaming response (50%)...")
                        elif i == (total_words * 3) // 4:
                            print("🔄 Streaming response (75%)...")
                        # Update the UI every third word (and on the last word).
                        if i % 3 == 0 or i == total_words - 1:
                            chat_history_list[-1]["content"] = accumulated_response_stream.strip()
                            yield chat_history_list
                            await asyncio.sleep(0.01)
                    if chat_history_list[-1]["content"] != final_bot_response:
                        chat_history_list[-1]["content"] = final_bot_response
                        yield chat_history_list
                print("✅ GRADIOLOG: Task processing and streaming completed.")
            except Exception as e:
                error_message_for_ui = f"❌ Gradio/Agent Error: {str(e)}"
                print(f"\n🚨 GRADIOLOG: Error in generate_bot_reply: {e}")
                traceback.print_exc()
                if chat_history_list and chat_history_list[-1]["role"] == "assistant":
                    chat_history_list[-1]["content"] = error_message_for_ui
                else:
                    chat_history_list.append({"role": "assistant", "content": error_message_for_ui})
                yield chat_history_list
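        # Event chain: add_user_msg echoes the user's message and clears the textbox,
        # then generate_bot_reply streams the agent's answer into the same history.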
        message_textbox.submit(
            add_user_msg,
            inputs=[message_textbox, chatbot_component],
            outputs=[message_textbox, chatbot_component],
            show_progress="hidden",
        ).then(
            generate_bot_reply,
            inputs=[chatbot_component],
            outputs=[chatbot_component],
            api_name=False,
        )

    return demo
# Initialize LLM and Agent components
initialize_components()
# --- Launch the Application ---
if __name__ == "__main__":
    print(f"Gradio version: {gr.__version__}")
    print("🚀 Starting Gradio ALITA Chat Application...")
    alita_interface = create_gradio_interface()
    try:
        alita_interface.launch(
            share=False,
            server_name="127.0.0.1",
            server_port=7126,
            show_error=True,
            # debug=True  # Can be helpful
        )
    except KeyboardInterrupt:
        print("\n👋 Application stopped by user")
    except Exception as e:
        print(f"\n❌ Error launching application: {e}")
        traceback.print_exc()
    finally:
        print("Shutting down thread pool executor...")
        thread_pool_executor.shutdown(wait=True)  # Clean up worker threads
        print("✅ Gradio application stopped.")