A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "strings"
9 "time"
10
11 "Coves/internal/core/communities"
12)
13
14// CommunityEventConsumer consumes community-related events from Jetstream
15type CommunityEventConsumer struct {
16 repo communities.Repository
17}
18
19// NewCommunityEventConsumer creates a new Jetstream consumer for community events
20func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer {
21 return &CommunityEventConsumer{
22 repo: repo,
23 }
24}
25
26// HandleEvent processes a Jetstream event for community records
27// This is called by the main Jetstream consumer when it receives commit events
28func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
29 // We only care about commit events for community records
30 if event.Kind != "commit" || event.Commit == nil {
31 return nil
32 }
33
34 commit := event.Commit
35
36 // Route to appropriate handler based on collection
37 switch commit.Collection {
38 case "social.coves.community.profile":
39 return c.handleCommunityProfile(ctx, event.Did, commit)
40 case "social.coves.community.subscribe":
41 return c.handleSubscription(ctx, event.Did, commit)
42 case "social.coves.community.unsubscribe":
43 return c.handleUnsubscribe(ctx, event.Did, commit)
44 default:
45 // Not a community-related collection
46 return nil
47 }
48}
49
50// handleCommunityProfile processes community profile create/update/delete events
51func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
52 switch commit.Operation {
53 case "create":
54 return c.createCommunity(ctx, did, commit)
55 case "update":
56 return c.updateCommunity(ctx, did, commit)
57 case "delete":
58 return c.deleteCommunity(ctx, did)
59 default:
60 log.Printf("Unknown operation for community profile: %s", commit.Operation)
61 return nil
62 }
63}
64
65// createCommunity indexes a new community from the firehose
66func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
67 if commit.Record == nil {
68 return fmt.Errorf("community profile create event missing record data")
69 }
70
71 // Parse the community profile record
72 profile, err := parseCommunityProfile(commit.Record)
73 if err != nil {
74 return fmt.Errorf("failed to parse community profile: %w", err)
75 }
76
77 // Build AT-URI for this record
78 // V2 Architecture (ONLY):
79 // - 'did' parameter IS the community DID (community owns its own repo)
80 // - rkey MUST be "self" for community profiles
81 // - URI: at://community_did/social.coves.community.profile/self
82
83 // REJECT non-V2 communities (pre-production: no V1 compatibility)
84 if commit.RKey != "self" {
85 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
86 }
87
88 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
89
90 // V2: Community ALWAYS owns itself
91 ownerDID := did
92
93 // Create community entity
94 community := &communities.Community{
95 DID: did, // V2: Repository DID IS the community DID
96 Handle: profile.Handle,
97 Name: profile.Name,
98 DisplayName: profile.DisplayName,
99 Description: profile.Description,
100 OwnerDID: ownerDID, // V2: same as DID (self-owned)
101 CreatedByDID: profile.CreatedBy,
102 HostedByDID: profile.HostedBy,
103 Visibility: profile.Visibility,
104 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
105 ModerationType: profile.ModerationType,
106 ContentWarnings: profile.ContentWarnings,
107 MemberCount: profile.MemberCount,
108 SubscriberCount: profile.SubscriberCount,
109 FederatedFrom: profile.FederatedFrom,
110 FederatedID: profile.FederatedID,
111 CreatedAt: profile.CreatedAt,
112 UpdatedAt: time.Now(),
113 RecordURI: uri,
114 RecordCID: commit.CID,
115 }
116
117 // Handle blobs (avatar/banner) if present
118 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
119 community.AvatarCID = avatarCID
120 }
121 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
122 community.BannerCID = bannerCID
123 }
124
125 // Handle description facets (rich text)
126 if profile.DescriptionFacets != nil {
127 facetsJSON, err := json.Marshal(profile.DescriptionFacets)
128 if err == nil {
129 community.DescriptionFacets = facetsJSON
130 }
131 }
132
133 // Index in AppView database
134 _, err = c.repo.Create(ctx, community)
135 if err != nil {
136 // Check if it already exists (idempotency)
137 if communities.IsConflict(err) {
138 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
139 return nil
140 }
141 return fmt.Errorf("failed to index community: %w", err)
142 }
143
144 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
145 return nil
146}
147
148// updateCommunity updates an existing community from the firehose
149func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
150 if commit.Record == nil {
151 return fmt.Errorf("community profile update event missing record data")
152 }
153
154 // REJECT non-V2 communities (pre-production: no V1 compatibility)
155 if commit.RKey != "self" {
156 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
157 }
158
159 // Parse profile
160 profile, err := parseCommunityProfile(commit.Record)
161 if err != nil {
162 return fmt.Errorf("failed to parse community profile: %w", err)
163 }
164
165 // V2: Repository DID IS the community DID
166 // Get existing community using the repo DID
167 existing, err := c.repo.GetByDID(ctx, did)
168 if err != nil {
169 if communities.IsNotFound(err) {
170 // Community doesn't exist yet - treat as create
171 log.Printf("Community not found for update, creating: %s", did)
172 return c.createCommunity(ctx, did, commit)
173 }
174 return fmt.Errorf("failed to get existing community: %w", err)
175 }
176
177 // Update fields
178 existing.Handle = profile.Handle
179 existing.Name = profile.Name
180 existing.DisplayName = profile.DisplayName
181 existing.Description = profile.Description
182 existing.Visibility = profile.Visibility
183 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
184 existing.ModerationType = profile.ModerationType
185 existing.ContentWarnings = profile.ContentWarnings
186 existing.RecordCID = commit.CID
187
188 // Update blobs
189 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
190 existing.AvatarCID = avatarCID
191 }
192 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
193 existing.BannerCID = bannerCID
194 }
195
196 // Update description facets
197 if profile.DescriptionFacets != nil {
198 facetsJSON, err := json.Marshal(profile.DescriptionFacets)
199 if err == nil {
200 existing.DescriptionFacets = facetsJSON
201 }
202 }
203
204 // Save updates
205 _, err = c.repo.Update(ctx, existing)
206 if err != nil {
207 return fmt.Errorf("failed to update community: %w", err)
208 }
209
210 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
211 return nil
212}
213
214// deleteCommunity removes a community from the index
215func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
216 err := c.repo.Delete(ctx, did)
217 if err != nil {
218 if communities.IsNotFound(err) {
219 log.Printf("Community already deleted: %s", did)
220 return nil
221 }
222 return fmt.Errorf("failed to delete community: %w", err)
223 }
224
225 log.Printf("Deleted community: %s", did)
226 return nil
227}
228
229// handleSubscription indexes a subscription event
230func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
231 if commit.Operation != "create" {
232 return nil // Subscriptions are only created, not updated
233 }
234
235 if commit.Record == nil {
236 return fmt.Errorf("subscription event missing record data")
237 }
238
239 // Extract community DID from record
240 communityDID, ok := commit.Record["community"].(string)
241 if !ok {
242 return fmt.Errorf("subscription record missing community field")
243 }
244
245 // Build AT-URI for subscription record
246 uri := fmt.Sprintf("at://%s/social.coves.community.subscribe/%s", userDID, commit.RKey)
247
248 // Create subscription
249 subscription := &communities.Subscription{
250 UserDID: userDID,
251 CommunityDID: communityDID,
252 SubscribedAt: time.Now(),
253 RecordURI: uri,
254 RecordCID: commit.CID,
255 }
256
257 // Use transactional method to ensure subscription and count are atomically updated
258 // This is idempotent - safe for Jetstream replays
259 _, err := c.repo.SubscribeWithCount(ctx, subscription)
260 if err != nil {
261 return fmt.Errorf("failed to index subscription: %w", err)
262 }
263
264 log.Printf("Indexed subscription: %s -> %s", userDID, communityDID)
265 return nil
266}
267
268// handleUnsubscribe removes a subscription
269func (c *CommunityEventConsumer) handleUnsubscribe(ctx context.Context, userDID string, commit *CommitEvent) error {
270 if commit.Operation != "delete" {
271 return nil
272 }
273
274 // For unsubscribe, we need to extract the community DID from the record key or metadata
275 // This might need adjustment based on actual Jetstream structure
276 if commit.Record == nil {
277 return fmt.Errorf("unsubscribe event missing record data")
278 }
279
280 communityDID, ok := commit.Record["community"].(string)
281 if !ok {
282 return fmt.Errorf("unsubscribe record missing community field")
283 }
284
285 // Use transactional method to ensure unsubscribe and count are atomically updated
286 // This is idempotent - safe for Jetstream replays
287 err := c.repo.UnsubscribeWithCount(ctx, userDID, communityDID)
288 if err != nil {
289 return fmt.Errorf("failed to remove subscription: %w", err)
290 }
291
292 log.Printf("Removed subscription: %s -> %s", userDID, communityDID)
293 return nil
294}
295
296// Helper types and functions
297
298type CommunityProfile struct {
299 // V2 ONLY: No DID field (repo DID is authoritative)
300 Handle string `json:"handle"` // Scoped handle (!gaming@coves.social)
301 AtprotoHandle string `json:"atprotoHandle"` // Real atProto handle (gaming.communities.coves.social)
302 Name string `json:"name"`
303 DisplayName string `json:"displayName"`
304 Description string `json:"description"`
305 DescriptionFacets []interface{} `json:"descriptionFacets"`
306 Avatar map[string]interface{} `json:"avatar"`
307 Banner map[string]interface{} `json:"banner"`
308 // Owner field removed - V2 communities ALWAYS self-own (owner == repo DID)
309 CreatedBy string `json:"createdBy"`
310 HostedBy string `json:"hostedBy"`
311 Visibility string `json:"visibility"`
312 Federation FederationConfig `json:"federation"`
313 ModerationType string `json:"moderationType"`
314 ContentWarnings []string `json:"contentWarnings"`
315 MemberCount int `json:"memberCount"`
316 SubscriberCount int `json:"subscriberCount"`
317 FederatedFrom string `json:"federatedFrom"`
318 FederatedID string `json:"federatedId"`
319 CreatedAt time.Time `json:"createdAt"`
320}
321
322type FederationConfig struct {
323 AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
324}
325
326// parseCommunityProfile converts a raw record map to a CommunityProfile
327func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
328 recordJSON, err := json.Marshal(record)
329 if err != nil {
330 return nil, fmt.Errorf("failed to marshal record: %w", err)
331 }
332
333 var profile CommunityProfile
334 if err := json.Unmarshal(recordJSON, &profile); err != nil {
335 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
336 }
337
338 return &profile, nil
339}
340
341// extractBlobCID extracts the CID from a blob reference
342// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
343func extractBlobCID(blob map[string]interface{}) (string, bool) {
344 if blob == nil {
345 return "", false
346 }
347
348 // Check if it's a blob type
349 blobType, ok := blob["$type"].(string)
350 if !ok || blobType != "blob" {
351 return "", false
352 }
353
354 // Extract ref
355 ref, ok := blob["ref"].(map[string]interface{})
356 if !ok {
357 return "", false
358 }
359
360 // Extract $link (the CID)
361 link, ok := ref["$link"].(string)
362 if !ok {
363 return "", false
364 }
365
366 return link, true
367}
368
369// validateHandle checks if a handle matches expected format (!name@instance)
370func validateHandle(handle string) bool {
371 if !strings.HasPrefix(handle, "!") {
372 return false
373 }
374
375 parts := strings.Split(handle, "@")
376 if len(parts) != 2 {
377 return false
378 }
379
380 return true
381}