A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/atproto/utils"
6 "Coves/internal/core/communities"
7 "context"
8 "encoding/json"
9 "fmt"
10 "log"
11 "net/http"
12 "strings"
13 "time"
14
15 lru "github.com/hashicorp/golang-lru/v2"
16 "golang.org/x/net/publicsuffix"
17 "golang.org/x/time/rate"
18)
19
20// CommunityEventConsumer consumes community-related events from Jetstream
21type CommunityEventConsumer struct {
22 repo communities.Repository // Repository for community operations
23 identityResolver interface{ Resolve(context.Context, string) (*identity.Identity, error) } // For resolving handles from DIDs
24 httpClient *http.Client // Shared HTTP client with connection pooling
25 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results
26 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches
27 instanceDID string // DID of this Coves instance
28 skipVerification bool // Skip did:web verification (for dev mode)
29}
30
// cachedDIDDoc is a single cached verification outcome together with the
// instant at which it stops being usable.
type cachedDIDDoc struct {
	// expiresAt is the time after which this entry must be ignored.
	expiresAt time.Time
	// valid reports whether the verification passed.
	valid bool
}
36
37// NewCommunityEventConsumer creates a new Jetstream consumer for community events
38// instanceDID: The DID of this Coves instance (for hostedBy verification)
39// skipVerification: Skip did:web verification (for dev mode)
40// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests)
41func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface{ Resolve(context.Context, string) (*identity.Identity, error) }) *CommunityEventConsumer {
42 // Create bounded LRU cache for DID document verification results
43 // Max 1000 entries to prevent unbounded memory growth (PR review feedback)
44 // Each entry ~100 bytes → max ~100KB memory overhead
45 cache, err := lru.New[string, cachedDIDDoc](1000)
46 if err != nil {
47 // This should never happen with a valid size, but handle gracefully
48 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err)
49 // Create minimal cache to avoid nil pointer
50 cache, _ = lru.New[string, cachedDIDDoc](1)
51 }
52
53 return &CommunityEventConsumer{
54 repo: repo,
55 identityResolver: identityResolver, // Optional - can be nil for tests
56 instanceDID: instanceDID,
57 skipVerification: skipVerification,
58 // Shared HTTP client with connection pooling for .well-known fetches
59 httpClient: &http.Client{
60 Timeout: 10 * time.Second,
61 Transport: &http.Transport{
62 MaxIdleConns: 100,
63 MaxIdleConnsPerHost: 10,
64 IdleConnTimeout: 90 * time.Second,
65 },
66 },
67 // Bounded LRU cache for .well-known verification results (max 1000 entries)
68 // Automatically evicts least-recently-used entries when full
69 didCache: cache,
70 // Rate limiter: 10 requests per second, burst of 20
71 // Prevents DoS via excessive .well-known fetches
72 wellKnownLimiter: rate.NewLimiter(10, 20),
73 }
74}
75
76// HandleEvent processes a Jetstream event for community records
77// This is called by the main Jetstream consumer when it receives commit events
78func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
79 // We only care about commit events for community records
80 if event.Kind != "commit" || event.Commit == nil {
81 return nil
82 }
83
84 commit := event.Commit
85
86 // Route to appropriate handler based on collection
87 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
88 // - social.coves.community.profile: Community profile records (in community's own repo)
89 // - social.coves.community.subscription: Subscription records (in user's repo)
90 // - social.coves.community.block: Block records (in user's repo)
91 //
92 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
93 // that CREATE or DELETE records in these collections
94 switch commit.Collection {
95 case "social.coves.community.profile":
96 return c.handleCommunityProfile(ctx, event.Did, commit)
97 case "social.coves.community.subscription":
98 // Handle both create (subscribe) and delete (unsubscribe) operations
99 return c.handleSubscription(ctx, event.Did, commit)
100 case "social.coves.community.block":
101 // Handle both create (block) and delete (unblock) operations
102 return c.handleBlock(ctx, event.Did, commit)
103 default:
104 // Not a community-related collection
105 return nil
106 }
107}
108
109// handleCommunityProfile processes community profile create/update/delete events
110func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
111 switch commit.Operation {
112 case "create":
113 return c.createCommunity(ctx, did, commit)
114 case "update":
115 return c.updateCommunity(ctx, did, commit)
116 case "delete":
117 return c.deleteCommunity(ctx, did)
118 default:
119 log.Printf("Unknown operation for community profile: %s", commit.Operation)
120 return nil
121 }
122}
123
124// createCommunity indexes a new community from the firehose
125func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
126 if commit.Record == nil {
127 return fmt.Errorf("community profile create event missing record data")
128 }
129
130 // Parse the community profile record
131 profile, err := parseCommunityProfile(commit.Record)
132 if err != nil {
133 return fmt.Errorf("failed to parse community profile: %w", err)
134 }
135
136 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
137 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
138 if profile.Handle == "" {
139 if c.identityResolver != nil {
140 // Production: Resolve handle from PLC (source of truth)
141 // NO FALLBACK - if PLC is down, we fail and backfill later
142 // This prevents creating communities with incorrect handles in federated scenarios
143 identity, err := c.identityResolver.Resolve(ctx, did)
144 if err != nil {
145 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
146 }
147 profile.Handle = identity.Handle
148 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
149 profile.Handle, did, identity.Method)
150 } else {
151 // Test mode only: construct deterministically when no resolver available
152 profile.Handle = constructHandleFromProfile(profile)
153 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
154 profile.Handle, profile.Name, profile.HostedBy)
155 }
156 }
157
158 // SECURITY: Verify hostedBy claim matches handle domain
159 // This prevents malicious instances from claiming to host communities for domains they don't own
160 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil {
161 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err)
162 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy)
163 return fmt.Errorf("hostedBy verification failed: %w", err)
164 }
165
166 // Build AT-URI for this record
167 // V2 Architecture (ONLY):
168 // - 'did' parameter IS the community DID (community owns its own repo)
169 // - rkey MUST be "self" for community profiles
170 // - URI: at://community_did/social.coves.community.profile/self
171
172 // REJECT non-V2 communities (pre-production: no V1 compatibility)
173 if commit.RKey != "self" {
174 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
175 }
176
177 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
178
179 // V2: Community ALWAYS owns itself
180 ownerDID := did
181
182 // Create community entity
183 community := &communities.Community{
184 DID: did, // V2: Repository DID IS the community DID
185 Handle: profile.Handle,
186 Name: profile.Name,
187 DisplayName: profile.DisplayName,
188 Description: profile.Description,
189 OwnerDID: ownerDID, // V2: same as DID (self-owned)
190 CreatedByDID: profile.CreatedBy,
191 HostedByDID: profile.HostedBy,
192 Visibility: profile.Visibility,
193 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
194 ModerationType: profile.ModerationType,
195 ContentWarnings: profile.ContentWarnings,
196 MemberCount: profile.MemberCount,
197 SubscriberCount: profile.SubscriberCount,
198 FederatedFrom: profile.FederatedFrom,
199 FederatedID: profile.FederatedID,
200 CreatedAt: profile.CreatedAt,
201 UpdatedAt: time.Now(),
202 RecordURI: uri,
203 RecordCID: commit.CID,
204 }
205
206 // Handle blobs (avatar/banner) if present
207 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
208 community.AvatarCID = avatarCID
209 }
210 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
211 community.BannerCID = bannerCID
212 }
213
214 // Handle description facets (rich text)
215 if profile.DescriptionFacets != nil {
216 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
217 if marshalErr == nil {
218 community.DescriptionFacets = facetsJSON
219 }
220 }
221
222 // Index in AppView database
223 _, err = c.repo.Create(ctx, community)
224 if err != nil {
225 // Check if it already exists (idempotency)
226 if communities.IsConflict(err) {
227 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
228 return nil
229 }
230 return fmt.Errorf("failed to index community: %w", err)
231 }
232
233 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
234 return nil
235}
236
237// updateCommunity updates an existing community from the firehose
238func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
239 if commit.Record == nil {
240 return fmt.Errorf("community profile update event missing record data")
241 }
242
243 // REJECT non-V2 communities (pre-production: no V1 compatibility)
244 if commit.RKey != "self" {
245 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
246 }
247
248 // Parse profile
249 profile, err := parseCommunityProfile(commit.Record)
250 if err != nil {
251 return fmt.Errorf("failed to parse community profile: %w", err)
252 }
253
254 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
255 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
256 if profile.Handle == "" {
257 if c.identityResolver != nil {
258 // Production: Resolve handle from PLC (source of truth)
259 // NO FALLBACK - if PLC is down, we fail and backfill later
260 // This prevents creating communities with incorrect handles in federated scenarios
261 identity, err := c.identityResolver.Resolve(ctx, did)
262 if err != nil {
263 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
264 }
265 profile.Handle = identity.Handle
266 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
267 profile.Handle, did, identity.Method)
268 } else {
269 // Test mode only: construct deterministically when no resolver available
270 profile.Handle = constructHandleFromProfile(profile)
271 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
272 profile.Handle, profile.Name, profile.HostedBy)
273 }
274 }
275
276 // V2: Repository DID IS the community DID
277 // Get existing community using the repo DID
278 existing, err := c.repo.GetByDID(ctx, did)
279 if err != nil {
280 if communities.IsNotFound(err) {
281 // Community doesn't exist yet - treat as create
282 log.Printf("Community not found for update, creating: %s", did)
283 return c.createCommunity(ctx, did, commit)
284 }
285 return fmt.Errorf("failed to get existing community: %w", err)
286 }
287
288 // Update fields
289 existing.Handle = profile.Handle
290 existing.Name = profile.Name
291 existing.DisplayName = profile.DisplayName
292 existing.Description = profile.Description
293 existing.Visibility = profile.Visibility
294 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
295 existing.ModerationType = profile.ModerationType
296 existing.ContentWarnings = profile.ContentWarnings
297 existing.RecordCID = commit.CID
298
299 // Update blobs
300 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
301 existing.AvatarCID = avatarCID
302 }
303 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
304 existing.BannerCID = bannerCID
305 }
306
307 // Update description facets
308 if profile.DescriptionFacets != nil {
309 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
310 if marshalErr == nil {
311 existing.DescriptionFacets = facetsJSON
312 }
313 }
314
315 // Save updates
316 _, err = c.repo.Update(ctx, existing)
317 if err != nil {
318 return fmt.Errorf("failed to update community: %w", err)
319 }
320
321 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
322 return nil
323}
324
325// deleteCommunity removes a community from the index
326func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
327 err := c.repo.Delete(ctx, did)
328 if err != nil {
329 if communities.IsNotFound(err) {
330 log.Printf("Community already deleted: %s", did)
331 return nil
332 }
333 return fmt.Errorf("failed to delete community: %w", err)
334 }
335
336 log.Printf("Deleted community: %s", did)
337 return nil
338}
339
340// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain
341// This prevents malicious instances from claiming to host communities for domains they don't own
342func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error {
343 // Skip verification in dev mode
344 if c.skipVerification {
345 return nil
346 }
347
348 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback)
349 ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
350 defer cancel()
351
352 // Verify hostedByDID is did:web format
353 if !strings.HasPrefix(hostedByDID, "did:web:") {
354 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID)
355 }
356
357 // Extract domain from did:web DID
358 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:")
359
360 // Extract domain from community handle
361 // Handle format examples:
362 // - "!gaming@coves.social" → domain: "coves.social"
363 // - "gaming.community.coves.social" → domain: "coves.social"
364 handleDomain := extractDomainFromHandle(handle)
365 if handleDomain == "" {
366 return fmt.Errorf("failed to extract domain from handle: %s", handle)
367 }
368
369 // Verify handle domain matches hostedBy domain
370 if handleDomain != hostedByDomain {
371 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain)
372 }
373
374 // Optional: Verify DID document exists and is valid
375 // This provides cryptographic proof of domain ownership
376 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil {
377 // Soft-fail: Log warning but don't reject the community
378 // This allows operation during network issues or .well-known misconfiguration
379 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err)
380 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified")
381 }
382
383 return nil
384}
385
386// verifyDIDDocument fetches and validates the DID document from .well-known/did.json
387// This provides cryptographic proof that the instance controls the domain
388// Results are cached with TTL and rate-limited to prevent DoS attacks
389func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error {
390 // Skip verification in dev mode
391 if c.skipVerification {
392 return nil
393 }
394
395 // Check bounded LRU cache first (thread-safe, no locks needed)
396 if cached, ok := c.didCache.Get(did); ok {
397 // Check if cache entry is still valid (not expired)
398 if time.Now().Before(cached.expiresAt) {
399 if !cached.valid {
400 return fmt.Errorf("cached verification failure for %s", did)
401 }
402 log.Printf("✓ DID document verification (cached): %s", domain)
403 return nil
404 }
405 // Cache entry expired - remove it to free up space for fresh entries
406 c.didCache.Remove(did)
407 }
408
409 // Rate limit .well-known fetches to prevent DoS
410 if err := c.wellKnownLimiter.Wait(ctx); err != nil {
411 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err)
412 }
413
414 // Construct .well-known URL
415 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain)
416
417 // Create HTTP request with timeout
418 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
419 if err != nil {
420 // Cache the failure
421 c.cacheVerificationResult(did, false, 5*time.Minute)
422 return fmt.Errorf("failed to create request: %w", err)
423 }
424
425 // Fetch DID document using shared HTTP client
426 resp, err := c.httpClient.Do(req)
427 if err != nil {
428 // Cache the failure (shorter TTL for network errors)
429 c.cacheVerificationResult(did, false, 5*time.Minute)
430 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err)
431 }
432 defer func() {
433 if closeErr := resp.Body.Close(); closeErr != nil {
434 log.Printf("Failed to close response body: %v", closeErr)
435 }
436 }()
437
438 // Verify HTTP status
439 if resp.StatusCode != http.StatusOK {
440 // Cache the failure
441 c.cacheVerificationResult(did, false, 5*time.Minute)
442 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL)
443 }
444
445 // Parse DID document
446 var didDoc struct {
447 ID string `json:"id"`
448 }
449 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
450 // Cache the failure
451 c.cacheVerificationResult(did, false, 5*time.Minute)
452 return fmt.Errorf("failed to parse DID document JSON: %w", err)
453 }
454
455 // Verify DID document ID matches claimed DID
456 if didDoc.ID != did {
457 // Cache the failure
458 c.cacheVerificationResult(did, false, 5*time.Minute)
459 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did)
460 }
461
462 // Cache the success (1 hour TTL)
463 c.cacheVerificationResult(did, true, 1*time.Hour)
464
465 log.Printf("✓ DID document verified: %s", domain)
466 return nil
467}
468
469// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL
470// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full
471func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) {
472 c.didCache.Add(did, cachedDIDDoc{
473 valid: valid,
474 expiresAt: time.Now().Add(ttl),
475 })
476}
477
478// extractDomainFromHandle extracts the registrable domain from a community handle
479// Handles both formats:
480// - Bluesky-style: "!gaming@coves.social" → "coves.social"
481// - DNS-style: "gaming.community.coves.social" → "coves.social"
482//
483// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs:
484// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk")
485// - "gaming.community.example.com.au" → "example.com.au" (not "com.au")
486func extractDomainFromHandle(handle string) string {
487 // Remove leading ! if present
488 handle = strings.TrimPrefix(handle, "!")
489
490 // Check for @-separated format (e.g., "gaming@coves.social")
491 if strings.Contains(handle, "@") {
492 parts := strings.Split(handle, "@")
493 if len(parts) == 2 {
494 domain := parts[1]
495 // Validate and extract eTLD+1 from the @-domain part
496 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain)
497 if err != nil {
498 // If publicsuffix fails, fall back to returning the full domain part
499 // This handles edge cases like localhost, IP addresses, etc.
500 return domain
501 }
502 return registrable
503 }
504 return ""
505 }
506
507 // For DNS-style handles (e.g., "gaming.community.coves.social")
508 // Extract the registrable domain (eTLD+1) using publicsuffix
509 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc.
510 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle)
511 if err != nil {
512 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address)
513 // fall back to naive extraction (last 2 parts)
514 // This maintains backward compatibility for edge cases
515 parts := strings.Split(handle, ".")
516 if len(parts) < 2 {
517 return "" // Invalid handle
518 }
519 return strings.Join(parts[len(parts)-2:], ".")
520 }
521
522 return registrable
523}
524
525// handleSubscription processes subscription create/delete events
526// CREATE operation = user subscribed to community
527// DELETE operation = user unsubscribed from community
528func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
529 switch commit.Operation {
530 case "create":
531 return c.createSubscription(ctx, userDID, commit)
532 case "delete":
533 return c.deleteSubscription(ctx, userDID, commit)
534 default:
535 // Update operations shouldn't happen on subscriptions, but ignore gracefully
536 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
537 commit.Operation, userDID, commit.RKey)
538 return nil
539 }
540}
541
542// createSubscription indexes a new subscription with retry logic
543func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
544 if commit.Record == nil {
545 return fmt.Errorf("subscription create event missing record data")
546 }
547
548 // Extract community DID from record's subject field (following atProto conventions)
549 communityDID, ok := commit.Record["subject"].(string)
550 if !ok {
551 return fmt.Errorf("subscription record missing subject field")
552 }
553
554 // Extract contentVisibility with clamping and default value
555 contentVisibility := extractContentVisibility(commit.Record)
556
557 // Build AT-URI for subscription record
558 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
559 // The record lives in the USER's repository, but uses the communities namespace
560 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
561
562 // Create subscription entity
563 // Parse createdAt from record to preserve chronological ordering during replays
564 subscription := &communities.Subscription{
565 UserDID: userDID,
566 CommunityDID: communityDID,
567 ContentVisibility: contentVisibility,
568 SubscribedAt: utils.ParseCreatedAt(commit.Record),
569 RecordURI: uri,
570 RecordCID: commit.CID,
571 }
572
573 // Use transactional method to ensure subscription and count are atomically updated
574 // This is idempotent - safe for Jetstream replays
575 _, err := c.repo.SubscribeWithCount(ctx, subscription)
576 if err != nil {
577 // If already exists, that's fine (idempotency)
578 if communities.IsConflict(err) {
579 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
580 userDID, communityDID, contentVisibility)
581 return nil
582 }
583 return fmt.Errorf("failed to index subscription: %w", err)
584 }
585
586 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
587 userDID, communityDID, contentVisibility)
588 return nil
589}
590
591// deleteSubscription removes a subscription from the index
592// DELETE operations don't include record data, so we need to look up the subscription
593// by its URI to find which community the user unsubscribed from
594func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
595 // Build AT-URI from the rkey
596 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
597
598 // Look up the subscription to get the community DID
599 // (DELETE operations don't include record data in Jetstream)
600 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
601 if err != nil {
602 if communities.IsNotFound(err) {
603 // Already deleted - this is fine (idempotency)
604 log.Printf("Subscription already deleted: %s", uri)
605 return nil
606 }
607 return fmt.Errorf("failed to find subscription for deletion: %w", err)
608 }
609
610 // Use transactional method to ensure unsubscribe and count are atomically updated
611 // This is idempotent - safe for Jetstream replays
612 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
613 if err != nil {
614 if communities.IsNotFound(err) {
615 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
616 return nil
617 }
618 return fmt.Errorf("failed to remove subscription: %w", err)
619 }
620
621 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
622 return nil
623}
624
625// handleBlock processes block create/delete events
626// CREATE operation = user blocked a community
627// DELETE operation = user unblocked a community
628func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
629 switch commit.Operation {
630 case "create":
631 return c.createBlock(ctx, userDID, commit)
632 case "delete":
633 return c.deleteBlock(ctx, userDID, commit)
634 default:
635 // Update operations shouldn't happen on blocks, but ignore gracefully
636 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
637 commit.Operation, userDID, commit.RKey)
638 return nil
639 }
640}
641
642// createBlock indexes a new block
643func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
644 if commit.Record == nil {
645 return fmt.Errorf("block create event missing record data")
646 }
647
648 // Extract community DID from record's subject field (following atProto conventions)
649 communityDID, ok := commit.Record["subject"].(string)
650 if !ok {
651 return fmt.Errorf("block record missing subject field")
652 }
653
654 // Build AT-URI for block record
655 // The record lives in the USER's repository
656 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
657
658 // Create block entity
659 // Parse createdAt from record to preserve chronological ordering during replays
660 block := &communities.CommunityBlock{
661 UserDID: userDID,
662 CommunityDID: communityDID,
663 BlockedAt: utils.ParseCreatedAt(commit.Record),
664 RecordURI: uri,
665 RecordCID: commit.CID,
666 }
667
668 // Index the block
669 // This is idempotent - safe for Jetstream replays
670 _, err := c.repo.BlockCommunity(ctx, block)
671 if err != nil {
672 // If already exists, that's fine (idempotency)
673 if communities.IsConflict(err) {
674 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
675 return nil
676 }
677 return fmt.Errorf("failed to index block: %w", err)
678 }
679
680 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
681 return nil
682}
683
684// deleteBlock removes a block from the index
685// DELETE operations don't include record data, so we need to look up the block
686// by its URI to find which community the user unblocked
687func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
688 // Build AT-URI from the rkey
689 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
690
691 // Look up the block to get the community DID
692 // (DELETE operations don't include record data in Jetstream)
693 block, err := c.repo.GetBlockByURI(ctx, uri)
694 if err != nil {
695 if communities.IsNotFound(err) {
696 // Already deleted - this is fine (idempotency)
697 log.Printf("Block already deleted: %s", uri)
698 return nil
699 }
700 return fmt.Errorf("failed to find block for deletion: %w", err)
701 }
702
703 // Remove the block from the index
704 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
705 if err != nil {
706 if communities.IsNotFound(err) {
707 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
708 return nil
709 }
710 return fmt.Errorf("failed to remove block: %w", err)
711 }
712
713 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
714 return nil
715}
716
717// Helper types and functions
718
719type CommunityProfile struct {
720 CreatedAt time.Time `json:"createdAt"`
721 Avatar map[string]interface{} `json:"avatar"`
722 Banner map[string]interface{} `json:"banner"`
723 CreatedBy string `json:"createdBy"`
724 Visibility string `json:"visibility"`
725 AtprotoHandle string `json:"atprotoHandle"`
726 DisplayName string `json:"displayName"`
727 Name string `json:"name"`
728 Handle string `json:"handle"`
729 HostedBy string `json:"hostedBy"`
730 Description string `json:"description"`
731 FederatedID string `json:"federatedId"`
732 ModerationType string `json:"moderationType"`
733 FederatedFrom string `json:"federatedFrom"`
734 ContentWarnings []string `json:"contentWarnings"`
735 DescriptionFacets []interface{} `json:"descriptionFacets"`
736 MemberCount int `json:"memberCount"`
737 SubscriberCount int `json:"subscriberCount"`
738 Federation FederationConfig `json:"federation"`
739}
740
// FederationConfig is the "federation" object of a community profile record.
type FederationConfig struct {
	// AllowExternalDiscovery is copied onto the indexed community's
	// AllowExternalDiscovery field.
	AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
}
744
745// parseCommunityProfile converts a raw record map to a CommunityProfile
746func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
747 recordJSON, err := json.Marshal(record)
748 if err != nil {
749 return nil, fmt.Errorf("failed to marshal record: %w", err)
750 }
751
752 var profile CommunityProfile
753 if err := json.Unmarshal(recordJSON, &profile); err != nil {
754 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
755 }
756
757 return &profile, nil
758}
759
760// constructHandleFromProfile constructs a deterministic handle from profile data
761// Format: {name}.community.{instanceDomain}
762// Example: gaming.community.coves.social
763// This is ONLY used in test mode (when identity resolver is nil)
764// Production MUST resolve handles from PLC (source of truth)
765// Returns empty string if hostedBy is not did:web format (caller will fail validation)
766func constructHandleFromProfile(profile *CommunityProfile) string {
767 if !strings.HasPrefix(profile.HostedBy, "did:web:") {
768 // hostedBy must be did:web format for handle construction
769 // Return empty to trigger validation error in repository
770 return ""
771 }
772 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:")
773 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain)
774}
775
// extractContentVisibility reads "contentVisibility" from a subscription
// record and returns it as an int in the range 1-5.
//
// Rules:
//   - key missing, value of an unexpected type, or NaN: the default (3)
//   - float64 (what JSON numbers decode to): values below 1 become 1, values
//     of 6 or more become 5, everything else is truncated toward zero
//   - int: clamped through clampContentVisibility
//
// The float is range-checked BEFORE it is converted to int. Converting an
// out-of-range float64 (for example 1e300) to int has an implementation-
// dependent result in Go, so converting first could turn a huge value into a
// negative int and clamp it to 1 instead of 5.
func extractContentVisibility(record map[string]interface{}) int {
	const (
		defaultVisibility = 3
		minVisibility     = 1
		maxVisibility     = 5
	)

	cv, ok := record["contentVisibility"]
	if !ok {
		// Field missing - use default
		return defaultVisibility
	}

	switch v := cv.(type) {
	case float64:
		switch {
		case v != v:
			// Only NaN is unequal to itself; it has no meaningful clamp target.
			log.Printf("WARNING: contentVisibility is NaN, using default")
			return defaultVisibility
		case v < minVisibility:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", v, minVisibility)
			return minVisibility
		case v >= maxVisibility+1:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", v, maxVisibility)
			return maxVisibility
		}
		// v is within [1, 6), so the conversion is exact and yields 1..5.
		return int(v)
	case int:
		// Not produced by JSON decoding, but handled gracefully.
		return clampContentVisibility(v)
	default:
		log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv)
		return defaultVisibility
	}
}

// clampContentVisibility limits value to the valid range 1-5: anything below
// 1 becomes 1, anything above 5 becomes 5, other values are returned as is.
func clampContentVisibility(value int) int {
	const (
		minVisibility = 1
		maxVisibility = 5
	)

	switch {
	case value < minVisibility:
		return minVisibility
	case value > maxVisibility:
		return maxVisibility
	default:
		return value
	}
}
816
// extractBlobCID pulls the CID out of a blob reference of the form
//
//	{"$type": "blob", "ref": {"$link": "<cid>"}, "mimeType": "...", "size": 123}
//
// It returns the CID and true on success. It returns "" and false when blob
// is nil, when "$type" is not the string "blob", when "ref" is not an object,
// or when "$link" is not a string.
func extractBlobCID(blob map[string]interface{}) (string, bool) {
	if blob == nil {
		return "", false
	}

	// Must be tagged as a blob.
	if kind, isString := blob["$type"].(string); !isString || kind != "blob" {
		return "", false
	}

	ref, isObject := blob["ref"].(map[string]interface{})
	if !isObject {
		return "", false
	}

	// "$link" holds the CID; a failed assertion leaves cid as "".
	cid, isString := ref["$link"].(string)
	return cid, isString
}
843}