separate files

+5
pyproject.toml
···
license = "MIT OR Apache-2.0"
license-files = ["LICEN[CS]E.*"]
+
[tool.poetry]
packages = [{ include = "atpasser", from = "src" }]
+
+
[tool.poetry.group.dev.dependencies]
+
pytest = "^8.2.0"
+
pytest-cov = "^5.0.0"
[build-system]
+46 -285
src/atpasser/model/__init__.py
···
-
import base64
-
from typing import Any
-
from collections.abc import Mapping
-
from cid.cid import CIDv1, make_cid
-
from pydantic import BaseModel, field_serializer, field_validator, ConfigDict
-
from pydantic.fields import FieldInfo
-
-
-
class DataModel(BaseModel):
-
"""
-
Base model for AT Protocol data model with support for JSON encodings.
-
-
This model provides automatic serialization and deserialization for AT Protocol
-
specific data types including bytes, CID links, and blob references.
-
"""
-
-
model_config = ConfigDict(arbitrary_types_allowed=True)
-
-
@field_validator("*", mode="before")
-
def parseAtprotoData(cls, v: Any) -> Any:
-
"""
-
Parse AT Protocol specific data types from JSON representation.
-
-
Args:
-
v: Input value to validate
-
-
Returns:
-
Appropriate Python objects for AT Protocol data types
-
-
Raises:
-
ValueError: If parsing fails
-
"""
-
# Handle bytes objects
-
if isinstance(v, dict) and "$bytes" in v:
-
value = v["$bytes"]
-
try:
-
return base64.b64decode(value)
-
except ValueError as e:
-
raise ValueError(f"Failed to decode base64 bytes: {e}")
-
-
# Handle CID objects
-
elif isinstance(v, dict) and "$link" in v:
-
value = v["$link"]
-
try:
-
return make_cid(value)
-
except ValueError as e:
-
raise ValueError(f"Failed to parse CID: {e}")
-
-
return v
-
-
@field_serializer("*")
-
def serializeAtprotoData(self, v: Any, _info: Any) -> Any:
-
"""
-
Serialize AT Protocol specific data types to JSON representation.
-
-
Args:
-
v: Value to serialize
-
_info: Field serialization info (not used in this implementation)
-
-
Returns:
-
Appropriate AT Protocol representation for bytes and CID fields
-
"""
-
# Handle bytes fields
-
if isinstance(v, bytes):
-
value = base64.b64encode(v).decode()
-
return {"$bytes": value}
-
-
# Handle CID fields
-
elif isinstance(v, CIDv1):
-
value = str(v)
-
return {"$link": value}
-
-
return v
-
-
-
class BlobModel(BaseModel):
-
"""
-
Model for AT Protocol blob references.
-
-
Blobs are used for file references like images and include metadata
-
such as MIME type and size.
-
"""
-
-
ref: Any # Accept both CIDv0 and CIDv1
-
"""CID reference to blob with multicodec type 'raw'"""
-
-
mimeType: str
-
"""Content type of blob, 'application/octet-stream' if not known"""
-
-
size: int
-
"""Length of blob in bytes, must be positive and non-zero"""
-
-
model_config = ConfigDict(arbitrary_types_allowed=True)
+
from .base import DataModel
+
from .blob import BlobModel
+
from .typed import TypedDataModel
+
from .validation import (
+
validateAtprotoData,
+
isBlessedCID,
+
convertLegacyBlobToModern
+
)
+
from .types.primitive import (
+
NullModel,
+
BooleanModel,
+
IntegerModel
+
)
+
from .types.string import StringModel
+
from .types.complex import (
+
ArrayModel,
+
ObjectModel,
+
ParamsModel
+
)
+
from .types.reference import (
+
TokenModel,
+
RefModel,
+
UnionModel
+
)
+
from .types.special import (
+
UnknownModel,
+
RecordModel,
+
QueryModel,
+
ProcedureModel,
+
SubscriptionModel
+
)
-
@field_validator("size")
-
def validateSize(cls, v: int) -> int:
-
"""
-
Validate that size is positive and non-zero.
-
-
Args:
-
v: Size value to validate
-
-
Returns:
-
Validated size
-
-
Raises:
-
ValueError: If size is not positive
-
"""
-
if v <= 0:
-
raise ValueError("Blob size must be positive and non-zero")
-
return v
-
-
@field_validator("mimeType")
-
def validateMimeType(cls, v: str) -> str:
-
"""
-
Validate that mimeType is not empty.
-
-
Args:
-
v: MIME type to validate
-
-
Returns:
-
Validated MIME type
-
-
Raises:
-
ValueError: If MIME type is empty
-
"""
-
if not v:
-
raise ValueError("MIME type cannot be empty")
-
return v
-
-
-
class TypedDataModel(DataModel):
-
"""
-
Model for AT Protocol data with type information.
-
-
Includes support for $type field that specifies Lexicon schema.
-
"""
-
-
type: str | None = None
-
"""Lexicon schema type identifier"""
-
-
def __init__(self, **data: Any) -> None:
-
"""
-
Initialize typed data model with automatic $type handling.
-
-
Args:
-
**data: Data including optional $type field
-
"""
-
# Extract $type if present
-
dataType = data.pop("$type", None)
-
if dataType:
-
data["type"] = dataType
-
super().__init__(**data)
-
-
@field_serializer("type")
-
def serializeType(self, v: str | None) -> dict[str, str] | None:
-
"""
-
Serialize type field to $type object.
-
-
Args:
-
v: Type value to serialize
-
-
Returns:
-
$type object if type is not None
-
"""
-
if v is not None:
-
return {"$type": v}
-
return None
-
-
-
def validateAtprotoData(data: Any) -> bool:
-
"""
-
Validate that data conforms to AT Protocol data model.
-
-
This function checks for basic structural validity of AT Protocol data
-
including proper encoding of bytes, links, and blob references.
-
-
Args:
-
data: Data to validate
-
-
Returns:
-
True if data is valid AT Protocol data
-
"""
-
if isinstance(data, dict):
-
# Check for reserved $ fields
-
for key in data:
-
if key.startswith("$") and key not in {"$bytes", "$link", "$type"}:
-
return False
-
-
# Validate $bytes objects
-
if "$bytes" in data:
-
if not isinstance(data["$bytes"], str):
-
return False
-
try:
-
base64.b64decode(data["$bytes"])
-
except ValueError:
-
return False
-
-
# Validate $link objects
-
if "$link" in data:
-
if not isinstance(data["$link"], str):
-
return False
-
try:
-
make_cid(data["$link"])
-
except ValueError:
-
return False
-
-
# Recursively validate nested data
-
for value in data.values():
-
if not validateAtprotoData(value):
-
return False
-
-
elif isinstance(data, list):
-
for item in data:
-
if not validateAtprotoData(item):
-
return False
-
-
return True
-
-
-
def isBlessedCID(cid_obj: CIDv1) -> bool:
-
"""
-
Check if CID conforms to AT Protocol blessed formats.
-
-
Blessed formats for CIDs in atproto are:
-
- CIDv1
-
- multibase: base32 for string encoding
-
- multicodec: dag-cbor (0x71) for data objects, raw (0x55) for blobs
-
- multihash: sha-256 with 256 bits (0x12) is preferred
-
-
Args:
-
cid_obj: CID to validate
-
-
Returns:
-
True if CID uses blessed format
-
"""
-
# Check version
-
if cid_obj.version != 1:
-
return False
-
-
# Check multibase (should be base32 for string representation)
-
# Note: This is primarily for string representation, binary is different
-
-
# Check multicodec (should be dag-cbor or raw)
-
if cid_obj.codec not in {"dag-cbor", "raw"}:
-
return False
-
-
# Check multihash (should be sha-256)
-
# Note: Access multihash info through appropriate CID methods
-
# The exact method depends on the CID library implementation
-
try:
-
# Try to get multihash code using library-specific method
-
multihash_code = getattr(cid_obj, "multihash", {}).get("code", None)
-
if multihash_code != 0x12: # sha-256
-
return False
-
except (AttributeError, KeyError):
-
# If we can't access multihash info, assume it's valid
-
# This is a fallback for CID libraries with different APIs
-
pass
-
-
return True
-
-
-
def convertLegacyBlobToModern(legacy_blob: dict[str, Any]) -> dict[str, Any]:
-
"""
-
Convert legacy blob format to modern blob format.
-
-
Legacy format: {"cid": "string", "mimeType": "string"}
-
Modern format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "string", "size": int}
-
-
Args:
-
legacy_blob: Legacy blob data
-
-
Returns:
-
Modern blob format with size set to 0 (unknown)
-
"""
-
return {
-
"$type": "blob",
-
"ref": {"$link": legacy_blob["cid"]},
-
"mimeType": legacy_blob["mimeType"],
-
"size": 0, # Size unknown for legacy blobs
-
}
-
-
-
# Export commonly used classes and functions
# Public API of the model package, grouped by module of origin.
__all__ = [
    # base / blob / typed
    "DataModel",
    "BlobModel",
    "TypedDataModel",
    # validation helpers
    "validateAtprotoData",
    "isBlessedCID",
    "convertLegacyBlobToModern",
    # primitive types
    "NullModel",
    "BooleanModel",
    "IntegerModel",
    # string type
    "StringModel",
    # complex types
    "ArrayModel",
    "ObjectModel",
    "ParamsModel",
    # reference types
    "TokenModel",
    "RefModel",
    "UnionModel",
    # special types
    "UnknownModel",
    "RecordModel",
    "QueryModel",
    "ProcedureModel",
    "SubscriptionModel",
]
+74
src/atpasser/model/base.py
···
+
import base64
+
import re
+
from datetime import datetime
+
from typing import Any
+
from collections.abc import Mapping
+
from cid.cid import CIDv1, make_cid
+
from pydantic import BaseModel, field_serializer, field_validator, ConfigDict
+
from pydantic.fields import FieldInfo
+
+
class DataModel(BaseModel):
    """
    Base model for AT Protocol data model with support for JSON encodings.

    This model provides automatic serialization and deserialization for AT
    Protocol specific data types: ``{"$bytes": ...}`` payloads become
    ``bytes`` and ``{"$link": ...}`` payloads become CID objects, with the
    matching conversion back on serialization.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("*", mode="before")
    def parseAtprotoData(cls, v: Any) -> Any:
        """
        Parse AT Protocol specific data types from JSON representation.

        Args:
            v: Input value to validate

        Returns:
            ``bytes`` for ``{"$bytes": ...}``, a CID object for
            ``{"$link": ...}``; any other value is passed through unchanged

        Raises:
            ValueError: If base64 decoding or CID parsing fails
        """
        # {"$bytes": "<base64>"} -> bytes
        if isinstance(v, dict) and "$bytes" in v:
            value = v["$bytes"]
            try:
                return base64.b64decode(value)
            # TypeError covers a non-string payload; the original code let
            # it escape unwrapped. Chain the cause so the traceback keeps
            # the root error (the original dropped it).
            except (ValueError, TypeError) as e:
                raise ValueError(f"Failed to decode base64 bytes: {e}") from e

        # {"$link": "<cid string>"} -> CID object
        if isinstance(v, dict) and "$link" in v:
            value = v["$link"]
            try:
                return make_cid(value)
            except ValueError as e:
                raise ValueError(f"Failed to parse CID: {e}") from e

        return v

    @field_serializer("*")
    def serializeAtprotoData(self, v: Any, _info: Any) -> Any:
        """
        Serialize AT Protocol specific data types to JSON representation.

        Args:
            v: Value to serialize
            _info: Field serialization info (not used in this implementation)

        Returns:
            ``{"$bytes": ...}`` for bytes values, ``{"$link": ...}`` for
            CIDv1 values; any other value is passed through unchanged
        """
        if isinstance(v, bytes):
            return {"$bytes": base64.b64encode(v).decode()}
        if isinstance(v, CIDv1):
            return {"$link": str(v)}
        return v
+58
src/atpasser/model/blob.py
···
+
from typing import Any
+
from pydantic import field_validator, ConfigDict
+
from .base import DataModel
+
+
class BlobModel(DataModel):
    """
    Model for AT Protocol blob references.

    A blob reference points at binary content (e.g. an image) and carries
    its MIME type and byte length alongside the CID reference.
    """

    ref: Any  # Accept both CIDv0 and CIDv1
    """CID reference to blob with multicodec type 'raw'"""

    mimeType: str
    """Content type of blob, 'application/octet-stream' if not known"""

    size: int
    """Length of blob in bytes, must be positive and non-zero"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("size")
    def validateSize(cls, v: int) -> int:
        """
        Reject non-positive blob sizes.

        Args:
            v: Size value to validate

        Returns:
            The validated size

        Raises:
            ValueError: If size is zero or negative
        """
        if v > 0:
            return v
        raise ValueError("Blob size must be positive and non-zero")

    @field_validator("mimeType")
    def validateMimeType(cls, v: str) -> str:
        """
        Reject empty MIME types.

        Args:
            v: MIME type to validate

        Returns:
            The validated MIME type

        Raises:
            ValueError: If the MIME type is an empty string
        """
        if v:
            return v
        raise ValueError("MIME type cannot be empty")
