File size: 21,957 Bytes
8d60e33
 
 
 
 
f26d6cd
 
 
8d60e33
 
 
 
f26d6cd
 
 
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
 
8d60e33
 
 
f26d6cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8d60e33
 
f26d6cd
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
45c4ed4
f26d6cd
 
 
8d60e33
45c4ed4
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
 
f26d6cd
8d60e33
f26d6cd
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
f26d6cd
 
8d60e33
f26d6cd
 
 
8d60e33
f26d6cd
 
8d60e33
f26d6cd
 
8d60e33
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
f26d6cd
8d60e33
f26d6cd
 
8d60e33
f26d6cd
8d60e33
 
 
 
 
f26d6cd
 
8d60e33
f26d6cd
8d60e33
 
 
 
f26d6cd
8d60e33
 
f26d6cd
 
8d60e33
f26d6cd
8d60e33
 
 
 
f26d6cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8d60e33
 
 
f26d6cd
 
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
 
 
 
 
f26d6cd
 
8d60e33
 
 
 
 
 
 
 
 
f26d6cd
 
 
8d60e33
 
 
f26d6cd
 
 
 
 
 
 
8d60e33
f26d6cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8d60e33
f26d6cd
8d60e33
f26d6cd
8d60e33
f26d6cd
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f26d6cd
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
# app/services/validator_service.py
"""
A2A Validator service.
- Provides /validator (UI) + /validator/agent-card (HTTP) routes.
- Defines all Socket.IO event handlers.
- Automatic localhost rewriting: if an Agent Card's "url" is localhost/127.0.0.1,
  we rewrite it to the origin that served the card (same scheme+host+port), then
  probe connectivity. If that fails, we try host.docker.internal:<same-port>.
"""
from __future__ import annotations

import logging
import socket
from typing import Any, Mapping, Tuple
from urllib.parse import urlparse, urlunparse, ParseResult
from uuid import uuid4

import bleach
import httpx

# Socket.IO is optional; create shims when missing
try:
    import socketio  # type: ignore
    HAS_SOCKETIO = True
except Exception:  # pragma: no cover
    socketio = None  # type: ignore
    HAS_SOCKETIO = False

from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from jinja2 import TemplateNotFound

# Conditional import for A2A SDK (optional)
try:
    from a2a.client import A2ACardResolver, A2AClient
    from a2a.types import (
        AgentCard,
        JSONRPCErrorResponse,
        Message,
        MessageSendConfiguration,
        MessageSendParams,
        Role,
        SendMessageRequest,
        SendMessageResponse,
        SendStreamingMessageRequest,
        SendStreamingMessageResponse,
        TextPart,
    )
    HAS_A2A = True
except Exception:
    HAS_A2A = False
    # Dummy stand-ins so type hints won’t explode
    AgentCard = JSONRPCErrorResponse = Message = MessageSendConfiguration = object  # type: ignore
    MessageSendParams = Role = SendMessageRequest = SendMessageResponse = object  # type: ignore
    SendStreamingMessageRequest = SendStreamingMessageResponse = TextPart = object  # type: ignore
    A2ACardResolver = A2AClient = object  # type: ignore

from app import validators  # local validators.py

# ==============================================================================
# Setup
# ==============================================================================
logger = logging.getLogger("uvicorn.error")

if HAS_SOCKETIO:
    sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
    socketio_app = socketio.ASGIApp(sio)
else:

    class _SioShim:
        async def emit(self, *a, **k):  # no-op
            pass

        def on(self, *a, **k):
            def _wrap(f):
                return f

            return _wrap

        event = on

    sio = _SioShim()
    socketio_app = None

router = APIRouter(prefix="/validator", tags=["Validator"])
templates = Jinja2Templates(directory="app/templates")

STANDARD_HEADERS = {
    "host",
    "user-agent",
    "accept",
    "content-type",
    "content-length",
    "connection",
    "accept-encoding",
}

LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1", "[::1]"}

# ==============================================================================
# State Management
# ==============================================================================
# sid -> (httpx_client, a2a_client, card, origin_used_for_card_fetch)
clients: dict[str, tuple[httpx.AsyncClient, Any, Any, str]] = {}

