A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "strings"
9 "time"
10
11 "Coves/internal/core/communities"
12)
13
14// CommunityEventConsumer consumes community-related events from Jetstream
15type CommunityEventConsumer struct {
16 repo communities.Repository
17}
18
19// NewCommunityEventConsumer creates a new Jetstream consumer for community events
20func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer {
21 return &CommunityEventConsumer{
22 repo: repo,
23 }
24}
25
26// HandleEvent processes a Jetstream event for community records
27// This is called by the main Jetstream consumer when it receives commit events
28func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
29 // We only care about commit events for community records
30 if event.Kind != "commit" || event.Commit == nil {
31 return nil
32 }
33
34 commit := event.Commit
35
36 // Route to appropriate handler based on collection
37 switch commit.Collection {
38 case "social.coves.community.profile":
39 return c.handleCommunityProfile(ctx, event.Did, commit)
40 case "social.coves.community.subscribe":
41 return c.handleSubscription(ctx, event.Did, commit)
42 case "social.coves.community.unsubscribe":
43 return c.handleUnsubscribe(ctx, event.Did, commit)
44 default:
45 // Not a community-related collection
46 return nil
47 }
48}
49
50// handleCommunityProfile processes community profile create/update/delete events
51func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
52 switch commit.Operation {
53 case "create":
54 return c.createCommunity(ctx, did, commit)
55 case "update":
56 return c.updateCommunity(ctx, did, commit)
57 case "delete":
58 return c.deleteCommunity(ctx, did)
59 default:
60 log.Printf("Unknown operation for community profile: %s", commit.Operation)
61 return nil
62 }
63}
64
65// createCommunity indexes a new community from the firehose
66func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
67 if commit.Record == nil {
68 return fmt.Errorf("community profile create event missing record data")
69 }
70
71 // Parse the community profile record
72 profile, err := parseCommunityProfile(commit.Record)
73 if err != nil {
74 return fmt.Errorf("failed to parse community profile: %w", err)
75 }
76
77 // Build AT-URI for this record
78 // IMPORTANT: 'did' parameter is the repository owner (instance DID)
79 // The community's DID comes from profile.Did field in the record
80 uri := fmt.Sprintf("at://%s/social.coves.community.profile/%s", did, commit.RKey)
81
82 // Create community entity
83 community := &communities.Community{
84 DID: profile.Did, // Community's unique DID from record, not repo owner!
85 Handle: profile.Handle,
86 Name: profile.Name,
87 DisplayName: profile.DisplayName,
88 Description: profile.Description,
89 OwnerDID: profile.Owner,
90 CreatedByDID: profile.CreatedBy,
91 HostedByDID: profile.HostedBy,
92 Visibility: profile.Visibility,
93 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
94 ModerationType: profile.ModerationType,
95 ContentWarnings: profile.ContentWarnings,
96 MemberCount: profile.MemberCount,
97 SubscriberCount: profile.SubscriberCount,
98 FederatedFrom: profile.FederatedFrom,
99 FederatedID: profile.FederatedID,
100 CreatedAt: profile.CreatedAt,
101 UpdatedAt: time.Now(),
102 RecordURI: uri,
103 RecordCID: commit.CID,
104 }
105
106 // Handle blobs (avatar/banner) if present
107 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
108 community.AvatarCID = avatarCID
109 }
110 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
111 community.BannerCID = bannerCID
112 }
113
114 // Handle description facets (rich text)
115 if profile.DescriptionFacets != nil {
116 facetsJSON, err := json.Marshal(profile.DescriptionFacets)
117 if err == nil {
118 community.DescriptionFacets = facetsJSON
119 }
120 }
121
122 // Index in AppView database
123 _, err = c.repo.Create(ctx, community)
124 if err != nil {
125 // Check if it already exists (idempotency)
126 if communities.IsConflict(err) {
127 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
128 return nil
129 }
130 return fmt.Errorf("failed to index community: %w", err)
131 }
132
133 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
134 return nil
135}
136
137// updateCommunity updates an existing community from the firehose
138func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
139 if commit.Record == nil {
140 return fmt.Errorf("community profile update event missing record data")
141 }
142
143 // Parse profile to get the community DID
144 profile, err := parseCommunityProfile(commit.Record)
145 if err != nil {
146 return fmt.Errorf("failed to parse community profile: %w", err)
147 }
148
149 // Get existing community using the community DID from the record, not repo owner
150 existing, err := c.repo.GetByDID(ctx, profile.Did)
151 if err != nil {
152 if communities.IsNotFound(err) {
153 // Community doesn't exist yet - treat as create
154 log.Printf("Community not found for update, creating: %s", profile.Did)
155 return c.createCommunity(ctx, did, commit)
156 }
157 return fmt.Errorf("failed to get existing community: %w", err)
158 }
159
160 // Update fields
161 existing.Handle = profile.Handle
162 existing.Name = profile.Name
163 existing.DisplayName = profile.DisplayName
164 existing.Description = profile.Description
165 existing.Visibility = profile.Visibility
166 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
167 existing.ModerationType = profile.ModerationType
168 existing.ContentWarnings = profile.ContentWarnings
169 existing.RecordCID = commit.CID
170
171 // Update blobs
172 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
173 existing.AvatarCID = avatarCID
174 }
175 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
176 existing.BannerCID = bannerCID
177 }
178
179 // Update description facets
180 if profile.DescriptionFacets != nil {
181 facetsJSON, err := json.Marshal(profile.DescriptionFacets)
182 if err == nil {
183 existing.DescriptionFacets = facetsJSON
184 }
185 }
186
187 // Save updates
188 _, err = c.repo.Update(ctx, existing)
189 if err != nil {
190 return fmt.Errorf("failed to update community: %w", err)
191 }
192
193 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
194 return nil
195}
196
197// deleteCommunity removes a community from the index
198func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
199 err := c.repo.Delete(ctx, did)
200 if err != nil {
201 if communities.IsNotFound(err) {
202 log.Printf("Community already deleted: %s", did)
203 return nil
204 }
205 return fmt.Errorf("failed to delete community: %w", err)
206 }
207
208 log.Printf("Deleted community: %s", did)
209 return nil
210}
211
212// handleSubscription indexes a subscription event
213func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
214 if commit.Operation != "create" {
215 return nil // Subscriptions are only created, not updated
216 }
217
218 if commit.Record == nil {
219 return fmt.Errorf("subscription event missing record data")
220 }
221
222 // Extract community DID from record
223 communityDID, ok := commit.Record["community"].(string)
224 if !ok {
225 return fmt.Errorf("subscription record missing community field")
226 }
227
228 // Build AT-URI for subscription record
229 uri := fmt.Sprintf("at://%s/social.coves.community.subscribe/%s", userDID, commit.RKey)
230
231 // Create subscription
232 subscription := &communities.Subscription{
233 UserDID: userDID,
234 CommunityDID: communityDID,
235 SubscribedAt: time.Now(),
236 RecordURI: uri,
237 RecordCID: commit.CID,
238 }
239
240 // Use transactional method to ensure subscription and count are atomically updated
241 // This is idempotent - safe for Jetstream replays
242 _, err := c.repo.SubscribeWithCount(ctx, subscription)
243 if err != nil {
244 return fmt.Errorf("failed to index subscription: %w", err)
245 }
246
247 log.Printf("Indexed subscription: %s -> %s", userDID, communityDID)
248 return nil
249}
250
251// handleUnsubscribe removes a subscription
252func (c *CommunityEventConsumer) handleUnsubscribe(ctx context.Context, userDID string, commit *CommitEvent) error {
253 if commit.Operation != "delete" {
254 return nil
255 }
256
257 // For unsubscribe, we need to extract the community DID from the record key or metadata
258 // This might need adjustment based on actual Jetstream structure
259 if commit.Record == nil {
260 return fmt.Errorf("unsubscribe event missing record data")
261 }
262
263 communityDID, ok := commit.Record["community"].(string)
264 if !ok {
265 return fmt.Errorf("unsubscribe record missing community field")
266 }
267
268 // Use transactional method to ensure unsubscribe and count are atomically updated
269 // This is idempotent - safe for Jetstream replays
270 err := c.repo.UnsubscribeWithCount(ctx, userDID, communityDID)
271 if err != nil {
272 return fmt.Errorf("failed to remove subscription: %w", err)
273 }
274
275 log.Printf("Removed subscription: %s -> %s", userDID, communityDID)
276 return nil
277}
278
279// Helper types and functions
280
281type CommunityProfile struct {
282 Did string `json:"did"` // Community's unique DID
283 Handle string `json:"handle"`
284 Name string `json:"name"`
285 DisplayName string `json:"displayName"`
286 Description string `json:"description"`
287 DescriptionFacets []interface{} `json:"descriptionFacets"`
288 Avatar map[string]interface{} `json:"avatar"`
289 Banner map[string]interface{} `json:"banner"`
290 Owner string `json:"owner"`
291 CreatedBy string `json:"createdBy"`
292 HostedBy string `json:"hostedBy"`
293 Visibility string `json:"visibility"`
294 Federation FederationConfig `json:"federation"`
295 ModerationType string `json:"moderationType"`
296 ContentWarnings []string `json:"contentWarnings"`
297 MemberCount int `json:"memberCount"`
298 SubscriberCount int `json:"subscriberCount"`
299 FederatedFrom string `json:"federatedFrom"`
300 FederatedID string `json:"federatedId"`
301 CreatedAt time.Time `json:"createdAt"`
302}
303
304type FederationConfig struct {
305 AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
306}
307
308// parseCommunityProfile converts a raw record map to a CommunityProfile
309func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
310 recordJSON, err := json.Marshal(record)
311 if err != nil {
312 return nil, fmt.Errorf("failed to marshal record: %w", err)
313 }
314
315 var profile CommunityProfile
316 if err := json.Unmarshal(recordJSON, &profile); err != nil {
317 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
318 }
319
320 return &profile, nil
321}
322
323// extractBlobCID extracts the CID from a blob reference
324// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
325func extractBlobCID(blob map[string]interface{}) (string, bool) {
326 if blob == nil {
327 return "", false
328 }
329
330 // Check if it's a blob type
331 blobType, ok := blob["$type"].(string)
332 if !ok || blobType != "blob" {
333 return "", false
334 }
335
336 // Extract ref
337 ref, ok := blob["ref"].(map[string]interface{})
338 if !ok {
339 return "", false
340 }
341
342 // Extract $link (the CID)
343 link, ok := ref["$link"].(string)
344 if !ok {
345 return "", false
346 }
347
348 return link, true
349}
350
351// validateHandle checks if a handle matches expected format (!name@instance)
352func validateHandle(handle string) bool {
353 if !strings.HasPrefix(handle, "!") {
354 return false
355 }
356
357 parts := strings.Split(handle, "@")
358 if len(parts) != 2 {
359 return false
360 }
361
362 return true
363}