A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/atproto/utils"
6 "Coves/internal/core/communities"
7 "context"
8 "encoding/json"
9 "fmt"
10 "log"
11 "net/http"
12 "strings"
13 "time"
14
15 lru "github.com/hashicorp/golang-lru/v2"
16 "golang.org/x/net/publicsuffix"
17 "golang.org/x/time/rate"
18)
19
// CommunityEventConsumer consumes community-related events from Jetstream.
//
// HandleEvent routes commit events for community profile, subscription, and
// block records to per-collection handlers, which index them through repo.
// Instances are built with NewCommunityEventConsumer, which also sets up the
// HTTP client, verification cache, and rate limiter used for did:web checks.
type CommunityEventConsumer struct {
	repo communities.Repository // Repository for community operations
	// identityResolver may be nil; in that case handles missing from a profile
	// record are constructed from the profile instead of being resolved.
	identityResolver interface {
		Resolve(context.Context, string) (*identity.Identity, error)
	} // For resolving handles from DIDs
	httpClient *http.Client // Shared HTTP client with connection pooling
	// didCache is keyed by the hosting instance's DID; each entry records
	// whether verification passed and when the entry expires.
	didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results
	wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches
	instanceDID string // DID of this Coves instance
	// skipVerification short-circuits both verifyHostedByClaim and
	// verifyDIDDocument so that they return nil immediately.
	skipVerification bool // Skip did:web verification (for dev mode)
}
32
// cachedDIDDoc represents a cached verification result with expiration.
//
// Both successful and failed verifications are stored; verifyDIDDocument
// ignores (and removes) an entry once the current time reaches expiresAt.
type cachedDIDDoc struct {
	expiresAt time.Time // When this cache entry expires
	valid bool // Whether verification passed
}
38
39// NewCommunityEventConsumer creates a new Jetstream consumer for community events
40// instanceDID: The DID of this Coves instance (for hostedBy verification)
41// skipVerification: Skip did:web verification (for dev mode)
42// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests)
43func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface {
44 Resolve(context.Context, string) (*identity.Identity, error)
45},
46) *CommunityEventConsumer {
47 // Create bounded LRU cache for DID document verification results
48 // Max 1000 entries to prevent unbounded memory growth (PR review feedback)
49 // Each entry ~100 bytes → max ~100KB memory overhead
50 cache, err := lru.New[string, cachedDIDDoc](1000)
51 if err != nil {
52 // This should never happen with a valid size, but handle gracefully
53 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err)
54 // Create minimal cache to avoid nil pointer
55 cache, _ = lru.New[string, cachedDIDDoc](1)
56 }
57
58 return &CommunityEventConsumer{
59 repo: repo,
60 identityResolver: identityResolver, // Optional - can be nil for tests
61 instanceDID: instanceDID,
62 skipVerification: skipVerification,
63 // Shared HTTP client with connection pooling for .well-known fetches
64 httpClient: &http.Client{
65 Timeout: 10 * time.Second,
66 Transport: &http.Transport{
67 MaxIdleConns: 100,
68 MaxIdleConnsPerHost: 10,
69 IdleConnTimeout: 90 * time.Second,
70 },
71 },
72 // Bounded LRU cache for .well-known verification results (max 1000 entries)
73 // Automatically evicts least-recently-used entries when full
74 didCache: cache,
75 // Rate limiter: 10 requests per second, burst of 20
76 // Prevents DoS via excessive .well-known fetches
77 wellKnownLimiter: rate.NewLimiter(10, 20),
78 }
79}
80
81// HandleEvent processes a Jetstream event for community records
82// This is called by the main Jetstream consumer when it receives commit events
83func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
84 // We only care about commit events for community records
85 if event.Kind != "commit" || event.Commit == nil {
86 return nil
87 }
88
89 commit := event.Commit
90
91 // Route to appropriate handler based on collection
92 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
93 // - social.coves.community.profile: Community profile records (in community's own repo)
94 // - social.coves.community.subscription: Subscription records (in user's repo)
95 // - social.coves.community.block: Block records (in user's repo)
96 //
97 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
98 // that CREATE or DELETE records in these collections
99 switch commit.Collection {
100 case "social.coves.community.profile":
101 return c.handleCommunityProfile(ctx, event.Did, commit)
102 case "social.coves.community.subscription":
103 // Handle both create (subscribe) and delete (unsubscribe) operations
104 return c.handleSubscription(ctx, event.Did, commit)
105 case "social.coves.community.block":
106 // Handle both create (block) and delete (unblock) operations
107 return c.handleBlock(ctx, event.Did, commit)
108 default:
109 // Not a community-related collection
110 return nil
111 }
112}
113
114// handleCommunityProfile processes community profile create/update/delete events
115func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
116 switch commit.Operation {
117 case "create":
118 return c.createCommunity(ctx, did, commit)
119 case "update":
120 return c.updateCommunity(ctx, did, commit)
121 case "delete":
122 return c.deleteCommunity(ctx, did)
123 default:
124 log.Printf("Unknown operation for community profile: %s", commit.Operation)
125 return nil
126 }
127}
128
129// createCommunity indexes a new community from the firehose
130func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
131 if commit.Record == nil {
132 return fmt.Errorf("community profile create event missing record data")
133 }
134
135 // Parse the community profile record
136 profile, err := parseCommunityProfile(commit.Record)
137 if err != nil {
138 return fmt.Errorf("failed to parse community profile: %w", err)
139 }
140
141 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
142 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
143 if profile.Handle == "" {
144 if c.identityResolver != nil {
145 // Production: Resolve handle from PLC (source of truth)
146 // NO FALLBACK - if PLC is down, we fail and backfill later
147 // This prevents creating communities with incorrect handles in federated scenarios
148 identity, err := c.identityResolver.Resolve(ctx, did)
149 if err != nil {
150 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
151 }
152 profile.Handle = identity.Handle
153 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
154 profile.Handle, did, identity.Method)
155 } else {
156 // Test mode only: construct deterministically when no resolver available
157 profile.Handle = constructHandleFromProfile(profile)
158 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
159 profile.Handle, profile.Name, profile.HostedBy)
160 }
161 }
162
163 // SECURITY: Verify hostedBy claim matches handle domain
164 // This prevents malicious instances from claiming to host communities for domains they don't own
165 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil {
166 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err)
167 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy)
168 return fmt.Errorf("hostedBy verification failed: %w", err)
169 }
170
171 // Build AT-URI for this record
172 // V2 Architecture (ONLY):
173 // - 'did' parameter IS the community DID (community owns its own repo)
174 // - rkey MUST be "self" for community profiles
175 // - URI: at://community_did/social.coves.community.profile/self
176
177 // REJECT non-V2 communities (pre-production: no V1 compatibility)
178 if commit.RKey != "self" {
179 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
180 }
181
182 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
183
184 // V2: Community ALWAYS owns itself
185 ownerDID := did
186
187 // Create community entity
188 community := &communities.Community{
189 DID: did, // V2: Repository DID IS the community DID
190 Handle: profile.Handle,
191 Name: profile.Name,
192 DisplayName: profile.DisplayName,
193 Description: profile.Description,
194 OwnerDID: ownerDID, // V2: same as DID (self-owned)
195 CreatedByDID: profile.CreatedBy,
196 HostedByDID: profile.HostedBy,
197 Visibility: profile.Visibility,
198 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
199 ModerationType: profile.ModerationType,
200 ContentWarnings: profile.ContentWarnings,
201 MemberCount: profile.MemberCount,
202 SubscriberCount: profile.SubscriberCount,
203 FederatedFrom: profile.FederatedFrom,
204 FederatedID: profile.FederatedID,
205 CreatedAt: profile.CreatedAt,
206 UpdatedAt: time.Now(),
207 RecordURI: uri,
208 RecordCID: commit.CID,
209 }
210
211 // Handle blobs (avatar/banner) if present
212 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
213 community.AvatarCID = avatarCID
214 }
215 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
216 community.BannerCID = bannerCID
217 }
218
219 // Handle description facets (rich text)
220 if profile.DescriptionFacets != nil {
221 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
222 if marshalErr == nil {
223 community.DescriptionFacets = facetsJSON
224 }
225 }
226
227 // Index in AppView database
228 _, err = c.repo.Create(ctx, community)
229 if err != nil {
230 // Check if it already exists (idempotency)
231 if communities.IsConflict(err) {
232 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
233 return nil
234 }
235 return fmt.Errorf("failed to index community: %w", err)
236 }
237
238 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
239 return nil
240}
241
242// updateCommunity updates an existing community from the firehose
243func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
244 if commit.Record == nil {
245 return fmt.Errorf("community profile update event missing record data")
246 }
247
248 // REJECT non-V2 communities (pre-production: no V1 compatibility)
249 if commit.RKey != "self" {
250 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
251 }
252
253 // Parse profile
254 profile, err := parseCommunityProfile(commit.Record)
255 if err != nil {
256 return fmt.Errorf("failed to parse community profile: %w", err)
257 }
258
259 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
260 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
261 if profile.Handle == "" {
262 if c.identityResolver != nil {
263 // Production: Resolve handle from PLC (source of truth)
264 // NO FALLBACK - if PLC is down, we fail and backfill later
265 // This prevents creating communities with incorrect handles in federated scenarios
266 identity, err := c.identityResolver.Resolve(ctx, did)
267 if err != nil {
268 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
269 }
270 profile.Handle = identity.Handle
271 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
272 profile.Handle, did, identity.Method)
273 } else {
274 // Test mode only: construct deterministically when no resolver available
275 profile.Handle = constructHandleFromProfile(profile)
276 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
277 profile.Handle, profile.Name, profile.HostedBy)
278 }
279 }
280
281 // V2: Repository DID IS the community DID
282 // Get existing community using the repo DID
283 existing, err := c.repo.GetByDID(ctx, did)
284 if err != nil {
285 if communities.IsNotFound(err) {
286 // Community doesn't exist yet - treat as create
287 log.Printf("Community not found for update, creating: %s", did)
288 return c.createCommunity(ctx, did, commit)
289 }
290 return fmt.Errorf("failed to get existing community: %w", err)
291 }
292
293 // Update fields
294 existing.Handle = profile.Handle
295 existing.Name = profile.Name
296 existing.DisplayName = profile.DisplayName
297 existing.Description = profile.Description
298 existing.Visibility = profile.Visibility
299 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
300 existing.ModerationType = profile.ModerationType
301 existing.ContentWarnings = profile.ContentWarnings
302 existing.RecordCID = commit.CID
303
304 // Update blobs
305 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
306 existing.AvatarCID = avatarCID
307 }
308 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
309 existing.BannerCID = bannerCID
310 }
311
312 // Update description facets
313 if profile.DescriptionFacets != nil {
314 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
315 if marshalErr == nil {
316 existing.DescriptionFacets = facetsJSON
317 }
318 }
319
320 // Save updates
321 _, err = c.repo.Update(ctx, existing)
322 if err != nil {
323 return fmt.Errorf("failed to update community: %w", err)
324 }
325
326 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
327 return nil
328}
329
330// deleteCommunity removes a community from the index
331func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
332 err := c.repo.Delete(ctx, did)
333 if err != nil {
334 if communities.IsNotFound(err) {
335 log.Printf("Community already deleted: %s", did)
336 return nil
337 }
338 return fmt.Errorf("failed to delete community: %w", err)
339 }
340
341 log.Printf("Deleted community: %s", did)
342 return nil
343}
344
345// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain
346// This prevents malicious instances from claiming to host communities for domains they don't own
347func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error {
348 // Skip verification in dev mode
349 if c.skipVerification {
350 return nil
351 }
352
353 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback)
354 ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
355 defer cancel()
356
357 // Verify hostedByDID is did:web format
358 if !strings.HasPrefix(hostedByDID, "did:web:") {
359 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID)
360 }
361
362 // Extract domain from did:web DID
363 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:")
364
365 // Extract domain from community handle
366 // Handle format examples:
367 // - "!gaming@coves.social" → domain: "coves.social"
368 // - "gaming.community.coves.social" → domain: "coves.social"
369 handleDomain := extractDomainFromHandle(handle)
370 if handleDomain == "" {
371 return fmt.Errorf("failed to extract domain from handle: %s", handle)
372 }
373
374 // Verify handle domain matches hostedBy domain
375 if handleDomain != hostedByDomain {
376 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain)
377 }
378
379 // SECURITY: Verify DID document exists and is valid (Bluesky-compatible security model)
380 // MANDATORY bidirectional verification: DID document must claim this handle in alsoKnownAs
381 // This matches Bluesky's security requirements and prevents domain impersonation
382 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain, handle); err != nil {
383 log.Printf("🚨 SECURITY: Rejecting community - bidirectional DID verification failed: %v", err)
384 return fmt.Errorf("bidirectional DID verification required: %w", err)
385 }
386
387 return nil
388}
389
390// verifyDIDDocument fetches and validates the DID document from .well-known/did.json
391// Implements Bluesky's bidirectional verification model:
392// 1. Verify DID document exists at https://domain/.well-known/did.json
393// 2. Verify DID document ID matches claimed DID
394// 3. Verify DID document claims the handle in alsoKnownAs field
395//
396// Results are cached with TTL and rate-limited to prevent DoS attacks
397func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain, handle string) error {
398 // Skip verification in dev mode
399 if c.skipVerification {
400 return nil
401 }
402
403 // Check bounded LRU cache first (thread-safe, no locks needed)
404 if cached, ok := c.didCache.Get(did); ok {
405 // Check if cache entry is still valid (not expired)
406 if time.Now().Before(cached.expiresAt) {
407 if !cached.valid {
408 return fmt.Errorf("cached verification failure for %s", did)
409 }
410 log.Printf("✓ DID document verification (cached): %s", domain)
411 return nil
412 }
413 // Cache entry expired - remove it to free up space for fresh entries
414 c.didCache.Remove(did)
415 }
416
417 // Rate limit .well-known fetches to prevent DoS
418 if err := c.wellKnownLimiter.Wait(ctx); err != nil {
419 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err)
420 }
421
422 // Construct .well-known URL
423 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain)
424
425 // Create HTTP request with timeout
426 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
427 if err != nil {
428 // Cache the failure
429 c.cacheVerificationResult(did, false, 5*time.Minute)
430 return fmt.Errorf("failed to create request: %w", err)
431 }
432
433 // Fetch DID document using shared HTTP client
434 resp, err := c.httpClient.Do(req)
435 if err != nil {
436 // Cache the failure (shorter TTL for network errors)
437 c.cacheVerificationResult(did, false, 5*time.Minute)
438 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err)
439 }
440 defer func() {
441 if closeErr := resp.Body.Close(); closeErr != nil {
442 log.Printf("Failed to close response body: %v", closeErr)
443 }
444 }()
445
446 // Verify HTTP status
447 if resp.StatusCode != http.StatusOK {
448 // Cache the failure
449 c.cacheVerificationResult(did, false, 5*time.Minute)
450 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL)
451 }
452
453 // Parse DID document
454 var didDoc struct {
455 ID string `json:"id"`
456 AlsoKnownAs []string `json:"alsoKnownAs"`
457 }
458 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
459 // Cache the failure
460 c.cacheVerificationResult(did, false, 5*time.Minute)
461 return fmt.Errorf("failed to parse DID document JSON: %w", err)
462 }
463
464 // Verify DID document ID matches claimed DID
465 if didDoc.ID != did {
466 // Cache the failure
467 c.cacheVerificationResult(did, false, 5*time.Minute)
468 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did)
469 }
470
471 // SECURITY: Bidirectional verification - DID document must claim this handle
472 // Prevents impersonation where someone points DNS to another user's DID
473 // Format: handle "coves.social" or "!community@coves.social" → check for "at://coves.social"
474 handleDomain := extractDomainFromHandle(handle)
475 expectedAlias := fmt.Sprintf("at://%s", handleDomain)
476
477 found := false
478 for _, alias := range didDoc.AlsoKnownAs {
479 if alias == expectedAlias {
480 found = true
481 break
482 }
483 }
484
485 if !found {
486 // Cache the failure
487 c.cacheVerificationResult(did, false, 5*time.Minute)
488 return fmt.Errorf("DID document does not claim handle domain %s in alsoKnownAs (expected %s, got %v)",
489 handleDomain, expectedAlias, didDoc.AlsoKnownAs)
490 }
491
492 // Cache the success (24 hour TTL - matches Bluesky recommendations)
493 c.cacheVerificationResult(did, true, 24*time.Hour)
494
495 log.Printf("✓ DID document verified: %s", domain)
496 return nil
497}
498
499// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL
500// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full
501func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) {
502 c.didCache.Add(did, cachedDIDDoc{
503 valid: valid,
504 expiresAt: time.Now().Add(ttl),
505 })
506}
507
508// extractDomainFromHandle extracts the registrable domain from a community handle
509// Handles both formats:
510// - Bluesky-style: "!gaming@coves.social" → "coves.social"
511// - DNS-style: "gaming.community.coves.social" → "coves.social"
512//
513// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs:
514// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk")
515// - "gaming.community.example.com.au" → "example.com.au" (not "com.au")
516func extractDomainFromHandle(handle string) string {
517 // Remove leading ! if present
518 handle = strings.TrimPrefix(handle, "!")
519
520 // Check for @-separated format (e.g., "gaming@coves.social")
521 if strings.Contains(handle, "@") {
522 parts := strings.Split(handle, "@")
523 if len(parts) == 2 {
524 domain := parts[1]
525 // Validate and extract eTLD+1 from the @-domain part
526 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain)
527 if err != nil {
528 // If publicsuffix fails, fall back to returning the full domain part
529 // This handles edge cases like localhost, IP addresses, etc.
530 return domain
531 }
532 return registrable
533 }
534 return ""
535 }
536
537 // For DNS-style handles (e.g., "gaming.community.coves.social")
538 // Extract the registrable domain (eTLD+1) using publicsuffix
539 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc.
540 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle)
541 if err != nil {
542 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address)
543 // fall back to naive extraction (last 2 parts)
544 // This maintains backward compatibility for edge cases
545 parts := strings.Split(handle, ".")
546 if len(parts) < 2 {
547 return "" // Invalid handle
548 }
549 return strings.Join(parts[len(parts)-2:], ".")
550 }
551
552 return registrable
553}
554
555// handleSubscription processes subscription create/delete events
556// CREATE operation = user subscribed to community
557// DELETE operation = user unsubscribed from community
558func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
559 switch commit.Operation {
560 case "create":
561 return c.createSubscription(ctx, userDID, commit)
562 case "delete":
563 return c.deleteSubscription(ctx, userDID, commit)
564 default:
565 // Update operations shouldn't happen on subscriptions, but ignore gracefully
566 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
567 commit.Operation, userDID, commit.RKey)
568 return nil
569 }
570}
571
572// createSubscription indexes a new subscription with retry logic
573func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
574 if commit.Record == nil {
575 return fmt.Errorf("subscription create event missing record data")
576 }
577
578 // Extract community DID from record's subject field (following atProto conventions)
579 communityDID, ok := commit.Record["subject"].(string)
580 if !ok {
581 return fmt.Errorf("subscription record missing subject field")
582 }
583
584 // Extract contentVisibility with clamping and default value
585 contentVisibility := extractContentVisibility(commit.Record)
586
587 // Build AT-URI for subscription record
588 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
589 // The record lives in the USER's repository, but uses the communities namespace
590 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
591
592 // Create subscription entity
593 // Parse createdAt from record to preserve chronological ordering during replays
594 subscription := &communities.Subscription{
595 UserDID: userDID,
596 CommunityDID: communityDID,
597 ContentVisibility: contentVisibility,
598 SubscribedAt: utils.ParseCreatedAt(commit.Record),
599 RecordURI: uri,
600 RecordCID: commit.CID,
601 }
602
603 // Use transactional method to ensure subscription and count are atomically updated
604 // This is idempotent - safe for Jetstream replays
605 _, err := c.repo.SubscribeWithCount(ctx, subscription)
606 if err != nil {
607 // If already exists, that's fine (idempotency)
608 if communities.IsConflict(err) {
609 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
610 userDID, communityDID, contentVisibility)
611 return nil
612 }
613 return fmt.Errorf("failed to index subscription: %w", err)
614 }
615
616 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
617 userDID, communityDID, contentVisibility)
618 return nil
619}
620
621// deleteSubscription removes a subscription from the index
622// DELETE operations don't include record data, so we need to look up the subscription
623// by its URI to find which community the user unsubscribed from
624func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
625 // Build AT-URI from the rkey
626 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
627
628 // Look up the subscription to get the community DID
629 // (DELETE operations don't include record data in Jetstream)
630 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
631 if err != nil {
632 if communities.IsNotFound(err) {
633 // Already deleted - this is fine (idempotency)
634 log.Printf("Subscription already deleted: %s", uri)
635 return nil
636 }
637 return fmt.Errorf("failed to find subscription for deletion: %w", err)
638 }
639
640 // Use transactional method to ensure unsubscribe and count are atomically updated
641 // This is idempotent - safe for Jetstream replays
642 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
643 if err != nil {
644 if communities.IsNotFound(err) {
645 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
646 return nil
647 }
648 return fmt.Errorf("failed to remove subscription: %w", err)
649 }
650
651 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
652 return nil
653}
654
655// handleBlock processes block create/delete events
656// CREATE operation = user blocked a community
657// DELETE operation = user unblocked a community
658func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
659 switch commit.Operation {
660 case "create":
661 return c.createBlock(ctx, userDID, commit)
662 case "delete":
663 return c.deleteBlock(ctx, userDID, commit)
664 default:
665 // Update operations shouldn't happen on blocks, but ignore gracefully
666 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
667 commit.Operation, userDID, commit.RKey)
668 return nil
669 }
670}
671
672// createBlock indexes a new block
673func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
674 if commit.Record == nil {
675 return fmt.Errorf("block create event missing record data")
676 }
677
678 // Extract community DID from record's subject field (following atProto conventions)
679 communityDID, ok := commit.Record["subject"].(string)
680 if !ok {
681 return fmt.Errorf("block record missing subject field")
682 }
683
684 // Build AT-URI for block record
685 // The record lives in the USER's repository
686 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
687
688 // Create block entity
689 // Parse createdAt from record to preserve chronological ordering during replays
690 block := &communities.CommunityBlock{
691 UserDID: userDID,
692 CommunityDID: communityDID,
693 BlockedAt: utils.ParseCreatedAt(commit.Record),
694 RecordURI: uri,
695 RecordCID: commit.CID,
696 }
697
698 // Index the block
699 // This is idempotent - safe for Jetstream replays
700 _, err := c.repo.BlockCommunity(ctx, block)
701 if err != nil {
702 // If already exists, that's fine (idempotency)
703 if communities.IsConflict(err) {
704 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
705 return nil
706 }
707 return fmt.Errorf("failed to index block: %w", err)
708 }
709
710 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
711 return nil
712}
713
714// deleteBlock removes a block from the index
715// DELETE operations don't include record data, so we need to look up the block
716// by its URI to find which community the user unblocked
717func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
718 // Build AT-URI from the rkey
719 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
720
721 // Look up the block to get the community DID
722 // (DELETE operations don't include record data in Jetstream)
723 block, err := c.repo.GetBlockByURI(ctx, uri)
724 if err != nil {
725 if communities.IsNotFound(err) {
726 // Already deleted - this is fine (idempotency)
727 log.Printf("Block already deleted: %s", uri)
728 return nil
729 }
730 return fmt.Errorf("failed to find block for deletion: %w", err)
731 }
732
733 // Remove the block from the index
734 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
735 if err != nil {
736 if communities.IsNotFound(err) {
737 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
738 return nil
739 }
740 return fmt.Errorf("failed to remove block: %w", err)
741 }
742
743 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
744 return nil
745}
746
747// Helper types and functions
748
// CommunityProfile is the decoded form of a social.coves.community.profile
// record. parseCommunityProfile fills it by JSON round-tripping the raw
// record map, so the struct tags name the record's JSON keys.
type CommunityProfile struct {
	CreatedAt time.Time `json:"createdAt"`
	// Avatar and Banner hold raw blob references; extractBlobCID reads the
	// CID out of them.
	Avatar map[string]interface{} `json:"avatar"`
	Banner map[string]interface{} `json:"banner"`
	CreatedBy string `json:"createdBy"`
	Visibility string `json:"visibility"`
	AtprotoHandle string `json:"atprotoHandle"`
	DisplayName string `json:"displayName"`
	Name string `json:"name"`
	// Handle may be empty in the record; the create/update handlers then
	// resolve or construct it before indexing.
	Handle string `json:"handle"`
	// HostedBy is the DID of the hosting instance; it is checked by
	// verifyHostedByClaim and must use the did:web method to pass.
	HostedBy string `json:"hostedBy"`
	Description string `json:"description"`
	FederatedID string `json:"federatedId"`
	ModerationType string `json:"moderationType"`
	FederatedFrom string `json:"federatedFrom"`
	ContentWarnings []string `json:"contentWarnings"`
	// DescriptionFacets is re-encoded to JSON by the handlers before storage.
	DescriptionFacets []interface{} `json:"descriptionFacets"`
	MemberCount int `json:"memberCount"`
	SubscriberCount int `json:"subscriberCount"`
	Federation FederationConfig `json:"federation"`
}
770
// FederationConfig is the "federation" object of a community profile record.
type FederationConfig struct {
	// AllowExternalDiscovery is copied onto the indexed community's
	// AllowExternalDiscovery field by the create/update handlers.
	AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
}
774
775// parseCommunityProfile converts a raw record map to a CommunityProfile
776func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
777 recordJSON, err := json.Marshal(record)
778 if err != nil {
779 return nil, fmt.Errorf("failed to marshal record: %w", err)
780 }
781
782 var profile CommunityProfile
783 if err := json.Unmarshal(recordJSON, &profile); err != nil {
784 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
785 }
786
787 return &profile, nil
788}
789
790// constructHandleFromProfile constructs a deterministic handle from profile data
791// Format: {name}.community.{instanceDomain}
792// Example: gaming.community.coves.social
793// This is ONLY used in test mode (when identity resolver is nil)
794// Production MUST resolve handles from PLC (source of truth)
795// Returns empty string if hostedBy is not did:web format (caller will fail validation)
796func constructHandleFromProfile(profile *CommunityProfile) string {
797 if !strings.HasPrefix(profile.HostedBy, "did:web:") {
798 // hostedBy must be did:web format for handle construction
799 // Return empty to trigger validation error in repository
800 return ""
801 }
802 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:")
803 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain)
804}
805
// extractContentVisibility reads "contentVisibility" from a subscription
// record and returns it clamped to the valid range (1-5).
//
// Returns the default value of 3 when the field is missing, is NaN, or has a
// type other than float64 (the type encoding/json produces for numbers) or int.
// Fractional values are truncated toward zero before clamping, so 2.9 yields 2.
//
// The range check is done on the float64 value BEFORE converting to int:
// converting an out-of-range float (for example 1e300) to int has an
// implementation-dependent result in Go, which could otherwise turn a huge
// positive value into the minimum instead of the maximum.
func extractContentVisibility(record map[string]interface{}) int {
	const (
		defaultVisibility = 3
		minVisibility     = 1
		maxVisibility     = 5
	)

	cv, ok := record["contentVisibility"]
	if !ok {
		// Field missing - use default
		return defaultVisibility
	}

	switch v := cv.(type) {
	case float64:
		switch {
		case v != v:
			// NaN is the only value not equal to itself; it cannot be ordered.
			log.Printf("WARNING: contentVisibility is NaN, using default")
			return defaultVisibility
		case v < minVisibility:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", v, minVisibility)
			return minVisibility
		case v >= maxVisibility+1:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", v, maxVisibility)
			return maxVisibility
		default:
			// v is in [1, 6): the conversion is exact-range safe and
			// truncates to a value in 1..5.
			return int(v)
		}
	case int:
		// Shouldn't happen for JSON-decoded records, but handle gracefully.
		return clampContentVisibility(v)
	default:
		log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv)
		return defaultVisibility
	}
}

// clampContentVisibility ensures value is within the valid range (1-5):
// values below 1 become 1, values above 5 become 5.
func clampContentVisibility(value int) int {
	if value < 1 {
		return 1
	}
	if value > 5 {
		return 5
	}
	return value
}
846
// extractBlobCID pulls the CID out of a blob reference of the form
//
//	{"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
//
// It returns the CID and true on success. It returns "" and false when blob
// is nil, "$type" is not the string "blob", "ref" is not an object, or
// "$link" is not a string.
func extractBlobCID(blob map[string]interface{}) (string, bool) {
	// Indexing a nil map yields nil, so a nil blob fails this check too.
	if kind, _ := blob["$type"].(string); kind != "blob" {
		return "", false
	}

	ref, isObject := blob["ref"].(map[string]interface{})
	if !isObject {
		return "", false
	}

	// A failed assertion yields ("", false), which is exactly the failure result.
	cid, isString := ref["$link"].(string)
	return cid, isString
}