Spaces:
Sleeping
Sleeping
import asyncio
import os
from typing import Optional

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from utils.pokemon_utils import (
    start_ladder_battle, start_battle_against_player,
    submit_move_for_battle, get_battle_state, list_active_battles,
    download_battle_replay, cleanup_completed_battles, format_battle_state,
    check_recent_battles, debug_move_attributes, active_battles, player_instances
)
from utils.pokemon_utils import player_instances_by_unique
# Run python-dotenv's loader before the server object is constructed.
load_dotenv()

# --- MCP Server Setup ---
# The one FastMCP instance for this module; it is started from the
# ``__main__`` block at the bottom of the file.
mcp = FastMCP(name="PokemonBattleAgent", host="0.0.0.0", port=7860)
# Module-wide session record shared by the tool functions below:
#   username         - set by battle_player(); default for username lookups
#   active_battle_id - set by find_recent_battles(); default battle for moves
current_session = dict(username=None, active_battle_id=None)
def battle_player(opponent_username: str, username: str = "MCPTrainer") -> dict:
    """
    Queue a challenge against a specific player.

    When the challenge is queued, ``username`` is stored in
    ``current_session['username']`` so later calls can default to it.

    Args:
        opponent_username (str): Username of the opponent player
        username (str): Username for the MCP-controlled player

    Returns:
        dict: On success, ``status`` is "challenge_queued" and the dict carries
        the opponent name, the base/unique player usernames reported by
        ``start_battle_against_player`` and follow-up instructions. If anything
        raises, ``status`` is "error" and ``message`` holds the exception text.
    """
    try:
        queued = start_battle_against_player(username, opponent_username)
        current_session['username'] = username

        reply = {
            "status": "challenge_queued",
            "message": f"Challenge to {opponent_username} queued. Battle will start when accepted.",
            "opponent": opponent_username,
        }
        # Copy the identifying usernames straight from the helper's result.
        for field in ('player_base_username', 'player_unique_username'):
            reply[field] = queued.get(field)
        reply["instructions"] = "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once the battle starts."
        reply["note"] = "Check server console for 'Challenge sent' confirmation."
        return reply
    except Exception as exc:
        # Failures are reported as data rather than raised to the caller.
        return {
            "status": "error",
            "message": f"Failed to challenge player: {exc!s}",
        }
async def choose_move(move_name: str, battle_id: Optional[str] = None) -> dict:
    """
    Choose and execute a move in a battle.

    The battle is ``battle_id`` when one is given; otherwise the
    ``active_battle_id`` stored in ``current_session`` is used.

    Args:
        move_name (str): Name or ID of the move to use
        battle_id (Optional[str]): Specific battle ID (uses current active battle if not provided)

    Returns:
        dict: ``status`` "success" with ``message`` (the value returned by
        ``submit_move_for_battle``), ``battle_state`` and ``waiting_for_move``;
        or ``status`` "error" with a ``message`` when no battle can be resolved
        or any step raises.
    """
    try:
        # Use provided battle_id or current active battle
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No active battle. Start a battle first."
            }

        # Submit move
        result = await submit_move_for_battle(target_battle_id, move_name=move_name)

        # State is read after the submission so the reply reflects it
        battle_info = get_battle_state(target_battle_id)
        return {
            "status": "success",
            "message": result,
            "battle_state": battle_info['battle_state'],
            "waiting_for_move": battle_info['waiting_for_move']
        }
    except Exception as e:
        # Failures are reported as data rather than raised to the caller
        return {
            "status": "error",
            "message": f"Failed to execute move: {str(e)}"
        }
async def switch_pokemon(pokemon_name: str, battle_id: Optional[str] = None) -> dict:
    """
    Switch to a different Pokemon in a battle.

    The battle is ``battle_id`` when one is given; otherwise the
    ``active_battle_id`` stored in ``current_session`` is used.

    Args:
        pokemon_name (str): Name of the Pokemon to switch to
        battle_id (Optional[str]): Specific battle ID (uses current active battle if not provided)

    Returns:
        dict: ``status`` "success" with ``message`` (the value returned by
        ``submit_move_for_battle``), ``battle_state`` and ``waiting_for_move``;
        or ``status`` "error" with a ``message`` when no battle can be resolved
        or any step raises.
    """
    try:
        # Use provided battle_id or current active battle
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No active battle. Start a battle first."
            }

        # Submit switch (same helper as a move, selected by keyword argument)
        result = await submit_move_for_battle(target_battle_id, pokemon_name=pokemon_name)

        # State is read after the submission so the reply reflects it
        battle_info = get_battle_state(target_battle_id)
        return {
            "status": "success",
            "message": result,
            "battle_state": battle_info['battle_state'],
            "waiting_for_move": battle_info['waiting_for_move']
        }
    except Exception as e:
        # Failures are reported as data rather than raised to the caller
        return {
            "status": "error",
            "message": f"Failed to switch Pokemon: {str(e)}"
        }
