A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/aggregators"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log"
9 "time"
10)
11
12// AggregatorEventConsumer consumes aggregator-related events from Jetstream
13// Following Bluesky's pattern: feed generators (app.bsky.feed.generator) and labelers (app.bsky.labeler.service)
14type AggregatorEventConsumer struct {
15 repo aggregators.Repository // Repository for aggregator operations
16}
17
18// NewAggregatorEventConsumer creates a new Jetstream consumer for aggregator events
19func NewAggregatorEventConsumer(repo aggregators.Repository) *AggregatorEventConsumer {
20 return &AggregatorEventConsumer{
21 repo: repo,
22 }
23}
24
25// HandleEvent processes a Jetstream event for aggregator records
26// This is called by the main Jetstream consumer when it receives commit events
27func (c *AggregatorEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
28 // We only care about commit events for aggregator records
29 if event.Kind != "commit" || event.Commit == nil {
30 return nil
31 }
32
33 commit := event.Commit
34
35 // Route to appropriate handler based on collection
36 // IMPORTANT: Collection names refer to RECORD TYPES in repositories
37 // - social.coves.aggregator.service: Service declaration (in aggregator's own repo, rkey="self")
38 // - social.coves.aggregator.authorization: Authorization (in community's repo, any rkey)
39 switch commit.Collection {
40 case "social.coves.aggregator.service":
41 return c.handleServiceDeclaration(ctx, event.Did, commit)
42 case "social.coves.aggregator.authorization":
43 return c.handleAuthorization(ctx, event.Did, commit)
44 default:
45 // Not an aggregator-related collection
46 return nil
47 }
48}
49
50// handleServiceDeclaration processes aggregator service declaration events
51// Service declarations are stored at: at://aggregator_did/social.coves.aggregator.service/self
52func (c *AggregatorEventConsumer) handleServiceDeclaration(ctx context.Context, did string, commit *CommitEvent) error {
53 switch commit.Operation {
54 case "create", "update":
55 // Both create and update are handled the same way (upsert)
56 return c.upsertAggregator(ctx, did, commit)
57 case "delete":
58 return c.deleteAggregator(ctx, did)
59 default:
60 log.Printf("Unknown operation for aggregator service: %s", commit.Operation)
61 return nil
62 }
63}
64
65// handleAuthorization processes authorization record events
66// Authorizations are stored at: at://community_did/social.coves.aggregator.authorization/{rkey}
67func (c *AggregatorEventConsumer) handleAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error {
68 switch commit.Operation {
69 case "create", "update":
70 // Both create and update are handled the same way (upsert)
71 return c.upsertAuthorization(ctx, communityDID, commit)
72 case "delete":
73 return c.deleteAuthorization(ctx, communityDID, commit)
74 default:
75 log.Printf("Unknown operation for aggregator authorization: %s", commit.Operation)
76 return nil
77 }
78}
79
80// upsertAggregator indexes or updates an aggregator service declaration
81func (c *AggregatorEventConsumer) upsertAggregator(ctx context.Context, did string, commit *CommitEvent) error {
82 if commit.Record == nil {
83 return fmt.Errorf("aggregator service event missing record data")
84 }
85
86 // Verify rkey is "self" (canonical location for service declaration)
87 // Following Bluesky's pattern: app.bsky.feed.generator and app.bsky.labeler.service use /self
88 if commit.RKey != "self" {
89 return fmt.Errorf("invalid aggregator service rkey: expected 'self', got '%s'", commit.RKey)
90 }
91
92 // Parse the service declaration record
93 service, err := parseAggregatorService(commit.Record)
94 if err != nil {
95 return fmt.Errorf("failed to parse aggregator service: %w", err)
96 }
97
98 // Validate DID matches repo DID (security check)
99 if service.DID != "" && service.DID != did {
100 return fmt.Errorf("service record DID (%s) does not match repo DID (%s)", service.DID, did)
101 }
102
103 // Build AT-URI for this record
104 uri := fmt.Sprintf("at://%s/social.coves.aggregator.service/self", did)
105
106 // Parse createdAt from service record
107 var createdAt time.Time
108 if service.CreatedAt != "" {
109 createdAt, err = time.Parse(time.RFC3339, service.CreatedAt)
110 if err != nil {
111 createdAt = time.Now() // Fallback
112 log.Printf("Warning: invalid createdAt format for aggregator %s: %v", did, err)
113 }
114 } else {
115 createdAt = time.Now()
116 }
117
118 // Extract avatar CID from blob if present
119 var avatarCID string
120 if service.Avatar != nil {
121 if cid, ok := extractBlobCID(service.Avatar); ok {
122 avatarCID = cid
123 }
124 }
125
126 // Build aggregator domain model
127 agg := &aggregators.Aggregator{
128 DID: did,
129 DisplayName: service.DisplayName,
130 Description: service.Description,
131 AvatarURL: avatarCID, // Now contains the CID from blob
132 MaintainerDID: service.MaintainerDID,
133 SourceURL: service.SourceURL,
134 CreatedAt: createdAt,
135 IndexedAt: time.Now(),
136 RecordURI: uri,
137 RecordCID: commit.CID,
138 }
139
140 // Handle config schema (JSONB)
141 if service.ConfigSchema != nil {
142 schemaBytes, err := json.Marshal(service.ConfigSchema)
143 if err != nil {
144 return fmt.Errorf("failed to marshal config schema: %w", err)
145 }
146 agg.ConfigSchema = schemaBytes
147 }
148
149 // Create or update in database
150 if err := c.repo.CreateAggregator(ctx, agg); err != nil {
151 return fmt.Errorf("failed to index aggregator: %w", err)
152 }
153
154 log.Printf("[AGGREGATOR-CONSUMER] Indexed service: %s (%s)", agg.DisplayName, did)
155 return nil
156}
157
158// deleteAggregator removes an aggregator from the index
159func (c *AggregatorEventConsumer) deleteAggregator(ctx context.Context, did string) error {
160 // Delete from database (cascade deletes authorizations and posts via FK)
161 if err := c.repo.DeleteAggregator(ctx, did); err != nil {
162 // Log but don't fail if not found (idempotent delete)
163 if aggregators.IsNotFound(err) {
164 log.Printf("[AGGREGATOR-CONSUMER] Aggregator not found for deletion: %s (already deleted?)", did)
165 return nil
166 }
167 return fmt.Errorf("failed to delete aggregator: %w", err)
168 }
169
170 log.Printf("[AGGREGATOR-CONSUMER] Deleted aggregator: %s", did)
171 return nil
172}
173
174// upsertAuthorization indexes or updates an authorization record
175func (c *AggregatorEventConsumer) upsertAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error {
176 if commit.Record == nil {
177 return fmt.Errorf("authorization event missing record data")
178 }
179
180 // Parse the authorization record
181 authRecord, err := parseAggregatorAuthorization(commit.Record)
182 if err != nil {
183 return fmt.Errorf("failed to parse authorization: %w", err)
184 }
185
186 // Validate communityDid matches repo DID (security check)
187 if authRecord.CommunityDid != "" && authRecord.CommunityDid != communityDID {
188 return fmt.Errorf("authorization record communityDid (%s) does not match repo DID (%s)",
189 authRecord.CommunityDid, communityDID)
190 }
191
192 // Build AT-URI for this record
193 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey)
194
195 // Parse createdAt from authorization record
196 var createdAt time.Time
197 if authRecord.CreatedAt != "" {
198 createdAt, err = time.Parse(time.RFC3339, authRecord.CreatedAt)
199 if err != nil {
200 createdAt = time.Now() // Fallback
201 log.Printf("Warning: invalid createdAt format for authorization %s: %v", uri, err)
202 }
203 } else {
204 createdAt = time.Now()
205 }
206
207 // Parse disabledAt from authorization record (optional, for modlog/audit)
208 var disabledAt *time.Time
209 if authRecord.DisabledAt != "" {
210 parsed, err := time.Parse(time.RFC3339, authRecord.DisabledAt)
211 if err != nil {
212 log.Printf("Warning: invalid disabledAt format for authorization %s: %v", uri, err)
213 } else {
214 disabledAt = &parsed
215 }
216 }
217
218 // Build authorization domain model
219 auth := &aggregators.Authorization{
220 AggregatorDID: authRecord.Aggregator,
221 CommunityDID: communityDID,
222 Enabled: authRecord.Enabled,
223 CreatedBy: authRecord.CreatedBy,
224 DisabledBy: authRecord.DisabledBy,
225 DisabledAt: disabledAt,
226 CreatedAt: createdAt,
227 IndexedAt: time.Now(),
228 RecordURI: uri,
229 RecordCID: commit.CID,
230 }
231
232 // Handle config (JSONB)
233 if authRecord.Config != nil {
234 configBytes, err := json.Marshal(authRecord.Config)
235 if err != nil {
236 return fmt.Errorf("failed to marshal config: %w", err)
237 }
238 auth.Config = configBytes
239 }
240
241 // Create or update in database
242 if err := c.repo.CreateAuthorization(ctx, auth); err != nil {
243 return fmt.Errorf("failed to index authorization: %w", err)
244 }
245
246 log.Printf("[AGGREGATOR-CONSUMER] Indexed authorization: community=%s, aggregator=%s, enabled=%v",
247 communityDID, authRecord.Aggregator, authRecord.Enabled)
248 return nil
249}
250
251// deleteAuthorization removes an authorization from the index
252func (c *AggregatorEventConsumer) deleteAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error {
253 // Build AT-URI to find the authorization
254 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey)
255
256 // Delete from database
257 if err := c.repo.DeleteAuthorizationByURI(ctx, uri); err != nil {
258 // Log but don't fail if not found (idempotent delete)
259 if aggregators.IsNotFound(err) {
260 log.Printf("[AGGREGATOR-CONSUMER] Authorization not found for deletion: %s (already deleted?)", uri)
261 return nil
262 }
263 return fmt.Errorf("failed to delete authorization: %w", err)
264 }
265
266 log.Printf("[AGGREGATOR-CONSUMER] Deleted authorization: %s", uri)
267 return nil
268}
269
270// ===== Record Parsing Functions =====
271
// AggregatorServiceRecord is the decoded form of a
// social.coves.aggregator.service record (the service declaration).
type AggregatorServiceRecord struct {
	Type          string                 `json:"$type"`
	DID           string                 `json:"did"` // aggregator DID; must equal the DID of the repo holding the record
	DisplayName   string                 `json:"displayName"`
	Description   string                 `json:"description,omitempty"`
	Avatar        map[string]interface{} `json:"avatar,omitempty"`       // blob reference; the CID is extracted from it
	ConfigSchema  map[string]interface{} `json:"configSchema,omitempty"` // JSON Schema
	MaintainerDID string                 `json:"maintainer,omitempty"`   // wire name is "maintainer" (formerly maintainerDid)
	SourceURL     string                 `json:"sourceUrl,omitempty"`    // wire name is "sourceUrl" (formerly homepageUrl)
	CreatedAt     string                 `json:"createdAt"`
}

