A community based topic aggregation platform built on atproto

feat(posts): integrate aggregator authentication and authorization

Modify post creation flow to support aggregator posting with
server-side validation and rate limiting.

Changes to internal/core/posts/service.go:
- Server-side aggregator detection via database query
- Dual validation flow:
* Aggregators: Authorization + rate limits, skip membership checks
* Users: Existing visibility/membership validation
- Post tracking after successful creation for rate limiting
- Clear logging to distinguish aggregator vs user posts

Changes to internal/core/posts/errors.go:
- Added ErrRateLimitExceeded for aggregator rate limiting

Changes to internal/api/handlers/post/errors.go:
- Map both aggregators.ErrRateLimitExceeded and posts.ErrRateLimitExceeded to 429

Security:
- DID extracted from JWT (cryptographically verified)
- Database lookup confirms aggregator status (no client-provided flag)
- Authorization checked against indexed records from firehose
- Rate limiting: 10 posts/hour per community per aggregator

Flow:
1. Extract DID from JWT (verified by auth middleware)
2. Query: Is this DID an aggregator? (database lookup)
3a. If aggregator: Check authorization + rate limits
3b. If user: Check community membership/visibility
4. Write post to PDS
5. If aggregator: Record post for rate limiting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+97 -22
internal
api
handlers
post
core
+6
internal/api/handlers/post/errors.go
···
package post
import (
"Coves/internal/core/posts"
"encoding/json"
"log"
···
case posts.IsNotFound(err):
writeError(w, http.StatusNotFound, "NotFound", err.Error())
default:
// Don't leak internal error details to clients
···
package post
import (
+
"Coves/internal/core/aggregators"
"Coves/internal/core/posts"
"encoding/json"
"log"
···
case posts.IsNotFound(err):
writeError(w, http.StatusNotFound, "NotFound", err.Error())
+
+
// Check both aggregator and post rate limit errors
+
case aggregators.IsRateLimited(err) || err == posts.ErrRateLimitExceeded:
+
writeError(w, http.StatusTooManyRequests, "RateLimitExceeded",
+
"Rate limit exceeded. Please try again later.")
default:
// Don't leak internal error details to clients
+3
internal/core/posts/errors.go
···
// ErrNotFound is returned when a post is not found by URI
ErrNotFound = errors.New("post not found")
)
// ValidationError represents a validation error with field context
···
// ErrNotFound is returned when a post is not found by URI
ErrNotFound = errors.New("post not found")
+
+
// ErrRateLimitExceeded is returned when an aggregator exceeds rate limits
+
ErrRateLimitExceeded = errors.New("rate limit exceeded")
)
// ValidationError represents a validation error with field context
+88 -22
internal/core/posts/service.go
···
package posts
import (
"Coves/internal/core/communities"
"bytes"
"context"
···
)
type postService struct {
-
repo Repository
-
communityService communities.Service
-
pdsURL string
}
// NewPostService creates a new post service
func NewPostService(
repo Repository,
communityService communities.Service,
pdsURL string,
) Service {
return &postService{
-
repo: repo,
-
communityService: communityService,
-
pdsURL: pdsURL,
}
}
// CreatePost creates a new post in a community
// Flow:
// 1. Validate input
-
// 2. Resolve community at-identifier (handle or DID) to DID
-
// 3. Fetch community from AppView
-
// 4. Ensure community has fresh PDS credentials
// 5. Build post record
// 6. Write to community's PDS repository
-
// 7. Return URI/CID (AppView indexes asynchronously via Jetstream)
func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) {
-
// 1. Validate basic input
if err := s.validateCreateRequest(req); err != nil {
return nil, err
}
-
// 2. Resolve community at-identifier (handle or DID) to DID
// This accepts both formats per atProto best practices:
// - Handles: !gardening.communities.coves.social
// - DIDs: did:plc:abc123 or did:web:coves.social
···
return nil, fmt.Errorf("failed to resolve community identifier: %w", err)
}
-
// 3. Fetch community from AppView (includes all metadata)
community, err := s.communityService.GetByDID(ctx, communityDID)
if err != nil {
if communities.IsNotFound(err) {
···
return nil, fmt.Errorf("failed to fetch community: %w", err)
}
-
// 4. Check community visibility (Alpha: public/unlisted only)
-
// Beta will add membership checks for private communities
-
if community.Visibility == "private" {
-
return nil, ErrNotAuthorized
}
-
// 5. Ensure community has fresh PDS credentials (token refresh if needed)
community, err = s.communityService.EnsureFreshToken(ctx, community)
if err != nil {
return nil, fmt.Errorf("failed to refresh community credentials: %w", err)
}
-
// 6. Build post record for PDS
postRecord := PostRecord{
Type: "social.coves.post.record",
Community: communityDID,
···
CreatedAt: time.Now().UTC().Format(time.RFC3339),
}
-
// 7. Write to community's PDS repository
uri, cid, err := s.createPostOnPDS(ctx, community, postRecord)
if err != nil {
return nil, fmt.Errorf("failed to write post to PDS: %w", err)
}
-
// 8. Return response (AppView will index via Jetstream consumer)
-
log.Printf("[POST-CREATE] Author: %s, Community: %s, URI: %s", req.AuthorDID, communityDID, uri)
return &CreatePostResponse{
URI: uri,
···
package posts
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/aggregators"
"Coves/internal/core/communities"
"bytes"
"context"
···
)
type postService struct {
+
repo Repository
+
communityService communities.Service
+
aggregatorService aggregators.Service
+
pdsURL string
}
// NewPostService creates a new post service
+
// aggregatorService can be nil if aggregator support is not needed (e.g., in tests or minimal setups)
func NewPostService(
repo Repository,
communityService communities.Service,
+
aggregatorService aggregators.Service, // Optional: can be nil
pdsURL string,
) Service {
return &postService{
+
repo: repo,
+
communityService: communityService,
+
aggregatorService: aggregatorService,
+
pdsURL: pdsURL,
}
}
// CreatePost creates a new post in a community
// Flow:
// 1. Validate input
+
// 2. Check if author is an aggregator (server-side validation using DID from JWT)
+
// 3. If aggregator: validate authorization and rate limits, skip membership checks
+
// 4. If user: resolve community and perform membership/ban validation
// 5. Build post record
// 6. Write to community's PDS repository
+
// 7. If aggregator: record post for rate limiting
+
// 8. Return URI/CID (AppView indexes asynchronously via Jetstream)
func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) {
+
// 1. SECURITY: Extract authenticated DID from context (set by JWT middleware)
+
// Defense-in-depth: verify service layer receives correct DID even if handler is bypassed
+
authenticatedDID := middleware.GetAuthenticatedDID(ctx)
+
if authenticatedDID == "" {
+
return nil, fmt.Errorf("no authenticated DID in context - authentication required")
+
}
+
+
// SECURITY: Verify request DID matches authenticated DID from JWT
+
// This prevents DID spoofing where a malicious client or compromised handler
+
// could provide a different DID than what was authenticated
+
if authenticatedDID != req.AuthorDID {
+
log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID)
+
return nil, fmt.Errorf("authenticated DID does not match author DID")
+
}
+
+
// 2. Validate basic input
if err := s.validateCreateRequest(req); err != nil {
return nil, err
}
+
// 3. SECURITY: Check if the authenticated DID is a registered aggregator
+
// This is server-side verification - we query the database to confirm
+
// the DID from the JWT corresponds to a registered aggregator service
+
// If aggregatorService is nil (tests or environments without aggregators), treat all posts as user posts
+
isAggregator := false
+
if s.aggregatorService != nil {
+
var err error
+
isAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID)
+
if err != nil {
+
return nil, fmt.Errorf("failed to check if author is aggregator: %w", err)
+
}
+
}
+
+
// 4. Resolve community at-identifier (handle or DID) to DID
// This accepts both formats per atProto best practices:
// - Handles: !gardening.communities.coves.social
// - DIDs: did:plc:abc123 or did:web:coves.social
···
return nil, fmt.Errorf("failed to resolve community identifier: %w", err)
}
+
// 5. Fetch community from AppView (includes all metadata)
community, err := s.communityService.GetByDID(ctx, communityDID)
if err != nil {
if communities.IsNotFound(err) {
···
return nil, fmt.Errorf("failed to fetch community: %w", err)
}
+
// 6. Apply validation based on actor type (aggregator vs user)
+
if isAggregator {
+
// AGGREGATOR VALIDATION FLOW
+
// Following Bluesky's pattern: feed generators and labelers are authorized services
+
log.Printf("[POST-CREATE] Aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
+
+
// Check authorization exists and is enabled, and verify rate limits
+
if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil {
+
if aggregators.IsUnauthorized(err) {
+
return nil, ErrNotAuthorized
+
}
+
if aggregators.IsRateLimited(err) {
+
return nil, ErrRateLimitExceeded
+
}
+
return nil, fmt.Errorf("aggregator validation failed: %w", err)
+
}
+
+
// Aggregators skip membership checks and visibility restrictions
+
// They are authorized services, not community members
+
} else {
+
// USER VALIDATION FLOW
+
// Check community visibility (Alpha: public/unlisted only)
+
// Beta will add membership checks for private communities
+
if community.Visibility == "private" {
+
return nil, ErrNotAuthorized
+
}
}
+
// 7. Ensure community has fresh PDS credentials (token refresh if needed)
community, err = s.communityService.EnsureFreshToken(ctx, community)
if err != nil {
return nil, fmt.Errorf("failed to refresh community credentials: %w", err)
}
+
// 8. Build post record for PDS
postRecord := PostRecord{
Type: "social.coves.post.record",
Community: communityDID,
···
CreatedAt: time.Now().UTC().Format(time.RFC3339),
}
+
// 9. Write to community's PDS repository
uri, cid, err := s.createPostOnPDS(ctx, community, postRecord)
if err != nil {
return nil, fmt.Errorf("failed to write post to PDS: %w", err)
}
+
// 10. If aggregator, record post for rate limiting and statistics
+
if isAggregator && s.aggregatorService != nil {
+
if err := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); err != nil {
+
// Log error but don't fail the request (post was already created on PDS)
+
log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", err)
+
}
+
}
+
+
// 11. Return response (AppView will index via Jetstream consumer)
+
log.Printf("[POST-CREATE] Author: %s (aggregator=%v), Community: %s, URI: %s",
+
req.AuthorDID, isAggregator, communityDID, uri)
return &CreatePostResponse{
URI: uri,