format code

+57 -13
src/atpasser/uri/__init__.py
···
"""
if not set(uri).issubset(set([chr(i) for i in range(0x80)])):
-
raise InvalidURIError(uri, "contains invalid characters", "URI must only contain ASCII characters")
+
raise InvalidURIError(
+
uri,
+
"contains invalid characters",
+
"URI must only contain ASCII characters",
+
)
if len(uri) > 8000:
-
raise InvalidURIError(uri, "exceeds maximum length", f"URI length {len(uri)} exceeds maximum allowed length of 8000 characters")
+
raise InvalidURIError(
+
uri,
+
"exceeds maximum length",
+
f"URI length {len(uri)} exceeds maximum allowed length of 8000 characters",
+
)
if not uri.startswith("at://"):
raise InvalidURIError(uri, "invalid format", "URI must start with 'at://'")
···
except IndexError:
fragment = None
except Exception as e:
-
raise InvalidURIError(uri, "fragment parsing failed", f"Failed to parse URI fragment: {str(e)}")
+
raise InvalidURIError(
+
uri,
+
"fragment parsing failed",
+
f"Failed to parse URI fragment: {str(e)}",
+
)
try:
query = up.unquote(burner[0].split("?")[1])
except IndexError:
query = None
except Exception as e:
-
raise InvalidURIError(uri, "query parameter parsing failed", f"Failed to parse query parameters: {str(e)}")
+
raise InvalidURIError(
+
uri,
+
"query parameter parsing failed",
+
f"Failed to parse query parameters: {str(e)}",
+
)
try:
path = [up.unquote(segment) for segment in burner[0].split("/")[1:]]
except Exception as e:
-
raise InvalidURIError(uri, "path parsing failed", f"Failed to parse URI path: {str(e)}")
-
+
raise InvalidURIError(
+
uri, "path parsing failed", f"Failed to parse URI path: {str(e)}"
+
)
+
if len(path) > 0 and path[-1] == "":
-
raise InvalidURIError(uri, "invalid path format", "URI cannot end with a slash")
+
raise InvalidURIError(
+
uri, "invalid path format", "URI cannot end with a slash"
+
)
try:
authorityValue = up.unquote(burner[0].split("/")[0])
except Exception as e:
-
raise InvalidURIError(uri, "authority parsing failed", f"Failed to parse URI authority: {str(e)}")
+
raise InvalidURIError(
+
uri,
+
"authority parsing failed",
+
f"Failed to parse URI authority: {str(e)}",
+
)
try:
p = up.urlparse(up.unquote("//" + authorityValue))
if p.username is not None or p.password is not None:
-
raise InvalidURIError(uri, "unsupported format", "URI does not support user information (username:password)")
+
raise InvalidURIError(
+
uri,
+
"unsupported format",
+
"URI does not support user information (username:password)",
+
)
except Exception as e:
if not isinstance(e, InvalidURIError):
-
raise InvalidURIError(uri, "authority parsing failed", f"Failed to parse authority: {str(e)}")
+
raise InvalidURIError(
+
uri,
+
"authority parsing failed",
+
f"Failed to parse authority: {str(e)}",
+
)
else:
raise
···
self.fragment = jsonpath_ng.parse(fragment)
self.fragmentAsText = up.quote(fragment)
except Exception as e:
-
raise InvalidURIError(uri, "JSONPath parsing failed", f"Failed to parse JSONPath fragment '{fragment}': {str(e)}")
+
raise InvalidURIError(
+
uri,
+
"JSONPath parsing failed",
+
f"Failed to parse JSONPath fragment '{fragment}': {str(e)}",
+
)
if query != None:
try:
self.query = up.parse_qs(query)
self.queryAsText = up.quote(query)
except Exception as e:
-
raise InvalidURIError(uri, "query parameter parsing failed", f"Failed to parse query parameters '{query}': {str(e)}")
+
raise InvalidURIError(
+
uri,
+
"query parameter parsing failed",
+
f"Failed to parse query parameters '{query}': {str(e)}",
+
)
else:
self.query, self.queryAsText = None, None
···
"#" + fragment if fragment != None else "",
)
except Exception as e:
-
raise InvalidURIError(uri, "URI construction failed", f"Failed to construct canonical URI: {str(e)}")
+
raise InvalidURIError(
+
uri,
+
"URI construction failed",
+
f"Failed to construct canonical URI: {str(e)}",
+
)
def __str__(self) -> str:
"""Convert the URI object to its string representation.
+24 -8
src/atpasser/uri/did.py
···
"""
pattern = re.compile("^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$")
patternMatch = pattern.match(uri)
-
+
if not patternMatch:
-
raise InvalidDIDError(uri, "invalid format", "DID must follow 'did:method:identifier' format")
-
+
raise InvalidDIDError(
+
uri, "invalid format", "DID must follow 'did:method:identifier' format"
+
)
+
if len(uri) > 2048:
-
raise InvalidDIDError(uri, "exceeds maximum length", f"DID length {len(uri)} exceeds maximum allowed length of 2048 characters")
-
+
raise InvalidDIDError(
+
uri,
+
"exceeds maximum length",
+
f"DID length {len(uri)} exceeds maximum allowed length of 2048 characters",
+
)
+
self.uri = patternMatch[0]
def __str__(self) -> str:
···
elif self.uri.startswith("did:web:"):
domain = self.uri.replace("did:web:", "")
if not domain:
-
raise InvalidDIDError(self.uri, "invalid format", "did:web DID must contain a domain")
+
raise InvalidDIDError(
+
self.uri, "invalid format", "did:web DID must contain a domain"
+
)
return jsonld.expand(f"https://{domain}/.well-known/did.json")
else:
-
raise InvalidDIDError(self.uri, "unsupported DID method", f"Unsupported DID method: {self.uri.split(':')[1]}")
+
raise InvalidDIDError(
+
self.uri,
+
"unsupported DID method",
+
f"Unsupported DID method: {self.uri.split(':')[1]}",
+
)
except Exception as e:
if isinstance(e, (InvalidDIDError, ResolutionError)):
raise
-
raise ResolutionError(self.uri, "fetch DID document", f"Failed to fetch or parse DID document: {str(e)}")
+
raise ResolutionError(
+
self.uri,
+
"fetch DID document",
+
f"Failed to fetch or parse DID document: {str(e)}",
+
)
+16 -11
src/atpasser/uri/exceptions.py
···
class URIError(Exception):
"""Base class for AT Protocol URI related errors."""
+
pass
class InvalidURIError(URIError):
"""Raised when URI format is invalid."""
-
+
def __init__(self, uri: str, reason: str, details: Optional[str] = None):
self.uri = uri
self.reason = reason
···
class InvalidDIDError(URIError):
"""Raised when DID format is invalid."""
-
+
def __init__(self, did: str, reason: str, details: Optional[str] = None):
self.did = did
self.reason = reason
···
class InvalidHandleError(URIError):
"""Raised when Handle format is invalid."""
-
+
def __init__(self, handle: str, reason: str, details: Optional[str] = None):
self.handle = handle
self.reason = reason
···
class InvalidNSIDError(URIError):
"""Raised when NSID format is invalid."""
-
+
def __init__(self, nsid: str, reason: str, details: Optional[str] = None):
self.nsid = nsid
self.reason = reason
···
class InvalidRKeyError(URIError):
"""Raised when RKey format is invalid."""
-
+
def __init__(self, rkey: str, reason: str, details: Optional[str] = None):
self.rkey = rkey
self.reason = reason
···
class InvalidRestrictedURIError(URIError):
"""Raised when Restricted URI format is invalid."""
-
+
def __init__(self, uri: str, reason: str, details: Optional[str] = None):
self.uri = uri
self.reason = reason
···
class ResolutionError(URIError):
"""Raised when resolution operation fails."""
-
-
def __init__(self, target: str, operation: str, reason: str, details: Optional[str] = None):
+
+
def __init__(
+
self, target: str, operation: str, reason: str, details: Optional[str] = None
+
):
self.target = target
self.operation = operation
self.reason = reason
···
class ValidationError(URIError):
"""Raised when validation operation fails."""
-
-
def __init__(self, value: Any, field: str, reason: str, details: Optional[str] = None):
+
+
def __init__(
+
self, value: Any, field: str, reason: str, details: Optional[str] = None
+
):
self.value = value
self.field = field
self.reason = reason
···
message = f"Validation failed: value '{value}' for field '{field}' {reason}"
if details:
message += f" ({details})"
-
super().__init__(message)
+
super().__init__(message)
+61 -17
src/atpasser/uri/handle.py
···
"""
if len(handle) > 253:
-
raise InvalidHandleError(handle, "exceeds maximum length", f"Handle length {len(handle)} exceeds maximum allowed length of 253 characters")
+
raise InvalidHandleError(
+
handle,
+
"exceeds maximum length",
+
f"Handle length {len(handle)} exceeds maximum allowed length of 253 characters",
+
)
labels = handle.lower().split(".")
if len(labels) < 2:
-
raise InvalidHandleError(handle, "invalid format", "Handle must contain at least one dot separator, e.g., 'example.com'")
+
raise InvalidHandleError(
+
handle,
+
"invalid format",
+
"Handle must contain at least one dot separator, e.g., 'example.com'",
+
)
if labels[0] == "" or labels[-1] == "":
-
raise InvalidHandleError(handle, "invalid format", "Handle cannot start or end with a dot")
+
raise InvalidHandleError(
+
handle, "invalid format", "Handle cannot start or end with a dot"
+
)
for i, label in enumerate(labels):
if len(label) not in range(1, 64):
-
raise InvalidHandleError(handle, "segment length error", f"Handle segment {i+1} length {len(label)} is not in the 1-63 character range")
+
raise InvalidHandleError(
+
handle,
+
"segment length error",
+
f"Handle segment {i+1} length {len(label)} is not in the 1-63 character range",
+
)
charset = set(label)
validset = set("abcdefghijklmnopqrstuvwxyz0123456789-")
if not charset.issubset(validset):
invalid_chars = charset - validset
-
raise InvalidHandleError(handle, "contains invalid characters", f"Handle segment {i+1} contains invalid characters: {', '.join(invalid_chars)}")
+
raise InvalidHandleError(
+
handle,
+
"contains invalid characters",
+
f"Handle segment {i+1} contains invalid characters: {', '.join(invalid_chars)}",
+
)
if label.startswith("-") or label.endswith("-"):
-
raise InvalidHandleError(handle, "invalid format", f"Handle segment {i+1} cannot start or end with a hyphen")
+
raise InvalidHandleError(
+
handle,
+
"invalid format",
+
f"Handle segment {i+1} cannot start or end with a hyphen",
+
)
tld = labels[-1]
if tld[0] in "0123456789":
-
raise InvalidHandleError(handle, "invalid format", "Handle's top-level domain cannot start with a digit")
+
raise InvalidHandleError(
+
handle,
+
"invalid format",
+
"Handle's top-level domain cannot start with a digit",
+
)
self.handle = handle
···
answers = dns.resolver.resolve("_atproto." + self.handle, "TXT")
except Exception as e:
answers = []
-
+
for answer in answers:
answer_str = str(answer)
if answer_str.startswith('"did='):
···
# Continue trying other records or methods
continue
except Exception as e:
-
raise ResolutionError(self.handle, "DNS resolution", f"Error parsing DNS TXT record: {str(e)}")
-
+
raise ResolutionError(
+
self.handle,
+
"DNS resolution",
+
f"Error parsing DNS TXT record: {str(e)}",
+
)
+
# If DNS resolution fails, try HTTP method
try:
-
response = requests.get(f"https://{self.handle}/.well-known/atproto-did")
+
response = requests.get(
+
f"https://{self.handle}/.well-known/atproto-did"
+
)
if response.status_code // 100 != 2:
return None
-
+
# Some websites may return incorrect Content-Type, so here we only warn without throwing an exception
content_type = response.headers.get("Content-Type")
if content_type != "text/plain" and content_type:
# Log warning but don't block processing
pass
-
+
try:
return DID(response.text.strip())
except InvalidDIDError:
return None
except requests.RequestException as e:
-
raise ResolutionError(self.handle, "HTTP request", f"Error requesting well-known endpoint: {str(e)}")
+
raise ResolutionError(
+
self.handle,
+
"HTTP request",
+
f"Error requesting well-known endpoint: {str(e)}",
+
)
except Exception as e:
-
raise ResolutionError(self.handle, "HTTP parsing", f"Error parsing HTTP response: {str(e)}")
-
+
raise ResolutionError(
+
self.handle,
+
"HTTP parsing",
+
f"Error parsing HTTP response: {str(e)}",
+
)
+
except Exception as e:
if isinstance(e, ResolutionError):
raise
-
raise ResolutionError(self.handle, "resolution", f"Unknown error occurred while resolving Handle: {str(e)}")
+
raise ResolutionError(
+
self.handle,
+
"resolution",
+
f"Unknown error occurred while resolving Handle: {str(e)}",
+
)
+82 -20
src/atpasser/uri/nsid.py
···
if "#" in nsid:
parts = nsid.split("#", 1)
if len(parts) != 2:
-
raise InvalidNSIDError(nsid, "invalid format", "NSID fragment format is incorrect")
+
raise InvalidNSIDError(
+
nsid, "invalid format", "NSID fragment format is incorrect"
+
)
nsidWithoutFragment, fragment = parts
else:
nsidWithoutFragment, fragment = nsid, None
···
if not set([x for x in nsidWithoutFragment]).issubset(
set([chr(i) for i in range(0x80)])
):
-
raise InvalidNSIDError(nsid, "contains invalid characters", "NSID must only contain ASCII characters")
+
raise InvalidNSIDError(
+
nsid,
+
"contains invalid characters",
+
"NSID must only contain ASCII characters",
+
)
# Check length
if len(nsidWithoutFragment) > 317:
-
raise InvalidNSIDError(nsid, "exceeds maximum length", f"NSID length {len(nsidWithoutFragment)} exceeds maximum allowed length of 317 characters")
+
raise InvalidNSIDError(
+
nsid,
+
"exceeds maximum length",
+
f"NSID length {len(nsidWithoutFragment)} exceeds maximum allowed length of 317 characters",
+
)
segments = nsidWithoutFragment.split(".")
# Check for leading or trailing dots
if nsidWithoutFragment.startswith(".") or nsidWithoutFragment.endswith("."):
-
raise InvalidNSIDError(nsid, "invalid format", "NSID cannot start or end with a dot")
+
raise InvalidNSIDError(
+
nsid, "invalid format", "NSID cannot start or end with a dot"
+
)
# Check segment count
if len(segments) < 3:
-
raise InvalidNSIDError(nsid, "invalid format", f"NSID must contain at least 3 segments, currently has {len(segments)}")
+
raise InvalidNSIDError(
+
nsid,
+
"invalid format",
+
f"NSID must contain at least 3 segments, currently has {len(segments)}",
+
)
domainAuthority = [segment.lower() for segment in segments[0:-1]]
# Check domain authority length
if len(".".join(domainAuthority)) > 253:
-
raise InvalidNSIDError(nsid, "domain authority length exceeds limit", "Domain authority part length exceeds 253 characters")
+
raise InvalidNSIDError(
+
nsid,
+
"domain authority length exceeds limit",
+
"Domain authority part length exceeds 253 characters",
+
)
# Check each domain segment
for i, segment in enumerate(domainAuthority):
if len(segment) > 63 or segment == "":
-
raise InvalidNSIDError(nsid, "segment length error", f"Domain authority segment {i+1} length is not in the 1-63 character range")
+
raise InvalidNSIDError(
+
nsid,
+
"segment length error",
+
f"Domain authority segment {i+1} length is not in the 1-63 character range",
+
)
if not set(segment).issubset(set("abcdefghijklmnopqrstuvwxyz0123456789-")):
-
invalid_chars = set(segment) - set("abcdefghijklmnopqrstuvwxyz0123456789-")
-
raise InvalidNSIDError(nsid, "contains invalid characters", f"Domain authority segment {i+1} contains invalid characters: {', '.join(invalid_chars)}")
+
invalid_chars = set(segment) - set(
+
"abcdefghijklmnopqrstuvwxyz0123456789-"
+
)
+
raise InvalidNSIDError(
+
nsid,
+
"contains invalid characters",
+
f"Domain authority segment {i+1} contains invalid characters: {', '.join(invalid_chars)}",
+
)
if segment.startswith("-") or segment.endswith("-"):
-
raise InvalidNSIDError(nsid, "invalid format", f"Domain authority segment {i+1} cannot start or end with a hyphen")
-
+
raise InvalidNSIDError(
+
nsid,
+
"invalid format",
+
f"Domain authority segment {i+1} cannot start or end with a hyphen",
+
)
+
# Check if top-level domain starts with a digit
if segments[0][0] in "0123456789":
-
raise InvalidNSIDError(nsid, "invalid format", "NSID's top-level domain cannot start with a digit")
+
raise InvalidNSIDError(
+
nsid,
+
"invalid format",
+
"NSID's top-level domain cannot start with a digit",
+
)
self.domainAuthority = domainAuthority
self.domainAuthorityAsText = ".".join(domainAuthority)
···
# Check name
if name == "" or len(name) > 63:
-
raise InvalidNSIDError(nsid, "name length error", "NSID name cannot be empty and length cannot exceed 63 characters")
+
raise InvalidNSIDError(
+
nsid,
+
"name length error",
+
"NSID name cannot be empty and length cannot exceed 63 characters",
+
)
if not set(name).issubset(
set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
):
-
invalid_chars = set(name) - set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
-
raise InvalidNSIDError(nsid, "contains invalid characters", f"NSID name contains invalid characters: {', '.join(invalid_chars)}")
+
invalid_chars = set(name) - set(
+
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
+
)
+
raise InvalidNSIDError(
+
nsid,
+
"contains invalid characters",
+
f"NSID name contains invalid characters: {', '.join(invalid_chars)}",
+
)
if name[0] in "0123456789":
-
raise InvalidNSIDError(nsid, "invalid format", "NSID name cannot start with a digit")
+
raise InvalidNSIDError(
+
nsid, "invalid format", "NSID name cannot start with a digit"
+
)
self.name = name
# Check fragment
if fragment != None:
if fragment == "" or len(fragment) > 63:
-
raise InvalidNSIDError(nsid, "fragment length error", "NSID fragment cannot be empty and length cannot exceed 63 characters")
+
raise InvalidNSIDError(
+
nsid,
+
"fragment length error",
+
"NSID fragment cannot be empty and length cannot exceed 63 characters",
+
)
if not set(fragment).issubset(
set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
):
-
invalid_chars = set(fragment) - set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
-
raise InvalidNSIDError(nsid, "contains invalid characters", f"NSID fragment contains invalid characters: {', '.join(invalid_chars)}")
+
invalid_chars = set(fragment) - set(
+
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
+
)
+
raise InvalidNSIDError(
+
nsid,
+
"contains invalid characters",
+
f"NSID fragment contains invalid characters: {', '.join(invalid_chars)}",
+
)
if fragment[0] in "0123456789":
-
raise InvalidNSIDError(nsid, "invalid format", "NSID fragment cannot start with a digit")
+
raise InvalidNSIDError(
+
nsid, "invalid format", "NSID fragment cannot start with a digit"
+
)
self.fragment = fragment
+28 -8
src/atpasser/uri/restricted.py
···
from .exceptions import InvalidRestrictedURIError, InvalidURIError
-
class RestrictedURI(URI):
"""A class representing a restricted AT Protocol URI for record access.
···
except InvalidURIError:
raise
except Exception as e:
-
raise InvalidURIError(uri, "base URI validation failed", f"Failed to parse base URI: {str(e)}")
+
raise InvalidURIError(
+
uri, "base URI validation failed", f"Failed to parse base URI: {str(e)}"
+
)
if self.query is not None:
-
raise InvalidRestrictedURIError(uri, "query parameters not supported", "Restricted URI cannot contain query parameters")
+
raise InvalidRestrictedURIError(
+
uri,
+
"query parameters not supported",
+
"Restricted URI cannot contain query parameters",
+
)
if self.fragment is not None:
-
raise InvalidRestrictedURIError(uri, "fragments not supported", "Restricted URI cannot contain fragments")
+
raise InvalidRestrictedURIError(
+
uri,
+
"fragments not supported",
+
"Restricted URI cannot contain fragments",
+
)
if self.authority is None:
-
raise InvalidRestrictedURIError(uri, "invalid authority", "Restricted URI must contain a valid DID or Handle")
+
raise InvalidRestrictedURIError(
+
uri,
+
"invalid authority",
+
"Restricted URI must contain a valid DID or Handle",
+
)
try:
if len(self.path) == 0:
···
self.collection = nsid.NSID(self.path[0])
self.rkey = rKey.RKey(self.path[1])
else:
-
raise InvalidRestrictedURIError(uri, "too many path segments", f"Restricted URI can have at most 2 path segments, currently has {len(self.path)}")
+
raise InvalidRestrictedURIError(
+
uri,
+
"too many path segments",
+
f"Restricted URI can have at most 2 path segments, currently has {len(self.path)}",
+
)
except Exception as e:
if isinstance(e, (InvalidRestrictedURIError, InvalidURIError)):
raise
-
raise InvalidRestrictedURIError(uri, "parsing error", f"Error occurred while parsing Restricted URI: {str(e)}")
-
+
raise InvalidRestrictedURIError(
+
uri,
+
"parsing error",
+
f"Error occurred while parsing Restricted URI: {str(e)}",
+
)
+28 -11
src/atpasser/uri/rkey.py
···
"""
if recordKey == "":
-
raise InvalidRKeyError(recordKey, "record key is empty", "Record key cannot be empty")
-
+
raise InvalidRKeyError(
+
recordKey, "record key is empty", "Record key cannot be empty"
+
)
+
if len(recordKey) > 512:
-
raise InvalidRKeyError(recordKey, "exceeds maximum length", f"Record key length {len(recordKey)} exceeds maximum allowed length of 512 characters")
+
raise InvalidRKeyError(
+
recordKey,
+
"exceeds maximum length",
+
f"Record key length {len(recordKey)} exceeds maximum allowed length of 512 characters",
+
)
if recordKey == "..":
-
raise InvalidRKeyError(recordKey, "reserved value", "'..' is a reserved value and cannot be used as a record key")
-
+
raise InvalidRKeyError(
+
recordKey,
+
"reserved value",
+
"'..' is a reserved value and cannot be used as a record key",
+
)
+
if recordKey == ".":
-
raise InvalidRKeyError(recordKey, "reserved value", "'.' is a reserved value and cannot be used as a record key")
+
raise InvalidRKeyError(
+
recordKey,
+
"reserved value",
+
"'.' is a reserved value and cannot be used as a record key",
+
)
-
valid_chars = set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789.-_:~")
+
valid_chars = set(
+
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789.-_:~"
+
)
if not set(recordKey).issubset(valid_chars):
invalid_chars = set(recordKey) - valid_chars
-
raise InvalidRKeyError(recordKey, "contains invalid characters", f"Record key contains invalid characters: {', '.join(invalid_chars)}")
+
raise InvalidRKeyError(
+
recordKey,
+
"contains invalid characters",
+
f"Record key contains invalid characters: {', '.join(invalid_chars)}",
+
)
self.recordKey = recordKey
-
def __str__(self) -> str:
"""Convert the Record Key object to its string representation.
···
str: The canonical record key string.
"""
return self.recordKey
-
def __eq__(self, value: object, /) -> bool:
"""Check if two RKey objects represent the same record key.
···
return str(self) == str(value)
else:
return False
-
+1 -1
src/atpasser/uri/tid.py
···
for i in range(0, len(binary), 5)
]
)
-
+
def __eq__(self, value: object, /) -> bool:
"""Check if two TID objects represent the same identifier.