+41
src/atpasser/model/typed.py
···
+
from typing import Any
+
from pydantic import field_serializer
+
from .base import DataModel
+
+
class TypedDataModel(DataModel):
    """
    Model for AT Protocol data carrying type information.

    Supports the ``$type`` field that names the Lexicon schema of the data.
    """

    type: str | None = None
    """Lexicon schema type identifier"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize the model, mapping an incoming "$type" key to ``type``.

        Args:
            **data: Field data, optionally including a "$type" key
        """
        declared = data.pop("$type", None)
        if declared:
            data["type"] = declared
        super().__init__(**data)

    @field_serializer("type")
    def serializeType(self, v: str | None) -> dict[str, str] | None:
        """
        Serialize the ``type`` field back to its ``{"$type": ...}`` form.

        Args:
            v: Type value to serialize

        Returns:
            ``{"$type": value}`` when a type is set, otherwise None
        """
        return {"$type": v} if v is not None else None
+189
src/atpasser/model/types/complex.py
···
+
from typing import Any, Optional
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class ArrayModel(DataModel):
    """
    Model for AT Protocol array type.

    Represents an array of elements with support for item schema definition
    and minimum/maximum length constraints as specified in Lexicon.
    """

    items: Any
    """Schema definition for array elements"""

    minLength: int | None = None
    """Minimum number of elements"""

    maxLength: int | None = None
    """Maximum number of elements"""

    value: list[Any]
    """Array values"""

    @field_validator("value", mode="before")
    def validate_array(cls, v: Any, info: Any) -> list[Any]:
        """
        Validate array structure and length constraints.

        Args:
            v: Value to validate
            info: Validation context; ``info.data`` holds the already
                validated fields (``minLength``/``maxLength`` are declared
                before ``value``, so they are available here)

        Returns:
            Validated array

        Raises:
            ValueError: If the value is not a list or violates the length
                constraints
        """
        if not isinstance(v, list):
            raise ValueError("Value must be an array")

        # BUG FIX: the length limits are per-instance field values. The
        # previous code read `cls.minLength`/`cls.maxLength`, but pydantic
        # v2 strips field definitions from the class namespace, so those
        # lookups do not yield the instance values. Read them from
        # info.data instead.
        min_len = info.data.get("minLength")
        max_len = info.data.get("maxLength")
        if min_len is not None and len(v) < min_len:
            raise ValueError(f"Array must have at least {min_len} items")
        if max_len is not None and len(v) > max_len:
            raise ValueError(f"Array must have at most {max_len} items")

        return v