def get_current_battle_state(battle_id: Optional[str] = None) -> dict:
    """
    Get the current state of a battle.

    The battle is ``battle_id`` when one is given; otherwise the
    ``active_battle_id`` stored in ``current_session`` is used.

    Args:
        battle_id (Optional[str]): Specific battle ID (uses current active battle if not provided)

    Returns:
        dict: ``status`` "success", the resolved ``battle_id`` and every key
        returned by ``get_battle_state``; or ``status`` "error" with a
        ``message`` when no battle can be resolved or the lookup raises.
    """
    try:
        # Use provided battle_id or current active battle
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No active battle. Start a battle first."
            }

        battle_info = get_battle_state(target_battle_id)
        # battle_info is unpacked last, so any "status" / "battle_id" key it
        # contains takes precedence over the two set here.
        return {
            "status": "success",
            "battle_id": target_battle_id,
            **battle_info
        }
    except Exception as e:
        # Failures are reported as data rather than raised to the caller
        return {
            "status": "error",
            "message": f"Failed to get battle state: {str(e)}"
        }
def list_battles() -> dict:
    """
    Report every battle returned by ``list_active_battles``.

    Returns:
        dict: ``status`` "success" with ``active_battles`` and their ``count``,
        or ``status`` "error" with a ``message`` if the lookup raises.
    """
    try:
        ongoing = list_active_battles()
        summary = {
            "status": "success",
            "active_battles": ongoing,
            "count": len(ongoing),
        }
    except Exception as exc:
        # Failures are reported as data rather than raised to the caller.
        return {
            "status": "error",
            "message": f"Failed to list battles: {exc!s}",
        }
    return summary
async def download_replay(battle_id: Optional[str] = None) -> dict:
    """
    Download the replay for a battle.

    The battle is ``battle_id`` when one is given; otherwise the
    ``active_battle_id`` stored in ``current_session`` is used.

    Args:
        battle_id (Optional[str]): Battle ID (uses current active battle if not provided)

    Returns:
        dict: ``status`` "success" with ``replay_path`` (the value returned by
        ``download_battle_replay``) and ``battle_id``; or ``status`` "error"
        with a ``message`` when no battle can be resolved or the download raises.
    """
    try:
        # Use provided battle_id or current active battle
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No battle ID provided. Specify a battle_id or start a battle first."
            }

        replay_path = await download_battle_replay(target_battle_id)
        return {
            "status": "success",
            "message": "Replay downloaded successfully",
            "replay_path": replay_path,
            "battle_id": target_battle_id
        }
    except Exception as e:
        # Failures are reported as data rather than raised to the caller
        return {
            "status": "error",
            "message": f"Failed to download replay: {str(e)}"
        }
def get_player_status(username: Optional[str] = None) -> dict:
    """
    Get the current status of a player including any battles it holds.

    ``username`` is looked up first as a key of ``player_instances`` (scope
    "base", which may cover several player objects) and then as a key of
    ``player_instances_by_unique`` (scope "unique", a single player object).

    Args:
        username (Optional[str]): Username to check (uses current session if not provided)

    Returns:
        dict: One of
            - ``status`` "success": per-player summaries, a flat ``battles``
              list and aggregated battle/win/finished counters;
            - ``status`` "no_player": the name matched neither registry;
            - ``status`` "error": no username could be resolved, or an
              exception was raised while collecting the data.
    """
    try:
        # Use provided username or current session username
        target_username = username or current_session.get('username')
        if not target_username:
            return {
                "status": "error",
                "message": "No username provided and no active session."
            }

        # Resolve the scope once; it decides both which players are inspected
        # and the "resolved_scope" value reported back.
        if target_username in player_instances:
            resolved_scope = "base"
            players_to_check = list(player_instances[target_username].values())
        elif target_username in player_instances_by_unique:
            resolved_scope = "unique"
            players_to_check = [player_instances_by_unique[target_username]]
        else:
            return {
                "status": "no_player",
                "username": target_username,
                "message": f"No player instance found for {target_username}"
            }

        all_battles = []
        total_battles = 0
        total_won = 0
        total_finished = 0
        for p in players_to_check:
            total_battles += len(p.battles)
            # Counters are optional on the player object; missing ones count as 0
            total_won += getattr(p, 'n_won_battles', 0)
            total_finished += getattr(p, 'n_finished_battles', 0)
            for battle_id, battle in p.battles.items():
                all_battles.append({
                    'battle_id': battle_id,
                    # NOTE(review): if battle_id already starts with "battle-",
                    # this produces "#battle-battle-..." - confirm the ID format.
                    'battle_url': f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}",
                    'turn': getattr(battle, 'turn', 0),
                    'finished': getattr(battle, 'finished', False),
                    'won': getattr(battle, 'won', None),
                    'player_unique_username': p.username,
                    'player_base_username': getattr(p, 'base_username', target_username)
                })

        return {
            "status": "success",
            "query_username": target_username,
            "resolved_scope": resolved_scope,
            "players": [
                {
                    'player_unique_username': p.username,
                    'player_base_username': getattr(p, 'base_username', target_username),
                    'n_battles': len(p.battles)
                } for p in players_to_check
            ],
            "total_battles": total_battles,
            "battles": all_battles,
            "n_won_battles": total_won,
            "n_finished_battles": total_finished
        }
    except Exception as e:
        # Failures are reported as data rather than raised to the caller
        return {
            "status": "error",
            "message": f"Failed to get player status: {str(e)}"
        }
