File size: 7,795 Bytes
8d60e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# app/validators.py
from __future__ import annotations

import re
from typing import Any, Callable, Iterable, Mapping, Sequence
from urllib.parse import urlparse


# -----------------------------
# Helpers
# -----------------------------

# Soft semver matcher: MAJOR.MINOR.PATCH, optionally followed by a
# "-prerelease" tag and/or "+build" metadata (in that order). The two suffixes
# are matched independently so "1.2.3-alpha+build.7" is accepted, and "\Z" is
# used instead of "$" so a trailing newline is not silently tolerated.
_SEMVER_RE = re.compile(
    r"^\d+\.\d+\.\d+(?:-[0-9A-Za-z.\-]+)?(?:\+[0-9A-Za-z.\-]+)?\Z"
)


def _is_non_empty_str(val: Any) -> bool:
    """Return True only for a string with at least one non-whitespace character."""
    if not isinstance(val, str):
        return False
    # A string that strips down to "" is considered empty.
    return bool(val.strip())


def _is_list_of_str(val: Any) -> bool:
    """Return True when *val* is a list whose items are all strings (an empty list qualifies)."""
    if not isinstance(val, list):
        return False
    for item in val:
        if not isinstance(item, str):
            return False
    return True


def _as_mapping(val: Any) -> Mapping[str, Any] | None:
    return val if isinstance(val, Mapping) else None


def _as_sequence(val: Any) -> Sequence[Any] | None:
    return val if isinstance(val, Sequence) and not isinstance(val, (str, bytes)) else None


# -----------------------------
# Agent Card Validation
# -----------------------------

# Top-level keys that validate_agent_card() reports as missing when absent.
_REQUIRED_AGENT_CARD_FIELDS = frozenset(
    (
        "name",
        "description",
        "url",
        "version",
        "capabilities",
        "defaultInputModes",
        "defaultOutputModes",
        "skills",
    )
)


def validate_agent_card(card_data: dict[str, Any]) -> list[str]:
    """
    Validate the structure and fields of an agent card.

    Contract (non-exhaustive, pragmatic checks):
      - The card itself must be an object (mapping).
      - Required top-level fields must exist.
      - url must be absolute (http/https) with a host.
      - version should be semver-like (e.g., 1.2.3 or 1.2.3-alpha).
      - capabilities must be an object/dict.
      - defaultInputModes/defaultOutputModes must be non-empty arrays of strings.
      - skills must be a non-empty array (objects or strings permitted); if objects, "name" should be string.

    Args:
        card_data: The parsed agent card. Falsy values (e.g. None or {}) are
            treated as an empty card, so every required field is reported
            as missing.

    Returns:
        A list of human-readable error strings. Empty list means "looks valid".
        This function reports problems through the returned list rather than
        raising on malformed input.
    """
    data = card_data or {}
    if not isinstance(data, Mapping):
        # Key lookups below would raise (or do substring matching on str),
        # so reject non-object input with a single clear error.
        return ["Agent card must be an object."]

    # Presence of required fields. The constant is an unordered set, so sort
    # it to keep the error order stable across interpreter runs.
    errors: list[str] = [
        f"Required field is missing: '{field}'."
        for field in sorted(_REQUIRED_AGENT_CARD_FIELDS)
        if field not in data
    ]

    # Type/format checks (guard with `in` to avoid KeyErrors)
    # name
    if "name" in data and not _is_non_empty_str(data["name"]):
        errors.append("Field 'name' must be a non-empty string.")

    # description
    if "description" in data and not _is_non_empty_str(data["description"]):
        errors.append("Field 'description' must be a non-empty string.")

    # url
    if "url" in data:
        url_val = data["url"]
        if not _is_non_empty_str(url_val):
            errors.append("Field 'url' must be a non-empty string.")
        else:
            try:
                parsed = urlparse(url_val)
                url_ok = parsed.scheme in {"http", "https"} and bool(parsed.netloc)
            except ValueError:
                # urlparse raises on malformed netlocs such as unmatched
                # IPv6 brackets ("http://[bad"); report instead of crashing.
                url_ok = False
            if not url_ok:
                errors.append(
                    "Field 'url' must be an absolute URL with http(s) scheme and host."
                )

    # version (soft semver check; adjust if your ecosystem allows non-semver)
    if "version" in data:
        ver = data["version"]
        if not _is_non_empty_str(ver):
            errors.append("Field 'version' must be a non-empty string.")
        elif not _SEMVER_RE.match(ver):
            errors.append(
                "Field 'version' should be semver-like (e.g., '1.2.3' or '1.2.3-alpha')."
            )

    # capabilities
    if "capabilities" in data:
        caps = data["capabilities"]
        if not isinstance(caps, dict):
            errors.append("Field 'capabilities' must be an object.")
        elif "streaming" in caps and not isinstance(caps["streaming"], bool):
            # Optional: sanity check for a common capability field
            errors.append("Field 'capabilities.streaming' must be a boolean if present.")

    # defaultInputModes / defaultOutputModes
    for field in ("defaultInputModes", "defaultOutputModes"):
        if field in data:
            modes = data[field]
            if not _is_list_of_str(modes):
                errors.append(f"Field '{field}' must be an array of strings.")
            elif not modes:
                errors.append(f"Field '{field}' must not be empty.")

    # skills
    if "skills" in data:
        skills = _as_sequence(data["skills"])
        if skills is None:
            errors.append("Field 'skills' must be an array.")
        elif not skills:
            errors.append(
                "Field 'skills' must not be empty. Agent must have at least one skill if it performs actions."
            )
        else:
            # If entries are objects, check they have a name
            for i, skill in enumerate(skills):
                if isinstance(skill, Mapping):
                    if not _is_non_empty_str(skill.get("name")):
                        errors.append(f"skills[{i}].name is required and must be a non-empty string.")
                elif not isinstance(skill, str):
                    errors.append(
                        f"skills[{i}] must be either an object with 'name' or a string; found: {type(skill).__name__}"
                    )

    return errors