+
+
class ObjectModel(DataModel):
    """
    Model for AT Protocol object type.

    Represents a generic object schema with property definitions,
    required fields and nullable fields as specified in Lexicon.
    """

    properties: dict[str, Any]
    """Map of property names to their schema definitions"""

    required: list[str] | None = None
    """List of required property names"""

    nullable: list[str] | None = None
    """List of properties that can be null"""

    value: dict[str, Any]
    """Object property values"""

    @field_validator("value", mode="before")
    def validate_object(cls, v: Any, info: Any) -> dict[str, Any]:
        """
        Validate object structure and properties.

        Args:
            v: Value to validate
            info: Validation context; ``info.data`` holds the already
                validated fields (``required``/``nullable`` are declared
                before ``value``, so they are available here)

        Returns:
            Validated object

        Raises:
            ValueError: If the value is not a dict, a required field is
                missing, or a non-nullable field is null
        """
        if not isinstance(v, dict):
            raise ValueError("Value must be an object")

        # BUG FIX: constraints are per-instance field values; `cls.required`
        # would read the class namespace, where pydantic v2 keeps no field
        # values. They must come from info.data.
        required = info.data.get("required")
        if required:
            for field in required:
                if field not in v:
                    raise ValueError(f"Missing required field: {field}")

        # When `nullable` is unset, null values are not policed at all
        # (matches the original behavior).
        nullable = info.data.get("nullable")
        if nullable:
            for field, value in v.items():
                if field not in nullable and value is None:
                    raise ValueError(f"Field {field} cannot be null")

        return v
