A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/atproto/utils"
6 "Coves/internal/core/communities"
7 "context"
8 "encoding/json"
9 "fmt"
10 "log"
11 "net/http"
12 "strings"
13 "time"
14
15 lru "github.com/hashicorp/golang-lru/v2"
16 "golang.org/x/net/publicsuffix"
17 "golang.org/x/time/rate"
18)
19
20// CommunityEventConsumer consumes community-related events from Jetstream
21type CommunityEventConsumer struct {
22 repo communities.Repository // Repository for community operations
23 identityResolver interface {
24 Resolve(context.Context, string) (*identity.Identity, error)
25 } // For resolving handles from DIDs
26 httpClient *http.Client // Shared HTTP client with connection pooling
27 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results
28 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches
29 instanceDID string // DID of this Coves instance
30 skipVerification bool // Skip did:web verification (for dev mode)
31}
32
33// cachedDIDDoc represents a cached verification result with expiration
34type cachedDIDDoc struct {
35 expiresAt time.Time // When this cache entry expires
36 valid bool // Whether verification passed
37}
38
39// NewCommunityEventConsumer creates a new Jetstream consumer for community events
40// instanceDID: The DID of this Coves instance (for hostedBy verification)
41// skipVerification: Skip did:web verification (for dev mode)
42// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests)
43func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface {
44 Resolve(context.Context, string) (*identity.Identity, error)
45},
46) *CommunityEventConsumer {
47 // Create bounded LRU cache for DID document verification results
48 // Max 1000 entries to prevent unbounded memory growth (PR review feedback)
49 // Each entry ~100 bytes → max ~100KB memory overhead
50 cache, err := lru.New[string, cachedDIDDoc](1000)
51 if err != nil {
52 // This should never happen with a valid size, but handle gracefully
53 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err)
54 // Create minimal cache to avoid nil pointer
55 cache, _ = lru.New[string, cachedDIDDoc](1)
56 }
57
58 return &CommunityEventConsumer{
59 repo: repo,
60 identityResolver: identityResolver, // Optional - can be nil for tests
61 instanceDID: instanceDID,
62 skipVerification: skipVerification,
63 // Shared HTTP client with connection pooling for .well-known fetches
64 httpClient: &http.Client{
65 Timeout: 10 * time.Second,
66 Transport: &http.Transport{
67 MaxIdleConns: 100,
68 MaxIdleConnsPerHost: 10,
69 IdleConnTimeout: 90 * time.Second,
70 },
71 },
72 // Bounded LRU cache for .well-known verification results (max 1000 entries)
73 // Automatically evicts least-recently-used entries when full
74 didCache: cache,
75 // Rate limiter: 10 requests per second, burst of 20
76 // Prevents DoS via excessive .well-known fetches
77 wellKnownLimiter: rate.NewLimiter(10, 20),
78 }
79}
80
81// HandleEvent processes a Jetstream event for community records
82// This is called by the main Jetstream consumer when it receives commit events
83func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
84 // We only care about commit events for community records
85 if event.Kind != "commit" || event.Commit == nil {
86 return nil
87 }
88
89 commit := event.Commit
90
91 // Route to appropriate handler based on collection
92 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
93 // - social.coves.community.profile: Community profile records (in community's own repo)
94 // - social.coves.community.subscription: Subscription records (in user's repo)
95 // - social.coves.community.block: Block records (in user's repo)
96 //
97 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
98 // that CREATE or DELETE records in these collections
99 switch commit.Collection {
100 case "social.coves.community.profile":
101 return c.handleCommunityProfile(ctx, event.Did, commit)
102 case "social.coves.community.subscription":
103 // Handle both create (subscribe) and delete (unsubscribe) operations
104 return c.handleSubscription(ctx, event.Did, commit)
105 case "social.coves.community.block":
106 // Handle both create (block) and delete (unblock) operations
107 return c.handleBlock(ctx, event.Did, commit)
108 default:
109 // Not a community-related collection
110 return nil
111 }
112}
113
114// handleCommunityProfile processes community profile create/update/delete events
115func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
116 switch commit.Operation {
117 case "create":
118 return c.createCommunity(ctx, did, commit)
119 case "update":
120 return c.updateCommunity(ctx, did, commit)
121 case "delete":
122 return c.deleteCommunity(ctx, did)
123 default:
124 log.Printf("Unknown operation for community profile: %s", commit.Operation)
125 return nil
126 }
127}
128
129// createCommunity indexes a new community from the firehose
130func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
131 if commit.Record == nil {
132 return fmt.Errorf("community profile create event missing record data")
133 }
134
135 // Parse the community profile record
136 profile, err := parseCommunityProfile(commit.Record)
137 if err != nil {
138 return fmt.Errorf("failed to parse community profile: %w", err)
139 }
140
141 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
142 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
143 if profile.Handle == "" {
144 if c.identityResolver != nil {
145 // Production: Resolve handle from PLC (source of truth)
146 // NO FALLBACK - if PLC is down, we fail and backfill later
147 // This prevents creating communities with incorrect handles in federated scenarios
148 identity, err := c.identityResolver.Resolve(ctx, did)
149 if err != nil {
150 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
151 }
152 profile.Handle = identity.Handle
153 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
154 profile.Handle, did, identity.Method)
155 } else {
156 // Test mode only: construct deterministically when no resolver available
157 profile.Handle = constructHandleFromProfile(profile)
158 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
159 profile.Handle, profile.Name, profile.HostedBy)
160 }
161 }
162
163 // SECURITY: Verify hostedBy claim matches handle domain
164 // This prevents malicious instances from claiming to host communities for domains they don't own
165 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil {
166 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err)
167 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy)
168 return fmt.Errorf("hostedBy verification failed: %w", err)
169 }
170
171 // Build AT-URI for this record
172 // V2 Architecture (ONLY):
173 // - 'did' parameter IS the community DID (community owns its own repo)
174 // - rkey MUST be "self" for community profiles
175 // - URI: at://community_did/social.coves.community.profile/self
176
177 // REJECT non-V2 communities (pre-production: no V1 compatibility)
178 if commit.RKey != "self" {
179 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
180 }
181
182 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
183
184 // V2: Community ALWAYS owns itself
185 ownerDID := did
186
187 // Create community entity
188 community := &communities.Community{
189 DID: did, // V2: Repository DID IS the community DID
190 Handle: profile.Handle,
191 Name: profile.Name,
192 DisplayName: profile.DisplayName,
193 Description: profile.Description,
194 OwnerDID: ownerDID, // V2: same as DID (self-owned)
195 CreatedByDID: profile.CreatedBy,
196 HostedByDID: profile.HostedBy,
197 Visibility: profile.Visibility,
198 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
199 ModerationType: profile.ModerationType,
200 ContentWarnings: profile.ContentWarnings,
201 MemberCount: profile.MemberCount,
202 SubscriberCount: profile.SubscriberCount,
203 FederatedFrom: profile.FederatedFrom,
204 FederatedID: profile.FederatedID,
205 CreatedAt: profile.CreatedAt,
206 UpdatedAt: time.Now(),
207 RecordURI: uri,
208 RecordCID: commit.CID,
209 }
210
211 // Handle blobs (avatar/banner) if present
212 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
213 community.AvatarCID = avatarCID
214 }
215 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
216 community.BannerCID = bannerCID
217 }
218
219 // Handle description facets (rich text)
220 if profile.DescriptionFacets != nil {
221 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
222 if marshalErr == nil {
223 community.DescriptionFacets = facetsJSON
224 }
225 }
226
227 // Index in AppView database
228 _, err = c.repo.Create(ctx, community)
229 if err != nil {
230 // Check if it already exists (idempotency)
231 if communities.IsConflict(err) {
232 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
233 return nil
234 }
235 return fmt.Errorf("failed to index community: %w", err)
236 }
237
238 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
239 return nil
240}
241
242// updateCommunity updates an existing community from the firehose
243func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
244 if commit.Record == nil {
245 return fmt.Errorf("community profile update event missing record data")
246 }
247
248 // REJECT non-V2 communities (pre-production: no V1 compatibility)
249 if commit.RKey != "self" {
250 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
251 }
252
253 // Parse profile
254 profile, err := parseCommunityProfile(commit.Record)
255 if err != nil {
256 return fmt.Errorf("failed to parse community profile: %w", err)
257 }
258
259 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
260 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
261 if profile.Handle == "" {
262 if c.identityResolver != nil {
263 // Production: Resolve handle from PLC (source of truth)
264 // NO FALLBACK - if PLC is down, we fail and backfill later
265 // This prevents creating communities with incorrect handles in federated scenarios
266 identity, err := c.identityResolver.Resolve(ctx, did)
267 if err != nil {
268 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
269 }
270 profile.Handle = identity.Handle
271 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
272 profile.Handle, did, identity.Method)
273 } else {
274 // Test mode only: construct deterministically when no resolver available
275 profile.Handle = constructHandleFromProfile(profile)
276 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
277 profile.Handle, profile.Name, profile.HostedBy)
278 }
279 }
280
281 // V2: Repository DID IS the community DID
282 // Get existing community using the repo DID
283 existing, err := c.repo.GetByDID(ctx, did)
284 if err != nil {
285 if communities.IsNotFound(err) {
286 // Community doesn't exist yet - treat as create
287 log.Printf("Community not found for update, creating: %s", did)
288 return c.createCommunity(ctx, did, commit)
289 }
290 return fmt.Errorf("failed to get existing community: %w", err)
291 }
292
293 // Update fields
294 existing.Handle = profile.Handle
295 existing.Name = profile.Name
296 existing.DisplayName = profile.DisplayName
297 existing.Description = profile.Description
298 existing.Visibility = profile.Visibility
299 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
300 existing.ModerationType = profile.ModerationType
301 existing.ContentWarnings = profile.ContentWarnings
302 existing.RecordCID = commit.CID
303
304 // Update blobs
305 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
306 existing.AvatarCID = avatarCID
307 }
308 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
309 existing.BannerCID = bannerCID
310 }
311
312 // Update description facets
313 if profile.DescriptionFacets != nil {
314 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
315 if marshalErr == nil {
316 existing.DescriptionFacets = facetsJSON
317 }
318 }
319
320 // Save updates
321 _, err = c.repo.Update(ctx, existing)
322 if err != nil {
323 return fmt.Errorf("failed to update community: %w", err)
324 }
325
326 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
327 return nil
328}
329
330// deleteCommunity removes a community from the index
331func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
332 err := c.repo.Delete(ctx, did)
333 if err != nil {
334 if communities.IsNotFound(err) {
335 log.Printf("Community already deleted: %s", did)
336 return nil
337 }
338 return fmt.Errorf("failed to delete community: %w", err)
339 }
340
341 log.Printf("Deleted community: %s", did)
342 return nil
343}
344
345// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain
346// This prevents malicious instances from claiming to host communities for domains they don't own
347func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error {
348 // Skip verification in dev mode
349 if c.skipVerification {
350 return nil
351 }
352
353 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback)
354 ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
355 defer cancel()
356
357 // Verify hostedByDID is did:web format
358 if !strings.HasPrefix(hostedByDID, "did:web:") {
359 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID)
360 }
361
362 // Extract domain from did:web DID
363 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:")
364
365 // Extract domain from community handle
366 // Handle format examples:
367 // - "!gaming@coves.social" → domain: "coves.social"
368 // - "gaming.community.coves.social" → domain: "coves.social"
369 handleDomain := extractDomainFromHandle(handle)
370 if handleDomain == "" {
371 return fmt.Errorf("failed to extract domain from handle: %s", handle)
372 }
373
374 // Verify handle domain matches hostedBy domain
375 if handleDomain != hostedByDomain {
376 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain)
377 }
378
379 // SECURITY: Verify DID document exists and is valid (Bluesky-compatible security model)
380 // MANDATORY bidirectional verification: DID document must claim this handle in alsoKnownAs
381 // This matches Bluesky's security requirements and prevents domain impersonation
382 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain, handle); err != nil {
383 log.Printf("🚨 SECURITY: Rejecting community - bidirectional DID verification failed: %v", err)
384 return fmt.Errorf("bidirectional DID verification required: %w", err)
385 }
386
387 return nil
388}
389
390// verifyDIDDocument fetches and validates the DID document from .well-known/did.json
391// Implements Bluesky's bidirectional verification model:
392// 1. Verify DID document exists at https://domain/.well-known/did.json
393// 2. Verify DID document ID matches claimed DID
394// 3. Verify DID document claims the handle in alsoKnownAs field
395// Results are cached with TTL and rate-limited to prevent DoS attacks
396func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain, handle string) error {
397 // Skip verification in dev mode
398 if c.skipVerification {
399 return nil
400 }
401
402 // Check bounded LRU cache first (thread-safe, no locks needed)
403 if cached, ok := c.didCache.Get(did); ok {
404 // Check if cache entry is still valid (not expired)
405 if time.Now().Before(cached.expiresAt) {
406 if !cached.valid {
407 return fmt.Errorf("cached verification failure for %s", did)
408 }
409 log.Printf("✓ DID document verification (cached): %s", domain)
410 return nil
411 }
412 // Cache entry expired - remove it to free up space for fresh entries
413 c.didCache.Remove(did)
414 }
415
416 // Rate limit .well-known fetches to prevent DoS
417 if err := c.wellKnownLimiter.Wait(ctx); err != nil {
418 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err)
419 }
420
421 // Construct .well-known URL
422 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain)
423
424 // Create HTTP request with timeout
425 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
426 if err != nil {
427 // Cache the failure
428 c.cacheVerificationResult(did, false, 5*time.Minute)
429 return fmt.Errorf("failed to create request: %w", err)
430 }
431
432 // Fetch DID document using shared HTTP client
433 resp, err := c.httpClient.Do(req)
434 if err != nil {
435 // Cache the failure (shorter TTL for network errors)
436 c.cacheVerificationResult(did, false, 5*time.Minute)
437 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err)
438 }
439 defer func() {
440 if closeErr := resp.Body.Close(); closeErr != nil {
441 log.Printf("Failed to close response body: %v", closeErr)
442 }
443 }()
444
445 // Verify HTTP status
446 if resp.StatusCode != http.StatusOK {
447 // Cache the failure
448 c.cacheVerificationResult(did, false, 5*time.Minute)
449 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL)
450 }
451
452 // Parse DID document
453 var didDoc struct {
454 ID string `json:"id"`
455 AlsoKnownAs []string `json:"alsoKnownAs"`
456 }
457 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
458 // Cache the failure
459 c.cacheVerificationResult(did, false, 5*time.Minute)
460 return fmt.Errorf("failed to parse DID document JSON: %w", err)
461 }
462
463 // Verify DID document ID matches claimed DID
464 if didDoc.ID != did {
465 // Cache the failure
466 c.cacheVerificationResult(did, false, 5*time.Minute)
467 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did)
468 }
469
470 // SECURITY: Bidirectional verification - DID document must claim this handle
471 // Prevents impersonation where someone points DNS to another user's DID
472 // Format: handle "coves.social" or "!community@coves.social" → check for "at://coves.social"
473 handleDomain := extractDomainFromHandle(handle)
474 expectedAlias := fmt.Sprintf("at://%s", handleDomain)
475
476 found := false
477 for _, alias := range didDoc.AlsoKnownAs {
478 if alias == expectedAlias {
479 found = true
480 break
481 }
482 }
483
484 if !found {
485 // Cache the failure
486 c.cacheVerificationResult(did, false, 5*time.Minute)
487 return fmt.Errorf("DID document does not claim handle domain %s in alsoKnownAs (expected %s, got %v)",
488 handleDomain, expectedAlias, didDoc.AlsoKnownAs)
489 }
490
491 // Cache the success (24 hour TTL - matches Bluesky recommendations)
492 c.cacheVerificationResult(did, true, 24*time.Hour)
493
494 log.Printf("✓ DID document verified: %s", domain)
495 return nil
496}
497
498// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL
499// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full
500func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) {
501 c.didCache.Add(did, cachedDIDDoc{
502 valid: valid,
503 expiresAt: time.Now().Add(ttl),
504 })
505}
506
507// extractDomainFromHandle extracts the registrable domain from a community handle
508// Handles both formats:
509// - Bluesky-style: "!gaming@coves.social" → "coves.social"
510// - DNS-style: "gaming.community.coves.social" → "coves.social"
511//
512// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs:
513// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk")
514// - "gaming.community.example.com.au" → "example.com.au" (not "com.au")
515func extractDomainFromHandle(handle string) string {
516 // Remove leading ! if present
517 handle = strings.TrimPrefix(handle, "!")
518
519 // Check for @-separated format (e.g., "gaming@coves.social")
520 if strings.Contains(handle, "@") {
521 parts := strings.Split(handle, "@")
522 if len(parts) == 2 {
523 domain := parts[1]
524 // Validate and extract eTLD+1 from the @-domain part
525 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain)
526 if err != nil {
527 // If publicsuffix fails, fall back to returning the full domain part
528 // This handles edge cases like localhost, IP addresses, etc.
529 return domain
530 }
531 return registrable
532 }
533 return ""
534 }
535
536 // For DNS-style handles (e.g., "gaming.community.coves.social")
537 // Extract the registrable domain (eTLD+1) using publicsuffix
538 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc.
539 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle)
540 if err != nil {
541 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address)
542 // fall back to naive extraction (last 2 parts)
543 // This maintains backward compatibility for edge cases
544 parts := strings.Split(handle, ".")
545 if len(parts) < 2 {
546 return "" // Invalid handle
547 }
548 return strings.Join(parts[len(parts)-2:], ".")
549 }
550
551 return registrable
552}
553
554// handleSubscription processes subscription create/delete events
555// CREATE operation = user subscribed to community
556// DELETE operation = user unsubscribed from community
557func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
558 switch commit.Operation {
559 case "create":
560 return c.createSubscription(ctx, userDID, commit)
561 case "delete":
562 return c.deleteSubscription(ctx, userDID, commit)
563 default:
564 // Update operations shouldn't happen on subscriptions, but ignore gracefully
565 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
566 commit.Operation, userDID, commit.RKey)
567 return nil
568 }
569}
570
571// createSubscription indexes a new subscription with retry logic
572func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
573 if commit.Record == nil {
574 return fmt.Errorf("subscription create event missing record data")
575 }
576
577 // Extract community DID from record's subject field (following atProto conventions)
578 communityDID, ok := commit.Record["subject"].(string)
579 if !ok {
580 return fmt.Errorf("subscription record missing subject field")
581 }
582
583 // Extract contentVisibility with clamping and default value
584 contentVisibility := extractContentVisibility(commit.Record)
585
586 // Build AT-URI for subscription record
587 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
588 // The record lives in the USER's repository, but uses the communities namespace
589 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
590
591 // Create subscription entity
592 // Parse createdAt from record to preserve chronological ordering during replays
593 subscription := &communities.Subscription{
594 UserDID: userDID,
595 CommunityDID: communityDID,
596 ContentVisibility: contentVisibility,
597 SubscribedAt: utils.ParseCreatedAt(commit.Record),
598 RecordURI: uri,
599 RecordCID: commit.CID,
600 }
601
602 // Use transactional method to ensure subscription and count are atomically updated
603 // This is idempotent - safe for Jetstream replays
604 _, err := c.repo.SubscribeWithCount(ctx, subscription)
605 if err != nil {
606 // If already exists, that's fine (idempotency)
607 if communities.IsConflict(err) {
608 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
609 userDID, communityDID, contentVisibility)
610 return nil
611 }
612 return fmt.Errorf("failed to index subscription: %w", err)
613 }
614
615 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
616 userDID, communityDID, contentVisibility)
617 return nil
618}
619
620// deleteSubscription removes a subscription from the index
621// DELETE operations don't include record data, so we need to look up the subscription
622// by its URI to find which community the user unsubscribed from
623func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
624 // Build AT-URI from the rkey
625 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
626
627 // Look up the subscription to get the community DID
628 // (DELETE operations don't include record data in Jetstream)
629 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
630 if err != nil {
631 if communities.IsNotFound(err) {
632 // Already deleted - this is fine (idempotency)
633 log.Printf("Subscription already deleted: %s", uri)
634 return nil
635 }
636 return fmt.Errorf("failed to find subscription for deletion: %w", err)
637 }
638
639 // Use transactional method to ensure unsubscribe and count are atomically updated
640 // This is idempotent - safe for Jetstream replays
641 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
642 if err != nil {
643 if communities.IsNotFound(err) {
644 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
645 return nil
646 }
647 return fmt.Errorf("failed to remove subscription: %w", err)
648 }
649
650 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
651 return nil
652}
653
654// handleBlock processes block create/delete events
655// CREATE operation = user blocked a community
656// DELETE operation = user unblocked a community
657func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
658 switch commit.Operation {
659 case "create":
660 return c.createBlock(ctx, userDID, commit)
661 case "delete":
662 return c.deleteBlock(ctx, userDID, commit)
663 default:
664 // Update operations shouldn't happen on blocks, but ignore gracefully
665 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
666 commit.Operation, userDID, commit.RKey)
667 return nil
668 }
669}
670
671// createBlock indexes a new block
672func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
673 if commit.Record == nil {
674 return fmt.Errorf("block create event missing record data")
675 }
676
677 // Extract community DID from record's subject field (following atProto conventions)
678 communityDID, ok := commit.Record["subject"].(string)
679 if !ok {
680 return fmt.Errorf("block record missing subject field")
681 }
682
683 // Build AT-URI for block record
684 // The record lives in the USER's repository
685 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
686
687 // Create block entity
688 // Parse createdAt from record to preserve chronological ordering during replays
689 block := &communities.CommunityBlock{
690 UserDID: userDID,
691 CommunityDID: communityDID,
692 BlockedAt: utils.ParseCreatedAt(commit.Record),
693 RecordURI: uri,
694 RecordCID: commit.CID,
695 }
696
697 // Index the block
698 // This is idempotent - safe for Jetstream replays
699 _, err := c.repo.BlockCommunity(ctx, block)
700 if err != nil {
701 // If already exists, that's fine (idempotency)
702 if communities.IsConflict(err) {
703 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
704 return nil
705 }
706 return fmt.Errorf("failed to index block: %w", err)
707 }
708
709 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
710 return nil
711}
712
713// deleteBlock removes a block from the index
714// DELETE operations don't include record data, so we need to look up the block
715// by its URI to find which community the user unblocked
716func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
717 // Build AT-URI from the rkey
718 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
719
720 // Look up the block to get the community DID
721 // (DELETE operations don't include record data in Jetstream)
722 block, err := c.repo.GetBlockByURI(ctx, uri)
723 if err != nil {
724 if communities.IsNotFound(err) {
725 // Already deleted - this is fine (idempotency)
726 log.Printf("Block already deleted: %s", uri)
727 return nil
728 }
729 return fmt.Errorf("failed to find block for deletion: %w", err)
730 }
731
732 // Remove the block from the index
733 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
734 if err != nil {
735 if communities.IsNotFound(err) {
736 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
737 return nil
738 }
739 return fmt.Errorf("failed to remove block: %w", err)
740 }
741
742 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
743 return nil
744}
745
746// Helper types and functions
747
748type CommunityProfile struct {
749 CreatedAt time.Time `json:"createdAt"`
750 Avatar map[string]interface{} `json:"avatar"`
751 Banner map[string]interface{} `json:"banner"`
752 CreatedBy string `json:"createdBy"`
753 Visibility string `json:"visibility"`
754 AtprotoHandle string `json:"atprotoHandle"`
755 DisplayName string `json:"displayName"`
756 Name string `json:"name"`
757 Handle string `json:"handle"`
758 HostedBy string `json:"hostedBy"`
759 Description string `json:"description"`
760 FederatedID string `json:"federatedId"`
761 ModerationType string `json:"moderationType"`
762 FederatedFrom string `json:"federatedFrom"`
763 ContentWarnings []string `json:"contentWarnings"`
764 DescriptionFacets []interface{} `json:"descriptionFacets"`
765 MemberCount int `json:"memberCount"`
766 SubscriberCount int `json:"subscriberCount"`
767 Federation FederationConfig `json:"federation"`
768}
769
770type FederationConfig struct {
771 AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
772}
773
774// parseCommunityProfile converts a raw record map to a CommunityProfile
775func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
776 recordJSON, err := json.Marshal(record)
777 if err != nil {
778 return nil, fmt.Errorf("failed to marshal record: %w", err)
779 }
780
781 var profile CommunityProfile
782 if err := json.Unmarshal(recordJSON, &profile); err != nil {
783 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
784 }
785
786 return &profile, nil
787}
788
789// constructHandleFromProfile constructs a deterministic handle from profile data
790// Format: {name}.community.{instanceDomain}
791// Example: gaming.community.coves.social
792// This is ONLY used in test mode (when identity resolver is nil)
793// Production MUST resolve handles from PLC (source of truth)
794// Returns empty string if hostedBy is not did:web format (caller will fail validation)
795func constructHandleFromProfile(profile *CommunityProfile) string {
796 if !strings.HasPrefix(profile.HostedBy, "did:web:") {
797 // hostedBy must be did:web format for handle construction
798 // Return empty to trigger validation error in repository
799 return ""
800 }
801 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:")
802 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain)
803}
804
805// extractContentVisibility extracts contentVisibility from subscription record with clamping
806// Returns default value of 3 if missing or invalid
807func extractContentVisibility(record map[string]interface{}) int {
808 const defaultVisibility = 3
809
810 cv, ok := record["contentVisibility"]
811 if !ok {
812 // Field missing - use default
813 return defaultVisibility
814 }
815
816 // JSON numbers decode as float64
817 cvFloat, ok := cv.(float64)
818 if !ok {
819 // Try int (shouldn't happen but handle gracefully)
820 if cvInt, isInt := cv.(int); isInt {
821 return clampContentVisibility(cvInt)
822 }
823 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv)
824 return defaultVisibility
825 }
826
827 // Convert and clamp
828 clamped := clampContentVisibility(int(cvFloat))
829 if clamped != int(cvFloat) {
830 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped)
831 }
832 return clamped
833}
834
835// clampContentVisibility ensures value is within valid range (1-5)
836func clampContentVisibility(value int) int {
837 if value < 1 {
838 return 1
839 }
840 if value > 5 {
841 return 5
842 }
843 return value
844}
845
846// extractBlobCID extracts the CID from a blob reference
847// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
848func extractBlobCID(blob map[string]interface{}) (string, bool) {
849 if blob == nil {
850 return "", false
851 }
852
853 // Check if it's a blob type
854 blobType, ok := blob["$type"].(string)
855 if !ok || blobType != "blob" {
856 return "", false
857 }
858
859 // Extract ref
860 ref, ok := blob["ref"].(map[string]interface{})
861 if !ok {
862 return "", false
863 }
864
865 // Extract $link (the CID)
866 link, ok := ref["$link"].(string)
867 if !ok {
868 return "", false
869 }
870
871 return link, true
872}