// parseAggregatorService decodes record into an AggregatorServiceRecord by
// round-tripping it through JSON.
//
// It returns an error when record cannot be marshaled, when the resulting JSON
// does not fit the record structure, or when displayName is empty. No other
// field is validated here.
func parseAggregatorService(record interface{}) (*AggregatorServiceRecord, error) {
	raw, marshalErr := json.Marshal(record)
	if marshalErr != nil {
		return nil, fmt.Errorf("failed to marshal record: %w", marshalErr)
	}

	parsed := new(AggregatorServiceRecord)
	if decodeErr := json.Unmarshal(raw, parsed); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal service record: %w", decodeErr)
	}

	// displayName is the only required field checked at parse time.
	if parsed.DisplayName == "" {
		return nil, fmt.Errorf("displayName is required")
	}

	return parsed, nil
}
304
305// Note: extractBlobCID is defined in community_consumer.go and shared across consumers
306
// AggregatorAuthorizationRecord is the decoded form of a
// social.coves.aggregator.authorization record.
type AggregatorAuthorizationRecord struct {
	Config       map[string]interface{} `json:"config,omitempty"`
	Type         string                 `json:"$type"`
	Aggregator   string                 `json:"aggregatorDid"`
	CommunityDid string                 `json:"communityDid"`
	CreatedBy    string                 `json:"createdBy"`
	DisabledBy   string                 `json:"disabledBy,omitempty"`
	DisabledAt   string                 `json:"disabledAt,omitempty"`
	CreatedAt    string                 `json:"createdAt"`
	Enabled      bool                   `json:"enabled"`
}

