Jofthomas commited on
Commit
4512783
·
verified ·
1 Parent(s): 3f06653

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +517 -0
app.py ADDED
@@ -0,0 +1,517 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import asyncio
3
+ from dotenv import load_dotenv
4
+ from mcp.server.fastmcp import FastMCP
5
+ from utils.agent_factory import create_agent, get_supported_agent_types, get_default_models
6
+ from utils.pokemon_utils import (
7
+ start_ladder_battle, start_battle_against_agent, start_battle_against_player,
8
+ submit_move_for_battle, get_battle_state, list_active_battles,
9
+ download_battle_replay, cleanup_completed_battles, format_battle_state,
10
+ check_recent_battles, debug_move_attributes, active_battles, player_instances
11
+ )
12
+
13
+ load_dotenv()
14
+
15
+ # --- MCP Server Setup ---
16
+ mcp = FastMCP(
17
+ name="PokemonBattleAgent",
18
+ host="0.0.0.0",
19
+ port=7861,
20
+ )
21
+
22
+ # --- Global state for current user session ---
23
+ current_session = {
24
+ 'username': None,
25
+ 'active_battle_id': None
26
+ }
27
+
28
+ # --- MCP Tools ---
29
+
30
+ @mcp.tool()
31
+ def start_ladder_match(username: str = "MCPTrainer") -> dict:
32
+ """
33
+ Start a ladder battle on Pokemon Showdown.
34
+ Args:
35
+ username (str): Username for the MCP-controlled player
36
+ Returns:
37
+ dict: Battle information including battle ID and initial state
38
+ """
39
+ global current_session
40
+ try:
41
+ # Start ladder search (fire-and-forget)
42
+ ladder_result = start_ladder_battle(username)
43
+
44
+ current_session['username'] = username
45
+ # Don't set active_battle_id yet since battle hasn't started
46
+
47
+ return {
48
+ "status": "ladder_search_queued",
49
+ "message": f"Ladder search queued for {username}. Waiting for opponent match.",
50
+ "instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once a match is found.",
51
+ "note": "Check server console for 'Ladder search started' confirmation."
52
+ }
53
+ except Exception as e:
54
+ return {
55
+ "status": "error",
56
+ "message": f"Failed to start ladder battle: {str(e)}"
57
+ }
58
+
59
+ @mcp.tool()
60
+ async def battle_agent(agent_type: str, username: str = "MCPTrainer", api_key: str = None, model: str = None) -> dict:
61
+ """
62
+ Start a battle against an AI agent.
63
+ Args:
64
+ agent_type (str): Type of agent ('openai', 'gemini', 'mistral', 'maxdamage', 'random')
65
+ username (str): Username for the MCP-controlled player
66
+ api_key (str, optional): API key for AI agents (required for openai, gemini, mistral)
67
+ model (str, optional): Specific model to use (will use default if not specified)
68
+ Returns:
69
+ dict: Battle information including battle ID and initial state
70
+ """
71
+ global current_session
72
+ try:
73
+ # Validate agent type
74
+ if agent_type.lower() not in get_supported_agent_types():
75
+ return {
76
+ "status": "error",
77
+ "message": f"Unsupported agent type. Supported types: {get_supported_agent_types()}"
78
+ }
79
+
80
+ # Create opponent agent
81
+ opponent = create_agent(agent_type, api_key=api_key, model=model)
82
+
83
+ # Start battle
84
+ battle_id = await start_battle_against_agent(username, opponent)
85
+
86
+ current_session['username'] = username
87
+ current_session['active_battle_id'] = battle_id
88
+
89
+ # Get initial battle state
90
+ battle_info = get_battle_state(battle_id)
91
+
92
+ return {
93
+ "status": "success",
94
+ "battle_id": battle_id,
95
+ "message": f"Battle started against {agent_type} agent",
96
+ "opponent": opponent.username,
97
+ "battle_url": battle_info.get('battle_url', f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}"),
98
+ "battle_state": battle_info['battle_state'],
99
+ "waiting_for_move": battle_info['waiting_for_move']
100
+ }
101
+ except Exception as e:
102
+ return {
103
+ "status": "error",
104
+ "message": f"Failed to start battle against agent: {str(e)}"
105
+ }
106
+
107
+ @mcp.tool()
108
+ def battle_player(opponent_username: str, username: str = "MCPTrainer") -> dict:
109
+ """
110
+ Start a battle against a specific player.
111
+ Args:
112
+ opponent_username (str): Username of the opponent player
113
+ username (str): Username for the MCP-controlled player
114
+ Returns:
115
+ dict: Battle information including battle ID and initial state
116
+ """
117
+ global current_session
118
+ try:
119
+ # Send battle challenge (fire-and-forget)
120
+ challenge_result = start_battle_against_player(username, opponent_username)
121
+
122
+ current_session['username'] = username
123
+ # Don't set active_battle_id yet since battle hasn't started
124
+
125
+ return {
126
+ "status": "challenge_queued",
127
+ "message": f"Challenge to {opponent_username} queued. Battle will start when accepted.",
128
+ "opponent": opponent_username,
129
+ "instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once the battle starts.",
130
+ "note": "Check server console for 'Challenge sent' confirmation."
131
+ }
132
+ except Exception as e:
133
+ return {
134
+ "status": "error",
135
+ "message": f"Failed to challenge player: {str(e)}"
136
+ }
137
+
138
+ @mcp.tool()
139
+ async def choose_move(move_name: str, battle_id: str = None) -> dict:
140
+ """
141
+ Choose and execute a move in the current battle.
142
+ Args:
143
+ move_name (str): Name or ID of the move to use
144
+ battle_id (str, optional): Specific battle ID (uses current active battle if not provided)
145
+ Returns:
146
+ dict: Result of the move and updated battle state
147
+ """
148
+ global current_session
149
+ try:
150
+ # Use provided battle_id or current active battle
151
+ target_battle_id = battle_id or current_session.get('active_battle_id')
152
+
153
+ if not target_battle_id:
154
+ return {
155
+ "status": "error",
156
+ "message": "No active battle. Start a battle first."
157
+ }
158
+
159
+ # Submit move
160
+ result = await submit_move_for_battle(target_battle_id, move_name=move_name)
161
+
162
+ # Get updated battle state
163
+ battle_info = get_battle_state(target_battle_id)
164
+
165
+ return {
166
+ "status": "success",
167
+ "message": result,
168
+ "battle_state": battle_info['battle_state'],
169
+ "waiting_for_move": battle_info['waiting_for_move']
170
+ }
171
+ except Exception as e:
172
+ return {
173
+ "status": "error",
174
+ "message": f"Failed to execute move: {str(e)}"
175
+ }
176
+
177
+ @mcp.tool()
178
+ async def switch_pokemon(pokemon_name: str, battle_id: str = None) -> dict:
179
+ """
180
+ Switch to a different Pokemon in the current battle.
181
+ Args:
182
+ pokemon_name (str): Name of the Pokemon to switch to
183
+ battle_id (str, optional): Specific battle ID (uses current active battle if not provided)
184
+ Returns:
185
+ dict: Result of the switch and updated battle state
186
+ """
187
+ global current_session
188
+ try:
189
+ # Use provided battle_id or current active battle
190
+ target_battle_id = battle_id or current_session.get('active_battle_id')
191
+
192
+ if not target_battle_id:
193
+ return {
194
+ "status": "error",
195
+ "message": "No active battle. Start a battle first."
196
+ }
197
+
198
+ # Submit switch
199
+ result = await submit_move_for_battle(target_battle_id, pokemon_name=pokemon_name)
200
+
201
+ # Get updated battle state
202
+ battle_info = get_battle_state(target_battle_id)
203
+
204
+ return {
205
+ "status": "success",
206
+ "message": result,
207
+ "battle_state": battle_info['battle_state'],
208
+ "waiting_for_move": battle_info['waiting_for_move']
209
+ }
210
+ except Exception as e:
211
+ return {
212
+ "status": "error",
213
+ "message": f"Failed to switch Pokemon: {str(e)}"
214
+ }
215
+
216
+ @mcp.tool()
217
+ def get_current_battle_state(battle_id: str = None) -> dict:
218
+ """
219
+ Get the current state of a battle.
220
+ Args:
221
+ battle_id (str, optional): Specific battle ID (uses current active battle if not provided)
222
+ Returns:
223
+ dict: Current battle state with all relevant information
224
+ """
225
+ global current_session
226
+ try:
227
+ # Use provided battle_id or current active battle
228
+ target_battle_id = battle_id or current_session.get('active_battle_id')
229
+
230
+ if not target_battle_id:
231
+ return {
232
+ "status": "error",
233
+ "message": "No active battle. Start a battle first."
234
+ }
235
+
236
+ # Get battle state
237
+ battle_info = get_battle_state(target_battle_id)
238
+
239
+ return {
240
+ "status": "success",
241
+ "battle_id": target_battle_id,
242
+ **battle_info
243
+ }
244
+ except Exception as e:
245
+ return {
246
+ "status": "error",
247
+ "message": f"Failed to get battle state: {str(e)}"
248
+ }
249
+
250
+ @mcp.tool()
251
+ def list_battles() -> dict:
252
+ """
253
+ List all active battles.
254
+ Returns:
255
+ dict: List of all active battles and their status
256
+ """
257
+ try:
258
+ battles = list_active_battles()
259
+ return {
260
+ "status": "success",
261
+ "active_battles": battles,
262
+ "count": len(battles)
263
+ }
264
+ except Exception as e:
265
+ return {
266
+ "status": "error",
267
+ "message": f"Failed to list battles: {str(e)}"
268
+ }
269
+
270
+ @mcp.tool()
271
+ async def download_replay(battle_id: str = None) -> dict:
272
+ """
273
+ Download the replay for a completed battle.
274
+ Args:
275
+ battle_id (str, optional): Battle ID (uses current active battle if not provided)
276
+ Returns:
277
+ dict: Information about the downloaded replay
278
+ """
279
+ global current_session
280
+ try:
281
+ # Use provided battle_id or current active battle
282
+ target_battle_id = battle_id or current_session.get('active_battle_id')
283
+
284
+ if not target_battle_id:
285
+ return {
286
+ "status": "error",
287
+ "message": "No battle ID provided. Specify a battle_id or start a battle first."
288
+ }
289
+
290
+ # Download replay
291
+ replay_path = await download_battle_replay(target_battle_id)
292
+
293
+ return {
294
+ "status": "success",
295
+ "message": f"Replay downloaded successfully",
296
+ "replay_path": replay_path,
297
+ "battle_id": target_battle_id
298
+ }
299
+ except Exception as e:
300
+ return {
301
+ "status": "error",
302
+ "message": f"Failed to download replay: {str(e)}"
303
+ }
304
+
305
+ @mcp.tool()
306
+ def get_supported_agents() -> dict:
307
+ """
308
+ Get information about supported agent types and their default models.
309
+ Returns:
310
+ dict: Information about available agents
311
+ """
312
+ return {
313
+ "status": "success",
314
+ "supported_agents": get_supported_agent_types(),
315
+ "default_models": get_default_models(),
316
+ "description": {
317
+ "openai": "OpenAI GPT models (requires API key)",
318
+ "gemini": "Google Gemini models (requires API key)",
319
+ "mistral": "Mistral AI models (requires API key)",
320
+ "maxdamage": "Simple agent that chooses moves with highest base power",
321
+ "random": "Random move selection agent"
322
+ }
323
+ }
324
+
325
+ @mcp.tool()
326
+ def get_player_status(username: str = None) -> dict:
327
+ """
328
+ Get the current status of a player including any active battles.
329
+ Args:
330
+ username (str, optional): Username to check (uses current session if not provided)
331
+ Returns:
332
+ dict: Player status and battle information
333
+ """
334
+ global current_session
335
+ try:
336
+ # Use provided username or current session username
337
+ target_username = username or current_session.get('username')
338
+
339
+ if not target_username:
340
+ return {
341
+ "status": "error",
342
+ "message": "No username provided and no active session."
343
+ }
344
+
345
+ if target_username not in player_instances:
346
+ return {
347
+ "status": "no_player",
348
+ "username": target_username,
349
+ "message": f"No player instance found for {target_username}"
350
+ }
351
+
352
+ player = player_instances[target_username]
353
+ player_battles = []
354
+
355
+ for battle_id, battle in player.battles.items():
356
+ battle_info = {
357
+ 'battle_id': battle_id,
358
+ 'battle_url': f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}",
359
+ 'turn': getattr(battle, 'turn', 0),
360
+ 'finished': getattr(battle, 'finished', False),
361
+ 'won': getattr(battle, 'won', None)
362
+ }
363
+ player_battles.append(battle_info)
364
+
365
+ return {
366
+ "status": "success",
367
+ "username": target_username,
368
+ "player_username": player.username,
369
+ "total_battles": len(player.battles),
370
+ "battles": player_battles,
371
+ "n_won_battles": getattr(player, 'n_won_battles', 0),
372
+ "n_finished_battles": getattr(player, 'n_finished_battles', 0)
373
+ }
374
+ except Exception as e:
375
+ return {
376
+ "status": "error",
377
+ "message": f"Failed to get player status: {str(e)}"
378
+ }
379
+
380
+ @mcp.tool()
381
+ def find_recent_battles(username: str = None) -> dict:
382
+ """
383
+ Check for recent battles that may have started after a timeout.
384
+ Useful when battle requests time out but battles actually started.
385
+ Args:
386
+ username (str, optional): Username to check (uses current session if not provided)
387
+ Returns:
388
+ dict: List of recent battles with viewing links
389
+ """
390
+ global current_session
391
+ try:
392
+ # Use provided username or current session username
393
+ target_username = username or current_session.get('username')
394
+
395
+ if not target_username:
396
+ return {
397
+ "status": "error",
398
+ "message": "No username provided and no active session."
399
+ }
400
+
401
+ # Check for recent battles using the imported function
402
+ recent_battles = check_recent_battles(target_username)
403
+
404
+ if recent_battles:
405
+ # Update current session with the most recent battle
406
+ current_session['active_battle_id'] = recent_battles[0]['battle_id']
407
+
408
+ return {
409
+ "status": "success",
410
+ "username": target_username,
411
+ "recent_battles": recent_battles,
412
+ "count": len(recent_battles),
413
+ "message": f"Found {len(recent_battles)} recent battles" if recent_battles else "No recent battles found"
414
+ }
415
+ except Exception as e:
416
+ return {
417
+ "status": "error",
418
+ "message": f"Failed to check recent battles: {str(e)}"
419
+ }
420
+
421
+ @mcp.tool()
422
+ def debug_battle_objects(username: str = None, battle_id: str = None) -> dict:
423
+ """
424
+ Debug tool to inspect battle objects and their attributes.
425
+ Useful for troubleshooting formatting errors.
426
+ Args:
427
+ username (str, optional): Username to debug (uses current session if not provided)
428
+ battle_id (str, optional): Specific battle ID to debug
429
+ Returns:
430
+ dict: Debug information about battle objects
431
+ """
432
+ global current_session
433
+ try:
434
+ # Use provided username or current session username
435
+ target_username = username or current_session.get('username')
436
+ target_battle_id = battle_id or current_session.get('active_battle_id')
437
+
438
+ if not target_username:
439
+ return {
440
+ "status": "error",
441
+ "message": "No username provided and no active session."
442
+ }
443
+
444
+ if target_username not in player_instances:
445
+ return {
446
+ "status": "error",
447
+ "message": f"No player instance found for {target_username}"
448
+ }
449
+
450
+ player = player_instances[target_username]
451
+ debug_result = {
452
+ "status": "success",
453
+ "username": target_username,
454
+ "player_username": player.username,
455
+ "total_battles": len(player.battles),
456
+ "battles_debug": []
457
+ }
458
+
459
+ battles_to_debug = []
460
+ if target_battle_id and target_battle_id in player.battles:
461
+ battles_to_debug = [(target_battle_id, player.battles[target_battle_id])]
462
+ else:
463
+ # Debug all battles (limit to 3 most recent)
464
+ battles_to_debug = list(player.battles.items())[-3:]
465
+
466
+ for bid, battle in battles_to_debug:
467
+ try:
468
+ battle_debug = {
469
+ "battle_id": bid,
470
+ "battle_type": type(battle).__name__,
471
+ "battle_attributes": [attr for attr in dir(battle) if not attr.startswith('_')],
472
+ "move_debug": debug_move_attributes(battle)
473
+ }
474
+ debug_result["battles_debug"].append(battle_debug)
475
+ except Exception as e:
476
+ debug_result["battles_debug"].append({
477
+ "battle_id": bid,
478
+ "error": f"Error debugging battle: {e}"
479
+ })
480
+
481
+ return debug_result
482
+
483
+ except Exception as e:
484
+ return {
485
+ "status": "error",
486
+ "message": f"Failed to debug battle objects: {str(e)}"
487
+ }
488
+
489
+ @mcp.tool()
490
+ def cleanup_battles() -> dict:
491
+ """
492
+ Clean up completed battles to free memory.
493
+ Returns:
494
+ dict: Status of cleanup operation
495
+ """
496
+ try:
497
+ cleanup_completed_battles()
498
+ return {
499
+ "status": "success",
500
+ "message": "Completed battles cleaned up successfully"
501
+ }
502
+ except Exception as e:
503
+ return {
504
+ "status": "error",
505
+ "message": f"Failed to cleanup battles: {str(e)}"
506
+ }
507
+
508
+
509
+ # --- Server Execution ---
510
+ if __name__ == "__main__":
511
+ print(f"Pokemon Battle MCP Server starting on port 7861...")
512
+ print("Available battle types:")
513
+ print("- Ladder battles")
514
+ print("- AI agent battles (OpenAI, Gemini, Mistral, MaxDamage, Random)")
515
+ print("- Human player battles")
516
+ print("Running Pokemon Battle MCP server with SSE transport")
517
+ mcp.run(transport="sse")