A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"Coves/internal/atproto/identity"
	"Coves/internal/atproto/utils"
	"Coves/internal/core/communities"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
	"golang.org/x/net/publicsuffix"
	"golang.org/x/time/rate"
)
19
// CommunityEventConsumer consumes community-related events from Jetstream.
//
// HandleEvent is the entry point: it routes community profile, subscription
// and block commits to the matching handler, which indexes the change
// through repo.
type CommunityEventConsumer struct {
	repo             communities.Repository // Repository for community operations (communities, subscriptions, blocks)
	identityResolver interface {
		Resolve(context.Context, string) (*identity.Identity, error)
	} // For resolving handles from DIDs; when nil, handles are constructed from the profile (test mode)
	httpClient       *http.Client                     // Shared HTTP client with connection pooling, used for .well-known/did.json fetches
	didCache         *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results, keyed by DID
	wellKnownLimiter *rate.Limiter                    // Rate limiter waited on before each .well-known fetch
	instanceDID      string                           // DID of this Coves instance; NOTE(review): set by the constructor but not read by the methods visible here
	skipVerification bool                             // Skip did:web verification (for dev mode)
}
32
// cachedDIDDoc represents a cached verification result with expiration.
// Both successful and failed verifications are stored; readers compare
// expiresAt against the current time and discard entries that have expired.
type cachedDIDDoc struct {
	expiresAt time.Time // When this cache entry expires (time of caching + TTL)
	valid     bool      // Whether verification passed
}
38
39// NewCommunityEventConsumer creates a new Jetstream consumer for community events
40// instanceDID: The DID of this Coves instance (for hostedBy verification)
41// skipVerification: Skip did:web verification (for dev mode)
42// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests)
43func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface {
44 Resolve(context.Context, string) (*identity.Identity, error)
45},
46) *CommunityEventConsumer {
47 // Create bounded LRU cache for DID document verification results
48 // Max 1000 entries to prevent unbounded memory growth (PR review feedback)
49 // Each entry ~100 bytes → max ~100KB memory overhead
50 cache, err := lru.New[string, cachedDIDDoc](1000)
51 if err != nil {
52 // This should never happen with a valid size, but handle gracefully
53 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err)
54 // Create minimal cache to avoid nil pointer
55 cache, _ = lru.New[string, cachedDIDDoc](1)
56 }
57
58 return &CommunityEventConsumer{
59 repo: repo,
60 identityResolver: identityResolver, // Optional - can be nil for tests
61 instanceDID: instanceDID,
62 skipVerification: skipVerification,
63 // Shared HTTP client with connection pooling for .well-known fetches
64 httpClient: &http.Client{
65 Timeout: 10 * time.Second,
66 Transport: &http.Transport{
67 MaxIdleConns: 100,
68 MaxIdleConnsPerHost: 10,
69 IdleConnTimeout: 90 * time.Second,
70 },
71 },
72 // Bounded LRU cache for .well-known verification results (max 1000 entries)
73 // Automatically evicts least-recently-used entries when full
74 didCache: cache,
75 // Rate limiter: 10 requests per second, burst of 20
76 // Prevents DoS via excessive .well-known fetches
77 wellKnownLimiter: rate.NewLimiter(10, 20),
78 }
79}
80
81// HandleEvent processes a Jetstream event for community records
82// This is called by the main Jetstream consumer when it receives commit events
83func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
84 // We only care about commit events for community records
85 if event.Kind != "commit" || event.Commit == nil {
86 return nil
87 }
88
89 commit := event.Commit
90
91 // Route to appropriate handler based on collection
92 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
93 // - social.coves.community.profile: Community profile records (in community's own repo)
94 // - social.coves.community.subscription: Subscription records (in user's repo)
95 // - social.coves.community.block: Block records (in user's repo)
96 //
97 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
98 // that CREATE or DELETE records in these collections
99 switch commit.Collection {
100 case "social.coves.community.profile":
101 return c.handleCommunityProfile(ctx, event.Did, commit)
102 case "social.coves.community.subscription":
103 // Handle both create (subscribe) and delete (unsubscribe) operations
104 return c.handleSubscription(ctx, event.Did, commit)
105 case "social.coves.community.block":
106 // Handle both create (block) and delete (unblock) operations
107 return c.handleBlock(ctx, event.Did, commit)
108 default:
109 // Not a community-related collection
110 return nil
111 }
112}
113
114// handleCommunityProfile processes community profile create/update/delete events
115func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
116 switch commit.Operation {
117 case "create":
118 return c.createCommunity(ctx, did, commit)
119 case "update":
120 return c.updateCommunity(ctx, did, commit)
121 case "delete":
122 return c.deleteCommunity(ctx, did)
123 default:
124 log.Printf("Unknown operation for community profile: %s", commit.Operation)
125 return nil
126 }
127}
128
129// createCommunity indexes a new community from the firehose
130func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
131 if commit.Record == nil {
132 return fmt.Errorf("community profile create event missing record data")
133 }
134
135 // Parse the community profile record
136 profile, err := parseCommunityProfile(commit.Record)
137 if err != nil {
138 return fmt.Errorf("failed to parse community profile: %w", err)
139 }
140
141 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
142 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
143 if profile.Handle == "" {
144 if c.identityResolver != nil {
145 // Production: Resolve handle from PLC (source of truth)
146 // NO FALLBACK - if PLC is down, we fail and backfill later
147 // This prevents creating communities with incorrect handles in federated scenarios
148 identity, err := c.identityResolver.Resolve(ctx, did)
149 if err != nil {
150 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
151 }
152 profile.Handle = identity.Handle
153 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
154 profile.Handle, did, identity.Method)
155 } else {
156 // Test mode only: construct deterministically when no resolver available
157 profile.Handle = constructHandleFromProfile(profile)
158 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
159 profile.Handle, profile.Name, profile.HostedBy)
160 }
161 }
162
163 // SECURITY: Verify hostedBy claim matches handle domain
164 // This prevents malicious instances from claiming to host communities for domains they don't own
165 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil {
166 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err)
167 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy)
168 return fmt.Errorf("hostedBy verification failed: %w", err)
169 }
170
171 // Build AT-URI for this record
172 // V2 Architecture (ONLY):
173 // - 'did' parameter IS the community DID (community owns its own repo)
174 // - rkey MUST be "self" for community profiles
175 // - URI: at://community_did/social.coves.community.profile/self
176
177 // REJECT non-V2 communities (pre-production: no V1 compatibility)
178 if commit.RKey != "self" {
179 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
180 }
181
182 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
183
184 // V2: Community ALWAYS owns itself
185 ownerDID := did
186
187 // Create community entity
188 community := &communities.Community{
189 DID: did, // V2: Repository DID IS the community DID
190 Handle: profile.Handle,
191 Name: profile.Name,
192 DisplayName: profile.DisplayName,
193 Description: profile.Description,
194 OwnerDID: ownerDID, // V2: same as DID (self-owned)
195 CreatedByDID: profile.CreatedBy,
196 HostedByDID: profile.HostedBy,
197 Visibility: profile.Visibility,
198 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
199 ModerationType: profile.ModerationType,
200 ContentWarnings: profile.ContentWarnings,
201 MemberCount: profile.MemberCount,
202 SubscriberCount: profile.SubscriberCount,
203 FederatedFrom: profile.FederatedFrom,
204 FederatedID: profile.FederatedID,
205 CreatedAt: profile.CreatedAt,
206 UpdatedAt: time.Now(),
207 RecordURI: uri,
208 RecordCID: commit.CID,
209 }
210
211 // Handle blobs (avatar/banner) if present
212 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
213 community.AvatarCID = avatarCID
214 }
215 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
216 community.BannerCID = bannerCID
217 }
218
219 // Handle description facets (rich text)
220 if profile.DescriptionFacets != nil {
221 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
222 if marshalErr == nil {
223 community.DescriptionFacets = facetsJSON
224 }
225 }
226
227 // Index in AppView database
228 _, err = c.repo.Create(ctx, community)
229 if err != nil {
230 // Check if it already exists (idempotency)
231 if communities.IsConflict(err) {
232 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
233 return nil
234 }
235 return fmt.Errorf("failed to index community: %w", err)
236 }
237
238 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
239 return nil
240}
241
242// updateCommunity updates an existing community from the firehose
243func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
244 if commit.Record == nil {
245 return fmt.Errorf("community profile update event missing record data")
246 }
247
248 // REJECT non-V2 communities (pre-production: no V1 compatibility)
249 if commit.RKey != "self" {
250 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
251 }
252
253 // Parse profile
254 profile, err := parseCommunityProfile(commit.Record)
255 if err != nil {
256 return fmt.Errorf("failed to parse community profile: %w", err)
257 }
258
259 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
260 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
261 if profile.Handle == "" {
262 if c.identityResolver != nil {
263 // Production: Resolve handle from PLC (source of truth)
264 // NO FALLBACK - if PLC is down, we fail and backfill later
265 // This prevents creating communities with incorrect handles in federated scenarios
266 identity, err := c.identityResolver.Resolve(ctx, did)
267 if err != nil {
268 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
269 }
270 profile.Handle = identity.Handle
271 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
272 profile.Handle, did, identity.Method)
273 } else {
274 // Test mode only: construct deterministically when no resolver available
275 profile.Handle = constructHandleFromProfile(profile)
276 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
277 profile.Handle, profile.Name, profile.HostedBy)
278 }
279 }
280
281 // V2: Repository DID IS the community DID
282 // Get existing community using the repo DID
283 existing, err := c.repo.GetByDID(ctx, did)
284 if err != nil {
285 if communities.IsNotFound(err) {
286 // Community doesn't exist yet - treat as create
287 log.Printf("Community not found for update, creating: %s", did)
288 return c.createCommunity(ctx, did, commit)
289 }
290 return fmt.Errorf("failed to get existing community: %w", err)
291 }
292
293 // Update fields
294 existing.Handle = profile.Handle
295 existing.Name = profile.Name
296 existing.DisplayName = profile.DisplayName
297 existing.Description = profile.Description
298 existing.Visibility = profile.Visibility
299 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
300 existing.ModerationType = profile.ModerationType
301 existing.ContentWarnings = profile.ContentWarnings
302 existing.RecordCID = commit.CID
303
304 // Update blobs
305 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
306 existing.AvatarCID = avatarCID
307 }
308 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
309 existing.BannerCID = bannerCID
310 }
311
312 // Update description facets
313 if profile.DescriptionFacets != nil {
314 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
315 if marshalErr == nil {
316 existing.DescriptionFacets = facetsJSON
317 }
318 }
319
320 // Save updates
321 _, err = c.repo.Update(ctx, existing)
322 if err != nil {
323 return fmt.Errorf("failed to update community: %w", err)
324 }
325
326 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
327 return nil
328}
329
330// deleteCommunity removes a community from the index
331func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
332 err := c.repo.Delete(ctx, did)
333 if err != nil {
334 if communities.IsNotFound(err) {
335 log.Printf("Community already deleted: %s", did)
336 return nil
337 }
338 return fmt.Errorf("failed to delete community: %w", err)
339 }
340
341 log.Printf("Deleted community: %s", did)
342 return nil
343}
344
345// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain
346// This prevents malicious instances from claiming to host communities for domains they don't own
347func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error {
348 // Skip verification in dev mode
349 if c.skipVerification {
350 return nil
351 }
352
353 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback)
354 ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
355 defer cancel()
356
357 // Verify hostedByDID is did:web format
358 if !strings.HasPrefix(hostedByDID, "did:web:") {
359 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID)
360 }
361
362 // Extract domain from did:web DID
363 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:")
364
365 // Extract domain from community handle
366 // Handle format examples:
367 // - "!gaming@coves.social" → domain: "coves.social"
368 // - "gaming.community.coves.social" → domain: "coves.social"
369 handleDomain := extractDomainFromHandle(handle)
370 if handleDomain == "" {
371 return fmt.Errorf("failed to extract domain from handle: %s", handle)
372 }
373
374 // Verify handle domain matches hostedBy domain
375 if handleDomain != hostedByDomain {
376 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain)
377 }
378
379 // Optional: Verify DID document exists and is valid
380 // This provides cryptographic proof of domain ownership
381 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil {
382 // Soft-fail: Log warning but don't reject the community
383 // This allows operation during network issues or .well-known misconfiguration
384 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err)
385 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified")
386 }
387
388 return nil
389}
390
391// verifyDIDDocument fetches and validates the DID document from .well-known/did.json
392// This provides cryptographic proof that the instance controls the domain
393// Results are cached with TTL and rate-limited to prevent DoS attacks
394func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error {
395 // Skip verification in dev mode
396 if c.skipVerification {
397 return nil
398 }
399
400 // Check bounded LRU cache first (thread-safe, no locks needed)
401 if cached, ok := c.didCache.Get(did); ok {
402 // Check if cache entry is still valid (not expired)
403 if time.Now().Before(cached.expiresAt) {
404 if !cached.valid {
405 return fmt.Errorf("cached verification failure for %s", did)
406 }
407 log.Printf("✓ DID document verification (cached): %s", domain)
408 return nil
409 }
410 // Cache entry expired - remove it to free up space for fresh entries
411 c.didCache.Remove(did)
412 }
413
414 // Rate limit .well-known fetches to prevent DoS
415 if err := c.wellKnownLimiter.Wait(ctx); err != nil {
416 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err)
417 }
418
419 // Construct .well-known URL
420 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain)
421
422 // Create HTTP request with timeout
423 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
424 if err != nil {
425 // Cache the failure
426 c.cacheVerificationResult(did, false, 5*time.Minute)
427 return fmt.Errorf("failed to create request: %w", err)
428 }
429
430 // Fetch DID document using shared HTTP client
431 resp, err := c.httpClient.Do(req)
432 if err != nil {
433 // Cache the failure (shorter TTL for network errors)
434 c.cacheVerificationResult(did, false, 5*time.Minute)
435 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err)
436 }
437 defer func() {
438 if closeErr := resp.Body.Close(); closeErr != nil {
439 log.Printf("Failed to close response body: %v", closeErr)
440 }
441 }()
442
443 // Verify HTTP status
444 if resp.StatusCode != http.StatusOK {
445 // Cache the failure
446 c.cacheVerificationResult(did, false, 5*time.Minute)
447 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL)
448 }
449
450 // Parse DID document
451 var didDoc struct {
452 ID string `json:"id"`
453 }
454 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
455 // Cache the failure
456 c.cacheVerificationResult(did, false, 5*time.Minute)
457 return fmt.Errorf("failed to parse DID document JSON: %w", err)
458 }
459
460 // Verify DID document ID matches claimed DID
461 if didDoc.ID != did {
462 // Cache the failure
463 c.cacheVerificationResult(did, false, 5*time.Minute)
464 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did)
465 }
466
467 // Cache the success (1 hour TTL)
468 c.cacheVerificationResult(did, true, 1*time.Hour)
469
470 log.Printf("✓ DID document verified: %s", domain)
471 return nil
472}
473
474// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL
475// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full
476func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) {
477 c.didCache.Add(did, cachedDIDDoc{
478 valid: valid,
479 expiresAt: time.Now().Add(ttl),
480 })
481}
482
483// extractDomainFromHandle extracts the registrable domain from a community handle
484// Handles both formats:
485// - Bluesky-style: "!gaming@coves.social" → "coves.social"
486// - DNS-style: "gaming.community.coves.social" → "coves.social"
487//
488// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs:
489// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk")
490// - "gaming.community.example.com.au" → "example.com.au" (not "com.au")
491func extractDomainFromHandle(handle string) string {
492 // Remove leading ! if present
493 handle = strings.TrimPrefix(handle, "!")
494
495 // Check for @-separated format (e.g., "gaming@coves.social")
496 if strings.Contains(handle, "@") {
497 parts := strings.Split(handle, "@")
498 if len(parts) == 2 {
499 domain := parts[1]
500 // Validate and extract eTLD+1 from the @-domain part
501 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain)
502 if err != nil {
503 // If publicsuffix fails, fall back to returning the full domain part
504 // This handles edge cases like localhost, IP addresses, etc.
505 return domain
506 }
507 return registrable
508 }
509 return ""
510 }
511
512 // For DNS-style handles (e.g., "gaming.community.coves.social")
513 // Extract the registrable domain (eTLD+1) using publicsuffix
514 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc.
515 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle)
516 if err != nil {
517 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address)
518 // fall back to naive extraction (last 2 parts)
519 // This maintains backward compatibility for edge cases
520 parts := strings.Split(handle, ".")
521 if len(parts) < 2 {
522 return "" // Invalid handle
523 }
524 return strings.Join(parts[len(parts)-2:], ".")
525 }
526
527 return registrable
528}
529
530// handleSubscription processes subscription create/delete events
531// CREATE operation = user subscribed to community
532// DELETE operation = user unsubscribed from community
533func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
534 switch commit.Operation {
535 case "create":
536 return c.createSubscription(ctx, userDID, commit)
537 case "delete":
538 return c.deleteSubscription(ctx, userDID, commit)
539 default:
540 // Update operations shouldn't happen on subscriptions, but ignore gracefully
541 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
542 commit.Operation, userDID, commit.RKey)
543 return nil
544 }
545}
546
547// createSubscription indexes a new subscription with retry logic
548func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
549 if commit.Record == nil {
550 return fmt.Errorf("subscription create event missing record data")
551 }
552
553 // Extract community DID from record's subject field (following atProto conventions)
554 communityDID, ok := commit.Record["subject"].(string)
555 if !ok {
556 return fmt.Errorf("subscription record missing subject field")
557 }
558
559 // Extract contentVisibility with clamping and default value
560 contentVisibility := extractContentVisibility(commit.Record)
561
562 // Build AT-URI for subscription record
563 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
564 // The record lives in the USER's repository, but uses the communities namespace
565 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
566
567 // Create subscription entity
568 // Parse createdAt from record to preserve chronological ordering during replays
569 subscription := &communities.Subscription{
570 UserDID: userDID,
571 CommunityDID: communityDID,
572 ContentVisibility: contentVisibility,
573 SubscribedAt: utils.ParseCreatedAt(commit.Record),
574 RecordURI: uri,
575 RecordCID: commit.CID,
576 }
577
578 // Use transactional method to ensure subscription and count are atomically updated
579 // This is idempotent - safe for Jetstream replays
580 _, err := c.repo.SubscribeWithCount(ctx, subscription)
581 if err != nil {
582 // If already exists, that's fine (idempotency)
583 if communities.IsConflict(err) {
584 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
585 userDID, communityDID, contentVisibility)
586 return nil
587 }
588 return fmt.Errorf("failed to index subscription: %w", err)
589 }
590
591 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
592 userDID, communityDID, contentVisibility)
593 return nil
594}
595
596// deleteSubscription removes a subscription from the index
597// DELETE operations don't include record data, so we need to look up the subscription
598// by its URI to find which community the user unsubscribed from
599func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
600 // Build AT-URI from the rkey
601 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
602
603 // Look up the subscription to get the community DID
604 // (DELETE operations don't include record data in Jetstream)
605 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
606 if err != nil {
607 if communities.IsNotFound(err) {
608 // Already deleted - this is fine (idempotency)
609 log.Printf("Subscription already deleted: %s", uri)
610 return nil
611 }
612 return fmt.Errorf("failed to find subscription for deletion: %w", err)
613 }
614
615 // Use transactional method to ensure unsubscribe and count are atomically updated
616 // This is idempotent - safe for Jetstream replays
617 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
618 if err != nil {
619 if communities.IsNotFound(err) {
620 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
621 return nil
622 }
623 return fmt.Errorf("failed to remove subscription: %w", err)
624 }
625
626 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
627 return nil
628}
629
630// handleBlock processes block create/delete events
631// CREATE operation = user blocked a community
632// DELETE operation = user unblocked a community
633func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
634 switch commit.Operation {
635 case "create":
636 return c.createBlock(ctx, userDID, commit)
637 case "delete":
638 return c.deleteBlock(ctx, userDID, commit)
639 default:
640 // Update operations shouldn't happen on blocks, but ignore gracefully
641 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
642 commit.Operation, userDID, commit.RKey)
643 return nil
644 }
645}
646
647// createBlock indexes a new block
648func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
649 if commit.Record == nil {
650 return fmt.Errorf("block create event missing record data")
651 }
652
653 // Extract community DID from record's subject field (following atProto conventions)
654 communityDID, ok := commit.Record["subject"].(string)
655 if !ok {
656 return fmt.Errorf("block record missing subject field")
657 }
658
659 // Build AT-URI for block record
660 // The record lives in the USER's repository
661 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
662
663 // Create block entity
664 // Parse createdAt from record to preserve chronological ordering during replays
665 block := &communities.CommunityBlock{
666 UserDID: userDID,
667 CommunityDID: communityDID,
668 BlockedAt: utils.ParseCreatedAt(commit.Record),
669 RecordURI: uri,
670 RecordCID: commit.CID,
671 }
672
673 // Index the block
674 // This is idempotent - safe for Jetstream replays
675 _, err := c.repo.BlockCommunity(ctx, block)
676 if err != nil {
677 // If already exists, that's fine (idempotency)
678 if communities.IsConflict(err) {
679 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
680 return nil
681 }
682 return fmt.Errorf("failed to index block: %w", err)
683 }
684
685 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
686 return nil
687}
688
689// deleteBlock removes a block from the index
690// DELETE operations don't include record data, so we need to look up the block
691// by its URI to find which community the user unblocked
692func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
693 // Build AT-URI from the rkey
694 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
695
696 // Look up the block to get the community DID
697 // (DELETE operations don't include record data in Jetstream)
698 block, err := c.repo.GetBlockByURI(ctx, uri)
699 if err != nil {
700 if communities.IsNotFound(err) {
701 // Already deleted - this is fine (idempotency)
702 log.Printf("Block already deleted: %s", uri)
703 return nil
704 }
705 return fmt.Errorf("failed to find block for deletion: %w", err)
706 }
707
708 // Remove the block from the index
709 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
710 if err != nil {
711 if communities.IsNotFound(err) {
712 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
713 return nil
714 }
715 return fmt.Errorf("failed to remove block: %w", err)
716 }
717
718 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
719 return nil
720}
721
722// Helper types and functions
723
// CommunityProfile is the decoded form of a community profile record, as
// produced by parseCommunityProfile from the raw record map of a commit.
//
// Field notes, based on how this file uses them:
//   - Handle may be empty; the create/update handlers then resolve it from
//     the DID or, without a resolver, construct it from Name and HostedBy.
//   - HostedBy is expected to be a did:web DID; it is checked against the
//     handle domain when a community is created.
//   - Avatar and Banner are blob reference maps read by extractBlobCID.
//   - DescriptionFacets is re-marshalled to JSON before being stored.
//   - AtprotoHandle is decoded but not read by the code visible here.
type CommunityProfile struct {
	CreatedAt         time.Time              `json:"createdAt"`
	Avatar            map[string]interface{} `json:"avatar"`
	Banner            map[string]interface{} `json:"banner"`
	CreatedBy         string                 `json:"createdBy"`
	Visibility        string                 `json:"visibility"`
	AtprotoHandle     string                 `json:"atprotoHandle"`
	DisplayName       string                 `json:"displayName"`
	Name              string                 `json:"name"`
	Handle            string                 `json:"handle"`
	HostedBy          string                 `json:"hostedBy"`
	Description       string                 `json:"description"`
	FederatedID       string                 `json:"federatedId"`
	ModerationType    string                 `json:"moderationType"`
	FederatedFrom     string                 `json:"federatedFrom"`
	ContentWarnings   []string               `json:"contentWarnings"`
	DescriptionFacets []interface{}          `json:"descriptionFacets"`
	MemberCount       int                    `json:"memberCount"`
	SubscriberCount   int                    `json:"subscriberCount"`
	Federation        FederationConfig       `json:"federation"`
}
745
// FederationConfig is the "federation" object nested in a community profile
// record. Its single flag is copied onto the indexed community as
// AllowExternalDiscovery.
type FederationConfig struct {
	AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
}
749
750// parseCommunityProfile converts a raw record map to a CommunityProfile
751func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
752 recordJSON, err := json.Marshal(record)
753 if err != nil {
754 return nil, fmt.Errorf("failed to marshal record: %w", err)
755 }
756
757 var profile CommunityProfile
758 if err := json.Unmarshal(recordJSON, &profile); err != nil {
759 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
760 }
761
762 return &profile, nil
763}
764
765// constructHandleFromProfile constructs a deterministic handle from profile data
766// Format: {name}.community.{instanceDomain}
767// Example: gaming.community.coves.social
768// This is ONLY used in test mode (when identity resolver is nil)
769// Production MUST resolve handles from PLC (source of truth)
770// Returns empty string if hostedBy is not did:web format (caller will fail validation)
771func constructHandleFromProfile(profile *CommunityProfile) string {
772 if !strings.HasPrefix(profile.HostedBy, "did:web:") {
773 // hostedBy must be did:web format for handle construction
774 // Return empty to trigger validation error in repository
775 return ""
776 }
777 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:")
778 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain)
779}
780
// extractContentVisibility reads "contentVisibility" from a subscription
// record and returns it as an int within the valid range 1-5.
//
// Rules:
//   - missing field, unexpected type, or NaN → default value 3;
//   - float64 (the type JSON numbers decode to) → truncated toward zero and
//     clamped to 1-5;
//   - int → clamped to 1-5.
//
// Floats are range-checked BEFORE being converted to int. Converting an
// out-of-range or NaN float64 to int yields an implementation-dependent
// value in Go, so a huge value such as 1e30 could otherwise end up clamped
// to the wrong end of the range.
func extractContentVisibility(record map[string]interface{}) int {
	const (
		defaultVisibility = 3
		minVisibility     = 1
		maxVisibility     = 5
	)

	cv, ok := record["contentVisibility"]
	if !ok {
		// Field missing - use default
		return defaultVisibility
	}

	switch v := cv.(type) {
	case float64:
		switch {
		case v != v: // NaN is the only value that differs from itself
			log.Printf("WARNING: contentVisibility is NaN, using default")
			return defaultVisibility
		case v < minVisibility:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", v, minVisibility)
			return minVisibility
		case v >= maxVisibility+1:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", v, maxVisibility)
			return maxVisibility
		}
		// v is within [1, 6) here, so the conversion is exact and truncates
		// fractional values into 1-5.
		return int(v)
	case int:
		// Not produced by JSON decoding, but handled for completeness.
		return clampContentVisibility(v)
	default:
		log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv)
		return defaultVisibility
	}
}

// clampContentVisibility ensures value is within the valid range (1-5):
// anything below 1 becomes 1, anything above 5 becomes 5.
func clampContentVisibility(value int) int {
	if value < 1 {
		return 1
	}
	if value > 5 {
		return 5
	}
	return value
}
821
// extractBlobCID pulls the CID out of a blob reference of the form
//
//	{"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
//
// It returns the "$link" string and true when blob has "$type" equal to
// "blob", a "ref" object, and a string "$link" inside it. In every other
// case (including a nil map) it returns "" and false.
func extractBlobCID(blob map[string]interface{}) (string, bool) {
	// A failed assertion leaves kind empty, which also fails the comparison.
	if kind, _ := blob["$type"].(string); kind != "blob" {
		return "", false
	}

	// A missing or malformed ref yields a nil map; reading "$link" from it
	// simply fails the assertion below.
	ref, _ := blob["ref"].(map[string]interface{})
	cid, ok := ref["$link"].(string)
	return cid, ok
}