Pokemon_server / app.py
Jofthomas's picture
change
d5e10b3
raw
history blame
18.8 kB
import os
import asyncio
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from utils.agent_factory import create_agent, get_supported_agent_types, get_default_models
from utils.pokemon_utils import (
start_ladder_battle, start_battle_against_agent, start_battle_against_player,
submit_move_for_battle, get_battle_state, list_active_battles,
download_battle_replay, cleanup_completed_battles, format_battle_state,
check_recent_battles, debug_move_attributes, active_battles, player_instances
)
# Load environment variables (via python-dotenv) before the server object is built.
load_dotenv()

# --- MCP Server Setup ---
# The server is configured to listen on all interfaces (0.0.0.0), port 7860.
mcp = FastMCP(name="PokemonBattleAgent", host="0.0.0.0", port=7860)

# --- Global state for current user session ---
# One module-level dict shared by every tool call:
#   'username'         - set by the tools that start or queue a battle
#   'active_battle_id' - fallback battle id for tools called without battle_id
current_session = {"username": None, "active_battle_id": None}
# --- MCP Tools ---
@mcp.tool()
def start_ladder_match(username: str = "MCPTrainer") -> dict:
    """
    Queue a ladder search on Pokemon Showdown for the given user.

    The search is only requested here; no battle ID exists yet when this
    returns, so the session's active battle is left untouched.

    Args:
        username (str): Username for the MCP-controlled player

    Returns:
        dict: "status" is "ladder_search_queued" with follow-up instructions,
            or "error" with a message if requesting the search raised
    """
    try:
        # The helper's return value is not used by this tool.
        start_ladder_battle(username)
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to start ladder battle: {exc!s}",
        }
    else:
        # Remember who is playing so later tools can default to this user.
        current_session["username"] = username
        return {
            "status": "ladder_search_queued",
            "message": f"Ladder search queued for {username}. Waiting for opponent match.",
            "instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once a match is found.",
            "note": "Check server console for 'Ladder search started' confirmation.",
        }
@mcp.tool()
async def battle_agent(agent_type: str, username: str = "MCPTrainer", api_key: str = None, model: str = None) -> dict:
    """
    Start a battle against an AI agent.

    The agent type is matched case-insensitively: it is lower-cased once and
    that same normalized value is used for validation and for building the
    opponent, so an input such as "OpenAI" that passes validation is not
    handed to create_agent in a different spelling.

    Args:
        agent_type (str): Type of agent ('openai', 'gemini', 'mistral', 'maxdamage', 'random')
        username (str): Username for the MCP-controlled player
        api_key (str, optional): API key for AI agents (required for openai, gemini, mistral)
        model (str, optional): Specific model to use (will use default if not specified)

    Returns:
        dict: On success, "status" is "success" together with "battle_id",
            "message", "opponent", "battle_url", "battle_state" and
            "waiting_for_move". On any failure, "status" is "error" with a
            "message" describing the problem.
    """
    try:
        # Normalize once; validation and agent creation must see the same value.
        normalized_type = agent_type.lower()
        supported_types = get_supported_agent_types()
        if normalized_type not in supported_types:
            return {
                "status": "error",
                "message": f"Unsupported agent type. Supported types: {supported_types}"
            }

        # Create opponent agent
        opponent = create_agent(normalized_type, api_key=api_key, model=model)

        # Start battle and make it the session's active battle
        battle_id = await start_battle_against_agent(username, opponent)
        current_session['username'] = username
        current_session['active_battle_id'] = battle_id

        # Get initial battle state
        battle_info = get_battle_state(battle_id)
        # Fall back to a URL built from the battle id when the state carries none.
        default_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}"
        return {
            "status": "success",
            "battle_id": battle_id,
            "message": f"Battle started against {agent_type} agent",
            "opponent": opponent.username,
            "battle_url": battle_info.get('battle_url', default_url),
            "battle_state": battle_info['battle_state'],
            "waiting_for_move": battle_info['waiting_for_move']
        }
    except Exception as e:
        # Tool boundary: report every failure to the MCP client as an error payload.
        return {
            "status": "error",
            "message": f"Failed to start battle against agent: {str(e)}"
        }
