A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/communities" 6 "context" 7 "encoding/json" 8 "fmt" 9 "log" 10 "net/http" 11 "strings" 12 "time" 13 14 lru "github.com/hashicorp/golang-lru/v2" 15 "golang.org/x/net/publicsuffix" 16 "golang.org/x/time/rate" 17) 18 19// CommunityEventConsumer consumes community-related events from Jetstream 20type CommunityEventConsumer struct { 21 repo communities.Repository // Repository for community operations 22 httpClient *http.Client // Shared HTTP client with connection pooling 23 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results 24 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches 25 instanceDID string // DID of this Coves instance 26 skipVerification bool // Skip did:web verification (for dev mode) 27} 28 29// cachedDIDDoc represents a cached verification result with expiration 30type cachedDIDDoc struct { 31 expiresAt time.Time // When this cache entry expires 32 valid bool // Whether verification passed 33} 34 35// NewCommunityEventConsumer creates a new Jetstream consumer for community events 36// instanceDID: The DID of this Coves instance (for hostedBy verification) 37// skipVerification: Skip did:web verification (for dev mode) 38func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool) *CommunityEventConsumer { 39 // Create bounded LRU cache for DID document verification results 40 // Max 1000 entries to prevent unbounded memory growth (PR review feedback) 41 // Each entry ~100 bytes → max ~100KB memory overhead 42 cache, err := lru.New[string, cachedDIDDoc](1000) 43 if err != nil { 44 // This should never happen with a valid size, but handle gracefully 45 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err) 46 // Create minimal cache to avoid nil pointer 47 cache, _ = lru.New[string, cachedDIDDoc](1) 48 } 49 50 return 
&CommunityEventConsumer{ 51 repo: repo, 52 instanceDID: instanceDID, 53 skipVerification: skipVerification, 54 // Shared HTTP client with connection pooling for .well-known fetches 55 httpClient: &http.Client{ 56 Timeout: 10 * time.Second, 57 Transport: &http.Transport{ 58 MaxIdleConns: 100, 59 MaxIdleConnsPerHost: 10, 60 IdleConnTimeout: 90 * time.Second, 61 }, 62 }, 63 // Bounded LRU cache for .well-known verification results (max 1000 entries) 64 // Automatically evicts least-recently-used entries when full 65 didCache: cache, 66 // Rate limiter: 10 requests per second, burst of 20 67 // Prevents DoS via excessive .well-known fetches 68 wellKnownLimiter: rate.NewLimiter(10, 20), 69 } 70} 71 72// HandleEvent processes a Jetstream event for community records 73// This is called by the main Jetstream consumer when it receives commit events 74func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 75 // We only care about commit events for community records 76 if event.Kind != "commit" || event.Commit == nil { 77 return nil 78 } 79 80 commit := event.Commit 81 82 // Route to appropriate handler based on collection 83 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 84 // - social.coves.community.profile: Community profile records (in community's own repo) 85 // - social.coves.community.subscription: Subscription records (in user's repo) 86 // - social.coves.community.block: Block records (in user's repo) 87 // 88 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 89 // that CREATE or DELETE records in these collections 90 switch commit.Collection { 91 case "social.coves.community.profile": 92 return c.handleCommunityProfile(ctx, event.Did, commit) 93 case "social.coves.community.subscription": 94 // Handle both create (subscribe) and delete (unsubscribe) operations 95 return c.handleSubscription(ctx, event.Did, commit) 96 case 
"social.coves.community.block": 97 // Handle both create (block) and delete (unblock) operations 98 return c.handleBlock(ctx, event.Did, commit) 99 default: 100 // Not a community-related collection 101 return nil 102 } 103} 104 105// handleCommunityProfile processes community profile create/update/delete events 106func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 107 switch commit.Operation { 108 case "create": 109 return c.createCommunity(ctx, did, commit) 110 case "update": 111 return c.updateCommunity(ctx, did, commit) 112 case "delete": 113 return c.deleteCommunity(ctx, did) 114 default: 115 log.Printf("Unknown operation for community profile: %s", commit.Operation) 116 return nil 117 } 118} 119 120// createCommunity indexes a new community from the firehose 121func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 122 if commit.Record == nil { 123 return fmt.Errorf("community profile create event missing record data") 124 } 125 126 // Parse the community profile record 127 profile, err := parseCommunityProfile(commit.Record) 128 if err != nil { 129 return fmt.Errorf("failed to parse community profile: %w", err) 130 } 131 132 // SECURITY: Verify hostedBy claim matches handle domain 133 // This prevents malicious instances from claiming to host communities for domains they don't own 134 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil { 135 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err) 136 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy) 137 return fmt.Errorf("hostedBy verification failed: %w", err) 138 } 139 140 // Build AT-URI for this record 141 // V2 Architecture (ONLY): 142 // - 'did' parameter IS the community DID (community owns its own repo) 143 // - rkey MUST be "self" for community profiles 144 // - URI: 
at://community_did/social.coves.community.profile/self 145 146 // REJECT non-V2 communities (pre-production: no V1 compatibility) 147 if commit.RKey != "self" { 148 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 149 } 150 151 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 152 153 // V2: Community ALWAYS owns itself 154 ownerDID := did 155 156 // Create community entity 157 community := &communities.Community{ 158 DID: did, // V2: Repository DID IS the community DID 159 Handle: profile.Handle, 160 Name: profile.Name, 161 DisplayName: profile.DisplayName, 162 Description: profile.Description, 163 OwnerDID: ownerDID, // V2: same as DID (self-owned) 164 CreatedByDID: profile.CreatedBy, 165 HostedByDID: profile.HostedBy, 166 Visibility: profile.Visibility, 167 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 168 ModerationType: profile.ModerationType, 169 ContentWarnings: profile.ContentWarnings, 170 MemberCount: profile.MemberCount, 171 SubscriberCount: profile.SubscriberCount, 172 FederatedFrom: profile.FederatedFrom, 173 FederatedID: profile.FederatedID, 174 CreatedAt: profile.CreatedAt, 175 UpdatedAt: time.Now(), 176 RecordURI: uri, 177 RecordCID: commit.CID, 178 } 179 180 // Handle blobs (avatar/banner) if present 181 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 182 community.AvatarCID = avatarCID 183 } 184 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 185 community.BannerCID = bannerCID 186 } 187 188 // Handle description facets (rich text) 189 if profile.DescriptionFacets != nil { 190 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 191 if marshalErr == nil { 192 community.DescriptionFacets = facetsJSON 193 } 194 } 195 196 // Index in AppView database 197 _, err = c.repo.Create(ctx, community) 198 if err != nil { 199 // Check if it already exists (idempotency) 200 if communities.IsConflict(err) { 201 
log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 202 return nil 203 } 204 return fmt.Errorf("failed to index community: %w", err) 205 } 206 207 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 208 return nil 209} 210 211// updateCommunity updates an existing community from the firehose 212func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 213 if commit.Record == nil { 214 return fmt.Errorf("community profile update event missing record data") 215 } 216 217 // REJECT non-V2 communities (pre-production: no V1 compatibility) 218 if commit.RKey != "self" { 219 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 220 } 221 222 // Parse profile 223 profile, err := parseCommunityProfile(commit.Record) 224 if err != nil { 225 return fmt.Errorf("failed to parse community profile: %w", err) 226 } 227 228 // V2: Repository DID IS the community DID 229 // Get existing community using the repo DID 230 existing, err := c.repo.GetByDID(ctx, did) 231 if err != nil { 232 if communities.IsNotFound(err) { 233 // Community doesn't exist yet - treat as create 234 log.Printf("Community not found for update, creating: %s", did) 235 return c.createCommunity(ctx, did, commit) 236 } 237 return fmt.Errorf("failed to get existing community: %w", err) 238 } 239 240 // Update fields 241 existing.Handle = profile.Handle 242 existing.Name = profile.Name 243 existing.DisplayName = profile.DisplayName 244 existing.Description = profile.Description 245 existing.Visibility = profile.Visibility 246 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 247 existing.ModerationType = profile.ModerationType 248 existing.ContentWarnings = profile.ContentWarnings 249 existing.RecordCID = commit.CID 250 251 // Update blobs 252 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 253 
existing.AvatarCID = avatarCID 254 } 255 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 256 existing.BannerCID = bannerCID 257 } 258 259 // Update description facets 260 if profile.DescriptionFacets != nil { 261 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 262 if marshalErr == nil { 263 existing.DescriptionFacets = facetsJSON 264 } 265 } 266 267 // Save updates 268 _, err = c.repo.Update(ctx, existing) 269 if err != nil { 270 return fmt.Errorf("failed to update community: %w", err) 271 } 272 273 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 274 return nil 275} 276 277// deleteCommunity removes a community from the index 278func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 279 err := c.repo.Delete(ctx, did) 280 if err != nil { 281 if communities.IsNotFound(err) { 282 log.Printf("Community already deleted: %s", did) 283 return nil 284 } 285 return fmt.Errorf("failed to delete community: %w", err) 286 } 287 288 log.Printf("Deleted community: %s", did) 289 return nil 290} 291 292// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain 293// This prevents malicious instances from claiming to host communities for domains they don't own 294func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error { 295 // Skip verification in dev mode 296 if c.skipVerification { 297 return nil 298 } 299 300 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback) 301 ctx, cancel := context.WithTimeout(ctx, 15*time.Second) 302 defer cancel() 303 304 // Verify hostedByDID is did:web format 305 if !strings.HasPrefix(hostedByDID, "did:web:") { 306 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID) 307 } 308 309 // Extract domain from did:web DID 310 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:") 311 312 // Extract domain 
from community handle 313 // Handle format examples: 314 // - "!gaming@coves.social" → domain: "coves.social" 315 // - "gaming.community.coves.social" → domain: "coves.social" 316 handleDomain := extractDomainFromHandle(handle) 317 if handleDomain == "" { 318 return fmt.Errorf("failed to extract domain from handle: %s", handle) 319 } 320 321 // Verify handle domain matches hostedBy domain 322 if handleDomain != hostedByDomain { 323 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain) 324 } 325 326 // Optional: Verify DID document exists and is valid 327 // This provides cryptographic proof of domain ownership 328 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil { 329 // Soft-fail: Log warning but don't reject the community 330 // This allows operation during network issues or .well-known misconfiguration 331 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err) 332 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified") 333 } 334 335 return nil 336} 337 338// verifyDIDDocument fetches and validates the DID document from .well-known/did.json 339// This provides cryptographic proof that the instance controls the domain 340// Results are cached with TTL and rate-limited to prevent DoS attacks 341func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error { 342 // Skip verification in dev mode 343 if c.skipVerification { 344 return nil 345 } 346 347 // Check bounded LRU cache first (thread-safe, no locks needed) 348 if cached, ok := c.didCache.Get(did); ok { 349 // Check if cache entry is still valid (not expired) 350 if time.Now().Before(cached.expiresAt) { 351 if !cached.valid { 352 return fmt.Errorf("cached verification failure for %s", did) 353 } 354 log.Printf("✓ DID document verification (cached): %s", domain) 355 return nil 356 } 357 // Cache entry expired - remove it 
to free up space for fresh entries 358 c.didCache.Remove(did) 359 } 360 361 // Rate limit .well-known fetches to prevent DoS 362 if err := c.wellKnownLimiter.Wait(ctx); err != nil { 363 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err) 364 } 365 366 // Construct .well-known URL 367 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain) 368 369 // Create HTTP request with timeout 370 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 371 if err != nil { 372 // Cache the failure 373 c.cacheVerificationResult(did, false, 5*time.Minute) 374 return fmt.Errorf("failed to create request: %w", err) 375 } 376 377 // Fetch DID document using shared HTTP client 378 resp, err := c.httpClient.Do(req) 379 if err != nil { 380 // Cache the failure (shorter TTL for network errors) 381 c.cacheVerificationResult(did, false, 5*time.Minute) 382 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err) 383 } 384 defer func() { 385 if closeErr := resp.Body.Close(); closeErr != nil { 386 log.Printf("Failed to close response body: %v", closeErr) 387 } 388 }() 389 390 // Verify HTTP status 391 if resp.StatusCode != http.StatusOK { 392 // Cache the failure 393 c.cacheVerificationResult(did, false, 5*time.Minute) 394 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL) 395 } 396 397 // Parse DID document 398 var didDoc struct { 399 ID string `json:"id"` 400 } 401 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 402 // Cache the failure 403 c.cacheVerificationResult(did, false, 5*time.Minute) 404 return fmt.Errorf("failed to parse DID document JSON: %w", err) 405 } 406 407 // Verify DID document ID matches claimed DID 408 if didDoc.ID != did { 409 // Cache the failure 410 c.cacheVerificationResult(did, false, 5*time.Minute) 411 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did) 412 } 413 414 // Cache the success (1 hour TTL) 415 
c.cacheVerificationResult(did, true, 1*time.Hour) 416 417 log.Printf("✓ DID document verified: %s", domain) 418 return nil 419} 420 421// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL 422// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full 423func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) { 424 c.didCache.Add(did, cachedDIDDoc{ 425 valid: valid, 426 expiresAt: time.Now().Add(ttl), 427 }) 428} 429 430// extractDomainFromHandle extracts the registrable domain from a community handle 431// Handles both formats: 432// - Bluesky-style: "!gaming@coves.social" → "coves.social" 433// - DNS-style: "gaming.community.coves.social" → "coves.social" 434// 435// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs: 436// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk") 437// - "gaming.community.example.com.au" → "example.com.au" (not "com.au") 438func extractDomainFromHandle(handle string) string { 439 // Remove leading ! if present 440 handle = strings.TrimPrefix(handle, "!") 441 442 // Check for @-separated format (e.g., "gaming@coves.social") 443 if strings.Contains(handle, "@") { 444 parts := strings.Split(handle, "@") 445 if len(parts) == 2 { 446 domain := parts[1] 447 // Validate and extract eTLD+1 from the @-domain part 448 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain) 449 if err != nil { 450 // If publicsuffix fails, fall back to returning the full domain part 451 // This handles edge cases like localhost, IP addresses, etc. 452 return domain 453 } 454 return registrable 455 } 456 return "" 457 } 458 459 // For DNS-style handles (e.g., "gaming.community.coves.social") 460 // Extract the registrable domain (eTLD+1) using publicsuffix 461 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc. 
462 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle) 463 if err != nil { 464 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address) 465 // fall back to naive extraction (last 2 parts) 466 // This maintains backward compatibility for edge cases 467 parts := strings.Split(handle, ".") 468 if len(parts) < 2 { 469 return "" // Invalid handle 470 } 471 return strings.Join(parts[len(parts)-2:], ".") 472 } 473 474 return registrable 475} 476 477// handleSubscription processes subscription create/delete events 478// CREATE operation = user subscribed to community 479// DELETE operation = user unsubscribed from community 480func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 481 switch commit.Operation { 482 case "create": 483 return c.createSubscription(ctx, userDID, commit) 484 case "delete": 485 return c.deleteSubscription(ctx, userDID, commit) 486 default: 487 // Update operations shouldn't happen on subscriptions, but ignore gracefully 488 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 489 commit.Operation, userDID, commit.RKey) 490 return nil 491 } 492} 493 494// createSubscription indexes a new subscription with retry logic 495func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 496 if commit.Record == nil { 497 return fmt.Errorf("subscription create event missing record data") 498 } 499 500 // Extract community DID from record's subject field (following atProto conventions) 501 communityDID, ok := commit.Record["subject"].(string) 502 if !ok { 503 return fmt.Errorf("subscription record missing subject field") 504 } 505 506 // Extract contentVisibility with clamping and default value 507 contentVisibility := extractContentVisibility(commit.Record) 508 509 // Build AT-URI for subscription record 510 // IMPORTANT: Collection is social.coves.community.subscription (record type), 
not the XRPC endpoint 511 // The record lives in the USER's repository, but uses the communities namespace 512 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 513 514 // Create subscription entity 515 // Parse createdAt from record to preserve chronological ordering during replays 516 subscription := &communities.Subscription{ 517 UserDID: userDID, 518 CommunityDID: communityDID, 519 ContentVisibility: contentVisibility, 520 SubscribedAt: utils.ParseCreatedAt(commit.Record), 521 RecordURI: uri, 522 RecordCID: commit.CID, 523 } 524 525 // Use transactional method to ensure subscription and count are atomically updated 526 // This is idempotent - safe for Jetstream replays 527 _, err := c.repo.SubscribeWithCount(ctx, subscription) 528 if err != nil { 529 // If already exists, that's fine (idempotency) 530 if communities.IsConflict(err) { 531 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 532 userDID, communityDID, contentVisibility) 533 return nil 534 } 535 return fmt.Errorf("failed to index subscription: %w", err) 536 } 537 538 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 539 userDID, communityDID, contentVisibility) 540 return nil 541} 542 543// deleteSubscription removes a subscription from the index 544// DELETE operations don't include record data, so we need to look up the subscription 545// by its URI to find which community the user unsubscribed from 546func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 547 // Build AT-URI from the rkey 548 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 549 550 // Look up the subscription to get the community DID 551 // (DELETE operations don't include record data in Jetstream) 552 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 553 if err != nil { 554 if communities.IsNotFound(err) { 555 // Already deleted - this is fine 
(idempotency) 556 log.Printf("Subscription already deleted: %s", uri) 557 return nil 558 } 559 return fmt.Errorf("failed to find subscription for deletion: %w", err) 560 } 561 562 // Use transactional method to ensure unsubscribe and count are atomically updated 563 // This is idempotent - safe for Jetstream replays 564 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 565 if err != nil { 566 if communities.IsNotFound(err) { 567 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 568 return nil 569 } 570 return fmt.Errorf("failed to remove subscription: %w", err) 571 } 572 573 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 574 return nil 575} 576 577// handleBlock processes block create/delete events 578// CREATE operation = user blocked a community 579// DELETE operation = user unblocked a community 580func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 581 switch commit.Operation { 582 case "create": 583 return c.createBlock(ctx, userDID, commit) 584 case "delete": 585 return c.deleteBlock(ctx, userDID, commit) 586 default: 587 // Update operations shouldn't happen on blocks, but ignore gracefully 588 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 589 commit.Operation, userDID, commit.RKey) 590 return nil 591 } 592} 593 594// createBlock indexes a new block 595func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 596 if commit.Record == nil { 597 return fmt.Errorf("block create event missing record data") 598 } 599 600 // Extract community DID from record's subject field (following atProto conventions) 601 communityDID, ok := commit.Record["subject"].(string) 602 if !ok { 603 return fmt.Errorf("block record missing subject field") 604 } 605 606 // Build AT-URI for block record 607 // The record lives in the USER's 
repository 608 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 609 610 // Create block entity 611 // Parse createdAt from record to preserve chronological ordering during replays 612 block := &communities.CommunityBlock{ 613 UserDID: userDID, 614 CommunityDID: communityDID, 615 BlockedAt: utils.ParseCreatedAt(commit.Record), 616 RecordURI: uri, 617 RecordCID: commit.CID, 618 } 619 620 // Index the block 621 // This is idempotent - safe for Jetstream replays 622 _, err := c.repo.BlockCommunity(ctx, block) 623 if err != nil { 624 // If already exists, that's fine (idempotency) 625 if communities.IsConflict(err) { 626 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 627 return nil 628 } 629 return fmt.Errorf("failed to index block: %w", err) 630 } 631 632 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 633 return nil 634} 635 636// deleteBlock removes a block from the index 637// DELETE operations don't include record data, so we need to look up the block 638// by its URI to find which community the user unblocked 639func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 640 // Build AT-URI from the rkey 641 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 642 643 // Look up the block to get the community DID 644 // (DELETE operations don't include record data in Jetstream) 645 block, err := c.repo.GetBlockByURI(ctx, uri) 646 if err != nil { 647 if communities.IsNotFound(err) { 648 // Already deleted - this is fine (idempotency) 649 log.Printf("Block already deleted: %s", uri) 650 return nil 651 } 652 return fmt.Errorf("failed to find block for deletion: %w", err) 653 } 654 655 // Remove the block from the index 656 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 657 if err != nil { 658 if communities.IsNotFound(err) { 659 log.Printf("Block already removed: %s -> %s", userDID, 
block.CommunityDID) 660 return nil 661 } 662 return fmt.Errorf("failed to remove block: %w", err) 663 } 664 665 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 666 return nil 667} 668 669// Helper types and functions 670 671type CommunityProfile struct { 672 CreatedAt time.Time `json:"createdAt"` 673 Avatar map[string]interface{} `json:"avatar"` 674 Banner map[string]interface{} `json:"banner"` 675 CreatedBy string `json:"createdBy"` 676 Visibility string `json:"visibility"` 677 AtprotoHandle string `json:"atprotoHandle"` 678 DisplayName string `json:"displayName"` 679 Name string `json:"name"` 680 Handle string `json:"handle"` 681 HostedBy string `json:"hostedBy"` 682 Description string `json:"description"` 683 FederatedID string `json:"federatedId"` 684 ModerationType string `json:"moderationType"` 685 FederatedFrom string `json:"federatedFrom"` 686 ContentWarnings []string `json:"contentWarnings"` 687 DescriptionFacets []interface{} `json:"descriptionFacets"` 688 MemberCount int `json:"memberCount"` 689 SubscriberCount int `json:"subscriberCount"` 690 Federation FederationConfig `json:"federation"` 691} 692 693type FederationConfig struct { 694 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 695} 696 697// parseCommunityProfile converts a raw record map to a CommunityProfile 698func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 699 recordJSON, err := json.Marshal(record) 700 if err != nil { 701 return nil, fmt.Errorf("failed to marshal record: %w", err) 702 } 703 704 var profile CommunityProfile 705 if err := json.Unmarshal(recordJSON, &profile); err != nil { 706 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 707 } 708 709 return &profile, nil 710} 711 712// extractContentVisibility extracts contentVisibility from subscription record with clamping 713// Returns default value of 3 if missing or invalid 714func extractContentVisibility(record map[string]interface{}) int { 715 
const defaultVisibility = 3 716 717 cv, ok := record["contentVisibility"] 718 if !ok { 719 // Field missing - use default 720 return defaultVisibility 721 } 722 723 // JSON numbers decode as float64 724 cvFloat, ok := cv.(float64) 725 if !ok { 726 // Try int (shouldn't happen but handle gracefully) 727 if cvInt, isInt := cv.(int); isInt { 728 return clampContentVisibility(cvInt) 729 } 730 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 731 return defaultVisibility 732 } 733 734 // Convert and clamp 735 clamped := clampContentVisibility(int(cvFloat)) 736 if clamped != int(cvFloat) { 737 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 738 } 739 return clamped 740} 741 742// clampContentVisibility ensures value is within valid range (1-5) 743func clampContentVisibility(value int) int { 744 if value < 1 { 745 return 1 746 } 747 if value > 5 { 748 return 5 749 } 750 return value 751} 752 753// extractBlobCID extracts the CID from a blob reference 754// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 755func extractBlobCID(blob map[string]interface{}) (string, bool) { 756 if blob == nil { 757 return "", false 758 } 759 760 // Check if it's a blob type 761 blobType, ok := blob["$type"].(string) 762 if !ok || blobType != "blob" { 763 return "", false 764 } 765 766 // Extract ref 767 ref, ok := blob["ref"].(map[string]interface{}) 768 if !ok { 769 return "", false 770 } 771 772 // Extract $link (the CID) 773 link, ok := ref["$link"].(string) 774 if !ok { 775 return "", false 776 } 777 778 return link, true 779}