A community based topic aggregation platform built on atproto
1package jetstream
2
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"time"

	"Coves/internal/atproto/identity"
	"Coves/internal/atproto/utils"
	"Coves/internal/core/communities"

	lru "github.com/hashicorp/golang-lru/v2"
	"golang.org/x/net/publicsuffix"
	"golang.org/x/time/rate"
)
20
21// CommunityEventConsumer consumes community-related events from Jetstream
22type CommunityEventConsumer struct {
23 repo communities.Repository // Repository for community operations
24 identityResolver interface {
25 Resolve(context.Context, string) (*identity.Identity, error)
26 } // For resolving handles from DIDs
27 httpClient *http.Client // Shared HTTP client with connection pooling
28 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results
29 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches
30 instanceDID string // DID of this Coves instance
31 skipVerification bool // Skip did:web verification (for dev mode)
32}
33
// cachedDIDDoc is a single entry of the DID verification cache: the outcome
// of one verification attempt together with the instant it stops being valid.
type cachedDIDDoc struct {
	// expiresAt is the time after which this entry must be ignored.
	expiresAt time.Time
	// valid reports whether the verification succeeded.
	valid bool
}
39
40// NewCommunityEventConsumer creates a new Jetstream consumer for community events
41// instanceDID: The DID of this Coves instance (for hostedBy verification)
42// skipVerification: Skip did:web verification (for dev mode)
43// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests)
44func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface {
45 Resolve(context.Context, string) (*identity.Identity, error)
46},
47) *CommunityEventConsumer {
48 // Create bounded LRU cache for DID document verification results
49 // Max 1000 entries to prevent unbounded memory growth (PR review feedback)
50 // Each entry ~100 bytes → max ~100KB memory overhead
51 cache, err := lru.New[string, cachedDIDDoc](1000)
52 if err != nil {
53 // This should never happen with a valid size, but handle gracefully
54 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err)
55 // Create minimal cache to avoid nil pointer
56 cache, _ = lru.New[string, cachedDIDDoc](1)
57 }
58
59 return &CommunityEventConsumer{
60 repo: repo,
61 identityResolver: identityResolver, // Optional - can be nil for tests
62 instanceDID: instanceDID,
63 skipVerification: skipVerification,
64 // Shared HTTP client with connection pooling for .well-known fetches
65 httpClient: &http.Client{
66 Timeout: 10 * time.Second,
67 Transport: &http.Transport{
68 MaxIdleConns: 100,
69 MaxIdleConnsPerHost: 10,
70 IdleConnTimeout: 90 * time.Second,
71 },
72 },
73 // Bounded LRU cache for .well-known verification results (max 1000 entries)
74 // Automatically evicts least-recently-used entries when full
75 didCache: cache,
76 // Rate limiter: 10 requests per second, burst of 20
77 // Prevents DoS via excessive .well-known fetches
78 wellKnownLimiter: rate.NewLimiter(10, 20),
79 }
80}
81
82// HandleEvent processes a Jetstream event for community records
83// This is called by the main Jetstream consumer when it receives commit events
84func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
85 // We only care about commit events for community records
86 if event.Kind != "commit" || event.Commit == nil {
87 return nil
88 }
89
90 commit := event.Commit
91
92 // Route to appropriate handler based on collection
93 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
94 // - social.coves.community.profile: Community profile records (in community's own repo)
95 // - social.coves.community.subscription: Subscription records (in user's repo)
96 // - social.coves.community.block: Block records (in user's repo)
97 //
98 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
99 // that CREATE or DELETE records in these collections
100 switch commit.Collection {
101 case "social.coves.community.profile":
102 return c.handleCommunityProfile(ctx, event.Did, commit)
103 case "social.coves.community.subscription":
104 // Handle both create (subscribe) and delete (unsubscribe) operations
105 return c.handleSubscription(ctx, event.Did, commit)
106 case "social.coves.community.block":
107 // Handle both create (block) and delete (unblock) operations
108 return c.handleBlock(ctx, event.Did, commit)
109 default:
110 // Not a community-related collection
111 return nil
112 }
113}
114
115// handleCommunityProfile processes community profile create/update/delete events
116func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
117 switch commit.Operation {
118 case "create":
119 return c.createCommunity(ctx, did, commit)
120 case "update":
121 return c.updateCommunity(ctx, did, commit)
122 case "delete":
123 return c.deleteCommunity(ctx, did)
124 default:
125 log.Printf("Unknown operation for community profile: %s", commit.Operation)
126 return nil
127 }
128}
129
130// createCommunity indexes a new community from the firehose
131func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
132 if commit.Record == nil {
133 return fmt.Errorf("community profile create event missing record data")
134 }
135
136 // Parse the community profile record
137 profile, err := parseCommunityProfile(commit.Record)
138 if err != nil {
139 return fmt.Errorf("failed to parse community profile: %w", err)
140 }
141
142 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
143 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
144 if profile.Handle == "" {
145 if c.identityResolver != nil {
146 // Production: Resolve handle from PLC (source of truth)
147 // NO FALLBACK - if PLC is down, we fail and backfill later
148 // This prevents creating communities with incorrect handles in federated scenarios
149 identity, err := c.identityResolver.Resolve(ctx, did)
150 if err != nil {
151 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
152 }
153 profile.Handle = identity.Handle
154 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
155 profile.Handle, did, identity.Method)
156 } else {
157 // Test mode only: construct deterministically when no resolver available
158 profile.Handle = constructHandleFromProfile(profile)
159 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
160 profile.Handle, profile.Name, profile.HostedBy)
161 }
162 }
163
164 // SECURITY: Verify hostedBy claim matches handle domain
165 // This prevents malicious instances from claiming to host communities for domains they don't own
166 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil {
167 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err)
168 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy)
169 return fmt.Errorf("hostedBy verification failed: %w", err)
170 }
171
172 // Build AT-URI for this record
173 // V2 Architecture (ONLY):
174 // - 'did' parameter IS the community DID (community owns its own repo)
175 // - rkey MUST be "self" for community profiles
176 // - URI: at://community_did/social.coves.community.profile/self
177
178 // REJECT non-V2 communities (pre-production: no V1 compatibility)
179 if commit.RKey != "self" {
180 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
181 }
182
183 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
184
185 // V2: Community ALWAYS owns itself
186 ownerDID := did
187
188 // Create community entity
189 community := &communities.Community{
190 DID: did, // V2: Repository DID IS the community DID
191 Handle: profile.Handle,
192 Name: profile.Name,
193 DisplayName: profile.DisplayName,
194 Description: profile.Description,
195 OwnerDID: ownerDID, // V2: same as DID (self-owned)
196 CreatedByDID: profile.CreatedBy,
197 HostedByDID: profile.HostedBy,
198 Visibility: profile.Visibility,
199 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
200 ModerationType: profile.ModerationType,
201 ContentWarnings: profile.ContentWarnings,
202 MemberCount: profile.MemberCount,
203 SubscriberCount: profile.SubscriberCount,
204 FederatedFrom: profile.FederatedFrom,
205 FederatedID: profile.FederatedID,
206 CreatedAt: profile.CreatedAt,
207 UpdatedAt: time.Now(),
208 RecordURI: uri,
209 RecordCID: commit.CID,
210 }
211
212 // Handle blobs (avatar/banner) if present
213 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
214 community.AvatarCID = avatarCID
215 }
216 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
217 community.BannerCID = bannerCID
218 }
219
220 // Handle description facets (rich text)
221 if profile.DescriptionFacets != nil {
222 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
223 if marshalErr == nil {
224 community.DescriptionFacets = facetsJSON
225 }
226 }
227
228 // Index in AppView database
229 _, err = c.repo.Create(ctx, community)
230 if err != nil {
231 // Check if it already exists (idempotency)
232 if communities.IsConflict(err) {
233 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
234 return nil
235 }
236 return fmt.Errorf("failed to index community: %w", err)
237 }
238
239 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
240 return nil
241}
242
243// updateCommunity updates an existing community from the firehose
244func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
245 if commit.Record == nil {
246 return fmt.Errorf("community profile update event missing record data")
247 }
248
249 // REJECT non-V2 communities (pre-production: no V1 compatibility)
250 if commit.RKey != "self" {
251 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
252 }
253
254 // Parse profile
255 profile, err := parseCommunityProfile(commit.Record)
256 if err != nil {
257 return fmt.Errorf("failed to parse community profile: %w", err)
258 }
259
260 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs)
261 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID
262 if profile.Handle == "" {
263 if c.identityResolver != nil {
264 // Production: Resolve handle from PLC (source of truth)
265 // NO FALLBACK - if PLC is down, we fail and backfill later
266 // This prevents creating communities with incorrect handles in federated scenarios
267 identity, err := c.identityResolver.Resolve(ctx, did)
268 if err != nil {
269 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err)
270 }
271 profile.Handle = identity.Handle
272 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)",
273 profile.Handle, did, identity.Method)
274 } else {
275 // Test mode only: construct deterministically when no resolver available
276 profile.Handle = constructHandleFromProfile(profile)
277 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)",
278 profile.Handle, profile.Name, profile.HostedBy)
279 }
280 }
281
282 // V2: Repository DID IS the community DID
283 // Get existing community using the repo DID
284 existing, err := c.repo.GetByDID(ctx, did)
285 if err != nil {
286 if communities.IsNotFound(err) {
287 // Community doesn't exist yet - treat as create
288 log.Printf("Community not found for update, creating: %s", did)
289 return c.createCommunity(ctx, did, commit)
290 }
291 return fmt.Errorf("failed to get existing community: %w", err)
292 }
293
294 // Update fields
295 existing.Handle = profile.Handle
296 existing.Name = profile.Name
297 existing.DisplayName = profile.DisplayName
298 existing.Description = profile.Description
299 existing.Visibility = profile.Visibility
300 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
301 existing.ModerationType = profile.ModerationType
302 existing.ContentWarnings = profile.ContentWarnings
303 existing.RecordCID = commit.CID
304
305 // Update blobs
306 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
307 existing.AvatarCID = avatarCID
308 }
309 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
310 existing.BannerCID = bannerCID
311 }
312
313 // Update description facets
314 if profile.DescriptionFacets != nil {
315 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
316 if marshalErr == nil {
317 existing.DescriptionFacets = facetsJSON
318 }
319 }
320
321 // Save updates
322 _, err = c.repo.Update(ctx, existing)
323 if err != nil {
324 return fmt.Errorf("failed to update community: %w", err)
325 }
326
327 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
328 return nil
329}
330
331// deleteCommunity removes a community from the index
332func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
333 err := c.repo.Delete(ctx, did)
334 if err != nil {
335 if communities.IsNotFound(err) {
336 log.Printf("Community already deleted: %s", did)
337 return nil
338 }
339 return fmt.Errorf("failed to delete community: %w", err)
340 }
341
342 log.Printf("Deleted community: %s", did)
343 return nil
344}
345
346// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain
347// This prevents malicious instances from claiming to host communities for domains they don't own
348func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error {
349 // Skip verification in dev mode
350 if c.skipVerification {
351 return nil
352 }
353
354 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback)
355 ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
356 defer cancel()
357
358 // Verify hostedByDID is did:web format
359 if !strings.HasPrefix(hostedByDID, "did:web:") {
360 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID)
361 }
362
363 // Extract domain from did:web DID
364 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:")
365
366 // Extract domain from community handle
367 // Handle format examples:
368 // - "!gaming@coves.social" → domain: "coves.social"
369 // - "gaming.community.coves.social" → domain: "coves.social"
370 handleDomain := extractDomainFromHandle(handle)
371 if handleDomain == "" {
372 return fmt.Errorf("failed to extract domain from handle: %s", handle)
373 }
374
375 // Verify handle domain matches hostedBy domain
376 if handleDomain != hostedByDomain {
377 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain)
378 }
379
380 // Optional: Verify DID document exists and is valid
381 // This provides cryptographic proof of domain ownership
382 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil {
383 // Soft-fail: Log warning but don't reject the community
384 // This allows operation during network issues or .well-known misconfiguration
385 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err)
386 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified")
387 }
388
389 return nil
390}
391
392// verifyDIDDocument fetches and validates the DID document from .well-known/did.json
393// This provides cryptographic proof that the instance controls the domain
394// Results are cached with TTL and rate-limited to prevent DoS attacks
395func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error {
396 // Skip verification in dev mode
397 if c.skipVerification {
398 return nil
399 }
400
401 // Check bounded LRU cache first (thread-safe, no locks needed)
402 if cached, ok := c.didCache.Get(did); ok {
403 // Check if cache entry is still valid (not expired)
404 if time.Now().Before(cached.expiresAt) {
405 if !cached.valid {
406 return fmt.Errorf("cached verification failure for %s", did)
407 }
408 log.Printf("✓ DID document verification (cached): %s", domain)
409 return nil
410 }
411 // Cache entry expired - remove it to free up space for fresh entries
412 c.didCache.Remove(did)
413 }
414
415 // Rate limit .well-known fetches to prevent DoS
416 if err := c.wellKnownLimiter.Wait(ctx); err != nil {
417 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err)
418 }
419
420 // Construct .well-known URL
421 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain)
422
423 // Create HTTP request with timeout
424 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
425 if err != nil {
426 // Cache the failure
427 c.cacheVerificationResult(did, false, 5*time.Minute)
428 return fmt.Errorf("failed to create request: %w", err)
429 }
430
431 // Fetch DID document using shared HTTP client
432 resp, err := c.httpClient.Do(req)
433 if err != nil {
434 // Cache the failure (shorter TTL for network errors)
435 c.cacheVerificationResult(did, false, 5*time.Minute)
436 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err)
437 }
438 defer func() {
439 if closeErr := resp.Body.Close(); closeErr != nil {
440 log.Printf("Failed to close response body: %v", closeErr)
441 }
442 }()
443
444 // Verify HTTP status
445 if resp.StatusCode != http.StatusOK {
446 // Cache the failure
447 c.cacheVerificationResult(did, false, 5*time.Minute)
448 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL)
449 }
450
451 // Parse DID document
452 var didDoc struct {
453 ID string `json:"id"`
454 }
455 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
456 // Cache the failure
457 c.cacheVerificationResult(did, false, 5*time.Minute)
458 return fmt.Errorf("failed to parse DID document JSON: %w", err)
459 }
460
461 // Verify DID document ID matches claimed DID
462 if didDoc.ID != did {
463 // Cache the failure
464 c.cacheVerificationResult(did, false, 5*time.Minute)
465 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did)
466 }
467
468 // Cache the success (1 hour TTL)
469 c.cacheVerificationResult(did, true, 1*time.Hour)
470
471 log.Printf("✓ DID document verified: %s", domain)
472 return nil
473}
474
475// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL
476// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full
477func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) {
478 c.didCache.Add(did, cachedDIDDoc{
479 valid: valid,
480 expiresAt: time.Now().Add(ttl),
481 })
482}
483
484// extractDomainFromHandle extracts the registrable domain from a community handle
485// Handles both formats:
486// - Bluesky-style: "!gaming@coves.social" → "coves.social"
487// - DNS-style: "gaming.community.coves.social" → "coves.social"
488//
489// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs:
490// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk")
491// - "gaming.community.example.com.au" → "example.com.au" (not "com.au")
492func extractDomainFromHandle(handle string) string {
493 // Remove leading ! if present
494 handle = strings.TrimPrefix(handle, "!")
495
496 // Check for @-separated format (e.g., "gaming@coves.social")
497 if strings.Contains(handle, "@") {
498 parts := strings.Split(handle, "@")
499 if len(parts) == 2 {
500 domain := parts[1]
501 // Validate and extract eTLD+1 from the @-domain part
502 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain)
503 if err != nil {
504 // If publicsuffix fails, fall back to returning the full domain part
505 // This handles edge cases like localhost, IP addresses, etc.
506 return domain
507 }
508 return registrable
509 }
510 return ""
511 }
512
513 // For DNS-style handles (e.g., "gaming.community.coves.social")
514 // Extract the registrable domain (eTLD+1) using publicsuffix
515 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc.
516 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle)
517 if err != nil {
518 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address)
519 // fall back to naive extraction (last 2 parts)
520 // This maintains backward compatibility for edge cases
521 parts := strings.Split(handle, ".")
522 if len(parts) < 2 {
523 return "" // Invalid handle
524 }
525 return strings.Join(parts[len(parts)-2:], ".")
526 }
527
528 return registrable
529}
530
531// handleSubscription processes subscription create/delete events
532// CREATE operation = user subscribed to community
533// DELETE operation = user unsubscribed from community
534func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
535 switch commit.Operation {
536 case "create":
537 return c.createSubscription(ctx, userDID, commit)
538 case "delete":
539 return c.deleteSubscription(ctx, userDID, commit)
540 default:
541 // Update operations shouldn't happen on subscriptions, but ignore gracefully
542 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
543 commit.Operation, userDID, commit.RKey)
544 return nil
545 }
546}
547
548// createSubscription indexes a new subscription with retry logic
549func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
550 if commit.Record == nil {
551 return fmt.Errorf("subscription create event missing record data")
552 }
553
554 // Extract community DID from record's subject field (following atProto conventions)
555 communityDID, ok := commit.Record["subject"].(string)
556 if !ok {
557 return fmt.Errorf("subscription record missing subject field")
558 }
559
560 // Extract contentVisibility with clamping and default value
561 contentVisibility := extractContentVisibility(commit.Record)
562
563 // Build AT-URI for subscription record
564 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
565 // The record lives in the USER's repository, but uses the communities namespace
566 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
567
568 // Create subscription entity
569 // Parse createdAt from record to preserve chronological ordering during replays
570 subscription := &communities.Subscription{
571 UserDID: userDID,
572 CommunityDID: communityDID,
573 ContentVisibility: contentVisibility,
574 SubscribedAt: utils.ParseCreatedAt(commit.Record),
575 RecordURI: uri,
576 RecordCID: commit.CID,
577 }
578
579 // Use transactional method to ensure subscription and count are atomically updated
580 // This is idempotent - safe for Jetstream replays
581 _, err := c.repo.SubscribeWithCount(ctx, subscription)
582 if err != nil {
583 // If already exists, that's fine (idempotency)
584 if communities.IsConflict(err) {
585 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
586 userDID, communityDID, contentVisibility)
587 return nil
588 }
589 return fmt.Errorf("failed to index subscription: %w", err)
590 }
591
592 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
593 userDID, communityDID, contentVisibility)
594 return nil
595}
596
597// deleteSubscription removes a subscription from the index
598// DELETE operations don't include record data, so we need to look up the subscription
599// by its URI to find which community the user unsubscribed from
600func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
601 // Build AT-URI from the rkey
602 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
603
604 // Look up the subscription to get the community DID
605 // (DELETE operations don't include record data in Jetstream)
606 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
607 if err != nil {
608 if communities.IsNotFound(err) {
609 // Already deleted - this is fine (idempotency)
610 log.Printf("Subscription already deleted: %s", uri)
611 return nil
612 }
613 return fmt.Errorf("failed to find subscription for deletion: %w", err)
614 }
615
616 // Use transactional method to ensure unsubscribe and count are atomically updated
617 // This is idempotent - safe for Jetstream replays
618 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
619 if err != nil {
620 if communities.IsNotFound(err) {
621 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
622 return nil
623 }
624 return fmt.Errorf("failed to remove subscription: %w", err)
625 }
626
627 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
628 return nil
629}
630
631// handleBlock processes block create/delete events
632// CREATE operation = user blocked a community
633// DELETE operation = user unblocked a community
634func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
635 switch commit.Operation {
636 case "create":
637 return c.createBlock(ctx, userDID, commit)
638 case "delete":
639 return c.deleteBlock(ctx, userDID, commit)
640 default:
641 // Update operations shouldn't happen on blocks, but ignore gracefully
642 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
643 commit.Operation, userDID, commit.RKey)
644 return nil
645 }
646}
647
648// createBlock indexes a new block
649func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
650 if commit.Record == nil {
651 return fmt.Errorf("block create event missing record data")
652 }
653
654 // Extract community DID from record's subject field (following atProto conventions)
655 communityDID, ok := commit.Record["subject"].(string)
656 if !ok {
657 return fmt.Errorf("block record missing subject field")
658 }
659
660 // Build AT-URI for block record
661 // The record lives in the USER's repository
662 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
663
664 // Create block entity
665 // Parse createdAt from record to preserve chronological ordering during replays
666 block := &communities.CommunityBlock{
667 UserDID: userDID,
668 CommunityDID: communityDID,
669 BlockedAt: utils.ParseCreatedAt(commit.Record),
670 RecordURI: uri,
671 RecordCID: commit.CID,
672 }
673
674 // Index the block
675 // This is idempotent - safe for Jetstream replays
676 _, err := c.repo.BlockCommunity(ctx, block)
677 if err != nil {
678 // If already exists, that's fine (idempotency)
679 if communities.IsConflict(err) {
680 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
681 return nil
682 }
683 return fmt.Errorf("failed to index block: %w", err)
684 }
685
686 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
687 return nil
688}
689
690// deleteBlock removes a block from the index
691// DELETE operations don't include record data, so we need to look up the block
692// by its URI to find which community the user unblocked
693func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
694 // Build AT-URI from the rkey
695 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
696
697 // Look up the block to get the community DID
698 // (DELETE operations don't include record data in Jetstream)
699 block, err := c.repo.GetBlockByURI(ctx, uri)
700 if err != nil {
701 if communities.IsNotFound(err) {
702 // Already deleted - this is fine (idempotency)
703 log.Printf("Block already deleted: %s", uri)
704 return nil
705 }
706 return fmt.Errorf("failed to find block for deletion: %w", err)
707 }
708
709 // Remove the block from the index
710 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
711 if err != nil {
712 if communities.IsNotFound(err) {
713 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
714 return nil
715 }
716 return fmt.Errorf("failed to remove block: %w", err)
717 }
718
719 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
720 return nil
721}
722
723// Helper types and functions
724
725type CommunityProfile struct {
726 CreatedAt time.Time `json:"createdAt"`
727 Avatar map[string]interface{} `json:"avatar"`
728 Banner map[string]interface{} `json:"banner"`
729 CreatedBy string `json:"createdBy"`
730 Visibility string `json:"visibility"`
731 AtprotoHandle string `json:"atprotoHandle"`
732 DisplayName string `json:"displayName"`
733 Name string `json:"name"`
734 Handle string `json:"handle"`
735 HostedBy string `json:"hostedBy"`
736 Description string `json:"description"`
737 FederatedID string `json:"federatedId"`
738 ModerationType string `json:"moderationType"`
739 FederatedFrom string `json:"federatedFrom"`
740 ContentWarnings []string `json:"contentWarnings"`
741 DescriptionFacets []interface{} `json:"descriptionFacets"`
742 MemberCount int `json:"memberCount"`
743 SubscriberCount int `json:"subscriberCount"`
744 Federation FederationConfig `json:"federation"`
745}
746
// FederationConfig is the "federation" object of a community profile record.
type FederationConfig struct {
	// AllowExternalDiscovery mirrors the record's allowExternalDiscovery flag.
	AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
}
750
751// parseCommunityProfile converts a raw record map to a CommunityProfile
752func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
753 recordJSON, err := json.Marshal(record)
754 if err != nil {
755 return nil, fmt.Errorf("failed to marshal record: %w", err)
756 }
757
758 var profile CommunityProfile
759 if err := json.Unmarshal(recordJSON, &profile); err != nil {
760 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
761 }
762
763 return &profile, nil
764}
765
766// constructHandleFromProfile constructs a deterministic handle from profile data
767// Format: {name}.community.{instanceDomain}
768// Example: gaming.community.coves.social
769// This is ONLY used in test mode (when identity resolver is nil)
770// Production MUST resolve handles from PLC (source of truth)
771// Returns empty string if hostedBy is not did:web format (caller will fail validation)
772func constructHandleFromProfile(profile *CommunityProfile) string {
773 if !strings.HasPrefix(profile.HostedBy, "did:web:") {
774 // hostedBy must be did:web format for handle construction
775 // Return empty to trigger validation error in repository
776 return ""
777 }
778 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:")
779 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain)
780}
781
// extractContentVisibility reads "contentVisibility" from a subscription
// record and returns it clamped to the valid range 1-5.
//
// Returns the default of 3 when the field is missing, is NaN, or has a type
// other than float64 or int. float64 is the normal case because JSON numbers
// decode as float64; the fractional part is truncated.
//
// Go leaves the result of converting an out-of-range float64 to int
// implementation-defined (a huge value can come out negative on some
// platforms), so the float is first saturated to a range every int can hold.
// That makes e.g. 1e300 clamp to 5 everywhere.
func extractContentVisibility(record map[string]interface{}) int {
	const (
		defaultVisibility = 3
		// safeIntBound fits in an int on every platform (int is at least
		// 32 bits) and is far outside the valid 1-5 range.
		safeIntBound = 1<<31 - 1
	)

	raw, present := record["contentVisibility"]
	if !present {
		// Field missing - use default.
		return defaultVisibility
	}

	var requested int
	switch v := raw.(type) {
	case float64:
		if v != v { // NaN is the only value not equal to itself
			log.Printf("WARNING: contentVisibility is NaN, using default")
			return defaultVisibility
		}
		// Saturate before converting so the conversion is well-defined.
		if v > safeIntBound {
			v = safeIntBound
		} else if v < -safeIntBound {
			v = -safeIntBound
		}
		requested = int(v)
	case int:
		// Not produced by JSON decoding, but handled for hand-built records.
		return clampContentVisibility(v)
	default:
		log.Printf("WARNING: contentVisibility has unexpected type %T, using default", raw)
		return defaultVisibility
	}

	clamped := clampContentVisibility(requested)
	if clamped != requested {
		log.Printf("WARNING: Clamped contentVisibility from %d to %d", requested, clamped)
	}
	return clamped
}

// clampContentVisibility limits value to the valid range 1-5: anything below
// 1 becomes 1, anything above 5 becomes 5, other values are returned as is.
func clampContentVisibility(value int) int {
	const (
		lowest  = 1
		highest = 5
	)

	switch {
	case value < lowest:
		return lowest
	case value > highest:
		return highest
	default:
		return value
	}
}
822
// extractBlobCID pulls the CID out of a blob reference of the form
//
//	{"$type": "blob", "ref": {"$link": "<cid>"}, "mimeType": "...", "size": 123}
//
// It returns the CID and true on success. It returns "" and false when blob is
// nil, "$type" is not the string "blob", "ref" is not an object, or "$link"
// is not a string.
func extractBlobCID(blob map[string]interface{}) (string, bool) {
	// Reading from a nil map is safe and yields the zero value, so a nil blob
	// fails this check like any other non-blob value.
	if kind, _ := blob["$type"].(string); kind != "blob" {
		return "", false
	}

	ref, isObject := blob["ref"].(map[string]interface{})
	if !isObject {
		return "", false
	}

	// A failed assertion leaves cid == "", matching the ("", false) result.
	cid, isString := ref["$link"].(string)
	return cid, isString
}
849}