@mcp.tool()
def battle_player(opponent_username: str, username: str = "MCPTrainer") -> dict:
    """
    Send a battle challenge to a specific player.

    Only the challenge is issued here; no battle ID exists until the
    opponent accepts, so the session's active battle is left untouched.

    Args:
        opponent_username (str): Username of the opponent player
        username (str): Username for the MCP-controlled player

    Returns:
        dict: "status" is "challenge_queued" with follow-up instructions,
            or "error" with a message if sending the challenge raised
    """
    try:
        # The helper's return value is not used by this tool.
        start_battle_against_player(username, opponent_username)
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to challenge player: {exc!s}",
        }
    else:
        # Remember who is playing so later tools can default to this user.
        current_session["username"] = username
        return {
            "status": "challenge_queued",
            "message": f"Challenge to {opponent_username} queued. Battle will start when accepted.",
            "opponent": opponent_username,
            "instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once the battle starts.",
            "note": "Check server console for 'Challenge sent' confirmation.",
        }
@mcp.tool()
async def choose_move(move_name: str, battle_id: str = None) -> dict:
    """
    Submit a move in a battle and report the resulting state.

    Args:
        move_name (str): Name or ID of the move to use
        battle_id (str, optional): Battle to act in; defaults to the session's active battle

    Returns:
        dict: "status" "success" with the submission result as "message" plus
            "battle_state" and "waiting_for_move"; otherwise "status" "error"
    """
    try:
        # An explicit battle_id wins; otherwise fall back to the session's battle.
        selected_id = battle_id if battle_id else current_session.get("active_battle_id")
        if selected_id:
            outcome = await submit_move_for_battle(selected_id, move_name=move_name)
            # Re-read the state after the move was submitted.
            snapshot = get_battle_state(selected_id)
            return {
                "status": "success",
                "message": outcome,
                "battle_state": snapshot["battle_state"],
                "waiting_for_move": snapshot["waiting_for_move"],
            }
        return {
            "status": "error",
            "message": "No active battle. Start a battle first.",
        }
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to execute move: {exc!s}",
        }
@mcp.tool()
async def switch_pokemon(pokemon_name: str, battle_id: str = None) -> dict:
    """
    Submit a switch to another Pokemon and report the resulting state.

    Args:
        pokemon_name (str): Name of the Pokemon to switch to
        battle_id (str, optional): Battle to act in; defaults to the session's active battle

    Returns:
        dict: "status" "success" with the submission result as "message" plus
            "battle_state" and "waiting_for_move"; otherwise "status" "error"
    """
    try:
        # An explicit battle_id wins; otherwise fall back to the session's battle.
        selected_id = battle_id if battle_id else current_session.get("active_battle_id")
        if selected_id:
            outcome = await submit_move_for_battle(selected_id, pokemon_name=pokemon_name)
            # Re-read the state after the switch was submitted.
            snapshot = get_battle_state(selected_id)
            return {
                "status": "success",
                "message": outcome,
                "battle_state": snapshot["battle_state"],
                "waiting_for_move": snapshot["waiting_for_move"],
            }
        return {
            "status": "error",
            "message": "No active battle. Start a battle first.",
        }
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to switch Pokemon: {exc!s}",
        }
@mcp.tool()
def get_current_battle_state(battle_id: str = None) -> dict:
    """
    Report the current state of a battle.

    Args:
        battle_id (str, optional): Battle to inspect; defaults to the session's active battle

    Returns:
        dict: "status" "success" and "battle_id" merged with everything
            get_battle_state returned, or "status" "error" with a message
    """
    no_battle = {
        "status": "error",
        "message": "No active battle. Start a battle first.",
    }
    try:
        wanted_id = battle_id or current_session.get("active_battle_id")
        if not wanted_id:
            return no_battle
        state = get_battle_state(wanted_id)
        # The state is unpacked last, so its keys take precedence on a clash.
        return {"status": "success", "battle_id": wanted_id, **state}
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to get battle state: {exc!s}",
        }
