further

Changed files
+237 -2
src
+7
src/atpasser/model/converter.py
···
UnknownModel, RecordModel, QueryModel,
ProcedureModel, SubscriptionModel
)
class LexiconConverter:
"""
···
"boolean": BooleanModel,
"integer": IntegerModel,
"string": StringModel,
# Complex types
"array": ArrayModel,
···
UnknownModel, RecordModel, QueryModel,
ProcedureModel, SubscriptionModel
)
+
from .types.binary import BytesModel, CidLinkModel
+
from .blob import BlobModel
class LexiconConverter:
"""
···
"boolean": BooleanModel,
"integer": IntegerModel,
"string": StringModel,
+
+
# Binary types
+
"bytes": BytesModel,
+
"cid-link": CidLinkModel,
+
"blob": BlobModel,
# Complex types
"array": ArrayModel,
+132
src/atpasser/model/types/binary.py
···
···
+
"""
+
Binary data types for AT Protocol Lexicon models.
+
+
Includes models for bytes, CID links and other binary data formats.
+
"""
+
from typing import Any
+
import base64
+
from pydantic import field_validator, field_serializer
+
from cid import CIDv1, make_cid
+
from ..base import DataModel
+
from ..exceptions import ValidationError, SerializationError, InvalidCIDError
+
+
class BytesModel(DataModel):
+
"""
+
Model for AT Protocol bytes type.
+
+
Represents raw binary data that is encoded as base64 in JSON format.
+
"""
+
+
value: bytes
+
"""Raw binary data"""
+
+
min_length: int | None = None
+
"""Minimum size in bytes"""
+
+
max_length: int | None = None
+
"""Maximum size in bytes"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize bytes model with validation.
+
+
Args:
+
**data: Input data containing bytes value and constraints
+
+
Raises:
+
ValidationError: If length constraints are violated
+
"""
+
super().__init__(**data)
+
+
@field_validator("value")
+
def validate_length(cls, v: bytes, info: Any) -> bytes:
+
"""
+
Validate bytes length against constraints.
+
+
Args:
+
v: Bytes value to validate
+
info: Validation info containing field values
+
+
Returns:
+
Validated bytes
+
+
Raises:
+
ValidationError: If length constraints are violated
+
"""
+
min_len = info.data.get("min_length")
+
max_len = info.data.get("max_length")
+
+
if min_len is not None and len(v) < min_len:
+
raise ValidationError(
+
field="value",
+
message=f"Bytes length {len(v)} is less than minimum {min_len}"
+
)
+
+
if max_len is not None and len(v) > max_len:
+
raise ValidationError(
+
field="value",
+
message=f"Bytes length {len(v)} exceeds maximum {max_len}"
+
)
+
+
return v
+
+
@field_serializer("value")
+
def serialize_bytes(self, v: bytes) -> dict[str, str]:
+
"""
+
Serialize bytes to JSON format with base64 encoding.
+
+
Args:
+
v: Bytes to serialize
+
+
Returns:
+
Dictionary with base64 encoded bytes
+
+
Raises:
+
SerializationError: If encoding fails
+
"""
+
try:
+
return {"$bytes": base64.b64encode(v).decode()}
+
except Exception as e:
+
raise SerializationError("value", f"Failed to encode bytes: {e}")
+
+
class CidLinkModel(DataModel):
+
"""
+
Model for AT Protocol CID link type.
+
+
Represents content-addressable links using CIDs (Content Identifiers).
+
"""
+
+
link: CIDv1
+
"""CID reference to linked content"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize CID link model with validation.
+
+
Args:
+
**data: Input data containing CID link
+
+
Raises:
+
InvalidCIDError: If CID is invalid
+
"""
+
# Handle JSON format with $link field
+
if isinstance(data.get("link"), str):
+
try:
+
data["link"] = make_cid(data["link"])
+
except ValueError as e:
+
raise InvalidCIDError(f"Invalid CID: {e}")
+
+
super().__init__(**data)
+
+
@field_serializer("link")
+
def serialize_cid(self, v: CIDv1) -> dict[str, str]:
+
"""
+
Serialize CID to JSON format.
+
+
Args:
+
v: CID to serialize
+
+
Returns:
+
Dictionary with string CID representation
+
"""
+
return {"$link": str(v)}
+26 -1
src/atpasser/model/types/complex.py
···
"""Map of parameter names to their schema definitions"""
value: dict[str, Any]
-
"""Parameter values"""
def __init__(self, **data: Any) -> None:
"""
···
raise ValueError(f"Parameter {param} must be integer")
elif prop_type == "string" and not isinstance(value, str):
raise ValueError(f"Parameter {param} must be string")
return v
···
"""Map of parameter names to their schema definitions"""
value: dict[str, Any]
+
"""Parameter values
+
+
Supported types:
+
- boolean
+
- integer
+
- string
+
- array (of boolean/integer/string/unknown)
+
- unknown (object)
+
"""
def __init__(self, **data: Any) -> None:
"""
···
raise ValueError(f"Parameter {param} must be integer")
elif prop_type == "string" and not isinstance(value, str):
raise ValueError(f"Parameter {param} must be string")
+
elif prop_type == "array":
+
if not isinstance(value, list):
+
raise ValueError(f"Parameter {param} must be array")
+
# Validate array items if schema is specified
+
if "items" in cls.properties[param]:
+
item_type = cls.properties[param]["items"].get("type")
+
for item in value:
+
if item_type == "boolean" and not isinstance(item, bool):
+
raise ValueError(f"Array item in {param} must be boolean")
+
elif item_type == "integer" and not isinstance(item, int):
+
raise ValueError(f"Array item in {param} must be integer")
+
elif item_type == "string" and not isinstance(item, str):
+
raise ValueError(f"Array item in {param} must be string")
+
elif item_type == "unknown" and not isinstance(item, dict):
+
raise ValueError(f"Array item in {param} must be object")
+
elif prop_type == "unknown" and not isinstance(value, dict):
+
raise ValueError(f"Parameter {param} must be object")
return v
+72 -1
src/atpasser/model/types/string.py
···
cls._validate_did(v)
elif cls.format == "handle":
cls._validate_handle(v)
return v
···
if not re.match(r"^[a-zA-Z0-9._-]+$", v):
raise ValueError("Handle contains invalid characters")
if len(v) > 253:
-
raise ValueError("Handle too long, max 253 chars")
···
cls._validate_did(v)
elif cls.format == "handle":
cls._validate_handle(v)
+
elif cls.format == "at-identifier":
+
cls._validate_at_identifier(v)
+
elif cls.format == "at-uri":
+
cls._validate_at_uri(v)
+
elif cls.format == "cid":
+
cls._validate_cid(v)
+
elif cls.format == "nsid":
+
cls._validate_nsid(v)
+
elif cls.format == "tid":
+
cls._validate_tid(v)
+
elif cls.format == "record-key":
+
cls._validate_record_key(v)
+
elif cls.format == "language":
+
cls._validate_language(v)
return v
···
if not re.match(r"^[a-zA-Z0-9._-]+$", v):
raise ValueError("Handle contains invalid characters")
if len(v) > 253:
+
raise ValueError("Handle too long, max 253 chars")
+
+
@classmethod
+
def _validate_at_identifier(cls, v: str) -> None:
+
"""Validate at-identifier format (DID or handle)"""
+
try:
+
if v.startswith("did:"):
+
cls._validate_did(v)
+
else:
+
cls._validate_handle(v)
+
except ValueError as e:
+
raise ValueError(f"Invalid at-identifier: {e}")
+
+
@classmethod
+
def _validate_at_uri(cls, v: str) -> None:
+
"""Validate AT-URI format"""
+
if not v.startswith("at://"):
+
raise ValueError("AT-URI must start with 'at://'")
+
if len(v) > 8000:
+
raise ValueError("AT-URI too long, max 8000 chars")
+
+
@classmethod
+
def _validate_cid(cls, v: str) -> None:
+
"""Validate CID string format"""
+
if len(v) > 100:
+
raise ValueError("CID too long, max 100 chars")
+
if not re.match(r"^[a-zA-Z0-9]+$", v):
+
raise ValueError("CID contains invalid characters")
+
+
@classmethod
+
def _validate_nsid(cls, v: str) -> None:
+
"""Validate NSID format"""
+
if len(v) > 317:
+
raise ValueError("NSID too long, max 317 chars")
+
if not re.match(r"^[a-zA-Z0-9.-]+$", v):
+
raise ValueError("NSID contains invalid characters")
+
+
@classmethod
+
def _validate_tid(cls, v: str) -> None:
+
"""Validate TID format"""
+
if len(v) > 13:
+
raise ValueError("TID too long, max 13 chars")
+
if not re.match(r"^[234567abcdefghijklmnopqrstuvwxyz]+$", v):
+
raise ValueError("TID contains invalid characters")
+
+
@classmethod
+
def _validate_record_key(cls, v: str) -> None:
+
"""Validate record-key format"""
+
if len(v) > 512:
+
raise ValueError("Record key too long, max 512 chars")
+
if not re.match(r"^[a-zA-Z0-9._:%-~]+$", v):
+
raise ValueError("Record key contains invalid characters")
+
+
@classmethod
+
def _validate_language(cls, v: str) -> None:
+
"""Validate BCP 47 language tag"""
+
if not re.match(r"^[a-zA-Z]{1,8}(-[a-zA-Z0-9]{1,8})*$", v):
+
raise ValueError("Invalid language tag format")