A community-based topic aggregation platform built on atproto
1package aggregators 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "encoding/json" 7 "fmt" 8 "time" 9 10 "github.com/xeipuuv/gojsonschema" 11) 12 13// Rate limit constants 14const ( 15 RateLimitWindow = 1 * time.Hour // Rolling 1-hour window for rate limit enforcement 16 RateLimitMaxPosts = 10 // Conservative limit for alpha: 10 posts/hour per community (prevents spam while allowing real-time updates) 17 DefaultQueryLimit = 50 // Balance between UX (reasonable page size) and server load 18 MaxQueryLimit = 100 // Prevent abuse while allowing batch operations (e.g., fetching multiple aggregators at once) 19) 20 21type aggregatorService struct { 22 repo Repository 23 communityService communities.Service 24} 25 26// NewAggregatorService creates a new aggregator service 27func NewAggregatorService(repo Repository, communityService communities.Service) Service { 28 return &aggregatorService{ 29 repo: repo, 30 communityService: communityService, 31 } 32} 33 34// ===== Query Operations (Read from AppView) ===== 35 36// GetAggregator retrieves a single aggregator by DID 37func (s *aggregatorService) GetAggregator(ctx context.Context, did string) (*Aggregator, error) { 38 if did == "" { 39 return nil, NewValidationError("did", "DID is required") 40 } 41 42 return s.repo.GetAggregator(ctx, did) 43} 44 45// GetAggregators retrieves multiple aggregators by DIDs 46func (s *aggregatorService) GetAggregators(ctx context.Context, dids []string) ([]*Aggregator, error) { 47 if len(dids) == 0 { 48 return []*Aggregator{}, nil 49 } 50 51 if len(dids) > MaxQueryLimit { 52 return nil, NewValidationError("dids", fmt.Sprintf("maximum %d DIDs allowed", MaxQueryLimit)) 53 } 54 55 // Use bulk fetch to avoid N+1 queries 56 return s.repo.GetAggregatorsByDIDs(ctx, dids) 57} 58 59// ListAggregators retrieves all aggregators with pagination 60func (s *aggregatorService) ListAggregators(ctx context.Context, limit, offset int) ([]*Aggregator, error) { 61 // Apply defaults and limits 
62 if limit <= 0 { 63 limit = DefaultQueryLimit 64 } 65 if limit > MaxQueryLimit { 66 limit = MaxQueryLimit 67 } 68 if offset < 0 { 69 offset = 0 70 } 71 72 return s.repo.ListAggregators(ctx, limit, offset) 73} 74 75// GetAuthorizationsForAggregator retrieves all communities that authorized an aggregator 76func (s *aggregatorService) GetAuthorizationsForAggregator(ctx context.Context, req GetAuthorizationsRequest) ([]*Authorization, error) { 77 if req.AggregatorDID == "" { 78 return nil, NewValidationError("aggregatorDid", "aggregator DID is required") 79 } 80 81 // Apply defaults and limits 82 if req.Limit <= 0 { 83 req.Limit = DefaultQueryLimit 84 } 85 if req.Limit > MaxQueryLimit { 86 req.Limit = MaxQueryLimit 87 } 88 if req.Offset < 0 { 89 req.Offset = 0 90 } 91 92 return s.repo.ListAuthorizationsForAggregator(ctx, req.AggregatorDID, req.EnabledOnly, req.Limit, req.Offset) 93} 94 95// ListAggregatorsForCommunity retrieves all aggregators authorized by a community 96func (s *aggregatorService) ListAggregatorsForCommunity(ctx context.Context, req ListForCommunityRequest) ([]*Authorization, error) { 97 if req.CommunityDID == "" { 98 return nil, NewValidationError("communityDid", "community DID is required") 99 } 100 101 // Apply defaults and limits 102 if req.Limit <= 0 { 103 req.Limit = DefaultQueryLimit 104 } 105 if req.Limit > MaxQueryLimit { 106 req.Limit = MaxQueryLimit 107 } 108 if req.Offset < 0 { 109 req.Offset = 0 110 } 111 112 return s.repo.ListAuthorizationsForCommunity(ctx, req.CommunityDID, req.EnabledOnly, req.Limit, req.Offset) 113} 114 115// ===== Authorization Management (Write-forward to PDS) ===== 116 117// EnableAggregator creates an authorization record for an aggregator in a community 118// Following Bluesky's pattern: similar to enabling a labeler or feed generator 119// Note: This is a PLACEHOLDER for the write-forward implementation 120// TODO: Implement actual XRPC write to community's PDS repository 121func (s *aggregatorService) 
EnableAggregator(ctx context.Context, req EnableAggregatorRequest) (*Authorization, error) { 122 // Validate request 123 if err := s.validateEnableRequest(ctx, req); err != nil { 124 return nil, err 125 } 126 127 // Verify aggregator exists 128 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID) 129 if err != nil { 130 return nil, err 131 } 132 133 // Validate config against aggregator's schema if provided 134 if len(req.Config) > 0 && len(aggregator.ConfigSchema) > 0 { 135 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil { 136 return nil, err 137 } 138 } 139 140 // Check if already authorized 141 existing, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID) 142 if err == nil && existing.Enabled { 143 return nil, ErrAlreadyAuthorized 144 } 145 146 // TODO Phase 2: Write-forward to PDS 147 // For now, return placeholder response 148 // The actual implementation will: 149 // 1. Create authorization record in community's repository on PDS 150 // 2. Wait for Jetstream to index it 151 // 3. 
Return the indexed authorization 152 // 153 // Record structure: 154 // at://community_did/social.coves.aggregator.authorization/{rkey} 155 // { 156 // "$type": "social.coves.aggregator.authorization", 157 // "aggregator": req.AggregatorDID, 158 // "enabled": true, 159 // "config": req.Config, 160 // "createdBy": req.EnabledByDID, 161 // "createdAt": "2025-10-20T12:00:00Z" 162 // } 163 164 return nil, ErrNotImplemented 165} 166 167// DisableAggregator updates an authorization to disabled 168// Note: This is a PLACEHOLDER for the write-forward implementation 169func (s *aggregatorService) DisableAggregator(ctx context.Context, req DisableAggregatorRequest) (*Authorization, error) { 170 // Validate request 171 if err := s.validateDisableRequest(ctx, req); err != nil { 172 return nil, err 173 } 174 175 // Verify authorization exists 176 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID) 177 if err != nil { 178 return nil, err 179 } 180 181 if !auth.Enabled { 182 // Already disabled 183 return auth, nil 184 } 185 186 // TODO Phase 2: Write-forward to PDS 187 // Update the authorization record with enabled=false 188 return nil, ErrNotImplemented 189} 190 191// UpdateAggregatorConfig updates an aggregator's configuration 192// Note: This is a PLACEHOLDER for the write-forward implementation 193func (s *aggregatorService) UpdateAggregatorConfig(ctx context.Context, req UpdateConfigRequest) (*Authorization, error) { 194 // Validate request 195 if err := s.validateUpdateConfigRequest(ctx, req); err != nil { 196 return nil, err 197 } 198 199 // Verify authorization exists 200 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID) 201 if err != nil { 202 return nil, err 203 } 204 205 // Get aggregator for schema validation 206 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID) 207 if err != nil { 208 return nil, err 209 } 210 211 // Validate new config against schema 212 if len(req.Config) > 0 && 
len(aggregator.ConfigSchema) > 0 { 213 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil { 214 return nil, err 215 } 216 } 217 218 // TODO Phase 2: Write-forward to PDS 219 // Update the authorization record with new config 220 return auth, ErrNotImplemented 221} 222 223// ===== Validation and Authorization Checks ===== 224 225// ValidateAggregatorPost validates that an aggregator can post to a community 226// Checks: 1) Authorization exists and is enabled, 2) Rate limit not exceeded 227// This is called by the post creation handler BEFORE writing to PDS 228func (s *aggregatorService) ValidateAggregatorPost(ctx context.Context, aggregatorDID, communityDID string) error { 229 // Check authorization exists and is enabled 230 authorized, err := s.repo.IsAuthorized(ctx, aggregatorDID, communityDID) 231 if err != nil { 232 return fmt.Errorf("failed to check authorization: %w", err) 233 } 234 if !authorized { 235 return ErrNotAuthorized 236 } 237 238 // Check rate limit (10 posts per hour per community) 239 since := time.Now().Add(-RateLimitWindow) 240 recentPostCount, err := s.repo.CountRecentPosts(ctx, aggregatorDID, communityDID, since) 241 if err != nil { 242 return fmt.Errorf("failed to check rate limit: %w", err) 243 } 244 245 if recentPostCount >= RateLimitMaxPosts { 246 return ErrRateLimitExceeded 247 } 248 249 return nil 250} 251 252// IsAggregator checks if a DID is a registered aggregator 253// Fast check used by post creation handler 254func (s *aggregatorService) IsAggregator(ctx context.Context, did string) (bool, error) { 255 if did == "" { 256 return false, nil 257 } 258 return s.repo.IsAggregator(ctx, did) 259} 260 261// RecordAggregatorPost tracks a post created by an aggregator 262// Called AFTER successful post creation to update statistics and rate limiting 263func (s *aggregatorService) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error { 264 if aggregatorDID == "" || 
communityDID == "" || postURI == "" || postCID == "" { 265 return NewValidationError("post_tracking", "aggregatorDID, communityDID, postURI, and postCID are required") 266 } 267 268 return s.repo.RecordAggregatorPost(ctx, aggregatorDID, communityDID, postURI, postCID) 269} 270 271// ===== Validation Helpers ===== 272 273func (s *aggregatorService) validateEnableRequest(ctx context.Context, req EnableAggregatorRequest) error { 274 if req.AggregatorDID == "" { 275 return NewValidationError("aggregatorDid", "aggregator DID is required") 276 } 277 if req.CommunityDID == "" { 278 return NewValidationError("communityDid", "community DID is required") 279 } 280 if req.EnabledByDID == "" { 281 return NewValidationError("enabledByDid", "enabledByDID is required") 282 } 283 284 // Verify user is a moderator of the community 285 // TODO: Implement moderator check 286 // membership, err := s.communityService.GetMembership(ctx, req.EnabledByDID, req.CommunityDID) 287 // if err != nil || !membership.IsModerator { 288 // return ErrNotModerator 289 // } 290 291 return nil 292} 293 294func (s *aggregatorService) validateDisableRequest(ctx context.Context, req DisableAggregatorRequest) error { 295 if req.AggregatorDID == "" { 296 return NewValidationError("aggregatorDid", "aggregator DID is required") 297 } 298 if req.CommunityDID == "" { 299 return NewValidationError("communityDid", "community DID is required") 300 } 301 if req.DisabledByDID == "" { 302 return NewValidationError("disabledByDid", "disabledByDID is required") 303 } 304 305 // Verify user is a moderator of the community 306 // TODO: Implement moderator check 307 308 return nil 309} 310 311func (s *aggregatorService) validateUpdateConfigRequest(ctx context.Context, req UpdateConfigRequest) error { 312 if req.AggregatorDID == "" { 313 return NewValidationError("aggregatorDid", "aggregator DID is required") 314 } 315 if req.CommunityDID == "" { 316 return NewValidationError("communityDid", "community DID is required") 
317 } 318 if req.UpdatedByDID == "" { 319 return NewValidationError("updatedByDid", "updatedByDID is required") 320 } 321 if len(req.Config) == 0 { 322 return NewValidationError("config", "config is required") 323 } 324 325 // Verify user is a moderator of the community 326 // TODO: Implement moderator check 327 328 return nil 329} 330 331// validateConfig validates a config object against a JSON Schema 332// Following Bluesky's pattern for feed generator configuration 333func (s *aggregatorService) validateConfig(config map[string]interface{}, schemaBytes []byte) error { 334 // Parse schema 335 schemaLoader := gojsonschema.NewBytesLoader(schemaBytes) 336 337 // Convert config to JSON bytes 338 configBytes, err := json.Marshal(config) 339 if err != nil { 340 return fmt.Errorf("failed to marshal config: %w", err) 341 } 342 configLoader := gojsonschema.NewBytesLoader(configBytes) 343 344 // Validate 345 result, err := gojsonschema.Validate(schemaLoader, configLoader) 346 if err != nil { 347 return fmt.Errorf("failed to validate config: %w", err) 348 } 349 350 if !result.Valid() { 351 // Collect validation errors 352 var errorMessages []string 353 for _, desc := range result.Errors() { 354 errorMessages = append(errorMessages, desc.String()) 355 } 356 return fmt.Errorf("%w: %s", ErrConfigSchemaValidation, errorMessages) 357 } 358 359 return nil 360}