A community based topic aggregation platform built on atproto
1package aggregators 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "Coves/internal/core/communities" 10 11 "github.com/xeipuuv/gojsonschema" 12) 13 14// Rate limit constants 15const ( 16 RateLimitWindow = 1 * time.Hour // Rolling 1-hour window for rate limit enforcement 17 RateLimitMaxPosts = 10 // Conservative limit for alpha: 10 posts/hour per community (prevents spam while allowing real-time updates) 18 DefaultQueryLimit = 50 // Balance between UX (reasonable page size) and server load 19 MaxQueryLimit = 100 // Prevent abuse while allowing batch operations (e.g., fetching multiple aggregators at once) 20) 21 22type aggregatorService struct { 23 repo Repository 24 communityService communities.Service 25} 26 27// NewAggregatorService creates a new aggregator service 28func NewAggregatorService(repo Repository, communityService communities.Service) Service { 29 return &aggregatorService{ 30 repo: repo, 31 communityService: communityService, 32 } 33} 34 35// ===== Query Operations (Read from AppView) ===== 36 37// GetAggregator retrieves a single aggregator by DID 38func (s *aggregatorService) GetAggregator(ctx context.Context, did string) (*Aggregator, error) { 39 if did == "" { 40 return nil, NewValidationError("did", "DID is required") 41 } 42 43 return s.repo.GetAggregator(ctx, did) 44} 45 46// GetAggregators retrieves multiple aggregators by DIDs 47func (s *aggregatorService) GetAggregators(ctx context.Context, dids []string) ([]*Aggregator, error) { 48 if len(dids) == 0 { 49 return []*Aggregator{}, nil 50 } 51 52 if len(dids) > MaxQueryLimit { 53 return nil, NewValidationError("dids", fmt.Sprintf("maximum %d DIDs allowed", MaxQueryLimit)) 54 } 55 56 // Use bulk fetch to avoid N+1 queries 57 return s.repo.GetAggregatorsByDIDs(ctx, dids) 58} 59 60// ListAggregators retrieves all aggregators with pagination 61func (s *aggregatorService) ListAggregators(ctx context.Context, limit, offset int) ([]*Aggregator, error) { 62 // Apply defaults and limits 63 if limit <= 0 { 64 limit = DefaultQueryLimit 65 } 66 if limit > MaxQueryLimit { 67 limit = MaxQueryLimit 68 } 69 if offset < 0 { 70 offset = 0 71 } 72 73 return s.repo.ListAggregators(ctx, limit, offset) 74} 75 76// GetAuthorizationsForAggregator retrieves all communities that authorized an aggregator 77func (s *aggregatorService) GetAuthorizationsForAggregator(ctx context.Context, req GetAuthorizationsRequest) ([]*Authorization, error) { 78 if req.AggregatorDID == "" { 79 return nil, NewValidationError("aggregatorDid", "aggregator DID is required") 80 } 81 82 // Apply defaults and limits 83 if req.Limit <= 0 { 84 req.Limit = DefaultQueryLimit 85 } 86 if req.Limit > MaxQueryLimit { 87 req.Limit = MaxQueryLimit 88 } 89 if req.Offset < 0 { 90 req.Offset = 0 91 } 92 93 return s.repo.ListAuthorizationsForAggregator(ctx, req.AggregatorDID, req.EnabledOnly, req.Limit, req.Offset) 94} 95 96// ListAggregatorsForCommunity retrieves all aggregators authorized by a community 97func (s *aggregatorService) ListAggregatorsForCommunity(ctx context.Context, req ListForCommunityRequest) ([]*Authorization, error) { 98 if req.CommunityDID == "" { 99 return nil, NewValidationError("communityDid", "community DID is required") 100 } 101 102 // Apply defaults and limits 103 if req.Limit <= 0 { 104 req.Limit = DefaultQueryLimit 105 } 106 if req.Limit > MaxQueryLimit { 107 req.Limit = MaxQueryLimit 108 } 109 if req.Offset < 0 { 110 req.Offset = 0 111 } 112 113 return s.repo.ListAuthorizationsForCommunity(ctx, req.CommunityDID, req.EnabledOnly, req.Limit, req.Offset) 114} 115 116// ===== Authorization Management (Write-forward to PDS) ===== 117 118// EnableAggregator creates an authorization record for an aggregator in a community 119// Following Bluesky's pattern: similar to enabling a labeler or feed generator 120// Note: This is a PLACEHOLDER for the write-forward implementation 121// TODO: Implement actual XRPC write to community's PDS repository 122func (s *aggregatorService) EnableAggregator(ctx context.Context, req EnableAggregatorRequest) (*Authorization, error) { 123 // Validate request 124 if err := s.validateEnableRequest(ctx, req); err != nil { 125 return nil, err 126 } 127 128 // Verify aggregator exists 129 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID) 130 if err != nil { 131 return nil, err 132 } 133 134 // Validate config against aggregator's schema if provided 135 if len(req.Config) > 0 && len(aggregator.ConfigSchema) > 0 { 136 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil { 137 return nil, err 138 } 139 } 140 141 // Check if already authorized 142 existing, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID) 143 if err == nil && existing.Enabled { 144 return nil, ErrAlreadyAuthorized 145 } 146 147 // TODO Phase 2: Write-forward to PDS 148 // For now, return placeholder response 149 // The actual implementation will: 150 // 1. Create authorization record in community's repository on PDS 151 // 2. Wait for Jetstream to index it 152 // 3. Return the indexed authorization 153 // 154 // Record structure: 155 // at://community_did/social.coves.aggregator.authorization/{rkey} 156 // { 157 // "$type": "social.coves.aggregator.authorization", 158 // "aggregator": req.AggregatorDID, 159 // "enabled": true, 160 // "config": req.Config, 161 // "createdBy": req.EnabledByDID, 162 // "createdAt": "2025-10-20T12:00:00Z" 163 // } 164 165 return nil, ErrNotImplemented 166} 167 168// DisableAggregator updates an authorization to disabled 169// Note: This is a PLACEHOLDER for the write-forward implementation 170func (s *aggregatorService) DisableAggregator(ctx context.Context, req DisableAggregatorRequest) (*Authorization, error) { 171 // Validate request 172 if err := s.validateDisableRequest(ctx, req); err != nil { 173 return nil, err 174 } 175 176 // Verify authorization exists 177 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID) 178 if err != nil { 179 return nil, err 180 } 181 182 if !auth.Enabled { 183 // Already disabled 184 return auth, nil 185 } 186 187 // TODO Phase 2: Write-forward to PDS 188 // Update the authorization record with enabled=false 189 return nil, ErrNotImplemented 190} 191 192// UpdateAggregatorConfig updates an aggregator's configuration 193// Note: This is a PLACEHOLDER for the write-forward implementation 194func (s *aggregatorService) UpdateAggregatorConfig(ctx context.Context, req UpdateConfigRequest) (*Authorization, error) { 195 // Validate request 196 if err := s.validateUpdateConfigRequest(ctx, req); err != nil { 197 return nil, err 198 } 199 200 // Verify authorization exists 201 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID) 202 if err != nil { 203 return nil, err 204 } 205 206 // Get aggregator for schema validation 207 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID) 208 if err != nil { 209 return nil, err 210 } 211 212 // Validate new config against schema 213 if len(req.Config) > 0 && len(aggregator.ConfigSchema) > 0 { 214 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil { 215 return nil, err 216 } 217 } 218 219 // TODO Phase 2: Write-forward to PDS 220 // Update the authorization record with new config 221 return auth, ErrNotImplemented 222} 223 224// ===== Validation and Authorization Checks ===== 225 226// ValidateAggregatorPost validates that an aggregator can post to a community 227// Checks: 1) Authorization exists and is enabled, 2) Rate limit not exceeded 228// This is called by the post creation handler BEFORE writing to PDS 229func (s *aggregatorService) ValidateAggregatorPost(ctx context.Context, aggregatorDID, communityDID string) error { 230 // Check authorization exists and is enabled 231 authorized, err := s.repo.IsAuthorized(ctx, aggregatorDID, communityDID) 232 if err != nil { 233 return fmt.Errorf("failed to check authorization: %w", err) 234 } 235 if !authorized { 236 return ErrNotAuthorized 237 } 238 239 // Check rate limit (10 posts per hour per community) 240 since := time.Now().Add(-RateLimitWindow) 241 recentPostCount, err := s.repo.CountRecentPosts(ctx, aggregatorDID, communityDID, since) 242 if err != nil { 243 return fmt.Errorf("failed to check rate limit: %w", err) 244 } 245 246 if recentPostCount >= RateLimitMaxPosts { 247 return ErrRateLimitExceeded 248 } 249 250 return nil 251} 252 253// IsAggregator checks if a DID is a registered aggregator 254// Fast check used by post creation handler 255func (s *aggregatorService) IsAggregator(ctx context.Context, did string) (bool, error) { 256 if did == "" { 257 return false, nil 258 } 259 return s.repo.IsAggregator(ctx, did) 260} 261 262// RecordAggregatorPost tracks a post created by an aggregator 263// Called AFTER successful post creation to update statistics and rate limiting 264func (s *aggregatorService) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error { 265 if aggregatorDID == "" || communityDID == "" || postURI == "" || postCID == "" { 266 return NewValidationError("post_tracking", "aggregatorDID, communityDID, postURI, and postCID are required") 267 } 268 269 return s.repo.RecordAggregatorPost(ctx, aggregatorDID, communityDID, postURI, postCID) 270} 271 272// ===== Validation Helpers ===== 273 274func (s *aggregatorService) validateEnableRequest(ctx context.Context, req EnableAggregatorRequest) error { 275 if req.AggregatorDID == "" { 276 return NewValidationError("aggregatorDid", "aggregator DID is required") 277 } 278 if req.CommunityDID == "" { 279 return NewValidationError("communityDid", "community DID is required") 280 } 281 if req.EnabledByDID == "" { 282 return NewValidationError("enabledByDid", "enabledByDID is required") 283 } 284 285 // Verify user is a moderator of the community 286 // TODO: Implement moderator check 287 // membership, err := s.communityService.GetMembership(ctx, req.EnabledByDID, req.CommunityDID) 288 // if err != nil || !membership.IsModerator { 289 // return ErrNotModerator 290 // } 291 292 return nil 293} 294 295func (s *aggregatorService) validateDisableRequest(ctx context.Context, req DisableAggregatorRequest) error { 296 if req.AggregatorDID == "" { 297 return NewValidationError("aggregatorDid", "aggregator DID is required") 298 } 299 if req.CommunityDID == "" { 300 return NewValidationError("communityDid", "community DID is required") 301 } 302 if req.DisabledByDID == "" { 303 return NewValidationError("disabledByDid", "disabledByDID is required") 304 } 305 306 // Verify user is a moderator of the community 307 // TODO: Implement moderator check 308 309 return nil 310} 311 312func (s *aggregatorService) validateUpdateConfigRequest(ctx context.Context, req UpdateConfigRequest) error { 313 if req.AggregatorDID == "" { 314 return NewValidationError("aggregatorDid", "aggregator DID is required") 315 } 316 if req.CommunityDID == "" { 317 return NewValidationError("communityDid", "community DID is required") 318 } 319 if req.UpdatedByDID == "" { 320 return NewValidationError("updatedByDid", "updatedByDID is required") 321 } 322 if len(req.Config) == 0 { 323 return NewValidationError("config", "config is required") 324 } 325 326 // Verify user is a moderator of the community 327 // TODO: Implement moderator check 328 329 return nil 330} 331 332// validateConfig validates a config object against a JSON Schema 333// Following Bluesky's pattern for feed generator configuration 334func (s *aggregatorService) validateConfig(config map[string]interface{}, schemaBytes []byte) error { 335 // Parse schema 336 schemaLoader := gojsonschema.NewBytesLoader(schemaBytes) 337 338 // Convert config to JSON bytes 339 configBytes, err := json.Marshal(config) 340 if err != nil { 341 return fmt.Errorf("failed to marshal config: %w", err) 342 } 343 configLoader := gojsonschema.NewBytesLoader(configBytes) 344 345 // Validate 346 result, err := gojsonschema.Validate(schemaLoader, configLoader) 347 if err != nil { 348 return fmt.Errorf("failed to validate config: %w", err) 349 } 350 351 if !result.Valid() { 352 // Collect validation errors 353 var errorMessages []string 354 for _, desc := range result.Errors() { 355 errorMessages = append(errorMessages, desc.String()) 356 } 357 return fmt.Errorf("%w: %s", ErrConfigSchemaValidation, errorMessages) 358 } 359 360 return nil 361}