A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/utils" 6 "Coves/internal/core/communities" 7 "context" 8 "encoding/json" 9 "fmt" 10 "log" 11 "net/http" 12 "strings" 13 "time" 14 15 lru "github.com/hashicorp/golang-lru/v2" 16 "golang.org/x/net/publicsuffix" 17 "golang.org/x/time/rate" 18) 19 20// CommunityEventConsumer consumes community-related events from Jetstream 21type CommunityEventConsumer struct { 22 repo communities.Repository // Repository for community operations 23 identityResolver interface { 24 Resolve(context.Context, string) (*identity.Identity, error) 25 } // For resolving handles from DIDs 26 httpClient *http.Client // Shared HTTP client with connection pooling 27 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results 28 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches 29 instanceDID string // DID of this Coves instance 30 skipVerification bool // Skip did:web verification (for dev mode) 31} 32 33// cachedDIDDoc represents a cached verification result with expiration 34type cachedDIDDoc struct { 35 expiresAt time.Time // When this cache entry expires 36 valid bool // Whether verification passed 37} 38 39// NewCommunityEventConsumer creates a new Jetstream consumer for community events 40// instanceDID: The DID of this Coves instance (for hostedBy verification) 41// skipVerification: Skip did:web verification (for dev mode) 42// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests) 43func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface { 44 Resolve(context.Context, string) (*identity.Identity, error) 45}, 46) *CommunityEventConsumer { 47 // Create bounded LRU cache for DID document verification results 48 // Max 1000 entries to prevent unbounded memory growth (PR review feedback) 49 // Each entry ~100 bytes → max ~100KB memory overhead 50 cache, err := lru.New[string, cachedDIDDoc](1000) 51 if err != nil { 52 // This should never happen with a valid size, but handle gracefully 53 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err) 54 // Create minimal cache to avoid nil pointer 55 cache, _ = lru.New[string, cachedDIDDoc](1) 56 } 57 58 return &CommunityEventConsumer{ 59 repo: repo, 60 identityResolver: identityResolver, // Optional - can be nil for tests 61 instanceDID: instanceDID, 62 skipVerification: skipVerification, 63 // Shared HTTP client with connection pooling for .well-known fetches 64 httpClient: &http.Client{ 65 Timeout: 10 * time.Second, 66 Transport: &http.Transport{ 67 MaxIdleConns: 100, 68 MaxIdleConnsPerHost: 10, 69 IdleConnTimeout: 90 * time.Second, 70 }, 71 }, 72 // Bounded LRU cache for .well-known verification results (max 1000 entries) 73 // Automatically evicts least-recently-used entries when full 74 didCache: cache, 75 // Rate limiter: 10 requests per second, burst of 20 76 // Prevents DoS via excessive .well-known fetches 77 wellKnownLimiter: rate.NewLimiter(10, 20), 78 } 79} 80 81// HandleEvent processes a Jetstream event for community records 82// This is called by the main Jetstream consumer when it receives commit events 83func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 84 // We only care about commit events for community records 85 if event.Kind != "commit" || event.Commit == nil { 86 return nil 87 } 88 89 commit := event.Commit 90 91 // Route to appropriate handler based on collection 92 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 93 // - social.coves.community.profile: Community profile records (in community's own repo) 94 // - social.coves.community.subscription: Subscription records (in user's repo) 95 // - social.coves.community.block: Block records (in user's repo) 96 // 97 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 98 // that CREATE or DELETE records in these collections 99 switch commit.Collection { 100 case "social.coves.community.profile": 101 return c.handleCommunityProfile(ctx, event.Did, commit) 102 case "social.coves.community.subscription": 103 // Handle both create (subscribe) and delete (unsubscribe) operations 104 return c.handleSubscription(ctx, event.Did, commit) 105 case "social.coves.community.block": 106 // Handle both create (block) and delete (unblock) operations 107 return c.handleBlock(ctx, event.Did, commit) 108 default: 109 // Not a community-related collection 110 return nil 111 } 112} 113 114// handleCommunityProfile processes community profile create/update/delete events 115func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 116 switch commit.Operation { 117 case "create": 118 return c.createCommunity(ctx, did, commit) 119 case "update": 120 return c.updateCommunity(ctx, did, commit) 121 case "delete": 122 return c.deleteCommunity(ctx, did) 123 default: 124 log.Printf("Unknown operation for community profile: %s", commit.Operation) 125 return nil 126 } 127} 128 129// createCommunity indexes a new community from the firehose 130func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 131 if commit.Record == nil { 132 return fmt.Errorf("community profile create event missing record data") 133 } 134 135 // Parse the community profile record 136 profile, err := parseCommunityProfile(commit.Record) 137 if err != nil { 138 return fmt.Errorf("failed to parse community profile: %w", err) 139 } 140 141 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 142 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 143 if profile.Handle == "" { 144 if c.identityResolver != nil { 145 // Production: Resolve handle from PLC (source of truth) 146 // NO FALLBACK - if PLC is down, we fail and backfill later 147 // This prevents creating communities with incorrect handles in federated scenarios 148 identity, err := c.identityResolver.Resolve(ctx, did) 149 if err != nil { 150 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 151 } 152 profile.Handle = identity.Handle 153 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 154 profile.Handle, did, identity.Method) 155 } else { 156 // Test mode only: construct deterministically when no resolver available 157 profile.Handle = constructHandleFromProfile(profile) 158 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 159 profile.Handle, profile.Name, profile.HostedBy) 160 } 161 } 162 163 // SECURITY: Verify hostedBy claim matches handle domain 164 // This prevents malicious instances from claiming to host communities for domains they don't own 165 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil { 166 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err) 167 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy) 168 return fmt.Errorf("hostedBy verification failed: %w", err) 169 } 170 171 // Build AT-URI for this record 172 // V2 Architecture (ONLY): 173 // - 'did' parameter IS the community DID (community owns its own repo) 174 // - rkey MUST be "self" for community profiles 175 // - URI: at://community_did/social.coves.community.profile/self 176 177 // REJECT non-V2 communities (pre-production: no V1 compatibility) 178 if commit.RKey != "self" { 179 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 180 } 181 182 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 183 184 // V2: Community ALWAYS owns itself 185 ownerDID := did 186 187 // Create community entity 188 community := &communities.Community{ 189 DID: did, // V2: Repository DID IS the community DID 190 Handle: profile.Handle, 191 Name: profile.Name, 192 DisplayName: profile.DisplayName, 193 Description: profile.Description, 194 OwnerDID: ownerDID, // V2: same as DID (self-owned) 195 CreatedByDID: profile.CreatedBy, 196 HostedByDID: profile.HostedBy, 197 Visibility: profile.Visibility, 198 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 199 ModerationType: profile.ModerationType, 200 ContentWarnings: profile.ContentWarnings, 201 MemberCount: profile.MemberCount, 202 SubscriberCount: profile.SubscriberCount, 203 FederatedFrom: profile.FederatedFrom, 204 FederatedID: profile.FederatedID, 205 CreatedAt: profile.CreatedAt, 206 UpdatedAt: time.Now(), 207 RecordURI: uri, 208 RecordCID: commit.CID, 209 } 210 211 // Handle blobs (avatar/banner) if present 212 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 213 community.AvatarCID = avatarCID 214 } 215 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 216 community.BannerCID = bannerCID 217 } 218 219 // Handle description facets (rich text) 220 if profile.DescriptionFacets != nil { 221 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 222 if marshalErr == nil { 223 community.DescriptionFacets = facetsJSON 224 } 225 } 226 227 // Index in AppView database 228 _, err = c.repo.Create(ctx, community) 229 if err != nil { 230 // Check if it already exists (idempotency) 231 if communities.IsConflict(err) { 232 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 233 return nil 234 } 235 return fmt.Errorf("failed to index community: %w", err) 236 } 237 238 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 239 return nil 240} 241 242// updateCommunity updates an existing community from the firehose 243func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 244 if commit.Record == nil { 245 return fmt.Errorf("community profile update event missing record data") 246 } 247 248 // REJECT non-V2 communities (pre-production: no V1 compatibility) 249 if commit.RKey != "self" { 250 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 251 } 252 253 // Parse profile 254 profile, err := parseCommunityProfile(commit.Record) 255 if err != nil { 256 return fmt.Errorf("failed to parse community profile: %w", err) 257 } 258 259 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 260 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 261 if profile.Handle == "" { 262 if c.identityResolver != nil { 263 // Production: Resolve handle from PLC (source of truth) 264 // NO FALLBACK - if PLC is down, we fail and backfill later 265 // This prevents creating communities with incorrect handles in federated scenarios 266 identity, err := c.identityResolver.Resolve(ctx, did) 267 if err != nil { 268 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 269 } 270 profile.Handle = identity.Handle 271 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 272 profile.Handle, did, identity.Method) 273 } else { 274 // Test mode only: construct deterministically when no resolver available 275 profile.Handle = constructHandleFromProfile(profile) 276 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 277 profile.Handle, profile.Name, profile.HostedBy) 278 } 279 } 280 281 // V2: Repository DID IS the community DID 282 // Get existing community using the repo DID 283 existing, err := c.repo.GetByDID(ctx, did) 284 if err != nil { 285 if communities.IsNotFound(err) { 286 // Community doesn't exist yet - treat as create 287 log.Printf("Community not found for update, creating: %s", did) 288 return c.createCommunity(ctx, did, commit) 289 } 290 return fmt.Errorf("failed to get existing community: %w", err) 291 } 292 293 // Update fields 294 existing.Handle = profile.Handle 295 existing.Name = profile.Name 296 existing.DisplayName = profile.DisplayName 297 existing.Description = profile.Description 298 existing.Visibility = profile.Visibility 299 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 300 existing.ModerationType = profile.ModerationType 301 existing.ContentWarnings = profile.ContentWarnings 302 existing.RecordCID = commit.CID 303 304 // Update blobs 305 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 306 existing.AvatarCID = avatarCID 307 } 308 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 309 existing.BannerCID = bannerCID 310 } 311 312 // Update description facets 313 if profile.DescriptionFacets != nil { 314 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 315 if marshalErr == nil { 316 existing.DescriptionFacets = facetsJSON 317 } 318 } 319 320 // Save updates 321 _, err = c.repo.Update(ctx, existing) 322 if err != nil { 323 return fmt.Errorf("failed to update community: %w", err) 324 } 325 326 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 327 return nil 328} 329 330// deleteCommunity removes a community from the index 331func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 332 err := c.repo.Delete(ctx, did) 333 if err != nil { 334 if communities.IsNotFound(err) { 335 log.Printf("Community already deleted: %s", did) 336 return nil 337 } 338 return fmt.Errorf("failed to delete community: %w", err) 339 } 340 341 log.Printf("Deleted community: %s", did) 342 return nil 343} 344 345// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain 346// This prevents malicious instances from claiming to host communities for domains they don't own 347func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error { 348 // Skip verification in dev mode 349 if c.skipVerification { 350 return nil 351 } 352 353 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback) 354 ctx, cancel := context.WithTimeout(ctx, 15*time.Second) 355 defer cancel() 356 357 // Verify hostedByDID is did:web format 358 if !strings.HasPrefix(hostedByDID, "did:web:") { 359 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID) 360 } 361 362 // Extract domain from did:web DID 363 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:") 364 365 // Extract domain from community handle 366 // Handle format examples: 367 // - "!gaming@coves.social" → domain: "coves.social" 368 // - "gaming.community.coves.social" → domain: "coves.social" 369 handleDomain := extractDomainFromHandle(handle) 370 if handleDomain == "" { 371 return fmt.Errorf("failed to extract domain from handle: %s", handle) 372 } 373 374 // Verify handle domain matches hostedBy domain 375 if handleDomain != hostedByDomain { 376 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain) 377 } 378 379 // SECURITY: Verify DID document exists and is valid (Bluesky-compatible security model) 380 // MANDATORY bidirectional verification: DID document must claim this handle in alsoKnownAs 381 // This matches Bluesky's security requirements and prevents domain impersonation 382 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain, handle); err != nil { 383 log.Printf("🚨 SECURITY: Rejecting community - bidirectional DID verification failed: %v", err) 384 return fmt.Errorf("bidirectional DID verification required: %w", err) 385 } 386 387 return nil 388} 389 390// verifyDIDDocument fetches and validates the DID document from .well-known/did.json 391// Implements Bluesky's bidirectional verification model: 392// 1. Verify DID document exists at https://domain/.well-known/did.json 393// 2. Verify DID document ID matches claimed DID 394// 3. Verify DID document claims the handle in alsoKnownAs field 395// Results are cached with TTL and rate-limited to prevent DoS attacks 396func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain, handle string) error { 397 // Skip verification in dev mode 398 if c.skipVerification { 399 return nil 400 } 401 402 // Check bounded LRU cache first (thread-safe, no locks needed) 403 if cached, ok := c.didCache.Get(did); ok { 404 // Check if cache entry is still valid (not expired) 405 if time.Now().Before(cached.expiresAt) { 406 if !cached.valid { 407 return fmt.Errorf("cached verification failure for %s", did) 408 } 409 log.Printf("✓ DID document verification (cached): %s", domain) 410 return nil 411 } 412 // Cache entry expired - remove it to free up space for fresh entries 413 c.didCache.Remove(did) 414 } 415 416 // Rate limit .well-known fetches to prevent DoS 417 if err := c.wellKnownLimiter.Wait(ctx); err != nil { 418 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err) 419 } 420 421 // Construct .well-known URL 422 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain) 423 424 // Create HTTP request with timeout 425 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 426 if err != nil { 427 // Cache the failure 428 c.cacheVerificationResult(did, false, 5*time.Minute) 429 return fmt.Errorf("failed to create request: %w", err) 430 } 431 432 // Fetch DID document using shared HTTP client 433 resp, err := c.httpClient.Do(req) 434 if err != nil { 435 // Cache the failure (shorter TTL for network errors) 436 c.cacheVerificationResult(did, false, 5*time.Minute) 437 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err) 438 } 439 defer func() { 440 if closeErr := resp.Body.Close(); closeErr != nil { 441 log.Printf("Failed to close response body: %v", closeErr) 442 } 443 }() 444 445 // Verify HTTP status 446 if resp.StatusCode != http.StatusOK { 447 // Cache the failure 448 c.cacheVerificationResult(did, false, 5*time.Minute) 449 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL) 450 } 451 452 // Parse DID document 453 var didDoc struct { 454 ID string `json:"id"` 455 AlsoKnownAs []string `json:"alsoKnownAs"` 456 } 457 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 458 // Cache the failure 459 c.cacheVerificationResult(did, false, 5*time.Minute) 460 return fmt.Errorf("failed to parse DID document JSON: %w", err) 461 } 462 463 // Verify DID document ID matches claimed DID 464 if didDoc.ID != did { 465 // Cache the failure 466 c.cacheVerificationResult(did, false, 5*time.Minute) 467 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did) 468 } 469 470 // SECURITY: Bidirectional verification - DID document must claim this handle 471 // Prevents impersonation where someone points DNS to another user's DID 472 // Format: handle "coves.social" or "!community@coves.social" → check for "at://coves.social" 473 handleDomain := extractDomainFromHandle(handle) 474 expectedAlias := fmt.Sprintf("at://%s", handleDomain) 475 476 found := false 477 for _, alias := range didDoc.AlsoKnownAs { 478 if alias == expectedAlias { 479 found = true 480 break 481 } 482 } 483 484 if !found { 485 // Cache the failure 486 c.cacheVerificationResult(did, false, 5*time.Minute) 487 return fmt.Errorf("DID document does not claim handle domain %s in alsoKnownAs (expected %s, got %v)", 488 handleDomain, expectedAlias, didDoc.AlsoKnownAs) 489 } 490 491 // Cache the success (24 hour TTL - matches Bluesky recommendations) 492 c.cacheVerificationResult(did, true, 24*time.Hour) 493 494 log.Printf("✓ DID document verified: %s", domain) 495 return nil 496} 497 498// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL 499// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full 500func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) { 501 c.didCache.Add(did, cachedDIDDoc{ 502 valid: valid, 503 expiresAt: time.Now().Add(ttl), 504 }) 505} 506 507// extractDomainFromHandle extracts the registrable domain from a community handle 508// Handles both formats: 509// - Bluesky-style: "!gaming@coves.social" → "coves.social" 510// - DNS-style: "gaming.community.coves.social" → "coves.social" 511// 512// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs: 513// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk") 514// - "gaming.community.example.com.au" → "example.com.au" (not "com.au") 515func extractDomainFromHandle(handle string) string { 516 // Remove leading ! if present 517 handle = strings.TrimPrefix(handle, "!") 518 519 // Check for @-separated format (e.g., "gaming@coves.social") 520 if strings.Contains(handle, "@") { 521 parts := strings.Split(handle, "@") 522 if len(parts) == 2 { 523 domain := parts[1] 524 // Validate and extract eTLD+1 from the @-domain part 525 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain) 526 if err != nil { 527 // If publicsuffix fails, fall back to returning the full domain part 528 // This handles edge cases like localhost, IP addresses, etc. 529 return domain 530 } 531 return registrable 532 } 533 return "" 534 } 535 536 // For DNS-style handles (e.g., "gaming.community.coves.social") 537 // Extract the registrable domain (eTLD+1) using publicsuffix 538 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc. 539 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle) 540 if err != nil { 541 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address) 542 // fall back to naive extraction (last 2 parts) 543 // This maintains backward compatibility for edge cases 544 parts := strings.Split(handle, ".") 545 if len(parts) < 2 { 546 return "" // Invalid handle 547 } 548 return strings.Join(parts[len(parts)-2:], ".") 549 } 550 551 return registrable 552} 553 554// handleSubscription processes subscription create/delete events 555// CREATE operation = user subscribed to community 556// DELETE operation = user unsubscribed from community 557func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 558 switch commit.Operation { 559 case "create": 560 return c.createSubscription(ctx, userDID, commit) 561 case "delete": 562 return c.deleteSubscription(ctx, userDID, commit) 563 default: 564 // Update operations shouldn't happen on subscriptions, but ignore gracefully 565 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 566 commit.Operation, userDID, commit.RKey) 567 return nil 568 } 569} 570 571// createSubscription indexes a new subscription with retry logic 572func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 573 if commit.Record == nil { 574 return fmt.Errorf("subscription create event missing record data") 575 } 576 577 // Extract community DID from record's subject field (following atProto conventions) 578 communityDID, ok := commit.Record["subject"].(string) 579 if !ok { 580 return fmt.Errorf("subscription record missing subject field") 581 } 582 583 // Extract contentVisibility with clamping and default value 584 contentVisibility := extractContentVisibility(commit.Record) 585 586 // Build AT-URI for subscription record 587 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint 588 // The record lives in the USER's repository, but uses the communities namespace 589 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 590 591 // Create subscription entity 592 // Parse createdAt from record to preserve chronological ordering during replays 593 subscription := &communities.Subscription{ 594 UserDID: userDID, 595 CommunityDID: communityDID, 596 ContentVisibility: contentVisibility, 597 SubscribedAt: utils.ParseCreatedAt(commit.Record), 598 RecordURI: uri, 599 RecordCID: commit.CID, 600 } 601 602 // Use transactional method to ensure subscription and count are atomically updated 603 // This is idempotent - safe for Jetstream replays 604 _, err := c.repo.SubscribeWithCount(ctx, subscription) 605 if err != nil { 606 // If already exists, that's fine (idempotency) 607 if communities.IsConflict(err) { 608 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 609 userDID, communityDID, contentVisibility) 610 return nil 611 } 612 return fmt.Errorf("failed to index subscription: %w", err) 613 } 614 615 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 616 userDID, communityDID, contentVisibility) 617 return nil 618} 619 620// deleteSubscription removes a subscription from the index 621// DELETE operations don't include record data, so we need to look up the subscription 622// by its URI to find which community the user unsubscribed from 623func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 624 // Build AT-URI from the rkey 625 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 626 627 // Look up the subscription to get the community DID 628 // (DELETE operations don't include record data in Jetstream) 629 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 630 if err != nil { 631 if communities.IsNotFound(err) { 632 // Already deleted - this is fine (idempotency) 633 log.Printf("Subscription already deleted: %s", uri) 634 return nil 635 } 636 return fmt.Errorf("failed to find subscription for deletion: %w", err) 637 } 638 639 // Use transactional method to ensure unsubscribe and count are atomically updated 640 // This is idempotent - safe for Jetstream replays 641 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 642 if err != nil { 643 if communities.IsNotFound(err) { 644 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 645 return nil 646 } 647 return fmt.Errorf("failed to remove subscription: %w", err) 648 } 649 650 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 651 return nil 652} 653 654// handleBlock processes block create/delete events 655// CREATE operation = user blocked a community 656// DELETE operation = user unblocked a community 657func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 658 switch commit.Operation { 659 case "create": 660 return c.createBlock(ctx, userDID, commit) 661 case "delete": 662 return c.deleteBlock(ctx, userDID, commit) 663 default: 664 // Update operations shouldn't happen on blocks, but ignore gracefully 665 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 666 commit.Operation, userDID, commit.RKey) 667 return nil 668 } 669} 670 671// createBlock indexes a new block 672func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 673 if commit.Record == nil { 674 return fmt.Errorf("block create event missing record data") 675 } 676 677 // Extract community DID from record's subject field (following atProto conventions) 678 communityDID, ok := commit.Record["subject"].(string) 679 if !ok { 680 return fmt.Errorf("block record missing subject field") 681 } 682 683 // Build AT-URI for block record 684 // The record lives in the USER's repository 685 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 686 687 // Create block entity 688 // Parse createdAt from record to preserve chronological ordering during replays 689 block := &communities.CommunityBlock{ 690 UserDID: userDID, 691 CommunityDID: communityDID, 692 BlockedAt: utils.ParseCreatedAt(commit.Record), 693 RecordURI: uri, 694 RecordCID: commit.CID, 695 } 696 697 // Index the block 698 // This is idempotent - safe for Jetstream replays 699 _, err := c.repo.BlockCommunity(ctx, block) 700 if err != nil { 701 // If already exists, that's fine (idempotency) 702 if communities.IsConflict(err) { 703 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 704 return nil 705 } 706 return fmt.Errorf("failed to index block: %w", err) 707 } 708 709 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 710 return nil 711} 712 713// deleteBlock removes a block from the index 714// DELETE operations don't include record data, so we need to look up the block 715// by its URI to find which community the user unblocked 716func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 717 // Build AT-URI from the rkey 718 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 719 720 // Look up the block to get the community DID 721 // (DELETE operations don't include record data in Jetstream) 722 block, err := c.repo.GetBlockByURI(ctx, uri) 723 if err != nil { 724 if communities.IsNotFound(err) { 725 // Already deleted - this is fine (idempotency) 726 log.Printf("Block already deleted: %s", uri) 727 return nil 728 } 729 return fmt.Errorf("failed to find block for deletion: %w", err) 730 } 731 732 // Remove the block from the index 733 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 734 if err != nil { 735 if communities.IsNotFound(err) { 736 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID) 737 return nil 738 } 739 return fmt.Errorf("failed to remove block: %w", err) 740 } 741 742 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 743 return nil 744} 745 746// Helper types and functions 747 748type CommunityProfile struct { 749 CreatedAt time.Time `json:"createdAt"` 750 Avatar map[string]interface{} `json:"avatar"` 751 Banner map[string]interface{} `json:"banner"` 752 CreatedBy string `json:"createdBy"` 753 Visibility string `json:"visibility"` 754 AtprotoHandle string `json:"atprotoHandle"` 755 DisplayName string `json:"displayName"` 756 Name string `json:"name"` 757 Handle string `json:"handle"` 758 HostedBy string `json:"hostedBy"` 759 Description string `json:"description"` 760 FederatedID string `json:"federatedId"` 761 ModerationType string `json:"moderationType"` 762 FederatedFrom string `json:"federatedFrom"` 763 ContentWarnings []string `json:"contentWarnings"` 764 DescriptionFacets []interface{} `json:"descriptionFacets"` 765 MemberCount int `json:"memberCount"` 766 SubscriberCount int `json:"subscriberCount"` 767 Federation FederationConfig `json:"federation"` 768} 769 770type FederationConfig struct { 771 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 772} 773 774// parseCommunityProfile converts a raw record map to a CommunityProfile 775func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 776 recordJSON, err := json.Marshal(record) 777 if err != nil { 778 return nil, fmt.Errorf("failed to marshal record: %w", err) 779 } 780 781 var profile CommunityProfile 782 if err := json.Unmarshal(recordJSON, &profile); err != nil { 783 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 784 } 785 786 return &profile, nil 787} 788 789// constructHandleFromProfile constructs a deterministic handle from profile data 790// Format: {name}.community.{instanceDomain} 791// Example: gaming.community.coves.social 792// This is ONLY used in test mode (when identity resolver is nil) 793// Production MUST resolve handles from PLC (source of truth) 794// Returns empty string if hostedBy is not did:web format (caller will fail validation) 795func constructHandleFromProfile(profile *CommunityProfile) string { 796 if !strings.HasPrefix(profile.HostedBy, "did:web:") { 797 // hostedBy must be did:web format for handle construction 798 // Return empty to trigger validation error in repository 799 return "" 800 } 801 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:") 802 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain) 803} 804 805// extractContentVisibility extracts contentVisibility from subscription record with clamping 806// Returns default value of 3 if missing or invalid 807func extractContentVisibility(record map[string]interface{}) int { 808 const defaultVisibility = 3 809 810 cv, ok := record["contentVisibility"] 811 if !ok { 812 // Field missing - use default 813 return defaultVisibility 814 } 815 816 // JSON numbers decode as float64 817 cvFloat, ok := cv.(float64) 818 if !ok { 819 // Try int (shouldn't happen but handle gracefully) 820 if cvInt, isInt := cv.(int); isInt { 821 return clampContentVisibility(cvInt) 822 } 823 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 824 return defaultVisibility 825 } 826 827 // Convert and clamp 828 clamped := clampContentVisibility(int(cvFloat)) 829 if clamped != int(cvFloat) { 830 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 831 } 832 return clamped 833} 834 835// clampContentVisibility ensures value is within valid range (1-5) 836func clampContentVisibility(value int) int { 837 if value < 1 { 838 return 1 839 } 840 if value > 5 { 841 return 5 842 } 843 return value 844} 845 846// extractBlobCID extracts the CID from a blob reference 847// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 848func extractBlobCID(blob map[string]interface{}) (string, bool) { 849 if blob == nil { 850 return "", false 851 } 852 853 // Check if it's a blob type 854 blobType, ok := blob["$type"].(string) 855 if !ok || blobType != "blob" { 856 return "", false 857 } 858 859 // Extract ref 860 ref, ok := blob["ref"].(map[string]interface{}) 861 if !ok { 862 return "", false 863 } 864 865 // Extract $link (the CID) 866 link, ok := ref["$link"].(string) 867 if !ok { 868 return "", false 869 } 870 871 return link, true 872}