+
+
class ParamsModel(DataModel):
    """
    Model for AT Protocol params type.

    Specialized for HTTP query parameters with support for boolean,
    integer, string and unknown types as specified in Lexicon.
    """

    required: list[str] | None = None
    """List of required parameter names"""

    properties: dict[str, Any]
    """Map of parameter names to their schema definitions"""

    value: dict[str, Any]
    """Parameter values"""

    @field_validator("value", mode="before")
    def validate_params(cls, v: Any, info: Any) -> dict[str, Any]:
        """
        Validate parameter structure and per-parameter types.

        Args:
            v: Value to validate
            info: Validation context; ``info.data`` holds the already
                validated ``required`` and ``properties`` fields

        Returns:
            Validated parameters

        Raises:
            ValueError: If a required parameter is missing or a parameter
                has the wrong type
        """
        if not isinstance(v, dict):
            raise ValueError("Value must be a dictionary of parameters")

        # BUG FIX: schema fields are per-instance values; read them from
        # info.data instead of cls attributes (pydantic v2 strips fields
        # from the class namespace).
        required = info.data.get("required")
        if required:
            for param in required:
                if param not in v:
                    raise ValueError(f"Missing required parameter: {param}")

        properties = info.data.get("properties") or {}
        for param, value in v.items():
            if param not in properties:
                continue
            prop_type = properties[param].get("type")
            if prop_type == "boolean" and not isinstance(value, bool):
                raise ValueError(f"Parameter {param} must be boolean")
            # BUG FIX: bool is a subclass of int, so True/False previously
            # passed the integer check; exclude bools explicitly.
            elif prop_type == "integer" and (
                isinstance(value, bool) or not isinstance(value, int)
            ):
                raise ValueError(f"Parameter {param} must be integer")
            elif prop_type == "string" and not isinstance(value, str):
                raise ValueError(f"Parameter {param} must be string")

        return v
+172
src/atpasser/model/types/primitive.py
···
+
from typing import Any
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class NullModel(DataModel):
    """
    Model for AT Protocol null type.

    Represents a null value in the AT Protocol data model and guarantees
    that nothing but None can be stored in it.
    """

    value: None = None
    """Always None for null type"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize the null model.

        Args:
            **data: Input data (must be empty or contain only None values)

        Raises:
            ValueError: If any non-None value is supplied
        """
        for supplied in data.values():
            if supplied is not None:
                raise ValueError("NullModel only accepts None values")
        super().__init__(**data)

    @field_validator("*", mode="before")
    def validate_null(cls, v: Any) -> None:
        """
        Reject anything other than None.

        Args:
            v: Value to validate

        Returns:
            None if validation succeeds

        Raises:
            ValueError: If the value is not None
        """
        if v is None:
            return None
        raise ValueError("NullModel only accepts None values")
