A community-based topic aggregation platform built on atproto
1package aggregators
2
3import (
4 "Coves/internal/core/communities"
5 "context"
6 "encoding/json"
7 "fmt"
8 "time"
9
10 "github.com/xeipuuv/gojsonschema"
11)
12
// Rate limiting applied to aggregator posts.
const (
	// RateLimitWindow is the rolling window used for rate limit enforcement
	// (one hour).
	RateLimitWindow = time.Hour

	// RateLimitMaxPosts is the maximum number of posts allowed per community
	// within RateLimitWindow. Conservative limit for alpha: prevents spam
	// while still allowing real-time updates.
	RateLimitMaxPosts = 10
)

// Pagination bounds applied to list and bulk queries.
const (
	// DefaultQueryLimit is the page size used when the caller supplies no
	// positive limit. Balances UX (reasonable page size) against server load.
	DefaultQueryLimit = 50

	// MaxQueryLimit is the upper bound on page size and on the number of DIDs
	// in a bulk fetch. Prevents abuse while allowing batch operations (e.g.,
	// fetching multiple aggregators at once).
	MaxQueryLimit = 100
)
20
21type aggregatorService struct {
22 repo Repository
23 communityService communities.Service
24}
25
26// NewAggregatorService creates a new aggregator service
27func NewAggregatorService(repo Repository, communityService communities.Service) Service {
28 return &aggregatorService{
29 repo: repo,
30 communityService: communityService,
31 }
32}
33
34// ===== Query Operations (Read from AppView) =====
35
36// GetAggregator retrieves a single aggregator by DID
37func (s *aggregatorService) GetAggregator(ctx context.Context, did string) (*Aggregator, error) {
38 if did == "" {
39 return nil, NewValidationError("did", "DID is required")
40 }
41
42 return s.repo.GetAggregator(ctx, did)
43}
44
45// GetAggregators retrieves multiple aggregators by DIDs
46func (s *aggregatorService) GetAggregators(ctx context.Context, dids []string) ([]*Aggregator, error) {
47 if len(dids) == 0 {
48 return []*Aggregator{}, nil
49 }
50
51 if len(dids) > MaxQueryLimit {
52 return nil, NewValidationError("dids", fmt.Sprintf("maximum %d DIDs allowed", MaxQueryLimit))
53 }
54
55 // Use bulk fetch to avoid N+1 queries
56 return s.repo.GetAggregatorsByDIDs(ctx, dids)
57}
58
59// ListAggregators retrieves all aggregators with pagination
60func (s *aggregatorService) ListAggregators(ctx context.Context, limit, offset int) ([]*Aggregator, error) {
61 // Apply defaults and limits
62 if limit <= 0 {
63 limit = DefaultQueryLimit
64 }
65 if limit > MaxQueryLimit {
66 limit = MaxQueryLimit
67 }
68 if offset < 0 {
69 offset = 0
70 }
71
72 return s.repo.ListAggregators(ctx, limit, offset)
73}
74
75// GetAuthorizationsForAggregator retrieves all communities that authorized an aggregator
76func (s *aggregatorService) GetAuthorizationsForAggregator(ctx context.Context, req GetAuthorizationsRequest) ([]*Authorization, error) {
77 if req.AggregatorDID == "" {
78 return nil, NewValidationError("aggregatorDid", "aggregator DID is required")
79 }
80
81 // Apply defaults and limits
82 if req.Limit <= 0 {
83 req.Limit = DefaultQueryLimit
84 }
85 if req.Limit > MaxQueryLimit {
86 req.Limit = MaxQueryLimit
87 }
88 if req.Offset < 0 {
89 req.Offset = 0
90 }
91
92 return s.repo.ListAuthorizationsForAggregator(ctx, req.AggregatorDID, req.EnabledOnly, req.Limit, req.Offset)
93}
94
95// ListAggregatorsForCommunity retrieves all aggregators authorized by a community
96func (s *aggregatorService) ListAggregatorsForCommunity(ctx context.Context, req ListForCommunityRequest) ([]*Authorization, error) {
97 if req.CommunityDID == "" {
98 return nil, NewValidationError("communityDid", "community DID is required")
99 }
100
101 // Apply defaults and limits
102 if req.Limit <= 0 {
103 req.Limit = DefaultQueryLimit
104 }
105 if req.Limit > MaxQueryLimit {
106 req.Limit = MaxQueryLimit
107 }
108 if req.Offset < 0 {
109 req.Offset = 0
110 }
111
112 return s.repo.ListAuthorizationsForCommunity(ctx, req.CommunityDID, req.EnabledOnly, req.Limit, req.Offset)
113}
114
115// ===== Authorization Management (Write-forward to PDS) =====
116
117// EnableAggregator creates an authorization record for an aggregator in a community
118// Following Bluesky's pattern: similar to enabling a labeler or feed generator
119// Note: This is a PLACEHOLDER for the write-forward implementation
120// TODO: Implement actual XRPC write to community's PDS repository
121func (s *aggregatorService) EnableAggregator(ctx context.Context, req EnableAggregatorRequest) (*Authorization, error) {
122 // Validate request
123 if err := s.validateEnableRequest(ctx, req); err != nil {
124 return nil, err
125 }
126
127 // Verify aggregator exists
128 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID)
129 if err != nil {
130 return nil, err
131 }
132
133 // Validate config against aggregator's schema if provided
134 if len(req.Config) > 0 && len(aggregator.ConfigSchema) > 0 {
135 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil {
136 return nil, err
137 }
138 }
139
140 // Check if already authorized
141 existing, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID)
142 if err == nil && existing.Enabled {
143 return nil, ErrAlreadyAuthorized
144 }
145
146 // TODO Phase 2: Write-forward to PDS
147 // For now, return placeholder response
148 // The actual implementation will:
149 // 1. Create authorization record in community's repository on PDS
150 // 2. Wait for Jetstream to index it
151 // 3. Return the indexed authorization
152 //
153 // Record structure:
154 // at://community_did/social.coves.aggregator.authorization/{rkey}
155 // {
156 // "$type": "social.coves.aggregator.authorization",
157 // "aggregator": req.AggregatorDID,
158 // "enabled": true,
159 // "config": req.Config,
160 // "createdBy": req.EnabledByDID,
161 // "createdAt": "2025-10-20T12:00:00Z"
162 // }
163
164 return nil, ErrNotImplemented
165}
166
167// DisableAggregator updates an authorization to disabled
168// Note: This is a PLACEHOLDER for the write-forward implementation
169func (s *aggregatorService) DisableAggregator(ctx context.Context, req DisableAggregatorRequest) (*Authorization, error) {
170 // Validate request
171 if err := s.validateDisableRequest(ctx, req); err != nil {
172 return nil, err
173 }
174
175 // Verify authorization exists
176 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID)
177 if err != nil {
178 return nil, err
179 }
180
181 if !auth.Enabled {
182 // Already disabled
183 return auth, nil
184 }
185
186 // TODO Phase 2: Write-forward to PDS
187 // Update the authorization record with enabled=false
188 return nil, ErrNotImplemented
189}
190
191// UpdateAggregatorConfig updates an aggregator's configuration
192// Note: This is a PLACEHOLDER for the write-forward implementation
193func (s *aggregatorService) UpdateAggregatorConfig(ctx context.Context, req UpdateConfigRequest) (*Authorization, error) {
194 // Validate request
195 if err := s.validateUpdateConfigRequest(ctx, req); err != nil {
196 return nil, err
197 }
198
199 // Verify authorization exists
200 auth, err := s.repo.GetAuthorization(ctx, req.AggregatorDID, req.CommunityDID)
201 if err != nil {
202 return nil, err
203 }
204
205 // Get aggregator for schema validation
206 aggregator, err := s.repo.GetAggregator(ctx, req.AggregatorDID)
207 if err != nil {
208 return nil, err
209 }
210
211 // Validate new config against schema
212 if len(req.Config) > 0 && len(aggregator.ConfigSchema) > 0 {
213 if err := s.validateConfig(req.Config, aggregator.ConfigSchema); err != nil {
214 return nil, err
215 }
216 }
217
218 // TODO Phase 2: Write-forward to PDS
219 // Update the authorization record with new config
220 return auth, ErrNotImplemented
221}
222
223// ===== Validation and Authorization Checks =====
224
225// ValidateAggregatorPost validates that an aggregator can post to a community
226// Checks: 1) Authorization exists and is enabled, 2) Rate limit not exceeded
227// This is called by the post creation handler BEFORE writing to PDS
228func (s *aggregatorService) ValidateAggregatorPost(ctx context.Context, aggregatorDID, communityDID string) error {
229 // Check authorization exists and is enabled
230 authorized, err := s.repo.IsAuthorized(ctx, aggregatorDID, communityDID)
231 if err != nil {
232 return fmt.Errorf("failed to check authorization: %w", err)
233 }
234 if !authorized {
235 return ErrNotAuthorized
236 }
237
238 // Check rate limit (10 posts per hour per community)
239 since := time.Now().Add(-RateLimitWindow)
240 recentPostCount, err := s.repo.CountRecentPosts(ctx, aggregatorDID, communityDID, since)
241 if err != nil {
242 return fmt.Errorf("failed to check rate limit: %w", err)
243 }
244
245 if recentPostCount >= RateLimitMaxPosts {
246 return ErrRateLimitExceeded
247 }
248
249 return nil
250}
251
252// IsAggregator checks if a DID is a registered aggregator
253// Fast check used by post creation handler
254func (s *aggregatorService) IsAggregator(ctx context.Context, did string) (bool, error) {
255 if did == "" {
256 return false, nil
257 }
258 return s.repo.IsAggregator(ctx, did)
259}
260
261// RecordAggregatorPost tracks a post created by an aggregator
262// Called AFTER successful post creation to update statistics and rate limiting
263func (s *aggregatorService) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error {
264 if aggregatorDID == "" || communityDID == "" || postURI == "" || postCID == "" {
265 return NewValidationError("post_tracking", "aggregatorDID, communityDID, postURI, and postCID are required")
266 }
267
268 return s.repo.RecordAggregatorPost(ctx, aggregatorDID, communityDID, postURI, postCID)
269}
270
271// ===== Validation Helpers =====
272
273func (s *aggregatorService) validateEnableRequest(ctx context.Context, req EnableAggregatorRequest) error {
274 if req.AggregatorDID == "" {
275 return NewValidationError("aggregatorDid", "aggregator DID is required")
276 }
277 if req.CommunityDID == "" {
278 return NewValidationError("communityDid", "community DID is required")
279 }
280 if req.EnabledByDID == "" {
281 return NewValidationError("enabledByDid", "enabledByDID is required")
282 }
283
284 // Verify user is a moderator of the community
285 // TODO: Implement moderator check
286 // membership, err := s.communityService.GetMembership(ctx, req.EnabledByDID, req.CommunityDID)
287 // if err != nil || !membership.IsModerator {
288 // return ErrNotModerator
289 // }
290
291 return nil
292}
293
294func (s *aggregatorService) validateDisableRequest(ctx context.Context, req DisableAggregatorRequest) error {
295 if req.AggregatorDID == "" {
296 return NewValidationError("aggregatorDid", "aggregator DID is required")
297 }
298 if req.CommunityDID == "" {
299 return NewValidationError("communityDid", "community DID is required")
300 }
301 if req.DisabledByDID == "" {
302 return NewValidationError("disabledByDid", "disabledByDID is required")
303 }
304
305 // Verify user is a moderator of the community
306 // TODO: Implement moderator check
307
308 return nil
309}
310
311func (s *aggregatorService) validateUpdateConfigRequest(ctx context.Context, req UpdateConfigRequest) error {
312 if req.AggregatorDID == "" {
313 return NewValidationError("aggregatorDid", "aggregator DID is required")
314 }
315 if req.CommunityDID == "" {
316 return NewValidationError("communityDid", "community DID is required")
317 }
318 if req.UpdatedByDID == "" {
319 return NewValidationError("updatedByDid", "updatedByDID is required")
320 }
321 if len(req.Config) == 0 {
322 return NewValidationError("config", "config is required")
323 }
324
325 // Verify user is a moderator of the community
326 // TODO: Implement moderator check
327
328 return nil
329}
330
331// validateConfig validates a config object against a JSON Schema
332// Following Bluesky's pattern for feed generator configuration
333func (s *aggregatorService) validateConfig(config map[string]interface{}, schemaBytes []byte) error {
334 // Parse schema
335 schemaLoader := gojsonschema.NewBytesLoader(schemaBytes)
336
337 // Convert config to JSON bytes
338 configBytes, err := json.Marshal(config)
339 if err != nil {
340 return fmt.Errorf("failed to marshal config: %w", err)
341 }
342 configLoader := gojsonschema.NewBytesLoader(configBytes)
343
344 // Validate
345 result, err := gojsonschema.Validate(schemaLoader, configLoader)
346 if err != nil {
347 return fmt.Errorf("failed to validate config: %w", err)
348 }
349
350 if !result.Valid() {
351 // Collect validation errors
352 var errorMessages []string
353 for _, desc := range result.Errors() {
354 errorMessages = append(errorMessages, desc.String())
355 }
356 return fmt.Errorf("%w: %s", ErrConfigSchemaValidation, errorMessages)
357 }
358
359 return nil
360}