@mcp.tool()
def list_battles() -> dict:
    """
    List all active battles.

    Returns:
        dict: "status" "success" with "active_battles" and their "count",
            or "status" "error" with a message
    """
    try:
        ongoing = list_active_battles()
        total = len(ongoing)
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to list battles: {exc!s}",
        }
    return {
        "status": "success",
        "active_battles": ongoing,
        "count": total,
    }
@mcp.tool()
async def download_replay(battle_id: str = None) -> dict:
    """
    Download the replay of a battle.

    Args:
        battle_id (str, optional): Battle ID; defaults to the session's active battle

    Returns:
        dict: "status" "success" with "replay_path" and "battle_id",
            or "status" "error" with a message
    """
    try:
        replay_for = battle_id or current_session.get("active_battle_id")
        if not replay_for:
            return {
                "status": "error",
                "message": "No battle ID provided. Specify a battle_id or start a battle first.",
            }
        saved_to = await download_battle_replay(replay_for)
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to download replay: {exc!s}",
        }
    return {
        "status": "success",
        "message": "Replay downloaded successfully",
        "replay_path": saved_to,
        "battle_id": replay_for,
    }
@mcp.tool()
def get_supported_agents() -> dict:
    """
    Describe the agent types that can be battled and their default models.

    Returns:
        dict: "supported_agents" and "default_models" as reported by the
            agent factory, plus a fixed human-readable "description" per agent
    """
    agent_types = get_supported_agent_types()
    model_defaults = get_default_models()
    # Static blurbs; this mapping is not derived from the factory's list.
    blurbs = {
        "openai": "OpenAI GPT models (requires API key)",
        "gemini": "Google Gemini models (requires API key)",
        "mistral": "Mistral AI models (requires API key)",
        "maxdamage": "Simple agent that chooses moves with highest base power",
        "random": "Random move selection agent",
    }
    return {
        "status": "success",
        "supported_agents": agent_types,
        "default_models": model_defaults,
        "description": blurbs,
    }
@mcp.tool()
def get_player_status(username: str = None) -> dict:
    """
    Summarize a player's battles and win/finish counters.

    Args:
        username (str, optional): Username to check; defaults to the session's username

    Returns:
        dict: "status" "success" with the player's battles and counters,
            "no_player" when no player instance is registered under that
            username, or "error" with a message
    """
    try:
        who = username or current_session.get("username")
        if not who:
            return {
                "status": "error",
                "message": "No username provided and no active session.",
            }
        if who in player_instances:
            player = player_instances[who]
            # One summary per battle; getattr defaults cover attributes a
            # battle object does not expose.
            summaries = [
                {
                    "battle_id": bid,
                    "battle_url": f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{bid}",
                    "turn": getattr(fight, "turn", 0),
                    "finished": getattr(fight, "finished", False),
                    "won": getattr(fight, "won", None),
                }
                for bid, fight in player.battles.items()
            ]
            return {
                "status": "success",
                "username": who,
                "player_username": player.username,
                "total_battles": len(player.battles),
                "battles": summaries,
                "n_won_battles": getattr(player, "n_won_battles", 0),
                "n_finished_battles": getattr(player, "n_finished_battles", 0),
            }
        return {
            "status": "no_player",
            "username": who,
            "message": f"No player instance found for {who}",
        }
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to get player status: {exc!s}",
        }