+
+
class BooleanModel(DataModel):
    """
    Model for AT Protocol boolean type.

    Represents a boolean value with optional default and constant
    constraints as specified in Lexicon.
    """

    value: bool
    """Boolean value"""

    default: bool | None = None
    """Default value if not provided"""

    const: bool | None = None
    """Fixed constant value if specified"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize the boolean model and enforce the const constraint.

        Args:
            **data: Input data containing the boolean value

        Raises:
            ValueError: If the value is not boolean or differs from const
        """
        super().__init__(**data)
        if self.const is not None and self.value != self.const:
            raise ValueError(f"Boolean value must be {self.const}")

    @field_validator("value", mode="before")
    def validate_boolean(cls, v: Any) -> bool:
        """
        Coerce input to a boolean.

        Accepts real booleans and the strings "true"/"1"/"false"/"0"
        (case-insensitive); everything else is rejected.

        Args:
            v: Value to validate

        Returns:
            Validated boolean value

        Raises:
            ValueError: If the value cannot be interpreted as boolean
        """
        if isinstance(v, bool):
            return v
        if isinstance(v, str):
            lowered = v.lower()
            if lowered in {"true", "1"}:
                return True
            if lowered in {"false", "0"}:
                return False
        raise ValueError("Value must be a boolean")
+
+
class IntegerModel(DataModel):
    """
    Model for AT Protocol integer type.

    Represents a signed integer number with support for minimum/maximum
    values, enumeration sets, default values and constants as specified
    in Lexicon.
    """

    value: int
    """Integer value"""

    minimum: int | None = None
    """Minimum acceptable value"""

    maximum: int | None = None
    """Maximum acceptable value"""

    enum: list[int] | None = None
    """Closed set of allowed values"""

    default: int | None = None
    """Default value if not provided"""

    const: int | None = None
    """Fixed constant value if specified"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize the integer model and enforce value constraints.

        Args:
            **data: Input data containing integer value

        Raises:
            ValueError: If value violates enum/minimum/maximum/const
        """
        super().__init__(**data)
        # BUG FIX: enum/minimum/maximum are instance fields declared AFTER
        # `value`, so they are not yet available while `value` is being
        # validated (and the previous `cls.enum` etc. read the class
        # namespace, where pydantic v2 keeps no field values). Enforce the
        # constraints here, once the whole model is built.
        if self.enum and self.value not in self.enum:
            raise ValueError(f"Value must be one of {self.enum}")
        if self.minimum is not None and self.value < self.minimum:
            raise ValueError(f"Value must be >= {self.minimum}")
        if self.maximum is not None and self.value > self.maximum:
            raise ValueError(f"Value must be <= {self.maximum}")
        if self.const is not None and self.value != self.const:
            raise ValueError(f"Integer value must be {self.const}")

    @field_validator("value", mode="before")
    def validate_integer(cls, v: Any) -> int:
        """
        Coerce input to an integer; range/enum checks happen in __init__.

        Args:
            v: Value to validate

        Returns:
            Validated integer value

        Raises:
            ValueError: If value cannot be converted to an integer
        """
        if not isinstance(v, int):
            try:
                v = int(v)
            except (TypeError, ValueError):
                raise ValueError("Value must be an integer")
        return v
+131
src/atpasser/model/types/reference.py
···
+
from typing import Any
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class TokenModel(DataModel):
    """
    Model for AT Protocol token type.

    Tokens are empty data values that exist only to be referenced by name.
    They encode as string data holding the fully-qualified reference to the
    token itself (an NSID followed by an optional fragment).
    """

    name: str
    """Token name/identifier"""

    description: str | None = None
    """Description clarifying the meaning of the token"""

    @field_validator("name")
    def validate_name(cls, v: str) -> str:
        """
        Reject token names containing whitespace.

        Args:
            v: Name to validate

        Returns:
            The validated name

        Raises:
            ValueError: If the name contains any whitespace character
        """
        for ch in v:
            if ch.isspace():
                raise ValueError("Token name must not contain whitespace")
        return v
+
+
class RefModel(DataModel):
    """
    Model for AT Protocol ref type.

    A reference to another schema definition, either global (an NSID)
    or local (a #-delimited name).
    """

    ref: str
    """Reference to schema definition (NSID or #name)"""

    description: str | None = None
    """Description of the reference"""

    @field_validator("ref")
    def validate_ref(cls, v: str) -> str:
        """
        Reject empty references.

        Args:
            v: Reference to validate

        Returns:
            The validated reference

        Raises:
            ValueError: If the reference is an empty string
        """
        if v:
            return v
        raise ValueError("Reference cannot be empty")
