A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "net/http"
9 "strings"
10 "time"
11
12 "Coves/internal/atproto/identity"
13 "Coves/internal/atproto/utils"
14 "Coves/internal/core/communities"
15
16 lru "github.com/hashicorp/golang-lru/v2"
17 "golang.org/x/net/publicsuffix"
18 "golang.org/x/time/rate"
19)
20
21// CommunityEventConsumer consumes community-related events from Jetstream
22type CommunityEventConsumer struct {
23 repo communities.Repository // Repository for community operations
24 identityResolver interface {
25 Resolve(context.Context, string) (*identity.Identity, error)
26 } // For resolving handles from DIDs
27 httpClient *http.Client // Shared HTTP client with connection pooling
28 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results
29 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches
30 instanceDID string // DID of this Coves instance
31 skipVerification bool // Skip did:web verification (for dev mode)
32}
33
34// cachedDIDDoc represents a cached verification result with expiration
35type cachedDIDDoc struct {
36 expiresAt time.Time // When this cache entry expires
37 valid bool // Whether verification passed
38}
39
40// NewCommunityEventConsumer creates a new Jetstream consumer for community events
41// instanceDID: The DID of this Coves instance (for hostedBy verification)
42// skipVerification: Skip did:web verification (for dev mode)
43// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests)
44func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface {
45 Resolve(context.Context, string) (*identity.Identity, error)
46},
47) *CommunityEventConsumer {
48 // Create bounded LRU cache for DID document verification results
49 // Max 1000 entries to prevent unbounded memory growth (PR review feedback)
50 // Each entry ~100 bytes → max ~100KB memory overhead
51 cache, err := lru.New[string, cachedDIDDoc](1000)
52 if err != nil {
53 // This should never happen with a valid size, but handle gracefully
54 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err)
55 // Create minimal cache to avoid nil pointer
56 cache, _ = lru.New[string, cachedDIDDoc](1)
57 }
58
59 return &CommunityEventConsumer{
60 repo: repo,
61 identityResolver: identityResolver, // Optional - can be nil for tests
62 instanceDID: instanceDID,
63 skipVerification: skipVerification,
64 // Shared HTTP client with connection pooling for .well-known fetches
65 httpClient: &http.Client{
66 Timeout: 10 * time.Second,
67 Transport: &http.Transport{
68 MaxIdleConns: 100,
69 MaxIdleConnsPerHost: 10,
70 IdleConnTimeout: 90 * time.Second,
71 },
72 },
73 // Bounded LRU cache for .well-known verification results (max 1000 entries)
74 // Automatically evicts least-recently-used entries when full
75 didCache: cache,
76 // Rate limiter: 10 requests per second, burst of 20
77 // Prevents DoS via excessive .well-known fetches
78 wellKnownLimiter: rate.NewLimiter(10, 20),
79 }
80}
81
82// HandleEvent processes a Jetstream event for community records
83// This is called by the main Jetstream consumer when it receives commit events
84func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
85 // We only care about commit events for community records
86 if event.Kind != "commit" || event.Commit == nil {
87 return nil
88 }
89
90 commit := event.Commit
91
92 // Route to appropriate handler based on collection
93 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
94 // - social.coves.community.profile: Community profile records (in community's own repo)
95 // - social.coves.community.subscription: Subscription records (in user's repo)
96 // - social.coves.community.block: Block records (in user's repo)
97 //
98 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
99 // that CREATE or DELETE records in these collections
100 switch commit.Collection {
101 case "social.coves.community.profile":
102 return c.handleCommunityProfile(ctx, event.Did, commit)
103 case "social.coves.community.subscription":
104 // Handle both create (subscribe) and delete (unsubscribe) operations
105 return c.handleSubscription(ctx, event.Did, commit)
106 case "social.coves.community.block":
107 // Handle both create (block) and delete (unblock) operations
108 return c.handleBlock(ctx, event.Did, commit)
109 default:
110 // Not a community-related collection
111 return nil
112 }
113}
114
115// handleCommunityProfile processes community profile create/update/delete events
116func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
117 switch commit.Operation {
118 case "create":
119 return c.createCommunity(ctx, did, commit)
120 case "update":
121 return c.updateCommunity(ctx, did, commit)
122 case "delete":
123 return c.deleteCommunity(ctx, did)
124 default:
125 log.Printf("Unknown operation for community profile: %s", commit.Operation)
126 return nil
127 }
128}
129
130// createCommunity indexes a new community from the firehose
131func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
132 if commit.Record == nil {
133 return fmt.Errorf("community profile create event missing record data")
134 }
135
136 // Parse the community profile record
137 profile, err := parseCommunityProfile(commit.Record)
138 if err != nil {
139 return fmt.Errorf("failed to parse community profile: %w", err)
140 }
141
142 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
143 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
144 if profile.Handle == "" {
145 if c.identityResolver != nil {
146 // Production: Resolve handle from PLC (source of truth)
147 // NO FALLBACK - if PLC is down, we fail and backfill later
148 // This prevents creating communities with incorrect handles in federated scenarios
149 identity, err := c.identityResolver.Resolve(ctx, did)
150 if err != nil {
151 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
152 }
153 profile.Handle = identity.Handle
154 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
155 profile.Handle, did, identity.Method)
156 } else {
157 // Test mode only: construct deterministically when no resolver available
158 profile.Handle = constructHandleFromProfile(profile)
159 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
160 profile.Handle, profile.Name, profile.HostedBy)
161 }
162 }
163
164 // SECURITY: Verify hostedBy claim matches handle domain
165 // This prevents malicious instances from claiming to host communities for domains they don't own
166 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil {
167 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err)
168 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy)
169 return fmt.Errorf("hostedBy verification failed: %w", err)
170 }
171
172 // Build AT-URI for this record
173 // V2 Architecture (ONLY):
174 // - 'did' parameter IS the community DID (community owns its own repo)
175 // - rkey MUST be "self" for community profiles
176 // - URI: at://community_did/social.coves.community.profile/self
177
178 // REJECT non-V2 communities (pre-production: no V1 compatibility)
179 if commit.RKey != "self" {
180 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
181 }
182
183 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
184
185 // V2: Community ALWAYS owns itself
186 ownerDID := did
187
188 // Create community entity
189 community := &communities.Community{
190 DID: did, // V2: Repository DID IS the community DID
191 Handle: profile.Handle,
192 Name: profile.Name,
193 DisplayName: profile.DisplayName,
194 Description: profile.Description,
195 OwnerDID: ownerDID, // V2: same as DID (self-owned)
196 CreatedByDID: profile.CreatedBy,
197 HostedByDID: profile.HostedBy,
198 Visibility: profile.Visibility,
199 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
200 ModerationType: profile.ModerationType,
201 ContentWarnings: profile.ContentWarnings,
202 MemberCount: profile.MemberCount,
203 SubscriberCount: profile.SubscriberCount,
204 FederatedFrom: profile.FederatedFrom,
205 FederatedID: profile.FederatedID,
206 CreatedAt: profile.CreatedAt,
207 UpdatedAt: time.Now(),
208 RecordURI: uri,
209 RecordCID: commit.CID,
210 }
211
212 // Handle blobs (avatar/banner) if present
213 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
214 community.AvatarCID = avatarCID
215 }
216 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
217 community.BannerCID = bannerCID
218 }
219
220 // Handle description facets (rich text)
221 if profile.DescriptionFacets != nil {
222 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
223 if marshalErr == nil {
224 community.DescriptionFacets = facetsJSON
225 }
226 }
227
228 // Index in AppView database
229 _, err = c.repo.Create(ctx, community)
230 if err != nil {
231 // Check if it already exists (idempotency)
232 if communities.IsConflict(err) {
233 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
234 return nil
235 }
236 return fmt.Errorf("failed to index community: %w", err)
237 }
238
239 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
240 return nil
241}
242
243// updateCommunity updates an existing community from the firehose
244func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
245 if commit.Record == nil {
246 return fmt.Errorf("community profile update event missing record data")
247 }
248
249 // REJECT non-V2 communities (pre-production: no V1 compatibility)
250 if commit.RKey != "self" {
251 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
252 }
253
254 // Parse profile
255 profile, err := parseCommunityProfile(commit.Record)
256 if err != nil {
257 return fmt.Errorf("failed to parse community profile: %w", err)
258 }
259
260 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
261 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
262 if profile.Handle == "" {
263 if c.identityResolver != nil {
264 // Production: Resolve handle from PLC (source of truth)
265 // NO FALLBACK - if PLC is down, we fail and backfill later
266 // This prevents creating communities with incorrect handles in federated scenarios
267 identity, err := c.identityResolver.Resolve(ctx, did)
268 if err != nil {
269 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
270 }
271 profile.Handle = identity.Handle
272 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
273 profile.Handle, did, identity.Method)
274 } else {
275 // Test mode only: construct deterministically when no resolver available
276 profile.Handle = constructHandleFromProfile(profile)
277 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
278 profile.Handle, profile.Name, profile.HostedBy)
279 }
280 }
281
282 // V2: Repository DID IS the community DID
283 // Get existing community using the repo DID
284 existing, err := c.repo.GetByDID(ctx, did)
285 if err != nil {
286 if communities.IsNotFound(err) {
287 // Community doesn't exist yet - treat as create
288 log.Printf("Community not found for update, creating: %s", did)
289 return c.createCommunity(ctx, did, commit)
290 }
291 return fmt.Errorf("failed to get existing community: %w", err)
292 }
293
294 // Update fields
295 existing.Handle = profile.Handle
296 existing.Name = profile.Name
297 existing.DisplayName = profile.DisplayName
298 existing.Description = profile.Description
299 existing.Visibility = profile.Visibility
300 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
301 existing.ModerationType = profile.ModerationType
302 existing.ContentWarnings = profile.ContentWarnings
303 existing.RecordCID = commit.CID
304
305 // Update blobs
306 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
307 existing.AvatarCID = avatarCID
308 }
309 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
310 existing.BannerCID = bannerCID
311 }
312
313 // Update description facets
314 if profile.DescriptionFacets != nil {
315 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
316 if marshalErr == nil {
317 existing.DescriptionFacets = facetsJSON
318 }
319 }
320
321 // Save updates
322 _, err = c.repo.Update(ctx, existing)
323 if err != nil {
324 return fmt.Errorf("failed to update community: %w", err)
325 }
326
327 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
328 return nil
329}
330
331// deleteCommunity removes a community from the index
332func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
333 err := c.repo.Delete(ctx, did)
334 if err != nil {
335 if communities.IsNotFound(err) {
336 log.Printf("Community already deleted: %s", did)
337 return nil
338 }
339 return fmt.Errorf("failed to delete community: %w", err)
340 }
341
342 log.Printf("Deleted community: %s", did)
343 return nil
344}
345
346// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain
347// This prevents malicious instances from claiming to host communities for domains they don't own
348func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error {
349 // Skip verification in dev mode
350 if c.skipVerification {
351 return nil
352 }
353
354 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback)
355 ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
356 defer cancel()
357
358 // Verify hostedByDID is did:web format
359 if !strings.HasPrefix(hostedByDID, "did:web:") {
360 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID)
361 }
362
363 // Extract domain from did:web DID
364 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:")
365
366 // Extract domain from community handle
367 // Handle format examples:
368 // - "!gaming@coves.social" → domain: "coves.social"
369 // - "gaming.community.coves.social" → domain: "coves.social"
370 handleDomain := extractDomainFromHandle(handle)
371 if handleDomain == "" {
372 return fmt.Errorf("failed to extract domain from handle: %s", handle)
373 }
374
375 // Verify handle domain matches hostedBy domain
376 if handleDomain != hostedByDomain {
377 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain)
378 }
379
380 // SECURITY: Verify DID document exists and is valid (Bluesky-compatible security model)
381 // MANDATORY bidirectional verification: DID document must claim this handle in alsoKnownAs
382 // This matches Bluesky's security requirements and prevents domain impersonation
383 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain, handle); err != nil {
384 log.Printf("🚨 SECURITY: Rejecting community - bidirectional DID verification failed: %v", err)
385 return fmt.Errorf("bidirectional DID verification required: %w", err)
386 }
387
388 return nil
389}
390
391// verifyDIDDocument fetches and validates the DID document from .well-known/did.json
392// Implements Bluesky's bidirectional verification model:
393// 1. Verify DID document exists at https://domain/.well-known/did.json
394// 2. Verify DID document ID matches claimed DID
395// 3. Verify DID document claims the handle in alsoKnownAs field
396//
397// Results are cached with TTL and rate-limited to prevent DoS attacks
398func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain, handle string) error {
399 // Skip verification in dev mode
400 if c.skipVerification {
401 return nil
402 }
403
404 // Check bounded LRU cache first (thread-safe, no locks needed)
405 if cached, ok := c.didCache.Get(did); ok {
406 // Check if cache entry is still valid (not expired)
407 if time.Now().Before(cached.expiresAt) {
408 if !cached.valid {
409 return fmt.Errorf("cached verification failure for %s", did)
410 }
411 log.Printf("✓ DID document verification (cached): %s", domain)
412 return nil
413 }
414 // Cache entry expired - remove it to free up space for fresh entries
415 c.didCache.Remove(did)
416 }
417
418 // Rate limit .well-known fetches to prevent DoS
419 if err := c.wellKnownLimiter.Wait(ctx); err != nil {
420 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err)
421 }
422
423 // Construct .well-known URL
424 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain)
425
426 // Create HTTP request with timeout
427 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
428 if err != nil {
429 // Cache the failure
430 c.cacheVerificationResult(did, false, 5*time.Minute)
431 return fmt.Errorf("failed to create request: %w", err)
432 }
433
434 // Fetch DID document using shared HTTP client
435 resp, err := c.httpClient.Do(req)
436 if err != nil {
437 // Cache the failure (shorter TTL for network errors)
438 c.cacheVerificationResult(did, false, 5*time.Minute)
439 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err)
440 }
441 defer func() {
442 if closeErr := resp.Body.Close(); closeErr != nil {
443 log.Printf("Failed to close response body: %v", closeErr)
444 }
445 }()
446
447 // Verify HTTP status
448 if resp.StatusCode != http.StatusOK {
449 // Cache the failure
450 c.cacheVerificationResult(did, false, 5*time.Minute)
451 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL)
452 }
453
454 // Parse DID document
455 var didDoc struct {
456 ID string `json:"id"`
457 AlsoKnownAs []string `json:"alsoKnownAs"`
458 }
459 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
460 // Cache the failure
461 c.cacheVerificationResult(did, false, 5*time.Minute)
462 return fmt.Errorf("failed to parse DID document JSON: %w", err)
463 }
464
465 // Verify DID document ID matches claimed DID
466 if didDoc.ID != did {
467 // Cache the failure
468 c.cacheVerificationResult(did, false, 5*time.Minute)
469 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did)
470 }
471
472 // SECURITY: Bidirectional verification - DID document must claim this handle
473 // Prevents impersonation where someone points DNS to another user's DID
474 // Format: handle "coves.social" or "!community@coves.social" → check for "at://coves.social"
475 handleDomain := extractDomainFromHandle(handle)
476 expectedAlias := fmt.Sprintf("at://%s", handleDomain)
477
478 found := false
479 for _, alias := range didDoc.AlsoKnownAs {
480 if alias == expectedAlias {
481 found = true
482 break
483 }
484 }
485
486 if !found {
487 // Cache the failure
488 c.cacheVerificationResult(did, false, 5*time.Minute)
489 return fmt.Errorf("DID document does not claim handle domain %s in alsoKnownAs (expected %s, got %v)",
490 handleDomain, expectedAlias, didDoc.AlsoKnownAs)
491 }
492
493 // Cache the success (24 hour TTL - matches Bluesky recommendations)
494 c.cacheVerificationResult(did, true, 24*time.Hour)
495
496 log.Printf("✓ DID document verified: %s", domain)
497 return nil
498}
499
500// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL
501// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full
502func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) {
503 c.didCache.Add(did, cachedDIDDoc{
504 valid: valid,
505 expiresAt: time.Now().Add(ttl),
506 })
507}
508
509// extractDomainFromHandle extracts the registrable domain from a community handle
510// Handles both formats:
511// - Bluesky-style: "!gaming@coves.social" → "coves.social"
512// - DNS-style: "gaming.community.coves.social" → "coves.social"
513//
514// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs:
515// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk")
516// - "gaming.community.example.com.au" → "example.com.au" (not "com.au")
517func extractDomainFromHandle(handle string) string {
518 // Remove leading ! if present
519 handle = strings.TrimPrefix(handle, "!")
520
521 // Check for @-separated format (e.g., "gaming@coves.social")
522 if strings.Contains(handle, "@") {
523 parts := strings.Split(handle, "@")
524 if len(parts) == 2 {
525 domain := parts[1]
526 // Validate and extract eTLD+1 from the @-domain part
527 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain)
528 if err != nil {
529 // If publicsuffix fails, fall back to returning the full domain part
530 // This handles edge cases like localhost, IP addresses, etc.
531 return domain
532 }
533 return registrable
534 }
535 return ""
536 }
537
538 // For DNS-style handles (e.g., "gaming.community.coves.social")
539 // Extract the registrable domain (eTLD+1) using publicsuffix
540 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc.
541 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle)
542 if err != nil {
543 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address)
544 // fall back to naive extraction (last 2 parts)
545 // This maintains backward compatibility for edge cases
546 parts := strings.Split(handle, ".")
547 if len(parts) < 2 {
548 return "" // Invalid handle
549 }
550 return strings.Join(parts[len(parts)-2:], ".")
551 }
552
553 return registrable
554}
555
556// handleSubscription processes subscription create/delete events
557// CREATE operation = user subscribed to community
558// DELETE operation = user unsubscribed from community
559func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
560 switch commit.Operation {
561 case "create":
562 return c.createSubscription(ctx, userDID, commit)
563 case "delete":
564 return c.deleteSubscription(ctx, userDID, commit)
565 default:
566 // Update operations shouldn't happen on subscriptions, but ignore gracefully
567 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
568 commit.Operation, userDID, commit.RKey)
569 return nil
570 }
571}
572
573// createSubscription indexes a new subscription with retry logic
574func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
575 if commit.Record == nil {
576 return fmt.Errorf("subscription create event missing record data")
577 }
578
579 // Extract community DID from record's subject field (following atProto conventions)
580 communityDID, ok := commit.Record["subject"].(string)
581 if !ok {
582 return fmt.Errorf("subscription record missing subject field")
583 }
584
585 // Extract contentVisibility with clamping and default value
586 contentVisibility := extractContentVisibility(commit.Record)
587
588 // Build AT-URI for subscription record
589 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
590 // The record lives in the USER's repository, but uses the communities namespace
591 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
592
593 // Create subscription entity
594 // Parse createdAt from record to preserve chronological ordering during replays
595 subscription := &communities.Subscription{
596 UserDID: userDID,
597 CommunityDID: communityDID,
598 ContentVisibility: contentVisibility,
599 SubscribedAt: utils.ParseCreatedAt(commit.Record),
600 RecordURI: uri,
601 RecordCID: commit.CID,
602 }
603
604 // Use transactional method to ensure subscription and count are atomically updated
605 // This is idempotent - safe for Jetstream replays
606 _, err := c.repo.SubscribeWithCount(ctx, subscription)
607 if err != nil {
608 // If already exists, that's fine (idempotency)
609 if communities.IsConflict(err) {
610 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
611 userDID, communityDID, contentVisibility)
612 return nil
613 }
614 return fmt.Errorf("failed to index subscription: %w", err)
615 }
616
617 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
618 userDID, communityDID, contentVisibility)
619 return nil
620}
621
622// deleteSubscription removes a subscription from the index
623// DELETE operations don't include record data, so we need to look up the subscription
624// by its URI to find which community the user unsubscribed from
625func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
626 // Build AT-URI from the rkey
627 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
628
629 // Look up the subscription to get the community DID
630 // (DELETE operations don't include record data in Jetstream)
631 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
632 if err != nil {
633 if communities.IsNotFound(err) {
634 // Already deleted - this is fine (idempotency)
635 log.Printf("Subscription already deleted: %s", uri)
636 return nil
637 }
638 return fmt.Errorf("failed to find subscription for deletion: %w", err)
639 }
640
641 // Use transactional method to ensure unsubscribe and count are atomically updated
642 // This is idempotent - safe for Jetstream replays
643 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
644 if err != nil {
645 if communities.IsNotFound(err) {
646 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
647 return nil
648 }
649 return fmt.Errorf("failed to remove subscription: %w", err)
650 }
651
652 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
653 return nil
654}
655
656// handleBlock processes block create/delete events
657// CREATE operation = user blocked a community
658// DELETE operation = user unblocked a community
659func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
660 switch commit.Operation {
661 case "create":
662 return c.createBlock(ctx, userDID, commit)
663 case "delete":
664 return c.deleteBlock(ctx, userDID, commit)
665 default:
666 // Update operations shouldn't happen on blocks, but ignore gracefully
667 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
668 commit.Operation, userDID, commit.RKey)
669 return nil
670 }
671}
672
673// createBlock indexes a new block
674func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
675 if commit.Record == nil {
676 return fmt.Errorf("block create event missing record data")
677 }
678
679 // Extract community DID from record's subject field (following atProto conventions)
680 communityDID, ok := commit.Record["subject"].(string)
681 if !ok {
682 return fmt.Errorf("block record missing subject field")
683 }
684
685 // Build AT-URI for block record
686 // The record lives in the USER's repository
687 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
688
689 // Create block entity
690 // Parse createdAt from record to preserve chronological ordering during replays
691 block := &communities.CommunityBlock{
692 UserDID: userDID,
693 CommunityDID: communityDID,
694 BlockedAt: utils.ParseCreatedAt(commit.Record),
695 RecordURI: uri,
696 RecordCID: commit.CID,
697 }
698
699 // Index the block
700 // This is idempotent - safe for Jetstream replays
701 _, err := c.repo.BlockCommunity(ctx, block)
702 if err != nil {
703 // If already exists, that's fine (idempotency)
704 if communities.IsConflict(err) {
705 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
706 return nil
707 }
708 return fmt.Errorf("failed to index block: %w", err)
709 }
710
711 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
712 return nil
713}
714
715// deleteBlock removes a block from the index
716// DELETE operations don't include record data, so we need to look up the block
717// by its URI to find which community the user unblocked
718func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
719 // Build AT-URI from the rkey
720 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
721
722 // Look up the block to get the community DID
723 // (DELETE operations don't include record data in Jetstream)
724 block, err := c.repo.GetBlockByURI(ctx, uri)
725 if err != nil {
726 if communities.IsNotFound(err) {
727 // Already deleted - this is fine (idempotency)
728 log.Printf("Block already deleted: %s", uri)
729 return nil
730 }
731 return fmt.Errorf("failed to find block for deletion: %w", err)
732 }
733
734 // Remove the block from the index
735 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
736 if err != nil {
737 if communities.IsNotFound(err) {
738 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
739 return nil
740 }
741 return fmt.Errorf("failed to remove block: %w", err)
742 }
743
744 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
745 return nil
746}
747
748// Helper types and functions
749
750type CommunityProfile struct {
751 CreatedAt time.Time `json:"createdAt"`
752 Avatar map[string]interface{} `json:"avatar"`
753 Banner map[string]interface{} `json:"banner"`
754 CreatedBy string `json:"createdBy"`
755 Visibility string `json:"visibility"`
756 AtprotoHandle string `json:"atprotoHandle"`
757 DisplayName string `json:"displayName"`
758 Name string `json:"name"`
759 Handle string `json:"handle"`
760 HostedBy string `json:"hostedBy"`
761 Description string `json:"description"`
762 FederatedID string `json:"federatedId"`
763 ModerationType string `json:"moderationType"`
764 FederatedFrom string `json:"federatedFrom"`
765 ContentWarnings []string `json:"contentWarnings"`
766 DescriptionFacets []interface{} `json:"descriptionFacets"`
767 MemberCount int `json:"memberCount"`
768 SubscriberCount int `json:"subscriberCount"`
769 Federation FederationConfig `json:"federation"`
770}
771
772type FederationConfig struct {
773 AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
774}
775
776// parseCommunityProfile converts a raw record map to a CommunityProfile
777func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
778 recordJSON, err := json.Marshal(record)
779 if err != nil {
780 return nil, fmt.Errorf("failed to marshal record: %w", err)
781 }
782
783 var profile CommunityProfile
784 if err := json.Unmarshal(recordJSON, &profile); err != nil {
785 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
786 }
787
788 return &profile, nil
789}
790
791// constructHandleFromProfile constructs a deterministic handle from profile data
792// Format: {name}.community.{instanceDomain}
793// Example: gaming.community.coves.social
794// This is ONLY used in test mode (when identity resolver is nil)
795// Production MUST resolve handles from PLC (source of truth)
796// Returns empty string if hostedBy is not did:web format (caller will fail validation)
797func constructHandleFromProfile(profile *CommunityProfile) string {
798 if !strings.HasPrefix(profile.HostedBy, "did:web:") {
799 // hostedBy must be did:web format for handle construction
800 // Return empty to trigger validation error in repository
801 return ""
802 }
803 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:")
804 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain)
805}
806
807// extractContentVisibility extracts contentVisibility from subscription record with clamping
808// Returns default value of 3 if missing or invalid
809func extractContentVisibility(record map[string]interface{}) int {
810 const defaultVisibility = 3
811
812 cv, ok := record["contentVisibility"]
813 if !ok {
814 // Field missing - use default
815 return defaultVisibility
816 }
817
818 // JSON numbers decode as float64
819 cvFloat, ok := cv.(float64)
820 if !ok {
821 // Try int (shouldn't happen but handle gracefully)
822 if cvInt, isInt := cv.(int); isInt {
823 return clampContentVisibility(cvInt)
824 }
825 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv)
826 return defaultVisibility
827 }
828
829 // Convert and clamp
830 clamped := clampContentVisibility(int(cvFloat))
831 if clamped != int(cvFloat) {
832 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped)
833 }
834 return clamped
835}
836
837// clampContentVisibility ensures value is within valid range (1-5)
838func clampContentVisibility(value int) int {
839 if value < 1 {
840 return 1
841 }
842 if value > 5 {
843 return 5
844 }
845 return value
846}
847
848// extractBlobCID extracts the CID from a blob reference
849// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
850func extractBlobCID(blob map[string]interface{}) (string, bool) {
851 if blob == nil {
852 return "", false
853 }
854
855 // Check if it's a blob type
856 blobType, ok := blob["$type"].(string)
857 if !ok || blobType != "blob" {
858 return "", false
859 }
860
861 // Extract ref
862 ref, ok := blob["ref"].(map[string]interface{})
863 if !ok {
864 return "", false
865 }
866
867 // Extract $link (the CID)
868 link, ok := ref["$link"].(string)
869 if !ok {
870 return "", false
871 }
872
873 return link, true
874}