A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/utils" 6 "Coves/internal/core/communities" 7 "context" 8 "encoding/json" 9 "fmt" 10 "log" 11 "net/http" 12 "strings" 13 "time" 14 15 lru "github.com/hashicorp/golang-lru/v2" 16 "golang.org/x/net/publicsuffix" 17 "golang.org/x/time/rate" 18) 19 20// CommunityEventConsumer consumes community-related events from Jetstream 21type CommunityEventConsumer struct { 22 repo communities.Repository // Repository for community operations 23 identityResolver interface { 24 Resolve(context.Context, string) (*identity.Identity, error) 25 } // For resolving handles from DIDs 26 httpClient *http.Client // Shared HTTP client with connection pooling 27 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results 28 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches 29 instanceDID string // DID of this Coves instance 30 skipVerification bool // Skip did:web verification (for dev mode) 31} 32 33// cachedDIDDoc represents a cached verification result with expiration 34type cachedDIDDoc struct { 35 expiresAt time.Time // When this cache entry expires 36 valid bool // Whether verification passed 37} 38 39// NewCommunityEventConsumer creates a new Jetstream consumer for community events 40// instanceDID: The DID of this Coves instance (for hostedBy verification) 41// skipVerification: Skip did:web verification (for dev mode) 42// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests) 43func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface { 44 Resolve(context.Context, string) (*identity.Identity, error) 45}, 46) *CommunityEventConsumer { 47 // Create bounded LRU cache for DID document verification results 48 // Max 1000 entries to prevent unbounded memory growth (PR review feedback) 49 // Each entry ~100 bytes → max ~100KB memory 
overhead 50 cache, err := lru.New[string, cachedDIDDoc](1000) 51 if err != nil { 52 // This should never happen with a valid size, but handle gracefully 53 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err) 54 // Create minimal cache to avoid nil pointer 55 cache, _ = lru.New[string, cachedDIDDoc](1) 56 } 57 58 return &CommunityEventConsumer{ 59 repo: repo, 60 identityResolver: identityResolver, // Optional - can be nil for tests 61 instanceDID: instanceDID, 62 skipVerification: skipVerification, 63 // Shared HTTP client with connection pooling for .well-known fetches 64 httpClient: &http.Client{ 65 Timeout: 10 * time.Second, 66 Transport: &http.Transport{ 67 MaxIdleConns: 100, 68 MaxIdleConnsPerHost: 10, 69 IdleConnTimeout: 90 * time.Second, 70 }, 71 }, 72 // Bounded LRU cache for .well-known verification results (max 1000 entries) 73 // Automatically evicts least-recently-used entries when full 74 didCache: cache, 75 // Rate limiter: 10 requests per second, burst of 20 76 // Prevents DoS via excessive .well-known fetches 77 wellKnownLimiter: rate.NewLimiter(10, 20), 78 } 79} 80 81// HandleEvent processes a Jetstream event for community records 82// This is called by the main Jetstream consumer when it receives commit events 83func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 84 // We only care about commit events for community records 85 if event.Kind != "commit" || event.Commit == nil { 86 return nil 87 } 88 89 commit := event.Commit 90 91 // Route to appropriate handler based on collection 92 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 93 // - social.coves.community.profile: Community profile records (in community's own repo) 94 // - social.coves.community.subscription: Subscription records (in user's repo) 95 // - social.coves.community.block: Block records (in user's repo) 96 // 97 // XRPC procedures 
(social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 98 // that CREATE or DELETE records in these collections 99 switch commit.Collection { 100 case "social.coves.community.profile": 101 return c.handleCommunityProfile(ctx, event.Did, commit) 102 case "social.coves.community.subscription": 103 // Handle both create (subscribe) and delete (unsubscribe) operations 104 return c.handleSubscription(ctx, event.Did, commit) 105 case "social.coves.community.block": 106 // Handle both create (block) and delete (unblock) operations 107 return c.handleBlock(ctx, event.Did, commit) 108 default: 109 // Not a community-related collection 110 return nil 111 } 112} 113 114// handleCommunityProfile processes community profile create/update/delete events 115func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 116 switch commit.Operation { 117 case "create": 118 return c.createCommunity(ctx, did, commit) 119 case "update": 120 return c.updateCommunity(ctx, did, commit) 121 case "delete": 122 return c.deleteCommunity(ctx, did) 123 default: 124 log.Printf("Unknown operation for community profile: %s", commit.Operation) 125 return nil 126 } 127} 128 129// createCommunity indexes a new community from the firehose 130func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 131 if commit.Record == nil { 132 return fmt.Errorf("community profile create event missing record data") 133 } 134 135 // Parse the community profile record 136 profile, err := parseCommunityProfile(commit.Record) 137 if err != nil { 138 return fmt.Errorf("failed to parse community profile: %w", err) 139 } 140 141 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 142 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 143 if profile.Handle == "" { 144 if c.identityResolver != nil { 145 // 
Production: Resolve handle from PLC (source of truth) 146 // NO FALLBACK - if PLC is down, we fail and backfill later 147 // This prevents creating communities with incorrect handles in federated scenarios 148 identity, err := c.identityResolver.Resolve(ctx, did) 149 if err != nil { 150 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 151 } 152 profile.Handle = identity.Handle 153 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 154 profile.Handle, did, identity.Method) 155 } else { 156 // Test mode only: construct deterministically when no resolver available 157 profile.Handle = constructHandleFromProfile(profile) 158 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 159 profile.Handle, profile.Name, profile.HostedBy) 160 } 161 } 162 163 // SECURITY: Verify hostedBy claim matches handle domain 164 // This prevents malicious instances from claiming to host communities for domains they don't own 165 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil { 166 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err) 167 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy) 168 return fmt.Errorf("hostedBy verification failed: %w", err) 169 } 170 171 // Build AT-URI for this record 172 // V2 Architecture (ONLY): 173 // - 'did' parameter IS the community DID (community owns its own repo) 174 // - rkey MUST be "self" for community profiles 175 // - URI: at://community_did/social.coves.community.profile/self 176 177 // REJECT non-V2 communities (pre-production: no V1 compatibility) 178 if commit.RKey != "self" { 179 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 180 } 181 182 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 183 184 // V2: Community ALWAYS owns itself 185 ownerDID := 
did 186 187 // Create community entity 188 community := &communities.Community{ 189 DID: did, // V2: Repository DID IS the community DID 190 Handle: profile.Handle, 191 Name: profile.Name, 192 DisplayName: profile.DisplayName, 193 Description: profile.Description, 194 OwnerDID: ownerDID, // V2: same as DID (self-owned) 195 CreatedByDID: profile.CreatedBy, 196 HostedByDID: profile.HostedBy, 197 Visibility: profile.Visibility, 198 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 199 ModerationType: profile.ModerationType, 200 ContentWarnings: profile.ContentWarnings, 201 MemberCount: profile.MemberCount, 202 SubscriberCount: profile.SubscriberCount, 203 FederatedFrom: profile.FederatedFrom, 204 FederatedID: profile.FederatedID, 205 CreatedAt: profile.CreatedAt, 206 UpdatedAt: time.Now(), 207 RecordURI: uri, 208 RecordCID: commit.CID, 209 } 210 211 // Handle blobs (avatar/banner) if present 212 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 213 community.AvatarCID = avatarCID 214 } 215 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 216 community.BannerCID = bannerCID 217 } 218 219 // Handle description facets (rich text) 220 if profile.DescriptionFacets != nil { 221 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 222 if marshalErr == nil { 223 community.DescriptionFacets = facetsJSON 224 } 225 } 226 227 // Index in AppView database 228 _, err = c.repo.Create(ctx, community) 229 if err != nil { 230 // Check if it already exists (idempotency) 231 if communities.IsConflict(err) { 232 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 233 return nil 234 } 235 return fmt.Errorf("failed to index community: %w", err) 236 } 237 238 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 239 return nil 240} 241 242// updateCommunity updates an existing community from the firehose 243func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit 
*CommitEvent) error { 244 if commit.Record == nil { 245 return fmt.Errorf("community profile update event missing record data") 246 } 247 248 // REJECT non-V2 communities (pre-production: no V1 compatibility) 249 if commit.RKey != "self" { 250 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 251 } 252 253 // Parse profile 254 profile, err := parseCommunityProfile(commit.Record) 255 if err != nil { 256 return fmt.Errorf("failed to parse community profile: %w", err) 257 } 258 259 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 260 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 261 if profile.Handle == "" { 262 if c.identityResolver != nil { 263 // Production: Resolve handle from PLC (source of truth) 264 // NO FALLBACK - if PLC is down, we fail and backfill later 265 // This prevents creating communities with incorrect handles in federated scenarios 266 identity, err := c.identityResolver.Resolve(ctx, did) 267 if err != nil { 268 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 269 } 270 profile.Handle = identity.Handle 271 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 272 profile.Handle, did, identity.Method) 273 } else { 274 // Test mode only: construct deterministically when no resolver available 275 profile.Handle = constructHandleFromProfile(profile) 276 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 277 profile.Handle, profile.Name, profile.HostedBy) 278 } 279 } 280 281 // V2: Repository DID IS the community DID 282 // Get existing community using the repo DID 283 existing, err := c.repo.GetByDID(ctx, did) 284 if err != nil { 285 if communities.IsNotFound(err) { 286 // Community doesn't exist yet - treat as create 287 log.Printf("Community not found for update, creating: %s", did) 
288 return c.createCommunity(ctx, did, commit) 289 } 290 return fmt.Errorf("failed to get existing community: %w", err) 291 } 292 293 // Update fields 294 existing.Handle = profile.Handle 295 existing.Name = profile.Name 296 existing.DisplayName = profile.DisplayName 297 existing.Description = profile.Description 298 existing.Visibility = profile.Visibility 299 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 300 existing.ModerationType = profile.ModerationType 301 existing.ContentWarnings = profile.ContentWarnings 302 existing.RecordCID = commit.CID 303 304 // Update blobs 305 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 306 existing.AvatarCID = avatarCID 307 } 308 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 309 existing.BannerCID = bannerCID 310 } 311 312 // Update description facets 313 if profile.DescriptionFacets != nil { 314 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 315 if marshalErr == nil { 316 existing.DescriptionFacets = facetsJSON 317 } 318 } 319 320 // Save updates 321 _, err = c.repo.Update(ctx, existing) 322 if err != nil { 323 return fmt.Errorf("failed to update community: %w", err) 324 } 325 326 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 327 return nil 328} 329 330// deleteCommunity removes a community from the index 331func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 332 err := c.repo.Delete(ctx, did) 333 if err != nil { 334 if communities.IsNotFound(err) { 335 log.Printf("Community already deleted: %s", did) 336 return nil 337 } 338 return fmt.Errorf("failed to delete community: %w", err) 339 } 340 341 log.Printf("Deleted community: %s", did) 342 return nil 343} 344 345// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain 346// This prevents malicious instances from claiming to host communities for domains they don't own 347func (c *CommunityEventConsumer) 
verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error { 348 // Skip verification in dev mode 349 if c.skipVerification { 350 return nil 351 } 352 353 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback) 354 ctx, cancel := context.WithTimeout(ctx, 15*time.Second) 355 defer cancel() 356 357 // Verify hostedByDID is did:web format 358 if !strings.HasPrefix(hostedByDID, "did:web:") { 359 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID) 360 } 361 362 // Extract domain from did:web DID 363 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:") 364 365 // Extract domain from community handle 366 // Handle format examples: 367 // - "!gaming@coves.social" → domain: "coves.social" 368 // - "gaming.community.coves.social" → domain: "coves.social" 369 handleDomain := extractDomainFromHandle(handle) 370 if handleDomain == "" { 371 return fmt.Errorf("failed to extract domain from handle: %s", handle) 372 } 373 374 // Verify handle domain matches hostedBy domain 375 if handleDomain != hostedByDomain { 376 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain) 377 } 378 379 // Optional: Verify DID document exists and is valid 380 // This provides cryptographic proof of domain ownership 381 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil { 382 // Soft-fail: Log warning but don't reject the community 383 // This allows operation during network issues or .well-known misconfiguration 384 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err) 385 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified") 386 } 387 388 return nil 389} 390 391// verifyDIDDocument fetches and validates the DID document from .well-known/did.json 392// This provides cryptographic proof that the instance controls the domain 393// Results are cached 
with TTL and rate-limited to prevent DoS attacks 394func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error { 395 // Skip verification in dev mode 396 if c.skipVerification { 397 return nil 398 } 399 400 // Check bounded LRU cache first (thread-safe, no locks needed) 401 if cached, ok := c.didCache.Get(did); ok { 402 // Check if cache entry is still valid (not expired) 403 if time.Now().Before(cached.expiresAt) { 404 if !cached.valid { 405 return fmt.Errorf("cached verification failure for %s", did) 406 } 407 log.Printf("✓ DID document verification (cached): %s", domain) 408 return nil 409 } 410 // Cache entry expired - remove it to free up space for fresh entries 411 c.didCache.Remove(did) 412 } 413 414 // Rate limit .well-known fetches to prevent DoS 415 if err := c.wellKnownLimiter.Wait(ctx); err != nil { 416 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err) 417 } 418 419 // Construct .well-known URL 420 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain) 421 422 // Create HTTP request with timeout 423 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 424 if err != nil { 425 // Cache the failure 426 c.cacheVerificationResult(did, false, 5*time.Minute) 427 return fmt.Errorf("failed to create request: %w", err) 428 } 429 430 // Fetch DID document using shared HTTP client 431 resp, err := c.httpClient.Do(req) 432 if err != nil { 433 // Cache the failure (shorter TTL for network errors) 434 c.cacheVerificationResult(did, false, 5*time.Minute) 435 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err) 436 } 437 defer func() { 438 if closeErr := resp.Body.Close(); closeErr != nil { 439 log.Printf("Failed to close response body: %v", closeErr) 440 } 441 }() 442 443 // Verify HTTP status 444 if resp.StatusCode != http.StatusOK { 445 // Cache the failure 446 c.cacheVerificationResult(did, false, 5*time.Minute) 447 return fmt.Errorf("DID document 
returned HTTP %d from %s", resp.StatusCode, didDocURL) 448 } 449 450 // Parse DID document 451 var didDoc struct { 452 ID string `json:"id"` 453 } 454 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 455 // Cache the failure 456 c.cacheVerificationResult(did, false, 5*time.Minute) 457 return fmt.Errorf("failed to parse DID document JSON: %w", err) 458 } 459 460 // Verify DID document ID matches claimed DID 461 if didDoc.ID != did { 462 // Cache the failure 463 c.cacheVerificationResult(did, false, 5*time.Minute) 464 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did) 465 } 466 467 // Cache the success (1 hour TTL) 468 c.cacheVerificationResult(did, true, 1*time.Hour) 469 470 log.Printf("✓ DID document verified: %s", domain) 471 return nil 472} 473 474// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL 475// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full 476func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) { 477 c.didCache.Add(did, cachedDIDDoc{ 478 valid: valid, 479 expiresAt: time.Now().Add(ttl), 480 }) 481} 482 483// extractDomainFromHandle extracts the registrable domain from a community handle 484// Handles both formats: 485// - Bluesky-style: "!gaming@coves.social" → "coves.social" 486// - DNS-style: "gaming.community.coves.social" → "coves.social" 487// 488// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs: 489// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk") 490// - "gaming.community.example.com.au" → "example.com.au" (not "com.au") 491func extractDomainFromHandle(handle string) string { 492 // Remove leading ! 
if present 493 handle = strings.TrimPrefix(handle, "!") 494 495 // Check for @-separated format (e.g., "gaming@coves.social") 496 if strings.Contains(handle, "@") { 497 parts := strings.Split(handle, "@") 498 if len(parts) == 2 { 499 domain := parts[1] 500 // Validate and extract eTLD+1 from the @-domain part 501 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain) 502 if err != nil { 503 // If publicsuffix fails, fall back to returning the full domain part 504 // This handles edge cases like localhost, IP addresses, etc. 505 return domain 506 } 507 return registrable 508 } 509 return "" 510 } 511 512 // For DNS-style handles (e.g., "gaming.community.coves.social") 513 // Extract the registrable domain (eTLD+1) using publicsuffix 514 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc. 515 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle) 516 if err != nil { 517 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address) 518 // fall back to naive extraction (last 2 parts) 519 // This maintains backward compatibility for edge cases 520 parts := strings.Split(handle, ".") 521 if len(parts) < 2 { 522 return "" // Invalid handle 523 } 524 return strings.Join(parts[len(parts)-2:], ".") 525 } 526 527 return registrable 528} 529 530// handleSubscription processes subscription create/delete events 531// CREATE operation = user subscribed to community 532// DELETE operation = user unsubscribed from community 533func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 534 switch commit.Operation { 535 case "create": 536 return c.createSubscription(ctx, userDID, commit) 537 case "delete": 538 return c.deleteSubscription(ctx, userDID, commit) 539 default: 540 // Update operations shouldn't happen on subscriptions, but ignore gracefully 541 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 542 commit.Operation, userDID, commit.RKey) 543 
return nil 544 } 545} 546 547// createSubscription indexes a new subscription with retry logic 548func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 549 if commit.Record == nil { 550 return fmt.Errorf("subscription create event missing record data") 551 } 552 553 // Extract community DID from record's subject field (following atProto conventions) 554 communityDID, ok := commit.Record["subject"].(string) 555 if !ok { 556 return fmt.Errorf("subscription record missing subject field") 557 } 558 559 // Extract contentVisibility with clamping and default value 560 contentVisibility := extractContentVisibility(commit.Record) 561 562 // Build AT-URI for subscription record 563 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint 564 // The record lives in the USER's repository, but uses the communities namespace 565 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 566 567 // Create subscription entity 568 // Parse createdAt from record to preserve chronological ordering during replays 569 subscription := &communities.Subscription{ 570 UserDID: userDID, 571 CommunityDID: communityDID, 572 ContentVisibility: contentVisibility, 573 SubscribedAt: utils.ParseCreatedAt(commit.Record), 574 RecordURI: uri, 575 RecordCID: commit.CID, 576 } 577 578 // Use transactional method to ensure subscription and count are atomically updated 579 // This is idempotent - safe for Jetstream replays 580 _, err := c.repo.SubscribeWithCount(ctx, subscription) 581 if err != nil { 582 // If already exists, that's fine (idempotency) 583 if communities.IsConflict(err) { 584 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 585 userDID, communityDID, contentVisibility) 586 return nil 587 } 588 return fmt.Errorf("failed to index subscription: %w", err) 589 } 590 591 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 
592 userDID, communityDID, contentVisibility) 593 return nil 594} 595 596// deleteSubscription removes a subscription from the index 597// DELETE operations don't include record data, so we need to look up the subscription 598// by its URI to find which community the user unsubscribed from 599func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 600 // Build AT-URI from the rkey 601 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 602 603 // Look up the subscription to get the community DID 604 // (DELETE operations don't include record data in Jetstream) 605 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 606 if err != nil { 607 if communities.IsNotFound(err) { 608 // Already deleted - this is fine (idempotency) 609 log.Printf("Subscription already deleted: %s", uri) 610 return nil 611 } 612 return fmt.Errorf("failed to find subscription for deletion: %w", err) 613 } 614 615 // Use transactional method to ensure unsubscribe and count are atomically updated 616 // This is idempotent - safe for Jetstream replays 617 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 618 if err != nil { 619 if communities.IsNotFound(err) { 620 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 621 return nil 622 } 623 return fmt.Errorf("failed to remove subscription: %w", err) 624 } 625 626 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 627 return nil 628} 629 630// handleBlock processes block create/delete events 631// CREATE operation = user blocked a community 632// DELETE operation = user unblocked a community 633func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 634 switch commit.Operation { 635 case "create": 636 return c.createBlock(ctx, userDID, commit) 637 case "delete": 638 return c.deleteBlock(ctx, 
userDID, commit) 639 default: 640 // Update operations shouldn't happen on blocks, but ignore gracefully 641 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 642 commit.Operation, userDID, commit.RKey) 643 return nil 644 } 645} 646 647// createBlock indexes a new block 648func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 649 if commit.Record == nil { 650 return fmt.Errorf("block create event missing record data") 651 } 652 653 // Extract community DID from record's subject field (following atProto conventions) 654 communityDID, ok := commit.Record["subject"].(string) 655 if !ok { 656 return fmt.Errorf("block record missing subject field") 657 } 658 659 // Build AT-URI for block record 660 // The record lives in the USER's repository 661 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 662 663 // Create block entity 664 // Parse createdAt from record to preserve chronological ordering during replays 665 block := &communities.CommunityBlock{ 666 UserDID: userDID, 667 CommunityDID: communityDID, 668 BlockedAt: utils.ParseCreatedAt(commit.Record), 669 RecordURI: uri, 670 RecordCID: commit.CID, 671 } 672 673 // Index the block 674 // This is idempotent - safe for Jetstream replays 675 _, err := c.repo.BlockCommunity(ctx, block) 676 if err != nil { 677 // If already exists, that's fine (idempotency) 678 if communities.IsConflict(err) { 679 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 680 return nil 681 } 682 return fmt.Errorf("failed to index block: %w", err) 683 } 684 685 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 686 return nil 687} 688 689// deleteBlock removes a block from the index 690// DELETE operations don't include record data, so we need to look up the block 691// by its URI to find which community the user unblocked 692func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID 
string, commit *CommitEvent) error { 693 // Build AT-URI from the rkey 694 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 695 696 // Look up the block to get the community DID 697 // (DELETE operations don't include record data in Jetstream) 698 block, err := c.repo.GetBlockByURI(ctx, uri) 699 if err != nil { 700 if communities.IsNotFound(err) { 701 // Already deleted - this is fine (idempotency) 702 log.Printf("Block already deleted: %s", uri) 703 return nil 704 } 705 return fmt.Errorf("failed to find block for deletion: %w", err) 706 } 707 708 // Remove the block from the index 709 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 710 if err != nil { 711 if communities.IsNotFound(err) { 712 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID) 713 return nil 714 } 715 return fmt.Errorf("failed to remove block: %w", err) 716 } 717 718 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 719 return nil 720} 721 722// Helper types and functions 723 724type CommunityProfile struct { 725 CreatedAt time.Time `json:"createdAt"` 726 Avatar map[string]interface{} `json:"avatar"` 727 Banner map[string]interface{} `json:"banner"` 728 CreatedBy string `json:"createdBy"` 729 Visibility string `json:"visibility"` 730 AtprotoHandle string `json:"atprotoHandle"` 731 DisplayName string `json:"displayName"` 732 Name string `json:"name"` 733 Handle string `json:"handle"` 734 HostedBy string `json:"hostedBy"` 735 Description string `json:"description"` 736 FederatedID string `json:"federatedId"` 737 ModerationType string `json:"moderationType"` 738 FederatedFrom string `json:"federatedFrom"` 739 ContentWarnings []string `json:"contentWarnings"` 740 DescriptionFacets []interface{} `json:"descriptionFacets"` 741 MemberCount int `json:"memberCount"` 742 SubscriberCount int `json:"subscriberCount"` 743 Federation FederationConfig `json:"federation"` 744} 745 746type FederationConfig struct { 747 
AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 748} 749 750// parseCommunityProfile converts a raw record map to a CommunityProfile 751func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 752 recordJSON, err := json.Marshal(record) 753 if err != nil { 754 return nil, fmt.Errorf("failed to marshal record: %w", err) 755 } 756 757 var profile CommunityProfile 758 if err := json.Unmarshal(recordJSON, &profile); err != nil { 759 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 760 } 761 762 return &profile, nil 763} 764 765// constructHandleFromProfile constructs a deterministic handle from profile data 766// Format: {name}.community.{instanceDomain} 767// Example: gaming.community.coves.social 768// This is ONLY used in test mode (when identity resolver is nil) 769// Production MUST resolve handles from PLC (source of truth) 770// Returns empty string if hostedBy is not did:web format (caller will fail validation) 771func constructHandleFromProfile(profile *CommunityProfile) string { 772 if !strings.HasPrefix(profile.HostedBy, "did:web:") { 773 // hostedBy must be did:web format for handle construction 774 // Return empty to trigger validation error in repository 775 return "" 776 } 777 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:") 778 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain) 779} 780 781// extractContentVisibility extracts contentVisibility from subscription record with clamping 782// Returns default value of 3 if missing or invalid 783func extractContentVisibility(record map[string]interface{}) int { 784 const defaultVisibility = 3 785 786 cv, ok := record["contentVisibility"] 787 if !ok { 788 // Field missing - use default 789 return defaultVisibility 790 } 791 792 // JSON numbers decode as float64 793 cvFloat, ok := cv.(float64) 794 if !ok { 795 // Try int (shouldn't happen but handle gracefully) 796 if cvInt, isInt := cv.(int); isInt { 797 return 
clampContentVisibility(cvInt) 798 } 799 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 800 return defaultVisibility 801 } 802 803 // Convert and clamp 804 clamped := clampContentVisibility(int(cvFloat)) 805 if clamped != int(cvFloat) { 806 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 807 } 808 return clamped 809} 810 811// clampContentVisibility ensures value is within valid range (1-5) 812func clampContentVisibility(value int) int { 813 if value < 1 { 814 return 1 815 } 816 if value > 5 { 817 return 5 818 } 819 return value 820} 821 822// extractBlobCID extracts the CID from a blob reference 823// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 824func extractBlobCID(blob map[string]interface{}) (string, bool) { 825 if blob == nil { 826 return "", false 827 } 828 829 // Check if it's a blob type 830 blobType, ok := blob["$type"].(string) 831 if !ok || blobType != "blob" { 832 return "", false 833 } 834 835 // Extract ref 836 ref, ok := blob["ref"].(map[string]interface{}) 837 if !ok { 838 return "", false 839 } 840 841 // Extract $link (the CID) 842 link, ok := ref["$link"].(string) 843 if !ok { 844 return "", false 845 } 846 847 return link, true 848}