A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/utils"
5 "Coves/internal/core/communities"
6 "context"
7 "encoding/json"
8 "fmt"
9 "log"
10 "net/http"
11 "strings"
12 "time"
13
14 lru "github.com/hashicorp/golang-lru/v2"
15 "golang.org/x/net/publicsuffix"
16 "golang.org/x/time/rate"
17)
18
19// CommunityEventConsumer consumes community-related events from Jetstream
20type CommunityEventConsumer struct {
21 repo communities.Repository // Repository for community operations
22 httpClient *http.Client // Shared HTTP client with connection pooling
23 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results
24 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches
25 instanceDID string // DID of this Coves instance
26 skipVerification bool // Skip did:web verification (for dev mode)
27}
28
// cachedDIDDoc is a single cached verification outcome together with the
// moment at which it stops being trusted.
type cachedDIDDoc struct {
	// expiresAt is the instant at which this cache entry expires.
	expiresAt time.Time
	// valid reports whether the verification passed.
	valid bool
}
34
35// NewCommunityEventConsumer creates a new Jetstream consumer for community events
36// instanceDID: The DID of this Coves instance (for hostedBy verification)
37// skipVerification: Skip did:web verification (for dev mode)
38func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool) *CommunityEventConsumer {
39 // Create bounded LRU cache for DID document verification results
40 // Max 1000 entries to prevent unbounded memory growth (PR review feedback)
41 // Each entry ~100 bytes → max ~100KB memory overhead
42 cache, err := lru.New[string, cachedDIDDoc](1000)
43 if err != nil {
44 // This should never happen with a valid size, but handle gracefully
45 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err)
46 // Create minimal cache to avoid nil pointer
47 cache, _ = lru.New[string, cachedDIDDoc](1)
48 }
49
50 return &CommunityEventConsumer{
51 repo: repo,
52 instanceDID: instanceDID,
53 skipVerification: skipVerification,
54 // Shared HTTP client with connection pooling for .well-known fetches
55 httpClient: &http.Client{
56 Timeout: 10 * time.Second,
57 Transport: &http.Transport{
58 MaxIdleConns: 100,
59 MaxIdleConnsPerHost: 10,
60 IdleConnTimeout: 90 * time.Second,
61 },
62 },
63 // Bounded LRU cache for .well-known verification results (max 1000 entries)
64 // Automatically evicts least-recently-used entries when full
65 didCache: cache,
66 // Rate limiter: 10 requests per second, burst of 20
67 // Prevents DoS via excessive .well-known fetches
68 wellKnownLimiter: rate.NewLimiter(10, 20),
69 }
70}
71
72// HandleEvent processes a Jetstream event for community records
73// This is called by the main Jetstream consumer when it receives commit events
74func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
75 // We only care about commit events for community records
76 if event.Kind != "commit" || event.Commit == nil {
77 return nil
78 }
79
80 commit := event.Commit
81
82 // Route to appropriate handler based on collection
83 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
84 // - social.coves.community.profile: Community profile records (in community's own repo)
85 // - social.coves.community.subscription: Subscription records (in user's repo)
86 // - social.coves.community.block: Block records (in user's repo)
87 //
88 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
89 // that CREATE or DELETE records in these collections
90 switch commit.Collection {
91 case "social.coves.community.profile":
92 return c.handleCommunityProfile(ctx, event.Did, commit)
93 case "social.coves.community.subscription":
94 // Handle both create (subscribe) and delete (unsubscribe) operations
95 return c.handleSubscription(ctx, event.Did, commit)
96 case "social.coves.community.block":
97 // Handle both create (block) and delete (unblock) operations
98 return c.handleBlock(ctx, event.Did, commit)
99 default:
100 // Not a community-related collection
101 return nil
102 }
103}
104
105// handleCommunityProfile processes community profile create/update/delete events
106func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error {
107 switch commit.Operation {
108 case "create":
109 return c.createCommunity(ctx, did, commit)
110 case "update":
111 return c.updateCommunity(ctx, did, commit)
112 case "delete":
113 return c.deleteCommunity(ctx, did)
114 default:
115 log.Printf("Unknown operation for community profile: %s", commit.Operation)
116 return nil
117 }
118}
119
120// createCommunity indexes a new community from the firehose
121func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error {
122 if commit.Record == nil {
123 return fmt.Errorf("community profile create event missing record data")
124 }
125
126 // Parse the community profile record
127 profile, err := parseCommunityProfile(commit.Record)
128 if err != nil {
129 return fmt.Errorf("failed to parse community profile: %w", err)
130 }
131
132 // SECURITY: Verify hostedBy claim matches handle domain
133 // This prevents malicious instances from claiming to host communities for domains they don't own
134 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil {
135 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err)
136 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy)
137 return fmt.Errorf("hostedBy verification failed: %w", err)
138 }
139
140 // Build AT-URI for this record
141 // V2 Architecture (ONLY):
142 // - 'did' parameter IS the community DID (community owns its own repo)
143 // - rkey MUST be "self" for community profiles
144 // - URI: at://community_did/social.coves.community.profile/self
145
146 // REJECT non-V2 communities (pre-production: no V1 compatibility)
147 if commit.RKey != "self" {
148 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
149 }
150
151 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did)
152
153 // V2: Community ALWAYS owns itself
154 ownerDID := did
155
156 // Create community entity
157 community := &communities.Community{
158 DID: did, // V2: Repository DID IS the community DID
159 Handle: profile.Handle,
160 Name: profile.Name,
161 DisplayName: profile.DisplayName,
162 Description: profile.Description,
163 OwnerDID: ownerDID, // V2: same as DID (self-owned)
164 CreatedByDID: profile.CreatedBy,
165 HostedByDID: profile.HostedBy,
166 Visibility: profile.Visibility,
167 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery,
168 ModerationType: profile.ModerationType,
169 ContentWarnings: profile.ContentWarnings,
170 MemberCount: profile.MemberCount,
171 SubscriberCount: profile.SubscriberCount,
172 FederatedFrom: profile.FederatedFrom,
173 FederatedID: profile.FederatedID,
174 CreatedAt: profile.CreatedAt,
175 UpdatedAt: time.Now(),
176 RecordURI: uri,
177 RecordCID: commit.CID,
178 }
179
180 // Handle blobs (avatar/banner) if present
181 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
182 community.AvatarCID = avatarCID
183 }
184 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
185 community.BannerCID = bannerCID
186 }
187
188 // Handle description facets (rich text)
189 if profile.DescriptionFacets != nil {
190 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
191 if marshalErr == nil {
192 community.DescriptionFacets = facetsJSON
193 }
194 }
195
196 // Index in AppView database
197 _, err = c.repo.Create(ctx, community)
198 if err != nil {
199 // Check if it already exists (idempotency)
200 if communities.IsConflict(err) {
201 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID)
202 return nil
203 }
204 return fmt.Errorf("failed to index community: %w", err)
205 }
206
207 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID)
208 return nil
209}
210
211// updateCommunity updates an existing community from the firehose
212func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error {
213 if commit.Record == nil {
214 return fmt.Errorf("community profile update event missing record data")
215 }
216
217 // REJECT non-V2 communities (pre-production: no V1 compatibility)
218 if commit.RKey != "self" {
219 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey)
220 }
221
222 // Parse profile
223 profile, err := parseCommunityProfile(commit.Record)
224 if err != nil {
225 return fmt.Errorf("failed to parse community profile: %w", err)
226 }
227
228 // V2: Repository DID IS the community DID
229 // Get existing community using the repo DID
230 existing, err := c.repo.GetByDID(ctx, did)
231 if err != nil {
232 if communities.IsNotFound(err) {
233 // Community doesn't exist yet - treat as create
234 log.Printf("Community not found for update, creating: %s", did)
235 return c.createCommunity(ctx, did, commit)
236 }
237 return fmt.Errorf("failed to get existing community: %w", err)
238 }
239
240 // Update fields
241 existing.Handle = profile.Handle
242 existing.Name = profile.Name
243 existing.DisplayName = profile.DisplayName
244 existing.Description = profile.Description
245 existing.Visibility = profile.Visibility
246 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery
247 existing.ModerationType = profile.ModerationType
248 existing.ContentWarnings = profile.ContentWarnings
249 existing.RecordCID = commit.CID
250
251 // Update blobs
252 if avatarCID, ok := extractBlobCID(profile.Avatar); ok {
253 existing.AvatarCID = avatarCID
254 }
255 if bannerCID, ok := extractBlobCID(profile.Banner); ok {
256 existing.BannerCID = bannerCID
257 }
258
259 // Update description facets
260 if profile.DescriptionFacets != nil {
261 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets)
262 if marshalErr == nil {
263 existing.DescriptionFacets = facetsJSON
264 }
265 }
266
267 // Save updates
268 _, err = c.repo.Update(ctx, existing)
269 if err != nil {
270 return fmt.Errorf("failed to update community: %w", err)
271 }
272
273 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID)
274 return nil
275}
276
277// deleteCommunity removes a community from the index
278func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error {
279 err := c.repo.Delete(ctx, did)
280 if err != nil {
281 if communities.IsNotFound(err) {
282 log.Printf("Community already deleted: %s", did)
283 return nil
284 }
285 return fmt.Errorf("failed to delete community: %w", err)
286 }
287
288 log.Printf("Deleted community: %s", did)
289 return nil
290}
291
292// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain
293// This prevents malicious instances from claiming to host communities for domains they don't own
294func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error {
295 // Skip verification in dev mode
296 if c.skipVerification {
297 return nil
298 }
299
300 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback)
301 ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
302 defer cancel()
303
304 // Verify hostedByDID is did:web format
305 if !strings.HasPrefix(hostedByDID, "did:web:") {
306 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID)
307 }
308
309 // Extract domain from did:web DID
310 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:")
311
312 // Extract domain from community handle
313 // Handle format examples:
314 // - "!gaming@coves.social" → domain: "coves.social"
315 // - "gaming.community.coves.social" → domain: "coves.social"
316 handleDomain := extractDomainFromHandle(handle)
317 if handleDomain == "" {
318 return fmt.Errorf("failed to extract domain from handle: %s", handle)
319 }
320
321 // Verify handle domain matches hostedBy domain
322 if handleDomain != hostedByDomain {
323 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain)
324 }
325
326 // Optional: Verify DID document exists and is valid
327 // This provides cryptographic proof of domain ownership
328 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil {
329 // Soft-fail: Log warning but don't reject the community
330 // This allows operation during network issues or .well-known misconfiguration
331 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err)
332 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified")
333 }
334
335 return nil
336}
337
338// verifyDIDDocument fetches and validates the DID document from .well-known/did.json
339// This provides cryptographic proof that the instance controls the domain
340// Results are cached with TTL and rate-limited to prevent DoS attacks
341func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error {
342 // Skip verification in dev mode
343 if c.skipVerification {
344 return nil
345 }
346
347 // Check bounded LRU cache first (thread-safe, no locks needed)
348 if cached, ok := c.didCache.Get(did); ok {
349 // Check if cache entry is still valid (not expired)
350 if time.Now().Before(cached.expiresAt) {
351 if !cached.valid {
352 return fmt.Errorf("cached verification failure for %s", did)
353 }
354 log.Printf("✓ DID document verification (cached): %s", domain)
355 return nil
356 }
357 // Cache entry expired - remove it to free up space for fresh entries
358 c.didCache.Remove(did)
359 }
360
361 // Rate limit .well-known fetches to prevent DoS
362 if err := c.wellKnownLimiter.Wait(ctx); err != nil {
363 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err)
364 }
365
366 // Construct .well-known URL
367 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain)
368
369 // Create HTTP request with timeout
370 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
371 if err != nil {
372 // Cache the failure
373 c.cacheVerificationResult(did, false, 5*time.Minute)
374 return fmt.Errorf("failed to create request: %w", err)
375 }
376
377 // Fetch DID document using shared HTTP client
378 resp, err := c.httpClient.Do(req)
379 if err != nil {
380 // Cache the failure (shorter TTL for network errors)
381 c.cacheVerificationResult(did, false, 5*time.Minute)
382 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err)
383 }
384 defer func() {
385 if closeErr := resp.Body.Close(); closeErr != nil {
386 log.Printf("Failed to close response body: %v", closeErr)
387 }
388 }()
389
390 // Verify HTTP status
391 if resp.StatusCode != http.StatusOK {
392 // Cache the failure
393 c.cacheVerificationResult(did, false, 5*time.Minute)
394 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL)
395 }
396
397 // Parse DID document
398 var didDoc struct {
399 ID string `json:"id"`
400 }
401 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
402 // Cache the failure
403 c.cacheVerificationResult(did, false, 5*time.Minute)
404 return fmt.Errorf("failed to parse DID document JSON: %w", err)
405 }
406
407 // Verify DID document ID matches claimed DID
408 if didDoc.ID != did {
409 // Cache the failure
410 c.cacheVerificationResult(did, false, 5*time.Minute)
411 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did)
412 }
413
414 // Cache the success (1 hour TTL)
415 c.cacheVerificationResult(did, true, 1*time.Hour)
416
417 log.Printf("✓ DID document verified: %s", domain)
418 return nil
419}
420
421// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL
422// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full
423func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) {
424 c.didCache.Add(did, cachedDIDDoc{
425 valid: valid,
426 expiresAt: time.Now().Add(ttl),
427 })
428}
429
430// extractDomainFromHandle extracts the registrable domain from a community handle
431// Handles both formats:
432// - Bluesky-style: "!gaming@coves.social" → "coves.social"
433// - DNS-style: "gaming.community.coves.social" → "coves.social"
434//
435// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs:
436// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk")
437// - "gaming.community.example.com.au" → "example.com.au" (not "com.au")
438func extractDomainFromHandle(handle string) string {
439 // Remove leading ! if present
440 handle = strings.TrimPrefix(handle, "!")
441
442 // Check for @-separated format (e.g., "gaming@coves.social")
443 if strings.Contains(handle, "@") {
444 parts := strings.Split(handle, "@")
445 if len(parts) == 2 {
446 domain := parts[1]
447 // Validate and extract eTLD+1 from the @-domain part
448 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain)
449 if err != nil {
450 // If publicsuffix fails, fall back to returning the full domain part
451 // This handles edge cases like localhost, IP addresses, etc.
452 return domain
453 }
454 return registrable
455 }
456 return ""
457 }
458
459 // For DNS-style handles (e.g., "gaming.community.coves.social")
460 // Extract the registrable domain (eTLD+1) using publicsuffix
461 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc.
462 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle)
463 if err != nil {
464 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address)
465 // fall back to naive extraction (last 2 parts)
466 // This maintains backward compatibility for edge cases
467 parts := strings.Split(handle, ".")
468 if len(parts) < 2 {
469 return "" // Invalid handle
470 }
471 return strings.Join(parts[len(parts)-2:], ".")
472 }
473
474 return registrable
475}
476
477// handleSubscription processes subscription create/delete events
478// CREATE operation = user subscribed to community
479// DELETE operation = user unsubscribed from community
480func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
481 switch commit.Operation {
482 case "create":
483 return c.createSubscription(ctx, userDID, commit)
484 case "delete":
485 return c.deleteSubscription(ctx, userDID, commit)
486 default:
487 // Update operations shouldn't happen on subscriptions, but ignore gracefully
488 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)",
489 commit.Operation, userDID, commit.RKey)
490 return nil
491 }
492}
493
494// createSubscription indexes a new subscription with retry logic
495func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
496 if commit.Record == nil {
497 return fmt.Errorf("subscription create event missing record data")
498 }
499
500 // Extract community DID from record's subject field (following atProto conventions)
501 communityDID, ok := commit.Record["subject"].(string)
502 if !ok {
503 return fmt.Errorf("subscription record missing subject field")
504 }
505
506 // Extract contentVisibility with clamping and default value
507 contentVisibility := extractContentVisibility(commit.Record)
508
509 // Build AT-URI for subscription record
510 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint
511 // The record lives in the USER's repository, but uses the communities namespace
512 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
513
514 // Create subscription entity
515 // Parse createdAt from record to preserve chronological ordering during replays
516 subscription := &communities.Subscription{
517 UserDID: userDID,
518 CommunityDID: communityDID,
519 ContentVisibility: contentVisibility,
520 SubscribedAt: utils.ParseCreatedAt(commit.Record),
521 RecordURI: uri,
522 RecordCID: commit.CID,
523 }
524
525 // Use transactional method to ensure subscription and count are atomically updated
526 // This is idempotent - safe for Jetstream replays
527 _, err := c.repo.SubscribeWithCount(ctx, subscription)
528 if err != nil {
529 // If already exists, that's fine (idempotency)
530 if communities.IsConflict(err) {
531 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)",
532 userDID, communityDID, contentVisibility)
533 return nil
534 }
535 return fmt.Errorf("failed to index subscription: %w", err)
536 }
537
538 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)",
539 userDID, communityDID, contentVisibility)
540 return nil
541}
542
543// deleteSubscription removes a subscription from the index
544// DELETE operations don't include record data, so we need to look up the subscription
545// by its URI to find which community the user unsubscribed from
546func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error {
547 // Build AT-URI from the rkey
548 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
549
550 // Look up the subscription to get the community DID
551 // (DELETE operations don't include record data in Jetstream)
552 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri)
553 if err != nil {
554 if communities.IsNotFound(err) {
555 // Already deleted - this is fine (idempotency)
556 log.Printf("Subscription already deleted: %s", uri)
557 return nil
558 }
559 return fmt.Errorf("failed to find subscription for deletion: %w", err)
560 }
561
562 // Use transactional method to ensure unsubscribe and count are atomically updated
563 // This is idempotent - safe for Jetstream replays
564 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID)
565 if err != nil {
566 if communities.IsNotFound(err) {
567 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID)
568 return nil
569 }
570 return fmt.Errorf("failed to remove subscription: %w", err)
571 }
572
573 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
574 return nil
575}
576
577// handleBlock processes block create/delete events
578// CREATE operation = user blocked a community
579// DELETE operation = user unblocked a community
580func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
581 switch commit.Operation {
582 case "create":
583 return c.createBlock(ctx, userDID, commit)
584 case "delete":
585 return c.deleteBlock(ctx, userDID, commit)
586 default:
587 // Update operations shouldn't happen on blocks, but ignore gracefully
588 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
589 commit.Operation, userDID, commit.RKey)
590 return nil
591 }
592}
593
594// createBlock indexes a new block
595func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
596 if commit.Record == nil {
597 return fmt.Errorf("block create event missing record data")
598 }
599
600 // Extract community DID from record's subject field (following atProto conventions)
601 communityDID, ok := commit.Record["subject"].(string)
602 if !ok {
603 return fmt.Errorf("block record missing subject field")
604 }
605
606 // Build AT-URI for block record
607 // The record lives in the USER's repository
608 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
609
610 // Create block entity
611 // Parse createdAt from record to preserve chronological ordering during replays
612 block := &communities.CommunityBlock{
613 UserDID: userDID,
614 CommunityDID: communityDID,
615 BlockedAt: utils.ParseCreatedAt(commit.Record),
616 RecordURI: uri,
617 RecordCID: commit.CID,
618 }
619
620 // Index the block
621 // This is idempotent - safe for Jetstream replays
622 _, err := c.repo.BlockCommunity(ctx, block)
623 if err != nil {
624 // If already exists, that's fine (idempotency)
625 if communities.IsConflict(err) {
626 log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
627 return nil
628 }
629 return fmt.Errorf("failed to index block: %w", err)
630 }
631
632 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
633 return nil
634}
635
636// deleteBlock removes a block from the index
637// DELETE operations don't include record data, so we need to look up the block
638// by its URI to find which community the user unblocked
639func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
640 // Build AT-URI from the rkey
641 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
642
643 // Look up the block to get the community DID
644 // (DELETE operations don't include record data in Jetstream)
645 block, err := c.repo.GetBlockByURI(ctx, uri)
646 if err != nil {
647 if communities.IsNotFound(err) {
648 // Already deleted - this is fine (idempotency)
649 log.Printf("Block already deleted: %s", uri)
650 return nil
651 }
652 return fmt.Errorf("failed to find block for deletion: %w", err)
653 }
654
655 // Remove the block from the index
656 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
657 if err != nil {
658 if communities.IsNotFound(err) {
659 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
660 return nil
661 }
662 return fmt.Errorf("failed to remove block: %w", err)
663 }
664
665 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
666 return nil
667}
668
669// Helper types and functions
670
671type CommunityProfile struct {
672 CreatedAt time.Time `json:"createdAt"`
673 Avatar map[string]interface{} `json:"avatar"`
674 Banner map[string]interface{} `json:"banner"`
675 CreatedBy string `json:"createdBy"`
676 Visibility string `json:"visibility"`
677 AtprotoHandle string `json:"atprotoHandle"`
678 DisplayName string `json:"displayName"`
679 Name string `json:"name"`
680 Handle string `json:"handle"`
681 HostedBy string `json:"hostedBy"`
682 Description string `json:"description"`
683 FederatedID string `json:"federatedId"`
684 ModerationType string `json:"moderationType"`
685 FederatedFrom string `json:"federatedFrom"`
686 ContentWarnings []string `json:"contentWarnings"`
687 DescriptionFacets []interface{} `json:"descriptionFacets"`
688 MemberCount int `json:"memberCount"`
689 SubscriberCount int `json:"subscriberCount"`
690 Federation FederationConfig `json:"federation"`
691}
692
// FederationConfig is the "federation" object of a community profile record.
type FederationConfig struct {
	// AllowExternalDiscovery mirrors the record's "allowExternalDiscovery" flag.
	AllowExternalDiscovery bool `json:"allowExternalDiscovery"`
}
696
697// parseCommunityProfile converts a raw record map to a CommunityProfile
698func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) {
699 recordJSON, err := json.Marshal(record)
700 if err != nil {
701 return nil, fmt.Errorf("failed to marshal record: %w", err)
702 }
703
704 var profile CommunityProfile
705 if err := json.Unmarshal(recordJSON, &profile); err != nil {
706 return nil, fmt.Errorf("failed to unmarshal profile: %w", err)
707 }
708
709 return &profile, nil
710}
711
// extractContentVisibility reads "contentVisibility" from a subscription
// record and returns it clamped to the valid range 1-5.
//
// It returns the default value 3 when the field is missing, is NaN, or has a
// type other than float64 or int. Fractional values are truncated toward
// zero before clamping (5.9 yields 5, 0.5 yields 1).
func extractContentVisibility(record map[string]interface{}) int {
	const defaultVisibility = 3

	raw, present := record["contentVisibility"]
	if !present {
		// Field missing - use default
		return defaultVisibility
	}

	switch cv := raw.(type) {
	case float64:
		// JSON numbers decode as float64.
		if cv != cv {
			// NaN compares unequal to itself and has no meaningful integer value.
			log.Printf("WARNING: contentVisibility is NaN, using default")
			return defaultVisibility
		}
		// Clamp in the float domain: converting a float64 that does not fit
		// in an int is implementation-dependent, so a huge value must never
		// reach the int conversion. The bounds are chosen so that every value
		// that truncates into 1-5 takes the plain conversion.
		switch {
		case cv < 1:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", cv, 1)
			return 1
		case cv >= 6:
			log.Printf("WARNING: Clamped contentVisibility from %v to %d", cv, 5)
			return 5
		default:
			return int(cv)
		}
	case int:
		// Shouldn't happen for JSON-decoded records but handle gracefully.
		return clampContentVisibility(cv)
	default:
		log.Printf("WARNING: contentVisibility has unexpected type %T, using default", raw)
		return defaultVisibility
	}
}

// clampContentVisibility ensures value is within the valid range (1-5):
// values below 1 become 1, values above 5 become 5.
func clampContentVisibility(value int) int {
	if value < 1 {
		return 1
	}
	if value > 5 {
		return 5
	}
	return value
}
752
// extractBlobCID pulls the CID out of a blob reference.
// Expected shape: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123}
// It returns ("", false) when blob is nil, is not tagged as a blob, or lacks
// a string "$link" inside a "ref" object.
func extractBlobCID(blob map[string]interface{}) (string, bool) {
	// Indexing a nil map yields nil, so a nil blob fails this check as well.
	if kind, isString := blob["$type"].(string); !isString || kind != "blob" {
		return "", false
	}

	ref, isObject := blob["ref"].(map[string]interface{})
	if !isObject {
		return "", false
	}

	// A failed assertion leaves cid as "", which is the not-found result.
	cid, isString := ref["$link"].(string)
	return cid, isString
}