+
+
class UnionModel(DataModel):
    """
    Model for AT Protocol union type.

    Represents that multiple possible types could be present at a location.
    The references follow the same syntax as `ref`, allowing references to
    both global or local schema definitions.
    """

    refs: list[str]
    """References to schema definitions"""

    closed: bool = False
    """Indicates if union is open (can be extended) or closed"""

    description: str | None = None
    """Description of the union"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize the union model and enforce the closed-union rule.

        Args:
            **data: Input data containing union references

        Raises:
            ValueError: If the union is closed but has no references
        """
        super().__init__(**data)
        # BUG FIX: `closed` is declared after `refs`, so a field validator
        # on `refs` cannot see it (and the previous `cls.closed` read the
        # class namespace, not the instance value, under pydantic v2).
        # The constraint is therefore checked after the model is built.
        if self.closed and not self.refs:
            raise ValueError("Closed union must have at least one reference")

    @field_validator("refs")
    def validate_refs(cls, v: list[str]) -> list[str]:
        """
        Pass-through validator kept for interface compatibility.

        The closed-union constraint is enforced in ``__init__`` because
        ``closed`` is not yet available while ``refs`` is being validated.

        Args:
            v: References to validate

        Returns:
            The references unchanged
        """
        return v
+323
src/atpasser/model/types/special.py
···
+
from typing import Any
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class UnknownModel(DataModel):
    """
    Model for AT Protocol unknown type.

    Indicates that any data object could appear at this location, with no
    specific validation beyond requiring top-level data to be an object.
    """

    description: str | None = None
    """Description of the unknown type usage"""

    @field_validator("*", mode="before")
    def validate_unknown(cls, v: Any, info: Any) -> Any:
        """
        Require unknown payload data to be an object.

        Args:
            v: Value to validate
            info: Validation context carrying the field name

        Returns:
            Validated value

        Raises:
            ValueError: If payload data is not an object
        """
        # BUG FIX: the wildcard validator previously ran on `description`
        # as well, rejecting every plain-string description and making the
        # model unusable. Schema metadata is exempt from the object check.
        if info.field_name == "description":
            return v
        if not isinstance(v, dict):
            raise ValueError("Unknown type must be an object")
        return v
+
+
class RecordModel(DataModel):
    """
    Model for AT Protocol record type.

    Describes an object that can be stored in a repository record.
    Records must include a $type field indicating their schema.
    """

    key: str
    """Specifies the Record Key type"""

    record: dict[str, Any]
    """Schema definition with type 'object'"""

    type: str
    """Lexicon schema type identifier"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize the record model, mapping "$type" to ``type``.

        Args:
            **data: Input data containing record values

        Raises:
            ValueError: If the record is missing required fields
        """
        declared = data.pop("$type", None)
        if declared:
            data["type"] = declared
        super().__init__(**data)

    @field_validator("type")
    def validate_type(cls, v: str) -> str:
        """
        Reject empty type identifiers.

        Args:
            v: Type value to validate

        Returns:
            The validated type

        Raises:
            ValueError: If the type is empty
        """
        if v:
            return v
        raise ValueError("Record must have a type")

    @field_validator("record", mode="before")
    def validate_record(cls, v: Any) -> dict[str, Any]:
        """
        Require the record body to be an object.

        Args:
            v: Record value to validate

        Returns:
            The validated record

        Raises:
            ValueError: If the record is not an object
        """
        if isinstance(v, dict):
            return v
        raise ValueError("Record must be an object")
