A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/communities"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log"
9 "time"
10)
11
12// CommunityEventConsumer consumes community-related events from Jetstream
13type CommunityEventConsumer struct {
14 repo communities.Repository
15}
16
17// NewCommunityEventConsumer creates a new Jetstream consumer for community events
18func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer {
19 return &CommunityEventConsumer{
20 repo: repo,
21 }
22}
23
24// HandleEvent processes a Jetstream event for community records
25// This is called by the main Jetstream consumer when it receives commit events
26func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
27 // We only care about commit events for community records
28 if event.Kind != "commit" || event.Commit == nil {
29 return nil
30 }
31
32 commit := event.Commit
33
34 // Route to appropriate handler based on collection
35 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
36 // - social.coves.community.profile: Community profile records (in community's own repo)
37 // - social.coves.community.subscription: Subscription records (in user's repo)
38 //
39 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
40 // that CREATE or DELETE records in these collections
41 switch commit.Collection {
42 case "social.coves.community.profile":
43 return c.handleCommunityProfile(ctx, event.Did, commit)
44 case "social.coves.community.subscription":
45 // Handle both create (subscribe) and delete (unsubscribe) operations
46 return c.handleSubscription(ctx, event.Did, commit)
47 default:
48 // Not a community-related collection
49 return nil
50 }
51}
52
53// handleCommunityProfile processes community profile create/update/delete events
54func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
55 switch commit.Operation {
56 case "create":
57 return c.createCommunity(ctx, did, commit)
58 case "update":
59 return c.updateCommunity(ctx, did, commit)
60 case "delete":
61 return c.deleteCommunity(ctx, did)
62 default:
63 log.Printf("Unknown operation for community profile: %s", commit.Operation)
64 return nil
65 }
66}
67
68// createCommunity indexes a new community from the firehose
69func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
70 if commit.Record == nil {
71 return fmt.Errorf("community profile create event missing record data")
72 }
73
74 // Parse the community profile record
75 profile, err := parseCommunityProfile(commit.Record)
76 if err != nil {
77 return fmt.Errorf("failed to parse community profile: %w", err)
78 }
79
80 // Build AT-URI for this record
81 // V2 Architecture (ONLY):
82 // - 'did' parameter IS the community DID (community owns its own repo)
83 // - rkey MUST be "self" for community profiles
84 // - URI: at://community_did/social.coves.community.profile/self
85
86 // REJECT non-V2 communities (pre-production: no V1 compatibility)
87 if commit.RKey != "self" {
88 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
89 }
90
91 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
92
93 // V2: Community ALWAYS owns itself
94 ownerDID := did
95
96 // Create community entity
97 community := &communities.Community{
98 DID: did, // V2: Repository DID IS the community DID
99 Handle: profile.Handle,
100 Name: profile.Name,
101 DisplayName: profile.DisplayName,
102 Description: profile.Description,
103 OwnerDID: ownerDID, // V2: same as DID (self-owned)
104 CreatedByDID: profile.CreatedBy,
105 HostedByDID: profile.HostedBy,
106 Visibility: profile.Visibility,
107 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
108 ModerationType: profile.ModerationType,
109 ContentWarnings: profile.ContentWarnings,
110 MemberCount: profile.MemberCount,
111 SubscriberCount: profile.SubscriberCount,
112 FederatedFrom: profile.FederatedFrom,
113 FederatedID: profile.FederatedID,
114 CreatedAt: profile.CreatedAt,
115 UpdatedAt: time.Now(),
116 RecordURI: uri,
117 RecordCID: commit.CID,
118 }
119
120 // Handle blobs (avatar/banner) if present
121 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
122 community.AvatarCID = avatarCID
123 }
124 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
125 community.BannerCID = bannerCID
126 }
127
128 // Handle description facets (rich text)
129 if profile.DescriptionFacets != nil {
130 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
131 if marshalErr == nil {
132 community.DescriptionFacets = facetsJSON
133 }
134 }
135
136 // Index in AppView database
137 _, err = c.repo.Create(ctx, community)
138 if err != nil {
139 // Check if it already exists (idempotency)
140 if communities.IsConflict(err) {
141 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
142 return nil
143 }
144 return fmt.Errorf("failed to index community: %w", err)
145 }
146
147 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
148 return nil
149}
150
151// updateCommunity updates an existing community from the firehose
152func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
153 if commit.Record == nil {
154 return fmt.Errorf("community profile update event missing record data")
155 }
156
157 // REJECT non-V2 communities (pre-production: no V1 compatibility)
158 if commit.RKey != "self" {
159 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
160 }
161
162 // Parse profile
163 profile, err := parseCommunityProfile(commit.Record)
164 if err != nil {
165 return fmt.Errorf("failed to parse community profile: %w", err)
166 }
167
168 // V2: Repository DID IS the community DID
169 // Get existing community using the repo DID
170 existing, err := c.repo.GetByDID(ctx, did)
171 if err != nil {
172 if communities.IsNotFound(err) {
173 // Community doesn't exist yet - treat as create
174 log.Printf("Community not found for update, creating: %s", did)
175 return c.createCommunity(ctx, did, commit)
176 }
177 return fmt.Errorf("failed to get existing community: %w", err)
178 }
179
180 // Update fields
181 existing.Handle = profile.Handle
182 existing.Name = profile.Name
183 existing.DisplayName = profile.DisplayName
184 existing.Description = profile.Description
185 existing.Visibility = profile.Visibility
186 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
187 existing.ModerationType = profile.ModerationType
188 existing.ContentWarnings = profile.ContentWarnings
189 existing.RecordCID = commit.CID
190
191 // Update blobs
192 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
193 existing.AvatarCID = avatarCID
194 }
195 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
196 existing.BannerCID = bannerCID
197 }
198
199 // Update description facets
200 if profile.DescriptionFacets != nil {
201 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
202 if marshalErr == nil {
203 existing.DescriptionFacets = facetsJSON
204 }
205 }
206
207 // Save updates
208 _, err = c.repo.Update(ctx, existing)
209 if err != nil {
210 return fmt.Errorf("failed to update community: %w", err)
211 }
212
213 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
214 return nil
215}
216
217// deleteCommunity removes a community from the index
218func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
219 err := c.repo.Delete(ctx, did)
220 if err != nil {
221 if communities.IsNotFound(err) {
222 log.Printf("Community already deleted: %s", did)
223 return nil
224 }
225 return fmt.Errorf("failed to delete community: %w", err)
226 }
227
228 log.Printf("Deleted community: %s", did)
229 return nil
230}
231
232// handleSubscription processes subscription create/delete events
233// CREATE operation = user subscribed to community
234// DELETE operation = user unsubscribed from community
235func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
236 switch commit.Operation {
237 case "create":
238 return c.createSubscription(ctx, userDID, commit)
239 case "delete":
240 return c.deleteSubscription(ctx, userDID, commit)
241 default:
242 // Update operations shouldn't happen on subscriptions, but ignore gracefully
243 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
244 commit.Operation, userDID, commit.RKey)
245 return nil
246 }
247}
248
249// createSubscription indexes a new subscription with retry logic
250func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
251 if commit.Record == nil {
252 return fmt.Errorf("subscription create event missing record data")
253 }
254
255 // Extract community DID from record's subject field (following atProto conventions)
256 communityDID, ok := commit.Record["subject"].(string)
257 if !ok {
258 return fmt.Errorf("subscription record missing subject field")
259 }
260
261 // Extract contentVisibility with clamping and default value
262 contentVisibility := extractContentVisibility(commit.Record)
263
264 // Build AT-URI for subscription record
265 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
266 // The record lives in the USER's repository, but uses the communities namespace
267 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
268
269 // Create subscription entity
270 subscription := &communities.Subscription{
271 UserDID: userDID,
272 CommunityDID: communityDID,
273 ContentVisibility: contentVisibility,
274 SubscribedAt: time.Now(),
275 RecordURI: uri,
276 RecordCID: commit.CID,
277 }
278
279 // Use transactional method to ensure subscription and count are atomically updated
280 // This is idempotent - safe for Jetstream replays
281 _, err := c.repo.SubscribeWithCount(ctx, subscription)
282 if err != nil {
283 // If already exists, that's fine (idempotency)
284 if communities.IsConflict(err) {
285 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
286 userDID, communityDID, contentVisibility)
287 return nil
288 }
289 return fmt.Errorf("failed to index subscription: %w", err)
290 }
291
292 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
293 userDID, communityDID, contentVisibility)
294 return nil
295}
296
297// deleteSubscription removes a subscription from the index
298// DELETE operations don't include record data, so we need to look up the subscription
299// by its URI to find which community the user unsubscribed from
300func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
301 // Build AT-URI from the rkey
302 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
303
304 // Look up the subscription to get the community DID
305 // (DELETE operations don't include record data in Jetstream)
306 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
307 if err != nil {
308 if communities.IsNotFound(err) {
309 // Already deleted - this is fine (idempotency)
310 log.Printf("Subscription already deleted: %s", uri)
311 return nil
312 }
313 return fmt.Errorf("failed to find subscription for deletion: %w", err)
314 }
315
316 // Use transactional method to ensure unsubscribe and count are atomically updated
317 // This is idempotent - safe for Jetstream replays
318 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
319 if err != nil {
320 if communities.IsNotFound(err) {
321 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
322 return nil
323 }
324 return fmt.Errorf("failed to remove subscription: %w", err)
325 }
326
327 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
328 return nil
329}
330
331// Helper types and functions
332
333type CommunityProfile struct {
334 CreatedAt time.Time `json:"createdAt"`
335 Avatar map[string]interface{} `json:"avatar"`
336 Banner map[string]interface{} `json:"banner"`
337 CreatedBy string `json:"createdBy"`
338 Visibility string `json:"visibility"`
339 AtprotoHandle string `json:"atprotoHandle"`
340 DisplayName string `json:"displayName"`
341 Name string `json:"name"`
342 Handle string `json:"handle"`
343 HostedBy string `json:"hostedBy"`
344 Description string `json:"description"`
345 FederatedID string `json:"federatedId"`
346 ModerationType string `json:"moderationType"`
347 FederatedFrom string `json:"federatedFrom"`
348 ContentWarnings []string `json:"contentWarnings"`
349 DescriptionFacets []interface{} `json:"descriptionFacets"`
350 MemberCount int `json:"memberCount"`
351 SubscriberCount int `json:"subscriberCount"`
352 Federation FederationConfig `json:"federation"`
353}
354
355type FederationConfig struct {
356 AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
357}
358
359// parseCommunityProfile converts a raw record map to a CommunityProfile
360func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
361 recordJSON, err := json.Marshal(record)
362 if err != nil {
363 return nil, fmt.Errorf("failed to marshal record: %w", err)
364 }
365
366 var profile CommunityProfile
367 if err := json.Unmarshal(recordJSON, &profile); err != nil {
368 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
369 }
370
371 return &profile, nil
372}
373
374// extractContentVisibility extracts contentVisibility from subscription record with clamping
375// Returns default value of 3 if missing or invalid
376func extractContentVisibility(record map[string]interface{}) int {
377 const defaultVisibility = 3
378
379 cv, ok := record["contentVisibility"]
380 if !ok {
381 // Field missing - use default
382 return defaultVisibility
383 }
384
385 // JSON numbers decode as float64
386 cvFloat, ok := cv.(float64)
387 if !ok {
388 // Try int (shouldn't happen but handle gracefully)
389 if cvInt, isInt := cv.(int); isInt {
390 return clampContentVisibility(cvInt)
391 }
392 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv)
393 return defaultVisibility
394 }
395
396 // Convert and clamp
397 clamped := clampContentVisibility(int(cvFloat))
398 if clamped != int(cvFloat) {
399 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped)
400 }
401 return clamped
402}
403
404// clampContentVisibility ensures value is within valid range (1-5)
405func clampContentVisibility(value int) int {
406 if value < 1 {
407 return 1
408 }
409 if value > 5 {
410 return 5
411 }
412 return value
413}
414
415// extractBlobCID extracts the CID from a blob reference
416// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
417func extractBlobCID(blob map[string]interface{}) (string, bool) {
418 if blob == nil {
419 return "", false
420 }
421
422 // Check if it's a blob type
423 blobType, ok := blob["$type"].(string)
424 if !ok || blobType != "blob" {
425 return "", false
426 }
427
428 // Extract ref
429 ref, ok := blob["ref"].(map[string]interface{})
430 if !ok {
431 return "", false
432 }
433
434 // Extract $link (the CID)
435 link, ok := ref["$link"].(string)
436 if !ok {
437 return "", false
438 }
439
440 return link, true
441}