Spaces:
Sleeping
Sleeping
| # app/services/validator_service.py | |
| """ | |
| A2A Validator service. | |
| - Provides /validator (UI) + /validator/agent-card (HTTP) routes. | |
| - Defines all Socket.IO event handlers. | |
| - Automatic localhost rewriting: if an Agent Card's "url" is localhost/127.0.0.1, | |
| we rewrite it to the origin that served the card (same scheme+host+port), then | |
| probe connectivity. If that fails, we try host.docker.internal:<same-port>. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import socket | |
| from typing import Any, Mapping, Tuple | |
| from urllib.parse import urlparse, urlunparse, ParseResult | |
| from uuid import uuid4 | |
| import bleach | |
| import httpx | |
| # Socket.IO is optional; create shims when missing | |
| try: | |
| import socketio # type: ignore | |
| HAS_SOCKETIO = True | |
| except Exception: # pragma: no cover | |
| socketio = None # type: ignore | |
| HAS_SOCKETIO = False | |
| from fastapi import APIRouter, Request | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.templating import Jinja2Templates | |
| from jinja2 import TemplateNotFound | |
| # Conditional import for A2A SDK (optional) | |
| try: | |
| from a2a.client import A2ACardResolver, A2AClient | |
| from a2a.types import ( | |
| AgentCard, | |
| JSONRPCErrorResponse, | |
| Message, | |
| MessageSendConfiguration, | |
| MessageSendParams, | |
| Role, | |
| SendMessageRequest, | |
| SendMessageResponse, | |
| SendStreamingMessageRequest, | |
| SendStreamingMessageResponse, | |
| TextPart, | |
| ) | |
| HAS_A2A = True | |
| except Exception: | |
| HAS_A2A = False | |
| # Dummy stand-ins so type hints won’t explode | |
| AgentCard = JSONRPCErrorResponse = Message = MessageSendConfiguration = object # type: ignore | |
| MessageSendParams = Role = SendMessageRequest = SendMessageResponse = object # type: ignore | |
| SendStreamingMessageRequest = SendStreamingMessageResponse = TextPart = object # type: ignore | |
| A2ACardResolver = A2AClient = object # type: ignore | |
| from app import validators # local validators.py | |
| # ============================================================================== | |
| # Setup | |
| # ============================================================================== | |
| logger = logging.getLogger("uvicorn.error") | |
| if HAS_SOCKETIO: | |
| sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*") | |
| socketio_app = socketio.ASGIApp(sio) | |
| else: | |
| class _SioShim: | |
| async def emit(self, *a, **k): # no-op | |
| pass | |
| def on(self, *a, **k): | |
| def _wrap(f): | |
| return f | |
| return _wrap | |
| event = on | |
| sio = _SioShim() | |
| socketio_app = None | |
| router = APIRouter(prefix="/validator", tags=["Validator"]) | |
| templates = Jinja2Templates(directory="app/templates") | |
| STANDARD_HEADERS = { | |
| "host", | |
| "user-agent", | |
| "accept", | |
| "content-type", | |
| "content-length", | |
| "connection", | |
| "accept-encoding", | |
| } | |
| LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1", "[::1]"} | |
| # ============================================================================== | |
| # State Management | |
| # ============================================================================== | |
| # sid -> (httpx_client, a2a_client, card, origin_used_for_card_fetch) | |
| clients: dict[str, tuple[httpx.AsyncClient, Any, Any, str]] = {} | |
| # ============================================================================== | |
| # URL helpers / rewriting | |
| # ============================================================================== | |
| def _parse(url: str) -> ParseResult: | |
| return urlparse(url) | |
| def _build(pr: ParseResult) -> str: | |
| return urlunparse(pr) | |
| def _origin_of(url: str) -> str: | |
| """ | |
| Return scheme://netloc for a URL (no path/query/fragment). | |
| """ | |
| pr = _parse(url) | |
| return f"{pr.scheme}://{pr.netloc}" if pr.scheme and pr.netloc else "" | |
| def _looks_localhost(host: str | None) -> bool: | |
| return (host or "").lower() in LOCAL_HOSTS | |
| def _docker_has_host_gateway() -> bool: | |
| try: | |
| socket.gethostbyname("host.docker.internal") | |
| return True | |
| except Exception: | |
| return False | |
| def _rewrite_to_origin(card_url: ParseResult, card_origin: ParseResult) -> Tuple[ParseResult, str | None]: | |
| """ | |
| If the card_url host is localhost/127.0.0.1 and we know the origin where the | |
| Agent Card was fetched from, rewrite to that origin (same scheme+host[:port]), | |
| preserving the /path?query. | |
| """ | |
| if not _looks_localhost(card_url.hostname): | |
| return card_url, None | |
| if not (card_origin.scheme and card_origin.netloc): | |
| return card_url, None | |
| rewritten = ParseResult( | |
| scheme=card_origin.scheme, | |
| netloc=card_origin.netloc, | |
| path=card_url.path or "", | |
| params="", | |
| query=card_url.query or "", | |
| fragment="", | |
| ) | |
| return rewritten, "rewritten to Agent Card origin" | |
| def _rewrite_to_gateway(card_url: ParseResult) -> Tuple[ParseResult, str | None]: | |
| """ | |
| Fallback: rewrite localhost to host.docker.internal:<same-port> if resolvable. | |
| """ | |
| if not _looks_localhost(card_url.hostname): | |
| return card_url, None | |
| if not _docker_has_host_gateway(): | |
| return card_url, None | |
| port = f":{card_url.port}" if card_url.port else "" | |
| rewritten = ParseResult( | |
| scheme=card_url.scheme or "http", | |
| netloc=f"host.docker.internal{port}", | |
| path=card_url.path or "", | |
| params="", | |
| query=card_url.query or "", | |
| fragment="", | |
| ) | |
| return rewritten, "rewritten via host.docker.internal" | |
| async def _probe_reachable(client: httpx.AsyncClient, url: str) -> Tuple[bool, str]: | |
| """ | |
| Cheap reachability probe. | |
| - 2xx/3xx reachable | |
| - 405 counts as reachable (JSON-RPC endpoints often reject GET) | |
| """ | |
| try: | |
| r = await client.get(url) | |
| if r.status_code == 405: | |
| return True, "reachable (405 on GET is OK for JSON-RPC)" | |
| if 200 <= r.status_code < 400: | |
| return True, f"reachable (HTTP {r.status_code})" | |
| return False, f"HTTP {r.status_code}" | |
| except httpx.ConnectError as e: | |
| return False, f"connect error: {e!s}" | |
| except httpx.RequestError as e: | |
| return False, f"request error: {e!s}" | |
| except Exception as e: | |
| return False, f"unexpected error: {e!s}" | |
| def _card_copy_with_url(card: AgentCard, new_url: str) -> AgentCard: | |
| try: | |
| return card.model_copy(update={"url": new_url}) # type: ignore[attr-defined] | |
| except Exception: | |
| try: | |
| card.url = new_url # type: ignore[attr-defined] | |
| return card | |
| except Exception: | |
| raise | |
| # ============================================================================== | |
| # Debug helpers | |
| # ============================================================================== | |
| async def _emit_debug_log(sid: str, event_id: str, log_type: str, data: Any) -> None: | |
| await sio.emit("debug_log", {"type": log_type, "data": data, "id": event_id}, to=sid) | |
| async def _process_a2a_response(result: Any, sid: str, request_id: str) -> None: | |
| if not HAS_A2A: | |
| return | |
| if isinstance(result.root, JSONRPCErrorResponse): | |
| error_data = result.root.error.model_dump(exclude_none=True) | |
| await _emit_debug_log(sid, request_id, "error", error_data) | |
| await sio.emit( | |
| "agent_response", | |
| {"error": error_data.get("message", "Unknown error"), "id": request_id}, | |
| to=sid, | |
| ) | |
| return | |
| event = result.root.result | |
| response_id = getattr(event, "id", request_id) | |
| response_data = event.model_dump(exclude_none=True) | |
| response_data["id"] = response_id | |
| response_data["validation_errors"] = validators.validate_message(response_data) | |
| await _emit_debug_log(sid, response_id, "response", response_data) | |
| await sio.emit("agent_response", response_data, to=sid) | |
| def get_card_resolver(client: httpx.AsyncClient, agent_card_url: str) -> Any: | |
| if not HAS_A2A: | |
| return None | |
| parsed_url = urlparse(agent_card_url) | |
| base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" | |
| path_with_query = urlunparse(("", "", parsed_url.path, "", parsed_url.query, "")) | |
| card_path = path_with_query.lstrip("/") | |
| if card_path: | |
| return A2ACardResolver(client, base_url, agent_card_path=card_path) | |
| return A2ACardResolver(client, base_url) | |
| # ============================================================================== | |
| # FastAPI Routes | |
| # ============================================================================== | |
| # Handle both /validator and /validator/ | |
| async def validator_ui(request: Request) -> HTMLResponse: | |
| for name in ("validator.html", "validator.hml"): | |
| try: | |
| return templates.TemplateResponse(name, {"request": request}) | |
| except TemplateNotFound: | |
| continue | |
| return HTMLResponse("<h3>Validator UI template not found.</h3>", status_code=500) | |
| async def get_agent_card(request: Request) -> JSONResponse: | |
| """ | |
| Fetch and validate an Agent Card from a URL. | |
| If A2A SDK is installed, use its resolver. | |
| Otherwise, be lenient: follow redirects and probe common well-known paths. | |
| Automatically rewrite localhost URLs in the card to the card's own origin. | |
| """ | |
| # Parse request body | |
| try: | |
| request_data = await request.json() | |
| user_url = (request_data.get("url") or "").strip() | |
| sid = request_data.get("sid") | |
| if not user_url or not sid: | |
| return JSONResponse({"error": "Agent URL and SID are required."}, status_code=400) | |
| except Exception: | |
| return JSONResponse({"error": "Invalid request body."}, status_code=400) | |
| # Collect custom headers (forwarded to the target) | |
| custom_headers = { | |
| name: value | |
| for name, value in request.headers.items() | |
| if name.lower() not in STANDARD_HEADERS | |
| } | |
| await _emit_debug_log( | |
| sid, | |
| "http-agent-card", | |
| "request", | |
| {"endpoint": "/agent-card", "payload": request_data, "custom_headers": custom_headers}, | |
| ) | |
| # Fetch the agent card | |
| try: | |
| async with httpx.AsyncClient( | |
| timeout=httpx.Timeout(30.0, connect=10.0), | |
| headers=custom_headers, | |
| follow_redirects=True, | |
| trust_env=True, | |
| ) as client: | |
| # We'll remember the ORIGIN we used to reach the card, for rewriting. | |
| card_fetch_origin = _origin_of(user_url) | |
| if HAS_A2A: | |
| resolver = get_card_resolver(client, user_url) | |
| card = await resolver.get_agent_card() # type: ignore[assignment] | |
| card_data = card.model_dump(exclude_none=True) | |
| # Origin we used is the resolver's base (scheme+host[:port]) | |
| card_fetch_origin = _origin_of(user_url) | |
| else: | |
| tried: list[str] = [] | |
| async def _try(url: str) -> dict[str, Any]: | |
| r = await client.get(url) | |
| r.raise_for_status() | |
| ctype = (r.headers.get("content-type") or "").lower() | |
| if "application/json" in ctype or ctype.endswith("+json"): | |
| return r.json() | |
| raise ValueError(f"Non-JSON response (content-type={ctype or 'unknown'}) at {url}") | |
| # Try the user URL first; otherwise probe well-knowns at that origin | |
| try: | |
| card_data = await _try(user_url) | |
| except Exception: | |
| pr = _parse(user_url) | |
| base = f"{pr.scheme}://{pr.netloc}" if pr.scheme and pr.netloc else "" | |
| candidates = [ | |
| user_url, | |
| f"{base}/.well-known/agent.json", | |
| f"{base}/.well-known/ai-agent.json", | |
| f"{base}/agent-card", | |
| f"{base}/agent.json", | |
| ] | |
| last_err: Exception | None = None | |
| card_data = None # type: ignore[assignment] | |
| for u in candidates: | |
| if u in tried or not u.startswith("http"): | |
| continue | |
| tried.append(u) | |
| try: | |
| card_data = await _try(u) | |
| card_fetch_origin = _origin_of(u) | |
| break | |
| except Exception as e: | |
| last_err = e | |
| if card_data is None: # type: ignore[truthy-bool] | |
| raise RuntimeError( | |
| f"Could not find a JSON Agent Card at {user_url} (last error: {last_err})" | |
| ) | |
| # Validate locally | |
| validation_errors = validators.validate_agent_card(card_data) # type: ignore[arg-type] | |
| # --- Automatic localhost rewrite of the card's own URL --- | |
| rewrite_note = None | |
| resolved_card_url = None | |
| try: | |
| card_origin_pr = _parse(card_fetch_origin) if card_fetch_origin else None | |
| raw_card_url = (card_data.get("url") if isinstance(card_data, Mapping) else None) or "" | |
| card_url_pr = _parse(raw_card_url) | |
| if _looks_localhost(card_url_pr.hostname): | |
| # 1) Prefer rewrite to the origin that served the Agent Card | |
| if card_origin_pr and (card_origin_pr.scheme and card_origin_pr.netloc): | |
| new_pr, note = _rewrite_to_origin(card_url_pr, card_origin_pr) | |
| if note: | |
| resolved_card_url = _build(new_pr) | |
| card_data = {**card_data, "url": resolved_card_url} # type: ignore[operator] | |
| rewrite_note = note | |
| # 2) Fallback to host.docker.internal if available | |
| if not resolved_card_url: | |
| new_pr, note = _rewrite_to_gateway(card_url_pr) | |
| if note: | |
| resolved_card_url = _build(new_pr) | |
| card_data = {**card_data, "url": resolved_card_url} # type: ignore[operator] | |
| rewrite_note = note | |
| except Exception as e: | |
| # Do not fail the card response if rewriting fails | |
| rewrite_note = f"rewrite failed: {e}" | |
| response = { | |
| "card": card_data, | |
| "validation_errors": validation_errors, | |
| "resolved_url": (card_data.get("url") if isinstance(card_data, Mapping) else None), # type: ignore[union-attr] | |
| "rewrite_note": rewrite_note, | |
| } | |
| status = 200 | |
| except httpx.RequestError as e: | |
| response = {"error": f"Failed to connect to agent: {e}"} | |
| status = 502 | |
| except Exception as e: | |
| response = {"error": f"An internal server error occurred: {e}"} | |
| status = 500 | |
| await _emit_debug_log(sid, "http-agent-card", "response", {"status": status, "payload": response}) | |
| return JSONResponse(content=response, status_code=status) | |
| # ============================================================================== | |
| # Socket.IO Event Handlers | |
| # ============================================================================== | |
| async def handle_connect(sid: str, environ: dict[str, Any]) -> None: # type: ignore[misc] | |
| logger.info(f"Client connected: {sid}") | |
| async def handle_disconnect(sid: str) -> None: # type: ignore[misc] | |
| logger.info(f"Client disconnected: {sid}") | |
| if sid in clients: | |
| httpx_client, _, _, _ = clients.pop(sid) | |
| await httpx_client.aclose() | |
| logger.info(f"Cleaned up client for {sid}") | |
| async def handle_initialize_client(sid: str, data: dict[str, Any]) -> None: # type: ignore[misc] | |
| """ | |
| Prepare an A2A client for chat/streaming. | |
| Automatically rewrites localhost card URLs to the card origin or host.docker.internal. | |
| """ | |
| if not HAS_A2A: | |
| await sio.emit( | |
| "client_initialized", | |
| {"status": "warning", "message": "A2A SDK not installed; chat/streaming disabled."}, | |
| to=sid, | |
| ) | |
| return | |
| user_url = (data.get("url") or "").strip() | |
| custom_headers = data.get("customHeaders", {}) or {} | |
| if not user_url: | |
| await sio.emit("client_initialized", {"status": "error", "message": "Agent URL is required."}, to=sid) | |
| return | |
| httpx_client = httpx.AsyncClient( | |
| timeout=httpx.Timeout(60.0, connect=15.0), | |
| headers=custom_headers, | |
| follow_redirects=True, | |
| trust_env=True, | |
| ) | |
| try: | |
| # Resolve card (this base will be our "origin" candidate) | |
| resolver = get_card_resolver(httpx_client, user_url) | |
| card: AgentCard = await resolver.get_agent_card() # type: ignore[assignment] | |
| card_fetch_origin = _origin_of(user_url) | |
| origin_pr = _parse(card_fetch_origin) if card_fetch_origin else None | |
| # Rewrite card.url if it's localhost to the card origin first | |
| try: | |
| card_url_pr = _parse(getattr(card, "url", "") or "") | |
| if _looks_localhost(card_url_pr.hostname): | |
| if origin_pr and (origin_pr.scheme and origin_pr.netloc): | |
| new_pr, note = _rewrite_to_origin(card_url_pr, origin_pr) | |
| if note: | |
| new_url = _build(new_pr) | |
| card = _card_copy_with_url(card, new_url) | |
| await _emit_debug_log( | |
| sid, | |
| "client-init-rewrite", | |
| "info", | |
| {"original": card_url_pr.geturl(), "rewritten": new_url, "note": note}, | |
| ) | |
| # Fallback to host.docker.internal if still localhost | |
| card_url_pr2 = _parse(getattr(card, "url", "") or "") | |
| if _looks_localhost(card_url_pr2.hostname): | |
| new_pr, note = _rewrite_to_gateway(card_url_pr2) | |
| if note: | |
| new_url = _build(new_pr) | |
| card = _card_copy_with_url(card, new_url) | |
| await _emit_debug_log( | |
| sid, | |
| "client-init-gateway", | |
| "info", | |
| {"original": card_url_pr2.geturl(), "rewritten": new_url, "note": note}, | |
| ) | |
| except Exception as e: | |
| await _emit_debug_log(sid, "client-init-rewrite", "warn", {"error": str(e)}) | |
| # Connectivity probe before enabling chat | |
| ok, detail = await _probe_reachable(httpx_client, getattr(card, "url", "")) | |
| if not ok: | |
| await sio.emit( | |
| "client_initialized", | |
| { | |
| "status": "error", | |
| "message": f"Agent endpoint unreachable: {detail}. " | |
| f"Ensure your Agent Card URL points to a network-reachable host.", | |
| }, | |
| to=sid, | |
| ) | |
| await httpx_client.aclose() | |
| return | |
| # Create A2A client and store | |
| a2a_client = A2AClient(httpx_client, agent_card=card) | |
| clients[sid] = (httpx_client, a2a_client, card, card_fetch_origin) | |
| await sio.emit("client_initialized", {"status": "success"}, to=sid) | |
| except Exception as e: | |
| await httpx_client.aclose() | |
| await sio.emit("client_initialized", {"status": "error", "message": str(e)}, to=sid) | |
| async def handle_send_message(sid: str, json_data: dict[str, Any]) -> None: # type: ignore[misc] | |
| if not HAS_A2A: | |
| await sio.emit("agent_response", {"error": "A2A SDK not installed", "id": json_data.get("id")}, to=sid) | |
| return | |
| message_text = bleach.clean(json_data.get("message", "")) | |
| message_id = json_data.get("id", str(uuid4())) | |
| context_id = json_data.get("contextId") | |
| metadata = json_data.get("metadata", {}) | |
| if sid not in clients: | |
| await sio.emit("agent_response", {"error": "Client not initialized.", "id": message_id}, to=sid) | |
| return | |
| httpx_client, a2a_client, card, _origin = clients[sid] | |
| message = Message( | |
| role=Role.user, | |
| parts=[TextPart(text=str(message_text))], # type: ignore[list-item] | |
| message_id=message_id, | |
| context_id=context_id, | |
| metadata=metadata, | |
| ) | |
| payload = MessageSendParams( | |
| message=message, | |
| configuration=MessageSendConfiguration(accepted_output_modes=["text/plain", "video/mp4"]), | |
| ) | |
| supports_streaming = hasattr(card.capabilities, "streaming") and getattr(card.capabilities, "streaming") is True | |
| try: | |
| if supports_streaming: | |
| stream_request = SendStreamingMessageRequest( | |
| id=message_id, method="message/stream", jsonrpc="2.0", params=payload | |
| ) | |
| await _emit_debug_log(sid, message_id, "request", stream_request.model_dump(exclude_none=True)) | |
| response_stream = a2a_client.send_message_streaming(stream_request) | |
| async for stream_result in response_stream: | |
| await _process_a2a_response(stream_result, sid, message_id) | |
| else: | |
| send_message_request = SendMessageRequest( | |
| id=message_id, method="message/send", jsonrpc="2.0", params=payload | |
| ) | |
| await _emit_debug_log(sid, message_id, "request", send_message_request.model_dump(exclude_none=True)) | |
| send_result = await a2a_client.send_message(send_message_request) | |
| await _process_a2a_response(send_result, sid, message_id) | |
| except Exception as e: | |
| await sio.emit("agent_response", {"error": f"Failed to send message: {e}", "id": message_id}, to=sid) | |
| __all__ = ["router", "socketio_app", "HAS_SOCKETIO"] | |