add tests for uri

+3
src/atpasser/uri/__init__.py
···
"JSONPath parsing failed",
f"Failed to parse JSONPath fragment '{fragment}': {str(e)}",
)
+
else:
+
self.fragment = None
+
self.fragmentAsText = None
if query != None:
try:
+1 -1
src/atpasser/uri/identifier.py
···
"Handle's top-level domain cannot start with a digit",
)
-
self.handle = handle
+
self.handle = handle.lower()
def __str__(self) -> str:
"""Convert the Handle object to its string representation.
+1
tests/__init__.py
···
+
"""Test package for atpasser."""
+1
tests/uri/__init__.py
···
+
"""Test package for atpasser.uri module."""
+137
tests/uri/test_did.py
···
+
"""Test cases for the DID class in atpasser.uri.identifier module."""
+
+
import pytest
+
from atpasser.uri.identifier import DID
+
from atpasser.uri.exceptions import InvalidDIDError, ResolutionError
+
+
+
class TestDID:
+
"""Test cases for the DID class."""
+
+
def test_valid_did_plc(self):
+
"""Test creating a DID with a valid did:plc format."""
+
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
+
did = DID(did_str)
+
+
assert str(did) == did_str
+
assert did.uri == did_str
+
+
def test_valid_did_web(self):
+
"""Test creating a DID with a valid did:web format."""
+
did_str = "did:web:blueskyweb.xyz"
+
did = DID(did_str)
+
+
assert str(did) == did_str
+
assert did.uri == did_str
+
+
def test_valid_did_with_various_characters(self):
+
"""Test creating a DID with various valid characters."""
+
did_str = "did:method:val:two-with_underscores.and-dashes"
+
did = DID(did_str)
+
+
assert str(did) == did_str
+
assert did.uri == did_str
+
+
def test_invalid_did_wrong_format(self):
+
"""Test that a DID with wrong format raises InvalidDIDError."""
+
did_str = "not-a-did"
+
+
with pytest.raises(InvalidDIDError, match="invalid format"):
+
DID(did_str)
+
+
def test_invalid_did_uppercase_method(self):
+
"""Test that a DID with uppercase method raises InvalidDIDError."""
+
did_str = "did:METHOD:val"
+
+
with pytest.raises(InvalidDIDError, match="invalid format"):
+
DID(did_str)
+
+
def test_invalid_did_method_with_numbers(self):
+
"""Test that a DID with method containing numbers raises InvalidDIDError."""
+
did_str = "did:m123:val"
+
+
with pytest.raises(InvalidDIDError, match="invalid format"):
+
DID(did_str)
+
+
def test_invalid_did_empty_identifier(self):
+
"""Test that a DID with empty identifier raises InvalidDIDError."""
+
did_str = "did:method:"
+
+
with pytest.raises(InvalidDIDError, match="invalid format"):
+
DID(did_str)
+
+
def test_invalid_did_ends_with_colon(self):
+
"""Test that a DID ending with colon raises InvalidDIDError."""
+
did_str = "did:method:val:"
+
+
with pytest.raises(InvalidDIDError, match="invalid format"):
+
DID(did_str)
+
+
def test_invalid_did_too_long(self):
+
"""Test that a DID that is too long raises InvalidDIDError."""
+
# Create a DID that exceeds the 2048 character limit
+
long_identifier = "a" * 2040
+
did_str = f"did:method:{long_identifier}"
+
+
with pytest.raises(InvalidDIDError, match="exceeds maximum length"):
+
DID(did_str)
+
+
def test_did_equality(self):
+
"""Test DID equality comparison."""
+
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
+
did1 = DID(did_str)
+
did2 = DID(did_str)
+
+
assert did1 == did2
+
assert did1 != "not a did object"
+
+
def test_did_string_representation(self):
+
"""Test DID string representation."""
+
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
+
did = DID(did_str)
+
+
assert str(did) == did_str
+
+
def test_did_fetch_plc_method(self):
+
"""Test fetching a DID document for did:plc method."""
+
did_str = "did:plc:z72i7hdynmk6r22z27h6tvur"
+
did = DID(did_str)
+
+
# This test may fail if there's no internet connection or if the PLC directory is down
+
try:
+
document = did.fetch()
+
assert isinstance(document, list)
+
assert len(document) > 0
+
except ResolutionError:
+
# If resolution fails, we'll skip this test
+
pytest.skip("Failed to resolve DID document")
+
+
def test_did_fetch_web_method(self):
+
"""Test fetching a DID document for did:web method."""
+
did_str = "did:web:blueskyweb.xyz"
+
did = DID(did_str)
+
+
# This test may fail if there's no internet connection or if the web server is down
+
try:
+
document = did.fetch()
+
assert isinstance(document, list)
+
assert len(document) > 0
+
except ResolutionError:
+
# If resolution fails, we'll skip this test
+
pytest.skip("Failed to resolve DID document")
+
+
def test_did_fetch_unsupported_method(self):
+
"""Test that fetching a DID document with unsupported method raises InvalidDIDError."""
+
did_str = "did:unsupported:method"
+
did = DID(did_str)
+
+
with pytest.raises(InvalidDIDError, match="unsupported DID method"):
+
did.fetch()
+
+
def test_did_fetch_web_empty_domain(self):
+
"""Test that fetching a DID document with empty domain raises InvalidDIDError."""
+
did_str = "did:web:"
+
did = DID(did_str)
+
+
with pytest.raises(InvalidDIDError, match="invalid format"):
+
did.fetch()
+184
tests/uri/test_handle.py
···
+
"""Test cases for the Handle class in atpasser.uri.identifier module."""
+
+
import pytest
+
from atpasser.uri.identifier import Handle
+
from atpasser.uri.exceptions import InvalidHandleError, ResolutionError
+
+
+
class TestHandle:
+
"""Test cases for the Handle class."""
+
+
def test_valid_handle_simple(self):
+
"""Test creating a Handle with a valid simple format."""
+
handle_str = "example.com"
+
handle = Handle(handle_str)
+
+
assert str(handle) == handle_str
+
assert handle.handle == handle_str
+
+
def test_valid_handle_subdomain(self):
+
"""Test creating a Handle with a valid subdomain format."""
+
handle_str = "subdomain.example.com"
+
handle = Handle(handle_str)
+
+
assert str(handle) == handle_str
+
assert handle.handle == handle_str
+
+
def test_valid_handle_with_hyphen(self):
+
"""Test creating a Handle with a valid format containing hyphens."""
+
handle_str = "my-example.com"
+
handle = Handle(handle_str)
+
+
assert str(handle) == handle_str
+
assert handle.handle == handle_str
+
+
def test_valid_handle_with_numbers(self):
+
"""Test creating a Handle with a valid format containing numbers."""
+
handle_str = "example123.com"
+
handle = Handle(handle_str)
+
+
assert str(handle) == handle_str
+
assert handle.handle == handle_str
+
+
def test_valid_handle_long_domain(self):
+
"""Test creating a Handle with a valid long domain name."""
+
handle_str = "a" * 63 + "." + "b" * 63 + "." + "c" * 63 + ".com"
+
handle = Handle(handle_str)
+
+
assert str(handle) == handle_str
+
assert handle.handle == handle_str
+
+
def test_invalid_handle_too_long(self):
+
"""Test that a Handle that is too long raises InvalidHandleError."""
+
# Create a handle that exceeds the 253 character limit
+
long_handle = "a" * 254
+
handle_str = f"{long_handle}.com"
+
+
with pytest.raises(InvalidHandleError, match="exceeds maximum length"):
+
Handle(handle_str)
+
+
def test_invalid_handle_no_dot_separator(self):
+
"""Test that a Handle without a dot separator raises InvalidHandleError."""
+
handle_str = "example"
+
+
with pytest.raises(InvalidHandleError, match="invalid format"):
+
Handle(handle_str)
+
+
def test_invalid_handle_starts_with_dot(self):
+
"""Test that a Handle starting with a dot raises InvalidHandleError."""
+
handle_str = ".example.com"
+
+
with pytest.raises(InvalidHandleError, match="invalid format"):
+
Handle(handle_str)
+
+
def test_invalid_handle_ends_with_dot(self):
+
"""Test that a Handle ending with a dot raises InvalidHandleError."""
+
handle_str = "example.com."
+
+
with pytest.raises(InvalidHandleError, match="invalid format"):
+
Handle(handle_str)
+
+
def test_invalid_handle_segment_too_long(self):
+
"""Test that a Handle with a segment that is too long raises InvalidHandleError."""
+
handle_str = f"{'a' * 64}.com"
+
+
with pytest.raises(InvalidHandleError, match="segment length error"):
+
Handle(handle_str)
+
+
def test_invalid_handle_segment_empty(self):
+
"""Test that a Handle with an empty segment raises InvalidHandleError."""
+
handle_str = "example..com"
+
+
with pytest.raises(InvalidHandleError, match="segment length error"):
+
Handle(handle_str)
+
+
def test_invalid_handle_invalid_characters(self):
+
"""Test that a Handle with invalid characters raises InvalidHandleError."""
+
handle_str = "ex@mple.com"
+
+
with pytest.raises(InvalidHandleError, match="contains invalid characters"):
+
Handle(handle_str)
+
+
def test_invalid_handle_segment_starts_with_hyphen(self):
+
"""Test that a Handle with a segment starting with a hyphen raises InvalidHandleError."""
+
handle_str = "-example.com"
+
+
with pytest.raises(InvalidHandleError, match="invalid format"):
+
Handle(handle_str)
+
+
def test_invalid_handle_segment_ends_with_hyphen(self):
+
"""Test that a Handle with a segment ending with a hyphen raises InvalidHandleError."""
+
handle_str = "example-.com"
+
+
with pytest.raises(InvalidHandleError, match="invalid format"):
+
Handle(handle_str)
+
+
def test_invalid_handle_tld_starts_with_digit(self):
+
"""Test that a Handle with a TLD starting with a digit raises InvalidHandleError."""
+
handle_str = "example.1com"
+
+
with pytest.raises(InvalidHandleError, match="invalid format"):
+
Handle(handle_str)
+
+
def test_handle_equality(self):
+
"""Test Handle equality comparison."""
+
handle_str = "example.com"
+
handle1 = Handle(handle_str)
+
handle2 = Handle(handle_str)
+
+
assert handle1 == handle2
+
assert handle1 != "not a handle object"
+
+
def test_handle_string_representation(self):
+
"""Test Handle string representation."""
+
handle_str = "example.com"
+
handle = Handle(handle_str)
+
+
assert str(handle) == handle_str
+
+
def test_handle_case_insensitive_storage(self):
+
"""Test that Handle stores the handle in lowercase."""
+
handle_str = "ExAmPlE.CoM"
+
handle = Handle(handle_str)
+
+
# The handle should be stored in lowercase
+
assert handle.handle == "example.com"
+
# The string representation should also return the lowercase form
+
assert str(handle) == "example.com"
+
+
def test_handle_to_tid_dns_resolution(self):
+
"""Test resolving a handle to DID using DNS method."""
+
handle_str = "bsky.app"
+
handle = Handle(handle_str)
+
+
# This test may fail if there's no internet connection or if DNS resolution fails
+
try:
+
did = handle.toTID()
+
assert did is not None
+
assert str(did).startswith("did:")
+
except ResolutionError:
+
# If resolution fails, we'll skip this test
+
pytest.skip("Failed to resolve handle via DNS")
+
+
def test_handle_to_tid_http_resolution(self):
+
"""Test resolving a handle to DID using HTTP method."""
+
handle_str = "blueskyweb.xyz"
+
handle = Handle(handle_str)
+
+
# This test may fail if there's no internet connection or if HTTP resolution fails
+
try:
+
did = handle.toTID()
+
assert did is not None
+
assert str(did).startswith("did:")
+
except ResolutionError:
+
# If resolution fails, we'll skip this test
+
pytest.skip("Failed to resolve handle via HTTP")
+
+
def test_handle_to_tid_unresolvable(self):
+
"""Test resolving an unresolvable handle returns None."""
+
handle_str = "nonexistent-domain-12345.com"
+
handle = Handle(handle_str)
+
+
# This should return None for a non-existent domain
+
did = handle.toTID()
+
assert did is None
+248
tests/uri/test_nsid.py
···
+
"""Test cases for the NSID class in atpasser.uri.nsid module."""
+
+
import pytest
+
from atpasser.uri.nsid import NSID
+
from atpasser.uri.exceptions import InvalidNSIDError, ValidationError
+
+
+
class TestNSID:
+
"""Test cases for the NSID class."""
+
+
def test_valid_nsid_simple(self):
+
"""Test creating an NSID with a valid simple format."""
+
nsid_str = "com.example.recordName"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+
assert nsid.nsid == nsid_str
+
assert nsid.domainAuthority == ["com", "example"]
+
assert nsid.domainAuthorityAsText == "com.example"
+
assert nsid.name == "recordName"
+
assert nsid.fragment is None
+
+
def test_valid_nsid_with_fragment(self):
+
"""Test creating an NSID with a valid fragment."""
+
nsid_str = "com.example.recordName#fragment"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+
assert nsid.nsid == nsid_str
+
assert nsid.domainAuthority == ["com", "example"]
+
assert nsid.domainAuthorityAsText == "com.example"
+
assert nsid.name == "recordName"
+
assert nsid.fragment == "fragment"
+
+
def test_valid_nsid_multiple_segments(self):
+
"""Test creating an NSID with multiple domain segments."""
+
nsid_str = "net.users.bob.ping"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+
assert nsid.nsid == nsid_str
+
assert nsid.domainAuthority == ["net", "users", "bob"]
+
assert nsid.domainAuthorityAsText == "net.users.bob"
+
assert nsid.name == "ping"
+
assert nsid.fragment is None
+
+
def test_valid_nsid_with_hyphens(self):
+
"""Test creating an NSID with hyphens in domain segments."""
+
nsid_str = "a-0.b-1.c.recordName"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+
assert nsid.nsid == nsid_str
+
assert nsid.domainAuthority == ["a-0", "b-1", "c"]
+
assert nsid.domainAuthorityAsText == "a-0.b-1.c"
+
assert nsid.name == "recordName"
+
assert nsid.fragment is None
+
+
def test_valid_nsid_case_sensitivity(self):
+
"""Test creating an NSID with case-sensitive name."""
+
nsid_str = "com.example.fooBar"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+
assert nsid.nsid == nsid_str
+
assert nsid.domainAuthority == ["com", "example"]
+
assert nsid.domainAuthorityAsText == "com.example"
+
assert nsid.name == "fooBar"
+
assert nsid.fragment is None
+
+
def test_valid_nsid_with_numbers_in_name(self):
+
"""Test creating an NSID with numbers in the name."""
+
nsid_str = "com.example.record123"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+
assert nsid.nsid == nsid_str
+
assert nsid.domainAuthority == ["com", "example"]
+
assert nsid.domainAuthorityAsText == "com.example"
+
assert nsid.name == "record123"
+
assert nsid.fragment is None
+
+
def test_invalid_nsid_non_ascii_characters(self):
+
"""Test that an NSID with non-ASCII characters raises InvalidNSIDError."""
+
nsid_str = "com.exa💩ple.thing"
+
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_too_long(self):
+
"""Test that an NSID that is too long raises InvalidNSIDError."""
+
# Create an NSID that exceeds the 317 character limit
+
long_segment = "a" * 100
+
nsid_str = f"{long_segment}.{long_segment}.{long_segment}.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="domain authority length exceeds limit"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_starts_with_dot(self):
+
"""Test that an NSID starting with a dot raises InvalidNSIDError."""
+
nsid_str = ".com.example.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_ends_with_dot(self):
+
"""Test that an NSID ending with a dot raises InvalidNSIDError."""
+
nsid_str = "com.example.recordName."
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_too_few_segments(self):
+
"""Test that an NSID with too few segments raises InvalidNSIDError."""
+
nsid_str = "com.example"
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_domain_authority_too_long(self):
+
"""Test that an NSID with domain authority that is too long raises InvalidNSIDError."""
+
# Create a domain authority that exceeds the 253 character limit
+
long_segment = "a" * 63
+
nsid_str = f"{long_segment}.{long_segment}.{long_segment}.{long_segment}.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="domain authority length exceeds limit"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_domain_segment_too_long(self):
+
"""Test that an NSID with a domain segment that is too long raises InvalidNSIDError."""
+
nsid_str = f"{'a' * 64}.example.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="segment length error"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_domain_segment_empty(self):
+
"""Test that an NSID with an empty domain segment raises InvalidNSIDError."""
+
nsid_str = "com..example.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="segment length error"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_domain_invalid_characters(self):
+
"""Test that an NSID with invalid characters in domain raises InvalidNSIDError."""
+
nsid_str = "com.ex@mple.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_domain_segment_starts_with_hyphen(self):
+
"""Test that an NSID with a domain segment starting with a hyphen raises InvalidNSIDError."""
+
nsid_str = "com.-example.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_domain_segment_ends_with_hyphen(self):
+
"""Test that an NSID with a domain segment ending with a hyphen raises InvalidNSIDError."""
+
nsid_str = "com.example-.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_tld_starts_with_digit(self):
+
"""Test that an NSID with a TLD starting with a digit raises InvalidNSIDError."""
+
nsid_str = "1com.example.recordName"
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_name_empty(self):
+
"""Test that an NSID with an empty name raises InvalidNSIDError."""
+
nsid_str = "com.example."
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_name_too_long(self):
+
"""Test that an NSID with a name that is too long raises InvalidNSIDError."""
+
nsid_str = f"com.example.{'a' * 64}"
+
+
with pytest.raises(InvalidNSIDError, match="name length error"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_name_invalid_characters(self):
+
"""Test that an NSID with invalid characters in name raises InvalidNSIDError."""
+
nsid_str = "com.example.record-name"
+
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_name_starts_with_digit(self):
+
"""Test that an NSID with a name starting with a digit raises InvalidNSIDError."""
+
nsid_str = "com.example.1record"
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_fragment_empty(self):
+
"""Test that an NSID with an empty fragment raises InvalidNSIDError."""
+
nsid_str = "com.example.recordName#"
+
+
with pytest.raises(InvalidNSIDError, match="fragment length error"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_fragment_too_long(self):
+
"""Test that an NSID with a fragment that is too long raises InvalidNSIDError."""
+
nsid_str = f"com.example.recordName#{'a' * 64}"
+
+
with pytest.raises(InvalidNSIDError, match="fragment length error"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_fragment_invalid_characters(self):
+
"""Test that an NSID with invalid characters in fragment raises InvalidNSIDError."""
+
nsid_str = "com.example.recordName#fragment-with-hyphen"
+
+
with pytest.raises(InvalidNSIDError, match="contains invalid characters"):
+
NSID(nsid_str)
+
+
def test_invalid_nsid_fragment_starts_with_digit(self):
+
"""Test that an NSID with a fragment starting with a digit raises InvalidNSIDError."""
+
nsid_str = "com.example.recordName#1fragment"
+
+
with pytest.raises(InvalidNSIDError, match="invalid format"):
+
NSID(nsid_str)
+
+
def test_nsid_equality(self):
+
"""Test NSID equality comparison."""
+
nsid_str = "com.example.recordName"
+
nsid1 = NSID(nsid_str)
+
nsid2 = NSID(nsid_str)
+
+
assert nsid1 == nsid2
+
assert nsid1 != "not an nsid object"
+
+
def test_nsid_string_representation(self):
+
"""Test NSID string representation."""
+
nsid_str = "com.example.recordName"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+
+
def test_nsid_string_representation_with_fragment(self):
+
"""Test NSID string representation with fragment."""
+
nsid_str = "com.example.recordName#fragment"
+
nsid = NSID(nsid_str)
+
+
assert str(nsid) == nsid_str
+110
tests/uri/test_restricted_uri.py
···
+
"""Test cases for the RestrictedURI class in atpasser.uri module."""
+
+
import pytest
+
from atpasser.uri import RestrictedURI
+
from atpasser.uri.exceptions import InvalidRestrictedURIError, InvalidURIError
+
+
+
class TestRestrictedURI:
+
"""Test cases for the RestrictedURI class."""
+
+
def test_valid_restricted_uri_with_did_collection_and_rkey(self):
+
"""Test creating a RestrictedURI with a valid DID, collection, and rkey."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri = RestrictedURI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
+
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
+
assert uri.pathAsText == "app.bsky.feed.post/3jwdwj2ctlk26"
+
assert uri.collection is not None
+
assert str(uri.collection) == "app.bsky.feed.post"
+
assert uri.rkey is not None
+
assert str(uri.rkey) == "3jwdwj2ctlk26"
+
+
def test_valid_restricted_uri_with_handle_collection_and_rkey(self):
+
"""Test creating a RestrictedURI with a valid handle, collection, and rkey."""
+
uri_str = "at://bnewbold.bsky.team/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri = RestrictedURI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "bnewbold.bsky.team"
+
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
+
assert uri.collection is not None
+
assert str(uri.collection) == "app.bsky.feed.post"
+
assert uri.rkey is not None
+
assert str(uri.rkey) == "3jwdwj2ctlk26"
+
+
def test_valid_restricted_uri_with_collection_only(self):
+
"""Test creating a RestrictedURI with only a collection."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
+
uri = RestrictedURI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
+
assert uri.path == ["app.bsky.feed.post"]
+
assert uri.collection is not None
+
assert str(uri.collection) == "app.bsky.feed.post"
+
assert uri.rkey is None
+
+
def test_valid_restricted_uri_with_authority_only(self):
+
"""Test creating a RestrictedURI with only an authority."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur"
+
uri = RestrictedURI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
+
assert uri.path == []
+
assert uri.collection is None
+
assert uri.rkey is None
+
+
def test_invalid_restricted_uri_with_query(self):
+
"""Test that a RestrictedURI with query parameters raises InvalidRestrictedURIError."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post?param1=value1"
+
+
with pytest.raises(InvalidRestrictedURIError, match="query parameters not supported"):
+
RestrictedURI(uri_str)
+
+
def test_invalid_restricted_uri_with_fragment(self):
+
"""Test that a RestrictedURI with a fragment raises InvalidRestrictedURIError."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26#$.some.json.path"
+
+
with pytest.raises(InvalidRestrictedURIError, match="fragments not supported"):
+
RestrictedURI(uri_str)
+
+
def test_invalid_restricted_uri_with_invalid_authority(self):
+
"""Test that a RestrictedURI with invalid authority raises InvalidRestrictedURIError."""
+
uri_str = "at://invalid_authority/app.bsky.feed.post/3jwdwj2ctlk26"
+
+
with pytest.raises(InvalidRestrictedURIError, match="invalid authority"):
+
RestrictedURI(uri_str)
+
+
def test_invalid_restricted_uri_too_many_path_segments(self):
+
"""Test that a RestrictedURI with too many path segments raises InvalidRestrictedURIError."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26/extra"
+
+
with pytest.raises(InvalidRestrictedURIError, match="too many path segments"):
+
RestrictedURI(uri_str)
+
+
def test_invalid_restricted_uri_base_uri_validation_failure(self):
+
"""Test that a RestrictedURI with invalid base URI raises InvalidURIError."""
+
uri_str = "https://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
+
+
with pytest.raises(InvalidURIError, match="invalid format"):
+
RestrictedURI(uri_str)
+
+
def test_restricted_uri_equality(self):
+
"""Test RestrictedURI equality comparison."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri1 = RestrictedURI(uri_str)
+
uri2 = RestrictedURI(uri_str)
+
+
assert uri1 == uri2
+
assert uri1 != "not a uri object"
+
+
def test_restricted_uri_string_representation(self):
+
"""Test RestrictedURI string representation."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri = RestrictedURI(uri_str)
+
+
assert str(uri) == uri_str
+305
tests/uri/test_rkey.py
···
+
"""Test cases for the RKey and TID classes in atpasser.uri.rkey module."""
+
+
import pytest
+
import datetime
+
from atpasser.uri.rkey import RKey, TID, importTIDfromInteger, importTIDfromBase32
+
from atpasser.uri.exceptions import InvalidRKeyError
+
+
+
class TestRKey:
+
"""Test cases for the RKey class."""
+
+
def test_valid_rkey_simple(self):
+
"""Test creating an RKey with a valid simple format."""
+
rkey_str = "3jui7kd54zh2y"
+
rkey = RKey(rkey_str)
+
+
assert str(rkey) == rkey_str
+
assert rkey.recordKey == rkey_str
+
+
def test_valid_rkey_with_various_characters(self):
+
"""Test creating an RKey with various valid characters."""
+
rkey_str = "example.com"
+
rkey = RKey(rkey_str)
+
+
assert str(rkey) == rkey_str
+
assert rkey.recordKey == rkey_str
+
+
def test_valid_rkey_with_special_characters(self):
+
"""Test creating an RKey with valid special characters."""
+
rkey_str = "~1.2-3_"
+
rkey = RKey(rkey_str)
+
+
assert str(rkey) == rkey_str
+
assert rkey.recordKey == rkey_str
+
+
def test_valid_rkey_with_colon(self):
+
"""Test creating an RKey with a colon."""
+
rkey_str = "pre:fix"
+
rkey = RKey(rkey_str)
+
+
assert str(rkey) == rkey_str
+
assert rkey.recordKey == rkey_str
+
+
def test_valid_rkey_underscore(self):
+
"""Test creating an RKey with just an underscore."""
+
rkey_str = "_"
+
rkey = RKey(rkey_str)
+
+
assert str(rkey) == rkey_str
+
assert rkey.recordKey == rkey_str
+
+
def test_invalid_rkey_empty(self):
+
"""Test that an empty RKey raises InvalidRKeyError."""
+
rkey_str = ""
+
+
with pytest.raises(InvalidRKeyError, match="record key is empty"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_too_long(self):
+
"""Test that an RKey that is too long raises InvalidRKeyError."""
+
# Create an RKey that exceeds the 512 character limit
+
rkey_str = "a" * 513
+
+
with pytest.raises(InvalidRKeyError, match="exceeds maximum length"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_reserved_double_dot(self):
+
"""Test that an RKey with '..' raises InvalidRKeyError."""
+
rkey_str = ".."
+
+
with pytest.raises(InvalidRKeyError, match="reserved value"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_reserved_single_dot(self):
+
"""Test that an RKey with '.' raises InvalidRKeyError."""
+
rkey_str = "."
+
+
with pytest.raises(InvalidRKeyError, match="reserved value"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_invalid_characters(self):
+
"""Test that an RKey with invalid characters raises InvalidRKeyError."""
+
rkey_str = "alpha/beta"
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_hash_character(self):
+
"""Test that an RKey with a hash character raises InvalidRKeyError."""
+
rkey_str = "#extra"
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_at_character(self):
+
"""Test that an RKey with an at character raises InvalidRKeyError."""
+
rkey_str = "@handle"
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_space(self):
+
"""Test that an RKey with a space raises InvalidRKeyError."""
+
rkey_str = "any space"
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_plus_character(self):
+
"""Test that an RKey with a plus character raises InvalidRKeyError."""
+
rkey_str = "any+space"
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_brackets(self):
+
"""Test that an RKey with brackets raises InvalidRKeyError."""
+
rkey_str = "number[3]"
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_parentheses(self):
+
"""Test that an RKey with parentheses raises InvalidRKeyError."""
+
rkey_str = "number(3)"
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_quotes(self):
+
"""Test that an RKey with quotes raises InvalidRKeyError."""
+
rkey_str = '"quote"'
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_invalid_rkey_base64_padding(self):
+
"""Test that an RKey with base64 padding raises InvalidRKeyError."""
+
rkey_str = "dHJ1ZQ=="
+
+
with pytest.raises(InvalidRKeyError, match="contains invalid characters"):
+
RKey(rkey_str)
+
+
def test_rkey_equality(self):
+
"""Test RKey equality comparison."""
+
rkey_str = "3jui7kd54zh2y"
+
rkey1 = RKey(rkey_str)
+
rkey2 = RKey(rkey_str)
+
+
assert rkey1 == rkey2
+
assert rkey1 != "not an rkey object"
+
+
def test_rkey_string_representation(self):
+
"""Test RKey string representation."""
+
rkey_str = "3jui7kd54zh2y"
+
rkey = RKey(rkey_str)
+
+
assert str(rkey) == rkey_str
+
+
+
class TestTID:
+
"""Test cases for the TID class."""
+
+
def test_tid_creation_default(self):
+
"""Test creating a TID with default parameters."""
+
tid = TID()
+
+
assert isinstance(tid, TID)
+
assert isinstance(tid, RKey)
+
assert isinstance(tid.timestamp, datetime.datetime)
+
assert isinstance(tid.clockIdentifier, int)
+
assert 0 <= tid.clockIdentifier < 1024
+
assert len(str(tid)) == 13 # TID string is always 13 characters
+
+
def test_tid_creation_with_timestamp(self):
+
"""Test creating a TID with a specific timestamp."""
+
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
tid = TID(time=timestamp)
+
+
assert tid.timestamp == timestamp
+
assert isinstance(tid.clockIdentifier, int)
+
assert 0 <= tid.clockIdentifier < 1024
+
+
def test_tid_creation_with_clock_identifier(self):
+
"""Test creating a TID with a specific clock identifier."""
+
clock_id = 42
+
tid = TID(clockIdentifier=clock_id)
+
+
assert tid.clockIdentifier == clock_id
+
assert isinstance(tid.timestamp, datetime.datetime)
+
+
def test_tid_creation_with_both_parameters(self):
+
"""Test creating a TID with both timestamp and clock identifier."""
+
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
clock_id = 42
+
tid = TID(time=timestamp, clockIdentifier=clock_id)
+
+
assert tid.timestamp == timestamp
+
assert tid.clockIdentifier == clock_id
+
+
def test_tid_integer_representation(self):
+
"""Test TID integer representation."""
+
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
clock_id = 42
+
tid = TID(time=timestamp, clockIdentifier=clock_id)
+
+
int_value = int(tid)
+
expected_value = int(timestamp.timestamp() * 1000000) * 1024 + clock_id
+
+
assert int_value == expected_value
+
+
def test_tid_string_representation(self):
+
"""Test TID string representation."""
+
tid = TID()
+
+
str_value = str(tid)
+
assert len(str_value) == 13
+
assert all(c in "234567abcdefghijklmnopqrstuvwxyz" for c in str_value)
+
+
def test_tid_equality_with_tid(self):
+
"""Test TID equality comparison with another TID."""
+
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
clock_id = 42
+
tid1 = TID(time=timestamp, clockIdentifier=clock_id)
+
tid2 = TID(time=timestamp, clockIdentifier=clock_id)
+
+
assert tid1 == tid2
+
+
def test_tid_equality_with_rkey(self):
+
"""Test TID equality comparison with an RKey."""
+
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
clock_id = 42
+
tid = TID(time=timestamp, clockIdentifier=clock_id)
+
rkey = RKey(str(tid))
+
+
assert tid == rkey
+
+
def test_tid_inequality_with_different_object(self):
+
"""Test TID inequality comparison with a different object type."""
+
tid = TID()
+
+
assert tid != "not a tid object"
+
+
def test_tid_inequality_with_different_timestamp(self):
+
"""Test TID inequality comparison with different timestamp."""
+
timestamp1 = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
timestamp2 = datetime.datetime(2023, 1, 1, 12, 0, 1)
+
clock_id = 42
+
tid1 = TID(time=timestamp1, clockIdentifier=clock_id)
+
tid2 = TID(time=timestamp2, clockIdentifier=clock_id)
+
+
assert tid1 != tid2
+
+
def test_tid_inequality_with_different_clock_id(self):
+
"""Test TID inequality comparison with different clock identifier."""
+
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
clock_id1 = 42
+
clock_id2 = 43
+
tid1 = TID(time=timestamp, clockIdentifier=clock_id1)
+
tid2 = TID(time=timestamp, clockIdentifier=clock_id2)
+
+
assert tid1 != tid2
+
+
+
class TestTIDImportFunctions:
+
"""Test cases for TID import functions."""
+
+
def test_import_tid_from_integer_default(self):
+
"""Test importing a TID from integer with default value."""
+
tid = importTIDfromInteger()
+
+
assert isinstance(tid, TID)
+
assert isinstance(tid.timestamp, datetime.datetime)
+
assert isinstance(tid.clockIdentifier, int)
+
assert 0 <= tid.clockIdentifier < 1024
+
+
def test_import_tid_from_integer_with_value(self):
+
"""Test importing a TID from integer with a specific value."""
+
timestamp = datetime.datetime(2023, 1, 1, 12, 0, 0)
+
clock_id = 42
+
original_tid = TID(time=timestamp, clockIdentifier=clock_id)
+
int_value = int(original_tid)
+
+
imported_tid = importTIDfromInteger(int_value)
+
+
assert imported_tid.timestamp == timestamp
+
assert imported_tid.clockIdentifier == clock_id
+
+
def test_import_tid_from_base32_default(self):
+
"""Test importing a TID from base32 with default value."""
+
tid = importTIDfromBase32()
+
+
assert isinstance(tid, TID)
+
assert isinstance(tid.timestamp, datetime.datetime)
+
assert isinstance(tid.clockIdentifier, int)
+
assert 0 <= tid.clockIdentifier < 1024
+
+
def test_import_tid_from_base32_with_value(self):
+
"""Test importing a TID from base32 with a specific value."""
+
original_tid = TID()
+
str_value = str(original_tid)
+
+
imported_tid = importTIDfromBase32(str_value)
+
+
assert int(imported_tid) == int(original_tid)
+122
tests/uri/test_uri.py
···
+
"""Test cases for the URI class in atpasser.uri module."""
+
+
import pytest
+
from atpasser.uri import URI
+
from atpasser.uri.exceptions import InvalidURIError, ValidationError
+
+
+
class TestURI:
+
"""Test cases for the URI class."""
+
+
def test_valid_uri_with_did(self):
+
"""Test creating a URI with a valid DID."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri = URI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
+
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
+
assert uri.pathAsText == "app.bsky.feed.post/3jwdwj2ctlk26"
+
assert uri.query is None
+
assert uri.queryAsText is None
+
assert uri.fragment is None
+
assert uri.fragmentAsText is None
+
+
def test_valid_uri_with_handle(self):
+
"""Test creating a URI with a valid handle."""
+
uri_str = "at://bnewbold.bsky.team/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri = URI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "bnewbold.bsky.team"
+
assert uri.path == ["app.bsky.feed.post", "3jwdwj2ctlk26"]
+
assert uri.pathAsText == "app.bsky.feed.post/3jwdwj2ctlk26"
+
+
def test_valid_uri_with_collection_only(self):
+
"""Test creating a URI with only a collection."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
+
uri = URI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
+
assert uri.path == ["app.bsky.feed.post"]
+
assert uri.pathAsText == "app.bsky.feed.post"
+
+
def test_valid_uri_with_authority_only(self):
+
"""Test creating a URI with only an authority."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur"
+
uri = URI(uri_str)
+
+
assert str(uri) == uri_str
+
assert uri.authorityAsText == "did:plc:z72i7hdynmk6r22z27h6tvur"
+
assert uri.path == []
+
assert uri.pathAsText == ""
+
+
def test_valid_uri_with_query(self):
+
"""Test creating a URI with query parameters."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post?param1=value1&param2=value2"
+
uri = URI(uri_str)
+
+
assert uri.query == {"param1": ["value1"], "param2": ["value2"]}
+
assert uri.queryAsText == "param1%3Dvalue1%26param2%3Dvalue2"
+
+
def test_valid_uri_with_fragment(self):
+
"""Test creating a URI with a fragment."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26#$.some.json.path"
+
uri = URI(uri_str)
+
+
assert uri.fragment is not None
+
assert uri.fragmentAsText == "%24.some.json.path"
+
+
def test_invalid_uri_non_ascii_characters(self):
+
"""Test that non-ASCII characters in URI raise InvalidURIError."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/💩"
+
+
with pytest.raises(InvalidURIError, match="contains invalid characters"):
+
URI(uri_str)
+
+
def test_invalid_uri_too_long(self):
+
"""Test that a URI that is too long raises InvalidURIError."""
+
# Create a URI that exceeds the 8000 character limit
+
long_path = "a" * 8000
+
uri_str = f"at://did:plc:z72i7hdynmk6r22z27h6tvur/{long_path}"
+
+
with pytest.raises(InvalidURIError, match="exceeds maximum length"):
+
URI(uri_str)
+
+
def test_invalid_uri_wrong_scheme(self):
+
"""Test that a URI with wrong scheme raises InvalidURIError."""
+
uri_str = "https://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
+
with pytest.raises(InvalidURIError, match="invalid format"):
+
URI(uri_str)
+
+
def test_invalid_uri_trailing_slash(self):
+
"""Test that a URI with trailing slash raises InvalidURIError."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/"
+
+
with pytest.raises(InvalidURIError, match="cannot end with a slash"):
+
URI(uri_str)
+
+
def test_invalid_uri_with_userinfo(self):
+
"""Test that a URI with userinfo raises InvalidURIError."""
+
uri_str = "at://user:pass@did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post"
+
+
with pytest.raises(InvalidURIError, match="does not support user information"):
+
URI(uri_str)
+
+
def test_uri_equality(self):
+
"""Test URI equality comparison."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri1 = URI(uri_str)
+
uri2 = URI(uri_str)
+
+
assert uri1 == uri2
+
assert uri1 != "not a uri object"
+
+
def test_uri_string_representation(self):
+
"""Test URI string representation."""
+
uri_str = "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jwdwj2ctlk26"
+
uri = URI(uri_str)
+
+
assert str(uri) == uri_str