A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/communities"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log"
9 "time"
10)
11
12// CommunityEventConsumer consumes community-related events from Jetstream
13type CommunityEventConsumer struct {
14 repo communities.Repository
15}
16
17// NewCommunityEventConsumer creates a new Jetstream consumer for community events
18func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer {
19 return &CommunityEventConsumer{
20 repo: repo,
21 }
22}
23
24// HandleEvent processes a Jetstream event for community records
25// This is called by the main Jetstream consumer when it receives commit events
26func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
27 // We only care about commit events for community records
28 if event.Kind != "commit" || event.Commit == nil {
29 return nil
30 }
31
32 commit := event.Commit
33
34 // Route to appropriate handler based on collection
35 switch commit.Collection {
36 case "social.coves.community.profile":
37 return c.handleCommunityProfile(ctx, event.Did, commit)
38 case "social.coves.community.subscribe":
39 return c.handleSubscription(ctx, event.Did, commit)
40 case "social.coves.community.unsubscribe":
41 return c.handleUnsubscribe(ctx, event.Did, commit)
42 default:
43 // Not a community-related collection
44 return nil
45 }
46}
47
48// handleCommunityProfile processes community profile create/update/delete events
49func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
50 switch commit.Operation {
51 case "create":
52 return c.createCommunity(ctx, did, commit)
53 case "update":
54 return c.updateCommunity(ctx, did, commit)
55 case "delete":
56 return c.deleteCommunity(ctx, did)
57 default:
58 log.Printf("Unknown operation for community profile: %s", commit.Operation)
59 return nil
60 }
61}
62
63// createCommunity indexes a new community from the firehose
64func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
65 if commit.Record == nil {
66 return fmt.Errorf("community profile create event missing record data")
67 }
68
69 // Parse the community profile record
70 profile, err := parseCommunityProfile(commit.Record)
71 if err != nil {
72 return fmt.Errorf("failed to parse community profile: %w", err)
73 }
74
75 // Build AT-URI for this record
76 // V2 Architecture (ONLY):
77 // - 'did' parameter IS the community DID (community owns its own repo)
78 // - rkey MUST be "self" for community profiles
79 // - URI: at://community_did/social.coves.community.profile/self
80
81 // REJECT non-V2 communities (pre-production: no V1 compatibility)
82 if commit.RKey != "self" {
83 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
84 }
85
86 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
87
88 // V2: Community ALWAYS owns itself
89 ownerDID := did
90
91 // Create community entity
92 community := &communities.Community{
93 DID: did, // V2: Repository DID IS the community DID
94 Handle: profile.Handle,
95 Name: profile.Name,
96 DisplayName: profile.DisplayName,
97 Description: profile.Description,
98 OwnerDID: ownerDID, // V2: same as DID (self-owned)
99 CreatedByDID: profile.CreatedBy,
100 HostedByDID: profile.HostedBy,
101 Visibility: profile.Visibility,
102 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
103 ModerationType: profile.ModerationType,
104 ContentWarnings: profile.ContentWarnings,
105 MemberCount: profile.MemberCount,
106 SubscriberCount: profile.SubscriberCount,
107 FederatedFrom: profile.FederatedFrom,
108 FederatedID: profile.FederatedID,
109 CreatedAt: profile.CreatedAt,
110 UpdatedAt: time.Now(),
111 RecordURI: uri,
112 RecordCID: commit.CID,
113 }
114
115 // Handle blobs (avatar/banner) if present
116 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
117 community.AvatarCID = avatarCID
118 }
119 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
120 community.BannerCID = bannerCID
121 }
122
123 // Handle description facets (rich text)
124 if profile.DescriptionFacets != nil {
125 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
126 if marshalErr == nil {
127 community.DescriptionFacets = facetsJSON
128 }
129 }
130
131 // Index in AppView database
132 _, err = c.repo.Create(ctx, community)
133 if err != nil {
134 // Check if it already exists (idempotency)
135 if communities.IsConflict(err) {
136 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
137 return nil
138 }
139 return fmt.Errorf("failed to index community: %w", err)
140 }
141
142 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
143 return nil
144}
145
146// updateCommunity updates an existing community from the firehose
147func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
148 if commit.Record == nil {
149 return fmt.Errorf("community profile update event missing record data")
150 }
151
152 // REJECT non-V2 communities (pre-production: no V1 compatibility)
153 if commit.RKey != "self" {
154 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
155 }
156
157 // Parse profile
158 profile, err := parseCommunityProfile(commit.Record)
159 if err != nil {
160 return fmt.Errorf("failed to parse community profile: %w", err)
161 }
162
163 // V2: Repository DID IS the community DID
164 // Get existing community using the repo DID
165 existing, err := c.repo.GetByDID(ctx, did)
166 if err != nil {
167 if communities.IsNotFound(err) {
168 // Community doesn't exist yet - treat as create
169 log.Printf("Community not found for update, creating: %s", did)
170 return c.createCommunity(ctx, did, commit)
171 }
172 return fmt.Errorf("failed to get existing community: %w", err)
173 }
174
175 // Update fields
176 existing.Handle = profile.Handle
177 existing.Name = profile.Name
178 existing.DisplayName = profile.DisplayName
179 existing.Description = profile.Description
180 existing.Visibility = profile.Visibility
181 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
182 existing.ModerationType = profile.ModerationType
183 existing.ContentWarnings = profile.ContentWarnings
184 existing.RecordCID = commit.CID
185
186 // Update blobs
187 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
188 existing.AvatarCID = avatarCID
189 }
190 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
191 existing.BannerCID = bannerCID
192 }
193
194 // Update description facets
195 if profile.DescriptionFacets != nil {
196 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
197 if marshalErr == nil {
198 existing.DescriptionFacets = facetsJSON
199 }
200 }
201
202 // Save updates
203 _, err = c.repo.Update(ctx, existing)
204 if err != nil {
205 return fmt.Errorf("failed to update community: %w", err)
206 }
207
208 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
209 return nil
210}
211
212// deleteCommunity removes a community from the index
213func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
214 err := c.repo.Delete(ctx, did)
215 if err != nil {
216 if communities.IsNotFound(err) {
217 log.Printf("Community already deleted: %s", did)
218 return nil
219 }
220 return fmt.Errorf("failed to delete community: %w", err)
221 }
222
223 log.Printf("Deleted community: %s", did)
224 return nil
225}
226
227// handleSubscription indexes a subscription event
228func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
229 if commit.Operation != "create" {
230 return nil // Subscriptions are only created, not updated
231 }
232
233 if commit.Record == nil {
234 return fmt.Errorf("subscription event missing record data")
235 }
236
237 // Extract community DID from record
238 communityDID, ok := commit.Record["community"].(string)
239 if !ok {
240 return fmt.Errorf("subscription record missing community field")
241 }
242
243 // Build AT-URI for subscription record
244 uri := fmt.Sprintf("at://%s/social.coves.community.subscribe/%s", userDID, commit.RKey)
245
246 // Create subscription
247 subscription := &communities.Subscription{
248 UserDID: userDID,
249 CommunityDID: communityDID,
250 SubscribedAt: time.Now(),
251 RecordURI: uri,
252 RecordCID: commit.CID,
253 }
254
255 // Use transactional method to ensure subscription and count are atomically updated
256 // This is idempotent - safe for Jetstream replays
257 _, err := c.repo.SubscribeWithCount(ctx, subscription)
258 if err != nil {
259 return fmt.Errorf("failed to index subscription: %w", err)
260 }
261
262 log.Printf("Indexed subscription: %s -> %s", userDID, communityDID)
263 return nil
264}
265
266// handleUnsubscribe removes a subscription
267func (c *CommunityEventConsumer) handleUnsubscribe(ctx context.Context, userDID string, commit *CommitEvent) error {
268 if commit.Operation != "delete" {
269 return nil
270 }
271
272 // For unsubscribe, we need to extract the community DID from the record key or metadata
273 // This might need adjustment based on actual Jetstream structure
274 if commit.Record == nil {
275 return fmt.Errorf("unsubscribe event missing record data")
276 }
277
278 communityDID, ok := commit.Record["community"].(string)
279 if !ok {
280 return fmt.Errorf("unsubscribe record missing community field")
281 }
282
283 // Use transactional method to ensure unsubscribe and count are atomically updated
284 // This is idempotent - safe for Jetstream replays
285 err := c.repo.UnsubscribeWithCount(ctx, userDID, communityDID)
286 if err != nil {
287 return fmt.Errorf("failed to remove subscription: %w", err)
288 }
289
290 log.Printf("Removed subscription: %s -> %s", userDID, communityDID)
291 return nil
292}
293
294// Helper types and functions
295
296type CommunityProfile struct {
297 CreatedAt time.Time `json:"createdAt"`
298 Avatar map[string]interface{} `json:"avatar"`
299 Banner map[string]interface{} `json:"banner"`
300 CreatedBy string `json:"createdBy"`
301 Visibility string `json:"visibility"`
302 AtprotoHandle string `json:"atprotoHandle"`
303 DisplayName string `json:"displayName"`
304 Name string `json:"name"`
305 Handle string `json:"handle"`
306 HostedBy string `json:"hostedBy"`
307 Description string `json:"description"`
308 FederatedID string `json:"federatedId"`
309 ModerationType string `json:"moderationType"`
310 FederatedFrom string `json:"federatedFrom"`
311 ContentWarnings []string `json:"contentWarnings"`
312 DescriptionFacets []interface{} `json:"descriptionFacets"`
313 MemberCount int `json:"memberCount"`
314 SubscriberCount int `json:"subscriberCount"`
315 Federation FederationConfig `json:"federation"`
316}
317
// FederationConfig is the "federation" object of a community profile record.
type FederationConfig struct {
	// AllowExternalDiscovery is decoded from "allowExternalDiscovery" and
	// copied to Community.AllowExternalDiscovery on create and update.
	AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
}
321
322// parseCommunityProfile converts a raw record map to a CommunityProfile
323func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
324 recordJSON, err := json.Marshal(record)
325 if err != nil {
326 return nil, fmt.Errorf("failed to marshal record: %w", err)
327 }
328
329 var profile CommunityProfile
330 if err := json.Unmarshal(recordJSON, &profile); err != nil {
331 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
332 }
333
334 return &profile, nil
335}
336
// extractBlobCID extracts the CID from a blob reference.
//
// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
//
// It returns the CID and true only when blob has "$type" equal to "blob", a
// "ref" object, and a non-empty string "$link" inside it. For a nil map or
// any other shape it returns "" and false, so a true result always comes
// with a usable CID.
func extractBlobCID(blob map[string]interface{}) (string, bool) {
	// Reading from a nil map is safe and yields the zero value, so a nil blob
	// fails this check just like a blob with a missing or foreign "$type".
	if blobType, _ := blob["$type"].(string); blobType != "blob" {
		return "", false
	}

	ref, ok := blob["ref"].(map[string]interface{})
	if !ok {
		return "", false
	}

	// "$link" holds the CID; an empty string is a malformed reference and is
	// reported as absent rather than as a successful extraction.
	link, ok := ref["$link"].(string)
	if !ok || link == "" {
		return "", false
	}

	return link, true
}