# ==============================================================================
# URL helpers / rewriting
# ==============================================================================
def _parse(url: str) -> ParseResult:
    return urlparse(url)


def _build(pr: ParseResult) -> str:
    return urlunparse(pr)


def _origin_of(url: str) -> str:
    """
    Return scheme://netloc for a URL (no path/query/fragment).
    """
    pr = _parse(url)
    return f"{pr.scheme}://{pr.netloc}" if pr.scheme and pr.netloc else ""


def _looks_localhost(host: str | None) -> bool:
    return (host or "").lower() in LOCAL_HOSTS


def _docker_has_host_gateway() -> bool:
    try:
        socket.gethostbyname("host.docker.internal")
        return True
    except Exception:
        return False


def _rewrite_to_origin(card_url: ParseResult, card_origin: ParseResult) -> Tuple[ParseResult, str | None]:
    """
    If the card_url host is localhost/127.0.0.1 and we know the origin where the
    Agent Card was fetched from, rewrite to that origin (same scheme+host[:port]),
    preserving the /path?query.
    """
    if not _looks_localhost(card_url.hostname):
        return card_url, None
    if not (card_origin.scheme and card_origin.netloc):
        return card_url, None

    rewritten = ParseResult(
        scheme=card_origin.scheme,
        netloc=card_origin.netloc,
        path=card_url.path or "",
        params="",
        query=card_url.query or "",
        fragment="",
    )
    return rewritten, "rewritten to Agent Card origin"


def _rewrite_to_gateway(card_url: ParseResult) -> Tuple[ParseResult, str | None]:
    """
    Fallback: rewrite localhost to host.docker.internal:<same-port> if resolvable.
    """
    if not _looks_localhost(card_url.hostname):
        return card_url, None
    if not _docker_has_host_gateway():
        return card_url, None

    port = f":{card_url.port}" if card_url.port else ""
    rewritten = ParseResult(
        scheme=card_url.scheme or "http",
        netloc=f"host.docker.internal{port}",
        path=card_url.path or "",
        params="",
        query=card_url.query or "",
        fragment="",
    )
    return rewritten, "rewritten via host.docker.internal"


async def _probe_reachable(client: httpx.AsyncClient, url: str) -> Tuple[bool, str]:
    """
    Cheap reachability probe.
    - 2xx/3xx reachable
    - 405 counts as reachable (JSON-RPC endpoints often reject GET)
    """
    try:
        r = await client.get(url)
        if r.status_code == 405:
            return True, "reachable (405 on GET is OK for JSON-RPC)"
        if 200 <= r.status_code < 400:
            return True, f"reachable (HTTP {r.status_code})"
        return False, f"HTTP {r.status_code}"
    except httpx.ConnectError as e:
        return False, f"connect error: {e!s}"
    except httpx.RequestError as e:
        return False, f"request error: {e!s}"
    except Exception as e:
        return False, f"unexpected error: {e!s}"


def _card_copy_with_url(card: AgentCard, new_url: str) -> AgentCard:
    try:
        return card.model_copy(update={"url": new_url})  # type: ignore[attr-defined]
    except Exception:
        try:
            card.url = new_url  # type: ignore[attr-defined]
            return card
        except Exception:
            raise


# ==============================================================================
# Debug helpers
# ==============================================================================
async def _emit_debug_log(sid: str, event_id: str, log_type: str, data: Any) -> None:
    await sio.emit("debug_log", {"type": log_type, "data": data, "id": event_id}, to=sid)


async def _process_a2a_response(result: Any, sid: str, request_id: str) -> None:
    if not HAS_A2A:
        return

    if isinstance(result.root, JSONRPCErrorResponse):
        error_data = result.root.error.model_dump(exclude_none=True)
        await _emit_debug_log(sid, request_id, "error", error_data)
        await sio.emit(
            "agent_response",
            {"error": error_data.get("message", "Unknown error"), "id": request_id},
            to=sid,
        )
        return

    event = result.root.result
    response_id = getattr(event, "id", request_id)
    response_data = event.model_dump(exclude_none=True)
    response_data["id"] = response_id
    response_data["validation_errors"] = validators.validate_message(response_data)

    await _emit_debug_log(sid, response_id, "response", response_data)
    await sio.emit("agent_response", response_data, to=sid)


