add exceptions support

Changed files
+350 -128
src
+55 -26
src/atpasser/uri/__init__.py
···
import urllib.parse as up
from . import handle, did
import jsonpath_ng
+
from .exceptions import InvalidURIError, ValidationError, URIError
class URI:
···
uri (str): The AT Protocol URI string to parse.
Raises:
-
ValueError: If the URI contains invalid characters, exceeds maximum length,
-
doesn't start with 'at://', or has other formatting issues.
+
InvalidURIError: If the URI contains invalid characters, exceeds maximum length,
+
doesn't start with 'at://', or has other formatting issues.
+
ValidationError: If specific validation checks fail for URI components.
"""
if not set(uri).issubset(set([chr(i) for i in range(0x80)])):
-
raise ValueError("invalid char in uri")
+
raise InvalidURIError(uri, "contains invalid characters", "URI must only contain ASCII characters")
if len(uri) > 8000:
-
raise ValueError("uri longer than 8000 chars") # the doc says 8 "kilobytes"
+
raise InvalidURIError(uri, "exceeds maximum length", f"URI length {len(uri)} exceeds maximum allowed length of 8000 characters")
if not uri.startswith("at://"):
-
raise ValueError("not starts with at://")
+
raise InvalidURIError(uri, "invalid format", "URI must start with 'at://'")
burner = uri[5:].split("#")
try:
fragment = up.unquote(burner[1])
-
except:
+
except IndexError:
fragment = None
+
except Exception as e:
+
raise InvalidURIError(uri, "fragment parsing failed", f"Failed to parse URI fragment: {str(e)}")
try:
query = up.unquote(burner[0].split("?")[1])
-
except:
+
except IndexError:
query = None
+
except Exception as e:
+
raise InvalidURIError(uri, "query parameter parsing failed", f"Failed to parse query parameters: {str(e)}")
-
path = [up.unquote(segment) for segment in burner[0].split("/")[1:]]
+
try:
+
path = [up.unquote(segment) for segment in burner[0].split("/")[1:]]
+
except Exception as e:
+
raise InvalidURIError(uri, "path parsing failed", f"Failed to parse URI path: {str(e)}")
+
if len(path) > 0 and path[-1] == "":
-
raise ValueError("trailing slash")
+
raise InvalidURIError(uri, "invalid path format", "URI cannot end with a slash")
-
authorityValue = up.unquote(burner[0].split("/")[0])
+
try:
+
authorityValue = up.unquote(burner[0].split("/")[0])
+
except Exception as e:
+
raise InvalidURIError(uri, "authority parsing failed", f"Failed to parse URI authority: {str(e)}")
-
p = up.urlparse(up.unquote("//" + authorityValue))
-
if p.username is not None or p.password is not None:
-
raise ValueError("userinfo unsupported")
+
try:
+
p = up.urlparse(up.unquote("//" + authorityValue))
+
if p.username is not None or p.password is not None:
+
raise InvalidURIError(uri, "unsupported format", "URI does not support user information (username:password)")
+
except Exception as e:
+
if not isinstance(e, InvalidURIError):
+
raise InvalidURIError(uri, "authority parsing failed", f"Failed to parse authority: {str(e)}")
+
else:
+
raise
# We decided not to detect if it's a handle there
#
···
# raise ValueError("authority is neither handle nor tid")
if fragment != None:
-
self.fragment = jsonpath_ng.parse(fragment)
-
self.fragmentAsText = up.quote(fragment)
+
try:
+
self.fragment = jsonpath_ng.parse(fragment)
+
self.fragmentAsText = up.quote(fragment)
+
except Exception as e:
+
raise InvalidURIError(uri, "JSONPath parsing failed", f"Failed to parse JSONPath fragment '{fragment}': {str(e)}")
if query != None:
-
self.query = up.parse_qs(query)
-
self.queryAsText = up.quote(query)
+
try:
+
self.query = up.parse_qs(query)
+
self.queryAsText = up.quote(query)
+
except Exception as e:
+
raise InvalidURIError(uri, "query parameter parsing failed", f"Failed to parse query parameters '{query}': {str(e)}")
else:
self.query, self.queryAsText = None, None
···
self.authorityAsText = authorityValue
try:
authority = handle.Handle(authorityValue)
-
except:
+
except Exception as e:
try:
authority = did.DID(authorityValue)
-
except:
+
except Exception as e2:
+
# Don't throw exception, just set authority to None
authority = None
self.authority = authority
-
unescapedAV = up.quote(authorityValue).replace("%3A", ":")
-
self.uri = "at://{}{}{}{}".format(
-
unescapedAV,
-
"/" + pathAsText if pathAsText != "" else "",
-
"?" + query if query != None else "",
-
"?" + fragment if fragment != None else "",
-
)
+
try:
+
unescapedAV = up.quote(authorityValue).replace("%3A", ":")
+
self.uri = "at://{}{}{}{}".format(
+
unescapedAV,
+
"/" + pathAsText if pathAsText != "" else "",
+
"?" + query if query != None else "",
+
"#" + fragment if fragment != None else "",
+
)
+
except Exception as e:
+
raise InvalidURIError(uri, "URI construction failed", f"Failed to construct canonical URI: {str(e)}")
def __str__(self) -> str:
"""Convert the URI object to its string representation.
+28 -11
src/atpasser/uri/did.py
···
import re
from pyld import jsonld
+
from .exceptions import InvalidDIDError, ResolutionError
class DID:
···
uri (str): The DID URI string to validate.
Raises:
-
ValueError: If the URI doesn't match the DID format or exceeds maximum length.
+
InvalidDIDError: If the URI doesn't match the DID format or exceeds maximum length.
"""
pattern = re.compile("^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$")
patternMatch = pattern.match(uri)
-
if patternMatch and len(uri) <= 2048:
-
self.uri = patternMatch[0]
-
else:
-
raise ValueError
+
+
if not patternMatch:
+
raise InvalidDIDError(uri, "invalid format", "DID must follow 'did:method:identifier' format")
+
+
if len(uri) > 2048:
+
raise InvalidDIDError(uri, "exceeds maximum length", f"DID length {len(uri)} exceeds maximum allowed length of 2048 characters")
+
+
self.uri = patternMatch[0]
def __str__(self) -> str:
"""Convert the DID object to its string representation.
···
list: The expanded JSON-LD document representing the DID document,
compatible with the PyLD library for further processing.
+
Raises:
+
ResolutionError: If the DID document cannot be fetched or parsed.
+
InvalidDIDError: If the DID method is not supported.
+
Note:
- For 'did:plc:' DIDs, queries https://plc.directory/{did}
- For 'did:web:' DIDs, queries https://{domain}/.well-known/did.json
"""
-
if self.uri.startswith("did:plc:"):
-
return jsonld.expand(f"https://plc.directory/{self.uri}")
-
elif self.uri.startswith("did:web"):
-
return jsonld.expand(
-
f"https://{self.uri.replace("did:web:","")}/.well-known/did.json"
-
)
+
try:
+
if self.uri.startswith("did:plc:"):
+
return jsonld.expand(f"https://plc.directory/{self.uri}")
+
elif self.uri.startswith("did:web:"):
+
domain = self.uri.replace("did:web:", "")
+
if not domain:
+
raise InvalidDIDError(self.uri, "invalid format", "did:web DID must contain a domain")
+
return jsonld.expand(f"https://{domain}/.well-known/did.json")
+
else:
+
raise InvalidDIDError(self.uri, "unsupported DID method", f"Unsupported DID method: {self.uri.split(':')[1]}")
+
except Exception as e:
+
if isinstance(e, (InvalidDIDError, ResolutionError)):
+
raise
+
raise ResolutionError(self.uri, "fetch DID document", f"Failed to fetch or parse DID document: {str(e)}")
+116
src/atpasser/uri/exceptions.py
···
+
"""
+
Custom exception classes for error conditions in AT Protocol URI parsing and processing.
+
"""
+
+
from typing import Optional, Any
+
+
+
class URIError(Exception):
+
"""Base class for AT Protocol URI related errors."""
+
pass
+
+
+
class InvalidURIError(URIError):
+
"""Raised when URI format is invalid."""
+
+
def __init__(self, uri: str, reason: str, details: Optional[str] = None):
+
self.uri = uri
+
self.reason = reason
+
self.details = details
+
message = f"Invalid URI '{uri}': {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+
+
+
class InvalidDIDError(URIError):
+
"""Raised when DID format is invalid."""
+
+
def __init__(self, did: str, reason: str, details: Optional[str] = None):
+
self.did = did
+
self.reason = reason
+
self.details = details
+
message = f"Invalid DID '{did}': {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+
+
+
class InvalidHandleError(URIError):
+
"""Raised when Handle format is invalid."""
+
+
def __init__(self, handle: str, reason: str, details: Optional[str] = None):
+
self.handle = handle
+
self.reason = reason
+
self.details = details
+
message = f"Invalid Handle '{handle}': {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+
+
+
class InvalidNSIDError(URIError):
+
"""Raised when NSID format is invalid."""
+
+
def __init__(self, nsid: str, reason: str, details: Optional[str] = None):
+
self.nsid = nsid
+
self.reason = reason
+
self.details = details
+
message = f"Invalid NSID '{nsid}': {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+
+
+
class InvalidRKeyError(URIError):
+
"""Raised when RKey format is invalid."""
+
+
def __init__(self, rkey: str, reason: str, details: Optional[str] = None):
+
self.rkey = rkey
+
self.reason = reason
+
self.details = details
+
message = f"Invalid RKey '{rkey}': {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+
+
+
class InvalidRestrictedURIError(URIError):
+
"""Raised when Restricted URI format is invalid."""
+
+
def __init__(self, uri: str, reason: str, details: Optional[str] = None):
+
self.uri = uri
+
self.reason = reason
+
self.details = details
+
message = f"Invalid Restricted URI '{uri}': {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+
+
+
class ResolutionError(URIError):
+
"""Raised when resolution operation fails."""
+
+
def __init__(self, target: str, operation: str, reason: str, details: Optional[str] = None):
+
self.target = target
+
self.operation = operation
+
self.reason = reason
+
self.details = details
+
message = f"Resolution failed for '{target}' during {operation}: {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+
+
+
class ValidationError(URIError):
+
"""Raised when validation operation fails."""
+
+
def __init__(self, value: Any, field: str, reason: str, details: Optional[str] = None):
+
self.value = value
+
self.field = field
+
self.reason = reason
+
self.details = details
+
message = f"Validation failed: value '{value}' for field '{field}' {reason}"
+
if details:
+
message += f" ({details})"
+
super().__init__(message)
+57 -28
src/atpasser/uri/handle.py
···
import dns.resolver, requests
from .did import DID
+
from .exceptions import InvalidHandleError, ResolutionError, InvalidDIDError
class Handle:
···
handle (str): The handle string to validate.
Raises:
-
ValueError: If the handle exceeds maximum length, has invalid domain structure,
-
contains invalid characters, or violates other naming rules.
+
InvalidHandleError: If the handle exceeds maximum length, has invalid domain structure,
+
contains invalid characters, or violates other naming rules.
"""
if len(handle) > 253:
-
raise ValueError("handle is more than 253 chars")
+
raise InvalidHandleError(handle, "exceeds maximum length", f"Handle length {len(handle)} exceeds maximum allowed length of 253 characters")
labels = handle.lower().split(".")
if len(labels) < 2:
-
raise ValueError("are you tld?")
+
raise InvalidHandleError(handle, "invalid format", "Handle must contain at least one dot separator, e.g., 'example.com'")
if labels[0] == "" or labels[-1] == "":
-
raise ValueError("proceeding or tariling ascii periods")
+
raise InvalidHandleError(handle, "invalid format", "Handle cannot start or end with a dot")
-
for label in labels:
+
for i, label in enumerate(labels):
if len(label) not in range(1, 64):
-
raise ValueError("two periods or segment longer than 63 char")
+
raise InvalidHandleError(handle, "segment length error", f"Handle segment {i+1} length {len(label)} is not in the 1-63 character range")
charset = set(label)
validset = set("abcdefghijklmnopqrstuvwxyz0123456789-")
if not charset.issubset(validset):
-
raise ValueError("invalid char used in segment")
+
invalid_chars = charset - validset
+
raise InvalidHandleError(handle, "contains invalid characters", f"Handle segment {i+1} contains invalid characters: {', '.join(invalid_chars)}")
if label.startswith("-") or label.endswith("-"):
-
raise ValueError("segments starts or ends with hyphen")
+
raise InvalidHandleError(handle, "invalid format", f"Handle segment {i+1} cannot start or end with a hyphen")
tld = labels[-1]
if tld[0] in "0123456789":
-
raise ValueError("tld starts with digit")
+
raise InvalidHandleError(handle, "invalid format", "Handle's top-level domain cannot start with a digit")
self.handle = handle
···
atpasser.uri.did.DID or None: A DID object if resolution succeeds,
None if resolution fails.
+
Raises:
+
ResolutionError: If there's an error during the resolution process.
+
Note:
Resolution follows AT Protocol specifications:
1. First attempts DNS TXT record lookup for 'did={did}' value
···
3. Returns None if both methods fail
"""
try:
-
answers = dns.resolver.resolve("_atproto." + self.handle, "TXT")
-
except:
-
answers = []
-
for answer in answers:
-
if str(answer).startswith('"did='):
-
try:
-
uri = str(answer)[5:-1]
-
return DID(uri)
-
except:
-
pass # cannot resolve via dns
-
response = requests.get(f"https://{self.handle}/.well-known/atproto-did")
-
if response.status_code // 100 != 2:
-
return None
-
if response.headers.get("Content-Type") != "text/plain":
-
pass # Pass for now, because some sites like neocities, breaks this rule
+
# First try DNS resolution
+
try:
+
answers = dns.resolver.resolve("_atproto." + self.handle, "TXT")
+
except Exception as e:
+
answers = []
+
+
for answer in answers:
+
answer_str = str(answer)
+
if answer_str.startswith('"did='):
+
try:
+
uri = answer_str[5:-1] # Remove '"did=' and '"'
+
return DID(uri)
+
except InvalidDIDError as e:
+
# Continue trying other records or methods
+
continue
+
except Exception as e:
+
raise ResolutionError(self.handle, "DNS resolution", f"Error parsing DNS TXT record: {str(e)}")
+
+
# If DNS resolution fails, try HTTP method
try:
-
return DID(response.text)
-
except:
-
return None
+
response = requests.get(f"https://{self.handle}/.well-known/atproto-did")
+
if response.status_code // 100 != 2:
+
return None
+
+
# Some websites may return incorrect Content-Type, so here we only warn without throwing an exception
+
content_type = response.headers.get("Content-Type")
+
if content_type != "text/plain" and content_type:
+
# Log warning but don't block processing
+
pass
+
+
try:
+
return DID(response.text.strip())
+
except InvalidDIDError:
+
return None
+
except requests.RequestException as e:
+
raise ResolutionError(self.handle, "HTTP request", f"Error requesting well-known endpoint: {str(e)}")
+
except Exception as e:
+
raise ResolutionError(self.handle, "HTTP parsing", f"Error parsing HTTP response: {str(e)}")
+
+
except Exception as e:
+
if isinstance(e, ResolutionError):
+
raise
+
raise ResolutionError(self.handle, "resolution", f"Unknown error occurred while resolving Handle: {str(e)}")
+44 -21
src/atpasser/uri/nsid.py
···
+
from .exceptions import InvalidNSIDError, ValidationError
+
+
class NSID:
"""A class representing a NSID (Namespace Identifier) in the AT Protocol.
···
nsid (str): The NSID string to parse and validate.
Raises:
-
ValueError: If the NSID contains invalid characters, exceeds maximum length,
-
has improper segment structure, or violates other naming rules.
+
InvalidNSIDError: If the NSID contains invalid characters, exceeds maximum length,
+
has improper segment structure, or violates other naming rules.
+
ValidationError: If specific validation checks fail for NSID components.
"""
if "#" in nsid:
-
nsidWithoutFragment, fragment = nsid.split("#")
+
parts = nsid.split("#", 1)
+
if len(parts) != 2:
+
raise InvalidNSIDError(nsid, "invalid format", "NSID fragment format is incorrect")
+
nsidWithoutFragment, fragment = parts
else:
nsidWithoutFragment, fragment = nsid, None
+
# Check for non-ASCII characters
if not set([x for x in nsidWithoutFragment]).issubset(
set([chr(i) for i in range(0x80)])
):
-
raise ValueError("nsid contains non-ascii chars")
+
raise InvalidNSIDError(nsid, "contains invalid characters", "NSID must only contain ASCII characters")
+
# Check length
if len(nsidWithoutFragment) > 317:
-
raise ValueError("length longer than 317 chars")
+
raise InvalidNSIDError(nsid, "exceeds maximum length", f"NSID length {len(nsidWithoutFragment)} exceeds maximum allowed length of 317 characters")
segments = nsidWithoutFragment.split(".")
+
# Check for leading or trailing dots
if nsidWithoutFragment.startswith(".") or nsidWithoutFragment.endswith("."):
-
raise ValueError("invalid, but is that undocumented lol")
+
raise InvalidNSIDError(nsid, "invalid format", "NSID cannot start or end with a dot")
+
# Check segment count
if len(segments) < 3:
-
raise ValueError("less than 3 segments")
+
raise InvalidNSIDError(nsid, "invalid format", f"NSID must contain at least 3 segments, currently has {len(segments)}")
domainAuthority = [segment.lower() for segment in segments[0:-1]]
+
# Check domain authority length
if len(".".join(domainAuthority)) > 253:
-
raise ValueError("domain authority more than 253 chars")
+
raise InvalidNSIDError(nsid, "domain authority length exceeds limit", "Domain authority part length exceeds 253 characters")
-
for segment in domainAuthority:
+
# Check each domain segment
+
for i, segment in enumerate(domainAuthority):
if len(segment) > 63 or segment == "":
-
raise ValueError("segment not in 1~63 chars")
+
raise InvalidNSIDError(nsid, "segment length error", f"Domain authority segment {i+1} length is not in the 1-63 character range")
if not set(segment).issubset(set("abcdefghijklmnopqrstuvwxyz0123456789-")):
-
raise ValueError("domain authority contains invalid chars")
+
invalid_chars = set(segment) - set("abcdefghijklmnopqrstuvwxyz0123456789-")
+
raise InvalidNSIDError(nsid, "contains invalid characters", f"Domain authority segment {i+1} contains invalid characters: {', '.join(invalid_chars)}")
if segment.startswith("-") or segment.endswith("-"):
-
raise ValueError("segment starts or ends with hyphen")
+
raise InvalidNSIDError(nsid, "invalid format", f"Domain authority segment {i+1} cannot start or end with a hyphen")
+
+
# Check if top-level domain starts with a digit
if segments[0][0] in "0123456789":
-
raise ValueError("tld starts with a digit")
+
raise InvalidNSIDError(nsid, "invalid format", "NSID's top-level domain cannot start with a digit")
self.domainAuthority = domainAuthority
self.domainAuthorityAsText = ".".join(domainAuthority)
name = segments[-1]
+
# Check name
if name == "" or len(name) > 63:
-
raise ValueError("name null or longer than 63 chars")
+
raise InvalidNSIDError(nsid, "name length error", "NSID name cannot be empty and length cannot exceed 63 characters")
if not set(name).issubset(
set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
):
-
raise ValueError("name contains invalid chars")
+
invalid_chars = set(name) - set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
+
raise InvalidNSIDError(nsid, "contains invalid characters", f"NSID name contains invalid characters: {', '.join(invalid_chars)}")
if name[0] in "0123456789":
-
raise ValueError("first char of name is a digit")
+
raise InvalidNSIDError(nsid, "invalid format", "NSID name cannot start with a digit")
self.name = name
+
# Check fragment
if fragment != None:
-
if fragment == "" or len(fragment) > 63:
-
raise ValueError("fragment null or longer than 63 chars")
+
raise InvalidNSIDError(nsid, "fragment length error", "NSID fragment cannot be empty and length cannot exceed 63 characters")
if not set(fragment).issubset(
set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
):
-
raise ValueError("fragment contains invalid chars")
+
invalid_chars = set(fragment) - set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
+
raise InvalidNSIDError(nsid, "contains invalid characters", f"NSID fragment contains invalid characters: {', '.join(invalid_chars)}")
if fragment[0] in "0123456789":
-
raise ValueError("first char of fragment is a digit")
+
raise InvalidNSIDError(nsid, "invalid format", "NSID fragment cannot start with a digit")
self.fragment = fragment
else:
self.fragment = None
-
self.nsid = ".".join(domainAuthority) + f".{name}" + f"#{fragment}"
+
# Build NSID string
+
if fragment is not None:
+
self.nsid = ".".join(domainAuthority) + f".{name}" + f"#{fragment}"
+
else:
+
self.nsid = ".".join(domainAuthority) + f".{name}"
def __str__(self) -> str:
"""Convert the NSID object to its string representation.
+31 -24
src/atpasser/uri/restricted.py
···
from . import handle, nsid
from . import rkey as rKey
from . import URI
+
from .exceptions import InvalidRestrictedURIError, InvalidURIError
···
uri (str): The AT Protocol URI string to parse as a restricted URI.
Raises:
-
ValueError: If the URI has query parameters or fragments, invalid authority,
-
or too many path segments for a restricted URI format.
+
InvalidRestrictedURIError: If the URI has query parameters or fragments, invalid authority,
+
or too many path segments for a restricted URI format.
+
InvalidURIError: If the underlying URI validation fails.
"""
-
super().__init__(uri)
+
try:
+
super().__init__(uri)
+
except InvalidURIError:
+
raise
+
except Exception as e:
+
raise InvalidURIError(uri, "base URI validation failed", f"Failed to parse base URI: {str(e)}")
-
if self.query != None and self.fragment != None:
+
if self.query is not None:
+
raise InvalidRestrictedURIError(uri, "query parameters not supported", "Restricted URI cannot contain query parameters")
-
raise ValueError("has query and/or fragments")
+
if self.fragment is not None:
+
raise InvalidRestrictedURIError(uri, "fragments not supported", "Restricted URI cannot contain fragments")
-
if self.authority == None:
+
if self.authority is None:
+
raise InvalidRestrictedURIError(uri, "invalid authority", "Restricted URI must contain a valid DID or Handle")
-
raise ValueError("invalid DID or handle")
+
try:
+
if len(self.path) == 0:
+
self.collection, self.rkey = None, None
-
if len(self.path) == 0:
+
elif len(self.path) == 1:
+
self.collection = nsid.NSID(self.path[0])
+
self.rkey = None
-
self.collection, self.rkey = None, None
-
-
elif len(self.path) == 1:
-
-
self.collection = nsid.NSID(self.path[0])
-
-
self.rkey = None
-
-
elif len(self.path) == 2:
-
-
self.collection = nsid.NSID(self.path[0])
-
-
self.rkey = rKey.RKey(self.path[1])
-
else:
-
-
raise ValueError("ttoo much path segments")
+
elif len(self.path) == 2:
+
self.collection = nsid.NSID(self.path[0])
+
self.rkey = rKey.RKey(self.path[1])
+
else:
+
raise InvalidRestrictedURIError(uri, "too many path segments", f"Restricted URI can have at most 2 path segments, currently has {len(self.path)}")
+
except Exception as e:
+
if isinstance(e, (InvalidRestrictedURIError, InvalidURIError)):
+
raise
+
raise InvalidRestrictedURIError(uri, "parsing error", f"Error occurred while parsing Restricted URI: {str(e)}")
+19 -18
src/atpasser/uri/rkey.py
···
+
from .exceptions import InvalidRKeyError
+
+
class RKey:
"""A class representing a Record Key (RKey) in the AT Protocol.
···
recordKey (str): The record key string to validate.
Raises:
-
ValueError: If the record key is empty, exceeds maximum length,
-
is a reserved value, or contains invalid characters.
+
InvalidRKeyError: If the record key is empty, exceeds maximum length,
+
is a reserved value, or contains invalid characters.
"""
-
-
if recordKey == "" or len(recordKey) > 512:
-
-
raise ValueError("null record key or record key longer than 512 chars")
-
-
-
if recordKey == ".." or recordKey == ".":
-
-
raise ValueError("reserved value . and ..")
-
-
-
if not set(recordKey).issubset(
-
-
set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789.-_:~")
-
):
+
if recordKey == "":
+
raise InvalidRKeyError(recordKey, "record key is empty", "Record key cannot be empty")
+
+
if len(recordKey) > 512:
+
raise InvalidRKeyError(recordKey, "exceeds maximum length", f"Record key length {len(recordKey)} exceeds maximum allowed length of 512 characters")
-
raise ValueError("invalid char")
+
if recordKey == "..":
+
raise InvalidRKeyError(recordKey, "reserved value", "'..' is a reserved value and cannot be used as a record key")
+
+
if recordKey == ".":
+
raise InvalidRKeyError(recordKey, "reserved value", "'.' is a reserved value and cannot be used as a record key")
+
valid_chars = set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789.-_:~")
+
if not set(recordKey).issubset(valid_chars):
+
invalid_chars = set(recordKey) - valid_chars
+
raise InvalidRKeyError(recordKey, "contains invalid characters", f"Record key contains invalid characters: {', '.join(invalid_chars)}")
self.recordKey = recordKey