···
+
AT Protocol Lexicon Type Models
+
Combined implementation of all AT Protocol Lexicon data types including:
+
- Primitive types (boolean, integer, string, null)
+
- Complex types (array, object, params)
+
- Reference types (ref, union, token)
+
- Special types (record, query, procedure, subscription)
+
- Binary types (bytes, CID links)
+
from datetime import datetime
+
from pydantic import field_validator, field_serializer
+
from cid.cid import CIDv1, make_cid
+
from .base import DataModel
+
from .exceptions import ValidationError, SerializationError, InvalidCIDError
+
class BooleanModel(DataModel):
+
"""Model for AT Protocol boolean type."""
+
default: bool | None = None
+
"""Default value if not provided"""
+
const: bool | None = None
+
"""Fixed constant value if specified"""
+
def __init__(self, **data: Any) -> None:
+
Initialize boolean model with validation.
+
**data: Input data containing boolean value
+
ValueError: If value doesn't match const or is not boolean
+
super().__init__(**data)
+
if self.const is not None and self.value != self.const:
+
raise ValueError(f"Boolean value must be {self.const}")
+
@field_validator("value", mode="before")
+
def validateBoolean(cls, v: Any) -> bool:
+
Validate and convert input to boolean.
+
Validated boolean value
+
ValueError: If value cannot be converted to boolean
+
if isinstance(v, bool):
+
if v.lower() in ("true", "1"):
+
if v.lower() in ("false", "0"):
+
raise ValueError("Value must be a boolean")
+
class IntegerModel(DataModel):
+
"""Model for AT Protocol integer type."""
+
minimum: int | None = None
+
"""Minimum acceptable value"""
+
maximum: int | None = None
+
"""Maximum acceptable value"""
+
enum: list[int] | None = None
+
"""Closed set of allowed values"""
+
default: int | None = None
+
"""Default value if not provided"""
+
const: int | None = None
+
"""Fixed constant value if specified"""
+
def __init__(self, **data: Any) -> None:
+
Initialize integer model with validation.
+
**data: Input data containing integer value
+
ValueError: If value violates constraints
+
super().__init__(**data)
+
if self.const is not None and self.value != self.const:
+
raise ValueError(f"Integer value must be {self.const}")
+
@field_validator("value", mode="before")
+
def validateInteger(cls, v: Any) -> int:
+
Validate and convert input to integer.
+
Validated integer value
+
ValueError: If value violates constraints
+
if not isinstance(v, int):
+
except (TypeError, ValueError):
+
raise ValueError("Value must be an integer")
+
if cls.enum and v not in cls.enum:
+
raise ValueError(f"Value must be one of {cls.enum}")
+
if cls.minimum is not None and v < cls.minimum:
+
raise ValueError(f"Value must be >= {cls.minimum}")
+
if cls.maximum is not None and v > cls.maximum:
+
raise ValueError(f"Value must be <= {cls.maximum}")
+
# String Types (from string.py)
+
class StringModel(DataModel):
+
"""Model for AT Protocol string type."""
+
format: str | None = None
+
"""String format restriction"""
+
maxLength: int | None = None
+
"""Maximum length in UTF-8 bytes"""
+
minLength: int | None = None
+
"""Minimum length in UTF-8 bytes"""
+
knownValues: list[str] | None = None
+
"""Suggested/common values"""
+
enum: list[str] | None = None
+
"""Closed set of allowed values"""
+
default: str | None = None
+
"""Default value if not provided"""
+
const: str | None = None
+
"""Fixed constant value if specified"""
+
def __init__(self, **data: Any) -> None:
+
Initialize string model with validation.
+
**data: Input data containing string value
+
ValueError: If value violates constraints
+
super().__init__(**data)
+
if self.const is not None and self.value != self.const:
+
raise ValueError(f"String value must be {self.const}")
+
@field_validator("value", mode="before")
+
def validateString(cls, v: Any) -> str:
+
Validate and convert input to string.
+
ValueError: If value violates constraints
+
if not isinstance(v, str):
+
if cls.minLength is not None and len(v.encode()) < cls.minLength:
+
raise ValueError(f"String must be at least {cls.minLength} bytes")
+
if cls.maxLength is not None and len(v.encode()) > cls.maxLength:
+
raise ValueError(f"String must be at most {cls.maxLength} bytes")
+
if cls.enum and v not in cls.enum:
+
raise ValueError(f"Value must be one of {cls.enum}")
+
def _validateFormat(cls, v: str) -> None:
+
"""Validate string format based on specified format type."""
+
if cls.format == "datetime":
+
cls._validateDatetime(v)
+
elif cls.format == "uri":
+
elif cls.format == "did":
+
elif cls.format == "handle":
+
elif cls.format == "at-identifier":
+
cls._validateAtIdentifier(v)
+
elif cls.format == "at-uri":
+
elif cls.format == "cid":
+
elif cls.format == "nsid":
+
elif cls.format == "tid":
+
elif cls.format == "record-key":
+
cls._validateRecordKey(v)
+
elif cls.format == "language":
+
cls._validateLanguage(v)
+
def _validateDid(cls, v: str) -> None:
+
"""Validate DID format"""
+
raise ValueError("DID too long")
+
if not re.match(r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$", v):
+
raise ValueError("Invalid DID format")
+
def _validateHandle(cls, v: str) -> None:
+
"""Validate handle format"""
+
r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$",
+
raise ValueError("Handle contains invalid characters")
+
raise ValueError("Handle too long")
+
def _validateAtIdentifier(cls, v: str) -> None:
+
"""Validate at-identifier format (DID or handle)"""
+
if v.startswith("did:"):
+
except ValueError as e:
+
raise ValueError(f"Invalid at-identifier: {e}")
+
def _validateAtUri(cls, v: str) -> None:
+
"""Validate AT-URI format"""
+
if not v.startswith("at://"):
+
raise ValueError("AT-URI must start with 'at://'")
+
raise ValueError("AT-URI too long")
+
raise ValueError("AT-URI cannot have trailing slash")
+
parts = v[5:].split("/")
+
raise ValueError("AT-URI must have authority")
+
if authority.startswith("did:"):
+
if len(authority) > 2048:
+
raise ValueError("DID too long")
+
if ":" not in authority[4:]:
+
raise ValueError("Invalid DID format")
+
if not re.match(r"^[a-z0-9.-]+$", authority):
+
raise ValueError("Invalid handle characters")
+
if len(authority) > 253:
+
raise ValueError("Handle too long")
+
raise ValueError("AT-URI path too deep")
+
if not re.match(r"^[a-zA-Z0-9.-]+$", collection):
+
raise ValueError("Invalid collection NSID")
+
raise ValueError("Record key cannot be empty")
+
if not re.match(r"^[a-zA-Z0-9._:%-~]+$", rkey):
+
raise ValueError("Invalid record key characters")
+
def _validateCid(cls, v: str) -> None:
+
"""Validate CID string format"""
+
raise ValueError("CID too long")
+
if not re.match(r"^[a-zA-Z]+$", v):
+
raise ValueError("CID contains invalid characters")
+
def _validateNsid(cls, v: str) -> None:
+
"""Validate NSID format"""
+
raise ValueError("NSID too long")
+
r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$",
+
raise ValueError("NSID contains invalid characters")
+
def _validateTid(cls, v: str) -> None:
+
"""Validate TID format"""
+
raise ValueError("TID too long")
+
r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$", v
+
raise ValueError("TID contains invalid characters")
+
def _validateRecordKey(cls, v: str) -> None:
+
"""Validate record-key format"""
+
raise ValueError("Record key too long")
+
if v == "." or v == "..":
+
raise ValueError(f"Record key is {v}, which is not allowed")
+
if not re.match(r"^[a-zA-Z0-9._:%-~]+$", v):
+
raise ValueError("Record key contains invalid characters")
+
def _validateLanguage(cls, v: str) -> None:
+
"""Validate BCP 47 language tag"""
+
if not re.match(r"^[a-zA-Z]{1,8}(-[a-zA-Z0-9]{1,8})*$", v):
+
raise ValueError("Invalid language tag format")
+
cls._validateDatetime(v)
+
elif cls.format == "uri":
+
elif cls.format == "did":
+
elif cls.format == "handle":
+
elif cls.format == "at-identifier":
+
cls._validateAtIdentifier(v)
+
elif cls.format == "at-uri":
+
elif cls.format == "cid":
+
elif cls.format == "nsid":
+
elif cls.format == "tid":
+
elif cls.format == "record-key":
+
cls._validateRecordKey(v)
+
elif cls.format == "language":
+
cls._validateLanguage(v)
+
def _validateDatetime(cls, v: str) -> None:
+
"""Validate RFC 3339 datetime format"""
+
datetime.fromisoformat(v.replace("Z", "+00:00"))
+
raise ValueError("Invalid datetime format")
+
def _validateUri(cls, v: str) -> None:
+
"""Validate URI format"""
+
raise ValueError("URI too long")
+
if not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*:.+", v):
+
raise ValueError("Invalid URI format")
+
# ... (other validation methods remain the same)
+
# Binary Types (from binary.py)
+
class BytesModel(DataModel):
+
"""Model for AT Protocol bytes type."""
+
minLength: int | None = None
+
"""Minimum size in bytes"""
+
maxLength: int | None = None
+
"""Maximum size in bytes"""
+
def __init__(self, **data: Any) -> None:
+
Initialize bytes model with validation.
+
**data: Input data containing bytes value and constraints
+
ValidationError: If length constraints are violated
+
super().__init__(**data)
+
@field_validator("value")
+
def validateLength(cls, v: bytes, info: Any) -> bytes:
+
Validate bytes length against constraints.
+
v: Bytes value to validate
+
info: Validation info containing field values
+
ValidationError: If length constraints are violated
+
minLen = info.data.get("minLength")
+
maxLen = info.data.get("maxLength")
+
if minLen is not None and len(v) < minLen:
+
message=f"Bytes length {len(v)} is less than minimum {minLen}",
+
if maxLen is not None and len(v) > maxLen:
+
field="value", message=f"Bytes length {len(v)} exceeds maximum {maxLen}"
+
@field_serializer("value")
+
def serializeBytes(self, v: bytes) -> dict[str, str]:
+
Serialize bytes to JSON format with base64 encoding.
+
Dictionary with base64 encoded bytes
+
SerializationError: If encoding fails
+
return {"$bytes": base64.b64encode(v).decode()}
+
raise SerializationError("value", f"Failed to encode bytes: {e}")
+
class CidLinkModel(DataModel):
+
"""Model for AT Protocol CID link type."""
+
"""CID reference to linked content"""
+
def __init__(self, **data: Any) -> None:
+
Initialize CID link model with validation.
+
**data: Input data containing CID link
+
InvalidCIDError: If CID is invalid
+
if isinstance(data.get("link"), str):
+
data["link"] = make_cid(data["link"])
+
except ValueError as e:
+
raise InvalidCIDError(f"Invalid CID: {e}")
+
super().__init__(**data)
+
@field_serializer("link")
+
def serializeCid(self, v: CIDv1) -> dict[str, str]:
+
Serialize CID to JSON format.
+
Dictionary with string CID representation
+
return {"$link": str(v)}
+
# Complex Types (from complex.py)
+
class ArrayModel(DataModel):
+
"""Model for AT Protocol array type."""
+
"""Schema definition for array elements"""
+
minLength: int | None = None
+
"""Minimum number of elements"""
+
maxLength: int | None = None
+
"""Maximum number of elements"""
+
def __init__(self, **data: Any) -> None:
+
Initialize array model with validation.
+
**data: Input data containing array values
+
ValueError: If array violates constraints
+
super().__init__(**data)
+
@field_validator("value", mode="before")
+
def validateArray(cls, v: Any) -> list[Any]:
+
Validate array structure and elements.
+
ValueError: If array violates constraints
+
if not isinstance(v, list):
+
raise ValueError("Value must be an array")
+
if cls.minLength is not None and len(v) < cls.minLength:
+
raise ValueError(f"Array must have at least {cls.minLength} items")
+
if cls.maxLength is not None and len(v) > cls.maxLength:
+
raise ValueError(f"Array must have at most {cls.maxLength} items")
+
class ObjectModel(DataModel):
+
"""Model for AT Protocol object type."""
+
properties: dict[str, Any]
+
"""Map of property names to their schema definitions"""
+
required: list[str] | None = None
+
"""List of required property names"""
+
nullable: list[str] | None = None
+
"""List of properties that can be null"""
+
"""Object property values"""
+
def __init__(self, **data: Any) -> None:
+
Initialize object model with validation.
+
**data: Input data containing object properties
+
ValueError: If object violates constraints
+
super().__init__(**data)
+
@field_validator("value", mode="before")
+
def validateObject(cls, v: Any) -> dict[str, Any]:
+
Validate object structure and properties.
+
ValueError: If object violates constraints
+
if not isinstance(v, dict):
+
raise ValueError("Value must be an object")
+
for field in cls.required:
+
raise ValueError(f"Missing required field: {field}")
+
for field, value in v.items():
+
if field not in cls.nullable and value is None:
+
raise ValueError(f"Field {field} cannot be null")
+
class ParamsModel(DataModel):
+
"""Model for AT Protocol params type."""
+
required: list[str] | None = None
+
"""List of required parameter names"""
+
properties: dict[str, Any]
+
"""Map of parameter names to their schema definitions"""
+
def __init__(self, **data: Any) -> None:
+
Initialize params model with validation.
+
**data: Input data containing parameter values
+
ValueError: If parameters violate constraints
+
super().__init__(**data)
+
@field_validator("value", mode="before")
+
def validateParams(cls, v: Any) -> dict[str, Any]:
+
Validate parameters structure and values.
+
Validated parameters dictionary
+
ValueError: If parameters violate constraints
+
if not isinstance(v, dict):
+
raise ValueError("Value must be a dictionary of parameters")
+
for param in cls.required:
+
if param not in validated:
+
raise ValueError(f"Missing required parameter: {param}")
+
for param, value in validated.items():
+
if param in cls.properties:
+
propType = cls.properties[param].get("type")
+
if propType == "boolean" and not isinstance(value, bool):
+
raise ValueError(f"Parameter {param} must be boolean")
+
elif propType == "integer" and not isinstance(value, int):
+
raise ValueError(f"Parameter {param} must be integer")
+
elif propType == "string" and not isinstance(value, str):
+
raise ValueError(f"Parameter {param} must be string")
+
elif propType == "array":
+
if not isinstance(value, list):
+
raise ValueError(f"Parameter {param} must be array")
+
if "items" in cls.properties[param]:
+
itemType = cls.properties[param]["items"].get("type")
+
if itemType == "boolean" and not isinstance(item, bool):
+
f"Array item in {param} must be boolean"
+
elif itemType == "integer" and not isinstance(item, int):
+
f"Array item in {param} must be integer"
+
elif itemType == "string" and not isinstance(item, str):
+
f"Array item in {param} must be string"
+
elif itemType == "unknown" and not isinstance(item, dict):
+
f"Array item in {param} must be object"
+
elif propType == "unknown" and not isinstance(value, dict):
+
raise ValueError(f"Parameter {param} must be object")