A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 "Coves/internal/core/aggregators"
11)
12
13// AggregatorEventConsumer consumes aggregator-related events from Jetstream
14// Following Bluesky's pattern: feed generators (app.bsky.feed.generator) and labelers (app.bsky.labeler.service)
15type AggregatorEventConsumer struct {
16 repo aggregators.Repository // Repository for aggregator operations
17}
18
19// NewAggregatorEventConsumer creates a new Jetstream consumer for aggregator events
20func NewAggregatorEventConsumer(repo aggregators.Repository) *AggregatorEventConsumer {
21 return &AggregatorEventConsumer{
22 repo: repo,
23 }
24}
25
26// HandleEvent processes a Jetstream event for aggregator records
27// This is called by the main Jetstream consumer when it receives commit events
28func (c *AggregatorEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
29 // We only care about commit events for aggregator records
30 if event.Kind != "commit" || event.Commit == nil {
31 return nil
32 }
33
34 commit := event.Commit
35
36 // Route to appropriate handler based on collection
37 // IMPORTANT: Collection names refer to RECORD TYPES in repositories
38 // - social.coves.aggregator.service: Service declaration (in aggregator's own repo, rkey="self")
39 // - social.coves.aggregator.authorization: Authorization (in community's repo, any rkey)
40 switch commit.Collection {
41 case "social.coves.aggregator.service":
42 return c.handleServiceDeclaration(ctx, event.Did, commit)
43 case "social.coves.aggregator.authorization":
44 return c.handleAuthorization(ctx, event.Did, commit)
45 default:
46 // Not an aggregator-related collection
47 return nil
48 }
49}
50
51// handleServiceDeclaration processes aggregator service declaration events
52// Service declarations are stored at: at://aggregator_did/social.coves.aggregator.service/self
53func (c *AggregatorEventConsumer) handleServiceDeclaration(ctx context.Context, did string, commit *CommitEvent) error {
54 switch commit.Operation {
55 case "create", "update":
56 // Both create and update are handled the same way (upsert)
57 return c.upsertAggregator(ctx, did, commit)
58 case "delete":
59 return c.deleteAggregator(ctx, did)
60 default:
61 log.Printf("Unknown operation for aggregator service: %s", commit.Operation)
62 return nil
63 }
64}
65
66// handleAuthorization processes authorization record events
67// Authorizations are stored at: at://community_did/social.coves.aggregator.authorization/{rkey}
68func (c *AggregatorEventConsumer) handleAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error {
69 switch commit.Operation {
70 case "create", "update":
71 // Both create and update are handled the same way (upsert)
72 return c.upsertAuthorization(ctx, communityDID, commit)
73 case "delete":
74 return c.deleteAuthorization(ctx, communityDID, commit)
75 default:
76 log.Printf("Unknown operation for aggregator authorization: %s", commit.Operation)
77 return nil
78 }
79}
80
81// upsertAggregator indexes or updates an aggregator service declaration
82func (c *AggregatorEventConsumer) upsertAggregator(ctx context.Context, did string, commit *CommitEvent) error {
83 if commit.Record == nil {
84 return fmt.Errorf("aggregator service event missing record data")
85 }
86
87 // Verify rkey is "self" (canonical location for service declaration)
88 // Following Bluesky's pattern: app.bsky.feed.generator and app.bsky.labeler.service use /self
89 if commit.RKey != "self" {
90 return fmt.Errorf("invalid aggregator service rkey: expected 'self', got '%s'", commit.RKey)
91 }
92
93 // Parse the service declaration record
94 service, err := parseAggregatorService(commit.Record)
95 if err != nil {
96 return fmt.Errorf("failed to parse aggregator service: %w", err)
97 }
98
99 // Validate DID matches repo DID (security check)
100 if service.DID != "" && service.DID != did {
101 return fmt.Errorf("service record DID (%s) does not match repo DID (%s)", service.DID, did)
102 }
103
104 // Build AT-URI for this record
105 uri := fmt.Sprintf("at://%s/social.coves.aggregator.service/self", did)
106
107 // Parse createdAt from service record
108 var createdAt time.Time
109 if service.CreatedAt != "" {
110 createdAt, err = time.Parse(time.RFC3339, service.CreatedAt)
111 if err != nil {
112 createdAt = time.Now() // Fallback
113 log.Printf("Warning: invalid createdAt format for aggregator %s: %v", did, err)
114 }
115 } else {
116 createdAt = time.Now()
117 }
118
119 // Extract avatar CID from blob if present
120 var avatarCID string
121 if service.Avatar != nil {
122 if cid, ok := extractBlobCID(service.Avatar); ok {
123 avatarCID = cid
124 }
125 }
126
127 // Build aggregator domain model
128 agg := &aggregators.Aggregator{
129 DID: did,
130 DisplayName: service.DisplayName,
131 Description: service.Description,
132 AvatarURL: avatarCID, // Now contains the CID from blob
133 MaintainerDID: service.MaintainerDID,
134 SourceURL: service.SourceURL,
135 CreatedAt: createdAt,
136 IndexedAt: time.Now(),
137 RecordURI: uri,
138 RecordCID: commit.CID,
139 }
140
141 // Handle config schema (JSONB)
142 if service.ConfigSchema != nil {
143 schemaBytes, err := json.Marshal(service.ConfigSchema)
144 if err != nil {
145 return fmt.Errorf("failed to marshal config schema: %w", err)
146 }
147 agg.ConfigSchema = schemaBytes
148 }
149
150 // Create or update in database
151 if err := c.repo.CreateAggregator(ctx, agg); err != nil {
152 return fmt.Errorf("failed to index aggregator: %w", err)
153 }
154
155 log.Printf("[AGGREGATOR-CONSUMER] Indexed service: %s (%s)", agg.DisplayName, did)
156 return nil
157}
158
159// deleteAggregator removes an aggregator from the index
160func (c *AggregatorEventConsumer) deleteAggregator(ctx context.Context, did string) error {
161 // Delete from database (cascade deletes authorizations and posts via FK)
162 if err := c.repo.DeleteAggregator(ctx, did); err != nil {
163 // Log but don't fail if not found (idempotent delete)
164 if aggregators.IsNotFound(err) {
165 log.Printf("[AGGREGATOR-CONSUMER] Aggregator not found for deletion: %s (already deleted?)", did)
166 return nil
167 }
168 return fmt.Errorf("failed to delete aggregator: %w", err)
169 }
170
171 log.Printf("[AGGREGATOR-CONSUMER] Deleted aggregator: %s", did)
172 return nil
173}
174
175// upsertAuthorization indexes or updates an authorization record
176func (c *AggregatorEventConsumer) upsertAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error {
177 if commit.Record == nil {
178 return fmt.Errorf("authorization event missing record data")
179 }
180
181 // Parse the authorization record
182 authRecord, err := parseAggregatorAuthorization(commit.Record)
183 if err != nil {
184 return fmt.Errorf("failed to parse authorization: %w", err)
185 }
186
187 // Validate communityDid matches repo DID (security check)
188 if authRecord.CommunityDid != "" && authRecord.CommunityDid != communityDID {
189 return fmt.Errorf("authorization record communityDid (%s) does not match repo DID (%s)",
190 authRecord.CommunityDid, communityDID)
191 }
192
193 // Build AT-URI for this record
194 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey)
195
196 // Parse createdAt from authorization record
197 var createdAt time.Time
198 if authRecord.CreatedAt != "" {
199 createdAt, err = time.Parse(time.RFC3339, authRecord.CreatedAt)
200 if err != nil {
201 createdAt = time.Now() // Fallback
202 log.Printf("Warning: invalid createdAt format for authorization %s: %v", uri, err)
203 }
204 } else {
205 createdAt = time.Now()
206 }
207
208 // Parse disabledAt from authorization record (optional, for modlog/audit)
209 var disabledAt *time.Time
210 if authRecord.DisabledAt != "" {
211 parsed, err := time.Parse(time.RFC3339, authRecord.DisabledAt)
212 if err != nil {
213 log.Printf("Warning: invalid disabledAt format for authorization %s: %v", uri, err)
214 } else {
215 disabledAt = &parsed
216 }
217 }
218
219 // Build authorization domain model
220 auth := &aggregators.Authorization{
221 AggregatorDID: authRecord.Aggregator,
222 CommunityDID: communityDID,
223 Enabled: authRecord.Enabled,
224 CreatedBy: authRecord.CreatedBy,
225 DisabledBy: authRecord.DisabledBy,
226 DisabledAt: disabledAt,
227 CreatedAt: createdAt,
228 IndexedAt: time.Now(),
229 RecordURI: uri,
230 RecordCID: commit.CID,
231 }
232
233 // Handle config (JSONB)
234 if authRecord.Config != nil {
235 configBytes, err := json.Marshal(authRecord.Config)
236 if err != nil {
237 return fmt.Errorf("failed to marshal config: %w", err)
238 }
239 auth.Config = configBytes
240 }
241
242 // Create or update in database
243 if err := c.repo.CreateAuthorization(ctx, auth); err != nil {
244 return fmt.Errorf("failed to index authorization: %w", err)
245 }
246
247 log.Printf("[AGGREGATOR-CONSUMER] Indexed authorization: community=%s, aggregator=%s, enabled=%v",
248 communityDID, authRecord.Aggregator, authRecord.Enabled)
249 return nil
250}
251
252// deleteAuthorization removes an authorization from the index
253func (c *AggregatorEventConsumer) deleteAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error {
254 // Build AT-URI to find the authorization
255 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey)
256
257 // Delete from database
258 if err := c.repo.DeleteAuthorizationByURI(ctx, uri); err != nil {
259 // Log but don't fail if not found (idempotent delete)
260 if aggregators.IsNotFound(err) {
261 log.Printf("[AGGREGATOR-CONSUMER] Authorization not found for deletion: %s (already deleted?)", uri)
262 return nil
263 }
264 return fmt.Errorf("failed to delete authorization: %w", err)
265 }
266
267 log.Printf("[AGGREGATOR-CONSUMER] Deleted authorization: %s", uri)
268 return nil
269}
270
271// ===== Record Parsing Functions =====
272
273// AggregatorServiceRecord represents the service declaration record structure
274type AggregatorServiceRecord struct {
275 Type string `json:"$type"`
276 DID string `json:"did"` // DID of aggregator (must match repo DID)
277 DisplayName string `json:"displayName"`
278 Description string `json:"description,omitempty"`
279 Avatar map[string]interface{} `json:"avatar,omitempty"` // Blob reference (CID will be extracted)
280 ConfigSchema map[string]interface{} `json:"configSchema,omitempty"` // JSON Schema
281 MaintainerDID string `json:"maintainer,omitempty"` // Fixed: was maintainerDid
282 SourceURL string `json:"sourceUrl,omitempty"` // Fixed: was homepageUrl
283 CreatedAt string `json:"createdAt"`
284}
285
286// parseAggregatorService parses an aggregator service record
287func parseAggregatorService(record interface{}) (*AggregatorServiceRecord, error) {
288 recordBytes, err := json.Marshal(record)
289 if err != nil {
290 return nil, fmt.Errorf("failed to marshal record: %w", err)
291 }
292
293 var service AggregatorServiceRecord
294 if err := json.Unmarshal(recordBytes, &service); err != nil {
295 return nil, fmt.Errorf("failed to unmarshal service record: %w", err)
296 }
297
298 // Validate required fields
299 if service.DisplayName == "" {
300 return nil, fmt.Errorf("displayName is required")
301 }
302
303 return &service, nil
304}
305
306// Note: extractBlobCID is defined in community_consumer.go and shared across consumers
307
308// AggregatorAuthorizationRecord represents the authorization record structure
309type AggregatorAuthorizationRecord struct {
310 Config map[string]interface{} `json:"config,omitempty"`
311 Type string `json:"$type"`
312 Aggregator string `json:"aggregatorDid"`
313 CommunityDid string `json:"communityDid"`
314 CreatedBy string `json:"createdBy"`
315 DisabledBy string `json:"disabledBy,omitempty"`
316 DisabledAt string `json:"disabledAt,omitempty"`
317 CreatedAt string `json:"createdAt"`
318 Enabled bool `json:"enabled"`
319}
320
321// parseAggregatorAuthorization parses an aggregator authorization record
322func parseAggregatorAuthorization(record interface{}) (*AggregatorAuthorizationRecord, error) {
323 recordBytes, err := json.Marshal(record)
324 if err != nil {
325 return nil, fmt.Errorf("failed to marshal record: %w", err)
326 }
327
328 var auth AggregatorAuthorizationRecord
329 if err := json.Unmarshal(recordBytes, &auth); err != nil {
330 return nil, fmt.Errorf("failed to unmarshal authorization record: %w", err)
331 }
332
333 // Validate required fields per lexicon
334 if auth.Aggregator == "" {
335 return nil, fmt.Errorf("aggregatorDid is required")
336 }
337 if auth.CommunityDid == "" {
338 return nil, fmt.Errorf("communityDid is required")
339 }
340 if auth.CreatedAt == "" {
341 return nil, fmt.Errorf("createdAt is required")
342 }
343 if auth.CreatedBy == "" {
344 return nil, fmt.Errorf("createdBy is required")
345 }
346
347 return &auth, nil
348}