A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "net/http" 9 "strings" 10 "time" 11 12 "Coves/internal/atproto/identity" 13 "Coves/internal/atproto/utils" 14 "Coves/internal/core/communities" 15 16 lru "github.com/hashicorp/golang-lru/v2" 17 "golang.org/x/net/publicsuffix" 18 "golang.org/x/time/rate" 19) 20 21// CommunityEventConsumer consumes community-related events from Jetstream 22type CommunityEventConsumer struct { 23 repo communities.Repository // Repository for community operations 24 identityResolver interface { 25 Resolve(context.Context, string) (*identity.Identity, error) 26 } // For resolving handles from DIDs 27 httpClient *http.Client // Shared HTTP client with connection pooling 28 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results 29 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches 30 instanceDID string // DID of this Coves instance 31 skipVerification bool // Skip did:web verification (for dev mode) 32} 33 34// cachedDIDDoc represents a cached verification result with expiration 35type cachedDIDDoc struct { 36 expiresAt time.Time // When this cache entry expires 37 valid bool // Whether verification passed 38} 39 40// NewCommunityEventConsumer creates a new Jetstream consumer for community events 41// instanceDID: The DID of this Coves instance (for hostedBy verification) 42// skipVerification: Skip did:web verification (for dev mode) 43// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests) 44func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface { 45 Resolve(context.Context, string) (*identity.Identity, error) 46}, 47) *CommunityEventConsumer { 48 // Create bounded LRU cache for DID document verification results 49 // Max 1000 entries to prevent unbounded memory growth (PR review feedback) 50 // Each entry ~100 bytes → max ~100KB 
memory overhead 51 cache, err := lru.New[string, cachedDIDDoc](1000) 52 if err != nil { 53 // This should never happen with a valid size, but handle gracefully 54 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err) 55 // Create minimal cache to avoid nil pointer 56 cache, _ = lru.New[string, cachedDIDDoc](1) 57 } 58 59 return &CommunityEventConsumer{ 60 repo: repo, 61 identityResolver: identityResolver, // Optional - can be nil for tests 62 instanceDID: instanceDID, 63 skipVerification: skipVerification, 64 // Shared HTTP client with connection pooling for .well-known fetches 65 httpClient: &http.Client{ 66 Timeout: 10 * time.Second, 67 Transport: &http.Transport{ 68 MaxIdleConns: 100, 69 MaxIdleConnsPerHost: 10, 70 IdleConnTimeout: 90 * time.Second, 71 }, 72 }, 73 // Bounded LRU cache for .well-known verification results (max 1000 entries) 74 // Automatically evicts least-recently-used entries when full 75 didCache: cache, 76 // Rate limiter: 10 requests per second, burst of 20 77 // Prevents DoS via excessive .well-known fetches 78 wellKnownLimiter: rate.NewLimiter(10, 20), 79 } 80} 81 82// HandleEvent processes a Jetstream event for community records 83// This is called by the main Jetstream consumer when it receives commit events 84func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 85 // We only care about commit events for community records 86 if event.Kind != "commit" || event.Commit == nil { 87 return nil 88 } 89 90 commit := event.Commit 91 92 // Route to appropriate handler based on collection 93 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 94 // - social.coves.community.profile: Community profile records (in community's own repo) 95 // - social.coves.community.subscription: Subscription records (in user's repo) 96 // - social.coves.community.block: Block records (in user's repo) 97 // 98 // XRPC procedures 
(social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 99 // that CREATE or DELETE records in these collections 100 switch commit.Collection { 101 case "social.coves.community.profile": 102 return c.handleCommunityProfile(ctx, event.Did, commit) 103 case "social.coves.community.subscription": 104 // Handle both create (subscribe) and delete (unsubscribe) operations 105 return c.handleSubscription(ctx, event.Did, commit) 106 case "social.coves.community.block": 107 // Handle both create (block) and delete (unblock) operations 108 return c.handleBlock(ctx, event.Did, commit) 109 default: 110 // Not a community-related collection 111 return nil 112 } 113} 114 115// handleCommunityProfile processes community profile create/update/delete events 116func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 117 switch commit.Operation { 118 case "create": 119 return c.createCommunity(ctx, did, commit) 120 case "update": 121 return c.updateCommunity(ctx, did, commit) 122 case "delete": 123 return c.deleteCommunity(ctx, did) 124 default: 125 log.Printf("Unknown operation for community profile: %s", commit.Operation) 126 return nil 127 } 128} 129 130// createCommunity indexes a new community from the firehose 131func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 132 if commit.Record == nil { 133 return fmt.Errorf("community profile create event missing record data") 134 } 135 136 // Parse the community profile record 137 profile, err := parseCommunityProfile(commit.Record) 138 if err != nil { 139 return fmt.Errorf("failed to parse community profile: %w", err) 140 } 141 142 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 143 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 144 if profile.Handle == "" { 145 if c.identityResolver != nil { 146 // 
Production: Resolve handle from PLC (source of truth) 147 // NO FALLBACK - if PLC is down, we fail and backfill later 148 // This prevents creating communities with incorrect handles in federated scenarios 149 identity, err := c.identityResolver.Resolve(ctx, did) 150 if err != nil { 151 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 152 } 153 profile.Handle = identity.Handle 154 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 155 profile.Handle, did, identity.Method) 156 } else { 157 // Test mode only: construct deterministically when no resolver available 158 profile.Handle = constructHandleFromProfile(profile) 159 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 160 profile.Handle, profile.Name, profile.HostedBy) 161 } 162 } 163 164 // SECURITY: Verify hostedBy claim matches handle domain 165 // This prevents malicious instances from claiming to host communities for domains they don't own 166 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil { 167 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err) 168 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy) 169 return fmt.Errorf("hostedBy verification failed: %w", err) 170 } 171 172 // Build AT-URI for this record 173 // V2 Architecture (ONLY): 174 // - 'did' parameter IS the community DID (community owns its own repo) 175 // - rkey MUST be "self" for community profiles 176 // - URI: at://community_did/social.coves.community.profile/self 177 178 // REJECT non-V2 communities (pre-production: no V1 compatibility) 179 if commit.RKey != "self" { 180 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 181 } 182 183 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 184 185 // V2: Community ALWAYS owns itself 186 ownerDID := 
did 187 188 // Create community entity 189 community := &communities.Community{ 190 DID: did, // V2: Repository DID IS the community DID 191 Handle: profile.Handle, 192 Name: profile.Name, 193 DisplayName: profile.DisplayName, 194 Description: profile.Description, 195 OwnerDID: ownerDID, // V2: same as DID (self-owned) 196 CreatedByDID: profile.CreatedBy, 197 HostedByDID: profile.HostedBy, 198 Visibility: profile.Visibility, 199 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 200 ModerationType: profile.ModerationType, 201 ContentWarnings: profile.ContentWarnings, 202 MemberCount: profile.MemberCount, 203 SubscriberCount: profile.SubscriberCount, 204 FederatedFrom: profile.FederatedFrom, 205 FederatedID: profile.FederatedID, 206 CreatedAt: profile.CreatedAt, 207 UpdatedAt: time.Now(), 208 RecordURI: uri, 209 RecordCID: commit.CID, 210 } 211 212 // Handle blobs (avatar/banner) if present 213 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 214 community.AvatarCID = avatarCID 215 } 216 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 217 community.BannerCID = bannerCID 218 } 219 220 // Handle description facets (rich text) 221 if profile.DescriptionFacets != nil { 222 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 223 if marshalErr == nil { 224 community.DescriptionFacets = facetsJSON 225 } 226 } 227 228 // Index in AppView database 229 _, err = c.repo.Create(ctx, community) 230 if err != nil { 231 // Check if it already exists (idempotency) 232 if communities.IsConflict(err) { 233 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 234 return nil 235 } 236 return fmt.Errorf("failed to index community: %w", err) 237 } 238 239 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 240 return nil 241} 242 243// updateCommunity updates an existing community from the firehose 244func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit 
*CommitEvent) error { 245 if commit.Record == nil { 246 return fmt.Errorf("community profile update event missing record data") 247 } 248 249 // REJECT non-V2 communities (pre-production: no V1 compatibility) 250 if commit.RKey != "self" { 251 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 252 } 253 254 // Parse profile 255 profile, err := parseCommunityProfile(commit.Record) 256 if err != nil { 257 return fmt.Errorf("failed to parse community profile: %w", err) 258 } 259 260 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 261 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 262 if profile.Handle == "" { 263 if c.identityResolver != nil { 264 // Production: Resolve handle from PLC (source of truth) 265 // NO FALLBACK - if PLC is down, we fail and backfill later 266 // This prevents creating communities with incorrect handles in federated scenarios 267 identity, err := c.identityResolver.Resolve(ctx, did) 268 if err != nil { 269 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 270 } 271 profile.Handle = identity.Handle 272 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 273 profile.Handle, did, identity.Method) 274 } else { 275 // Test mode only: construct deterministically when no resolver available 276 profile.Handle = constructHandleFromProfile(profile) 277 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 278 profile.Handle, profile.Name, profile.HostedBy) 279 } 280 } 281 282 // V2: Repository DID IS the community DID 283 // Get existing community using the repo DID 284 existing, err := c.repo.GetByDID(ctx, did) 285 if err != nil { 286 if communities.IsNotFound(err) { 287 // Community doesn't exist yet - treat as create 288 log.Printf("Community not found for update, creating: %s", did) 
289 return c.createCommunity(ctx, did, commit) 290 } 291 return fmt.Errorf("failed to get existing community: %w", err) 292 } 293 294 // Update fields 295 existing.Handle = profile.Handle 296 existing.Name = profile.Name 297 existing.DisplayName = profile.DisplayName 298 existing.Description = profile.Description 299 existing.Visibility = profile.Visibility 300 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 301 existing.ModerationType = profile.ModerationType 302 existing.ContentWarnings = profile.ContentWarnings 303 existing.RecordCID = commit.CID 304 305 // Update blobs 306 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 307 existing.AvatarCID = avatarCID 308 } 309 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 310 existing.BannerCID = bannerCID 311 } 312 313 // Update description facets 314 if profile.DescriptionFacets != nil { 315 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 316 if marshalErr == nil { 317 existing.DescriptionFacets = facetsJSON 318 } 319 } 320 321 // Save updates 322 _, err = c.repo.Update(ctx, existing) 323 if err != nil { 324 return fmt.Errorf("failed to update community: %w", err) 325 } 326 327 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 328 return nil 329} 330 331// deleteCommunity removes a community from the index 332func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 333 err := c.repo.Delete(ctx, did) 334 if err != nil { 335 if communities.IsNotFound(err) { 336 log.Printf("Community already deleted: %s", did) 337 return nil 338 } 339 return fmt.Errorf("failed to delete community: %w", err) 340 } 341 342 log.Printf("Deleted community: %s", did) 343 return nil 344} 345 346// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain 347// This prevents malicious instances from claiming to host communities for domains they don't own 348func (c *CommunityEventConsumer) 
verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error { 349 // Skip verification in dev mode 350 if c.skipVerification { 351 return nil 352 } 353 354 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback) 355 ctx, cancel := context.WithTimeout(ctx, 15*time.Second) 356 defer cancel() 357 358 // Verify hostedByDID is did:web format 359 if !strings.HasPrefix(hostedByDID, "did:web:") { 360 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID) 361 } 362 363 // Extract domain from did:web DID 364 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:") 365 366 // Extract domain from community handle 367 // Handle format examples: 368 // - "!gaming@coves.social" → domain: "coves.social" 369 // - "gaming.community.coves.social" → domain: "coves.social" 370 handleDomain := extractDomainFromHandle(handle) 371 if handleDomain == "" { 372 return fmt.Errorf("failed to extract domain from handle: %s", handle) 373 } 374 375 // Verify handle domain matches hostedBy domain 376 if handleDomain != hostedByDomain { 377 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain) 378 } 379 380 // SECURITY: Verify DID document exists and is valid (Bluesky-compatible security model) 381 // MANDATORY bidirectional verification: DID document must claim this handle in alsoKnownAs 382 // This matches Bluesky's security requirements and prevents domain impersonation 383 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain, handle); err != nil { 384 log.Printf("🚨 SECURITY: Rejecting community - bidirectional DID verification failed: %v", err) 385 return fmt.Errorf("bidirectional DID verification required: %w", err) 386 } 387 388 return nil 389} 390 391// verifyDIDDocument fetches and validates the DID document from .well-known/did.json 392// Implements Bluesky's bidirectional verification model: 393// 1. 
Verify DID document exists at https://domain/.well-known/did.json 394// 2. Verify DID document ID matches claimed DID 395// 3. Verify DID document claims the handle in alsoKnownAs field 396// 397// Results are cached with TTL and rate-limited to prevent DoS attacks 398func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain, handle string) error { 399 // Skip verification in dev mode 400 if c.skipVerification { 401 return nil 402 } 403 404 // Check bounded LRU cache first (thread-safe, no locks needed) 405 if cached, ok := c.didCache.Get(did); ok { 406 // Check if cache entry is still valid (not expired) 407 if time.Now().Before(cached.expiresAt) { 408 if !cached.valid { 409 return fmt.Errorf("cached verification failure for %s", did) 410 } 411 log.Printf("✓ DID document verification (cached): %s", domain) 412 return nil 413 } 414 // Cache entry expired - remove it to free up space for fresh entries 415 c.didCache.Remove(did) 416 } 417 418 // Rate limit .well-known fetches to prevent DoS 419 if err := c.wellKnownLimiter.Wait(ctx); err != nil { 420 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err) 421 } 422 423 // Construct .well-known URL 424 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain) 425 426 // Create HTTP request with timeout 427 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 428 if err != nil { 429 // Cache the failure 430 c.cacheVerificationResult(did, false, 5*time.Minute) 431 return fmt.Errorf("failed to create request: %w", err) 432 } 433 434 // Fetch DID document using shared HTTP client 435 resp, err := c.httpClient.Do(req) 436 if err != nil { 437 // Cache the failure (shorter TTL for network errors) 438 c.cacheVerificationResult(did, false, 5*time.Minute) 439 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err) 440 } 441 defer func() { 442 if closeErr := resp.Body.Close(); closeErr != nil { 443 log.Printf("Failed to close response body: 
%v", closeErr) 444 } 445 }() 446 447 // Verify HTTP status 448 if resp.StatusCode != http.StatusOK { 449 // Cache the failure 450 c.cacheVerificationResult(did, false, 5*time.Minute) 451 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL) 452 } 453 454 // Parse DID document 455 var didDoc struct { 456 ID string `json:"id"` 457 AlsoKnownAs []string `json:"alsoKnownAs"` 458 } 459 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 460 // Cache the failure 461 c.cacheVerificationResult(did, false, 5*time.Minute) 462 return fmt.Errorf("failed to parse DID document JSON: %w", err) 463 } 464 465 // Verify DID document ID matches claimed DID 466 if didDoc.ID != did { 467 // Cache the failure 468 c.cacheVerificationResult(did, false, 5*time.Minute) 469 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did) 470 } 471 472 // SECURITY: Bidirectional verification - DID document must claim this handle 473 // Prevents impersonation where someone points DNS to another user's DID 474 // Format: handle "coves.social" or "!community@coves.social" → check for "at://coves.social" 475 handleDomain := extractDomainFromHandle(handle) 476 expectedAlias := fmt.Sprintf("at://%s", handleDomain) 477 478 found := false 479 for _, alias := range didDoc.AlsoKnownAs { 480 if alias == expectedAlias { 481 found = true 482 break 483 } 484 } 485 486 if !found { 487 // Cache the failure 488 c.cacheVerificationResult(did, false, 5*time.Minute) 489 return fmt.Errorf("DID document does not claim handle domain %s in alsoKnownAs (expected %s, got %v)", 490 handleDomain, expectedAlias, didDoc.AlsoKnownAs) 491 } 492 493 // Cache the success (24 hour TTL - matches Bluesky recommendations) 494 c.cacheVerificationResult(did, true, 24*time.Hour) 495 496 log.Printf("✓ DID document verified: %s", domain) 497 return nil 498} 499 500// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL 
501// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full 502func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) { 503 c.didCache.Add(did, cachedDIDDoc{ 504 valid: valid, 505 expiresAt: time.Now().Add(ttl), 506 }) 507} 508 509// extractDomainFromHandle extracts the registrable domain from a community handle 510// Handles both formats: 511// - Bluesky-style: "!gaming@coves.social" → "coves.social" 512// - DNS-style: "gaming.community.coves.social" → "coves.social" 513// 514// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs: 515// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk") 516// - "gaming.community.example.com.au" → "example.com.au" (not "com.au") 517func extractDomainFromHandle(handle string) string { 518 // Remove leading ! if present 519 handle = strings.TrimPrefix(handle, "!") 520 521 // Check for @-separated format (e.g., "gaming@coves.social") 522 if strings.Contains(handle, "@") { 523 parts := strings.Split(handle, "@") 524 if len(parts) == 2 { 525 domain := parts[1] 526 // Validate and extract eTLD+1 from the @-domain part 527 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain) 528 if err != nil { 529 // If publicsuffix fails, fall back to returning the full domain part 530 // This handles edge cases like localhost, IP addresses, etc. 531 return domain 532 } 533 return registrable 534 } 535 return "" 536 } 537 538 // For DNS-style handles (e.g., "gaming.community.coves.social") 539 // Extract the registrable domain (eTLD+1) using publicsuffix 540 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc. 
541 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle) 542 if err != nil { 543 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address) 544 // fall back to naive extraction (last 2 parts) 545 // This maintains backward compatibility for edge cases 546 parts := strings.Split(handle, ".") 547 if len(parts) < 2 { 548 return "" // Invalid handle 549 } 550 return strings.Join(parts[len(parts)-2:], ".") 551 } 552 553 return registrable 554} 555 556// handleSubscription processes subscription create/delete events 557// CREATE operation = user subscribed to community 558// DELETE operation = user unsubscribed from community 559func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 560 switch commit.Operation { 561 case "create": 562 return c.createSubscription(ctx, userDID, commit) 563 case "delete": 564 return c.deleteSubscription(ctx, userDID, commit) 565 default: 566 // Update operations shouldn't happen on subscriptions, but ignore gracefully 567 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 568 commit.Operation, userDID, commit.RKey) 569 return nil 570 } 571} 572 573// createSubscription indexes a new subscription with retry logic 574func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 575 if commit.Record == nil { 576 return fmt.Errorf("subscription create event missing record data") 577 } 578 579 // Extract community DID from record's subject field (following atProto conventions) 580 communityDID, ok := commit.Record["subject"].(string) 581 if !ok { 582 return fmt.Errorf("subscription record missing subject field") 583 } 584 585 // Extract contentVisibility with clamping and default value 586 contentVisibility := extractContentVisibility(commit.Record) 587 588 // Build AT-URI for subscription record 589 // IMPORTANT: Collection is social.coves.community.subscription (record type), 
not the XRPC endpoint 590 // The record lives in the USER's repository, but uses the communities namespace 591 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 592 593 // Create subscription entity 594 // Parse createdAt from record to preserve chronological ordering during replays 595 subscription := &communities.Subscription{ 596 UserDID: userDID, 597 CommunityDID: communityDID, 598 ContentVisibility: contentVisibility, 599 SubscribedAt: utils.ParseCreatedAt(commit.Record), 600 RecordURI: uri, 601 RecordCID: commit.CID, 602 } 603 604 // Use transactional method to ensure subscription and count are atomically updated 605 // This is idempotent - safe for Jetstream replays 606 _, err := c.repo.SubscribeWithCount(ctx, subscription) 607 if err != nil { 608 // If already exists, that's fine (idempotency) 609 if communities.IsConflict(err) { 610 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 611 userDID, communityDID, contentVisibility) 612 return nil 613 } 614 return fmt.Errorf("failed to index subscription: %w", err) 615 } 616 617 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 618 userDID, communityDID, contentVisibility) 619 return nil 620} 621 622// deleteSubscription removes a subscription from the index 623// DELETE operations don't include record data, so we need to look up the subscription 624// by its URI to find which community the user unsubscribed from 625func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 626 // Build AT-URI from the rkey 627 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 628 629 // Look up the subscription to get the community DID 630 // (DELETE operations don't include record data in Jetstream) 631 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 632 if err != nil { 633 if communities.IsNotFound(err) { 634 // Already deleted - this is fine 
(idempotency) 635 log.Printf("Subscription already deleted: %s", uri) 636 return nil 637 } 638 return fmt.Errorf("failed to find subscription for deletion: %w", err) 639 } 640 641 // Use transactional method to ensure unsubscribe and count are atomically updated 642 // This is idempotent - safe for Jetstream replays 643 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 644 if err != nil { 645 if communities.IsNotFound(err) { 646 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 647 return nil 648 } 649 return fmt.Errorf("failed to remove subscription: %w", err) 650 } 651 652 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 653 return nil 654} 655 656// handleBlock processes block create/delete events 657// CREATE operation = user blocked a community 658// DELETE operation = user unblocked a community 659func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 660 switch commit.Operation { 661 case "create": 662 return c.createBlock(ctx, userDID, commit) 663 case "delete": 664 return c.deleteBlock(ctx, userDID, commit) 665 default: 666 // Update operations shouldn't happen on blocks, but ignore gracefully 667 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 668 commit.Operation, userDID, commit.RKey) 669 return nil 670 } 671} 672 673// createBlock indexes a new block 674func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 675 if commit.Record == nil { 676 return fmt.Errorf("block create event missing record data") 677 } 678 679 // Extract community DID from record's subject field (following atProto conventions) 680 communityDID, ok := commit.Record["subject"].(string) 681 if !ok { 682 return fmt.Errorf("block record missing subject field") 683 } 684 685 // Build AT-URI for block record 686 // The record lives in the USER's 
repository 687 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 688 689 // Create block entity 690 // Parse createdAt from record to preserve chronological ordering during replays 691 block := &communities.CommunityBlock{ 692 UserDID: userDID, 693 CommunityDID: communityDID, 694 BlockedAt: utils.ParseCreatedAt(commit.Record), 695 RecordURI: uri, 696 RecordCID: commit.CID, 697 } 698 699 // Index the block 700 // This is idempotent - safe for Jetstream replays 701 _, err := c.repo.BlockCommunity(ctx, block) 702 if err != nil { 703 // If already exists, that's fine (idempotency) 704 if communities.IsConflict(err) { 705 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 706 return nil 707 } 708 return fmt.Errorf("failed to index block: %w", err) 709 } 710 711 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 712 return nil 713} 714 715// deleteBlock removes a block from the index 716// DELETE operations don't include record data, so we need to look up the block 717// by its URI to find which community the user unblocked 718func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 719 // Build AT-URI from the rkey 720 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 721 722 // Look up the block to get the community DID 723 // (DELETE operations don't include record data in Jetstream) 724 block, err := c.repo.GetBlockByURI(ctx, uri) 725 if err != nil { 726 if communities.IsNotFound(err) { 727 // Already deleted - this is fine (idempotency) 728 log.Printf("Block already deleted: %s", uri) 729 return nil 730 } 731 return fmt.Errorf("failed to find block for deletion: %w", err) 732 } 733 734 // Remove the block from the index 735 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 736 if err != nil { 737 if communities.IsNotFound(err) { 738 log.Printf("Block already removed: %s -> %s", userDID, 
block.CommunityDID) 739 return nil 740 } 741 return fmt.Errorf("failed to remove block: %w", err) 742 } 743 744 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 745 return nil 746} 747 748// Helper types and functions 749 750type CommunityProfile struct { 751 CreatedAt time.Time `json:"createdAt"` 752 Avatar map[string]interface{} `json:"avatar"` 753 Banner map[string]interface{} `json:"banner"` 754 CreatedBy string `json:"createdBy"` 755 Visibility string `json:"visibility"` 756 AtprotoHandle string `json:"atprotoHandle"` 757 DisplayName string `json:"displayName"` 758 Name string `json:"name"` 759 Handle string `json:"handle"` 760 HostedBy string `json:"hostedBy"` 761 Description string `json:"description"` 762 FederatedID string `json:"federatedId"` 763 ModerationType string `json:"moderationType"` 764 FederatedFrom string `json:"federatedFrom"` 765 ContentWarnings []string `json:"contentWarnings"` 766 DescriptionFacets []interface{} `json:"descriptionFacets"` 767 MemberCount int `json:"memberCount"` 768 SubscriberCount int `json:"subscriberCount"` 769 Federation FederationConfig `json:"federation"` 770} 771 772type FederationConfig struct { 773 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 774} 775 776// parseCommunityProfile converts a raw record map to a CommunityProfile 777func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 778 recordJSON, err := json.Marshal(record) 779 if err != nil { 780 return nil, fmt.Errorf("failed to marshal record: %w", err) 781 } 782 783 var profile CommunityProfile 784 if err := json.Unmarshal(recordJSON, &profile); err != nil { 785 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 786 } 787 788 return &profile, nil 789} 790 791// constructHandleFromProfile constructs a deterministic handle from profile data 792// Format: {name}.community.{instanceDomain} 793// Example: gaming.community.coves.social 794// This is ONLY used in test mode (when identity 
resolver is nil) 795// Production MUST resolve handles from PLC (source of truth) 796// Returns empty string if hostedBy is not did:web format (caller will fail validation) 797func constructHandleFromProfile(profile *CommunityProfile) string { 798 if !strings.HasPrefix(profile.HostedBy, "did:web:") { 799 // hostedBy must be did:web format for handle construction 800 // Return empty to trigger validation error in repository 801 return "" 802 } 803 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:") 804 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain) 805} 806 807// extractContentVisibility extracts contentVisibility from subscription record with clamping 808// Returns default value of 3 if missing or invalid 809func extractContentVisibility(record map[string]interface{}) int { 810 const defaultVisibility = 3 811 812 cv, ok := record["contentVisibility"] 813 if !ok { 814 // Field missing - use default 815 return defaultVisibility 816 } 817 818 // JSON numbers decode as float64 819 cvFloat, ok := cv.(float64) 820 if !ok { 821 // Try int (shouldn't happen but handle gracefully) 822 if cvInt, isInt := cv.(int); isInt { 823 return clampContentVisibility(cvInt) 824 } 825 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 826 return defaultVisibility 827 } 828 829 // Convert and clamp 830 clamped := clampContentVisibility(int(cvFloat)) 831 if clamped != int(cvFloat) { 832 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 833 } 834 return clamped 835} 836 837// clampContentVisibility ensures value is within valid range (1-5) 838func clampContentVisibility(value int) int { 839 if value < 1 { 840 return 1 841 } 842 if value > 5 { 843 return 5 844 } 845 return value 846} 847 848// extractBlobCID extracts the CID from a blob reference 849// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 850func extractBlobCID(blob map[string]interface{}) 
(string, bool) { 851 if blob == nil { 852 return "", false 853 } 854 855 // Check if it's a blob type 856 blobType, ok := blob["$type"].(string) 857 if !ok || blobType != "blob" { 858 return "", false 859 } 860 861 // Extract ref 862 ref, ok := blob["ref"].(map[string]interface{}) 863 if !ok { 864 return "", false 865 } 866 867 // Extract $link (the CID) 868 link, ok := ref["$link"].(string) 869 if !ok { 870 return "", false 871 } 872 873 return link, true 874}