Compare changes

Choose any two refs to compare.

Changed files
+381
src
atpasser
model
+132
src/atpasser/model/types/binary.py
···
+
"""
+
Binary data types for AT Protocol Lexicon models.
+
+
Includes models for bytes, CID links and other binary data formats.
+
"""
+
from typing import Any
+
import base64
+
from pydantic import field_validator, field_serializer
+
from cid.cid import CIDv1, make_cid
+
from ..base import DataModel
+
from ..exceptions import ValidationError, SerializationError, InvalidCIDError
+
+
class BytesModel(DataModel):
    """
    Model for AT Protocol bytes type.

    Represents raw binary data that is encoded as base64 in JSON format.
    """

    value: bytes
    """Raw binary data"""

    min_length: int | None = None
    """Minimum size in bytes"""

    max_length: int | None = None
    """Maximum size in bytes"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize bytes model with validation.

        Args:
            **data: Input data containing bytes value and constraints.
                ``value`` may be raw bytes or the JSON form
                ``{"$bytes": "<base64>"}`` emitted by ``serialize_bytes``.

        Raises:
            ValidationError: If the ``$bytes`` payload is not valid base64
                or length constraints are violated
        """
        raw = data.get("value")
        if isinstance(raw, dict) and isinstance(raw.get("$bytes"), str):
            data["value"] = self._decode_base64(raw["$bytes"])

        super().__init__(**data)

        # The length limits are checked here rather than in a field validator
        # on ``value``: fields are validated in declaration order, so a
        # validator on ``value`` runs before ``min_length``/``max_length``
        # exist in ``info.data`` and would never see them.
        self._check_length(self.value, self.min_length, self.max_length)

    @staticmethod
    def _decode_base64(encoded: str) -> bytes:
        """
        Decode a base64 string, tolerating omitted ``=`` padding.

        Args:
            encoded: Base64 text taken from a ``$bytes`` wrapper

        Returns:
            Decoded raw bytes

        Raises:
            ValidationError: If the text is not valid base64
        """
        padded = encoded + "=" * (-len(encoded) % 4)
        try:
            return base64.b64decode(padded, validate=True)
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            raise ValidationError(
                field="value",
                message=f"Invalid base64 in $bytes: {exc}",
            ) from exc

    @staticmethod
    def _check_length(v: bytes, min_len: int | None, max_len: int | None) -> None:
        """
        Validate bytes length against constraints.

        Args:
            v: Bytes value to validate
            min_len: Minimum allowed size in bytes, or None for no minimum
            max_len: Maximum allowed size in bytes, or None for no maximum

        Raises:
            ValidationError: If length constraints are violated
        """
        size = len(v)

        if min_len is not None and size < min_len:
            raise ValidationError(
                field="value",
                message=f"Bytes length {size} is less than minimum {min_len}",
            )

        if max_len is not None and size > max_len:
            raise ValidationError(
                field="value",
                message=f"Bytes length {size} exceeds maximum {max_len}",
            )

    @field_serializer("value")
    def serialize_bytes(self, v: bytes) -> dict[str, str]:
        """
        Serialize bytes to JSON format with base64 encoding.

        Args:
            v: Bytes to serialize

        Returns:
            Dictionary with base64 encoded bytes under the ``$bytes`` key

        Raises:
            SerializationError: If encoding fails
        """
        try:
            return {"$bytes": base64.b64encode(v).decode("ascii")}
        except (TypeError, ValueError) as exc:
            raise SerializationError("value", f"Failed to encode bytes: {exc}") from exc
+
+
class CidLinkModel(DataModel):
    """
    Model for AT Protocol CID link type.

    Represents content-addressable links using CIDs (Content Identifiers).
    """

    link: CIDv1
    """CID reference to linked content"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize CID link model with validation.

        Args:
            **data: Input data containing CID link. ``link`` may be a CID
                object, a CID string, or the JSON form ``{"$link": "<cid>"}``
                emitted by ``serialize_cid``.

        Raises:
            InvalidCIDError: If CID is invalid
        """
        raw = data.get("link")

        # Unwrap the JSON representation so the model can read back what its
        # own serializer produces.
        if isinstance(raw, dict) and isinstance(raw.get("$link"), str):
            raw = raw["$link"]

        if isinstance(raw, str):
            try:
                data["link"] = make_cid(raw)
            except ValueError as exc:
                raise InvalidCIDError(f"Invalid CID: {exc}") from exc

        super().__init__(**data)

    @field_serializer("link")
    def serialize_cid(self, v: CIDv1) -> dict[str, str]:
        """
        Serialize CID to JSON format.

        Args:
            v: CID to serialize

        Returns:
            Dictionary with string CID representation under the ``$link`` key
        """
        return {"$link": str(v)}
