Compare changes

Choose any two refs to compare.

Changed files
+249
src
atpasser
model
types
+249
src/atpasser/model/types/string.py
···
···
+
from typing import Any
+
import re
+
from datetime import datetime
+
from pydantic import field_validator
+
from ..base import DataModel
+
+
# Maps each supported Lexicon string format to the name of the StringModel
# classmethod that checks it. Formats not listed here are accepted unchecked,
# which matches the behaviour of the original if/elif dispatch.
_FORMAT_VALIDATORS = {
    "datetime": "_validate_datetime",
    "uri": "_validate_uri",
    "did": "_validate_did",
    "handle": "_validate_handle",
    "at-identifier": "_validate_at_identifier",
    "at-uri": "_validate_at_uri",
    "cid": "_validate_cid",
    "nsid": "_validate_nsid",
    "tid": "_validate_tid",
    "record-key": "_validate_record_key",
    "language": "_validate_language",
}


class StringModel(DataModel):
    """
    Model for AT Protocol string type.

    Represents a Unicode string with support for format restrictions, length limits,
    known values, enumeration sets, default values and constants as specified in Lexicon.

    The ``value`` field is coerced to ``str`` by a field validator; every other
    constraint (length, enum, format, const) is checked against the fully
    constructed instance, because those limits are per-instance fields and are
    not available while ``value`` itself is being validated.
    """

    value: str
    """String value"""

    format: str | None = None
    """String format restriction (e.g. 'datetime', 'uri')"""

    maxLength: int | None = None
    """Maximum length in UTF-8 bytes"""

    minLength: int | None = None
    """Minimum length in UTF-8 bytes"""

    knownValues: list[str] | None = None
    """Suggested/common values (not enforced)"""

    enum: list[str] | None = None
    """Closed set of allowed values"""

    default: str | None = None
    """Default value if not provided"""

    const: str | None = None
    """Fixed constant value if specified"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize string model with validation.

        Args:
            **data: Input data containing string value

        Raises:
            ValueError: If value violates constraints
        """
        super().__init__(**data)
        # NOTE(review): assumes DataModel is a pydantic BaseModel. Constraints
        # are enforced here, so any construction path that skips __init__
        # will not run them -- confirm whether a model-level validator would
        # be preferable for this project.
        self._enforce_constraints()

    @field_validator("value", mode="before")
    @classmethod
    def validate_string(cls, v: Any) -> str:
        """
        Convert the raw input for ``value`` to a string.

        Only coercion happens here. The length/enum/format limits are fields
        of the instance being built, so they cannot be read from ``cls``;
        they are checked in ``_enforce_constraints`` once the instance exists.

        Args:
            v: Value to convert

        Returns:
            ``v`` unchanged if it is already a ``str``, otherwise ``str(v)``
        """
        if not isinstance(v, str):
            v = str(v)
        return v

    def _enforce_constraints(self) -> None:
        """
        Check ``value`` against this instance's own constraint fields.

        Order of checks: minLength, maxLength, enum, format, const.

        Raises:
            ValueError: If value violates constraints
        """
        text = self.value
        # Lexicon length limits are expressed in UTF-8 bytes, not characters.
        byte_length = len(text.encode("utf-8"))

        if self.minLength is not None and byte_length < self.minLength:
            raise ValueError(f"String must be at least {self.minLength} bytes")

        if self.maxLength is not None and byte_length > self.maxLength:
            raise ValueError(f"String must be at most {self.maxLength} bytes")

        # An empty or missing enum list means "no restriction".
        if self.enum and text not in self.enum:
            raise ValueError(f"Value must be one of {self.enum}")

        if self.format:
            validator_name = _FORMAT_VALIDATORS.get(self.format)
            if validator_name is not None:
                getattr(self, validator_name)(text)

        if self.const is not None and text != self.const:
            raise ValueError(f"String value must be {self.const}")

    @classmethod
    def _validate_datetime(cls, v: str) -> None:
        """
        Validate RFC 3339 datetime format.

        The string must be a full date-time with an uppercase ``T`` separator,
        whole seconds, optional fractional seconds, and an explicit offset
        (``Z`` or ``+HH:MM`` / ``-HH:MM``). Field ranges (month, day, hour,
        offset, ...) are then checked by ``datetime.fromisoformat``.

        Raises:
            ValueError: If the string is not a valid RFC 3339 date-time
        """
        match = re.fullmatch(
            r"([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})"
            r"(?:\.[0-9]+)?"
            r"(Z|[+-][0-9]{2}:[0-9]{2})",
            v,
        )
        if match is None:
            raise ValueError("Invalid datetime format, must be RFC 3339")

        timestamp, offset = match.groups()
        if offset == "Z":
            offset = "+00:00"
        # The fractional part is dropped before parsing: it was already
        # checked to be digits only, and older fromisoformat implementations
        # reject fractions that are not exactly 3 or 6 digits long.
        try:
            datetime.fromisoformat(timestamp + offset)
        except ValueError:
            raise ValueError("Invalid datetime format, must be RFC 3339") from None

    @classmethod
    def _validate_uri(cls, v: str) -> None:
        """
        Validate URI format: at most 8192 characters, and a scheme followed
        by ``:`` and at least one more character.

        Raises:
            ValueError: If the URI is too long or has no valid scheme prefix
        """
        if len(v) > 8192:  # 8KB max
            raise ValueError("URI too long, max 8KB")
        if not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*:.+", v):
            raise ValueError("Invalid URI format")

    @classmethod
    def _validate_did(cls, v: str) -> None:
        """
        Validate DID format: ``did:<method>:<identifier>``, at most 2048 chars.

        Raises:
            ValueError: If the DID is too long or malformed
        """
        if len(v) > 2048:
            raise ValueError("DID too long, max 2048 chars")
        if not re.fullmatch(r"did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]", v):
            raise ValueError("Invalid DID format")

    @classmethod
    def _validate_handle(cls, v: str) -> None:
        """
        Validate handle format: dot-separated labels, at most 253 chars.

        Raises:
            ValueError: If the handle is too long or malformed
        """
        if len(v) > 253:
            raise ValueError("Handle too long, max 253 chars")
        if not re.fullmatch(
            r"([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
            r"[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?",
            v,
        ):
            raise ValueError("Handle contains invalid characters")

    @classmethod
    def _validate_at_identifier(cls, v: str) -> None:
        """
        Validate at-identifier format (DID or handle).

        Strings starting with ``did:`` are checked as DIDs, everything else
        as a handle.

        Raises:
            ValueError: If the value is neither a valid DID nor a valid handle
        """
        try:
            if v.startswith("did:"):
                cls._validate_did(v)
            else:
                cls._validate_handle(v)
        except ValueError as e:
            raise ValueError(f"Invalid at-identifier: {e}") from e

    @classmethod
    def _validate_at_uri(cls, v: str) -> None:
        """
        Validate AT-URI format according to AT Protocol specification.

        Args:
            v: AT-URI string to validate

        Raises:
            ValueError: If URI violates any of these rules:
                - Must start with 'at://'
                - Max length 8KB
                - No trailing slash
                - Authority must be valid DID or handle
                - Path segments must follow NSID/RKEY rules if present
        """
        if not v.startswith("at://"):
            raise ValueError("AT-URI must start with 'at://'")
        if len(v) > 8192:  # 8KB
            raise ValueError("AT-URI too long, max 8KB")
        if v.endswith("/"):
            raise ValueError("AT-URI cannot have trailing slash")

        # Split into parts: authority[/collection[/rkey]]
        parts = v[5:].split("/")  # Skip 'at://'
        authority = parts[0]

        # Validate authority (DID or handle)
        if not authority:
            raise ValueError("AT-URI must have authority")

        if authority.startswith("did:"):
            # Basic DID format check - actual DID validation is done elsewhere
            if len(authority) > 2048:
                raise ValueError("DID too long")
            if ":" not in authority[4:]:
                raise ValueError("Invalid DID format")
        else:
            # Handle validation
            if not re.fullmatch(r"[a-z0-9.-]+", authority):
                raise ValueError("Invalid handle characters")
            if len(authority) > 253:
                raise ValueError("Handle too long")

        # Validate path segments if present
        if len(parts) > 1:
            if len(parts) > 3:
                raise ValueError("AT-URI path too deep")

            collection = parts[1]
            if not re.fullmatch(r"[a-zA-Z0-9.-]+", collection):
                raise ValueError("Invalid collection NSID")

            if len(parts) > 2:
                rkey = parts[2]
                if not rkey:
                    raise ValueError("Record key cannot be empty")
                # '-' is placed last so it is a literal; written as '%-~' it
                # formed a range covering most printable ASCII.
                if not re.fullmatch(r"[a-zA-Z0-9._:%~-]+", rkey):
                    raise ValueError("Invalid record key characters")

    @classmethod
    def _validate_cid(cls, v: str) -> None:
        """
        Validate CID string format: ASCII alphanumerics, at most 100 chars.

        Raises:
            ValueError: If the CID is too long or has other characters
        """
        if len(v) > 100:
            raise ValueError("CID too long, max 100 chars")
        if not re.fullmatch(r"[a-zA-Z0-9]+", v):
            raise ValueError("CID contains invalid characters")

    @classmethod
    def _validate_nsid(cls, v: str) -> None:
        """
        Validate NSID format: dotted segments ending in a name segment that
        starts with a letter, at most 317 chars.

        Raises:
            ValueError: If the NSID is too long or malformed
        """
        if len(v) > 317:
            raise ValueError("NSID too long, max 317 chars")
        if not re.fullmatch(
            r"[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?"
            r"(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+"
            r"(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)",
            v,
        ):
            raise ValueError("NSID contains invalid characters")

    @classmethod
    def _validate_tid(cls, v: str) -> None:
        """
        Validate TID format: exactly 13 characters from the alphabet
        ``234567abcdefghijklmnopqrstuvwxyz``, first one limited to ``2-7a-j``.

        Raises:
            ValueError: If the TID is too long or does not match the pattern
        """
        if len(v) > 13:
            raise ValueError("TID too long, max 13 chars")
        if not re.fullmatch(
            r"[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}", v
        ):
            raise ValueError("TID contains invalid characters")

    @classmethod
    def _validate_record_key(cls, v: str) -> None:
        """
        Validate record-key format: 1-512 characters from a restricted ASCII
        set, and not the reserved values ``.`` or ``..``.

        Raises:
            ValueError: If the key is too long, reserved, or has other characters
        """
        if len(v) > 512:
            raise ValueError("Record key too long, max 512 chars")
        if v == "." or v == "..":
            raise ValueError(f"Record key is {v}, which is not allowed")
        # '-' is placed last so it is a literal; written as '%-~' it formed a
        # range covering most printable ASCII.
        # NOTE(review): '%' is kept because the original pattern listed it;
        # confirm against the AT Protocol record-key character set.
        if not re.fullmatch(r"[a-zA-Z0-9._:%~-]+", v):
            raise ValueError("Record key contains invalid characters")

    @classmethod
    def _validate_language(cls, v: str) -> None:
        """
        Validate BCP 47 language tag shape: a 1-8 letter primary subtag
        followed by any number of ``-`` separated 1-8 character
        alphanumeric subtags.

        Raises:
            ValueError: If the tag does not have that shape
        """
        if not re.fullmatch(r"[a-zA-Z]{1,8}(-[a-zA-Z0-9]{1,8})*", v):
            raise ValueError("Invalid language tag format")