import re
from datetime import datetime
from typing import Any

from pydantic import field_validator, model_validator

from ..base import DataModel


class StringModel(DataModel):
    """
    Model for AT Protocol string type.

    Represents a Unicode string with support for format restrictions, length limits,
    known values, enumeration sets, default values and constants as specified in Lexicon.

    NOTE(review): the ``value`` field used by the validators below is not
    declared in this class as shown; it is assumed to be provided by
    ``DataModel`` -- confirm.
    """

    format: str | None = None
    """String format restriction (e.g. 'datetime', 'uri')"""

    maxLength: int | None = None
    """Maximum length in UTF-8 bytes"""

    minLength: int | None = None
    """Minimum length in UTF-8 bytes"""

    knownValues: list[str] | None = None
    """Suggested/common values (not enforced)"""

    enum: list[str] | None = None
    """Closed set of allowed values"""

    default: str | None = None
    """Default value if not provided"""

    const: str | None = None
    """Fixed constant value if specified"""

    def __init__(self, **data: Any) -> None:
        """
        Initialize string model with validation.

        Args:
            **data: Input data containing string value

        Raises:
            ValueError: If value violates constraints
        """
        super().__init__(**data)
        if self.const is not None and self.value != self.const:
            raise ValueError(f"String value must be {self.const}")

    @field_validator("value", mode="before")
    @classmethod
    def validate_string(cls, v: Any) -> str:
        """
        Convert the raw input to a string.

        Only coercion happens here. The constraint checks (length, enum,
        format) depend on the other fields of the same instance, which are
        not reachable from a class-level field validator, so they run in
        ``validate_constraints`` once the whole model has been populated.

        Args:
            v: Value to validate

        Returns:
            The value as a string
        """
        if not isinstance(v, str):
            v = str(v)
        return v

    @model_validator(mode="after")
    def validate_constraints(self) -> "StringModel":
        """
        Check ``value`` against this instance's constraints.

        Returns:
            The validated model instance

        Raises:
            ValueError: If value violates the length, enum or format constraints
        """
        value = self.value
        if value is None:
            # Nothing was supplied, so there is nothing to check.
            return self

        # Length limits are expressed in UTF-8 bytes, not characters.
        byte_length = len(value.encode())
        if self.minLength is not None and byte_length < self.minLength:
            raise ValueError(f"String must be at least {self.minLength} bytes")
        if self.maxLength is not None and byte_length > self.maxLength:
            raise ValueError(f"String must be at most {self.maxLength} bytes")

        # An empty or missing enum means "no restriction".
        if self.enum and value not in self.enum:
            raise ValueError(f"Value must be one of {self.enum}")

        # Formats without an entry here are accepted without further checks.
        format_checks = {
            "datetime": self._validate_datetime,
            "uri": self._validate_uri,
            "did": self._validate_did,
            "handle": self._validate_handle,
            "at-identifier": self._validate_at_identifier,
            "at-uri": self._validate_at_uri,
            "cid": self._validate_cid,
            "nsid": self._validate_nsid,
            "tid": self._validate_tid,
            "record-key": self._validate_record_key,
            "language": self._validate_language,
        }
        check = format_checks.get(self.format)
        if check is not None:
            check(value)

        return self

    @classmethod
    def _validate_datetime(cls, v: str) -> None:
        """Validate RFC 3339 datetime format"""
        # Older ``fromisoformat`` implementations reject the "Z" designator,
        # so rewrite it as an explicit offset (suffix only).
        candidate = v[:-1] + "+00:00" if v.endswith("Z") else v
        try:
            parsed = datetime.fromisoformat(candidate)
        except ValueError:
            raise ValueError("Invalid datetime format, must be RFC 3339") from None
        # RFC 3339 requires an offset; this also rejects date-only input,
        # which ``fromisoformat`` would otherwise accept.
        if parsed.tzinfo is None:
            raise ValueError("Invalid datetime format, must be RFC 3339")

    @classmethod
    def _validate_uri(cls, v: str) -> None:
        """Validate URI format"""
        if len(v) > 8192:  # 8KB max
            raise ValueError("URI too long, max 8KB")
        # Prefix match on purpose: a scheme followed by at least one character.
        if not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*:.+", v):
            raise ValueError("Invalid URI format")

    @classmethod
    def _validate_did(cls, v: str) -> None:
        """Validate DID format"""
        if len(v) > 2048:
            raise ValueError("DID too long, max 2048 chars")
        if not re.fullmatch(r"did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]", v):
            raise ValueError("Invalid DID format")

    @classmethod
    def _validate_handle(cls, v: str) -> None:
        """Validate handle format"""
        # Cheap length check first so oversized input never reaches the regex.
        if len(v) > 253:
            raise ValueError("Handle too long, max 253 chars")
        if not re.fullmatch(
            r"([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
            r"[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?",
            v,
        ):
            raise ValueError("Handle contains invalid characters")

    @classmethod
    def _validate_at_identifier(cls, v: str) -> None:
        """Validate at-identifier format (DID or handle)"""
        try:
            if v.startswith("did:"):
                cls._validate_did(v)
            else:
                cls._validate_handle(v)
        except ValueError as e:
            raise ValueError(f"Invalid at-identifier: {e}") from e

    @classmethod
    def _validate_at_uri(cls, v: str) -> None:
        """
        Validate AT-URI format according to AT Protocol specification.

        Args:
            v: AT-URI string to validate

        Raises:
            ValueError: If URI violates any of these rules:
                - Must start with 'at://'
                - At most 8192 characters long
                - No trailing slash
                - Authority must be valid DID or handle
                - Path segments must follow NSID/RKEY rules if present
        """
        if not v.startswith("at://"):
            raise ValueError("AT-URI must start with 'at://'")
        if len(v) > 8192:  # 8KB
            raise ValueError("AT-URI too long, max 8KB")
        if v.endswith("/"):
            raise ValueError("AT-URI cannot have trailing slash")

        parts = v[5:].split("/")  # Skip 'at://'
        authority = parts[0]

        # Validate authority (DID or handle)
        if not authority:
            raise ValueError("AT-URI must have authority")

        if authority.startswith("did:"):
            # Basic DID format check - actual DID validation is done elsewhere
            if len(authority) > 2048:
                raise ValueError("DID too long")
            if ":" not in authority[4:]:
                raise ValueError("Invalid DID format")
        else:
            # Handle validation
            if not re.fullmatch(r"[a-z0-9.-]+", authority):
                raise ValueError("Invalid handle characters")
            if len(authority) > 253:
                raise ValueError("Handle too long")

        # Validate path segments if present: at most <collection>/<rkey>.
        if len(parts) > 3:
            raise ValueError("AT-URI path too deep")

        if len(parts) > 1:
            collection = parts[1]
            if not re.fullmatch(r"[a-zA-Z0-9.-]+", collection):
                raise ValueError("Invalid collection NSID")

        if len(parts) > 2:
            rkey = parts[2]
            # The hyphen is placed last so it is a literal, not a range.
            # An empty rkey cannot occur here: it would mean a trailing
            # slash, which was rejected above.
            if not re.fullmatch(r"[a-zA-Z0-9._:~-]+", rkey):
                raise ValueError("Invalid record key characters")

    @classmethod
    def _validate_cid(cls, v: str) -> None:
        """Validate CID string format"""
        if len(v) > 100:
            raise ValueError("CID too long, max 100 chars")
        if not re.fullmatch(r"[a-zA-Z0-9]+", v):
            raise ValueError("CID contains invalid characters")

    @classmethod
    def _validate_nsid(cls, v: str) -> None:
        """Validate NSID format"""
        if len(v) > 317:
            raise ValueError("NSID too long, max 317 chars")
        if not re.fullmatch(
            r"[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?"
            r"(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+"
            r"(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)",
            v,
        ):
            raise ValueError("NSID contains invalid characters")

    @classmethod
    def _validate_tid(cls, v: str) -> None:
        """Validate TID format"""
        if len(v) > 13:
            raise ValueError("TID too long, max 13 chars")
        # The pattern also enforces the exact length of 13.
        if not re.fullmatch(r"[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}", v):
            raise ValueError("TID contains invalid characters")

    @classmethod
    def _validate_record_key(cls, v: str) -> None:
        """Validate record-key format"""
        if len(v) > 512:
            raise ValueError("Record key too long, max 512 chars")
        if v == "." or v == "..":
            raise ValueError(f"Record key is {v}, which is not allowed")
        # Allowed: alphanumerics plus . _ : ~ and -. The hyphen is placed
        # last so it is a literal; written as "%-~" it formed a range that
        # admitted most printable ASCII (including "/").
        if not re.fullmatch(r"[a-zA-Z0-9._:~-]+", v):
            raise ValueError("Record key contains invalid characters")

    @classmethod
    def _validate_language(cls, v: str) -> None:
        """Validate BCP 47 language tag"""
        if not re.fullmatch(r"[a-zA-Z]{1,8}(-[a-zA-Z0-9]{1,8})*", v):
            raise ValueError("Invalid language tag format")