def find_recent_battles(username: Optional[str] = None) -> dict:
    """
    Check for recent battles that may have started after a timeout.

    Useful when battle requests time out but battles actually started.
    When at least one battle is found, the first entry's ``battle_id`` is
    stored in ``current_session['active_battle_id']``.

    Args:
        username (Optional[str]): Username to check (uses current session if not provided)

    Returns:
        dict: ``status`` "success" with ``recent_battles`` (as returned by
        ``check_recent_battles``), their ``count`` and a summary ``message``;
        or ``status`` "error" with a ``message`` when no username can be
        resolved or the lookup raises.
    """
    try:
        # Use provided username or current session username
        target_username = username or current_session.get('username')
        if not target_username:
            return {
                "status": "error",
                "message": "No username provided and no active session."
            }

        # Check for recent battles using the imported function
        recent_battles = check_recent_battles(target_username)
        if recent_battles:
            # Remember the first listed battle as the session's active one
            # (presumably the most recent - ordering is decided by the helper)
            current_session['active_battle_id'] = recent_battles[0]['battle_id']
            message = f"Found {len(recent_battles)} recent battles"
        else:
            message = "No recent battles found"

        return {
            "status": "success",
            "username": target_username,
            "recent_battles": recent_battles,
            "count": len(recent_battles),
            "message": message
        }
    except Exception as e:
        # Failures are reported as data rather than raised to the caller
        return {
            "status": "error",
            "message": f"Failed to check recent battles: {str(e)}"
        }
def cleanup_battles() -> dict:
    """
    Run ``cleanup_completed_battles`` and report whether it succeeded.

    Returns:
        dict: ``status`` "success" with a confirmation ``message``, or
        ``status`` "error" with the exception text if the cleanup raises.
    """
    try:
        cleanup_completed_battles()
    except Exception as exc:
        # Failures are reported as data rather than raised to the caller.
        return {
            "status": "error",
            "message": f"Failed to cleanup battles: {exc!s}",
        }
    else:
        return {
            "status": "success",
            "message": "Completed battles cleaned up successfully",
        }
async def wait_30_seconds() -> dict:
    """
    Suspend for 30 seconds via ``asyncio.sleep`` and then report completion.

    Returns:
        dict: ``status`` "success" with a confirmation ``message``, or
        ``status`` "error" with the exception text if the sleep raises an
        ``Exception``.
    """
    try:
        await asyncio.sleep(30)
    except Exception as exc:
        return {
            "status": "error",
            "message": f"Failed to wait: {exc!s}",
        }
    else:
        return {
            "status": "success",
            "message": "Waited 30 seconds successfully",
        }
# --- Server Execution ---
if __name__ == "__main__":
    # Startup banner, printed one line at a time before the server takes over.
    startup_banner = (
        "Pokemon Battle MCP Server starting on port 7860...",
        "Available battle types:",
        "- Ladder battles",
        "- AI agent battles (OpenAI, Gemini, Mistral, MaxDamage) - NO RANDOM FALLBACKS",
        "- Human player battles",
        "Running Pokemon Battle MCP server with SSE transport",
    )
    for banner_line in startup_banner:
        print(banner_line)

    mcp.run(transport="sse")