Compare changes

Choose any two refs to compare.

-16
src/atpasser/blob/__init__.py
···
-
import cid
-
import multihash, hashlib
-
-
-
def generateCID(file):
-
hasher = hashlib.new("sha-256")
-
while True:
-
chunk = file.read(8192)
-
if not chunk:
-
break
-
hasher.update(chunk)
-
-
digest = hasher.digest
-
mh = multihash.encode(digest, "sha-256")
-
-
return cid.CIDv1(codec="raw", multihash=mh)
-1
src/atpasser/data/__init__.py
···
-
from ._wrapper import *
-76
src/atpasser/data/_data.py
···
-
import base64
-
from cid import CIDv0, CIDv1, cid, make_cid
-
import json
-
-
-
class Data:
-
"""
-
A class representing data with "$type" key.
-
-
Attributes:
-
type (str): The type of the data.
-
json (str): Original object in JSON format.
-
"""
-
-
def __init__(self, dataType: str, json: str = "{}") -> None:
-
"""
-
Initalizes data object.
-
-
Parameters:
-
type (str): The type of the data.
-
json (str): Original object in JSON format.
-
"""
-
self.type = dataType
-
self.json = json
-
-
def data(self):
-
"""
-
Loads data as a Python-friendly format.
-
-
Returns:
-
dict: Converted data from JSON object.
-
"""
-
return json.loads(self.json, object_hook=dataHook)
-
-
-
def dataHook(data: dict):
-
"""
-
Treated as `JSONDecoder`'s `object_hook`
-
-
Parameters:
-
data: data in format that `JSONDecoder` like ;)
-
"""
-
if "$bytes" in data:
-
return base64.b64decode(data["$bytes"])
-
elif "$link" in data:
-
return make_cid(data["$link"])
-
elif "$type" in data:
-
dataType = data["$type"]
-
del data["$type"]
-
return Data(dataType, json.dumps(data))
-
else:
-
return data
-
-
-
def _convertDataToFakeJSON(data):
-
if isinstance(data, bytes):
-
return {"$bytes": base64.b64encode(data)}
-
elif isinstance(data, (CIDv0, CIDv1)):
-
return {"link": data.encode()}
-
elif isinstance(data, dict):
-
for item in data:
-
data[item] = _convertDataToFakeJSON(data[item])
-
elif isinstance(data, (tuple, list, set)):
-
return [_convertDataToFakeJSON(item) for item in data]
-
else:
-
return data
-
-
-
class DataEncoder(json.JSONEncoder):
-
"""
-
A superset of `JSONEncoder` to support ATProto data.
-
"""
-
-
def default(self, o):
-
result = _convertDataToFakeJSON(o)
-
return super().default(result)
-61
src/atpasser/data/_wrapper.py
···
-
from json import loads
-
from typing import Callable, Any
-
from ._data import *
-
import functools
-
-
# Pyright did the whole job. Thank it.
-
-
-
class DataDecoder(json.JSONDecoder):
-
"""
-
A superset of `JSONDecoder` to support ATProto data.
-
"""
-
-
def __init__(
-
self,
-
*,
-
object_hook: Callable[[dict[str, Any]], Any] | None = dataHook,
-
parse_float: Callable[[str], Any] | None = None,
-
parse_int: Callable[[str], Any] | None = None,
-
parse_constant: Callable[[str], Any] | None = None,
-
strict: bool = True,
-
object_pairs_hook: Callable[[list[tuple[str, Any]]], Any] | None = None,
-
) -> None:
-
super().__init__(
-
object_hook=object_hook,
-
parse_float=parse_float,
-
parse_int=parse_int,
-
parse_constant=parse_constant,
-
strict=strict,
-
object_pairs_hook=object_pairs_hook,
-
)
-
-
-
# Screw it. I have to make 4 `json`-like functions.
-
-
-
def _dataDecoratorForDump(func):
-
@functools.wraps(func)
-
def wrapper(obj, *args, **kwargs):
-
kwargs.setdefault("cls", DataEncoder)
-
return func(obj, *args, **kwargs)
-
-
return wrapper
-
-
-
def _dataDecoratorForLoad(func):
-
@functools.wraps(func)
-
def wrapper(obj, *args, **kwargs):
-
kwargs.setdefault("cls", DataDecoder)
-
return func(obj, *args, **kwargs)
-
-
return wrapper
-
-
-
dump = _dataDecoratorForDump(json.dump)
-
dumps = _dataDecoratorForDump(json.dumps)
-
load = _dataDecoratorForLoad(json.load)
-
loads = _dataDecoratorForLoad(json.loads)
-
"""
-
Wrapper of the JSON functions to support ATProto data.
-
"""
-137
src/atpasser/data/cbor.py
···
-
from datetime import tzinfo
-
import typing
-
import cbor2
-
import cid
-
-
from .data import dataHook, Data
-
-
-
def tagHook(decoder: cbor2.CBORDecoder, tag: cbor2.CBORTag, shareable_index=None):
-
"""
-
A simple tag hook for CID support.
-
"""
-
return cid.from_bytes(tag.value) if tag.tag == 42 else tag
-
-
-
class CBOREncoder(cbor2.CBOREncoder):
-
"""
-
Wrapper of cbor2.CBOREncoder.
-
"""
-
-
def __init__(
-
self,
-
fp: typing.IO[bytes],
-
datetime_as_timestamp: bool = False,
-
timezone: tzinfo | None = None,
-
value_sharing: bool = False,
-
default: (
-
typing.Callable[[cbor2.CBOREncoder, typing.Any], typing.Any] | None
-
) = None,
-
canonical: bool = False,
-
date_as_datetime: bool = False,
-
string_referencing: bool = False,
-
indefinite_containers: bool = False,
-
):
-
super().__init__(
-
fp,
-
datetime_as_timestamp,
-
timezone,
-
value_sharing,
-
default,
-
canonical,
-
date_as_datetime,
-
string_referencing,
-
indefinite_containers,
-
)
-
-
@cbor2.shareable_encoder
-
def cidOrDataEncoder(self: cbor2.CBOREncoder, value: cid.CIDv0 | cid.CIDv1 | Data):
-
"""
-
Encode CID or Data to CBOR Tag.
-
"""
-
if isinstance(value, (cid.CIDv0, cid.CIDv1)):
-
self.encode(cbor2.CBORTag(42, value.encode()))
-
elif isinstance(value, Data):
-
self.encode(value.data())
-
-
-
def _cborObjectHook(decoder: cbor2.CBORDecoder, value):
-
return dataHook(value)
-
-
-
class CBORDecoder(cbor2.CBORDecoder):
-
"""
-
Wrapper of cbor2.CBORDecoder.
-
"""
-
-
def __init__(
-
self,
-
fp: typing.IO[bytes],
-
tag_hook: (
-
typing.Callable[[cbor2.CBORDecoder, cbor2.CBORTag], typing.Any] | None
-
) = tagHook,
-
object_hook: (
-
typing.Callable[
-
[cbor2.CBORDecoder, dict[typing.Any, typing.Any]], typing.Any
-
]
-
| None
-
) = _cborObjectHook,
-
str_errors: typing.Literal["strict", "error", "replace"] = "strict",
-
):
-
super().__init__(fp, tag_hook, object_hook, str_errors)
-
-
-
# Make things for CBOR again.
-
-
from io import BytesIO
-
-
-
def dumps(
-
obj: object,
-
datetime_as_timestamp: bool = False,
-
timezone: tzinfo | None = None,
-
value_sharing: bool = False,
-
default: typing.Callable[[cbor2.CBOREncoder, typing.Any], typing.Any] | None = None,
-
canonical: bool = False,
-
date_as_datetime: bool = False,
-
string_referencing: bool = False,
-
indefinite_containers: bool = False,
-
) -> bytes:
-
with BytesIO() as fp:
-
CBOREncoder(
-
fp,
-
datetime_as_timestamp=datetime_as_timestamp,
-
timezone=timezone,
-
value_sharing=value_sharing,
-
default=default,
-
canonical=canonical,
-
date_as_datetime=date_as_datetime,
-
string_referencing=string_referencing,
-
indefinite_containers=indefinite_containers,
-
).encode(obj)
-
return fp.getvalue()
-
-
-
def dump(
-
obj: object,
-
fp: typing.IO[bytes],
-
datetime_as_timestamp: bool = False,
-
timezone: tzinfo | None = None,
-
value_sharing: bool = False,
-
default: typing.Callable[[cbor2.CBOREncoder, typing.Any], typing.Any] | None = None,
-
canonical: bool = False,
-
date_as_datetime: bool = False,
-
string_referencing: bool = False,
-
indefinite_containers: bool = False,
-
) -> None:
-
CBOREncoder(
-
fp,
-
datetime_as_timestamp=datetime_as_timestamp,
-
timezone=timezone,
-
value_sharing=value_sharing,
-
default=default,
-
canonical=canonical,
-
date_as_datetime=date_as_datetime,
-
string_referencing=string_referencing,
-
indefinite_containers=indefinite_containers,
-
).encode(obj)
-4
tests/__init__.py
···
-
if __name__ != "__main__":
-
raise Exception("name != main")
-
-
import _strings
-179
tests/_strings.py
···
-
from atpasser import did, handle, nsid, rKey, uri
-
-
-
testStrings, testMethods = {}, {}
-
-
-
testStrings[
-
"did"
-
-
] = """did:plc:z72i7hdynmk6r22z27h6tvur
-
-
did:web:blueskyweb.xyz
-
-
did:method:val:two
-
-
did:m:v
-
-
did:method::::val
-
-
did:method:-:_:.
-
-
did:key:zQ3shZc2QzApp2oymGvQbzP8eKheVshBHbU4ZYjeXqwSKEn6N
-
-
did:METHOD:val
-
-
did:m123:val
-
-
DID:method:val
-
did:method:
-
-
did:method:val/two
-
-
did:method:val?two
-
-
did:method:val#two"""
-
-
testMethods["did"] = did.DID
-
-
-
testStrings[
-
"handle"
-
-
] = """jay.bsky.social
-
-
8.cn
-
-
name.t--t
-
-
XX.LCS.MIT.EDU
-
a.co
-
-
xn--notarealidn.com
-
-
xn--fiqa61au8b7zsevnm8ak20mc4a87e.xn--fiqs8s
-
-
xn--ls8h.test
-
example.t
-
-
jo@hn.test
-
-
๐Ÿ’ฉ.tes
-
t
-
john..test
-
-
xn--bcher-.tld
-
-
john.0
-
-
cn.8
-
-
www.maseล‚kowski.pl.com
-
-
org
-
-
name.org.
-
-
2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion
-
laptop.local
-
-
blah.arpa"""
-
-
testMethods["handle"] = handle.Handle
-
-
-
testStrings[
-
"nsid"
-
-
] = """com.example.fooBar
-
-
net.users.bob.ping
-
-
a-0.b-1.c
-
-
a.b.c
-
-
com.example.fooBarV2
-
-
cn.8.lex.stuff
-
-
com.exa๐Ÿ’ฉple.thin
-
com.example
-
-
com.example.3"""
-
-
testMethods["nsid"] = nsid.NSID
-
-
-
testStrings[
-
-
"rkey"
-
-
] = """3jui7kd54zh2y
-
self
-
example.com
-
-
~1.2-3_
-
-
dHJ1ZQ
-
pre:fix
-
-
_
-
-
alpha/beta
-
.
-
..
-
-
#extra
-
-
@handle
-
-
any space
-
-
any+space
-
-
number[3]
-
-
number(3)
-
-
"quote"
-
-
dHJ1ZQ=="""
-
-
testMethods["rkey"] = rKey.RKey
-
-
-
testStrings[
-
"uri"
-
-
] = """at://foo.com/com.example.foo/123
-
-
at://foo.com/example/123
-
-
at://computer
-
-
at://example.com:3000
-
-
at://foo.com/
-
-
at://user:pass@foo.com"""
-
-
testMethods["uri"] = uri.URI
-
-
-
for item in testMethods:
-
-
print(f"START TEST {item}")
-
-
for value in testStrings[item].splitlines():
-
-
print(f"Value: {value}")
-
-
try:
-
-
print(f"str(): {str(testMethods[item](value))}")
-
-
except Exception as e:
-
-
print(f"ร— {e}")
-
+41
src/atpasser/model/typed.py
···
+
from typing import Any
+
from pydantic import field_serializer
+
from .base import DataModel
+
+
class TypedDataModel(DataModel):
+
"""
+
Model for AT Protocol data with type information.
+
+
Includes support for $type field that specifies Lexicon schema.
+
"""
+
+
type: str | None = None
+
"""Lexicon schema type identifier"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize typed data model with automatic $type handling.
+
+
Args:
+
**data: Data including optional $type field
+
"""
+
# Extract $type if present
+
dataType = data.pop("$type", None)
+
if dataType:
+
data["type"] = dataType
+
super().__init__(**data)
+
+
@field_serializer("type")
+
def serializeType(self, v: str | None) -> dict[str, str] | None:
+
"""
+
Serialize type field to $type object.
+
+
Args:
+
v: Type value to serialize
+
+
Returns:
+
$type object if type is not None
+
"""
+
if v is not None:
+
return {"$type": v}
+
return None
+36
src/atpasser/model/exceptions.py
···
+
class AtprotoModelError(Exception):
+
"""Base exception for all AT Protocol model errors"""
+
pass
+
+
class ValidationError(AtprotoModelError):
+
"""Raised when data validation fails"""
+
def __init__(self, field: str, message: str):
+
self.field = field
+
self.message = message
+
super().__init__(f"Validation error for field '{field}': {message}")
+
+
class SerializationError(AtprotoModelError):
+
"""Raised when data serialization fails"""
+
def __init__(self, field: str, message: str):
+
self.field = field
+
self.message = message
+
super().__init__(f"Serialization error for field '{field}': {message}")
+
+
class DeserializationError(AtprotoModelError):
+
"""Raised when data deserialization fails"""
+
def __init__(self, field: str, message: str):
+
self.field = field
+
self.message = message
+
super().__init__(f"Deserialization error for field '{field}': {message}")
+
+
class InvalidCIDError(AtprotoModelError):
+
"""Raised when CID validation fails"""
+
pass
+
+
class InvalidBlobError(AtprotoModelError):
+
"""Raised when blob validation fails"""
+
pass
+
+
class TypeMismatchError(AtprotoModelError):
+
"""Raised when type validation fails"""
+
pass
+3
src/atpasser/model/__init__.py
···
ProcedureModel,
SubscriptionModel
)
+
from .converter import LexiconConverter
__all__ = [
"DataModel",
···
"QueryModel",
"ProcedureModel",
"SubscriptionModel",
+
# Converter
+
"LexiconConverter",
# Exceptions
"AtprotoModelError",
"ValidationError",
-5
src/atpasser/model/base.py
···
import base64
-
import re
-
from datetime import datetime
from typing import Any
-
from collections.abc import Mapping
from cid.cid import CIDv1, make_cid
from pydantic import BaseModel, field_serializer, field_validator, ConfigDict
-
from pydantic.fields import FieldInfo
from .exceptions import (
-
ValidationError,
SerializationError,
DeserializationError,
InvalidCIDError
+5 -5
src/atpasser/model/blob.py
···
from typing import Any
from pydantic import field_validator, ConfigDict
from .base import DataModel
-
from .exceptions import ValidationError, InvalidBlobError
+
from .exceptions import ValidationError
class BlobModel(DataModel):
"""
···
Validated size
Raises:
-
ValueError: If size is not positive
+
ValidationError: If size is not positive
"""
if v <= 0:
-
raise ValueError("Blob size must be positive and non-zero")
+
raise ValidationError(field="size", message="must be positive and non-zero")
return v
@field_validator("mimeType")
···
Validated MIME type
Raises:
-
ValueError: If MIME type is empty
+
ValidationError: If MIME type is empty
"""
if not v:
-
raise ValueError("MIME type cannot be empty")
+
raise ValidationError(field="mimeType", message="cannot be empty")
return v
+7
src/atpasser/model/converter.py
···
UnknownModel, RecordModel, QueryModel,
ProcedureModel, SubscriptionModel
)
+
from .types.binary import BytesModel, CidLinkModel
+
from .blob import BlobModel
class LexiconConverter:
"""
···
"integer": IntegerModel,
"string": StringModel,
+
# Binary types
+
"bytes": BytesModel,
+
"cid-link": CidLinkModel,
+
"blob": BlobModel,
+
# Complex types
"array": ArrayModel,
"object": ObjectModel,
+214
src/atpasser/model/types/complex.py
···
+
from typing import Any
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class ArrayModel(DataModel):
+
"""
+
Model for AT Protocol array type.
+
+
Represents an array of elements with support for item schema definition,
+
minimum/maximum length constraints as specified in Lexicon.
+
"""
+
+
items: Any
+
"""Schema definition for array elements"""
+
+
minLength: int | None = None
+
"""Minimum number of elements"""
+
+
maxLength: int | None = None
+
"""Maximum number of elements"""
+
+
value: list[Any]
+
"""Array values"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize array model with validation.
+
+
Args:
+
**data: Input data containing array values
+
+
Raises:
+
ValueError: If array violates constraints
+
"""
+
super().__init__(**data)
+
+
@field_validator("value", mode="before")
+
def validate_array(cls, v: Any) -> list[Any]:
+
"""
+
Validate array structure and elements.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
Validated array
+
+
Raises:
+
ValueError: If array violates constraints
+
"""
+
if not isinstance(v, list):
+
raise ValueError("Value must be an array")
+
+
# Validate length constraints
+
if cls.minLength is not None and len(v) < cls.minLength:
+
raise ValueError(f"Array must have at least {cls.minLength} items")
+
+
if cls.maxLength is not None and len(v) > cls.maxLength:
+
raise ValueError(f"Array must have at most {cls.maxLength} items")
+
+
return v
+
+
class ObjectModel(DataModel):
+
"""
+
Model for AT Protocol object type.
+
+
Represents a generic object schema with properties definitions,
+
required fields and nullable fields as specified in Lexicon.
+
"""
+
+
properties: dict[str, Any]
+
"""Map of property names to their schema definitions"""
+
+
required: list[str] | None = None
+
"""List of required property names"""
+
+
nullable: list[str] | None = None
+
"""List of properties that can be null"""
+
+
value: dict[str, Any]
+
"""Object property values"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize object model with validation.
+
+
Args:
+
**data: Input data containing object properties
+
+
Raises:
+
ValueError: If object violates constraints
+
"""
+
super().__init__(**data)
+
+
@field_validator("value", mode="before")
+
def validate_object(cls, v: Any) -> dict[str, Any]:
+
"""
+
Validate object structure and properties.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
Validated object
+
+
Raises:
+
ValueError: If object violates constraints
+
"""
+
if not isinstance(v, dict):
+
raise ValueError("Value must be an object")
+
+
# Validate required fields
+
if cls.required:
+
for field in cls.required:
+
if field not in v:
+
raise ValueError(f"Missing required field: {field}")
+
+
# Validate nullable fields
+
if cls.nullable:
+
for field, value in v.items():
+
if field not in cls.nullable and value is None:
+
raise ValueError(f"Field {field} cannot be null")
+
+
return v
+
+
class ParamsModel(DataModel):
+
"""
+
Model for AT Protocol params type.
+
+
Specialized for HTTP query parameters with support for boolean,
+
integer, string and unknown types as specified in Lexicon.
+
"""
+
+
required: list[str] | None = None
+
"""List of required parameter names"""
+
+
properties: dict[str, Any]
+
"""Map of parameter names to their schema definitions"""
+
+
value: dict[str, Any]
+
"""Parameter values
+
+
Supported types:
+
- boolean
+
- integer
+
- string
+
- array (of boolean/integer/string/unknown)
+
- unknown (object)
+
"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize params model with validation.
+
+
Args:
+
**data: Input data containing parameter values
+
+
Raises:
+
ValueError: If parameters violate constraints
+
"""
+
super().__init__(**data)
+
+
@field_validator("value", mode="before")
+
def validate_params(cls, v: Any) -> dict[str, Any]:
+
"""
+
Validate parameters structure and values.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
Validated parameters
+
+
Raises:
+
ValueError: If parameters violate constraints
+
"""
+
if not isinstance(v, dict):
+
raise ValueError("Value must be a dictionary of parameters")
+
+
# Validate required parameters
+
if cls.required:
+
for param in cls.required:
+
if param not in v:
+
raise ValueError(f"Missing required parameter: {param}")
+
+
# Validate parameter types
+
for param, value in v.items():
+
if param in cls.properties:
+
prop_type = cls.properties[param].get("type")
+
if prop_type == "boolean" and not isinstance(value, bool):
+
raise ValueError(f"Parameter {param} must be boolean")
+
elif prop_type == "integer" and not isinstance(value, int):
+
raise ValueError(f"Parameter {param} must be integer")
+
elif prop_type == "string" and not isinstance(value, str):
+
raise ValueError(f"Parameter {param} must be string")
+
elif prop_type == "array":
+
if not isinstance(value, list):
+
raise ValueError(f"Parameter {param} must be array")
+
# Validate array items if schema is specified
+
if "items" in cls.properties[param]:
+
item_type = cls.properties[param]["items"].get("type")
+
for item in value:
+
if item_type == "boolean" and not isinstance(item, bool):
+
raise ValueError(f"Array item in {param} must be boolean")
+
elif item_type == "integer" and not isinstance(item, int):
+
raise ValueError(f"Array item in {param} must be integer")
+
elif item_type == "string" and not isinstance(item, str):
+
raise ValueError(f"Array item in {param} must be string")
+
elif item_type == "unknown" and not isinstance(item, dict):
+
raise ValueError(f"Array item in {param} must be object")
+
elif prop_type == "unknown" and not isinstance(value, dict):
+
raise ValueError(f"Parameter {param} must be object")
+
+
return v
+172
src/atpasser/model/types/primitive.py
···
+
from typing import Any
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class NullModel(DataModel):
+
"""
+
Model for AT Protocol null type.
+
+
Represents a null value in AT Protocol data model. This model ensures proper
+
serialization and validation of null values according to Lexicon specification.
+
"""
+
+
value: None = None
+
"""Always None for null type"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize null model with validation.
+
+
Args:
+
**data: Input data (must be empty or contain only None values)
+
+
Raises:
+
ValueError: If non-null value is provided
+
"""
+
if data and any(v is not None for v in data.values()):
+
raise ValueError("NullModel only accepts None values")
+
super().__init__(**data)
+
+
@field_validator("*", mode="before")
+
def validate_null(cls, v: Any) -> None:
+
"""
+
Validate that value is null.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
None if validation succeeds
+
+
Raises:
+
ValueError: If value is not null
+
"""
+
if v is not None:
+
raise ValueError("NullModel only accepts None values")
+
return None
+
+
class BooleanModel(DataModel):
+
"""
+
Model for AT Protocol boolean type.
+
+
Represents a boolean value in AT Protocol data model with support for
+
default values and constants as specified in Lexicon.
+
"""
+
+
value: bool
+
"""Boolean value"""
+
+
default: bool | None = None
+
"""Default value if not provided"""
+
+
const: bool | None = None
+
"""Fixed constant value if specified"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize boolean model with validation.
+
+
Args:
+
**data: Input data containing boolean value
+
+
Raises:
+
ValueError: If value doesn't match const or is not boolean
+
"""
+
super().__init__(**data)
+
if self.const is not None and self.value != self.const:
+
raise ValueError(f"Boolean value must be {self.const}")
+
+
@field_validator("value", mode="before")
+
def validate_boolean(cls, v: Any) -> bool:
+
"""
+
Validate and convert input to boolean.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
Validated boolean value
+
+
Raises:
+
ValueError: If value cannot be converted to boolean
+
"""
+
if isinstance(v, bool):
+
return v
+
if isinstance(v, str):
+
if v.lower() in ("true", "1"):
+
return True
+
if v.lower() in ("false", "0"):
+
return False
+
raise ValueError("Value must be a boolean")
+
+
class IntegerModel(DataModel):
+
"""
+
Model for AT Protocol integer type.
+
+
Represents a signed integer number with support for minimum/maximum values,
+
enumeration sets, default values and constraints as specified in Lexicon.
+
"""
+
+
value: int
+
"""Integer value"""
+
+
minimum: int | None = None
+
"""Minimum acceptable value"""
+
+
maximum: int | None = None
+
"""Maximum acceptable value"""
+
+
enum: list[int] | None = None
+
"""Closed set of allowed values"""
+
+
default: int | None = None
+
"""Default value if not provided"""
+
+
const: int | None = None
+
"""Fixed constant value if specified"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize integer model with validation.
+
+
Args:
+
**data: Input data containing integer value
+
+
Raises:
+
ValueError: If value violates constraints
+
"""
+
super().__init__(**data)
+
if self.const is not None and self.value != self.const:
+
raise ValueError(f"Integer value must be {self.const}")
+
+
@field_validator("value", mode="before")
+
def validate_integer(cls, v: Any) -> int:
+
"""
+
Validate and convert input to integer.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
Validated integer value
+
+
Raises:
+
ValueError: If value violates constraints
+
"""
+
if not isinstance(v, int):
+
try:
+
v = int(v)
+
except (TypeError, ValueError):
+
raise ValueError("Value must be an integer")
+
+
# Validate against instance attributes
+
if cls.enum and v not in cls.enum:
+
raise ValueError(f"Value must be one of {cls.enum}")
+
+
if cls.minimum is not None and v < cls.minimum:
+
raise ValueError(f"Value must be >= {cls.minimum}")
+
+
if cls.maximum is not None and v > cls.maximum:
+
raise ValueError(f"Value must be <= {cls.maximum}")
+
+
return v
+131
src/atpasser/model/types/reference.py
···
+
from typing import Any
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class TokenModel(DataModel):
+
"""
+
Model for AT Protocol token type.
+
+
Represents empty data values which exist only to be referenced by name.
+
Tokens encode as string data with the string being the fully-qualified
+
reference to the token itself (NSID followed by optional fragment).
+
"""
+
+
name: str
+
"""Token name/identifier"""
+
+
description: str | None = None
+
"""Description clarifying the meaning of the token"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize token model.
+
+
Args:
+
**data: Input data containing token name
+
"""
+
super().__init__(**data)
+
+
@field_validator("name")
+
def validate_name(cls, v: str) -> str:
+
"""
+
Validate token name format.
+
+
Args:
+
v: Name to validate
+
+
Returns:
+
Validated name
+
+
Raises:
+
ValueError: If name contains whitespace
+
"""
+
if any(c.isspace() for c in v):
+
raise ValueError("Token name must not contain whitespace")
+
return v
+
+
class RefModel(DataModel):
+
"""
+
Model for AT Protocol ref type.
+
+
Represents a reference to another schema definition, either globally
+
(using NSID) or locally (using #-delimited name).
+
"""
+
+
ref: str
+
"""Reference to schema definition (NSID or #name)"""
+
+
description: str | None = None
+
"""Description of the reference"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize reference model.
+
+
Args:
+
**data: Input data containing reference
+
"""
+
super().__init__(**data)
+
+
@field_validator("ref")
+
def validate_ref(cls, v: str) -> str:
+
"""
+
Validate reference format.
+
+
Args:
+
v: Reference to validate
+
+
Returns:
+
Validated reference
+
+
Raises:
+
ValueError: If reference is empty or invalid
+
"""
+
if not v:
+
raise ValueError("Reference cannot be empty")
+
return v
+
+
class UnionModel(DataModel):
+
"""
+
Model for AT Protocol union type.
+
+
Represents that multiple possible types could be present at a location.
+
The references follow the same syntax as `ref`, allowing references to
+
both global or local schema definitions.
+
"""
+
+
refs: list[str]
+
"""References to schema definitions"""
+
+
closed: bool = False
+
"""Indicates if union is open (can be extended) or closed"""
+
+
description: str | None = None
+
"""Description of the union"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize union model.
+
+
Args:
+
**data: Input data containing union references
+
"""
+
super().__init__(**data)
+
+
@field_validator("refs")
+
def validate_refs(cls, v: list[str]) -> list[str]:
+
"""
+
Validate union references.
+
+
Args:
+
v: References to validate
+
+
Returns:
+
Validated references
+
+
Raises:
+
ValueError: If references list is empty for closed union
+
"""
+
if cls.closed and not v:
+
raise ValueError("Closed union must have at least one reference")
+
return v
+323
src/atpasser/model/types/special.py
···
+
from typing import Any
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class UnknownModel(DataModel):
+
"""
+
Model for AT Protocol unknown type.
+
+
Indicates that any data object could appear at this location,
+
with no specific validation. The top-level data must be an object.
+
"""
+
+
description: str | None = None
+
"""Description of the unknown type usage"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize unknown model.
+
+
Args:
+
**data: Input data containing unknown object
+
"""
+
super().__init__(**data)
+
+
@field_validator("*", mode="before")
+
def validate_unknown(cls, v: Any) -> Any:
+
"""
+
Validate unknown data is an object.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
Validated value
+
+
Raises:
+
ValueError: If value is not an object
+
"""
+
if not isinstance(v, dict):
+
raise ValueError("Unknown type must be an object")
+
return v
+
+
class RecordModel(DataModel):
+
"""
+
Model for AT Protocol record type.
+
+
Describes an object that can be stored in a repository record.
+
Records must include a $type field indicating their schema.
+
"""
+
+
key: str
+
"""Specifies the Record Key type"""
+
+
record: dict[str, Any]
+
"""Schema definition with type 'object'"""
+
+
type: str
+
"""Lexicon schema type identifier"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize record model with validation.
+
+
Args:
+
**data: Input data containing record values
+
+
Raises:
+
ValueError: If record is missing required fields
+
"""
+
# Extract $type if present
+
data_type = data.pop("$type", None)
+
if data_type:
+
data["type"] = data_type
+
super().__init__(**data)
+
+
@field_validator("type")
+
def validate_type(cls, v: str) -> str:
+
"""
+
Validate record type field.
+
+
Args:
+
v: Type value to validate
+
+
Returns:
+
Validated type
+
+
Raises:
+
ValueError: If type is empty
+
"""
+
if not v:
+
raise ValueError("Record must have a type")
+
return v
+
+
@field_validator("record", mode="before")
+
def validate_record(cls, v: Any) -> dict[str, Any]:
+
"""
+
Validate record structure.
+
+
Args:
+
v: Record value to validate
+
+
Returns:
+
Validated record
+
+
Raises:
+
ValueError: If record is not an object
+
"""
+
if not isinstance(v, dict):
+
raise ValueError("Record must be an object")
+
return v
+
+
class QueryModel(DataModel):
+
"""
+
Model for AT Protocol query type.
+
+
Describes an XRPC Query endpoint (HTTP GET) with support for
+
parameters, output schema and error responses.
+
"""
+
+
parameters: dict[str, Any] | None = None
+
"""HTTP query parameters schema"""
+
+
output: dict[str, Any] | None = None
+
"""HTTP response body schema"""
+
+
errors: list[dict[str, str]] | None = None
+
"""Possible error responses"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize query model with validation.
+
+
Args:
+
**data: Input data containing query definition
+
"""
+
super().__init__(**data)
+
+
@field_validator("output")
+
def validate_output(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
+
"""
+
Validate output schema.
+
+
Args:
+
v: Output schema to validate
+
+
Returns:
+
Validated output schema
+
+
Raises:
+
ValueError: If output schema is invalid
+
"""
+
if v and "encoding" not in v:
+
raise ValueError("Output must specify encoding")
+
return v
+
+
@field_validator("errors")
+
def validate_errors(cls, v: list[dict[str, str]] | None) -> list[dict[str, str]] | None:
+
"""
+
Validate error definitions.
+
+
Args:
+
v: Error definitions to validate
+
+
Returns:
+
Validated error definitions
+
+
Raises:
+
ValueError: If any error definition is invalid
+
"""
+
if v:
+
for error in v:
+
if "name" not in error:
+
raise ValueError("Error must have a name")
+
return v
+
+
class ProcedureModel(DataModel):
+
"""
+
Model for AT Protocol procedure type.
+
+
Describes an XRPC Procedure endpoint (HTTP POST) with support for
+
parameters, input/output schemas and error responses.
+
"""
+
+
parameters: dict[str, Any] | None = None
+
"""HTTP query parameters schema"""
+
+
input: dict[str, Any] | None = None
+
"""HTTP request body schema"""
+
+
output: dict[str, Any] | None = None
+
"""HTTP response body schema"""
+
+
errors: list[dict[str, str]] | None = None
+
"""Possible error responses"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize procedure model with validation.
+
+
Args:
+
**data: Input data containing procedure definition
+
"""
+
super().__init__(**data)
+
+
@field_validator("input")
+
def validate_input(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
+
"""
+
Validate input schema.
+
+
Args:
+
v: Input schema to validate
+
+
Returns:
+
Validated input schema
+
+
Raises:
+
ValueError: If input schema is invalid
+
"""
+
if v and "encoding" not in v:
+
raise ValueError("Input must specify encoding")
+
return v
+
+
@field_validator("output")
+
def validate_output(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
+
"""
+
Validate output schema.
+
+
Args:
+
v: Output schema to validate
+
+
Returns:
+
Validated output schema
+
+
Raises:
+
ValueError: If output schema is invalid
+
"""
+
if v and "encoding" not in v:
+
raise ValueError("Output must specify encoding")
+
return v
+
+
@field_validator("errors")
+
def validate_errors(cls, v: list[dict[str, str]] | None) -> list[dict[str, str]] | None:
+
"""
+
Validate error definitions.
+
+
Args:
+
v: Error definitions to validate
+
+
Returns:
+
Validated error definitions
+
+
Raises:
+
ValueError: If any error definition is invalid
+
"""
+
if v:
+
for error in v:
+
if "name" not in error:
+
raise ValueError("Error must have a name")
+
return v
+
+
class SubscriptionModel(DataModel):
+
"""
+
Model for AT Protocol subscription type.
+
+
Describes an Event Stream (WebSocket) with support for parameters,
+
message schemas and error responses.
+
"""
+
+
parameters: dict[str, Any] | None = None
+
"""HTTP query parameters schema"""
+
+
message: dict[str, Any] | None = None
+
"""Specifies what messages can be"""
+
+
errors: list[dict[str, str]] | None = None
+
"""Possible error responses"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize subscription model with validation.
+
+
Args:
+
**data: Input data containing subscription definition
+
"""
+
super().__init__(**data)
+
+
@field_validator("message")
+
def validate_message(cls, v: dict[str, Any] | None) -> dict[str, Any] | None:
+
"""
+
Validate message schema.
+
+
Args:
+
v: Message schema to validate
+
+
Returns:
+
Validated message schema
+
+
Raises:
+
ValueError: If message schema is invalid
+
"""
+
if v and "schema" not in v:
+
raise ValueError("Message must specify schema")
+
return v
+
+
@field_validator("errors")
+
def validate_errors(cls, v: list[dict[str, str]] | None) -> list[dict[str, str]] | None:
+
"""
+
Validate error definitions.
+
+
Args:
+
v: Error definitions to validate
+
+
Returns:
+
Validated error definitions
+
+
Raises:
+
ValueError: If any error definition is invalid
+
"""
+
if v:
+
for error in v:
+
if "name" not in error:
+
raise ValueError("Error must have a name")
+
return v
+249
src/atpasser/model/types/string.py
···
+
from typing import Any
+
import re
+
from datetime import datetime
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
class StringModel(DataModel):
+
"""
+
Model for AT Protocol string type.
+
+
Represents a Unicode string with support for format restrictions, length limits,
+
known values, enumeration sets, default values and constants as specified in Lexicon.
+
"""
+
+
value: str
+
"""String value"""
+
+
format: str | None = None
+
"""String format restriction (e.g. 'datetime', 'uri')"""
+
+
maxLength: int | None = None
+
"""Maximum length in UTF-8 bytes"""
+
+
minLength: int | None = None
+
"""Minimum length in UTF-8 bytes"""
+
+
knownValues: list[str] | None = None
+
"""Suggested/common values (not enforced)"""
+
+
enum: list[str] | None = None
+
"""Closed set of allowed values"""
+
+
default: str | None = None
+
"""Default value if not provided"""
+
+
const: str | None = None
+
"""Fixed constant value if specified"""
+
+
def __init__(self, **data: Any) -> None:
+
"""
+
Initialize string model with validation.
+
+
Args:
+
**data: Input data containing string value
+
+
Raises:
+
ValueError: If value violates constraints
+
"""
+
super().__init__(**data)
+
if self.const is not None and self.value != self.const:
+
raise ValueError(f"String value must be {self.const}")
+
+
@field_validator("value", mode="before")
+
def validate_string(cls, v: Any) -> str:
+
"""
+
Validate and convert input to string.
+
+
Args:
+
v: Value to validate
+
+
Returns:
+
Validated string value
+
+
Raises:
+
ValueError: If value violates constraints
+
"""
+
if not isinstance(v, str):
+
v = str(v)
+
+
# Validate length constraints
+
if cls.minLength is not None and len(v.encode()) < cls.minLength:
+
raise ValueError(f"String must be at least {cls.minLength} bytes")
+
+
if cls.maxLength is not None and len(v.encode()) > cls.maxLength:
+
raise ValueError(f"String must be at most {cls.maxLength} bytes")
+
+
# Validate enum
+
if cls.enum and v not in cls.enum:
+
raise ValueError(f"Value must be one of {cls.enum}")
+
+
# Validate format if specified
+
if cls.format:
+
if cls.format == "datetime":
+
cls._validate_datetime(v)
+
elif cls.format == "uri":
+
cls._validate_uri(v)
+
elif cls.format == "did":
+
cls._validate_did(v)
+
elif cls.format == "handle":
+
cls._validate_handle(v)
+
elif cls.format == "at-identifier":
+
cls._validate_at_identifier(v)
+
elif cls.format == "at-uri":
+
cls._validate_at_uri(v)
+
elif cls.format == "cid":
+
cls._validate_cid(v)
+
elif cls.format == "nsid":
+
cls._validate_nsid(v)
+
elif cls.format == "tid":
+
cls._validate_tid(v)
+
elif cls.format == "record-key":
+
cls._validate_record_key(v)
+
elif cls.format == "language":
+
cls._validate_language(v)
+
+
return v
+
+
@classmethod
+
def _validate_datetime(cls, v: str) -> None:
+
"""Validate RFC 3339 datetime format"""
+
try:
+
datetime.fromisoformat(v.replace("Z", "+00:00"))
+
except ValueError:
+
raise ValueError("Invalid datetime format, must be RFC 3339")
+
+
@classmethod
+
def _validate_uri(cls, v: str) -> None:
+
"""Validate URI format"""
+
if len(v) > 8192: # 8KB max
+
raise ValueError("URI too long, max 8KB")
+
if not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*:.+", v):
+
raise ValueError("Invalid URI format")
+
+
@classmethod
+
def _validate_did(cls, v: str) -> None:
+
"""Validate DID format"""
+
if len(v) > 2048:
+
raise ValueError("DID too long, max 2048 chars")
+
if not re.match(r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$", v):
+
raise ValueError("Invalid URI format")
+
+
@classmethod
+
def _validate_handle(cls, v: str) -> None:
+
"""Validate handle format"""
+
if not re.match(r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$", v):
+
raise ValueError("Handle contains invalid characters")
+
if len(v) > 253:
+
raise ValueError("Handle too long, max 253 chars")
+
+
@classmethod
+
def _validate_at_identifier(cls, v: str) -> None:
+
"""Validate at-identifier format (DID or handle)"""
+
try:
+
if v.startswith("did:"):
+
cls._validate_did(v)
+
else:
+
cls._validate_handle(v)
+
except ValueError as e:
+
raise ValueError(f"Invalid at-identifier: {e}")
+
+
@classmethod
+
def _validate_at_uri(cls, v: str) -> None:
+
"""
+
Validate AT-URI format according to AT Protocol specification.
+
+
Args:
+
v: AT-URI string to validate
+
+
Raises:
+
ValueError: If URI violates any of these rules:
+
- Must start with 'at://'
+
- Max length 8KB
+
- No trailing slash
+
- Authority must be valid DID or handle
+
- Path segments must follow NSID/RKEY rules if present
+
"""
+
if not v.startswith("at://"):
+
raise ValueError("AT-URI must start with 'at://'")
+
if len(v) > 8192: # 8KB
+
raise ValueError("AT-URI too long, max 8KB")
+
if v.endswith('/'):
+
raise ValueError("AT-URI cannot have trailing slash")
+
+
# Split into parts
+
parts = v[5:].split('/') # Skip 'at://'
+
authority = parts[0]
+
+
# Validate authority (DID or handle)
+
if not authority:
+
raise ValueError("AT-URI must have authority")
+
+
if authority.startswith('did:'):
+
# Basic DID format check - actual DID validation is done elsewhere
+
if len(authority) > 2048:
+
raise ValueError("DID too long")
+
if ':' not in authority[4:]:
+
raise ValueError("Invalid DID format")
+
else:
+
# Handle validation
+
if not re.match(r'^[a-z0-9.-]+$', authority):
+
raise ValueError("Invalid handle characters")
+
if len(authority) > 253:
+
raise ValueError("Handle too long")
+
+
# Validate path segments if present
+
if len(parts) > 1:
+
if len(parts) > 3:
+
raise ValueError("AT-URI path too deep")
+
+
collection = parts[1]
+
if not re.match(r'^[a-zA-Z0-9.-]+$', collection):
+
raise ValueError("Invalid collection NSID")
+
+
if len(parts) > 2:
+
rkey = parts[2]
+
if not rkey:
+
raise ValueError("Record key cannot be empty")
+
if not re.match(r'^[a-zA-Z0-9._:%-~]+$', rkey):
+
raise ValueError("Invalid record key characters")
+
+
@classmethod
+
def _validate_cid(cls, v: str) -> None:
+
"""Validate CID string format"""
+
if len(v) > 100:
+
raise ValueError("CID too long, max 100 chars")
+
if not re.match(r"^[a-zA-Z0-9]+$", v):
+
raise ValueError("CID contains invalid characters")
+
+
@classmethod
+
def _validate_nsid(cls, v: str) -> None:
+
"""Validate NSID format"""
+
if len(v) > 317:
+
raise ValueError("NSID too long, max 317 chars")
+
if not re.match(r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$", v):
+
raise ValueError("NSID contains invalid characters")
+
+
@classmethod
+
def _validate_tid(cls, v: str) -> None:
+
"""Validate TID format"""
+
if len(v) > 13:
+
raise ValueError("TID too long, max 13 chars")
+
if not re.match(r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$", v):
+
raise ValueError("TID contains invalid characters")
+
+
@classmethod
+
def _validate_record_key(cls, v: str) -> None:
+
"""Validate record-key format"""
+
if len(v) > 512:
+
raise ValueError("Record key too long, max 512 chars")
+
if v == "." or v == "..":
+
raise ValueError(f"Record key is {v}, which is not allowed")
+
if not re.match(r"^[a-zA-Z0-9._:%-~]+$", v):
+
raise ValueError("Record key contains invalid characters")
+
+
@classmethod
+
def _validate_language(cls, v: str) -> None:
+
"""Validate BCP 47 language tag"""
+
if not re.match(r"^[a-zA-Z]{1,8}(-[a-zA-Z0-9]{1,8})*$", v):
+
raise ValueError("Invalid language tag format")