formatted

+5 -3
src/atpasser/blob/__init__.py
···
import cid
import multihash, hashlib
+
def generateCID(file):
hasher = hashlib.new("sha-256")
while True:
chunk = file.read(8192)
-
if not chunk: break
+
if not chunk:
+
break
hasher.update(chunk)
-
+
digest = hasher.digest
mh = multihash.encode(digest, "sha-256")
-
return cid.CIDv1(codec='raw', multihash=mh)
+
return cid.CIDv1(codec="raw", multihash=mh)
+1 -1
src/atpasser/data/__init__.py
···
"dumps",
"load",
"loads",
-
]
+
]
+41 -37
src/atpasser/data/decoder.py
···
class JsonDecoder(json.JSONDecoder):
"""A JSON decoder that supports ATProto data types.
-
+
This decoder extends the standard JSON decoder to handle ATProto-specific
data types, including bytes, CID links, and typed objects.
-
+
Attributes:
type_hook_registry: Registry for type-specific hooks.
encoding: The encoding to use for string deserialization.
"""
-
+
def __init__(
self,
*,
···
type_hook_registry: Optional[Any] = None,
type_processor_registry: Optional[Any] = None,
encoding: str = "utf-8",
-
**kwargs: Any
+
**kwargs: Any,
) -> None:
"""Initialize the JSON decoder.
-
+
Args:
object_hook: Optional function to call with each decoded object.
type_hook_registry: Registry for type-specific hooks.
···
type_hook_registry = type_processor_registry.to_hook_registry()
elif type_hook_registry is None:
from .hooks import get_global_registry
+
type_hook_registry = get_global_registry()
-
+
# Create a combined object hook that calls both the custom hook and our hook
combined_hook = self._create_combined_hook(object_hook, type_hook_registry)
-
+
super().__init__(object_hook=combined_hook, **kwargs)
self.type_hook_registry = type_hook_registry
self.type_processor_registry = type_processor_registry
self.encoding = encoding
-
+
def _create_combined_hook(
self,
custom_hook: Optional[Callable[[Dict[str, Any]], Any]],
-
type_hook_registry: Optional[Any]
+
type_hook_registry: Optional[Any],
) -> Callable[[Dict[str, Any]], Any]:
"""Create a combined object hook function.
-
+
Args:
custom_hook: Optional custom object hook function.
type_hook_registry: Registry for type-specific hooks.
-
+
Returns:
A combined object hook function.
"""
+
def combined_hook(obj: Dict[str, Any]) -> Any:
# First, apply our ATProto-specific decoding
decoded_obj = self._atproto_object_hook(obj)
-
+
# Then, apply the custom hook if provided
if custom_hook is not None:
decoded_obj = custom_hook(decoded_obj)
-
+
return decoded_obj
-
+
return combined_hook
-
+
def _atproto_object_hook(self, obj: Dict[str, Any]) -> Any:
"""Handle ATProto-specific object decoding.
-
+
Args:
obj: The object to decode.
-
+
Returns:
The decoded object.
"""
···
# If there are other keys, this is invalid
raise ValueError(f"Invalid $bytes object: {obj}")
return base64.b64decode(obj["$bytes"].encode(self.encoding))
-
+
# Handle $link key (CID parsing)
elif "$link" in obj:
if len(obj) != 1:
# If there are other keys, this is invalid
raise ValueError(f"Invalid $link object: {obj}")
return make_cid(obj["$link"])
-
+
# Handle $type key (typed objects)
elif "$type" in obj:
type_value = obj["$type"]
remaining_obj = {k: v for k, v in obj.items() if k != "$type"}
-
+
# Check if there's a registered type handler
if self.type_hook_registry is not None:
handler = self.type_hook_registry.get_handler(type_value)
if handler is not None:
return handler(remaining_obj)
-
+
# If no handler is registered, return a typed object
return TypedObject(type_value, remaining_obj)
-
+
# Handle nested objects recursively
elif isinstance(obj, dict):
-
return {k: self._atproto_object_hook(v) if isinstance(v, dict) else v
-
for k, v in obj.items()}
-
+
return {
+
k: self._atproto_object_hook(v) if isinstance(v, dict) else v
+
for k, v in obj.items()
+
}
+
return obj
class TypedObject:
"""A typed object in the ATProto data model.
-
+
This class represents an object with a $type field in the ATProto data model.
-
+
Attributes:
type: The type of the object.
data: The data associated with the object.
"""
-
+
def __init__(self, type_name: str, data: Dict[str, Any]) -> None:
"""Initialize a typed object.
-
+
Args:
type_name: The type of the object.
data: The data associated with the object.
"""
self.type_name = type_name
self.data = data
-
+
def __repr__(self) -> str:
"""Return a string representation of the typed object.
-
+
Returns:
A string representation of the typed object.
"""
return f"TypedObject(type_name={self.type_name!r}, data={self.data!r})"
-
+
def __eq__(self, other: Any) -> bool:
"""Check if two typed objects are equal.
-
+
Args:
other: The object to compare with.
-
+
Returns:
True if the objects are equal, False otherwise.
"""
if not isinstance(other, TypedObject):
return False
return self.type_name == other.type_name and self.data == other.data
-
+
def __atproto_json_encode__(self) -> Dict[str, Any]:
"""Encode the typed object to a JSON-serializable format.
-
+
Returns:
A JSON-serializable representation of the typed object.
"""
result = {"$type": self.type_name}
result.update(self.data)
-
return result
+
return result
+10 -10
src/atpasser/data/encoder.py
···
class JsonEncoder(json.JSONEncoder):
"""A JSON encoder that supports ATProto data types.
-
+
This encoder extends the standard JSON encoder to handle ATProto-specific
data types, including bytes, CID links, and typed objects.
-
+
Attributes:
encoding (str): The encoding to use for string serialization.
type_processor_registry: Registry for type-specific processors.
"""
-
+
def __init__(
self,
*,
encoding: str = "utf-8",
type_processor_registry: Optional[Any] = None,
-
**kwargs: Any
+
**kwargs: Any,
) -> None:
"""Initialize the JSON encoder.
-
+
Args:
encoding: The encoding to use for string serialization.
type_processor_registry: Registry for type-specific processors.
···
super().__init__(**kwargs)
self.encoding = encoding
self.type_processor_registry = type_processor_registry
-
+
def default(self, o: Any) -> Any:
"""Convert an object to a serializable format.
-
+
Args:
o: The object to serialize.
-
+
Returns:
A serializable representation of the object.
-
+
Raises:
TypeError: If the object is not serializable.
"""
···
return [self.default(item) for item in o]
else:
# Use the parent class for other types
-
return super().default(o)
+
return super().default(o)
+51 -46
src/atpasser/data/hooks.py
···
from typing import Any, Callable, Dict, Optional, TypeVar, Union
# Type variable for the decorated function
-
F = TypeVar('F', bound=Callable[..., Any])
+
F = TypeVar("F", bound=Callable[..., Any])
class TypeHookRegistry:
"""Registry for type-specific hooks in the ATProto JSON decoder.
-
+
This class maintains a registry of type-specific hooks that can be used
to customize the decoding of objects with $type keys in the ATProto data model.
-
+
Attributes:
_handlers: Dictionary mapping type names to handler functions.
"""
-
+
def __init__(self) -> None:
"""Initialize the type hook registry."""
self._handlers: Dict[str, Callable[[Dict[str, Any]], Any]] = {}
-
+
def register(self, type_name: str) -> Callable[[F], F]:
"""Register a type handler function.
-
+
This method can be used as a decorator to register a function as a handler
for a specific type.
-
+
Args:
type_name: The name of the type to handle.
-
+
Returns:
A decorator function that registers the decorated function as a handler.
-
+
Example:
>>> registry = TypeHookRegistry()
-
>>>
+
>>>
>>> @registry.register("app.bsky.feed.post")
... def handle_post(data: Dict[str, Any]) -> Any:
... return Post(**data)
"""
+
def decorator(func: F) -> F:
self._handlers[type_name] = func
return func
-
+
return decorator
-
-
def register_handler(self, type_name: str, handler: Callable[[Dict[str, Any]], Any]) -> None:
+
+
def register_handler(
+
self, type_name: str, handler: Callable[[Dict[str, Any]], Any]
+
) -> None:
"""Register a type handler function directly.
-
+
Args:
type_name: The name of the type to handle.
handler: The function to call when decoding objects of this type.
-
+
Example:
>>> registry = TypeHookRegistry()
-
>>>
+
>>>
>>> def handle_post(data: Dict[str, Any]) -> Any:
... return Post(**data)
-
>>>
+
>>>
>>> registry.register_handler("app.bsky.feed.post", handle_post)
"""
self._handlers[type_name] = handler
-
+
def unregister(self, type_name: str) -> None:
"""Unregister a type handler function.
-
+
Args:
type_name: The name of the type to unregister.
"""
if type_name in self._handlers:
del self._handlers[type_name]
-
+
def get_handler(self, type_name: str) -> Optional[Callable[[Dict[str, Any]], Any]]:
"""Get the handler function for a specific type.
-
+
Args:
type_name: The name of the type to get the handler for.
-
+
Returns:
The handler function for the specified type, or None if no handler
is registered.
"""
return self._handlers.get(type_name)
-
+
def has_handler(self, type_name: str) -> bool:
"""Check if a handler is registered for a specific type.
-
+
Args:
type_name: The name of the type to check.
-
+
Returns:
True if a handler is registered for the specified type, False otherwise.
"""
return type_name in self._handlers
-
+
def clear(self) -> None:
"""Clear all registered handlers."""
self._handlers.clear()
-
+
def get_registered_types(self) -> set:
"""Get the set of all registered type names.
-
+
Returns:
A set of all registered type names.
"""
···
def type_handler(type_name: str) -> Callable[[F], F]:
"""Register a global type handler function.
-
+
This decorator registers a function as a global handler for a specific type
in the ATProto data model.
-
+
Args:
type_name: The name of the type to handle.
-
+
Returns:
A decorator function that registers the decorated function as a handler.
-
+
Example:
>>> @type_handler("app.bsky.feed.post")
... def handle_post(data: Dict[str, Any]) -> Any:
···
def get_global_registry() -> TypeHookRegistry:
"""Get the global type hook registry.
-
+
Returns:
The global TypeHookRegistry instance.
"""
return _global_registry
-
def register_type_handler(type_name: str, handler: Callable[[Dict[str, Any]], Any]) -> None:
+
def register_type_handler(
+
type_name: str, handler: Callable[[Dict[str, Any]], Any]
+
) -> None:
"""Register a global type handler function directly.
-
+
Args:
type_name: The name of the type to handle.
handler: The function to call when decoding objects of this type.
-
+
Example:
>>> def handle_post(data: Dict[str, Any]) -> Any:
... return Post(**data)
-
>>>
+
>>>
>>> register_type_handler("app.bsky.feed.post", handle_post)
"""
_global_registry.register_handler(type_name, handler)
···
def unregister_type_handler(type_name: str) -> None:
"""Unregister a global type handler function.
-
+
Args:
type_name: The name of the type to unregister.
"""
···
def get_type_handler(type_name: str) -> Optional[Callable[[Dict[str, Any]], Any]]:
"""Get the global handler function for a specific type.
-
+
Args:
type_name: The name of the type to get the handler for.
-
+
Returns:
The handler function for the specified type, or None if no handler
is registered.
···
def has_type_handler(type_name: str) -> bool:
"""Check if a global handler is registered for a specific type.
-
+
Args:
type_name: The name of the type to check.
-
+
Returns:
True if a handler is registered for the specified type, False otherwise.
"""
···
def get_registered_types() -> set:
"""Get the set of all globally registered type names.
-
+
Returns:
A set of all registered type names.
"""
···
def create_registry() -> TypeHookRegistry:
"""Create a new type hook registry.
-
+
This function creates a new, independent registry that can be used
instead of the global registry.
-
+
Returns:
A new TypeHookRegistry instance.
"""
-
return TypeHookRegistry()
+
return TypeHookRegistry()
+120 -114
src/atpasser/data/types.py
···
from .hooks import TypeHookRegistry
# Type variable for the decorated class
-
T = TypeVar('T')
+
T = TypeVar("T")
class TypeProcessor:
"""A type processor for ATProto JSON objects.
-
+
This class represents a processor for a specific type in the ATProto data model.
It contains information about how to convert JSON data to Python objects and
vice versa.
-
+
Attributes:
type_name: The name of the type this processor handles.
decoder: The function to decode JSON data to a Python object.
encoder: The function to encode a Python object to JSON data.
priority: The priority of this processor (higher values = higher priority).
"""
-
+
def __init__(
self,
type_name: str,
decoder: Optional[Callable[[Dict[str, Any]], Any]] = None,
encoder: Optional[Callable[[Any], Dict[str, Any]]] = None,
-
priority: int = 0
+
priority: int = 0,
) -> None:
"""Initialize a type processor.
-
+
Args:
type_name: The name of the type this processor handles.
decoder: The function to decode JSON data to a Python object.
···
self.decoder = decoder
self.encoder = encoder
self.priority = priority
-
+
def decode(self, data: Dict[str, Any]) -> Any:
"""Decode JSON data to a Python object.
-
+
Args:
data: The JSON data to decode.
-
+
Returns:
The decoded Python object.
-
+
Raises:
ValueError: If no decoder is registered.
"""
if self.decoder is None:
raise ValueError(f"No decoder registered for type {self.type_name}")
return self.decoder(data)
-
+
def encode(self, obj: Any) -> Dict[str, Any]:
"""Encode a Python object to JSON data.
-
+
Args:
obj: The Python object to encode.
-
+
Returns:
The encoded JSON data.
-
+
Raises:
ValueError: If no encoder is registered.
"""
···
class TypeProcessorRegistry:
"""Registry for type processors in the ATProto JSON decoder.
-
+
This class maintains a registry of type processors that can be used
to customize the encoding and decoding of objects with $type keys in
the ATProto data model.
-
+
Attributes:
_processors: Dictionary mapping type names to processor lists.
"""
-
+
def __init__(self) -> None:
"""Initialize the type processor registry."""
self._processors: Dict[str, List[TypeProcessor]] = {}
-
+
def register_processor(self, processor: TypeProcessor) -> None:
"""Register a type processor.
-
+
Args:
processor: The type processor to register.
"""
if processor.type_name not in self._processors:
self._processors[processor.type_name] = []
-
+
self._processors[processor.type_name].append(processor)
# Sort processors by priority (descending)
-
self._processors[processor.type_name].sort(key=lambda p: p.priority, reverse=True)
-
+
self._processors[processor.type_name].sort(
+
key=lambda p: p.priority, reverse=True
+
)
+
def register(
-
self,
-
type_name: str,
-
priority: int = 0
+
self, type_name: str, priority: int = 0
) -> Callable[[Callable[[Dict[str, Any]], Any]], Callable[[Dict[str, Any]], Any]]:
"""Register a type decoder function.
-
+
This method can be used as a decorator to register a function as a decoder
for a specific type.
-
+
Args:
type_name: The name of the type to handle.
priority: The priority of this processor (higher values = higher priority).
-
+
Returns:
A decorator function that registers the decorated function as a decoder.
-
+
Example:
>>> registry = TypeProcessorRegistry()
-
>>>
+
>>>
>>> @registry.register("app.bsky.feed.post", priority=10)
... def decode_post(data: Dict[str, Any]) -> Any:
... return Post(**data)
"""
-
def decorator(func: Callable[[Dict[str, Any]], Any]) -> Callable[[Dict[str, Any]], Any]:
+
+
def decorator(
+
func: Callable[[Dict[str, Any]], Any],
+
) -> Callable[[Dict[str, Any]], Any]:
processor = TypeProcessor(type_name, decoder=func, priority=priority)
self.register_processor(processor)
return func
-
+
return decorator
-
+
def register_encoder(
-
self,
-
type_name: str,
-
priority: int = 0
+
self, type_name: str, priority: int = 0
) -> Callable[[Callable[[Any], Dict[str, Any]]], Callable[[Any], Dict[str, Any]]]:
"""Register a type encoder function.
-
+
This method can be used as a decorator to register a function as an encoder
for a specific type.
-
+
Args:
type_name: The name of the type to handle.
priority: The priority of this processor (higher values = higher priority).
-
+
Returns:
A decorator function that registers the decorated function as an encoder.
-
+
Example:
>>> registry = TypeProcessorRegistry()
-
>>>
+
>>>
>>> @registry.register_encoder("app.bsky.feed.post", priority=10)
... def encode_post(post: Post) -> Dict[str, Any]:
... return {"text": post.text, "createdAt": post.created_at}
"""
-
def decorator(func: Callable[[Any], Dict[str, Any]]) -> Callable[[Any], Dict[str, Any]]:
+
+
def decorator(
+
func: Callable[[Any], Dict[str, Any]],
+
) -> Callable[[Any], Dict[str, Any]]:
# Check if a processor for this type already exists
if type_name in self._processors:
for processor in self._processors[type_name]:
···
break
else:
# No decoder found, create a new processor
-
processor = TypeProcessor(type_name, encoder=func, priority=priority)
+
processor = TypeProcessor(
+
type_name, encoder=func, priority=priority
+
)
self.register_processor(processor)
else:
# No processor exists, create a new one
processor = TypeProcessor(type_name, encoder=func, priority=priority)
self.register_processor(processor)
-
+
return func
-
+
return decorator
-
+
def register_class(
-
self,
-
type_name: str,
-
priority: int = 0
+
self, type_name: str, priority: int = 0
) -> Callable[[Type[T]], Type[T]]:
"""Register a class for both encoding and decoding.
-
+
This method can be used as a decorator to register a class for both
encoding and decoding of a specific type.
-
+
The class must have a class method `from_json` that takes a dictionary
and returns an instance of the class, and an instance method `to_json`
that returns a dictionary.
-
+
Args:
type_name: The name of the type to handle.
priority: The priority of this processor (higher values = higher priority).
-
+
Returns:
A decorator function that registers the decorated class.
-
+
Example:
>>> registry = TypeProcessorRegistry()
-
>>>
+
>>>
>>> @registry.register_class("app.bsky.feed.post", priority=10)
... class Post:
... def __init__(self, text: str, created_at: str) -> None:
... self.text = text
... self.created_at = created_at
-
...
+
...
... @classmethod
... def from_json(cls, data: Dict[str, Any]) -> "Post":
... return cls(data["text"], data["createdAt"])
-
...
+
...
... def to_json(self) -> Dict[str, Any]:
... return {"text": self.text, "createdAt": self.created_at}
"""
+
def decorator(cls: Type[T]) -> Type[T]:
# Create decoder from class method
if hasattr(cls, "from_json"):
···
# Create a decoder that passes the data as keyword arguments
decoder = lambda data: cls(**data)
else:
-
raise ValueError(f"Class {cls.__name__} has no from_json method or compatible __init__")
-
+
raise ValueError(
+
f"Class {cls.__name__} has no from_json method or compatible __init__"
+
)
+
# Create encoder from instance method
if hasattr(cls, "to_json"):
encoder = lambda obj: obj.to_json()
else:
raise ValueError(f"Class {cls.__name__} has no to_json method")
-
+
# Register the processor
-
processor = TypeProcessor(type_name, decoder=decoder, encoder=encoder, priority=priority)
+
processor = TypeProcessor(
+
type_name, decoder=decoder, encoder=encoder, priority=priority
+
)
self.register_processor(processor)
-
+
return cls
-
+
return decorator
-
+
def unregister(self, type_name: str, priority: Optional[int] = None) -> None:
"""Unregister type processors.
-
+
Args:
type_name: The name of the type to unregister.
priority: If specified, only unregister processors with this priority.
···
else:
# Remove all processors for this type
del self._processors[type_name]
-
+
def get_decoder(self, type_name: str) -> Optional[Callable[[Dict[str, Any]], Any]]:
"""Get the decoder function for a specific type.
-
+
Args:
type_name: The name of the type to get the decoder for.
-
+
Returns:
The decoder function for the specified type, or None if no decoder
is registered.
···
# Return the decoder of the highest priority processor
return self._processors[type_name][0].decoder
return None
-
+
def get_encoder(self, type_name: str) -> Optional[Callable[[Any], Dict[str, Any]]]:
"""Get the encoder function for a specific type.
-
+
Args:
type_name: The name of the type to get the encoder for.
-
+
Returns:
The encoder function for the specified type, or None if no encoder
is registered.
···
# Return the encoder of the highest priority processor
return self._processors[type_name][0].encoder
return None
-
+
def has_processor(self, type_name: str) -> bool:
"""Check if a processor is registered for a specific type.
-
+
Args:
type_name: The name of the type to check.
-
+
Returns:
True if a processor is registered for the specified type, False otherwise.
"""
return type_name in self._processors and bool(self._processors[type_name])
-
+
def clear(self) -> None:
"""Clear all registered processors."""
self._processors.clear()
-
+
def get_registered_types(self) -> set:
"""Get the set of all registered type names.
-
+
Returns:
A set of all registered type names.
"""
return set(self._processors.keys())
-
+
def to_hook_registry(self) -> TypeHookRegistry:
"""Convert this processor registry to a hook registry.
-
+
This method creates a TypeHookRegistry that uses the decoders from
this processor registry.
-
+
Returns:
A TypeHookRegistry with the same decoders as this processor registry.
"""
hook_registry = TypeHookRegistry()
-
+
for type_name, processors in self._processors.items():
if processors and processors[0].decoder is not None:
hook_registry.register_handler(type_name, processors[0].decoder)
-
+
return hook_registry
···
def register_type(
-
type_name: str,
-
priority: int = 0
+
type_name: str, priority: int = 0
) -> Callable[[Callable[[Dict[str, Any]], Any]], Callable[[Dict[str, Any]], Any]]:
"""Register a global type decoder function.
-
+
This decorator registers a function as a global decoder for a specific type
in the ATProto data model.
-
+
Args:
type_name: The name of the type to handle.
priority: The priority of this processor (higher values = higher priority).
-
+
Returns:
A decorator function that registers the decorated function as a decoder.
-
+
Example:
>>> @register_type("app.bsky.feed.post", priority=10)
... def decode_post(data: Dict[str, Any]) -> Any:
···
def get_global_processor_registry() -> TypeProcessorRegistry:
"""Get the global type processor registry.
-
+
Returns:
The global TypeProcessorRegistry instance.
"""
···
def register_type_encoder(
-
type_name: str,
-
priority: int = 0
+
type_name: str, priority: int = 0
) -> Callable[[Callable[[Any], Dict[str, Any]]], Callable[[Any], Dict[str, Any]]]:
"""Register a global type encoder function.
-
+
This decorator registers a function as a global encoder for a specific type
in the ATProto data model.
-
+
Args:
type_name: The name of the type to handle.
priority: The priority of this processor (higher values = higher priority).
-
+
Returns:
A decorator function that registers the decorated function as an encoder.
-
+
Example:
>>> @register_type_encoder("app.bsky.feed.post", priority=10)
... def encode_post(post: Post) -> Dict[str, Any]:
···
def register_type_class(
-
type_name: str,
-
priority: int = 0
+
type_name: str, priority: int = 0
) -> Callable[[Type[T]], Type[T]]:
"""Register a class for both global encoding and decoding.
-
+
This decorator registers a class for both encoding and decoding of a specific type
in the ATProto data model.
-
+
Args:
type_name: The name of the type to handle.
priority: The priority of this processor (higher values = higher priority).
-
+
Returns:
A decorator function that registers the decorated class.
-
+
Example:
>>> @register_type_class("app.bsky.feed.post", priority=10)
... class Post:
... def __init__(self, text: str, created_at: str) -> None:
... self.text = text
... self.created_at = created_at
-
...
+
...
... @classmethod
... def from_json(cls, data: Dict[str, Any]) -> "Post":
... return cls(data["text"], data["createdAt"])
-
...
+
...
... def to_json(self) -> Dict[str, Any]:
... return {"text": self.text, "createdAt": self.created_at}
"""
···
def unregister_type(type_name: str, priority: Optional[int] = None) -> None:
"""Unregister global type processors.
-
+
Args:
type_name: The name of the type to unregister.
priority: If specified, only unregister processors with this priority.
···
def get_type_decoder(type_name: str) -> Optional[Callable[[Dict[str, Any]], Any]]:
"""Get the global decoder function for a specific type.
-
+
Args:
type_name: The name of the type to get the decoder for.
-
+
Returns:
The decoder function for the specified type, or None if no decoder
is registered.
···
def get_type_encoder(type_name: str) -> Optional[Callable[[Any], Dict[str, Any]]]:
"""Get the global encoder function for a specific type.
-
+
Args:
type_name: The name of the type to get the encoder for.
-
+
Returns:
The encoder function for the specified type, or None if no encoder
is registered.
···
def has_type_processor(type_name: str) -> bool:
"""Check if a global processor is registered for a specific type.
-
+
Args:
type_name: The name of the type to check.
-
+
Returns:
True if a processor is registered for the specified type, False otherwise.
"""
···
def get_registered_types() -> set:
"""Get the set of all globally registered type names.
-
+
Returns:
A set of all registered type names.
"""
···
def create_processor_registry() -> TypeProcessorRegistry:
"""Create a new type processor registry.
-
+
This function creates a new, independent registry that can be used
instead of the global registry.
-
+
Returns:
A new TypeProcessorRegistry instance.
"""
-
return TypeProcessorRegistry()
+
return TypeProcessorRegistry()
+42 -35
src/atpasser/data/wrapper.py
···
sort_keys: bool = False,
encoding: str = "utf-8",
type_processor_registry: Optional[TypeProcessorRegistry] = None,
-
**kwargs: Any
+
**kwargs: Any,
) -> None:
"""Serialize obj as a JSON formatted stream to fp.
-
+
This function is similar to json.dump() but supports ATProto-specific
data types, including bytes, CID links, and typed objects.
-
+
Args:
obj: The object to serialize.
fp: A file-like object with a write() method.
···
"""
if cls is None:
cls = JsonEncoder
-
+
# Use the global type processor registry if none is provided
if type_processor_registry is None:
from .types import get_global_processor_registry
+
type_processor_registry = get_global_processor_registry()
-
+
# Create an encoder instance with the specified encoding and type processor registry
-
encoder = cls(encoding=encoding, type_processor_registry=type_processor_registry, **kwargs)
-
+
encoder = cls(
+
encoding=encoding, type_processor_registry=type_processor_registry, **kwargs
+
)
+
# Use the standard json.dump with our custom encoder
json.dump(
obj,
···
separators=separators,
default=default,
sort_keys=sort_keys,
-
**kwargs
+
**kwargs,
)
···
sort_keys: bool = False,
encoding: str = "utf-8",
type_processor_registry: Optional[TypeProcessorRegistry] = None,
-
**kwargs: Any
+
**kwargs: Any,
) -> str:
"""Serialize obj to a JSON formatted string.
-
+
This function is similar to json.dumps() but supports ATProto-specific
data types, including bytes, CID links, and typed objects.
-
+
Args:
obj: The object to serialize.
skipkeys: If True, dict keys that are not basic types (str, int, float,
···
encoding: The encoding to use for string serialization.
type_processor_registry: Registry for type-specific processors.
**kwargs: Additional keyword arguments to pass to the JSON encoder.
-
+
Returns:
A JSON formatted string.
"""
if cls is None:
cls = JsonEncoder
-
+
# Create an encoder instance with the specified encoding and type processor registry
-
encoder = cls(encoding=encoding, type_processor_registry=type_processor_registry, **kwargs)
-
+
encoder = cls(
+
encoding=encoding, type_processor_registry=type_processor_registry, **kwargs
+
)
+
# Use the standard json.dumps with our custom encoder
return json.dumps(
obj,
···
separators=separators,
default=default,
sort_keys=sort_keys,
-
**kwargs
+
**kwargs,
)
···
type_hook_registry: Optional[TypeHookRegistry] = None,
type_processor_registry: Optional[TypeProcessorRegistry] = None,
encoding: str = "utf-8",
-
**kwargs: Any
+
**kwargs: Any,
) -> Any:
"""Deserialize fp (a .read()-supporting text file or binary file containing
a JSON document) to a Python object.
-
+
This function is similar to json.load() but supports ATProto-specific
data types, including bytes, CID links, and typed objects.
-
+
Args:
fp: A .read()-supporting text file or binary file containing a JSON document.
cls: A custom JSONDecoder subclass. If not specified, JsonDecoder is used.
···
type_processor_registry: Registry for type-specific processors.
encoding: The encoding to use for string deserialization.
**kwargs: Additional keyword arguments to pass to the JSON decoder.
-
+
Returns:
A Python object.
"""
if cls is None:
cls = JsonDecoder
-
+
# Use the global type hook registry if none is provided
if type_hook_registry is None and type_processor_registry is None:
from .hooks import get_global_registry
+
type_hook_registry = get_global_registry()
elif type_processor_registry is not None:
# Convert the type processor registry to a hook registry
type_hook_registry = type_processor_registry.to_hook_registry()
-
+
# Create a decoder instance with the specified parameters
decoder = cls(
object_hook=object_hook,
type_hook_registry=type_hook_registry,
encoding=encoding,
-
**kwargs
+
**kwargs,
)
-
+
# Use the standard json.load with our custom decoder
return json.load(
fp,
···
parse_int=parse_int,
parse_constant=parse_constant,
object_pairs_hook=object_pairs_hook,
-
**kwargs
+
**kwargs,
)
···
type_hook_registry: Optional[TypeHookRegistry] = None,
type_processor_registry: Optional[TypeProcessorRegistry] = None,
encoding: str = "utf-8",
-
**kwargs: Any
+
**kwargs: Any,
) -> Any:
"""Deserialize s (a str, bytes or bytearray instance containing a JSON document)
to a Python object.
-
+
This function is similar to json.loads() but supports ATProto-specific
data types, including bytes, CID links, and typed objects.
-
+
Args:
s: A str, bytes or bytearray instance containing a JSON document.
cls: A custom JSONDecoder subclass. If not specified, JsonDecoder is used.
···
type_processor_registry: Registry for type-specific processors.
encoding: The encoding to use for string deserialization.
**kwargs: Additional keyword arguments to pass to the JSON decoder.
-
+
Returns:
A Python object.
"""
if cls is None:
cls = JsonDecoder
-
+
# Use the global type hook registry if none is provided
if type_hook_registry is None and type_processor_registry is None:
from .hooks import get_global_registry
+
type_hook_registry = get_global_registry()
elif type_processor_registry is not None:
# Convert the type processor registry to a hook registry
type_hook_registry = type_processor_registry.to_hook_registry()
-
+
# Create a decoder instance with the specified parameters
decoder = cls(
object_hook=object_hook,
type_hook_registry=type_hook_registry,
encoding=encoding,
-
**kwargs
+
**kwargs,
)
-
+
# Use the standard json.loads with our custom decoder
return json.loads(
s,
···
parse_int=parse_int,
parse_constant=parse_constant,
object_pairs_hook=object_pairs_hook,
-
**kwargs
-
)
+
**kwargs,
+
)
+1 -1
tests/__init__.py
···
-
"""Test package for atpasser."""
+
"""Test package for atpasser."""
+1 -1
tests/uri/__init__.py
···
-
"""Test package for atpasser.uri module."""
+
"""Test package for atpasser.uri module."""
+16 -16
tests/uri/test_did.py
···
"""Test creating a DID with a valid did:plc format."""
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
did = DID(did_str)
-
+
assert str(did) == did_str
assert did.uri == did_str
···
"""Test creating a DID with a valid did:web format."""
did_str = "did:web:blueskyweb.xyz"
did = DID(did_str)
-
+
assert str(did) == did_str
assert did.uri == did_str
···
"""Test creating a DID with various valid characters."""
did_str = "did:method:val:two-with_underscores.and-dashes"
did = DID(did_str)
-
+
assert str(did) == did_str
assert did.uri == did_str
def test_invalid_did_wrong_format(self):
"""Test that a DID with wrong format raises InvalidDIDError."""
did_str = "not-a-did"
-
+
with pytest.raises(InvalidDIDError, match="invalid format"):
DID(did_str)
def test_invalid_did_uppercase_method(self):
"""Test that a DID with uppercase method raises InvalidDIDError."""
did_str = "did:METHOD:val"
-
+
with pytest.raises(InvalidDIDError, match="invalid format"):
DID(did_str)
def test_invalid_did_method_with_numbers(self):
"""Test that a DID with method containing numbers raises InvalidDIDError."""
did_str = "did:m123:val"
-
+
with pytest.raises(InvalidDIDError, match="invalid format"):
DID(did_str)
def test_invalid_did_empty_identifier(self):
"""Test that a DID with empty identifier raises InvalidDIDError."""
did_str = "did:method:"
-
+
with pytest.raises(InvalidDIDError, match="invalid format"):
DID(did_str)
def test_invalid_did_ends_with_colon(self):
"""Test that a DID ending with colon raises InvalidDIDError."""
did_str = "did:method:val:"
-
+
with pytest.raises(InvalidDIDError, match="invalid format"):
DID(did_str)
···
# Create a DID that exceeds the 2048 character limit
long_identifier = "a" * 2040
did_str = f"did:method:{long_identifier}"
-
+
with pytest.raises(InvalidDIDError, match="exceeds maximum length"):
DID(did_str)
···
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
did1 = DID(did_str)
did2 = DID(did_str)
-
+
assert did1 == did2
assert did1 != "not a did object"
···
"""Test DID string representation."""
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
did = DID(did_str)
-
+
assert str(did) == did_str
def test_did_fetch_plc_method(self):
"""Test fetching a DID document for did:plc method."""
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
did = DID(did_str)
-
+
# This test may fail if there's no internet connection or if the PLC directory is down
try:
document = did.fetch()
···
"""Test fetching a DID document for did:web method."""
did_str = "did:web:blueskyweb.xyz"
did = DID(did_str)
-
+
# This test may fail if there's no internet connection or if the web server is down
try:
document = did.fetch()
···
"""Test that fetching a DID document with unsupported method raises InvalidDIDError."""
did_str = "did:unsupported:method"
did = DID(did_str)
-
+
with pytest.raises(InvalidDIDError, match="unsupported DID method"):
did.fetch()
···
"""Test that fetching a DID document with empty domain raises InvalidDIDError."""
did_str = "did:web:"
did = DID(did_str)
-
+
with pytest.raises(InvalidDIDError, match="invalid format"):
-
did.fetch()
+
did.fetch()
+22 -22
tests/uri/test_handle.py
···
"""Test creating a Handle with a valid simple format."""
handle_str = "example.com"
handle = Handle(handle_str)
-
+
assert str(handle) == handle_str
assert handle.handle == handle_str
···
"""Test creating a Handle with a valid subdomain format."""
handle_str = "subdomain.example.com"
handle = Handle(handle_str)
-
+
assert str(handle) == handle_str
assert handle.handle == handle_str
···
"""Test creating a Handle with a valid format containing hyphens."""
handle_str = "my-example.com"
handle = Handle(handle_str)
-
+
assert str(handle) == handle_str
assert handle.handle == handle_str
···
"""Test creating a Handle with a valid format containing numbers."""
handle_str = "example123.com"
handle = Handle(handle_str)
-
+
assert str(handle) == handle_str
assert handle.handle == handle_str
···
"""Test creating a Handle with a valid long domain name."""
handle_str = "a" * 63 + "." + "b" * 63 + "." + "c" * 63 + ".com"
handle = Handle(handle_str)
-
+
assert str(handle) == handle_str
assert handle.handle == handle_str
···
# Create a handle that exceeds the 253 character limit
long_handle = "a" * 254
handle_str = f"{long_handle}.com"
-
+
with pytest.raises(InvalidHandleError, match="exceeds maximum length"):
Handle(handle_str)
def test_invalid_handle_no_dot_separator(self):
"""Test that a Handle without a dot separator raises InvalidHandleError."""
handle_str = "example"
-
+
with pytest.raises(InvalidHandleError, match="invalid format"):
Handle(handle_str)
def test_invalid_handle_starts_with_dot(self):
"""Test that a Handle starting with a dot raises InvalidHandleError."""
handle_str = ".example.com"
-
+
with pytest.raises(InvalidHandleError, match="invalid format"):
Handle(handle_str)
def test_invalid_handle_ends_with_dot(self):
"""Test that a Handle ending with a dot raises InvalidHandleError."""
handle_str = "example.com."
-
+
with pytest.raises(InvalidHandleError, match="invalid format"):
Handle(handle_str)
def test_invalid_handle_segment_too_long(self):
"""Test that a Handle with a segment that is too long raises InvalidHandleError."""
handle_str = f"{'a' * 64}.com"
-
+
with pytest.raises(InvalidHandleError, match="segment length error"):
Handle(handle_str)
def test_invalid_handle_segment_empty(self):
"""Test that a Handle with an empty segment raises InvalidHandleError."""
handle_str = "example..com"
-
+
with pytest.raises(InvalidHandleError, match="segment length error"):
Handle(handle_str)
def test_invalid_handle_invalid_characters(self):
"""Test that a Handle with invalid characters raises InvalidHandleError."""
handle_str = "ex@mple.com"
-
+
with pytest.raises(InvalidHandleError, match="contains invalid characters"):
Handle(handle_str)
def test_invalid_handle_segment_starts_with_hyphen(self):
"""Test that a Handle with a segment starting with a hyphen raises InvalidHandleError."""
handle_str = "-example.com"
-
+
with pytest.raises(InvalidHandleError, match="invalid format"):
Handle(handle_str)
def test_invalid_handle_segment_ends_with_hyphen(self):
"""Test that a Handle with a segment ending with a hyphen raises InvalidHandleError."""
handle_str = "example-.com"
-
+
with pytest.raises(InvalidHandleError, match="invalid format"):
Handle(handle_str)
def test_invalid_handle_tld_starts_with_digit(self):
"""Test that a Handle with a TLD starting with a digit raises InvalidHandleError."""
handle_str = "example.1com"
-
+
with pytest.raises(InvalidHandleError, match="invalid format"):
Handle(handle_str)
···
handle_str = "example.com"
handle1 = Handle(handle_str)
handle2 = Handle(handle_str)
-
+
assert handle1 == handle2
assert handle1 != "not a handle object"
···
"""Test Handle string representation."""
handle_str = "example.com"
handle = Handle(handle_str)
-
+
assert str(handle) == handle_str
def test_handle_case_insensitive_storage(self):
"""Test that Handle stores the handle in lowercase."""
handle_str = "ExAmPlE.CoM"
handle = Handle(handle_str)
-
+
# The handle should be stored in lowercase
assert handle.handle == "example.com"
# The string representation should also return the lowercase form
···
"""Test resolving a handle to DID using DNS method."""
handle_str = "bsky.app"
handle = Handle(handle_str)
-
+
# This test may fail if there's no internet connection or if DNS resolution fails
try:
did = handle.toTID()
···
"""Test resolving a handle to DID using HTTP method."""
handle_str = "blueskyweb.xyz"
handle = Handle(handle_str)
-
+
# This test may fail if there's no internet connection or if HTTP resolution fails
try:
did = handle.toTID()
···
"""Test resolving an unresolvable handle returns None."""
handle_str = "nonexistent-domain-12345.com"
handle = Handle(handle_str)
-
+
# This should return None for a non-existent domain
did = handle.toTID()
-
assert did is None
+
assert did is None
+39 -33
tests/uri/test_nsid.py
···
"""Test creating an NSID with a valid simple format."""
nsid_str = "com.example.recordName"
nsid = NSID(nsid_str)
-
+
assert str(nsid) == nsid_str
assert nsid.nsid == nsid_str
assert nsid.domainAuthority == ["com", "example"]
···
"""Test creating an NSID with a valid fragment."""
nsid_str = "com.example.recordName#fragment"
nsid = NSID(nsid_str)
-
+
assert str(nsid) == nsid_str
assert nsid.nsid == nsid_str
assert nsid.domainAuthority == ["com", "example"]
···
"""Test creating an NSID with multiple domain segments."""
nsid_str = "net.users.bob.ping"
nsid = NSID(nsid_str)
-
+
assert str(nsid) == nsid_str
assert nsid.nsid == nsid_str
assert nsid.domainAuthority == ["net", "users", "bob"]
···
"""Test creating an NSID with hyphens in domain segments."""
nsid_str = "a-0.b-1.c.recordName"
nsid = NSID(nsid_str)
-
+
assert str(nsid) == nsid_str
assert nsid.nsid == nsid_str
assert nsid.domainAuthority == ["a-0", "b-1", "c"]
···
"""Test creating an NSID with case-sensitive name."""
nsid_str = "com.example.fooBar"
nsid = NSID(nsid_str)
-
+
assert str(nsid) == nsid_str
assert nsid.nsid == nsid_str
assert nsid.domainAuthority == ["com", "example"]
···
"""Test creating an NSID with numbers in the name."""
nsid_str = "com.example.record123"
nsid = NSID(nsid_str)
-
+
assert str(nsid) == nsid_str
assert nsid.nsid == nsid_str
assert nsid.domainAuthority == ["com", "example"]
···
def test_invalid_nsid_non_ascii_characters(self):
"""Test that an NSID with non-ASCII characters raises InvalidNSIDError."""
nsid_str = "com.exa💩ple.thing"
-
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
NSID(nsid_str)
···
# Create an NSID that exceeds the 317 character limit
long_segment = "a" * 100
nsid_str = f"{long_segment}.{long_segment}.{long_segment}.recordName"
-
-
with pytest.raises(InvalidNSIDError, match="domain authority length exceeds limit"):
+
+
with pytest.raises(
+
InvalidNSIDError, match="domain authority length exceeds limit"
+
):
NSID(nsid_str)
def test_invalid_nsid_starts_with_dot(self):
"""Test that an NSID starting with a dot raises InvalidNSIDError."""
nsid_str = ".com.example.recordName"
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
def test_invalid_nsid_ends_with_dot(self):
"""Test that an NSID ending with a dot raises InvalidNSIDError."""
nsid_str = "com.example.recordName."
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
def test_invalid_nsid_too_few_segments(self):
"""Test that an NSID with too few segments raises InvalidNSIDError."""
nsid_str = "com.example"
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
···
"""Test that an NSID with domain authority that is too long raises InvalidNSIDError."""
# Create a domain authority that exceeds the 253 character limit
long_segment = "a" * 63
-
nsid_str = f"{long_segment}.{long_segment}.{long_segment}.{long_segment}.recordName"
-
-
with pytest.raises(InvalidNSIDError, match="domain authority length exceeds limit"):
+
nsid_str = (
+
f"{long_segment}.{long_segment}.{long_segment}.{long_segment}.recordName"
+
)
+
+
with pytest.raises(
+
InvalidNSIDError, match="domain authority length exceeds limit"
+
):
NSID(nsid_str)
def test_invalid_nsid_domain_segment_too_long(self):
"""Test that an NSID with a domain segment that is too long raises InvalidNSIDError."""
nsid_str = f"{'a' * 64}.example.recordName"
-
+
with pytest.raises(InvalidNSIDError, match="segment length error"):
NSID(nsid_str)
def test_invalid_nsid_domain_segment_empty(self):
"""Test that an NSID with an empty domain segment raises InvalidNSIDError."""
nsid_str = "com..example.recordName"
-
+
with pytest.raises(InvalidNSIDError, match="segment length error"):
NSID(nsid_str)
def test_invalid_nsid_domain_invalid_characters(self):
"""Test that an NSID with invalid characters in domain raises InvalidNSIDError."""
nsid_str = "com.ex@mple.recordName"
-
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
NSID(nsid_str)
def test_invalid_nsid_domain_segment_starts_with_hyphen(self):
"""Test that an NSID with a domain segment starting with a hyphen raises InvalidNSIDError."""
nsid_str = "com.-example.recordName"
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
def test_invalid_nsid_domain_segment_ends_with_hyphen(self):
"""Test that an NSID with a domain segment ending with a hyphen raises InvalidNSIDError."""
nsid_str = "com.example-.recordName"
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
def test_invalid_nsid_tld_starts_with_digit(self):
"""Test that an NSID with a TLD starting with a digit raises InvalidNSIDError."""
nsid_str = "1com.example.recordName"
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
def test_invalid_nsid_name_empty(self):
"""Test that an NSID with an empty name raises InvalidNSIDError."""
nsid_str = "com.example."
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
def test_invalid_nsid_name_too_long(self):
"""Test that an NSID with a name that is too long raises InvalidNSIDError."""
nsid_str = f"com.example.{'a' * 64}"
-
+
with pytest.raises(InvalidNSIDError, match="name length error"):
NSID(nsid_str)
def test_invalid_nsid_name_invalid_characters(self):
"""Test that an NSID with invalid characters in name raises InvalidNSIDError."""
nsid_str = "com.example.record-name"
-
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
NSID(nsid_str)
def test_invalid_nsid_name_starts_with_digit(self):
"""Test that an NSID with a name starting with a digit raises InvalidNSIDError."""
nsid_str = "com.example.1record"
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
def test_invalid_nsid_fragment_empty(self):
"""Test that an NSID with an empty fragment raises InvalidNSIDError."""
nsid_str = "com.example.recordName#"
-
+
with pytest.raises(InvalidNSIDError, match="fragment length error"):
NSID(nsid_str)
def test_invalid_nsid_fragment_too_long(self):
"""Test that an NSID with a fragment that is too long raises InvalidNSIDError."""
nsid_str = f"com.example.recordName#{'a' * 64}"
-
+
with pytest.raises(InvalidNSIDError, match="fragment length error"):
NSID(nsid_str)
def test_invalid_nsid_fragment_invalid_characters(self):
"""Test that an NSID with invalid characters in fragment raises InvalidNSIDError."""
nsid_str = "com.example.recordName#fragment-with-hyphen"
-
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
NSID(nsid_str)
def test_invalid_nsid_fragment_starts_with_digit(self):
"""Test that an NSID with a fragment starting with a digit raises InvalidNSIDError."""
nsid_str = "com.example.recordName#1fragment"
-
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
NSID(nsid_str)
···
nsid_str = "com.example.recordName"
nsid1 = NSID(nsid_str)
nsid2 = NSID(nsid_str)
-
+
assert nsid1 == nsid2
assert nsid1 != "not an nsid object"
···
"""Test NSID string representation."""
nsid_str = "com.example.recordName"
nsid = NSID(nsid_str)
-
+
assert str(nsid) == nsid_str
def test_nsid_string_representation_with_fragment(self):
"""Test NSID string representation with fragment."""
nsid_str = "com.example.recordName#fragment"
nsid = NSID(nsid_str)
-
-
assert str(nsid) == nsid_str
+
+
assert str(nsid) == nsid_str
+27 -17
tests/uri/test_restricted_uri.py
···
def test_valid_restricted_uri_with_did_collection_and_rkey(self):
"""Test creating a RestrictedURI with a valid DID, collection, and rkey."""
-
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri_str = (
+
"at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
)
uri = RestrictedURI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
···
"""Test creating a RestrictedURI with a valid handle, collection, and rkey."""
uri_str = "at://bnewbold.bsky.team/app.bsky.feed.post/3jwdwj2ctlk26"
uri = RestrictedURI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "bnewbold.bsky.team"
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
···
"""Test creating a RestrictedURI with only a collection."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
uri = RestrictedURI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
assert uri.path == ["app.bsky.feed.post"]
···
"""Test creating a RestrictedURI with only an authority."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur"
uri = RestrictedURI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
assert uri.path == []
···
def test_invalid_restricted_uri_with_query(self):
"""Test that a RestrictedURI with query parameters raises InvalidRestrictedURIError."""
-
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post?param1=value1"
-
-
with pytest.raises(InvalidRestrictedURIError, match="query parameters not supported"):
+
uri_str = (
+
"at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post?param1=value1"
+
)
+
+
with pytest.raises(
+
InvalidRestrictedURIError, match="query parameters not supported"
+
):
RestrictedURI(uri_str)
def test_invalid_restricted_uri_with_fragment(self):
"""Test that a RestrictedURI with a fragment raises InvalidRestrictedURIError."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26#$.some.json.path"
-
+
with pytest.raises(InvalidRestrictedURIError, match="fragments not supported"):
RestrictedURI(uri_str)
def test_invalid_restricted_uri_with_invalid_authority(self):
"""Test that a RestrictedURI with invalid authority raises InvalidRestrictedURIError."""
uri_str = "at://invalid_authority/app.bsky.feed.post/3jwdwj2ctlk26"
-
+
with pytest.raises(InvalidRestrictedURIError, match="invalid authority"):
RestrictedURI(uri_str)
def test_invalid_restricted_uri_too_many_path_segments(self):
"""Test that a RestrictedURI with too many path segments raises InvalidRestrictedURIError."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26/extra"
-
+
with pytest.raises(InvalidRestrictedURIError, match="too many path segments"):
RestrictedURI(uri_str)
def test_invalid_restricted_uri_base_uri_validation_failure(self):
"""Test that a RestrictedURI with invalid base URI raises InvalidURIError."""
uri_str = "https://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
-
+
with pytest.raises(InvalidURIError, match="invalid format"):
RestrictedURI(uri_str)
def test_restricted_uri_equality(self):
"""Test RestrictedURI equality comparison."""
-
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri_str = (
+
"at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
)
uri1 = RestrictedURI(uri_str)
uri2 = RestrictedURI(uri_str)
-
+
assert uri1 == uri2
assert uri1 != "not a uri object"
def test_restricted_uri_string_representation(self):
"""Test RestrictedURI string representation."""
-
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri_str = (
+
"at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
)
uri = RestrictedURI(uri_str)
-
-
assert str(uri) == uri_str
+
+
assert str(uri) == uri_str
+39 -39
tests/uri/test_rkey.py
···
"""Test creating an RKey with a valid simple format."""
rkey_str = "3jui7kd54zh2y"
rkey = RKey(rkey_str)
-
+
assert str(rkey) == rkey_str
assert rkey.recordKey == rkey_str
···
"""Test creating an RKey with various valid characters."""
rkey_str = "example.com"
rkey = RKey(rkey_str)
-
+
assert str(rkey) == rkey_str
assert rkey.recordKey == rkey_str
···
"""Test creating an RKey with valid special characters."""
rkey_str = "~1.2-3_"
rkey = RKey(rkey_str)
-
+
assert str(rkey) == rkey_str
assert rkey.recordKey == rkey_str
···
"""Test creating an RKey with a colon."""
rkey_str = "pre:fix"
rkey = RKey(rkey_str)
-
+
assert str(rkey) == rkey_str
assert rkey.recordKey == rkey_str
···
"""Test creating an RKey with just an underscore."""
rkey_str = "_"
rkey = RKey(rkey_str)
-
+
assert str(rkey) == rkey_str
assert rkey.recordKey == rkey_str
def test_invalid_rkey_empty(self):
"""Test that an empty RKey raises InvalidRKeyError."""
rkey_str = ""
-
+
with pytest.raises(InvalidRKeyError, match="record key is empty"):
RKey(rkey_str)
···
"""Test that an RKey that is too long raises InvalidRKeyError."""
# Create an RKey that exceeds the 512 character limit
rkey_str = "a" * 513
-
+
with pytest.raises(InvalidRKeyError, match="exceeds maximum length"):
RKey(rkey_str)
def test_invalid_rkey_reserved_double_dot(self):
"""Test that an RKey with '..' raises InvalidRKeyError."""
rkey_str = ".."
-
+
with pytest.raises(InvalidRKeyError, match="reserved value"):
RKey(rkey_str)
def test_invalid_rkey_reserved_single_dot(self):
"""Test that an RKey with '.' raises InvalidRKeyError."""
rkey_str = "."
-
+
with pytest.raises(InvalidRKeyError, match="reserved value"):
RKey(rkey_str)
def test_invalid_rkey_invalid_characters(self):
"""Test that an RKey with invalid characters raises InvalidRKeyError."""
rkey_str = "alpha/beta"
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_hash_character(self):
"""Test that an RKey with a hash character raises InvalidRKeyError."""
rkey_str = "#extra"
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_at_character(self):
"""Test that an RKey with an at character raises InvalidRKeyError."""
rkey_str = "@handle"
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_space(self):
"""Test that an RKey with a space raises InvalidRKeyError."""
rkey_str = "any space"
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_plus_character(self):
"""Test that an RKey with a plus character raises InvalidRKeyError."""
rkey_str = "any+space"
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_brackets(self):
"""Test that an RKey with brackets raises InvalidRKeyError."""
rkey_str = "number[3]"
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_parentheses(self):
"""Test that an RKey with parentheses raises InvalidRKeyError."""
rkey_str = "number(3)"
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_quotes(self):
"""Test that an RKey with quotes raises InvalidRKeyError."""
rkey_str = '"quote"'
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
def test_invalid_rkey_base64_padding(self):
"""Test that an RKey with base64 padding raises InvalidRKeyError."""
rkey_str = "dHJ1ZQ=="
-
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
RKey(rkey_str)
···
rkey_str = "3jui7kd54zh2y"
rkey1 = RKey(rkey_str)
rkey2 = RKey(rkey_str)
-
+
assert rkey1 == rkey2
assert rkey1 != "not an rkey object"
···
"""Test RKey string representation."""
rkey_str = "3jui7kd54zh2y"
rkey = RKey(rkey_str)
-
+
assert str(rkey) == rkey_str
···
def test_tid_creation_default(self):
"""Test creating a TID with default parameters."""
tid = TID()
-
+
assert isinstance(tid, TID)
assert isinstance(tid, RKey)
assert isinstance(tid.timestamp, datetime.datetime)
···
"""Test creating a TID with a specific timestamp."""
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
tid = TID(time=timestamp)
-
+
assert tid.timestamp == timestamp
assert isinstance(tid.clockIdentifier, int)
assert 0 <= tid.clockIdentifier < 1024
···
"""Test creating a TID with a specific clock identifier."""
clock_id = 42
tid = TID(clockIdentifier=clock_id)
-
+
assert tid.clockIdentifier == clock_id
assert isinstance(tid.timestamp, datetime.datetime)
···
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
clock_id = 42
tid = TID(time=timestamp, clockIdentifier=clock_id)
-
+
assert tid.timestamp == timestamp
assert tid.clockIdentifier == clock_id
···
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
clock_id = 42
tid = TID(time=timestamp, clockIdentifier=clock_id)
-
+
int_value = int(tid)
expected_value = int(timestamp.timestamp() * 1000000) * 1024 + clock_id
-
+
assert int_value == expected_value
def test_tid_string_representation(self):
"""Test TID string representation."""
tid = TID()
-
+
str_value = str(tid)
assert len(str_value) == 13
assert all(c in "234567abcdefghijklmnopqrstuvwxyz" for c in str_value)
···
clock_id = 42
tid1 = TID(time=timestamp, clockIdentifier=clock_id)
tid2 = TID(time=timestamp, clockIdentifier=clock_id)
-
+
assert tid1 == tid2
def test_tid_equality_with_rkey(self):
···
clock_id = 42
tid = TID(time=timestamp, clockIdentifier=clock_id)
rkey = RKey(str(tid))
-
+
assert tid == rkey
def test_tid_inequality_with_different_object(self):
"""Test TID inequality comparison with a different object type."""
tid = TID()
-
+
assert tid != "not a tid object"
def test_tid_inequality_with_different_timestamp(self):
···
clock_id = 42
tid1 = TID(time=timestamp1, clockIdentifier=clock_id)
tid2 = TID(time=timestamp2, clockIdentifier=clock_id)
-
+
assert tid1 != tid2
def test_tid_inequality_with_different_clock_id(self):
···
clock_id2 = 43
tid1 = TID(time=timestamp, clockIdentifier=clock_id1)
tid2 = TID(time=timestamp, clockIdentifier=clock_id2)
-
+
assert tid1 != tid2
···
def test_import_tid_from_integer_default(self):
"""Test importing a TID from integer with default value."""
tid = importTIDfromInteger()
-
+
assert isinstance(tid, TID)
assert isinstance(tid.timestamp, datetime.datetime)
assert isinstance(tid.clockIdentifier, int)
···
clock_id = 42
original_tid = TID(time=timestamp, clockIdentifier=clock_id)
int_value = int(original_tid)
-
+
imported_tid = importTIDfromInteger(int_value)
-
+
assert imported_tid.timestamp == timestamp
assert imported_tid.clockIdentifier == clock_id
def test_import_tid_from_base32_default(self):
"""Test importing a TID from base32 with default value."""
tid = importTIDfromBase32()
-
+
assert isinstance(tid, TID)
assert isinstance(tid.timestamp, datetime.datetime)
assert isinstance(tid.clockIdentifier, int)
···
"""Test importing a TID from base32 with a specific value."""
original_tid = TID()
str_value = str(original_tid)
-
+
imported_tid = importTIDfromBase32(str_value)
-
-
assert int(imported_tid) == int(original_tid)
+
+
assert int(imported_tid) == int(original_tid)
+26 -18
tests/uri/test_uri.py
···
def test_valid_uri_with_did(self):
"""Test creating a URI with a valid DID."""
-
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri_str = (
+
"at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
)
uri = URI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
···
"""Test creating a URI with a valid handle."""
uri_str = "at://bnewbold.bsky.team/app.bsky.feed.post/3jwdwj2ctlk26"
uri = URI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "bnewbold.bsky.team"
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
···
"""Test creating a URI with only a collection."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
uri = URI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
assert uri.path == ["app.bsky.feed.post"]
···
"""Test creating a URI with only an authority."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur"
uri = URI(uri_str)
-
+
assert str(uri) == uri_str
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
assert uri.path == []
···
"""Test creating a URI with query parameters."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post?param1=value1&param2=value2"
uri = URI(uri_str)
-
+
assert uri.query == {"param1": ["value1"], "param2": ["value2"]}
assert uri.queryAsText == "param1%3Dvalue1%26param2%3Dvalue2"
···
"""Test creating a URI with a fragment."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26#$.some.json.path"
uri = URI(uri_str)
-
+
assert uri.fragment is not None
assert uri.fragmentAsText == "%24.some.json.path"
def test_invalid_uri_non_ascii_characters(self):
"""Test that non-ASCII characters in URI raise InvalidURIError."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/💩"
-
+
with pytest.raises(InvalidURIError, match="contains invalid characters"):
URI(uri_str)
···
# Create a URI that exceeds the 8000 character limit
long_path = "a" * 8000
uri_str = f"at://did:plc:z72i7hdynmk6r22z27h6tvur/{long_path}"
-
+
with pytest.raises(InvalidURIError, match="exceeds maximum length"):
URI(uri_str)
def test_invalid_uri_wrong_scheme(self):
"""Test that a URI with wrong scheme raises InvalidURIError."""
-
uri_str = "https://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
-
+
uri_str = (
+
"https://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
)
+
with pytest.raises(InvalidURIError, match="invalid format"):
URI(uri_str)
def test_invalid_uri_trailing_slash(self):
"""Test that a URI with trailing slash raises InvalidURIError."""
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/"
-
+
with pytest.raises(InvalidURIError, match="cannot end with a slash"):
URI(uri_str)
def test_invalid_uri_with_userinfo(self):
"""Test that a URI with userinfo raises InvalidURIError."""
uri_str = "at://user:pass@did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
-
+
with pytest.raises(InvalidURIError, match="does not support user information"):
URI(uri_str)
def test_uri_equality(self):
"""Test URI equality comparison."""
-
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri_str = (
+
"at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
)
uri1 = URI(uri_str)
uri2 = URI(uri_str)
-
+
assert uri1 == uri2
assert uri1 != "not a uri object"
def test_uri_string_representation(self):
"""Test URI string representation."""
-
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri_str = (
+
"at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
)
uri = URI(uri_str)
-
-
assert str(uri) == uri_str
+
+
assert str(uri) == uri_str