A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/utils" 6 "Coves/internal/core/communities" 7 "context" 8 "encoding/json" 9 "fmt" 10 "log" 11 "net/http" 12 "strings" 13 "time" 14 15 lru "github.com/hashicorp/golang-lru/v2" 16 "golang.org/x/net/publicsuffix" 17 "golang.org/x/time/rate" 18) 19 20// CommunityEventConsumer consumes community-related events from Jetstream 21type CommunityEventConsumer struct { 22 repo communities.Repository // Repository for community operations 23 identityResolver interface { 24 Resolve(context.Context, string) (*identity.Identity, error) 25 } // For resolving handles from DIDs 26 httpClient *http.Client // Shared HTTP client with connection pooling 27 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results 28 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches 29 instanceDID string // DID of this Coves instance 30 skipVerification bool // Skip did:web verification (for dev mode) 31} 32 33// cachedDIDDoc represents a cached verification result with expiration 34type cachedDIDDoc struct { 35 expiresAt time.Time // When this cache entry expires 36 valid bool // Whether verification passed 37} 38 39// NewCommunityEventConsumer creates a new Jetstream consumer for community events 40// instanceDID: The DID of this Coves instance (for hostedBy verification) 41// skipVerification: Skip did:web verification (for dev mode) 42// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests) 43func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface { 44 Resolve(context.Context, string) (*identity.Identity, error) 45}, 46) *CommunityEventConsumer { 47 // Create bounded LRU cache for DID document verification results 48 // Max 1000 entries to prevent unbounded memory growth (PR review feedback) 49 // Each entry ~100 bytes → max ~100KB memory overhead 50 cache, err := lru.New[string, cachedDIDDoc](1000) 51 if err != nil { 52 // This should never happen with a valid size, but handle gracefully 53 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err) 54 // Create minimal cache to avoid nil pointer 55 cache, _ = lru.New[string, cachedDIDDoc](1) 56 } 57 58 return &CommunityEventConsumer{ 59 repo: repo, 60 identityResolver: identityResolver, // Optional - can be nil for tests 61 instanceDID: instanceDID, 62 skipVerification: skipVerification, 63 // Shared HTTP client with connection pooling for .well-known fetches 64 httpClient: &http.Client{ 65 Timeout: 10 * time.Second, 66 Transport: &http.Transport{ 67 MaxIdleConns: 100, 68 MaxIdleConnsPerHost: 10, 69 IdleConnTimeout: 90 * time.Second, 70 }, 71 }, 72 // Bounded LRU cache for .well-known verification results (max 1000 entries) 73 // Automatically evicts least-recently-used entries when full 74 didCache: cache, 75 // Rate limiter: 10 requests per second, burst of 20 76 // Prevents DoS via excessive .well-known fetches 77 wellKnownLimiter: rate.NewLimiter(10, 20), 78 } 79} 80 81// HandleEvent processes a Jetstream event for community records 82// This is called by the main Jetstream consumer when it receives commit events 83func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 84 // We only care about commit events for community records 85 if event.Kind != "commit" || event.Commit == nil { 86 return nil 87 } 88 89 commit := event.Commit 90 91 // Route to appropriate handler based on collection 92 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 93 // - social.coves.community.profile: Community profile records (in community's own repo) 94 // - social.coves.community.subscription: Subscription records (in user's repo) 95 // - social.coves.community.block: Block records (in user's repo) 96 // 97 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 98 // that CREATE or DELETE records in these collections 99 switch commit.Collection { 100 case "social.coves.community.profile": 101 return c.handleCommunityProfile(ctx, event.Did, commit) 102 case "social.coves.community.subscription": 103 // Handle both create (subscribe) and delete (unsubscribe) operations 104 return c.handleSubscription(ctx, event.Did, commit) 105 case "social.coves.community.block": 106 // Handle both create (block) and delete (unblock) operations 107 return c.handleBlock(ctx, event.Did, commit) 108 default: 109 // Not a community-related collection 110 return nil 111 } 112} 113 114// handleCommunityProfile processes community profile create/update/delete events 115func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 116 switch commit.Operation { 117 case "create": 118 return c.createCommunity(ctx, did, commit) 119 case "update": 120 return c.updateCommunity(ctx, did, commit) 121 case "delete": 122 return c.deleteCommunity(ctx, did) 123 default: 124 log.Printf("Unknown operation for community profile: %s", commit.Operation) 125 return nil 126 } 127} 128 129// createCommunity indexes a new community from the firehose 130func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 131 if commit.Record == nil { 132 return fmt.Errorf("community profile create event missing record data") 133 } 134 135 // Parse the community profile record 136 profile, err := parseCommunityProfile(commit.Record) 137 if err != nil { 138 return fmt.Errorf("failed to parse community profile: %w", err) 139 } 140 141 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 142 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 143 if profile.Handle == "" { 144 if c.identityResolver != nil { 145 // Production: Resolve handle from PLC (source of truth) 146 // NO FALLBACK - if PLC is down, we fail and backfill later 147 // This prevents creating communities with incorrect handles in federated scenarios 148 identity, err := c.identityResolver.Resolve(ctx, did) 149 if err != nil { 150 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 151 } 152 profile.Handle = identity.Handle 153 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 154 profile.Handle, did, identity.Method) 155 } else { 156 // Test mode only: construct deterministically when no resolver available 157 profile.Handle = constructHandleFromProfile(profile) 158 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 159 profile.Handle, profile.Name, profile.HostedBy) 160 } 161 } 162 163 // SECURITY: Verify hostedBy claim matches handle domain 164 // This prevents malicious instances from claiming to host communities for domains they don't own 165 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil { 166 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err) 167 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy) 168 return fmt.Errorf("hostedBy verification failed: %w", err) 169 } 170 171 // Build AT-URI for this record 172 // V2 Architecture (ONLY): 173 // - 'did' parameter IS the community DID (community owns its own repo) 174 // - rkey MUST be "self" for community profiles 175 // - URI: at://community_did/social.coves.community.profile/self 176 177 // REJECT non-V2 communities (pre-production: no V1 compatibility) 178 if commit.RKey != "self" { 179 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 180 } 181 182 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 183 184 // V2: Community ALWAYS owns itself 185 ownerDID := did 186 187 // Create community entity 188 community := &communities.Community{ 189 DID: did, // V2: Repository DID IS the community DID 190 Handle: profile.Handle, 191 Name: profile.Name, 192 DisplayName: profile.DisplayName, 193 Description: profile.Description, 194 OwnerDID: ownerDID, // V2: same as DID (self-owned) 195 CreatedByDID: profile.CreatedBy, 196 HostedByDID: profile.HostedBy, 197 Visibility: profile.Visibility, 198 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 199 ModerationType: profile.ModerationType, 200 ContentWarnings: profile.ContentWarnings, 201 MemberCount: profile.MemberCount, 202 SubscriberCount: profile.SubscriberCount, 203 FederatedFrom: profile.FederatedFrom, 204 FederatedID: profile.FederatedID, 205 CreatedAt: profile.CreatedAt, 206 UpdatedAt: time.Now(), 207 RecordURI: uri, 208 RecordCID: commit.CID, 209 } 210 211 // Handle blobs (avatar/banner) if present 212 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 213 community.AvatarCID = avatarCID 214 } 215 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 216 community.BannerCID = bannerCID 217 } 218 219 // Handle description facets (rich text) 220 if profile.DescriptionFacets != nil { 221 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 222 if marshalErr == nil { 223 community.DescriptionFacets = facetsJSON 224 } 225 } 226 227 // Index in AppView database 228 _, err = c.repo.Create(ctx, community) 229 if err != nil { 230 // Check if it already exists (idempotency) 231 if communities.IsConflict(err) { 232 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 233 return nil 234 } 235 return fmt.Errorf("failed to index community: %w", err) 236 } 237 238 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 239 return nil 240} 241 242// updateCommunity updates an existing community from the firehose 243func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 244 if commit.Record == nil { 245 return fmt.Errorf("community profile update event missing record data") 246 } 247 248 // REJECT non-V2 communities (pre-production: no V1 compatibility) 249 if commit.RKey != "self" { 250 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 251 } 252 253 // Parse profile 254 profile, err := parseCommunityProfile(commit.Record) 255 if err != nil { 256 return fmt.Errorf("failed to parse community profile: %w", err) 257 } 258 259 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 260 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 261 if profile.Handle == "" { 262 if c.identityResolver != nil { 263 // Production: Resolve handle from PLC (source of truth) 264 // NO FALLBACK - if PLC is down, we fail and backfill later 265 // This prevents creating communities with incorrect handles in federated scenarios 266 identity, err := c.identityResolver.Resolve(ctx, did) 267 if err != nil { 268 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 269 } 270 profile.Handle = identity.Handle 271 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 272 profile.Handle, did, identity.Method) 273 } else { 274 // Test mode only: construct deterministically when no resolver available 275 profile.Handle = constructHandleFromProfile(profile) 276 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 277 profile.Handle, profile.Name, profile.HostedBy) 278 } 279 } 280 281 // V2: Repository DID IS the community DID 282 // Get existing community using the repo DID 283 existing, err := c.repo.GetByDID(ctx, did) 284 if err != nil { 285 if communities.IsNotFound(err) { 286 // Community doesn't exist yet - treat as create 287 log.Printf("Community not found for update, creating: %s", did) 288 return c.createCommunity(ctx, did, commit) 289 } 290 return fmt.Errorf("failed to get existing community: %w", err) 291 } 292 293 // Update fields 294 existing.Handle = profile.Handle 295 existing.Name = profile.Name 296 existing.DisplayName = profile.DisplayName 297 existing.Description = profile.Description 298 existing.Visibility = profile.Visibility 299 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 300 existing.ModerationType = profile.ModerationType 301 existing.ContentWarnings = profile.ContentWarnings 302 existing.RecordCID = commit.CID 303 304 // Update blobs 305 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 306 existing.AvatarCID = avatarCID 307 } 308 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 309 existing.BannerCID = bannerCID 310 } 311 312 // Update description facets 313 if profile.DescriptionFacets != nil { 314 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 315 if marshalErr == nil { 316 existing.DescriptionFacets = facetsJSON 317 } 318 } 319 320 // Save updates 321 _, err = c.repo.Update(ctx, existing) 322 if err != nil { 323 return fmt.Errorf("failed to update community: %w", err) 324 } 325 326 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 327 return nil 328} 329 330// deleteCommunity removes a community from the index 331func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 332 err := c.repo.Delete(ctx, did) 333 if err != nil { 334 if communities.IsNotFound(err) { 335 log.Printf("Community already deleted: %s", did) 336 return nil 337 } 338 return fmt.Errorf("failed to delete community: %w", err) 339 } 340 341 log.Printf("Deleted community: %s", did) 342 return nil 343} 344 345// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain 346// This prevents malicious instances from claiming to host communities for domains they don't own 347func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error { 348 // Skip verification in dev mode 349 if c.skipVerification { 350 return nil 351 } 352 353 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback) 354 ctx, cancel := context.WithTimeout(ctx, 15*time.Second) 355 defer cancel() 356 357 // Verify hostedByDID is did:web format 358 if !strings.HasPrefix(hostedByDID, "did:web:") { 359 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID) 360 } 361 362 // Extract domain from did:web DID 363 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:") 364 365 // Extract domain from community handle 366 // Handle format examples: 367 // - "!gaming@coves.social" → domain: "coves.social" 368 // - "gaming.community.coves.social" → domain: "coves.social" 369 handleDomain := extractDomainFromHandle(handle) 370 if handleDomain == "" { 371 return fmt.Errorf("failed to extract domain from handle: %s", handle) 372 } 373 374 // Verify handle domain matches hostedBy domain 375 if handleDomain != hostedByDomain { 376 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain) 377 } 378 379 // SECURITY: Verify DID document exists and is valid (Bluesky-compatible security model) 380 // MANDATORY bidirectional verification: DID document must claim this handle in alsoKnownAs 381 // This matches Bluesky's security requirements and prevents domain impersonation 382 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain, handle); err != nil { 383 log.Printf("🚨 SECURITY: Rejecting community - bidirectional DID verification failed: %v", err) 384 return fmt.Errorf("bidirectional DID verification required: %w", err) 385 } 386 387 return nil 388} 389 390// verifyDIDDocument fetches and validates the DID document from .well-known/did.json 391// Implements Bluesky's bidirectional verification model: 392// 1. Verify DID document exists at https://domain/.well-known/did.json 393// 2. Verify DID document ID matches claimed DID 394// 3. Verify DID document claims the handle in alsoKnownAs field 395// 396// Results are cached with TTL and rate-limited to prevent DoS attacks 397func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain, handle string) error { 398 // Skip verification in dev mode 399 if c.skipVerification { 400 return nil 401 } 402 403 // Check bounded LRU cache first (thread-safe, no locks needed) 404 if cached, ok := c.didCache.Get(did); ok { 405 // Check if cache entry is still valid (not expired) 406 if time.Now().Before(cached.expiresAt) { 407 if !cached.valid { 408 return fmt.Errorf("cached verification failure for %s", did) 409 } 410 log.Printf("✓ DID document verification (cached): %s", domain) 411 return nil 412 } 413 // Cache entry expired - remove it to free up space for fresh entries 414 c.didCache.Remove(did) 415 } 416 417 // Rate limit .well-known fetches to prevent DoS 418 if err := c.wellKnownLimiter.Wait(ctx); err != nil { 419 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err) 420 } 421 422 // Construct .well-known URL 423 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain) 424 425 // Create HTTP request with timeout 426 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 427 if err != nil { 428 // Cache the failure 429 c.cacheVerificationResult(did, false, 5*time.Minute) 430 return fmt.Errorf("failed to create request: %w", err) 431 } 432 433 // Fetch DID document using shared HTTP client 434 resp, err := c.httpClient.Do(req) 435 if err != nil { 436 // Cache the failure (shorter TTL for network errors) 437 c.cacheVerificationResult(did, false, 5*time.Minute) 438 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err) 439 } 440 defer func() { 441 if closeErr := resp.Body.Close(); closeErr != nil { 442 log.Printf("Failed to close response body: %v", closeErr) 443 } 444 }() 445 446 // Verify HTTP status 447 if resp.StatusCode != http.StatusOK { 448 // Cache the failure 449 c.cacheVerificationResult(did, false, 5*time.Minute) 450 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL) 451 } 452 453 // Parse DID document 454 var didDoc struct { 455 ID string `json:"id"` 456 AlsoKnownAs []string `json:"alsoKnownAs"` 457 } 458 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 459 // Cache the failure 460 c.cacheVerificationResult(did, false, 5*time.Minute) 461 return fmt.Errorf("failed to parse DID document JSON: %w", err) 462 } 463 464 // Verify DID document ID matches claimed DID 465 if didDoc.ID != did { 466 // Cache the failure 467 c.cacheVerificationResult(did, false, 5*time.Minute) 468 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did) 469 } 470 471 // SECURITY: Bidirectional verification - DID document must claim this handle 472 // Prevents impersonation where someone points DNS to another user's DID 473 // Format: handle "coves.social" or "!community@coves.social" → check for "at://coves.social" 474 handleDomain := extractDomainFromHandle(handle) 475 expectedAlias := fmt.Sprintf("at://%s", handleDomain) 476 477 found := false 478 for _, alias := range didDoc.AlsoKnownAs { 479 if alias == expectedAlias { 480 found = true 481 break 482 } 483 } 484 485 if !found { 486 // Cache the failure 487 c.cacheVerificationResult(did, false, 5*time.Minute) 488 return fmt.Errorf("DID document does not claim handle domain %s in alsoKnownAs (expected %s, got %v)", 489 handleDomain, expectedAlias, didDoc.AlsoKnownAs) 490 } 491 492 // Cache the success (24 hour TTL - matches Bluesky recommendations) 493 c.cacheVerificationResult(did, true, 24*time.Hour) 494 495 log.Printf("✓ DID document verified: %s", domain) 496 return nil 497} 498 499// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL 500// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full 501func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) { 502 c.didCache.Add(did, cachedDIDDoc{ 503 valid: valid, 504 expiresAt: time.Now().Add(ttl), 505 }) 506} 507 508// extractDomainFromHandle extracts the registrable domain from a community handle 509// Handles both formats: 510// - Bluesky-style: "!gaming@coves.social" → "coves.social" 511// - DNS-style: "gaming.community.coves.social" → "coves.social" 512// 513// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs: 514// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk") 515// - "gaming.community.example.com.au" → "example.com.au" (not "com.au") 516func extractDomainFromHandle(handle string) string { 517 // Remove leading ! if present 518 handle = strings.TrimPrefix(handle, "!") 519 520 // Check for @-separated format (e.g., "gaming@coves.social") 521 if strings.Contains(handle, "@") { 522 parts := strings.Split(handle, "@") 523 if len(parts) == 2 { 524 domain := parts[1] 525 // Validate and extract eTLD+1 from the @-domain part 526 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain) 527 if err != nil { 528 // If publicsuffix fails, fall back to returning the full domain part 529 // This handles edge cases like localhost, IP addresses, etc. 530 return domain 531 } 532 return registrable 533 } 534 return "" 535 } 536 537 // For DNS-style handles (e.g., "gaming.community.coves.social") 538 // Extract the registrable domain (eTLD+1) using publicsuffix 539 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc. 540 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle) 541 if err != nil { 542 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address) 543 // fall back to naive extraction (last 2 parts) 544 // This maintains backward compatibility for edge cases 545 parts := strings.Split(handle, ".") 546 if len(parts) < 2 { 547 return "" // Invalid handle 548 } 549 return strings.Join(parts[len(parts)-2:], ".") 550 } 551 552 return registrable 553} 554 555// handleSubscription processes subscription create/delete events 556// CREATE operation = user subscribed to community 557// DELETE operation = user unsubscribed from community 558func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 559 switch commit.Operation { 560 case "create": 561 return c.createSubscription(ctx, userDID, commit) 562 case "delete": 563 return c.deleteSubscription(ctx, userDID, commit) 564 default: 565 // Update operations shouldn't happen on subscriptions, but ignore gracefully 566 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 567 commit.Operation, userDID, commit.RKey) 568 return nil 569 } 570} 571 572// createSubscription indexes a new subscription with retry logic 573func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 574 if commit.Record == nil { 575 return fmt.Errorf("subscription create event missing record data") 576 } 577 578 // Extract community DID from record's subject field (following atProto conventions) 579 communityDID, ok := commit.Record["subject"].(string) 580 if !ok { 581 return fmt.Errorf("subscription record missing subject field") 582 } 583 584 // Extract contentVisibility with clamping and default value 585 contentVisibility := extractContentVisibility(commit.Record) 586 587 // Build AT-URI for subscription record 588 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint 589 // The record lives in the USER's repository, but uses the communities namespace 590 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 591 592 // Create subscription entity 593 // Parse createdAt from record to preserve chronological ordering during replays 594 subscription := &communities.Subscription{ 595 UserDID: userDID, 596 CommunityDID: communityDID, 597 ContentVisibility: contentVisibility, 598 SubscribedAt: utils.ParseCreatedAt(commit.Record), 599 RecordURI: uri, 600 RecordCID: commit.CID, 601 } 602 603 // Use transactional method to ensure subscription and count are atomically updated 604 // This is idempotent - safe for Jetstream replays 605 _, err := c.repo.SubscribeWithCount(ctx, subscription) 606 if err != nil { 607 // If already exists, that's fine (idempotency) 608 if communities.IsConflict(err) { 609 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 610 userDID, communityDID, contentVisibility) 611 return nil 612 } 613 return fmt.Errorf("failed to index subscription: %w", err) 614 } 615 616 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 617 userDID, communityDID, contentVisibility) 618 return nil 619} 620 621// deleteSubscription removes a subscription from the index 622// DELETE operations don't include record data, so we need to look up the subscription 623// by its URI to find which community the user unsubscribed from 624func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 625 // Build AT-URI from the rkey 626 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 627 628 // Look up the subscription to get the community DID 629 // (DELETE operations don't include record data in Jetstream) 630 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 631 if err != nil { 632 if communities.IsNotFound(err) { 633 // Already deleted - this is fine (idempotency) 634 log.Printf("Subscription already deleted: %s", uri) 635 return nil 636 } 637 return fmt.Errorf("failed to find subscription for deletion: %w", err) 638 } 639 640 // Use transactional method to ensure unsubscribe and count are atomically updated 641 // This is idempotent - safe for Jetstream replays 642 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 643 if err != nil { 644 if communities.IsNotFound(err) { 645 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 646 return nil 647 } 648 return fmt.Errorf("failed to remove subscription: %w", err) 649 } 650 651 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 652 return nil 653} 654 655// handleBlock processes block create/delete events 656// CREATE operation = user blocked a community 657// DELETE operation = user unblocked a community 658func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 659 switch commit.Operation { 660 case "create": 661 return c.createBlock(ctx, userDID, commit) 662 case "delete": 663 return c.deleteBlock(ctx, userDID, commit) 664 default: 665 // Update operations shouldn't happen on blocks, but ignore gracefully 666 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 667 commit.Operation, userDID, commit.RKey) 668 return nil 669 } 670} 671 672// createBlock indexes a new block 673func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 674 if commit.Record == nil { 675 return fmt.Errorf("block create event missing record data") 676 } 677 678 // Extract community DID from record's subject field (following atProto conventions) 679 communityDID, ok := commit.Record["subject"].(string) 680 if !ok { 681 return fmt.Errorf("block record missing subject field") 682 } 683 684 // Build AT-URI for block record 685 // The record lives in the USER's repository 686 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 687 688 // Create block entity 689 // Parse createdAt from record to preserve chronological ordering during replays 690 block := &communities.CommunityBlock{ 691 UserDID: userDID, 692 CommunityDID: communityDID, 693 BlockedAt: utils.ParseCreatedAt(commit.Record), 694 RecordURI: uri, 695 RecordCID: commit.CID, 696 } 697 698 // Index the block 699 // This is idempotent - safe for Jetstream replays 700 _, err := c.repo.BlockCommunity(ctx, block) 701 if err != nil { 702 // If already exists, that's fine (idempotency) 703 if communities.IsConflict(err) { 704 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 705 return nil 706 } 707 return fmt.Errorf("failed to index block: %w", err) 708 } 709 710 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 711 return nil 712} 713 714// deleteBlock removes a block from the index 715// DELETE operations don't include record data, so we need to look up the block 716// by its URI to find which community the user unblocked 717func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 718 // Build AT-URI from the rkey 719 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 720 721 // Look up the block to get the community DID 722 // (DELETE operations don't include record data in Jetstream) 723 block, err := c.repo.GetBlockByURI(ctx, uri) 724 if err != nil { 725 if communities.IsNotFound(err) { 726 // Already deleted - this is fine (idempotency) 727 log.Printf("Block already deleted: %s", uri) 728 return nil 729 } 730 return fmt.Errorf("failed to find block for deletion: %w", err) 731 } 732 733 // Remove the block from the index 734 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 735 if err != nil { 736 if communities.IsNotFound(err) { 737 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID) 738 return nil 739 } 740 return fmt.Errorf("failed to remove block: %w", err) 741 } 742 743 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 744 return nil 745} 746 747// Helper types and functions 748 749type CommunityProfile struct { 750 CreatedAt time.Time `json:"createdAt"` 751 Avatar map[string]interface{} `json:"avatar"` 752 Banner map[string]interface{} `json:"banner"` 753 CreatedBy string `json:"createdBy"` 754 Visibility string `json:"visibility"` 755 AtprotoHandle string `json:"atprotoHandle"` 756 DisplayName string `json:"displayName"` 757 Name string `json:"name"` 758 Handle string `json:"handle"` 759 HostedBy string `json:"hostedBy"` 760 Description string `json:"description"` 761 FederatedID string `json:"federatedId"` 762 ModerationType string `json:"moderationType"` 763 FederatedFrom string `json:"federatedFrom"` 764 ContentWarnings []string `json:"contentWarnings"` 765 DescriptionFacets []interface{} `json:"descriptionFacets"` 766 MemberCount int `json:"memberCount"` 767 SubscriberCount int `json:"subscriberCount"` 768 Federation FederationConfig `json:"federation"` 769} 770 771type FederationConfig struct { 772 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 773} 774 775// parseCommunityProfile converts a raw record map to a CommunityProfile 776func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 777 recordJSON, err := json.Marshal(record) 778 if err != nil { 779 return nil, fmt.Errorf("failed to marshal record: %w", err) 780 } 781 782 var profile CommunityProfile 783 if err := json.Unmarshal(recordJSON, &profile); err != nil { 784 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 785 } 786 787 return &profile, nil 788} 789 790// constructHandleFromProfile constructs a deterministic handle from profile data 791// Format: {name}.community.{instanceDomain} 792// Example: gaming.community.coves.social 793// This is ONLY used in test mode (when identity resolver is nil) 794// Production MUST resolve handles from PLC (source of truth) 795// Returns empty string if hostedBy is not did:web format (caller will fail validation) 796func constructHandleFromProfile(profile *CommunityProfile) string { 797 if !strings.HasPrefix(profile.HostedBy, "did:web:") { 798 // hostedBy must be did:web format for handle construction 799 // Return empty to trigger validation error in repository 800 return "" 801 } 802 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:") 803 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain) 804} 805 806// extractContentVisibility extracts contentVisibility from subscription record with clamping 807// Returns default value of 3 if missing or invalid 808func extractContentVisibility(record map[string]interface{}) int { 809 const defaultVisibility = 3 810 811 cv, ok := record["contentVisibility"] 812 if !ok { 813 // Field missing - use default 814 return defaultVisibility 815 } 816 817 // JSON numbers decode as float64 818 cvFloat, ok := cv.(float64) 819 if !ok { 820 // Try int (shouldn't happen but handle gracefully) 821 if cvInt, isInt := cv.(int); isInt { 822 return clampContentVisibility(cvInt) 823 } 824 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 825 return defaultVisibility 826 } 827 828 // Convert and clamp 829 clamped := clampContentVisibility(int(cvFloat)) 830 if clamped != int(cvFloat) { 831 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 832 } 833 return clamped 834} 835 836// clampContentVisibility ensures value is within valid range (1-5) 837func clampContentVisibility(value int) int { 838 if value < 1 { 839 return 1 840 } 841 if value > 5 { 842 return 5 843 } 844 return value 845} 846 847// extractBlobCID extracts the CID from a blob reference 848// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 849func extractBlobCID(blob map[string]interface{}) (string, bool) { 850 if blob == nil { 851 return "", false 852 } 853 854 // Check if it's a blob type 855 blobType, ok := blob["$type"].(string) 856 if !ok || blobType != "blob" { 857 return "", false 858 } 859 860 // Extract ref 861 ref, ok := blob["ref"].(map[string]interface{}) 862 if !ok { 863 return "", false 864 } 865 866 // Extract $link (the CID) 867 link, ok := ref["$link"].(string) 868 if !ok { 869 return "", false 870 } 871 872 return link, true 873}