def get_card_resolver(client: httpx.AsyncClient, agent_card_url: str) -> Any:
    if not HAS_A2A:
        return None
    parsed_url = urlparse(agent_card_url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    path_with_query = urlunparse(("", "", parsed_url.path, "", parsed_url.query, ""))
    card_path = path_with_query.lstrip("/")
    if card_path:
        return A2ACardResolver(client, base_url, agent_card_path=card_path)
    return A2ACardResolver(client, base_url)


# ==============================================================================
# FastAPI Routes
# ==============================================================================

# Handle both /validator and /validator/
@router.get("", response_class=HTMLResponse, include_in_schema=False)
@router.get("/", response_class=HTMLResponse)
async def validator_ui(request: Request) -> HTMLResponse:
    for name in ("validator.html", "validator.hml"):
        try:
            return templates.TemplateResponse(name, {"request": request})
        except TemplateNotFound:
            continue
    return HTMLResponse("<h3>Validator UI template not found.</h3>", status_code=500)


@router.post("/agent-card")
async def get_agent_card(request: Request) -> JSONResponse:
    """
    Fetch and validate an Agent Card from a URL.

    If A2A SDK is installed, use its resolver.
    Otherwise, be lenient: follow redirects and probe common well-known paths.
    Automatically rewrite localhost URLs in the card to the card's own origin.
    """
    # Parse request body
    try:
        request_data = await request.json()
        user_url = (request_data.get("url") or "").strip()
        sid = request_data.get("sid")
        if not user_url or not sid:
            return JSONResponse({"error": "Agent URL and SID are required."}, status_code=400)
    except Exception:
        return JSONResponse({"error": "Invalid request body."}, status_code=400)

    # Collect custom headers (forwarded to the target)
    custom_headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in STANDARD_HEADERS
    }

    await _emit_debug_log(
        sid,
        "http-agent-card",
        "request",
        {"endpoint": "/agent-card", "payload": request_data, "custom_headers": custom_headers},
    )

    # Fetch the agent card
    try:
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=10.0),
            headers=custom_headers,
            follow_redirects=True,
            trust_env=True,
        ) as client:
            # We'll remember the ORIGIN we used to reach the card, for rewriting.
            card_fetch_origin = _origin_of(user_url)

            if HAS_A2A:
                resolver = get_card_resolver(client, user_url)
                card = await resolver.get_agent_card()  # type: ignore[assignment]
                card_data = card.model_dump(exclude_none=True)
                # Origin we used is the resolver's base (scheme+host[:port])
                card_fetch_origin = _origin_of(user_url)
            else:
                tried: list[str] = []

                async def _try(url: str) -> dict[str, Any]:
                    r = await client.get(url)
                    r.raise_for_status()
                    ctype = (r.headers.get("content-type") or "").lower()
                    if "application/json" in ctype or ctype.endswith("+json"):
                        return r.json()
                    raise ValueError(f"Non-JSON response (content-type={ctype or 'unknown'}) at {url}")

                # Try the user URL first; otherwise probe well-knowns at that origin
                try:
                    card_data = await _try(user_url)
                except Exception:
                    pr = _parse(user_url)
                    base = f"{pr.scheme}://{pr.netloc}" if pr.scheme and pr.netloc else ""
                    candidates = [
                        user_url,
                        f"{base}/.well-known/agent.json",
                        f"{base}/.well-known/ai-agent.json",
                        f"{base}/agent-card",
                        f"{base}/agent.json",
                    ]
                    last_err: Exception | None = None
                    card_data = None  # type: ignore[assignment]
                    for u in candidates:
                        if u in tried or not u.startswith("http"):
                            continue
                        tried.append(u)
                        try:
                            card_data = await _try(u)
                            card_fetch_origin = _origin_of(u)
                            break
                        except Exception as e:
                            last_err = e
                    if card_data is None:  # type: ignore[truthy-bool]
                        raise RuntimeError(
                            f"Could not find a JSON Agent Card at {user_url} (last error: {last_err})"
                        )

        # Validate locally
        validation_errors = validators.validate_agent_card(card_data)  # type: ignore[arg-type]

        # --- Automatic localhost rewrite of the card's own URL ---
        rewrite_note = None
        resolved_card_url = None
        try:
            card_origin_pr = _parse(card_fetch_origin) if card_fetch_origin else None
            raw_card_url = (card_data.get("url") if isinstance(card_data, Mapping) else None) or ""
            card_url_pr = _parse(raw_card_url)

            if _looks_localhost(card_url_pr.hostname):
                # 1) Prefer rewrite to the origin that served the Agent Card
                if card_origin_pr and (card_origin_pr.scheme and card_origin_pr.netloc):
                    new_pr, note = _rewrite_to_origin(card_url_pr, card_origin_pr)
                    if note:
                        resolved_card_url = _build(new_pr)
                        card_data = {**card_data, "url": resolved_card_url}  # type: ignore[operator]
                        rewrite_note = note
                # 2) Fallback to host.docker.internal if available
                if not resolved_card_url:
                    new_pr, note = _rewrite_to_gateway(card_url_pr)
                    if note:
                        resolved_card_url = _build(new_pr)
                        card_data = {**card_data, "url": resolved_card_url}  # type: ignore[operator]
                        rewrite_note = note
        except Exception as e:
            # Do not fail the card response if rewriting fails
            rewrite_note = f"rewrite failed: {e}"

        response = {
            "card": card_data,
            "validation_errors": validation_errors,
            "resolved_url": (card_data.get("url") if isinstance(card_data, Mapping) else None),  # type: ignore[union-attr]
            "rewrite_note": rewrite_note,
        }
        status = 200

    except httpx.RequestError as e:
        response = {"error": f"Failed to connect to agent: {e}"}
        status = 502
    except Exception as e:
        response = {"error": f"An internal server error occurred: {e}"}
        status = 500

    await _emit_debug_log(sid, "http-agent-card", "response", {"status": status, "payload": response})
    return JSONResponse(content=response, status_code=status)


