rebuild data model, again

+205
ARCHITECTURE_OVERVIEW.md
···
# ATProto Data and Lexicon Module Architecture Overview

## Project Overview

This project provides a Python implementation of ATProto (Authenticated Transfer Protocol), focused on handling the data model and Lexicon definitions. Following the architectural patterns of the existing URI module, it offers type-safe data validation, serialization, and Lexicon parsing.

## Overall Architecture

### 1. System Architecture Diagram

```mermaid
graph TB
    subgraph "ATProto Core Modules"
        URI[URI module]
        Data[Data module]
        Lexicon[Lexicon module]
    end

    subgraph "External Dependencies"
        Pydantic[Pydantic]
        CBOR[cbor2]
        CID[py-cid]
    end

    subgraph "Data Flow"
        LexiconJSON[Lexicon JSON files]
        RawData[Raw data]
    end

    LexiconJSON --> Lexicon
    Lexicon --> Data
    RawData --> Data
    Data --> Serialized[Serialized data]

    URI --> Data
    URI --> Lexicon

    Pydantic --> Data
    Pydantic --> Lexicon
    CBOR --> Data
    CID --> Data
```

### 2. Module Responsibilities

#### 2.1 Data module (`src/atpasser/data`)
- **Data serialization**: serialization/deserialization in JSON and DAG-CBOR formats
- **Data validation**: type, format, and constraint validation
- **Special type handling**: CID links, blob references, datetime formats, etc.
- **Error handling**: detailed validation and serialization errors

#### 2.2 Lexicon module (`src/atpasser/lexicon`)
- **Definition parsing**: parse Lexicon JSON definition files
- **Model generation**: dynamically generate Pydantic model classes
- **Reference resolution**: handle cross-definition references and union types
- **Registry management**: model registry and cache management
- **Compatibility checks**: forward and backward compatibility validation

### 3. Core Features

#### 3.1 Type Safety
- Strong typing built on Pydantic
- Runtime type validation
- Automatic type conversion and normalization

#### 3.2 Format Support
- **JSON**: conforms to the ATProto JSON encoding rules
- **DAG-CBOR**: supports canonical DAG-CBOR encoding
- **Mixed**: conversion between the two formats


#### 3.3 Validation System
- Syntax validation (basic data types)
- Semantic validation (business rules and constraints)
- Format validation (string formats such as datetime, uri, did, etc.)
- Reference validation (CID, blob, cross-definition references)

### 4. Integration Architecture

#### 4.1 Integration with the Existing URI Module

```python
# Example: integrating the URI and Data modules
from atpasser.uri import URI, NSID
from atpasser.data import ATProtoSerializer
from atpasser.lexicon import LexiconRegistry

# Parse a URI
uri = URI("at://example.com/com.example.blog.post/123")

# Look up the data model registered for the NSID
model_class = LexiconRegistry.get_model(uri.collection.nsid)

# Process the data with the Data module
# (raw_data is an incoming JSON payload obtained elsewhere)
serializer = ATProtoSerializer()
data = serializer.from_json(raw_data, model_class)
```

#### 4.2 Data Flow Architecture

```
Raw data → Data module validation → Lexicon model conversion → serialized output
Lexicon JSON → Lexicon module parsing → Pydantic model generation → registry registration
```

### 5. Error Handling Architecture

#### 5.1 Unified Error Hierarchy

```python
class ATProtoError(Exception):
    """Base error class."""
    pass

class DataError(ATProtoError):
    """Data-related errors."""
    pass

class LexiconError(ATProtoError):
    """Lexicon-related errors."""
    pass

class URIError(ATProtoError):
    """URI-related errors."""
    pass
```

#### 5.2 Error Diagnostics
- **Field-level error location**: path information down to the specific field
- **Context**: the input data and expected format at validation time
- **Suggested fixes**: concrete repair suggestions

### 6. Performance Optimization

#### 6.1 Caching
- **Model cache**: cache parsed Lexicon models
- **Serialization cache**: cache serialization results
- **Reference-resolution cache**: cache cross-definition reference resolutions

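The model cache can be as small as a memoized lookup. A minimal sketch with `functools.lru_cache` and a hypothetical `get_model_for` helper (not the project's actual API):

```python
from functools import lru_cache

# Hypothetical expensive step: parsing a Lexicon definition and
# building a model class for it. lru_cache memoizes by NSID, so
# repeated lookups skip the parse/generate work entirely.
@lru_cache(maxsize=256)
def get_model_for(nsid: str) -> type:
    # stand-in for: parse definition, generate Pydantic model
    return type(f"Model_{nsid.replace('.', '_')}", (), {"nsid": nsid})

a = get_model_for("com.example.blog.post")
b = get_model_for("com.example.blog.post")
assert a is b  # second call is served from the cache
```

The same pattern applies to reference resolution; a bounded `maxsize` doubles as the LRU eviction policy.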

#### 6.2 Lazy Loading
- Parse Lexicon definitions on demand
- Defer model generation until first use
- Import dependencies dynamically

### 7. Extensibility

#### 7.1 Plugin System
- Custom type handlers
- Custom validation rules
- Custom serialization formats

#### 7.2 Middleware Support
- Pre-processing hooks (data cleaning, transformation)
- Post-processing hooks (logging, monitoring)
- Validation hooks (custom validation logic)

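The hook kinds above could hang off ordered lists of callables. A minimal sketch with hypothetical names, not the project's actual middleware API:

```python
from typing import Any, Callable, Dict, List

Hook = Callable[[Dict[str, Any]], Dict[str, Any]]

class HookPipeline:
    """Runs pre-processing hooks, then a core step, then post-processing hooks."""

    def __init__(self) -> None:
        self.pre: List[Hook] = []
        self.post: List[Hook] = []

    def run(self, data: Dict[str, Any], core: Hook) -> Dict[str, Any]:
        for hook in self.pre:
            data = hook(data)
        data = core(data)
        for hook in self.post:
            data = hook(data)
        return data

pipeline = HookPipeline()
pipeline.pre.append(lambda d: {k: v for k, v in d.items() if v is not None})  # data cleaning
pipeline.post.append(lambda d: {**d, "processed": True})                      # bookkeeping

result = pipeline.run({"title": "hi", "draft": None}, core=lambda d: d)
assert result == {"title": "hi", "processed": True}
```

Validation hooks would slot in the same way, raising on bad input instead of transforming it.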

### 8. Implementation Roadmap

#### Phase 1: Foundations (2-3 weeks)
- Implement the Data module's base types and JSON serialization
- Implement the Lexicon module's base parser
- Establish the basic error-handling system

#### Phase 2: Full Functionality (3-4 weeks)
- Add CBOR serialization support
- Implement the full validation system
- Add reference resolution and union-type support

#### Phase 3: Optimization (2 weeks)
- Implement caching and performance optimizations
- Add advanced format validation
- Improve error handling and diagnostics

#### Phase 4: Testing and Release (1-2 weeks)
- Write a complete test suite
- Performance testing and tuning
- Documentation and example code

### 9. Dependency Management

#### 9.1 Core Dependencies
- `pydantic >=2.11.9`: data validation and model definitions
- `cbor2 >=5.7.0`: CBOR serialization
- `py-cid >=0.3.0`: CID handling

#### 9.2 Optional Dependencies
- `jsonpath-ng >=1.7.0`: JSONPath support
- `langcodes >=3.5.0`: language-code validation

### 10. Quality Assurance

#### 10.1 Testing Strategy
- **Unit tests**: cover all core functionality
- **Integration tests**: test cross-module integration
- **Compatibility tests**: ensure conformance with the specification
- **Performance tests**: verify performance targets

#### 10.2 Code Quality
- 100% type-annotation coverage
- Test coverage above 90%
- PEP 8 compliance
- Thorough documentation and examples

## Summary

This architecture provides a complete, extensible ATProto data-processing solution that leverages Python's type system and existing ecosystem while remaining fully compatible with the ATProto specification. The modular design lets components be developed and tested independently and keeps future extension and maintenance straightforward.
+119
examples/basic_usage.py
···
"""Basic usage examples for ATProto data and lexicon modules."""

from atpasser.data import serializer, CIDLink, DateTimeString
from atpasser.lexicon import parser, registry


def demonstrate_data_serialization():
    """Demonstrate basic data serialization."""
    print("=== Data Serialization Demo ===")

    # Create some sample data
    sample_data = {
        "title": "Hello ATProto",
        "content": "This is a test post",
        "createdAt": "2024-01-15T10:30:00.000Z",
        "tags": ["atproto", "test", "demo"],
        "cidLink": CIDLink(
            "bafyreidfayvfuwqa7qlnopdjiqrxzs6blmoeu4rujcjtnci5beludirz2a"
        ),
    }

    # Serialize to JSON
    json_output = serializer.to_json(sample_data, indent=2)
    print("JSON Output:")
    print(json_output)

    # Deserialize back
    deserialized = serializer.from_json(json_output)
    print("\nDeserialized:")
    print(deserialized)

    print()


def demonstrate_lexicon_parsing():
    """Demonstrate Lexicon parsing."""
    print("=== Lexicon Parsing Demo ===")

    # Sample Lexicon definition
    sample_lexicon = {
        "lexicon": 1,
        "id": "com.example.blog.post",
        "description": "A simple blog post record",
        "defs": {
            "main": {
                "type": "record",
                "key": "literal:post",
                "record": {
                    "type": "object",
                    "properties": {
                        "title": {"type": "string", "maxLength": 300},
                        "content": {"type": "string"},
                        "createdAt": {"type": "string", "format": "datetime"},
                        "tags": {
                            "type": "array",
                            "items": {"type": "string"},
                            "maxLength": 10,
                        },
                    },
                    "required": ["title", "content", "createdAt"],
                },
            }
        },
    }

    try:
        # Parse and register the Lexicon
        parser.parse_and_register(sample_lexicon)
        print("Lexicon parsed and registered successfully!")

        # Get the generated model
        model_class = registry.get_model("com.example.blog.post")
        if model_class:
            print(f"Generated model: {model_class.__name__}")

            # Create an instance using the model
            post_data = {
                "title": "Test Post",
                "content": "This is a test post content",
                "createdAt": "2024-01-15T10:30:00.000Z",
                "tags": ["test", "demo"],
            }

            validated_post = model_class(**post_data)
            print(f"Validated post: {validated_post.model_dump()}")

    except Exception as e:
        print(f"Error: {e}")

    print()


def demonstrate_custom_types():
    """Demonstrate custom type validation."""
    print("=== Custom Type Validation Demo ===")

    # DateTimeString validation. Note: the validate() classmethod must be
    # called explicitly here; the plain str constructor performs no checks.
    try:
        valid_dt = DateTimeString.validate("2024-01-15T10:30:00.000Z")
        print(f"Valid datetime: {valid_dt}")
    except Exception as e:
        print(f"DateTime validation error: {e}")

    # Invalid datetime
    try:
        invalid_dt = DateTimeString.validate("invalid-date")
        print(f"Invalid datetime: {invalid_dt}")
    except Exception as e:
        print(f"DateTime validation caught: {e}")

    print()


if __name__ == "__main__":
    demonstrate_data_serialization()
    demonstrate_lexicon_parsing()
    demonstrate_custom_types()
    print("Demo completed!")
+11
src/atpasser/__init__.py
···
"""ATProto Python implementation - Tools for Authenticated Transfer Protocol."""

from . import uri
from . import data
from . import lexicon

__all__ = ["uri", "data", "lexicon"]

__version__ = "0.1.0"
__author__ = "diaowinner"
__email__ = "diaowinner@qq.com"
+215
src/atpasser/data/ARCHITECTURE.md
···
# ATProto Data Model Module Architecture

## Overview

This module implements serialization, deserialization, and validation for the ATProto data model, supporting both JSON and DAG-CBOR encodings.

## Core Architecture

### 1. Base Type System

#### 1.1 Base Type Mapping

```python
# Base type mapping
DATA_MODEL_TYPE_MAPPING = {
    "null": NoneType,
    "boolean": bool,
    "integer": int,
    "string": str,
    "bytes": bytes,
    "cid-link": CIDLink,
    "blob": BlobRef,
    "array": list,
    "object": dict
}
```

#### 1.2 Custom Field Types

- **CIDLink**: handles CID links, with both binary and string representations
- **BlobRef**: handles blob references, compatible with both new and legacy formats
- **DateTimeString**: RFC 3339 datetime validation
- **LanguageTag**: BCP 47 language-tag validation

### 2. Serializer Architecture

#### 2.1 Serializer Hierarchy

```
ATProtoSerializer
├── JSONSerializer
│   ├── Normalizer
│   └── Denormalizer
└── CBORSerializer
    ├── DAGCBOREncoder
    └── DAGCBORDecoder
```

#### 2.2 Serialization Flow

1. **Validation**: validate the data with a Pydantic model
2. **Conversion**: convert special types (CID, bytes, etc.)
3. **Encoding**: encode for the target format
4. **Normalization**: ensure the output conforms to the ATProto specification

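The four steps above can be compressed into a sketch using plain Pydantic v2 calls (the `Post` model here is a hypothetical record type, not one generated by the Lexicon module):

```python
import json
from pydantic import BaseModel

class Post(BaseModel):  # hypothetical record model
    title: str
    createdAt: str

def serialize(raw: dict) -> str:
    post = Post.model_validate(raw)              # 1. validate
    data = post.model_dump(mode="json")          # 2. convert to JSON-safe types
    return json.dumps(data, ensure_ascii=False,  # 3./4. encode, normalized output
                      separators=(",", ":"))

out = serialize({"title": "hi", "createdAt": "2024-01-15T10:30:00.000Z"})
assert json.loads(out) == {"title": "hi", "createdAt": "2024-01-15T10:30:00.000Z"}
```

The real serializer additionally intercepts `CIDLink`, `BlobRef`, and `bytes` values during step 2.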

### 3. Validation System

#### 3.1 Validation Layers

1. **Syntax validation**: basic data-type checks
2. **Format validation**: string formats (datetime, uri, did, etc.)
3. **Constraint validation**: length, range, enum, and similar constraints
4. **Reference validation**: CID and blob reference validity

#### 3.2 Custom Validators

```python
class DataModelValidator:
    def validate_cid(self, value: str) -> bool:
        """Validate CID format."""
        pass

    def validate_datetime(self, value: str) -> bool:
        """Validate RFC 3339 datetime format."""
        pass

    def validate_did(self, value: str) -> bool:
        """Validate DID format."""
        pass

    def validate_handle(self, value: str) -> bool:
        """Validate handle format."""
        pass

    def validate_nsid(self, value: str) -> bool:
        """Validate NSID format."""
        pass
```

### 4. Special Type Handling

#### 4.1 CID Links

```python
class CIDLink:
    """Handles the CID link type."""

    def __init__(self, cid: Union[str, bytes]):
        self.cid = cid

    def to_json(self) -> dict:
        """Serialize to the JSON form: {"$link": "cid-string"}."""
        return {"$link": str(self.cid)}

    def to_cbor(self) -> bytes:
        """Serialize to DAG-CBOR."""
        pass
```

#### 4.2 Blob References

```python
class BlobRef:
    """Handles blob references, supporting new and legacy formats."""

    def __init__(self, ref: CIDLink, mime_type: str, size: int):
        self.ref = ref
        self.mime_type = mime_type
        self.size = size

    def to_json(self) -> dict:
        """Serialize to the JSON form."""
        return {
            "$type": "blob",
            "ref": self.ref.to_json(),
            "mimeType": self.mime_type,
            "size": self.size
        }

    @classmethod
    def from_legacy(cls, data: dict):
        """Parse the legacy format."""
        pass
```

### 5. Error Handling

#### 5.1 Error Hierarchy

```python
class DataModelError(Exception):
    """Base data-model error."""
    pass

class SerializationError(DataModelError):
    """Serialization error."""
    pass

class ValidationError(DataModelError):
    """Validation error."""
    pass

class FormatError(DataModelError):
    """Format error."""
    pass
```

#### 5.2 Error Message Format

- **Detailed path information**: includes the field path
- **Expected-value description**: a clear statement of the expected format
- **Context**: the data under validation

### 6. Module Layout

```
src/atpasser/data/
├── __init__.py       # module exports
├── ARCHITECTURE.md   # architecture document
├── types.py          # base type definitions
├── serializer.py     # serializer implementation
├── validator.py      # validator implementation
├── exceptions.py     # exception definitions
├── cid.py            # CID link handling
├── blob.py           # blob reference handling
└── formats.py        # format validators
```

### 7. Dependencies

- **Internal**: `src/atpasser/uri` (NSID, DID, and handle validation)
- **External**:
  - `pydantic`: data validation
  - `cbor2`: CBOR serialization
  - `py-cid`: CID handling

## Implementation Strategy

### 1. Incremental Implementation

1. **Phase one**: base types and JSON serialization
2. **Phase two**: CBOR serialization and validators
3. **Phase three**: advanced format validation
4. **Phase four**: performance tuning and memory management

### 2. Testing Strategy

- **Unit tests**: exercise each component
- **Integration tests**: exercise the end-to-end data flow
- **Compatibility tests**: ensure compatibility with existing implementations
- **Performance tests**: verify serialization performance

### 3. Extensibility

- **Plugin system**: custom format validators
- **Middleware**: pre- and post-processing hooks
- **Caching**: cached serialization results

## Strengths

1. **Type safety**: a strong type system built on Pydantic
2. **Performance**: an optimized serialization implementation
3. **Compatibility**: supports both new and legacy formats
4. **Extensibility**: the modular design supports future growth
5. **Error-friendly**: detailed error messages and diagnostics
+47
src/atpasser/data/__init__.py
···
"""ATProto data model module for serialization and validation."""

from .exceptions import (
    DataModelError,
    SerializationError,
    ValidationError,
    FormatError,
    CIDError,
    BlobError,
)

from .types import (
    CIDLink,
    DateTimeString,
    LanguageTag,
    ATUri,
    DIDString,
    HandleString,
    NSIDString,
)

from .formats import format_validator, FormatValidator
from .serializer import ATProtoSerializer, serializer

__all__ = [
    # Exceptions
    "DataModelError",
    "SerializationError",
    "ValidationError",
    "FormatError",
    "CIDError",
    "BlobError",
    # Types
    "CIDLink",
    "DateTimeString",
    "LanguageTag",
    "ATUri",
    "DIDString",
    "HandleString",
    "NSIDString",
    # Validators
    "format_validator",
    "FormatValidator",
    # Serializers
    "ATProtoSerializer",
    "serializer",
]
+87
src/atpasser/data/exceptions.py
···
"""Exceptions for ATProto data model module."""

from typing import Optional


class DataModelError(Exception):
    """Base exception for data model errors."""

    def __init__(self, message: str, details: Optional[str] = None):
        self.message = message
        self.details = details
        super().__init__(message)


class SerializationError(DataModelError):
    """Raised when serialization fails."""

    def __init__(self, message: str, details: Optional[str] = None):
        super().__init__(f"Serialization error: {message}", details)


class ValidationError(DataModelError):
    """Raised when data validation fails."""

    def __init__(
        self,
        message: str,
        field_path: Optional[str] = None,
        expected: Optional[str] = None,
        actual: Optional[str] = None,
    ):
        self.fieldPath = field_path
        self.expected = expected
        self.actual = actual

        details = []
        if field_path:
            details.append(f"Field: {field_path}")
        if expected:
            details.append(f"Expected: {expected}")
        if actual:
            details.append(f"Actual: {actual}")

        super().__init__(
            f"Validation error: {message}", "; ".join(details) if details else None
        )


class FormatError(DataModelError):
    """Raised when format validation fails."""

    def __init__(
        self,
        message: str,
        format_type: Optional[str] = None,
        value: Optional[str] = None,
    ):
        self.formatType = format_type
        self.value = value

        details = []
        if format_type:
            details.append(f"Format: {format_type}")
        if value:
            details.append(f"Value: {value}")

        super().__init__(
            f"Format error: {message}", "; ".join(details) if details else None
        )


class CIDError(DataModelError):
    """Raised when CID processing fails."""

    def __init__(self, message: str, cid: Optional[str] = None):
        self.cid = cid
        super().__init__(f"CID error: {message}", f"CID: {cid}" if cid else None)


class BlobError(DataModelError):
    """Raised when blob processing fails."""

    def __init__(self, message: str, blob_ref: Optional[str] = None):
        self.blobRef = blob_ref
        super().__init__(
            f"Blob error: {message}", f"Blob ref: {blob_ref}" if blob_ref else None
        )
+190
src/atpasser/data/formats.py
···
"""Format validators for ATProto data model."""

import re
from .exceptions import FormatError


class FormatValidator:
    """Validates string formats according to ATProto specifications."""

    @staticmethod
    def validate_datetime(value: str) -> str:
        """Validate RFC 3339 datetime format."""
        # RFC 3339 pattern with strict validation
        pattern = (
            r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})$"
        )
        if not re.match(pattern, value):
            raise FormatError("Invalid RFC 3339 datetime format", "datetime", value)

        # Additional semantic validation
        try:
            # Extract date parts for validation
            date_part, _ = value.split("T", 1)
            year, month, day = map(int, date_part.split("-"))

            # Basic date validation
            if not (1 <= month <= 12):
                raise FormatError("Month must be between 01 and 12", "datetime", value)
            if not (1 <= day <= 31):
                raise FormatError("Day must be between 01 and 31", "datetime", value)
            if year < 0:
                raise FormatError("Year must be positive", "datetime", value)

        except ValueError:
            raise FormatError("Invalid datetime structure", "datetime", value)

        return value

    @staticmethod
    def validate_did(value: str) -> str:
        """Validate DID format."""
        pattern = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$"
        if not re.match(pattern, value):
            raise FormatError("Invalid DID format", "did", value)

        if len(value) > 2048:
            raise FormatError("DID too long", "did", value)

        return value

    @staticmethod
    def validate_handle(value: str) -> str:
        """Validate handle format."""
        if len(value) > 253:
            raise FormatError("Handle too long", "handle", value)

        labels = value.lower().split(".")
        if len(labels) < 2:
            raise FormatError("Handle must contain at least one dot", "handle", value)

        for i, label in enumerate(labels):
            if not (1 <= len(label) <= 63):
                raise FormatError(
                    f"Label {i+1} length must be 1-63 characters", "handle", value
                )

            if not re.match(r"^[a-z0-9-]+$", label):
                raise FormatError(
                    f"Label {i+1} contains invalid characters", "handle", value
                )

            if label.startswith("-") or label.endswith("-"):
                raise FormatError(
                    f"Label {i+1} cannot start or end with hyphen", "handle", value
                )

        if labels[-1][0].isdigit():
            raise FormatError("TLD cannot start with digit", "handle", value)

        return value

    @staticmethod
    def validate_nsid(value: str) -> str:
        """Validate NSID format."""
        if len(value) > 317:
            raise FormatError("NSID too long", "nsid", value)

        if not all(ord(c) < 128 for c in value):
            raise FormatError("NSID must contain only ASCII characters", "nsid", value)

        if value.startswith(".") or value.endswith("."):
            raise FormatError("NSID cannot start or end with dot", "nsid", value)

        segments = value.split(".")
        if len(segments) < 3:
            raise FormatError("NSID must have at least 3 segments", "nsid", value)

        # Validate domain authority segments
        for i, segment in enumerate(segments[:-1]):
            if not (1 <= len(segment) <= 63):
                raise FormatError(
                    f"Domain segment {i+1} length must be 1-63", "nsid", value
                )

            if not re.match(r"^[a-z0-9-]+$", segment):
                raise FormatError(
                    f"Domain segment {i+1} contains invalid chars", "nsid", value
                )

            if segment.startswith("-") or segment.endswith("-"):
                raise FormatError(
                    f"Domain segment {i+1} cannot start/end with hyphen", "nsid", value
                )

        # Validate name segment
        name = segments[-1]
        if not (1 <= len(name) <= 63):
            raise FormatError("Name segment length must be 1-63", "nsid", value)

        if not re.match(r"^[a-zA-Z0-9]+$", name):
            raise FormatError("Name segment contains invalid characters", "nsid", value)

        if name[0].isdigit():
            raise FormatError("Name segment cannot start with digit", "nsid", value)

        return value

    @staticmethod
    def validate_uri(value: str) -> str:
        """Validate URI format."""
        if len(value) > 8192:  # 8 KB limit
            raise FormatError("URI too long", "uri", value)

        # Basic URI pattern validation
        uri_pattern = r"^[a-zA-Z][a-zA-Z0-9+.-]*:.*$"
        if not re.match(uri_pattern, value):
            raise FormatError("Invalid URI format", "uri", value)

        return value

    @staticmethod
    def validate_cid(value: str) -> str:
        """Validate CID format."""
        # Basic CID pattern validation (simplified)
        cid_pattern = r"^[a-zA-Z0-9]+$"
        if not re.match(cid_pattern, value):
            raise FormatError("Invalid CID format", "cid", value)

        return value

    @staticmethod
    def validate_at_identifier(value: str) -> str:
        """Validate at-identifier format (DID or handle)."""
        try:
            # Try DID first
            return FormatValidator.validate_did(value)
        except FormatError:
            try:
                # Fall back to handle
                return FormatValidator.validate_handle(value)
            except FormatError:
                raise FormatError(
                    "Invalid at-identifier (not a DID or handle)",
                    "at-identifier",
                    value,
                )

    @staticmethod
    def validate_at_uri(value: str) -> str:
        """Validate at-uri format."""
        if not value.startswith("at://"):
            raise FormatError("AT URI must start with 'at://'", "at-uri", value)

        # Additional validation can be added here
        return value

    @staticmethod
    def validate_language(value: str) -> str:
        """Validate language tag format."""
        # BCP 47 pattern validation
        pattern = r"^[a-zA-Z]{1,8}(?:-[a-zA-Z0-9]{1,8})*$"
        if not re.match(pattern, value):
            raise FormatError("Invalid language tag format", "language", value)

        return value


# Global validator instance
format_validator = FormatValidator()
+125
src/atpasser/data/serializer.py
···
"""Serializer for ATProto data model formats."""

import json
import base64
from typing import Any, Dict, Type, Union, Optional
from pydantic import BaseModel
from .exceptions import SerializationError
from .types import CIDLink


class ATProtoSerializer:
    """Serializer for ATProto JSON and CBOR formats."""

    def __init__(self):
        self.json_encoder = JSONEncoder()
        self.json_decoder = JSONDecoder()

    def to_json(self, obj: Any, indent: Optional[int] = None) -> str:
        """Serialize object to ATProto JSON format."""
        try:
            if isinstance(obj, BaseModel):
                obj = obj.model_dump(mode="json")

            serialized = self.json_encoder.encode(obj)
            return json.dumps(serialized, indent=indent, ensure_ascii=False)
        except Exception as e:
            raise SerializationError(f"JSON serialization failed: {str(e)}")

    def from_json(
        self, data: Union[str, bytes, dict], model: Optional[Type[BaseModel]] = None
    ) -> Any:
        """Deserialize from ATProto JSON format."""
        try:
            if isinstance(data, (str, bytes)):
                data = json.loads(data)

            decoded = self.json_decoder.decode(data)

            if model and issubclass(model, BaseModel):
                return model.model_validate(decoded)
            return decoded
        except Exception as e:
            raise SerializationError(f"JSON deserialization failed: {str(e)}")

    def to_cbor(self, obj: Any) -> bytes:
        """Serialize object to DAG-CBOR format."""
        try:
            # This is a placeholder - actual CBOR implementation would go here
            # For now, we'll convert to JSON and then encode as bytes
            json_str = self.to_json(obj)
            return json_str.encode("utf-8")
        except Exception as e:
            raise SerializationError(f"CBOR serialization failed: {str(e)}")

    def from_cbor(self, data: bytes, model: Optional[Type[BaseModel]] = None) -> Any:
        """Deserialize from DAG-CBOR format."""
        try:
            # This is a placeholder - actual CBOR implementation would go here
            # For now, we'll decode from bytes and then parse JSON
            json_str = data.decode("utf-8")
            return self.from_json(json_str, model)
        except Exception as e:
            raise SerializationError(f"CBOR deserialization failed: {str(e)}")


class JSONEncoder:
    """Encodes Python objects to ATProto JSON format."""

    def encode(self, obj: Any) -> Any:
        """Recursively encode object to ATProto JSON format."""
        if isinstance(obj, dict):
            return {k: self.encode(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self.encode(item) for item in obj]
        elif isinstance(obj, CIDLink):
            return obj.to_json()
        elif isinstance(obj, bytes):
            return self._encode_bytes(obj)
        else:
            return obj

    def _encode_bytes(self, data: bytes) -> Dict[str, str]:
        """Encode bytes to the ATProto $bytes form.

        The ATProto data model specifies standard base64 with padding
        stripped, so trailing "=" characters are removed here.
        """
        return {"$bytes": base64.b64encode(data).decode("ascii").rstrip("=")}


class JSONDecoder:
    """Decodes ATProto JSON format to Python objects."""

    def decode(self, obj: Any) -> Any:
        """Recursively decode ATProto JSON format to Python objects."""
        if isinstance(obj, dict):
            return self._decode_object(obj)
        elif isinstance(obj, list):
            return [self.decode(item) for item in obj]
        else:
            return obj

    def _decode_object(self, obj: Dict[str, Any]) -> Any:
        """Decode a JSON object, handling special ATProto formats."""
        if len(obj) == 1:
            key = next(iter(obj.keys()))
            value = obj[key]

            if key == "$link" and isinstance(value, str):
                return CIDLink(value)
            elif key == "$bytes" and isinstance(value, str):
                return self._decode_bytes(value)
            elif key == "$type" and value == "blob":
                # This would be handled by a blob-specific decoder
                return obj

        # Regular object - decode recursively
        return {k: self.decode(v) for k, v in obj.items()}

    def _decode_bytes(self, value: str) -> bytes:
        """Decode the ATProto $bytes form (unpadded standard base64)."""
        try:
            # Re-add the padding that the encoder strips before decoding.
            padding = "=" * (-len(value) % 4)
            return base64.b64decode(value + padding)
        except Exception as e:
            raise SerializationError(f"Invalid base64 encoding: {str(e)}")


# Global serializer instance
serializer = ATProtoSerializer()
+179
src/atpasser/data/types.py
···
"""Base types for ATProto data model."""

from typing import Any, Union
from datetime import datetime
import re
import base64

from pydantic_core import core_schema

from .exceptions import ValidationError, FormatError


class CIDLink:
    """Represents a CID link in ATProto data model."""

    def __init__(self, cid: Union[str, bytes]):
        if isinstance(cid, bytes):
            # Convert bytes to a string representation.
            # This is a simplified stand-in, not a real CID encoding.
            self.cid = f"bafy{base64.b64encode(cid).decode()[:44]}"
        else:
            self.cid = cid

    def __str__(self) -> str:
        return self.cid

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, CIDLink):
            return self.cid == other.cid
        elif isinstance(other, str):
            return self.cid == other
        return False

    def to_json(self) -> dict:
        """Convert to JSON representation."""
        return {"$link": self.cid}

    @classmethod
    def from_json(cls, data: dict) -> "CIDLink":
        """Create from JSON representation."""
        if not isinstance(data, dict) or "$link" not in data:
            raise ValidationError(
                "Invalid CID link format", expected="{'$link': 'cid_string'}"
            )
        return cls(data["$link"])


class _ValidatedString(str):
    """Base for validated string types, wired into Pydantic v2."""

    @classmethod
    def __get_pydantic_core_schema__(cls, source_type, handler):
        # Pydantic v2 hook (replaces the v1-era __get_validators__).
        return core_schema.no_info_plain_validator_function(cls.validate)

    @classmethod
    def validate(cls, v: Any) -> "_ValidatedString":
        raise NotImplementedError


class DateTimeString(_ValidatedString):
    """RFC 3339 datetime string with validation."""

    @classmethod
    def validate(cls, v: Any) -> "DateTimeString":
        if not isinstance(v, str):
            raise ValidationError("Must be a string", actual=type(v).__name__)

        # RFC 3339 pattern validation
        pattern = (
            r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})$"
        )
        if not re.match(pattern, v):
            raise FormatError("Invalid RFC 3339 datetime format", "datetime", v)

        # Additional semantic validation
        try:
            # Try to parse to ensure it's a valid datetime
            datetime.fromisoformat(v.replace("Z", "+00:00").replace("z", "+00:00"))
        except ValueError:
            raise FormatError("Invalid datetime value", "datetime", v)

        return cls(v)


class LanguageTag(_ValidatedString):
    """BCP 47 language tag with validation."""

    @classmethod
    def validate(cls, v: Any) -> "LanguageTag":
        if not isinstance(v, str):
            raise ValidationError("Must be a string", actual=type(v).__name__)

        # Basic BCP 47 pattern validation
        pattern = r"^[a-zA-Z]{1,8}(?:-[a-zA-Z0-9]{1,8})*$"
        if not re.match(pattern, v):
            raise FormatError("Invalid BCP 47 language tag format", "language", v)

        return cls(v)


class ATUri(_ValidatedString):
    """AT Protocol URI with validation."""

    @classmethod
    def validate(cls, v: Any) -> "ATUri":
        if not isinstance(v, str):
            raise ValidationError("Must be a string", actual=type(v).__name__)

        # Basic AT URI validation
        if not v.startswith("at://"):
            raise FormatError("AT URI must start with 'at://'", "at-uri", v)

        # Additional validation can be added here
        return cls(v)


class DIDString(_ValidatedString):
    """DID string with validation."""

    @classmethod
    def validate(cls, v: Any) -> "DIDString":
        if not isinstance(v, str):
            raise ValidationError("Must be a string", actual=type(v).__name__)

        # Basic DID format validation
        pattern = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$"
        if not re.match(pattern, v):
            raise FormatError("Invalid DID format", "did", v)

        return cls(v)


class HandleString(_ValidatedString):
    """Handle string with validation."""

    @classmethod
    def validate(cls, v: Any) -> "HandleString":
        if not isinstance(v, str):
            raise ValidationError("Must be a string", actual=type(v).__name__)

        # Basic handle validation
        if len(v) > 253:
            raise FormatError("Handle too long", "handle", v)

        labels = v.lower().split(".")
        if len(labels) < 2:
            raise FormatError("Handle must contain at least one dot", "handle", v)

        return cls(v)


class NSIDString(_ValidatedString):
    """NSID string with validation."""

    @classmethod
    def validate(cls, v: Any) -> "NSIDString":
        if not isinstance(v, str):
            raise ValidationError("Must be a string", actual=type(v).__name__)

        # Basic NSID validation
        if len(v) > 317:
            raise FormatError("NSID too long", "nsid", v)

        if not all(ord(c) < 128 for c in v):
            raise FormatError("NSID must contain only ASCII characters", "nsid", v)

        return cls(v)
+263
src/atpasser/lexicon/ARCHITECTURE.md
···
# ATProto Lexicon Module Architecture

## Overview

This module parses, validates, and manages ATProto Lexicon definition files, turning JSON schema definitions into executable Pydantic models behind a type-safe interface.

## Core Architecture

### 1. Lexicon Parsing System

#### 1.1 Parser Hierarchy

```
LexiconParser
├── DefinitionParser
│   ├── PrimaryDefinitionParser
│   │   ├── RecordParser
│   │   ├── QueryParser
│   │   ├── ProcedureParser
│   │   └── SubscriptionParser
│   └── FieldDefinitionParser
│       ├── SimpleTypeParser
│       ├── CompoundTypeParser
│       └── MetaTypeParser
└── Validator
    ├── SchemaValidator
    └── CrossReferenceValidator
```

#### 1.2 Parsing Flow

1. **Load the Lexicon JSON**: read the file and validate its structure
2. **Parse definitions**: dispatch to the appropriate parser by type
3. **Build models**: generate the corresponding Pydantic model classes
4. **Validate references**: check that cross-definition references are valid
5. **Register models**: add the models to the global registry

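Step 3 can lean on `pydantic.create_model`. A minimal sketch for a flat string/integer property map (real Lexicon parsing must also handle refs, unions, formats, and nested objects):

```python
from typing import Optional
from pydantic import create_model

# A tiny subset of a Lexicon object definition
properties = {"title": {"type": "string"}, "likes": {"type": "integer"}}
required = ["title"]

PY_TYPES = {"string": str, "integer": int, "boolean": bool}

fields = {}
for name, spec in properties.items():
    py_type = PY_TYPES[spec["type"]]
    # required fields use `...` (no default); optional ones default to None
    fields[name] = (py_type, ...) if name in required else (Optional[py_type], None)

PostModel = create_model("PostModel", **fields)

post = PostModel(title="hello")
assert post.title == "hello" and post.likes is None
```

The generated class is a normal Pydantic model, so validation, `model_dump()`, and JSON schema export all come for free.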

### 2. Type Mapping System

#### 2.1 Lexicon-to-Python Type Mapping

```python
LEXICON_TYPE_MAPPING = {
    "null": None,
    "boolean": bool,
    "integer": int,
    "string": str,
    "bytes": bytes,
    "cid-link": "CIDLink",
    "blob": "BlobRef",
    "array": list,
    "object": dict,
    "params": dict,
    "token": "LexiconToken",
    "ref": "LexiconRef",
    "union": "LexiconUnion",
    "unknown": Any,
    "record": "RecordModel",
    "query": "QueryModel",
    "procedure": "ProcedureModel",
    "subscription": "SubscriptionModel"
}
```

#### 2.2 Custom Type Handlers

- **LexiconRef**: resolves cross-definition references
- **LexiconUnion**: validates union types
- **LexiconToken**: handles symbolic values
- **RecordModel**: base class for record types
- **QueryModel**: base class for query types

### 3. Model Generation System

#### 3.1 Dynamic Model Generation

```python
class ModelGenerator:
    """Dynamically generates Pydantic models."""

    def generate_record_model(self, definition: dict) -> Type[BaseModel]:
        """Generate a record model."""
        pass

    def generate_query_model(self, definition: dict) -> Type[BaseModel]:
        """Generate a query model."""
        pass

    def generate_field_validator(self, field_def: dict) -> Callable:
        """Generate a field validator."""
        pass
```

#### 3.2 Constraint Processing

```python
class ConstraintProcessor:
    """Processes field constraints."""

    def process_integer_constraints(self, field_def: dict) -> dict:
        """Process integer constraints (min, max, enum)."""
        pass

    def process_string_constraints(self, field_def: dict) -> dict:
        """Process string constraints (format, length, enum)."""
        pass

    def process_array_constraints(self, field_def: dict) -> dict:
        """Process array constraints (minLength, maxLength)."""
        pass
```

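For string constraints, the mapping onto Pydantic is fairly direct. A sketch covering only length constraints (the `string_field` helper is hypothetical; `enum` handling via `Literal` and `format` dispatch are omitted):

```python
from pydantic import Field, ValidationError, create_model

def string_field(spec: dict):
    """Map a subset of Lexicon string constraints onto a pydantic Field."""
    return (str, Field(min_length=spec.get("minLength", 0),
                       max_length=spec.get("maxLength")))

# Build a one-field model whose `value` is capped at 5 characters
Title = create_model("Title", value=string_field({"maxLength": 5}))

assert Title(value="hello").value == "hello"
try:
    Title(value="too long here")
except ValidationError:
    pass  # maxLength violated, as expected
else:
    raise AssertionError("expected a ValidationError")
```

Integer and array constraints map the same way (`ge`/`le`, `min_length`/`max_length` on `list` fields).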
+
### 4. 注册表和缓存机制
+
+
#### 4.1 模型注册表
+
+
```python
+
class LexiconRegistry:
+
"""Lexicon 模型注册表"""
+
+
def __init__(self):
+
self._models: Dict[str, Type[BaseModel]] = {}
+
self._definitions: Dict[str, dict] = {}
+
self._ref_cache: Dict[str, Type[BaseModel]] = {}
+
+
def register(self, nsid: str, model: Type[BaseModel], definition: dict):
+
"""注册 Lexicon 模型"""
+
pass
+
+
def get_model(self, nsid: str) -> Optional[Type[BaseModel]]:
+
"""获取已注册的模型"""
+
pass
+
+
def resolve_ref(self, ref: str) -> Optional[Type[BaseModel]]:
+
"""解析引用到具体模型"""
+
pass
+
+
def clear_cache(self):
+
"""清空缓存"""
+
pass
+
```
+
+
#### 4.2 缓存策略
+
+
- **内存缓存**: 缓存已解析的模型定义
+
- **文件缓存**: 缓存序列化结果以提高性能
+
- **LRU 策略**: 使用最近最少使用算法管理缓存
+
+
### 5. 验证系统
+
+
#### 5.1 验证层级
+
+
1. **语法验证**: JSON Schema 结构验证
+
2. **语义验证**: 类型约束和业务规则验证
+
3. **引用验证**: 跨定义引用有效性验证
+
4. **兼容性验证**: 前向和后向兼容性检查
+
+
#### 5.2 自定义验证器
+
+
```python
+
class LexiconValidator:
+
"""Lexicon 定义验证器"""
+
+
def validate_definition(self, definition: dict) -> bool:
+
"""验证 Lexicon 定义完整性"""
+
pass
+
+
def validate_refs(self, definition: dict) -> List[str]:
+
"""验证所有引用的有效性"""
+
pass
+
+
def validate_compatibility(self, old_def: dict, new_def: dict) -> bool:
+
"""验证版本兼容性"""
+
pass
+
```
+
+
### 6. 错误处理系统
+
+
#### 6.1 错误类型体系
+
+
```python
+
class LexiconError(Exception):
+
"""基础 Lexicon 错误"""
+
pass
+
+
class ParseError(LexiconError):
+
"""解析错误"""
+
pass
+
+
class ValidationError(LexiconError):
+
"""验证错误"""
+
pass
+
+
class ResolutionError(LexiconError):
+
"""引用解析错误"""
+
pass
+
+
class GenerationError(LexiconError):
+
"""模型生成错误"""
+
pass
+
```
+
+
#### 6.2 诊断信息
+
+
- **详细错误消息**: 包含具体的字段路径和期望值
+
- **上下文信息**: 提供验证时的上下文信息
+
- **建议修复**: 提供可能的修复建议
+
+
### 7. Module Layout

```
src/atpasser/lexicon/
├── __init__.py        # Module exports
├── ARCHITECTURE.md    # Architecture documentation
├── parser.py          # Main parser
├── generator.py       # Model generator
├── registry.py        # Registry implementation
├── validator.py       # Validator implementation
├── types.py           # Type definitions
├── exceptions.py      # Exception definitions
├── constraints.py     # Constraint handlers
└── utils.py           # Utility functions
```

### 8. Dependencies

- **Internal**:
  - `src/atpasser/data` (data serialization and validation)
  - `src/atpasser/uri` (NSID validation and handling)
- **External**:
  - `pydantic`: model generation and validation
  - `jsonpath-ng`: JSONPath support
  - `cbor2`: CBOR serialization support

## Implementation Strategy

### 1. Incremental Rollout

1. **Phase 1**: basic parser and simple type mapping
2. **Phase 2**: complex types and reference resolution
3. **Phase 3**: model generation and the registry
4. **Phase 4**: advanced validation and error handling

### 2. Testing Strategy

- **Unit tests**: exercise individual parser components
- **Integration tests**: end-to-end Lexicon parsing flow
- **Compatibility tests**: ensure existing Lexicon files still parse
- **Performance tests**: verify parsing and model-generation throughput

### 3. Extensibility

- **Plugin system**: support custom type resolvers
- **Middleware**: pre- and post-processing hooks
- **Monitoring**: integrate performance metrics and logging

## Strengths

1. **Type safety**: leverages Pydantic's strong type system
2. **Performance**: optimized parsing and caching
3. **Extensibility**: modular design supports future growth
4. **Compatibility**: stays fully compatible with the ATProto Lexicon specification
5. **Developer-friendly**: clear error messages and documentation
+71
src/atpasser/lexicon/__init__.py
"""ATProto Lexicon module for parsing and managing schema definitions."""

from .exceptions import (
    LexiconError,
    ParseError,
    ValidationError,
    ResolutionError,
    GenerationError,
    CompatibilityError,
)

from .types import (
    LexiconType,
    LexiconDefinition,
    IntegerConstraints,
    StringConstraints,
    ArrayConstraints,
    ObjectConstraints,
    BlobConstraints,
    ParamsConstraints,
    RefDefinition,
    UnionDefinition,
    RecordDefinition,
    QueryDefinition,
    ProcedureDefinition,
    SubscriptionDefinition,
    LexiconDocument,
    ErrorDefinition,
    LexiconSchema,
    PropertyMap,
    DefinitionMap,
)

from .registry import LexiconRegistry, registry
from .parser import LexiconParser, parser

__all__ = [
    # Exceptions
    "LexiconError",
    "ParseError",
    "ValidationError",
    "ResolutionError",
    "GenerationError",
    "CompatibilityError",
    # Types
    "LexiconType",
    "LexiconDefinition",
    "IntegerConstraints",
    "StringConstraints",
    "ArrayConstraints",
    "ObjectConstraints",
    "BlobConstraints",
    "ParamsConstraints",
    "RefDefinition",
    "UnionDefinition",
    "RecordDefinition",
    "QueryDefinition",
    "ProcedureDefinition",
    "SubscriptionDefinition",
    "LexiconDocument",
    "ErrorDefinition",
    "LexiconSchema",
    "PropertyMap",
    "DefinitionMap",
    # Registry
    "LexiconRegistry",
    "registry",
    # Parser
    "LexiconParser",
    "parser",
]

+125
src/atpasser/lexicon/exceptions.py
"""Exceptions for ATProto Lexicon module."""

from typing import Optional


class LexiconError(Exception):
    """Base exception for Lexicon errors."""

    def __init__(self, message: str, details: Optional[str] = None):
        self.message = message
        self.details = details
        super().__init__(message)


class ParseError(LexiconError):
    """Raised when Lexicon parsing fails."""

    def __init__(
        self, message: str, nsid: Optional[str] = None, definition: Optional[str] = None
    ):
        self.nsid = nsid
        self.definition = definition

        details = []
        if nsid:
            details.append(f"NSID: {nsid}")
        if definition:
            details.append(f"Definition: {definition}")

        super().__init__(
            f"Parse error: {message}", "; ".join(details) if details else None
        )


class ValidationError(LexiconError):
    """Raised when Lexicon validation fails."""

    def __init__(
        self,
        message: str,
        nsid: Optional[str] = None,
        field: Optional[str] = None,
        expected: Optional[str] = None,
    ):
        self.nsid = nsid
        self.field = field
        self.expected = expected

        details = []
        if nsid:
            details.append(f"NSID: {nsid}")
        if field:
            details.append(f"Field: {field}")
        if expected:
            details.append(f"Expected: {expected}")

        super().__init__(
            f"Validation error: {message}", "; ".join(details) if details else None
        )


class ResolutionError(LexiconError):
    """Raised when reference resolution fails."""

    def __init__(
        self, message: str, ref: Optional[str] = None, context: Optional[str] = None
    ):
        self.ref = ref
        self.context = context

        details = []
        if ref:
            details.append(f"Reference: {ref}")
        if context:
            details.append(f"Context: {context}")

        super().__init__(
            f"Resolution error: {message}", "; ".join(details) if details else None
        )


class GenerationError(LexiconError):
    """Raised when model generation fails."""

    def __init__(
        self,
        message: str,
        nsid: Optional[str] = None,
        definition_type: Optional[str] = None,
    ):
        self.nsid = nsid
        self.definition_type = definition_type

        details = []
        if nsid:
            details.append(f"NSID: {nsid}")
        if definition_type:
            details.append(f"Type: {definition_type}")

        super().__init__(
            f"Generation error: {message}", "; ".join(details) if details else None
        )


class CompatibilityError(LexiconError):
    """Raised when compatibility checks fail."""

    def __init__(
        self,
        message: str,
        old_nsid: Optional[str] = None,
        new_nsid: Optional[str] = None,
    ):
        self.old_nsid = old_nsid
        self.new_nsid = new_nsid

        details = []
        if old_nsid:
            details.append(f"Old NSID: {old_nsid}")
        if new_nsid:
            details.append(f"New NSID: {new_nsid}")

        super().__init__(
            f"Compatibility error: {message}", "; ".join(details) if details else None
        )

+208
src/atpasser/lexicon/parser.py
"""Parser for ATProto Lexicon definitions."""

import json
from typing import Dict, Any, Optional, Type, Union

from pydantic import BaseModel, create_model

from .exceptions import ParseError, ValidationError
from .types import LexiconDocument, LexiconType
from .registry import registry


class LexiconParser:
    """Parser for ATProto Lexicon JSON definitions."""

    def __init__(self):
        self.validators = LexiconValidator()

    def parse_document(self, json_data: Union[str, dict]) -> LexiconDocument:
        """Parse a Lexicon JSON document."""
        try:
            if isinstance(json_data, str):
                data = json.loads(json_data)
            else:
                data = json_data

            # Validate basic document structure
            self.validators.validate_document_structure(data)

            # Parse into a Pydantic model
            document = LexiconDocument.model_validate(data)

            # Validate semantic rules
            self.validators.validate_document_semantics(document)

            return document

        except (ParseError, ValidationError):
            raise
        except Exception as e:
            raise ParseError(f"Failed to parse Lexicon document: {e}")

    def parse_and_register(self, json_data: Union[str, dict]) -> None:
        """Parse a Lexicon document and register it."""
        document = self.parse_document(json_data)
        registry.register_lexicon(document)

        # Generate and register models for all definitions
        generator = ModelGenerator()
        for def_name, def_data in document.defs.items():
            try:
                model = generator.generate_model(document.id, def_name, def_data)
                registry.register_model(document.id, model, def_name)
            except Exception as e:
                raise ParseError(
                    f"Failed to generate model for {def_name}: {e}",
                    document.id,
                    def_name,
                )


class LexiconValidator:
    """Validator for Lexicon documents."""

    def validate_document_structure(self, data: Dict[str, Any]) -> None:
        """Validate basic document structure."""
        required_fields = ["lexicon", "id", "defs"]
        for field in required_fields:
            if field not in data:
                raise ValidationError(f"Missing required field: {field}")

        if not isinstance(data["defs"], dict) or not data["defs"]:
            raise ValidationError("defs must be a non-empty dictionary")

        if data["lexicon"] != 1:
            raise ValidationError("lexicon version must be 1")

    def validate_document_semantics(self, document: LexiconDocument) -> None:
        """Validate semantic rules for a Lexicon document."""
        # Check primary type constraints
        primary_types = {
            LexiconType.RECORD,
            LexiconType.QUERY,
            LexiconType.PROCEDURE,
            LexiconType.SUBSCRIPTION,
        }

        primary_defs = []
        for def_name, def_data in document.defs.items():
            def_type = def_data.get("type")
            if def_type in primary_types:
                primary_defs.append((def_name, def_type))

                # Primary types should usually be named 'main'; treat any
                # other name as a warning rather than an error.
                if def_name != "main":
                    pass

        # Only one primary type is allowed per document
        if len(primary_defs) > 1:
            raise ValidationError(
                f"Multiple primary types found: {[name for name, _ in primary_defs]}",
                document.id,
            )


class ModelGenerator:
    """Generates Pydantic models from Lexicon definitions."""

    def generate_model(
        self, nsid: str, def_name: str, definition: Dict[str, Any]
    ) -> Type[BaseModel]:
        """Generate a Pydantic model from a Lexicon definition."""
        def_type = definition.get("type")

        if def_type == LexiconType.RECORD:
            return self._generate_record_model(nsid, def_name, definition)
        elif def_type == LexiconType.OBJECT:
            return self._generate_object_model(nsid, def_name, definition)
        elif def_type in [
            LexiconType.QUERY,
            LexiconType.PROCEDURE,
            LexiconType.SUBSCRIPTION,
        ]:
            return self._generate_primary_model(nsid, def_name, definition)
        else:
            # For simple types, create a basic model
            return self._generate_simple_model(nsid, def_name, definition)

    def _generate_record_model(
        self, nsid: str, def_name: str, definition: Dict[str, Any]
    ) -> Type[BaseModel]:
        """Generate a model for the record type."""
        record_schema = definition.get("record", {})
        return self._generate_object_model(nsid, def_name, record_schema)

    def _generate_object_model(
        self, nsid: str, def_name: str, definition: Dict[str, Any]
    ) -> Type[BaseModel]:
        """Generate a model for the object type."""
        properties = definition.get("properties", {})
        required = definition.get("required", [])

        field_definitions = {}
        for prop_name, prop_schema in properties.items():
            field_type = self._get_field_type(prop_schema)
            if prop_name in required:
                field_definitions[prop_name] = (field_type, ...)
            else:
                # Optional fields default to None and must accept it
                field_definitions[prop_name] = (Optional[field_type], None)

        model_name = self._get_model_name(nsid, def_name)
        return create_model(model_name, **field_definitions)

    def _generate_primary_model(
        self, nsid: str, def_name: str, definition: Dict[str, Any]
    ) -> Type[BaseModel]:
        """Generate a model for primary types (query, procedure, subscription)."""
        # For now, create a basic model - specific handling can be added later
        return self._generate_simple_model(nsid, def_name, definition)

    def _generate_simple_model(
        self, nsid: str, def_name: str, definition: Dict[str, Any]
    ) -> Type[BaseModel]:
        """Generate a simple model for basic types."""
        field_type = self._get_field_type(definition)
        model_name = self._get_model_name(nsid, def_name)
        return create_model(model_name, value=(field_type, ...))

    def _get_field_type(self, schema: Dict[str, Any]) -> Any:
        """Get the Python type for a schema definition."""
        schema_type = schema.get("type")

        # LexiconType subclasses str, so raw schema strings index this map
        type_mapping = {
            LexiconType.NULL: type(None),
            LexiconType.BOOLEAN: bool,
            LexiconType.INTEGER: int,
            LexiconType.STRING: str,
            LexiconType.BYTES: bytes,
            LexiconType.ARRAY: list,
            LexiconType.OBJECT: dict,
        }

        if schema_type and schema_type in type_mapping:
            return type_mapping[schema_type]

        if schema_type == LexiconType.REF:
            ref = schema.get("ref")
            if ref:
                return registry.resolve_ref(ref)

        # Default to Any for complex types
        return Any

    def _get_model_name(self, nsid: str, def_name: str) -> str:
        """Generate a valid Python class name from NSID and definition name."""
        # Convert NSID to PascalCase
        parts = nsid.split(".")
        name_parts = [part.capitalize() for part in parts]

        # Add definition name
        if def_name != "main":
            def_part = def_name.capitalize()
            name_parts.append(def_part)

        return "".join(name_parts)


# Global parser instance
parser = LexiconParser()

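The PascalCase naming scheme in `_get_model_name` can be exercised standalone; this is a condensed sketch of the same logic, with an invented NSID:

```python
def model_name(nsid: str, def_name: str = "main") -> str:
    # Same scheme as ModelGenerator._get_model_name: NSID segments become
    # PascalCase; a non-"main" definition name is appended.
    parts = [part.capitalize() for part in nsid.split(".")]
    if def_name != "main":
        parts.append(def_name.capitalize())
    return "".join(parts)

print(model_name("com.example.blog.post"))           # ComExampleBlogPost
print(model_name("com.example.blog.post", "reply"))  # ComExampleBlogPostReply
```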
+114
src/atpasser/lexicon/registry.py
"""Registry for managing Lexicon definitions and generated models."""

from typing import Dict, Optional, Type

from pydantic import BaseModel

from .exceptions import ResolutionError
from .types import LexiconDocument


class LexiconRegistry:
    """Registry for storing and resolving Lexicon definitions and models."""

    def __init__(self):
        self._definitions: Dict[str, LexiconDocument] = {}
        self._models: Dict[str, Type[BaseModel]] = {}
        self._ref_cache: Dict[str, Type[BaseModel]] = {}

    def register_lexicon(self, document: LexiconDocument) -> None:
        """Register a Lexicon document."""
        nsid = document.id
        if nsid in self._definitions:
            raise ValueError(f"Lexicon with NSID {nsid} is already registered")

        self._definitions[nsid] = document

        # Clear cache for this NSID
        self._clear_cache_for_nsid(nsid)

    def get_lexicon(self, nsid: str) -> Optional[LexiconDocument]:
        """Get a registered Lexicon document by NSID."""
        return self._definitions.get(nsid)

    def register_model(
        self, nsid: str, model: Type[BaseModel], definition_name: Optional[str] = None
    ) -> None:
        """Register a generated model for a Lexicon definition."""
        key = self._get_model_key(nsid, definition_name)
        self._models[key] = model

        # Also cache for quick reference resolution
        if definition_name and definition_name != "main":
            ref_key = f"{nsid}#{definition_name}"
            self._ref_cache[ref_key] = model

    def get_model(
        self, nsid: str, definition_name: Optional[str] = None
    ) -> Optional[Type[BaseModel]]:
        """Get a registered model by NSID and optional definition name."""
        key = self._get_model_key(nsid, definition_name)
        return self._models.get(key)

    def resolve_ref(self, ref: str) -> Type[BaseModel]:
        """Resolve a reference to a model."""
        if ref in self._ref_cache:
            return self._ref_cache[ref]

        # Parse the reference: "nsid#definition" or a bare NSID
        if "#" in ref:
            nsid, definition_name = ref.split("#", 1)
        else:
            nsid, definition_name = ref, "main"

        model = self.get_model(nsid, definition_name)
        if model is None:
            raise ResolutionError(f"Reference not found: {ref}", ref)

        # Cache for future use
        self._ref_cache[ref] = model
        return model

    def has_lexicon(self, nsid: str) -> bool:
        """Check if a Lexicon is registered."""
        return nsid in self._definitions

    def has_model(self, nsid: str, definition_name: Optional[str] = None) -> bool:
        """Check if a model is registered."""
        key = self._get_model_key(nsid, definition_name)
        return key in self._models

    def clear_cache(self) -> None:
        """Clear all cached models and references."""
        self._models.clear()
        self._ref_cache.clear()

    def _get_model_key(self, nsid: str, definition_name: Optional[str]) -> str:
        """Get the internal key for model storage."""
        if definition_name:
            return f"{nsid}#{definition_name}"
        return f"{nsid}#main"

    def _clear_cache_for_nsid(self, nsid: str) -> None:
        """Clear cache entries for a specific NSID."""
        # Clear models
        keys_to_remove = [
            key for key in self._models.keys() if key.startswith(f"{nsid}#")
        ]
        for key in keys_to_remove:
            del self._models[key]

        # Clear ref cache; match on "nsid#" so a shorter NSID does not
        # accidentally evict entries for a longer NSID sharing its prefix
        keys_to_remove = [
            key for key in self._ref_cache.keys() if key.startswith(f"{nsid}#")
        ]
        for key in keys_to_remove:
            del self._ref_cache[key]

    def list_lexicons(self) -> Dict[str, LexiconDocument]:
        """List all registered Lexicon documents."""
        return self._definitions.copy()

    def list_models(self) -> Dict[str, Type[BaseModel]]:
        """List all registered models."""
        return self._models.copy()


# Global registry instance
registry = LexiconRegistry()

+155
src/atpasser/lexicon/types.py
"""Type definitions for ATProto Lexicon module."""

from typing import Dict, List, Optional, Union, Any
from enum import Enum

from pydantic import BaseModel


class LexiconType(str, Enum):
    """Enumeration of Lexicon definition types."""

    NULL = "null"
    BOOLEAN = "boolean"
    INTEGER = "integer"
    STRING = "string"
    BYTES = "bytes"
    CID_LINK = "cid-link"
    BLOB = "blob"
    ARRAY = "array"
    OBJECT = "object"
    PARAMS = "params"
    TOKEN = "token"
    REF = "ref"
    UNION = "union"
    UNKNOWN = "unknown"
    RECORD = "record"
    QUERY = "query"
    PROCEDURE = "procedure"
    SUBSCRIPTION = "subscription"


class LexiconDefinition(BaseModel):
    """Base class for Lexicon definitions."""

    type: LexiconType
    description: Optional[str] = None


class IntegerConstraints(BaseModel):
    """Constraints for integer fields."""

    minimum: Optional[int] = None
    maximum: Optional[int] = None
    enum: Optional[List[int]] = None
    default: Optional[int] = None
    const: Optional[int] = None


class StringConstraints(BaseModel):
    """Constraints for string fields."""

    format: Optional[str] = None
    maxLength: Optional[int] = None
    minLength: Optional[int] = None
    maxGraphemes: Optional[int] = None
    minGraphemes: Optional[int] = None
    knownValues: Optional[List[str]] = None
    enum: Optional[List[str]] = None
    default: Optional[str] = None
    const: Optional[str] = None


class ArrayConstraints(BaseModel):
    """Constraints for array fields."""

    items: Dict[str, Any]  # Schema definition for array items
    minLength: Optional[int] = None
    maxLength: Optional[int] = None


class ObjectConstraints(BaseModel):
    """Constraints for object fields."""

    properties: Dict[str, Dict[str, Any]]  # Map of property names to schemas
    required: Optional[List[str]] = None
    nullable: Optional[List[str]] = None


class BlobConstraints(BaseModel):
    """Constraints for blob fields."""

    accept: Optional[List[str]] = None  # MIME types
    maxSize: Optional[int] = None  # Maximum size in bytes


class ParamsConstraints(BaseModel):
    """Constraints for params fields."""

    properties: Dict[str, Dict[str, Any]]
    required: Optional[List[str]] = None


class RefDefinition(BaseModel):
    """Reference definition."""

    ref: str  # Reference to another schema


class UnionDefinition(BaseModel):
    """Union type definition."""

    refs: List[str]  # List of references
    closed: Optional[bool] = False  # Whether the union is closed


class RecordDefinition(LexiconDefinition):
    """Record type definition."""

    key: str  # Record key type
    record: Dict[str, Any]  # Object schema


class QueryDefinition(LexiconDefinition):
    """Query type definition."""

    parameters: Optional[Dict[str, Any]] = None  # Params schema
    output: Optional[Dict[str, Any]] = None  # Output schema


class ProcedureDefinition(LexiconDefinition):
    """Procedure type definition."""

    parameters: Optional[Dict[str, Any]] = None  # Params schema
    input: Optional[Dict[str, Any]] = None  # Input schema
    output: Optional[Dict[str, Any]] = None  # Output schema
    errors: Optional[List[Dict[str, Any]]] = None  # Error definitions


class SubscriptionDefinition(LexiconDefinition):
    """Subscription type definition."""

    parameters: Optional[Dict[str, Any]] = None  # Params schema
    message: Optional[Dict[str, Any]] = None  # Message schema
    errors: Optional[List[Dict[str, Any]]] = None  # Error definitions


class LexiconDocument(BaseModel):
    """Complete Lexicon document."""

    lexicon: int  # Lexicon version (always 1)
    id: str  # NSID of the Lexicon
    description: Optional[str] = None
    defs: Dict[str, Dict[str, Any]]  # Map of definition names to schemas


class ErrorDefinition(BaseModel):
    """Error definition for procedures and subscriptions."""

    name: str  # Error name
    description: Optional[str] = None


# Type aliases for convenience
LexiconSchema = Dict[str, Any]
PropertyMap = Dict[str, LexiconSchema]
DefinitionMap = Dict[str, Union[LexiconDefinition, Dict[str, Any]]]
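A detail worth noting about `LexiconType`: it mixes in `str`, which is what lets the parser index an enum-keyed dict with raw `schema["type"]` strings from parsed JSON. A two-member sketch of that behavior:

```python
from enum import Enum

class LexiconType(str, Enum):
    STRING = "string"
    INTEGER = "integer"

# Because the enum subclasses str, raw JSON values hash and compare equal
# to the members, so a dict keyed by LexiconType accepts plain strings.
type_mapping = {LexiconType.STRING: str, LexiconType.INTEGER: int}
print(type_mapping["string"] is str)      # True
print("integer" == LexiconType.INTEGER)  # True
```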