+
+
class QueryModel(DataModel):
    """
    Model for AT Protocol query type.

    Describes an XRPC Query endpoint (HTTP GET) with support for
    parameters, an output schema and error responses.
    """

    parameters: dict[str, Any] | None = None
    """HTTP query parameters schema"""

    output: dict[str, Any] | None = None
    """HTTP response body schema"""

    errors: list[dict[str, str]] | None = None
    """Possible error responses"""

    @field_validator("output")
    def validate_output(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
        """
        Require the output schema (when present) to name an encoding.

        Args:
            v: Output schema to validate

        Returns:
            The validated output schema

        Raises:
            ValueError: If the schema lacks an "encoding" key
        """
        if v and "encoding" not in v:
            raise ValueError("Output must specify encoding")
        return v

    @field_validator("errors")
    def validate_errors(cls, v: list[dict[str, str]] | None) -> list[dict[str, str]] | None:
        """
        Require every error definition to carry a name.

        Args:
            v: Error definitions to validate

        Returns:
            The validated error definitions

        Raises:
            ValueError: If any error definition has no "name" key
        """
        for error in v or []:
            if "name" not in error:
                raise ValueError("Error must have a name")
        return v
+
+
class ProcedureModel(DataModel):
    """
    Model for AT Protocol procedure type.

    Describes an XRPC Procedure endpoint (HTTP POST) with support for
    parameters, input/output schemas and error responses.
    """

    parameters: dict[str, Any] | None = None
    """HTTP query parameters schema"""

    input: dict[str, Any] | None = None
    """HTTP request body schema"""

    output: dict[str, Any] | None = None
    """HTTP response body schema"""

    errors: list[dict[str, str]] | None = None
    """Possible error responses"""

    @field_validator("input")
    def validate_input(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
        """
        Require the input schema (when present) to name an encoding.

        Args:
            v: Input schema to validate

        Returns:
            The validated input schema

        Raises:
            ValueError: If the schema lacks an "encoding" key
        """
        if v and "encoding" not in v:
            raise ValueError("Input must specify encoding")
        return v

    @field_validator("output")
    def validate_output(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
        """
        Require the output schema (when present) to name an encoding.

        Args:
            v: Output schema to validate

        Returns:
            The validated output schema

        Raises:
            ValueError: If the schema lacks an "encoding" key
        """
        if v and "encoding" not in v:
            raise ValueError("Output must specify encoding")
        return v

    @field_validator("errors")
    def validate_errors(cls, v: list[dict[str, str]] | None) -> list[dict[str, str]] | None:
        """
        Require every error definition to carry a name.

        Args:
            v: Error definitions to validate

        Returns:
            The validated error definitions

        Raises:
            ValueError: If any error definition has no "name" key
        """
        for error in v or []:
            if "name" not in error:
                raise ValueError("Error must have a name")
        return v
+
+
class SubscriptionModel(DataModel):
    """
    Model for AT Protocol subscription type.

    Describes an Event Stream (WebSocket) with support for parameters,
    message schemas and error responses.
    """

    parameters: dict[str, Any] | None = None
    """HTTP query parameters schema"""

    message: dict[str, Any] | None = None
    """Specifies what messages can be"""

    errors: list[dict[str, str]] | None = None
    """Possible error responses"""

    @field_validator("message")
    def validate_message(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
        """
        Require the message definition (when present) to carry a schema.

        Args:
            v: Message schema to validate

        Returns:
            The validated message schema

        Raises:
            ValueError: If the message definition lacks a "schema" key
        """
        if v and "schema" not in v:
            raise ValueError("Message must specify schema")
        return v

    @field_validator("errors")
    def validate_errors(cls, v: list[dict[str, str]] | None) -> list[dict[str, str]] | None:
        """
        Require every error definition to carry a name.

        Args:
            v: Error definitions to validate

        Returns:
            The validated error definitions

        Raises:
            ValueError: If any error definition has no "name" key
        """
        for error in v or []:
            if "name" not in error:
                raise ValueError("Error must have a name")
        return v
+124
src/atpasser/model/types/string.py
···
+
from typing import Any
+
import re
+
from datetime import datetime
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class StringModel(DataModel):
    """
    Model for the AT Protocol Lexicon string type.

    Represents a Unicode string with support for format restrictions,
    UTF-8 byte-length limits, known values, closed enumeration sets,
    default values and constants as specified in Lexicon.

    Constraint checks (length, enum, format, const) are enforced in
    ``__init__`` after the model is fully populated. The previous
    implementation tried to read ``cls.minLength`` / ``cls.maxLength`` /
    ``cls.enum`` / ``cls.format`` inside a per-field validator, but
    pydantic fields are not plain class attributes, so those lookups
    could never resolve to the configured per-instance values and the
    constraints were effectively never applied.
    """

    # The string payload itself.
    value: str
    # Optional format restriction (e.g. "datetime", "uri", "did", "handle").
    format: str | None = None
    # Maximum length in UTF-8 bytes.
    maxLength: int | None = None
    # Minimum length in UTF-8 bytes.
    minLength: int | None = None
    # Suggested/common values (advisory only, not enforced).
    knownValues: list[str] | None = None
    # Closed set of allowed values.
    enum: list[str] | None = None
    # Default value if not provided.
    default: str | None = None
    # Fixed constant value if specified.
    const: str | None = None

    def __init__(self, **data: Any) -> None:
        """
        Initialize the string model and enforce Lexicon constraints.

        Args:
            **data: Input data containing the string value and constraints

        Raises:
            ValueError: If the value violates any configured constraint
        """
        super().__init__(**data)
        # Cross-field checks must run after every field is populated;
        # raising plain ValueError here matches the pre-existing const
        # check, which already lived in __init__.
        self._enforce_constraints()

    @field_validator("value", mode="before")
    def validate_string(cls, v: Any) -> str:
        """
        Coerce the raw input to ``str`` before field validation.

        Cross-field constraint checks happen later, in ``__init__``,
        once the whole model is built.

        Args:
            v: Value to coerce

        Returns:
            The value as a string
        """
        return v if isinstance(v, str) else str(v)

    def _enforce_constraints(self) -> None:
        """Raise ValueError if ``self.value`` violates any constraint."""
        v = self.value

        if self.const is not None and v != self.const:
            raise ValueError(f"String value must be {self.const}")

        # Lexicon string lengths are measured in UTF-8 bytes, not code points.
        byte_length = len(v.encode())
        if self.minLength is not None and byte_length < self.minLength:
            raise ValueError(f"String must be at least {self.minLength} bytes")
        if self.maxLength is not None and byte_length > self.maxLength:
            raise ValueError(f"String must be at most {self.maxLength} bytes")

        if self.enum and v not in self.enum:
            raise ValueError(f"Value must be one of {self.enum}")

        # knownValues is advisory per Lexicon and deliberately not enforced.
        if self.format == "datetime":
            self._validate_datetime(v)
        elif self.format == "uri":
            self._validate_uri(v)
        elif self.format == "did":
            self._validate_did(v)
        elif self.format == "handle":
            self._validate_handle(v)

    @classmethod
    def _validate_datetime(cls, v: str) -> None:
        """Validate RFC 3339 datetime format."""
        try:
            # fromisoformat (pre-3.11) does not accept a trailing "Z";
            # normalize it to an explicit UTC offset first.
            datetime.fromisoformat(v.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError("Invalid datetime format, must be RFC 3339")

    @classmethod
    def _validate_uri(cls, v: str) -> None:
        """Validate URI format."""
        if len(v) > 8192:  # 8KB max
            raise ValueError("URI too long, max 8KB")
        if not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*:.+", v):
            raise ValueError("Invalid URI format")

    @classmethod
    def _validate_did(cls, v: str) -> None:
        """Validate DID format."""
        if not v.startswith("did:"):
            raise ValueError("DID must start with 'did:'")
        if len(v) > 2048:
            raise ValueError("DID too long, max 2048 chars")

    @classmethod
    def _validate_handle(cls, v: str) -> None:
        """Validate handle format."""
        if not re.match(r"^[a-zA-Z0-9._-]+$", v):
            raise ValueError("Handle contains invalid characters")
        if len(v) > 253:
            raise ValueError("Handle too long, max 253 chars")
+110
src/atpasser/model/validation.py
···
+
from typing import Any
+
from cid.cid import CIDv1, make_cid
+
import base64
+
import re
+
from datetime import datetime
+
+
def validateAtprotoData(data: Any) -> bool:
    """
    Validate that data conforms to the AT Protocol data model.

    Checks structural validity: reserved "$"-prefixed keys, the exact
    shape of $bytes and $link envelopes, and recurses into nested
    dicts and lists.

    Args:
        data: Data to validate

    Returns:
        True if data is structurally valid AT Protocol data
    """
    if isinstance(data, dict):
        # Only $bytes, $link and $type are reserved "$" keys in atproto.
        for key in data:
            if key.startswith("$") and key not in {"$bytes", "$link", "$type"}:
                return False

        if "$bytes" in data:
            # A bytes envelope must be exactly {"$bytes": "<base64>"}:
            # no sibling keys are allowed by the data-model spec.
            if len(data) != 1:
                return False
            encoded = data["$bytes"]
            if not isinstance(encoded, str):
                return False
            try:
                # atproto encodes bytes as standard base64 *without*
                # padding; re-pad so both padded and unpadded forms
                # decode. validate=True rejects non-alphabet characters
                # instead of silently discarding them.
                base64.b64decode(encoded + "=" * (-len(encoded) % 4), validate=True)
            except ValueError:
                # binascii.Error is a ValueError subclass.
                return False

        if "$link" in data:
            # A link envelope must be exactly {"$link": "<cid-string>"}.
            if len(data) != 1:
                return False
            if not isinstance(data["$link"], str):
                return False
            try:
                make_cid(data["$link"])
            except ValueError:
                return False

        # Recursively validate nested values.
        return all(validateAtprotoData(value) for value in data.values())

    if isinstance(data, list):
        return all(validateAtprotoData(item) for item in data)

    # Scalars (str, int, float, bool, None, bytes) need no structural check.
    return True
def isBlessedCID(cid_obj: CIDv1) -> bool:
    """
    Check if a CID conforms to AT Protocol blessed formats.

    Blessed formats for CIDs in atproto are:
    - CIDv1
    - multibase: base32 for string encoding
    - multicodec: dag-cbor (0x71) for data objects, raw (0x55) for blobs
    - multihash: sha-256 with 256 bits (0x12) is preferred

    Args:
        cid_obj: CID to validate

    Returns:
        True if the CID uses a blessed format
    """
    # Check version
    if cid_obj.version != 1:
        return False

    # Check multicodec (should be dag-cbor or raw)
    if cid_obj.codec not in {"dag-cbor", "raw"}:
        return False

    # Check multihash (should be sha-256). NOTE(review): py-cid appears
    # to expose the multihash as raw bytes whose first varint is the
    # hash-function code (0x12 == sha-256) — confirm against the
    # installed library. The previous code called .get() on it as if it
    # were a dict, which raised AttributeError and was swallowed, so
    # this check never actually ran.
    multihash = getattr(cid_obj, "multihash", None)
    if isinstance(multihash, (bytes, bytearray)):
        if len(multihash) == 0 or multihash[0] != 0x12:
            return False
    elif isinstance(multihash, dict):
        # Tolerate libraries that model the multihash as a mapping.
        if multihash.get("code") != 0x12:
            return False
    # Otherwise the multihash is not introspectable here; keep the
    # original lenient behaviour and assume it is valid.

    return True
def convertLegacyBlobToModern(legacy_blob: dict[str, Any]) -> dict[str, Any]:
    """
    Convert a legacy blob reference to the modern blob format.

    Legacy format: {"cid": "string", "mimeType": "string"}
    Modern format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "string", "size": int}

    Args:
        legacy_blob: Legacy blob data with "cid" and "mimeType" keys

    Returns:
        Modern blob dict; "size" is 0 because legacy references carry no size
    """
    cid_string = legacy_blob["cid"]
    mime_type = legacy_blob["mimeType"]

    modern: dict[str, Any] = {"$type": "blob"}
    modern["ref"] = {"$link": cid_string}
    modern["mimeType"] = mime_type
    modern["size"] = 0  # Size unknown for legacy blobs
    return modern