https://tangled.org/bretton.dev/coves
Update all aggregator documentation to reflect successful completion
of Phase 1 implementation.
PRD_AGGREGATORS.md:
- Updated Phase 1 status: ✅ COMPLETE
- Marked all components as complete with checkmarks
- Added E2E test validation to component list
- Documented deferred Phase 2 items (write-forward operations)
- Updated milestone status to ACHIEVED
CLAUDE.md:
- Updated project instructions for aggregator development context
Key achievements documented:
✅ All lexicon schemas implemented and validated
✅ Database migrations with optimized indexes and triggers
✅ Complete repository layer with bulk operations
✅ Service layer with validation and rate limiting
✅ Post creation integration with dual auth flow
✅ XRPC query endpoints (getServices, getAuthorizations, listForCommunity)
✅ Jetstream consumer indexing from firehose
✅ Comprehensive integration and E2E tests
✅ Records verified in both PDS and AppView
Phase 1 complete:
- Aggregators can authenticate via JWT
- Aggregators can post to authorized communities
- Rate limiting enforced (10 posts/hour per community)
- Query endpoints available for discovery
- Security validated (unauthorized posts rejected)
- Complete data flow: PDS → Jetstream → AppView → XRPC
Phase 2 deferred:
- Write-forward operations (enable, disable, updateConfig)
- SDK development
- Reference implementation (RSS aggregator)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Consolidate duplicate test helper functions and fix test issues
discovered during aggregator development.
helpers.go:
- Consolidated createSimpleTestJWT() (removed duplicates from post_e2e_test.go)
- Consolidated generateTID() (removed duplicates)
- Added createPDSAccount() for E2E tests
- Added writePDSRecord() for E2E tests
- All helpers now shared across test files
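A shared JWT test helper could look roughly like the sketch below. The log does not show how createSimpleTestJWT() is implemented, so `buildUnsignedTestJWT`, `subjectOf`, the claim set, the placeholder issuer, and the one-hour expiry are all assumptions made for illustration. The token is unsigned (`"alg": "none"`), which is only suitable for tests.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"strings"
)

// testClaims holds the minimal claims this sketch places in the token.
type testClaims struct {
	Sub string `json:"sub"`
	Iss string `json:"iss"`
	Iat int64  `json:"iat"`
	Exp int64  `json:"exp"`
}

// buildUnsignedTestJWT returns "header.payload." with an empty signature
// segment. Hypothetical helper, not the repository's createSimpleTestJWT.
func buildUnsignedTestJWT(did string, issuedAt int64) (string, error) {
	header, err := json.Marshal(map[string]string{"alg": "none", "typ": "JWT"})
	if err != nil {
		return "", err
	}
	payload, err := json.Marshal(testClaims{
		Sub: did,
		Iss: "https://pds.example.test", // placeholder issuer (assumption)
		Iat: issuedAt,
		Exp: issuedAt + 3600, // one hour validity (assumption)
	})
	if err != nil {
		return "", err
	}
	enc := base64.RawURLEncoding
	return enc.EncodeToString(header) + "." + enc.EncodeToString(payload) + ".", nil
}

// subjectOf decodes the payload segment and returns its "sub" claim.
func subjectOf(token string) (string, error) {
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return "", errors.New("token must have three dot-separated segments")
	}
	raw, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		return "", err
	}
	var c testClaims
	if err := json.Unmarshal(raw, &c); err != nil {
		return "", err
	}
	return c.Sub, nil
}
```

Keeping one helper like this in helpers.go means every test file builds tokens the same way, which is the point of the consolidation.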
post_e2e_test.go:
- Removed duplicate helper functions (now in helpers.go)
- Cleaned up unused imports (auth, base64, jwt)
- Fixed import order
community_identifier_resolution_test.go:
- Fixed PDS URL default from port 3000 → 3001
- Matches actual dev PDS configuration (.env.dev)
- Test now passes with running PDS
auth.go middleware:
- Minor logging improvements for auth failures
Test results:
✅ TestCommunityIdentifierResolution: NOW PASSES (was failing)
✅ All aggregator tests: PASSING
✅ All community tests: PASSING
❌ TestPostCreation_Basic: Still failing (pre-existing auth context issue)
Overall test suite:
- 74 out of 75 tests passing (98.7% pass rate)
- The only failure is the pre-existing auth context issue in TestPostCreation_Basic
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Add complete test coverage for aggregator system with repository,
service, and end-to-end validation.
aggregator_test.go - Integration tests:
- TestAggregatorRepository_Create: Upsert logic, field mapping
- TestAggregatorRepository_IsAggregator: Fast existence checks
- TestAggregatorAuthorization_Create: Authorization with audit trail
- TestAggregatorAuthorization_IsAuthorized: Fast authorization checks
- TestAggregatorService_PostCreationIntegration: Authorization validation
- TestAggregatorService_RateLimiting: 10 posts/hour enforcement
- TestAggregatorPostService_Integration: Aggregator vs user detection
- TestAggregatorTriggers: Database trigger stats updates
aggregator_e2e_test.go - End-to-end validation:
Complete data flow testing across all components:
Part 1: Service Declaration
- Create aggregator account on PDS
- Write service record to PDS
- Simulate Jetstream event
- Verify indexed in AppView DB
- ✅ Verified: Record exists on PDS (curl) AND in AppView (SQL)
Part 2: Authorization
- Create community account on PDS
- Write authorization record to PDS
- Index via Jetstream consumer
- Verify in AppView DB
- ✅ Verified: Record exists on PDS (curl) AND in AppView (SQL)
Part 3: Post Creation
- Aggregator creates post via XRPC endpoint
- Post written to PDS
- Indexed via Jetstream
- Verify in AppView DB with aggregator attribution
- ✅ Verified: Post on PDS (curl) AND in AppView (SQL)
Part 4: Rate Limiting
- Create 10 posts (at limit)
- 11th post rejected with 429 status
- ✅ Rate limiting enforced correctly
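The limit exercised in Part 4 can be pictured as a sliding one-hour window per aggregator and community. The sketch below is an in-memory illustration only: the log does not say how the service counts posts, so `postRateLimiter`, its key format, and the use of Unix seconds are assumptions.

```go
package main

import "sync"

// postRateLimiter tracks post timestamps (Unix seconds) per
// aggregator+community pair. Hypothetical type for illustration.
type postRateLimiter struct {
	mu     sync.Mutex
	limit  int
	window int64 // window length in seconds
	posts  map[string][]int64
}

func newPostRateLimiter(limit int, windowSeconds int64) *postRateLimiter {
	return &postRateLimiter{
		limit:  limit,
		window: windowSeconds,
		posts:  make(map[string][]int64),
	}
}

// Allow records a post and returns true if fewer than limit posts fall
// inside the window ending at nowUnix; otherwise it returns false.
func (l *postRateLimiter) Allow(aggregatorDID, communityDID string, nowUnix int64) bool {
	key := aggregatorDID + "|" + communityDID
	l.mu.Lock()
	defer l.mu.Unlock()

	// Drop timestamps that have aged out of the window (in-place filter).
	cutoff := nowUnix - l.window
	kept := l.posts[key][:0]
	for _, ts := range l.posts[key] {
		if ts > cutoff {
			kept = append(kept, ts)
		}
	}
	if len(kept) >= l.limit {
		l.posts[key] = kept
		return false // a handler would translate this into HTTP 429
	}
	l.posts[key] = append(kept, nowUnix)
	return true
}
```

With `newPostRateLimiter(10, 3600)`, ten calls to `Allow` for the same pair succeed and the eleventh within the hour returns false, mirroring the 429 case above.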
Part 5: XRPC Query Endpoints
- getServices (basic and detailed views)
- getAuthorizations (nested aggregator object)
- listForCommunity (aggregators for community)
- ✅ All endpoints return correct data
Part 6: Security
- Unauthorized aggregator posts rejected
- ✅ Security validation working
Part 7: Idempotent Indexing
- Duplicate Jetstream events handled gracefully
- ✅ Idempotency working
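Idempotent indexing usually comes down to upserting by a stable key, so a replayed event overwrites the existing row instead of adding a second one. The sketch below uses an in-memory map keyed by record URI to show the idea; the real index is a database table, and `authorizationRow` and `authorizationIndex` are hypothetical names.

```go
package main

// authorizationRow is a simplified, hypothetical view of an indexed
// authorization record.
type authorizationRow struct {
	URI           string
	AggregatorDID string
	CommunityDID  string
	Enabled       bool
}

// authorizationIndex stands in for the AppView table in this sketch.
type authorizationIndex struct {
	byURI map[string]authorizationRow
}

func newAuthorizationIndex() *authorizationIndex {
	return &authorizationIndex{byURI: make(map[string]authorizationRow)}
}

// Upsert inserts or overwrites the row for row.URI, so applying the same
// event twice leaves exactly one row.
func (i *authorizationIndex) Upsert(row authorizationRow) {
	i.byURI[row.URI] = row
}

// Delete removes the row for uri; deleting a missing row is a no-op.
func (i *authorizationIndex) Delete(uri string) {
	delete(i.byURI, uri)
}

// Len reports how many rows are indexed.
func (i *authorizationIndex) Len() int {
	return len(i.byURI)
}
```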
Part 8: Authorization Disable
- Disable authorization
- Post rejected after disable
- ✅ Disable workflow working (posts rejected once authorization is disabled)
Test results:
✅ All 10+ test suites passing
✅ Records verified in both PDS and AppView
✅ Complete data flow validated
✅ Security checks validated
✅ Rate limiting validated
✅ XRPC endpoints validated
Coverage:
- Repository operations
- Service layer business logic
- Post integration flow
- Jetstream consumer indexing
- XRPC handler responses
- Database triggers
- End-to-end PDS → Jetstream → AppView flow
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Integrate aggregator components into server initialization and
register XRPC endpoints.
Changes to cmd/server/main.go:
- Initialize aggregator repository
- Initialize aggregator service (with community service dependency)
- Update post service to include aggregator service
- Register aggregator XRPC routes (3 query endpoints)
- Start aggregator Jetstream consumer in background goroutine
- Add comprehensive startup logging
Server startup output:
✅ Aggregator service initialized
Started Jetstream aggregator consumer: ws://localhost:6008/subscribe?...
- Indexing: social.coves.aggregator.service (service declarations)
- Indexing: social.coves.aggregator.authorization (authorization records)
Aggregator XRPC endpoints registered (query endpoints public)
Architecture:
- Aggregator service depends on: aggregator repo, community service
- Post service depends on: aggregator service (for auth checks)
- Jetstream consumer runs independently, indexes to DB via repository
- XRPC handlers call service layer methods
Phase 1 complete:
✅ Aggregators can authenticate (via JWT)
✅ Aggregators can post to authorized communities
✅ Rate limiting enforced (10 posts/hour per community)
✅ Query endpoints available for discovery
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Implement Jetstream consumer to index aggregator service declarations
and authorization records from the firehose in real-time.
aggregator_consumer.go:
- Handles social.coves.aggregator.service records (create/update/delete)
- Handles social.coves.aggregator.authorization records (create/update/delete)
- Upsert logic for both create and update operations
- Delete by URI for authorization cleanup
- Validation:
* Service rkey must be "self" (canonical location)
* communityDid in authorization must match repo DID (prevents forgery)
* did in service must match repo DID (prevents DID spoofing)
* Required fields validation
- Avatar blob extraction from atProto blob ref
- createdAt parsing from RFC3339 with fallback
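The validation rules listed above can be sketched as two small checks run before a record is indexed. The struct shapes and function names here are assumptions; only the rules themselves (rkey must be "self", `did` and `communityDid` must match the repo DID, required fields present) come from the commit message.

```go
package main

import "errors"

var (
	errRkeyNotSelf       = errors.New("service record rkey must be \"self\"")
	errMissingField      = errors.New("required field is missing")
	errDIDMismatch       = errors.New("did does not match repo DID")
	errCommunityMismatch = errors.New("communityDid does not match repo DID")
)

// serviceRecord and authorizationRecord are hypothetical, trimmed-down
// shapes of the two record types.
type serviceRecord struct {
	DID         string
	DisplayName string
}

type authorizationRecord struct {
	AggregatorDID string
	CommunityDID  string
}

// validateServiceEvent rejects service records stored at a non-canonical
// rkey or claiming a DID other than the repo they were written to.
func validateServiceEvent(repoDID, rkey string, rec serviceRecord) error {
	if rkey != "self" {
		return errRkeyNotSelf
	}
	if rec.DID == "" {
		return errMissingField
	}
	if rec.DID != repoDID {
		return errDIDMismatch
	}
	return nil
}

// validateAuthorizationEvent rejects authorizations that name a community
// other than the repo that published them.
func validateAuthorizationEvent(repoDID string, rec authorizationRecord) error {
	if rec.AggregatorDID == "" || rec.CommunityDID == "" {
		return errMissingField
	}
	if rec.CommunityDID != repoDID {
		return errCommunityMismatch
	}
	return nil
}
```

Comparing against the repo DID from the event, rather than trusting the record body, is what blocks one account from publishing records on behalf of another.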
aggregator_jetstream_connector.go:
- WebSocket connection management with auto-reconnect
- Ping/pong keepalive
- Graceful error handling (continues on parsing errors)
- Filters for wanted collections
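Auto-reconnect can be illustrated without a WebSocket library by abstracting the connection as a function that blocks until the session ends. The loop below is a sketch under that assumption; the doubling backoff policy, `runWithReconnect`, `nextBackoff`, and `errDisconnected` are hypothetical and not taken from the connector's code.

```go
package main

import (
	"errors"
	"log"
	"time"
)

// errDisconnected is a hypothetical sentinel for a dropped connection.
var errDisconnected = errors.New("jetstream connection closed")

// nextBackoff doubles the delay, capped at max.
func nextBackoff(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		return max
	}
	return next
}

// runWithReconnect calls connect repeatedly until stop is closed.
// connect is expected to block for the lifetime of one session and to
// return an error when the session fails.
func runWithReconnect(stop <-chan struct{}, connect func() error, initial, max time.Duration) {
	backoff := initial
	for {
		select {
		case <-stop:
			return
		default:
		}

		err := connect()
		if err != nil {
			log.Printf("consumer disconnected: %v; retrying in %s", err, backoff)
		}

		// Wait before reconnecting, but give up promptly on shutdown.
		select {
		case <-stop:
			return
		case <-time.After(backoff):
		}

		if err != nil {
			backoff = nextBackoff(backoff, max)
		} else {
			backoff = initial // a clean session resets the delay
		}
	}
}
```

In a server, this loop would typically run in its own goroutine with `stop` closed during shutdown.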
Jetstream URL:
ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization
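The subscription URL repeats the `wantedCollections` parameter once per collection. One way to build it with the standard library is sketched below; `jetstreamSubscribeURL` is a hypothetical helper name.

```go
package main

import "net/url"

// jetstreamSubscribeURL appends one wantedCollections parameter per
// collection to the base subscribe URL.
func jetstreamSubscribeURL(base string, collections []string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	for _, c := range collections {
		q.Add("wantedCollections", c)
	}
	u.RawQuery = q.Encode()
	return u.String(), nil
}
```

Called with `ws://localhost:6008/subscribe` and the two aggregator collections in the order shown, this produces the URL above.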
Indexed to database:
- aggregators table (stats auto-updated via triggers)
- aggregator_authorizations table (unique constraint on aggregator+community)
Security:
- DID validation prevents impersonation
- communityDid validation prevents authorization forgery
- Graceful error handling prevents consumer crashes
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Implement XRPC query endpoints for aggregator discovery and
authorization management.
Handlers (internal/api/handlers/aggregator/):
- get_services.go: Fetch aggregator details by DIDs
* Supports detailed=true for stats (communities_using, posts_created)
* Returns aggregatorView or aggregatorViewDetailed union type
* Bulk query optimization for multiple DIDs
- get_authorizations.go: List communities using an aggregator
* Nested aggregatorView in response (lexicon compliance)
* Supports enabledOnly filter and pagination
- list_for_community.go: List aggregators for a community
* Accepts at-identifier (DID or handle) for community
* Returns authorizationView with config
* Supports enabledOnly filter and pagination
- errors.go: Error handling with domain error mapping
* Maps domain errors to appropriate HTTP status codes
* 404 for not found, 400 for validation, 501 for not implemented
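The domain-error mapping described for errors.go might be structured as a single switch over sentinel errors. The sentinel names and `statusForError` below are assumptions; the status codes are the ones listed in the commit message, with 500 as the fallback.

```go
package main

import (
	"errors"
	"net/http"
)

// Hypothetical domain errors; the real package may define different ones.
var (
	errNotFound       = errors.New("aggregator not found")
	errValidation     = errors.New("invalid request")
	errNotImplemented = errors.New("not implemented")
)

// statusForError maps a domain error to an HTTP status code. errors.Is is
// used so wrapped errors still map correctly.
func statusForError(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errNotFound):
		return http.StatusNotFound
	case errors.Is(err, errValidation):
		return http.StatusBadRequest
	case errors.Is(err, errNotImplemented):
		return http.StatusNotImplemented
	default:
		return http.StatusInternalServerError
	}
}
```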
Routes (internal/api/routes/aggregator.go):
- GET /xrpc/social.coves.aggregator.getServices?dids=...
- GET /xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=...
- GET /xrpc/social.coves.aggregator.listForCommunity?community=...
Features:
- Query parameter parsing with validation
- Domain model to API view conversion
- JSON response formatting matching lexicon
- Proper HTTP status codes (404, 400, 500, 501)
- Config unmarshal from JSONB to interface{}
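Query parameter parsing for getServices could look like the sketch below. The wire format of `dids` is not stated in the log, so accepting both repeated parameters and comma-separated values is an assumption, as are the function and error names.

```go
package main

import (
	"errors"
	"net/url"
	"strconv"
	"strings"
)

var (
	errMissingDIDs = errors.New("dids parameter is required")
	errInvalidDID  = errors.New("dids must start with \"did:\"")
)

// parseGetServicesQuery extracts the dids list and the optional detailed
// flag from a raw query string. Hypothetical helper for illustration.
func parseGetServicesQuery(rawQuery string) ([]string, bool, error) {
	values, err := url.ParseQuery(rawQuery)
	if err != nil {
		return nil, false, err
	}

	var dids []string
	for _, v := range values["dids"] {
		for _, d := range strings.Split(v, ",") {
			d = strings.TrimSpace(d)
			if d == "" {
				continue
			}
			if !strings.HasPrefix(d, "did:") {
				return nil, false, errInvalidDID
			}
			dids = append(dids, d)
		}
	}
	if len(dids) == 0 {
		return nil, false, errMissingDIDs
	}

	detailed := false
	if raw := values.Get("detailed"); raw != "" {
		detailed, err = strconv.ParseBool(raw)
		if err != nil {
			return nil, false, err
		}
	}
	return dids, detailed, nil
}
```

A handler would map `errMissingDIDs` and `errInvalidDID` to a 400 response before calling the service layer.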
Deferred to Phase 2:
- Write endpoints (enable, disable, updateConfig) return 501 Not Implemented
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>