// parseAggregatorAuthorization decodes record into an
// AggregatorAuthorizationRecord by round-tripping it through JSON.
//
// It returns an error when record cannot be marshaled, when the resulting JSON
// does not fit the record structure, or when one of the required fields is
// empty. Required fields are checked in this order, and the first empty one is
// reported: aggregatorDid, communityDid, createdAt, createdBy.
func parseAggregatorAuthorization(record interface{}) (*AggregatorAuthorizationRecord, error) {
	raw, marshalErr := json.Marshal(record)
	if marshalErr != nil {
		return nil, fmt.Errorf("failed to marshal record: %w", marshalErr)
	}

	parsed := new(AggregatorAuthorizationRecord)
	if decodeErr := json.Unmarshal(raw, parsed); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal authorization record: %w", decodeErr)
	}

	// Required fields per lexicon; case order determines which error wins.
	switch {
	case parsed.Aggregator == "":
		return nil, fmt.Errorf("aggregatorDid is required")
	case parsed.CommunityDid == "":
		return nil, fmt.Errorf("communityDid is required")
	case parsed.CreatedAt == "":
		return nil, fmt.Errorf("createdAt is required")
	case parsed.CreatedBy == "":
		return nil, fmt.Errorf("createdBy is required")
	}

	return parsed, nil
}