# -----------------------------
# Agent Message/Event Validation
# -----------------------------

def _validate_task(data: dict[str, Any]) -> list[str]:
    """Check a 'task' payload for an 'id' key and a 'status' object containing 'state'.

    Returns:
        A list of error strings; empty when both checks pass.
    """
    problems: list[str] = []

    if "id" not in data:
        problems.append("Task object missing required field: 'id'.")

    # 'status' must be an object and must carry a 'state' key.
    status_obj = _as_mapping(data.get("status"))
    has_state = status_obj is not None and "state" in status_obj
    if not has_state:
        problems.append("Task object missing required field: 'status.state'.")

    return problems


def _validate_status_update(data: dict[str, Any]) -> list[str]:
    """Check a 'status-update' payload for a 'status' object containing 'state'.

    Returns:
        A list with one error string when the check fails, otherwise an empty list.
    """
    status_obj = _as_mapping(data.get("status"))
    if status_obj is not None and "state" in status_obj:
        return []
    return ["StatusUpdate object missing required field: 'status.state'."]


def _validate_artifact_update(data: dict[str, Any]) -> list[str]:
    """Check an 'artifact-update' payload for an 'artifact' object with a non-empty 'parts' list.

    Returns:
        A list with one error string describing the first failed check,
        otherwise an empty list.
    """
    artifact_obj = _as_mapping(data.get("artifact"))
    if artifact_obj is None:
        # Without an artifact object there is nothing further to inspect.
        return ["ArtifactUpdate object missing required field: 'artifact'."]

    parts = artifact_obj.get("parts")
    if isinstance(parts, list) and parts:
        return []
    return ["Artifact object must have a non-empty 'parts' array."]


def _validate_message(data: dict[str, Any]) -> list[str]:
    """Check a 'message' payload for a non-empty 'parts' list and role 'agent'.

    The content of individual parts is deliberately not inspected, so parts
    carrying non-text media are not rejected.

    Returns:
        A list of error strings; empty when both checks pass.
    """
    problems: list[str] = []

    parts = data.get("parts")
    has_parts = isinstance(parts, list) and len(parts) > 0
    if not has_parts:
        problems.append("Message object must have a non-empty 'parts' array.")

    if data.get("role") != "agent":
        problems.append("Message from agent must have 'role' set to 'agent'.")

    return problems


# Dispatch table mapping a payload's 'kind' value to the validator for that
# kind. Each validator takes the payload dict and returns a list of error
# strings. The annotation uses typing.Callable; the builtin `callable` is a
# predicate function, not a subscriptable type.
_KIND_VALIDATORS: dict[str, Callable[[dict[str, Any]], list[str]]] = {
    "task": _validate_task,
    "status-update": _validate_status_update,
    "artifact-update": _validate_artifact_update,
    "message": _validate_message,
}


def validate_message(data: dict[str, Any]) -> list[str]:
    """
    Validate an event/message received from the agent, dispatching on its 'kind'.

    Expected kinds: 'task', 'status-update', 'artifact-update', 'message'

    Returns:
        A list of human-readable error strings. Empty list means "looks valid".
        A non-object payload, a missing 'kind', or an unrecognized 'kind' each
        produce a single-element list.
    """
    if not isinstance(data, Mapping):
        return ["Response from agent must be an object."]
    if "kind" not in data:
        return ["Response from agent is missing required 'kind' field."]

    # Coerce to str so the dispatch-table lookup works on whatever value 'kind' holds.
    kind_name = str(data.get("kind"))
    handler = _KIND_VALIDATORS.get(kind_name)
    if not handler:
        return [f"Unknown message kind received: '{kind_name}'."]

    # Hand the validator a plain dict copy of the payload.
    return handler(dict(data))


# Public API of this module; the underscore-prefixed helpers are not exported.
__all__ = ["validate_agent_card", "validate_message"]