# ==============================================================================
# Socket.IO Event Handlers
# ==============================================================================
@sio.on("connect")
async def handle_connect(sid: str, environ: dict[str, Any]) -> None:  # type: ignore[misc]
    logger.info(f"Client connected: {sid}")


@sio.on("disconnect")
async def handle_disconnect(sid: str) -> None:  # type: ignore[misc]
    logger.info(f"Client disconnected: {sid}")
    if sid in clients:
        httpx_client, _, _, _ = clients.pop(sid)
        await httpx_client.aclose()
        logger.info(f"Cleaned up client for {sid}")


@sio.on("initialize_client")
async def handle_initialize_client(sid: str, data: dict[str, Any]) -> None:  # type: ignore[misc]
    """
    Prepare an A2A client for chat/streaming.
    Automatically rewrites localhost card URLs to the card origin or host.docker.internal.
    """
    if not HAS_A2A:
        await sio.emit(
            "client_initialized",
            {"status": "warning", "message": "A2A SDK not installed; chat/streaming disabled."},
            to=sid,
        )
        return

    user_url = (data.get("url") or "").strip()
    custom_headers = data.get("customHeaders", {}) or {}
    if not user_url:
        await sio.emit("client_initialized", {"status": "error", "message": "Agent URL is required."}, to=sid)
        return

    httpx_client = httpx.AsyncClient(
        timeout=httpx.Timeout(60.0, connect=15.0),
        headers=custom_headers,
        follow_redirects=True,
        trust_env=True,
    )

    try:
        # Resolve card (this base will be our "origin" candidate)
        resolver = get_card_resolver(httpx_client, user_url)
        card: AgentCard = await resolver.get_agent_card()  # type: ignore[assignment]
        card_fetch_origin = _origin_of(user_url)
        origin_pr = _parse(card_fetch_origin) if card_fetch_origin else None

        # Rewrite card.url if it's localhost to the card origin first
        try:
            card_url_pr = _parse(getattr(card, "url", "") or "")
            if _looks_localhost(card_url_pr.hostname):
                if origin_pr and (origin_pr.scheme and origin_pr.netloc):
                    new_pr, note = _rewrite_to_origin(card_url_pr, origin_pr)
                    if note:
                        new_url = _build(new_pr)
                        card = _card_copy_with_url(card, new_url)
                        await _emit_debug_log(
                            sid,
                            "client-init-rewrite",
                            "info",
                            {"original": card_url_pr.geturl(), "rewritten": new_url, "note": note},
                        )
                # Fallback to host.docker.internal if still localhost
                card_url_pr2 = _parse(getattr(card, "url", "") or "")
                if _looks_localhost(card_url_pr2.hostname):
                    new_pr, note = _rewrite_to_gateway(card_url_pr2)
                    if note:
                        new_url = _build(new_pr)
                        card = _card_copy_with_url(card, new_url)
                        await _emit_debug_log(
                            sid,
                            "client-init-gateway",
                            "info",
                            {"original": card_url_pr2.geturl(), "rewritten": new_url, "note": note},
                        )
        except Exception as e:
            await _emit_debug_log(sid, "client-init-rewrite", "warn", {"error": str(e)})

        # Connectivity probe before enabling chat
        ok, detail = await _probe_reachable(httpx_client, getattr(card, "url", ""))
        if not ok:
            await sio.emit(
                "client_initialized",
                {
                    "status": "error",
                    "message": f"Agent endpoint unreachable: {detail}. "
                               f"Ensure your Agent Card URL points to a network-reachable host.",
                },
                to=sid,
            )
            await httpx_client.aclose()
            return

        # Create A2A client and store
        a2a_client = A2AClient(httpx_client, agent_card=card)
        clients[sid] = (httpx_client, a2a_client, card, card_fetch_origin)
        await sio.emit("client_initialized", {"status": "success"}, to=sid)

    except Exception as e:
        await httpx_client.aclose()
        await sio.emit("client_initialized", {"status": "error", "message": str(e)}, to=sid)


