A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/utils"
5 "Coves/internal/core/communities"
6 "context"
7 "encoding/json"
8 "fmt"
9 "log"
10 "time"
11)
12
13// CommunityEventConsumer consumes community-related events from Jetstream
14type CommunityEventConsumer struct {
15 repo communities.Repository
16}
17
18// NewCommunityEventConsumer creates a new Jetstream consumer for community events
19func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer {
20 return &CommunityEventConsumer{
21 repo: repo,
22 }
23}
24
25// HandleEvent processes a Jetstream event for community records
26// This is called by the main Jetstream consumer when it receives commit events
27func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
28 // We only care about commit events for community records
29 if event.Kind != "commit" || event.Commit == nil {
30 return nil
31 }
32
33 commit := event.Commit
34
35 // Route to appropriate handler based on collection
36 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
37 // - social.coves.community.profile: Community profile records (in community's own repo)
38 // - social.coves.community.subscription: Subscription records (in user's repo)
39 // - social.coves.community.block: Block records (in user's repo)
40 //
41 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
42 // that CREATE or DELETE records in these collections
43 switch commit.Collection {
44 case "social.coves.community.profile":
45 return c.handleCommunityProfile(ctx, event.Did, commit)
46 case "social.coves.community.subscription":
47 // Handle both create (subscribe) and delete (unsubscribe) operations
48 return c.handleSubscription(ctx, event.Did, commit)
49 case "social.coves.community.block":
50 // Handle both create (block) and delete (unblock) operations
51 return c.handleBlock(ctx, event.Did, commit)
52 default:
53 // Not a community-related collection
54 return nil
55 }
56}
57
58// handleCommunityProfile processes community profile create/update/delete events
59func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
60 switch commit.Operation {
61 case "create":
62 return c.createCommunity(ctx, did, commit)
63 case "update":
64 return c.updateCommunity(ctx, did, commit)
65 case "delete":
66 return c.deleteCommunity(ctx, did)
67 default:
68 log.Printf("Unknown operation for community profile: %s", commit.Operation)
69 return nil
70 }
71}
72
73// createCommunity indexes a new community from the firehose
74func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
75 if commit.Record == nil {
76 return fmt.Errorf("community profile create event missing record data")
77 }
78
79 // Parse the community profile record
80 profile, err := parseCommunityProfile(commit.Record)
81 if err != nil {
82 return fmt.Errorf("failed to parse community profile: %w", err)
83 }
84
85 // Build AT-URI for this record
86 // V2 Architecture (ONLY):
87 // - 'did' parameter IS the community DID (community owns its own repo)
88 // - rkey MUST be "self" for community profiles
89 // - URI: at://community_did/social.coves.community.profile/self
90
91 // REJECT non-V2 communities (pre-production: no V1 compatibility)
92 if commit.RKey != "self" {
93 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
94 }
95
96 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
97
98 // V2: Community ALWAYS owns itself
99 ownerDID := did
100
101 // Create community entity
102 community := &communities.Community{
103 DID: did, // V2: Repository DID IS the community DID
104 Handle: profile.Handle,
105 Name: profile.Name,
106 DisplayName: profile.DisplayName,
107 Description: profile.Description,
108 OwnerDID: ownerDID, // V2: same as DID (self-owned)
109 CreatedByDID: profile.CreatedBy,
110 HostedByDID: profile.HostedBy,
111 Visibility: profile.Visibility,
112 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
113 ModerationType: profile.ModerationType,
114 ContentWarnings: profile.ContentWarnings,
115 MemberCount: profile.MemberCount,
116 SubscriberCount: profile.SubscriberCount,
117 FederatedFrom: profile.FederatedFrom,
118 FederatedID: profile.FederatedID,
119 CreatedAt: profile.CreatedAt,
120 UpdatedAt: time.Now(),
121 RecordURI: uri,
122 RecordCID: commit.CID,
123 }
124
125 // Handle blobs (avatar/banner) if present
126 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
127 community.AvatarCID = avatarCID
128 }
129 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
130 community.BannerCID = bannerCID
131 }
132
133 // Handle description facets (rich text)
134 if profile.DescriptionFacets != nil {
135 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
136 if marshalErr == nil {
137 community.DescriptionFacets = facetsJSON
138 }
139 }
140
141 // Index in AppView database
142 _, err = c.repo.Create(ctx, community)
143 if err != nil {
144 // Check if it already exists (idempotency)
145 if communities.IsConflict(err) {
146 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
147 return nil
148 }
149 return fmt.Errorf("failed to index community: %w", err)
150 }
151
152 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
153 return nil
154}
155
156// updateCommunity updates an existing community from the firehose
157func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
158 if commit.Record == nil {
159 return fmt.Errorf("community profile update event missing record data")
160 }
161
162 // REJECT non-V2 communities (pre-production: no V1 compatibility)
163 if commit.RKey != "self" {
164 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
165 }
166
167 // Parse profile
168 profile, err := parseCommunityProfile(commit.Record)
169 if err != nil {
170 return fmt.Errorf("failed to parse community profile: %w", err)
171 }
172
173 // V2: Repository DID IS the community DID
174 // Get existing community using the repo DID
175 existing, err := c.repo.GetByDID(ctx, did)
176 if err != nil {
177 if communities.IsNotFound(err) {
178 // Community doesn't exist yet - treat as create
179 log.Printf("Community not found for update, creating: %s", did)
180 return c.createCommunity(ctx, did, commit)
181 }
182 return fmt.Errorf("failed to get existing community: %w", err)
183 }
184
185 // Update fields
186 existing.Handle = profile.Handle
187 existing.Name = profile.Name
188 existing.DisplayName = profile.DisplayName
189 existing.Description = profile.Description
190 existing.Visibility = profile.Visibility
191 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
192 existing.ModerationType = profile.ModerationType
193 existing.ContentWarnings = profile.ContentWarnings
194 existing.RecordCID = commit.CID
195
196 // Update blobs
197 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
198 existing.AvatarCID = avatarCID
199 }
200 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
201 existing.BannerCID = bannerCID
202 }
203
204 // Update description facets
205 if profile.DescriptionFacets != nil {
206 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
207 if marshalErr == nil {
208 existing.DescriptionFacets = facetsJSON
209 }
210 }
211
212 // Save updates
213 _, err = c.repo.Update(ctx, existing)
214 if err != nil {
215 return fmt.Errorf("failed to update community: %w", err)
216 }
217
218 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
219 return nil
220}
221
222// deleteCommunity removes a community from the index
223func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
224 err := c.repo.Delete(ctx, did)
225 if err != nil {
226 if communities.IsNotFound(err) {
227 log.Printf("Community already deleted: %s", did)
228 return nil
229 }
230 return fmt.Errorf("failed to delete community: %w", err)
231 }
232
233 log.Printf("Deleted community: %s", did)
234 return nil
235}
236
237// handleSubscription processes subscription create/delete events
238// CREATE operation = user subscribed to community
239// DELETE operation = user unsubscribed from community
240func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
241 switch commit.Operation {
242 case "create":
243 return c.createSubscription(ctx, userDID, commit)
244 case "delete":
245 return c.deleteSubscription(ctx, userDID, commit)
246 default:
247 // Update operations shouldn't happen on subscriptions, but ignore gracefully
248 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
249 commit.Operation, userDID, commit.RKey)
250 return nil
251 }
252}
253
254// createSubscription indexes a new subscription with retry logic
255func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
256 if commit.Record == nil {
257 return fmt.Errorf("subscription create event missing record data")
258 }
259
260 // Extract community DID from record's subject field (following atProto conventions)
261 communityDID, ok := commit.Record["subject"].(string)
262 if !ok {
263 return fmt.Errorf("subscription record missing subject field")
264 }
265
266 // Extract contentVisibility with clamping and default value
267 contentVisibility := extractContentVisibility(commit.Record)
268
269 // Build AT-URI for subscription record
270 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
271 // The record lives in the USER's repository, but uses the communities namespace
272 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
273
274 // Create subscription entity
275 // Parse createdAt from record to preserve chronological ordering during replays
276 subscription := &communities.Subscription{
277 UserDID: userDID,
278 CommunityDID: communityDID,
279 ContentVisibility: contentVisibility,
280 SubscribedAt: utils.ParseCreatedAt(commit.Record),
281 RecordURI: uri,
282 RecordCID: commit.CID,
283 }
284
285 // Use transactional method to ensure subscription and count are atomically updated
286 // This is idempotent - safe for Jetstream replays
287 _, err := c.repo.SubscribeWithCount(ctx, subscription)
288 if err != nil {
289 // If already exists, that's fine (idempotency)
290 if communities.IsConflict(err) {
291 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
292 userDID, communityDID, contentVisibility)
293 return nil
294 }
295 return fmt.Errorf("failed to index subscription: %w", err)
296 }
297
298 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
299 userDID, communityDID, contentVisibility)
300 return nil
301}
302
303// deleteSubscription removes a subscription from the index
304// DELETE operations don't include record data, so we need to look up the subscription
305// by its URI to find which community the user unsubscribed from
306func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
307 // Build AT-URI from the rkey
308 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
309
310 // Look up the subscription to get the community DID
311 // (DELETE operations don't include record data in Jetstream)
312 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
313 if err != nil {
314 if communities.IsNotFound(err) {
315 // Already deleted - this is fine (idempotency)
316 log.Printf("Subscription already deleted: %s", uri)
317 return nil
318 }
319 return fmt.Errorf("failed to find subscription for deletion: %w", err)
320 }
321
322 // Use transactional method to ensure unsubscribe and count are atomically updated
323 // This is idempotent - safe for Jetstream replays
324 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
325 if err != nil {
326 if communities.IsNotFound(err) {
327 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
328 return nil
329 }
330 return fmt.Errorf("failed to remove subscription: %w", err)
331 }
332
333 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
334 return nil
335}
336
337// handleBlock processes block create/delete events
338// CREATE operation = user blocked a community
339// DELETE operation = user unblocked a community
340func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
341 switch commit.Operation {
342 case "create":
343 return c.createBlock(ctx, userDID, commit)
344 case "delete":
345 return c.deleteBlock(ctx, userDID, commit)
346 default:
347 // Update operations shouldn't happen on blocks, but ignore gracefully
348 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
349 commit.Operation, userDID, commit.RKey)
350 return nil
351 }
352}
353
354// createBlock indexes a new block
355func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
356 if commit.Record == nil {
357 return fmt.Errorf("block create event missing record data")
358 }
359
360 // Extract community DID from record's subject field (following atProto conventions)
361 communityDID, ok := commit.Record["subject"].(string)
362 if !ok {
363 return fmt.Errorf("block record missing subject field")
364 }
365
366 // Build AT-URI for block record
367 // The record lives in the USER's repository
368 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
369
370 // Create block entity
371 // Parse createdAt from record to preserve chronological ordering during replays
372 block := &communities.CommunityBlock{
373 UserDID: userDID,
374 CommunityDID: communityDID,
375 BlockedAt: utils.ParseCreatedAt(commit.Record),
376 RecordURI: uri,
377 RecordCID: commit.CID,
378 }
379
380 // Index the block
381 // This is idempotent - safe for Jetstream replays
382 _, err := c.repo.BlockCommunity(ctx, block)
383 if err != nil {
384 // If already exists, that's fine (idempotency)
385 if communities.IsConflict(err) {
386 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
387 return nil
388 }
389 return fmt.Errorf("failed to index block: %w", err)
390 }
391
392 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
393 return nil
394}
395
396// deleteBlock removes a block from the index
397// DELETE operations don't include record data, so we need to look up the block
398// by its URI to find which community the user unblocked
399func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
400 // Build AT-URI from the rkey
401 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
402
403 // Look up the block to get the community DID
404 // (DELETE operations don't include record data in Jetstream)
405 block, err := c.repo.GetBlockByURI(ctx, uri)
406 if err != nil {
407 if communities.IsNotFound(err) {
408 // Already deleted - this is fine (idempotency)
409 log.Printf("Block already deleted: %s", uri)
410 return nil
411 }
412 return fmt.Errorf("failed to find block for deletion: %w", err)
413 }
414
415 // Remove the block from the index
416 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
417 if err != nil {
418 if communities.IsNotFound(err) {
419 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
420 return nil
421 }
422 return fmt.Errorf("failed to remove block: %w", err)
423 }
424
425 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
426 return nil
427}
428
429// Helper types and functions
430
431type CommunityProfile struct {
432 CreatedAt time.Time `json:"createdAt"`
433 Avatar map[string]interface{} `json:"avatar"`
434 Banner map[string]interface{} `json:"banner"`
435 CreatedBy string `json:"createdBy"`
436 Visibility string `json:"visibility"`
437 AtprotoHandle string `json:"atprotoHandle"`
438 DisplayName string `json:"displayName"`
439 Name string `json:"name"`
440 Handle string `json:"handle"`
441 HostedBy string `json:"hostedBy"`
442 Description string `json:"description"`
443 FederatedID string `json:"federatedId"`
444 ModerationType string `json:"moderationType"`
445 FederatedFrom string `json:"federatedFrom"`
446 ContentWarnings []string `json:"contentWarnings"`
447 DescriptionFacets []interface{} `json:"descriptionFacets"`
448 MemberCount int `json:"memberCount"`
449 SubscriberCount int `json:"subscriberCount"`
450 Federation FederationConfig `json:"federation"`
451}
452
453type FederationConfig struct {
454 AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
455}
456
457// parseCommunityProfile converts a raw record map to a CommunityProfile
458func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
459 recordJSON, err := json.Marshal(record)
460 if err != nil {
461 return nil, fmt.Errorf("failed to marshal record: %w", err)
462 }
463
464 var profile CommunityProfile
465 if err := json.Unmarshal(recordJSON, &profile); err != nil {
466 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
467 }
468
469 return &profile, nil
470}
471
472// extractContentVisibility extracts contentVisibility from subscription record with clamping
473// Returns default value of 3 if missing or invalid
474func extractContentVisibility(record map[string]interface{}) int {
475 const defaultVisibility = 3
476
477 cv, ok := record["contentVisibility"]
478 if !ok {
479 // Field missing - use default
480 return defaultVisibility
481 }
482
483 // JSON numbers decode as float64
484 cvFloat, ok := cv.(float64)
485 if !ok {
486 // Try int (shouldn't happen but handle gracefully)
487 if cvInt, isInt := cv.(int); isInt {
488 return clampContentVisibility(cvInt)
489 }
490 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv)
491 return defaultVisibility
492 }
493
494 // Convert and clamp
495 clamped := clampContentVisibility(int(cvFloat))
496 if clamped != int(cvFloat) {
497 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped)
498 }
499 return clamped
500}
501
502// clampContentVisibility ensures value is within valid range (1-5)
503func clampContentVisibility(value int) int {
504 if value < 1 {
505 return 1
506 }
507 if value > 5 {
508 return 5
509 }
510 return value
511}
512
513// extractBlobCID extracts the CID from a blob reference
514// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
515func extractBlobCID(blob map[string]interface{}) (string, bool) {
516 if blob == nil {
517 return "", false
518 }
519
520 // Check if it's a blob type
521 blobType, ok := blob["$type"].(string)
522 if !ok || blobType != "blob" {
523 return "", false
524 }
525
526 // Extract ref
527 ref, ok := blob["ref"].(map[string]interface{})
528 if !ok {
529 return "", false
530 }
531
532 // Extract $link (the CID)
533 link, ok := ref["$link"].(string)
534 if !ok {
535 return "", false
536 }
537
538 return link, true
539}