@mcp.tool()
def find_recent_battles(username: str = None) -> dict:
    """
    Look up recent battles for a user and adopt the first one as active.

    Useful when a battle request timed out but the battle actually started.

    Args:
        username (str, optional): Username to check; defaults to the session's username

    Returns:
        dict: "status" "success" with "recent_battles", their "count" and a
            summary "message", or "status" "error" with a message
    """
    try:
        who = username or current_session.get("username")
        if not who:
            return {
                "status": "error",
                "message": "No username provided and no active session.",
            }
        found = check_recent_battles(who)
        if found:
            # The first entry becomes the session's active battle
            # (presumably the most recent one; ordering is decided by the helper).
            current_session["active_battle_id"] = found[0]["battle_id"]
            summary = f"Found {len(found)} recent battles"
        else:
            summary = "No recent battles found"
        return {
            "status": "success",
            "username": who,
            "recent_battles": found,
            "count": len(found),
            "message": summary,
        }
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to check recent battles: {exc!s}",
        }
@mcp.tool()
def debug_battle_objects(username: str = None, battle_id: str = None) -> dict:
    """
    Inspect a player's battle objects for troubleshooting.

    When the requested (or session) battle ID is known to the player, only
    that battle is inspected; otherwise the last three entries of the
    player's battles are inspected instead.

    Args:
        username (str, optional): Username to debug; defaults to the session's username
        battle_id (str, optional): Specific battle ID to debug; defaults to the session's active battle

    Returns:
        dict: "status" "success" with one "battles_debug" entry per inspected
            battle (an entry carries "error" if inspecting that battle
            raised), or "status" "error" with a message
    """
    try:
        who = username or current_session.get("username")
        wanted_id = battle_id or current_session.get("active_battle_id")
        if not who:
            return {
                "status": "error",
                "message": "No username provided and no active session.",
            }
        if who not in player_instances:
            return {
                "status": "error",
                "message": f"No player instance found for {who}",
            }

        player = player_instances[who]
        entries = []
        report = {
            "status": "success",
            "username": who,
            "player_username": player.username,
            "total_battles": len(player.battles),
            "battles_debug": entries,
        }

        if wanted_id and wanted_id in player.battles:
            selected = [(wanted_id, player.battles[wanted_id])]
        else:
            # Unknown or missing ID: fall back to the last three battles.
            selected = list(player.battles.items())[-3:]

        for bid, fight in selected:
            # A failure on one battle is recorded and must not abort the rest.
            try:
                entry = {
                    "battle_id": bid,
                    "battle_type": type(fight).__name__,
                    "battle_attributes": [name for name in dir(fight) if not name.startswith('_')],
                    "move_debug": debug_move_attributes(fight),
                }
            except Exception as exc:
                entry = {
                    "battle_id": bid,
                    "error": f"Error debugging battle: {exc}",
                }
            entries.append(entry)
        return report
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to debug battle objects: {exc!s}",
        }
@mcp.tool()
def cleanup_battles() -> dict:
    """
    Clean up completed battles to free memory.

    Returns:
        dict: "status" "success" when the cleanup helper returned normally,
            or "status" "error" with a message if it raised
    """
    try:
        cleanup_completed_battles()
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to cleanup battles: {exc!s}",
        }
    else:
        return {
            "status": "success",
            "message": "Completed battles cleaned up successfully",
        }
@mcp.tool()
async def wait_30_seconds() -> dict:
    """
    Pause for 30 seconds, e.g. to give a queued battle time to start.

    Returns:
        dict: "status" "success" after the sleep completes, or "status"
            "error" with a message if the sleep raised an Exception
    """
    pause_seconds = 30
    try:
        await asyncio.sleep(pause_seconds)
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to wait: {exc!s}",
        }
    else:
        return {
            "status": "success",
            "message": "Waited 30 seconds successfully",
        }
# --- Server Execution ---
if __name__ == "__main__":
    # Print the port the server is actually configured with; the banner used
    # to hard-code 7861 while FastMCP is constructed with port=7860.
    print(f"Pokemon Battle MCP Server starting on port {mcp.settings.port}...")
    print("Available battle types:")
    print("- Ladder battles")
    print("- AI agent battles (OpenAI, Gemini, Mistral, MaxDamage, Random)")
    print("- Human player battles")
    print("Running Pokemon Battle MCP server with SSE transport")
    # Serve the registered tools over the SSE transport until stopped.
    mcp.run(transport="sse")