A community-based topic aggregation platform built on atproto
1package aggregators
2
import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"Coves/internal/core/communities"

	"github.com/xeipuuv/gojsonschema"
)
13
// Limits applied by the aggregator service: posting rate limits and query
// page sizes.
const (
	// RateLimitWindow is the rolling one-hour window used for rate limit
	// enforcement.
	RateLimitWindow = time.Hour

	// RateLimitMaxPosts is the conservative alpha limit of 10 posts per hour
	// per community (prevents spam while allowing real-time updates).
	RateLimitMaxPosts = 10

	// DefaultQueryLimit is the page size used when a caller supplies none;
	// it balances UX (reasonable page size) against server load.
	DefaultQueryLimit = 50

	// MaxQueryLimit caps page sizes and bulk lookups to prevent abuse while
	// still allowing batch operations (e.g. fetching multiple aggregators at
	// once).
	MaxQueryLimit = 100
)
21
22type aggregatorService struct {
23 repo Repository
24 communityService communities.Service
25}
26
27// NewAggregatorService creates a new aggregator service
28func NewAggregatorService(repo Repository, communityService communities.Service) Service {
29 return &aggregatorService{
30 repo: repo,
31 communityService: communityService,
32 }
33}
34
35// ===== Query Operations (Read from AppView) =====
36
37// GetAggregator retrieves a single aggregator by DID
38func (s *aggregatorService) GetAggregator(ctx context.Context, did string) (*Aggregator, error) {
39 if did == "" {
40 return nil, NewValidationError("did", "DID is required")
41 }
42
43 return s.repo.GetAggregator(ctx, did)
44}
45
46// GetAggregators retrieves multiple aggregators by DIDs
47func (s *aggregatorService) GetAggregators(ctx context.Context, dids []string) ([]*Aggregator, error) {
48 if len(dids) == 0 {
49 return []*Aggregator{}, nil
50 }
51
52 if len(dids) > MaxQueryLimit {
53 return nil, NewValidationError("dids", fmt.Sprintf("maximum %d DIDs allowed", MaxQueryLimit))
54 }
55
56 // Use bulk fetch to avoid N+1 queries
57 return s.repo.GetAggregatorsByDIDs(ctx, dids)
58}
59
60// ListAggregators retrieves all aggregators with pagination
61func (s *aggregatorService) ListAggregators(ctx context.Context, limit, offset int) ([]*Aggregator, error) {
62 // Apply defaults and limits
63 if limit <= 0 {
64 limit = DefaultQueryLimit
65 }
66 if limit > MaxQueryLimit {
67 limit = MaxQueryLimit
68 }
69 if offset < 0 {
70 offset = 0
71 }
72
73 return s.repo.ListAggregators(ctx, limit, offset)
74}
75
76// GetAuthorizationsForAggregator retrieves all communities that authorized an aggregator
77func (s *aggregatorService) GetAuthorizationsForAggregator(ctx context.Context, req GetAuthorizationsRequest) ([]*Authorization, error) {
78 if req.AggregatorDID == "" {
79 return nil, NewValidationError("aggregatorDid", "aggregator DID is required")
80 }
81
82 // Apply defaults and limits
83 if req.Limit <= 0 {
84 req.Limit = DefaultQueryLimit
85 }
86 if req.Limit > MaxQueryLimit {
87 req.Limit = MaxQueryLimit
88 }
89 if req.Offset < 0 {
90 req.Offset = 0
91 }
92
93 return s.repo.ListAuthorizationsForAggregator(ctx, req.AggregatorDID, req.EnabledOnly, req.Limit, req.Offset)
94}
95
96// ListAggregatorsForCommunity retrieves all aggregators authorized by a community
97func (s *aggregatorService) ListAggregatorsForCommunity(ctx context.Context, req ListForCommunityRequest) ([]*Authorization, error) {
98 if req.CommunityDID == "" {
99 return nil, NewValidationError("communityDid", "community DID is required")
100 }
101
102 // Apply defaults and limits
103 if req.Limit <= 0 {
104 req.Limit = DefaultQueryLimit
105 }
106 if req.Limit > MaxQueryLimit {
107 req.Limit = MaxQueryLimit
108 }
109 if req.Offset < 0 {
110 req.Offset = 0
111 }
112
113 return s.repo.ListAuthorizationsForCommunity(ctx, req.CommunityDID, req.EnabledOnly, req.Limit, req.Offset)
114}
115
116// ===== Authorization Management (Write-forward to PDS) =====
117
118// EnableAggregator creates an authorization record for an aggregator in a community
119// Following Bluesky's pattern: similar to enabling a labeler or feed generator
120// Note: This is a PLACEHOLDER for the write-forward implementation
121// TODO: Implement actual XRPC write to community's PDS repository
122func (s *aggregatorService) EnableAggregator(ctx context.Context, req EnableAggregatorRequest) (*Authorization, error) {
123 // Validate request
124 if err := s.validateEnableRequest(ctx, req); err != nil {
125 return nil, err
126 }
127
128 // Verify aggregator exists
129 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID)
130 if err != nil {
131 return nil, err
132 }
133
134 // Validate config against aggregator's schema if provided
135 if len(req.Config) > 0 && len(aggregator.ConfigSchema) > 0 {
136 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil {
137 return nil, err
138 }
139 }
140
141 // Check if already authorized
142 existing, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID)
143 if err == nil && existing.Enabled {
144 return nil, ErrAlreadyAuthorized
145 }
146
147 // TODO Phase 2: Write-forward to PDS
148 // For now, return placeholder response
149 // The actual implementation will:
150 // 1. Create authorization record in community's repository on PDS
151 // 2. Wait for Jetstream to index it
152 // 3. Return the indexed authorization
153 //
154 // Record structure:
155 // at://community_did/social.coves.aggregator.authorization/{rkey}
156 // {
157 // "$type": "social.coves.aggregator.authorization",
158 // "aggregator": req.AggregatorDID,
159 // "enabled": true,
160 // "config": req.Config,
161 // "createdBy": req.EnabledByDID,
162 // "createdAt": "2025-10-20T12:00:00Z"
163 // }
164
165 return nil, ErrNotImplemented
166}
167
168// DisableAggregator updates an authorization to disabled
169// Note: This is a PLACEHOLDER for the write-forward implementation
170func (s *aggregatorService) DisableAggregator(ctx context.Context, req DisableAggregatorRequest) (*Authorization, error) {
171 // Validate request
172 if err := s.validateDisableRequest(ctx, req); err != nil {
173 return nil, err
174 }
175
176 // Verify authorization exists
177 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID)
178 if err != nil {
179 return nil, err
180 }
181
182 if !auth.Enabled {
183 // Already disabled
184 return auth, nil
185 }
186
187 // TODO Phase 2: Write-forward to PDS
188 // Update the authorization record with enabled=false
189 return nil, ErrNotImplemented
190}
191
192// UpdateAggregatorConfig updates an aggregator's configuration
193// Note: This is a PLACEHOLDER for the write-forward implementation
194func (s *aggregatorService) UpdateAggregatorConfig(ctx context.Context, req UpdateConfigRequest) (*Authorization, error) {
195 // Validate request
196 if err := s.validateUpdateConfigRequest(ctx, req); err != nil {
197 return nil, err
198 }
199
200 // Verify authorization exists
201 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID)
202 if err != nil {
203 return nil, err
204 }
205
206 // Get aggregator for schema validation
207 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID)
208 if err != nil {
209 return nil, err
210 }
211
212 // Validate new config against schema
213 if len(req.Config) > 0 && len(aggregator.ConfigSchema) > 0 {
214 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil {
215 return nil, err
216 }
217 }
218
219 // TODO Phase 2: Write-forward to PDS
220 // Update the authorization record with new config
221 return auth, ErrNotImplemented
222}
223
224// ===== Validation and Authorization Checks =====
225
226// ValidateAggregatorPost validates that an aggregator can post to a community
227// Checks: 1) Authorization exists and is enabled, 2) Rate limit not exceeded
228// This is called by the post creation handler BEFORE writing to PDS
229func (s *aggregatorService) ValidateAggregatorPost(ctx context.Context, aggregatorDID, communityDID string) error {
230 // Check authorization exists and is enabled
231 authorized, err := s.repo.IsAuthorized(ctx, aggregatorDID, communityDID)
232 if err != nil {
233 return fmt.Errorf("failed to check authorization: %w", err)
234 }
235 if !authorized {
236 return ErrNotAuthorized
237 }
238
239 // Check rate limit (10 posts per hour per community)
240 since := time.Now().Add(-RateLimitWindow)
241 recentPostCount, err := s.repo.CountRecentPosts(ctx, aggregatorDID, communityDID, since)
242 if err != nil {
243 return fmt.Errorf("failed to check rate limit: %w", err)
244 }
245
246 if recentPostCount >= RateLimitMaxPosts {
247 return ErrRateLimitExceeded
248 }
249
250 return nil
251}
252
253// IsAggregator checks if a DID is a registered aggregator
254// Fast check used by post creation handler
255func (s *aggregatorService) IsAggregator(ctx context.Context, did string) (bool, error) {
256 if did == "" {
257 return false, nil
258 }
259 return s.repo.IsAggregator(ctx, did)
260}
261
262// RecordAggregatorPost tracks a post created by an aggregator
263// Called AFTER successful post creation to update statistics and rate limiting
264func (s *aggregatorService) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error {
265 if aggregatorDID == "" || communityDID == "" || postURI == "" || postCID == "" {
266 return NewValidationError("post_tracking", "aggregatorDID, communityDID, postURI, and postCID are required")
267 }
268
269 return s.repo.RecordAggregatorPost(ctx, aggregatorDID, communityDID, postURI, postCID)
270}
271
272// ===== Validation Helpers =====
273
274func (s *aggregatorService) validateEnableRequest(ctx context.Context, req EnableAggregatorRequest) error {
275 if req.AggregatorDID == "" {
276 return NewValidationError("aggregatorDid", "aggregator DID is required")
277 }
278 if req.CommunityDID == "" {
279 return NewValidationError("communityDid", "community DID is required")
280 }
281 if req.EnabledByDID == "" {
282 return NewValidationError("enabledByDid", "enabledByDID is required")
283 }
284
285 // Verify user is a moderator of the community
286 // TODO: Implement moderator check
287 // membership, err := s.communityService.GetMembership(ctx, req.EnabledByDID, req.CommunityDID)
288 // if err != nil || !membership.IsModerator {
289 // return ErrNotModerator
290 // }
291
292 return nil
293}
294
295func (s *aggregatorService) validateDisableRequest(ctx context.Context, req DisableAggregatorRequest) error {
296 if req.AggregatorDID == "" {
297 return NewValidationError("aggregatorDid", "aggregator DID is required")
298 }
299 if req.CommunityDID == "" {
300 return NewValidationError("communityDid", "community DID is required")
301 }
302 if req.DisabledByDID == "" {
303 return NewValidationError("disabledByDid", "disabledByDID is required")
304 }
305
306 // Verify user is a moderator of the community
307 // TODO: Implement moderator check
308
309 return nil
310}
311
312func (s *aggregatorService) validateUpdateConfigRequest(ctx context.Context, req UpdateConfigRequest) error {
313 if req.AggregatorDID == "" {
314 return NewValidationError("aggregatorDid", "aggregator DID is required")
315 }
316 if req.CommunityDID == "" {
317 return NewValidationError("communityDid", "community DID is required")
318 }
319 if req.UpdatedByDID == "" {
320 return NewValidationError("updatedByDid", "updatedByDID is required")
321 }
322 if len(req.Config) == 0 {
323 return NewValidationError("config", "config is required")
324 }
325
326 // Verify user is a moderator of the community
327 // TODO: Implement moderator check
328
329 return nil
330}
331
332// validateConfig validates a config object against a JSON Schema
333// Following Bluesky's pattern for feed generator configuration
334func (s *aggregatorService) validateConfig(config map[string]interface{}, schemaBytes []byte) error {
335 // Parse schema
336 schemaLoader := gojsonschema.NewBytesLoader(schemaBytes)
337
338 // Convert config to JSON bytes
339 configBytes, err := json.Marshal(config)
340 if err != nil {
341 return fmt.Errorf("failed to marshal config: %w", err)
342 }
343 configLoader := gojsonschema.NewBytesLoader(configBytes)
344
345 // Validate
346 result, err := gojsonschema.Validate(schemaLoader, configLoader)
347 if err != nil {
348 return fmt.Errorf("failed to validate config: %w", err)
349 }
350
351 if !result.Valid() {
352 // Collect validation errors
353 var errorMessages []string
354 for _, desc := range result.Errors() {
355 errorMessages = append(errorMessages, desc.String())
356 }
357 return fmt.Errorf("%w: %s", ErrConfigSchemaValidation, errorMessages)
358 }
359
360 return nil
361}