@sio.on("send_message")
async def handle_send_message(sid: str, json_data: dict[str, Any]) -> None:  # type: ignore[misc]
    if not HAS_A2A:
        await sio.emit("agent_response", {"error": "A2A SDK not installed", "id": json_data.get("id")}, to=sid)
        return

    message_text = bleach.clean(json_data.get("message", ""))
    message_id = json_data.get("id", str(uuid4()))
    context_id = json_data.get("contextId")
    metadata = json_data.get("metadata", {})

    if sid not in clients:
        await sio.emit("agent_response", {"error": "Client not initialized.", "id": message_id}, to=sid)
        return

    httpx_client, a2a_client, card, _origin = clients[sid]

    message = Message(
        role=Role.user,
        parts=[TextPart(text=str(message_text))],  # type: ignore[list-item]
        message_id=message_id,
        context_id=context_id,
        metadata=metadata,
    )
    payload = MessageSendParams(
        message=message,
        configuration=MessageSendConfiguration(accepted_output_modes=["text/plain", "video/mp4"]),
    )
    supports_streaming = hasattr(card.capabilities, "streaming") and getattr(card.capabilities, "streaming") is True

    try:
        if supports_streaming:
            stream_request = SendStreamingMessageRequest(
                id=message_id, method="message/stream", jsonrpc="2.0", params=payload
            )
            await _emit_debug_log(sid, message_id, "request", stream_request.model_dump(exclude_none=True))
            response_stream = a2a_client.send_message_streaming(stream_request)
            async for stream_result in response_stream:
                await _process_a2a_response(stream_result, sid, message_id)
        else:
            send_message_request = SendMessageRequest(
                id=message_id, method="message/send", jsonrpc="2.0", params=payload
            )
            await _emit_debug_log(sid, message_id, "request", send_message_request.model_dump(exclude_none=True))
            send_result = await a2a_client.send_message(send_message_request)
            await _process_a2a_response(send_result, sid, message_id)
    except Exception as e:
        await sio.emit("agent_response", {"error": f"Failed to send message: {e}", "id": message_id}, to=sid)


__all__ = ["router", "socketio_app", "HAS_SOCKETIO"]