+249
src/atpasser/model/types/string.py
···
+
from typing import Any
+
import re
+
from datetime import datetime
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class StringModel(DataModel):
    """
    Model for AT Protocol string type.

    Represents a Unicode string with support for format restrictions, length limits,
    known values, enumeration sets, default values and constants as specified in Lexicon.
    """

    value: str
    """String value"""

    format: str | None = None
    """String format restriction (e.g. 'datetime', 'uri')"""

    maxLength: int | None = None
    """Maximum length in UTF-8 bytes"""

    minLength: int | None = None
    """Minimum length in UTF-8 bytes"""

    knownValues: list[str] | None = None
    """Suggested/common values (not enforced)"""

    enum: list[str] | None = None
    """Closed set of allowed values"""

    default: str | None = None
    """Default value if not provided"""

    const: str | None = None
    """Fixed constant value if specified"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize string model with validation.

        Args:
            **data: Input data containing string value and constraints. When
                ``value`` is omitted and ``default`` is given, the default is
                used as the value.

        Raises:
            ValueError: If value violates constraints
        """
        if "value" not in data and data.get("default") is not None:
            data["value"] = data["default"]

        super().__init__(**data)

        # Constraints are per-instance fields, so they can only be checked
        # once the instance is populated; they are not readable from the
        # class inside a field validator.
        self._check_constraints()

    @field_validator("value", mode="before")
    @classmethod
    def validate_string(cls, v: Any) -> str:
        """
        Convert input to string.

        Args:
            v: Value to convert

        Returns:
            The value itself if it is already a string, otherwise ``str(v)``
        """
        return v if isinstance(v, str) else str(v)

    def _check_constraints(self) -> None:
        """
        Check the value against this instance's length, enum, format and const rules.

        Raises:
            ValueError: If value violates constraints
        """
        text = self.value
        size = len(text.encode())  # Lexicon lengths are counted in UTF-8 bytes

        if self.minLength is not None and size < self.minLength:
            raise ValueError(f"String must be at least {self.minLength} bytes")

        if self.maxLength is not None and size > self.maxLength:
            raise ValueError(f"String must be at most {self.maxLength} bytes")

        if self.enum and text not in self.enum:
            raise ValueError(f"Value must be one of {self.enum}")

        if self.format:
            self._validate_format(self.format, text)

        if self.const is not None and text != self.const:
            raise ValueError(f"String value must be {self.const}")

    @classmethod
    def _validate_format(cls, fmt: str, v: str) -> None:
        """
        Dispatch to the validator for a named string format.

        Unrecognised format names are ignored.

        Args:
            fmt: Format name (e.g. 'datetime', 'did', 'at-uri')
            v: String to validate

        Raises:
            ValueError: If the string does not satisfy the format
        """
        handlers = {
            "datetime": cls._validate_datetime,
            "uri": cls._validate_uri,
            "did": cls._validate_did,
            "handle": cls._validate_handle,
            "at-identifier": cls._validate_at_identifier,
            "at-uri": cls._validate_at_uri,
            "cid": cls._validate_cid,
            "nsid": cls._validate_nsid,
            "tid": cls._validate_tid,
            "record-key": cls._validate_record_key,
            "language": cls._validate_language,
        }
        handler = handlers.get(fmt)
        if handler is not None:
            handler(v)

    @classmethod
    def _validate_datetime(cls, v: str) -> None:
        """
        Validate RFC 3339 datetime format.

        Requires a full date, an uppercase 'T' separator, a time with seconds
        (optionally fractional) and a timezone ('Z' or a numeric offset).

        Raises:
            ValueError: If the string is not a valid RFC 3339 datetime
        """
        shape = re.fullmatch(
            r"([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})"
            r"(?:\.[0-9]+)?"
            r"(Z|[+-][0-9]{2}:[0-9]{2})",
            v,
        )
        if shape is None:
            raise ValueError("Invalid datetime format, must be RFC 3339")

        stamp, offset = shape.groups()
        # The fractional part is dropped for the range check: any digit run is
        # valid there, and older fromisoformat() only accepts 3 or 6 digits.
        try:
            datetime.fromisoformat(stamp + ("+00:00" if offset == "Z" else offset))
        except ValueError:
            raise ValueError("Invalid datetime format, must be RFC 3339") from None

    @classmethod
    def _validate_uri(cls, v: str) -> None:
        """
        Validate URI format.

        Raises:
            ValueError: If the string is too long or lacks a 'scheme:' prefix
        """
        if len(v) > 8192:  # 8KB max
            raise ValueError("URI too long, max 8KB")
        if not re.match(r"[a-zA-Z][a-zA-Z0-9+.-]*:.+", v):
            raise ValueError("Invalid URI format")

    @classmethod
    def _validate_did(cls, v: str) -> None:
        """
        Validate DID format.

        Raises:
            ValueError: If the string is too long or is not 'did:<method>:<id>'
        """
        if len(v) > 2048:
            raise ValueError("DID too long, max 2048 chars")
        if not re.fullmatch(r"did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]", v):
            raise ValueError("Invalid DID format")

    @classmethod
    def _validate_handle(cls, v: str) -> None:
        """
        Validate handle format.

        Raises:
            ValueError: If the string is not a dotted hostname-style handle
                or exceeds 253 characters
        """
        if not re.fullmatch(
            r"([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
            r"[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?",
            v,
        ):
            raise ValueError("Handle contains invalid characters")
        if len(v) > 253:
            raise ValueError("Handle too long, max 253 chars")

    @classmethod
    def _validate_at_identifier(cls, v: str) -> None:
        """
        Validate at-identifier format (DID or handle).

        Raises:
            ValueError: If the string is neither a valid DID nor a valid handle
        """
        check = cls._validate_did if v.startswith("did:") else cls._validate_handle
        try:
            check(v)
        except ValueError as exc:
            raise ValueError(f"Invalid at-identifier: {exc}") from exc

    @classmethod
    def _validate_at_uri(cls, v: str) -> None:
        """
        Validate AT-URI format.

        Args:
            v: AT-URI string to validate

        Raises:
            ValueError: If URI violates any of these rules:
                - Must start with 'at://'
                - Max length 8KB
                - No trailing slash
                - Authority must look like a DID or a handle (basic checks only)
                - At most a collection and a record key segment, each
                  restricted to its allowed characters
        """
        if not v.startswith("at://"):
            raise ValueError("AT-URI must start with 'at://'")
        if len(v) > 8192:  # 8KB
            raise ValueError("AT-URI too long, max 8KB")
        if v.endswith("/"):
            raise ValueError("AT-URI cannot have trailing slash")

        authority, *path = v[len("at://"):].split("/")

        if not authority:
            raise ValueError("AT-URI must have authority")

        if authority.startswith("did:"):
            # Basic DID format check - actual DID validation is done elsewhere
            if len(authority) > 2048:
                raise ValueError("DID too long")
            if ":" not in authority[4:]:
                raise ValueError("Invalid DID format")
        else:
            if not re.fullmatch(r"[a-z0-9.-]+", authority):
                raise ValueError("Invalid handle characters")
            if len(authority) > 253:
                raise ValueError("Handle too long")

        if not path:
            return

        if len(path) > 2:
            raise ValueError("AT-URI path too deep")

        if not re.fullmatch(r"[a-zA-Z0-9.-]+", path[0]):
            raise ValueError("Invalid collection NSID")

        if len(path) == 2:
            rkey = path[1]
            if not rkey:
                raise ValueError("Record key cannot be empty")
            # '-' sits last in the class so it is a literal; written as '%-~'
            # it formed a range that also admitted '/', '<', '@', '[' etc.
            if not re.fullmatch(r"[a-zA-Z0-9._:%~-]+", rkey):
                raise ValueError("Invalid record key characters")

    @classmethod
    def _validate_cid(cls, v: str) -> None:
        """
        Validate CID string format.

        Raises:
            ValueError: If the string is longer than 100 characters or is not
                purely alphanumeric
        """
        if len(v) > 100:
            raise ValueError("CID too long, max 100 chars")
        if not re.fullmatch(r"[a-zA-Z0-9]+", v):
            raise ValueError("CID contains invalid characters")

    @classmethod
    def _validate_nsid(cls, v: str) -> None:
        """
        Validate NSID format.

        Raises:
            ValueError: If the string is longer than 317 characters or is not
                a dotted name of at least three segments
        """
        if len(v) > 317:
            raise ValueError("NSID too long, max 317 chars")
        if not re.fullmatch(
            r"[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?"
            r"(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+"
            r"(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)",
            v,
        ):
            raise ValueError("NSID contains invalid characters")

    @classmethod
    def _validate_tid(cls, v: str) -> None:
        """
        Validate TID format.

        Raises:
            ValueError: If the string is not exactly 13 characters from the
                TID alphabet
        """
        if len(v) > 13:
            raise ValueError("TID too long, max 13 chars")
        if not re.fullmatch(
            r"[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}", v
        ):
            raise ValueError("TID contains invalid characters")

    @classmethod
    def _validate_record_key(cls, v: str) -> None:
        """
        Validate record-key format.

        Raises:
            ValueError: If the key is longer than 512 characters, is '.' or
                '..', is empty, or contains a disallowed character
        """
        if len(v) > 512:
            raise ValueError("Record key too long, max 512 chars")
        if v == "." or v == "..":
            raise ValueError(f"Record key is {v}, which is not allowed")
        # '-' sits last in the class so it is a literal; written as '%-~' it
        # formed a range that also admitted '/', '<', '@', '[' etc.
        # NOTE(review): '%' is kept to preserve previously intended
        # acceptance - confirm against the record-key spec's character set.
        if not re.fullmatch(r"[a-zA-Z0-9._:%~-]+", v):
            raise ValueError("Record key contains invalid characters")

    @classmethod
    def _validate_language(cls, v: str) -> None:
        """
        Validate BCP 47 language tag.

        Raises:
            ValueError: If the string is not hyphen-separated subtags of 1-8
                alphanumerics with an alphabetic first subtag
        """
        if not re.fullmatch(r"[a-zA-Z]{1,8}(-[a-zA-Z0-9]{1,8})*", v):
            raise ValueError("Invalid language tag format")