A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "net/http" 9 "strings" 10 "time" 11 12 "Coves/internal/atproto/identity" 13 "Coves/internal/atproto/utils" 14 "Coves/internal/core/communities" 15 16 lru "github.com/hashicorp/golang-lru/v2" 17 "golang.org/x/net/publicsuffix" 18 "golang.org/x/time/rate" 19) 20 21// CommunityEventConsumer consumes community-related events from Jetstream 22type CommunityEventConsumer struct { 23 repo communities.Repository // Repository for community operations 24 identityResolver interface { 25 Resolve(context.Context, string) (*identity.Identity, error) 26 } // For resolving handles from DIDs 27 httpClient *http.Client // Shared HTTP client with connection pooling 28 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results 29 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches 30 instanceDID string // DID of this Coves instance 31 skipVerification bool // Skip did:web verification (for dev mode) 32} 33 34// cachedDIDDoc represents a cached verification result with expiration 35type cachedDIDDoc struct { 36 expiresAt time.Time // When this cache entry expires 37 valid bool // Whether verification passed 38} 39 40// NewCommunityEventConsumer creates a new Jetstream consumer for community events 41// instanceDID: The DID of this Coves instance (for hostedBy verification) 42// skipVerification: Skip did:web verification (for dev mode) 43// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests) 44func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface { 45 Resolve(context.Context, string) (*identity.Identity, error) 46}, 47) *CommunityEventConsumer { 48 // Create bounded LRU cache for DID document verification results 49 // Max 1000 entries to prevent unbounded memory growth (PR review feedback) 50 // Each entry ~100 bytes → max ~100KB memory overhead 51 cache, err := lru.New[string, cachedDIDDoc](1000) 52 if err != nil { 53 // This should never happen with a valid size, but handle gracefully 54 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err) 55 // Create minimal cache to avoid nil pointer 56 cache, _ = lru.New[string, cachedDIDDoc](1) 57 } 58 59 return &CommunityEventConsumer{ 60 repo: repo, 61 identityResolver: identityResolver, // Optional - can be nil for tests 62 instanceDID: instanceDID, 63 skipVerification: skipVerification, 64 // Shared HTTP client with connection pooling for .well-known fetches 65 httpClient: &http.Client{ 66 Timeout: 10 * time.Second, 67 Transport: &http.Transport{ 68 MaxIdleConns: 100, 69 MaxIdleConnsPerHost: 10, 70 IdleConnTimeout: 90 * time.Second, 71 }, 72 }, 73 // Bounded LRU cache for .well-known verification results (max 1000 entries) 74 // Automatically evicts least-recently-used entries when full 75 didCache: cache, 76 // Rate limiter: 10 requests per second, burst of 20 77 // Prevents DoS via excessive .well-known fetches 78 wellKnownLimiter: rate.NewLimiter(10, 20), 79 } 80} 81 82// HandleEvent processes a Jetstream event for community records 83// This is called by the main Jetstream consumer when it receives commit events 84func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 85 // We only care about commit events for community records 86 if event.Kind != "commit" || event.Commit == nil { 87 return nil 88 } 89 90 commit := event.Commit 91 92 // Route to appropriate handler based on collection 93 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 94 // - social.coves.community.profile: Community profile records (in community's own repo) 95 // - social.coves.community.subscription: Subscription records (in user's repo) 96 // - social.coves.community.block: Block records (in user's repo) 97 // 98 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 99 // that CREATE or DELETE records in these collections 100 switch commit.Collection { 101 case "social.coves.community.profile": 102 return c.handleCommunityProfile(ctx, event.Did, commit) 103 case "social.coves.community.subscription": 104 // Handle both create (subscribe) and delete (unsubscribe) operations 105 return c.handleSubscription(ctx, event.Did, commit) 106 case "social.coves.community.block": 107 // Handle both create (block) and delete (unblock) operations 108 return c.handleBlock(ctx, event.Did, commit) 109 default: 110 // Not a community-related collection 111 return nil 112 } 113} 114 115// handleCommunityProfile processes community profile create/update/delete events 116func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 117 switch commit.Operation { 118 case "create": 119 return c.createCommunity(ctx, did, commit) 120 case "update": 121 return c.updateCommunity(ctx, did, commit) 122 case "delete": 123 return c.deleteCommunity(ctx, did) 124 default: 125 log.Printf("Unknown operation for community profile: %s", commit.Operation) 126 return nil 127 } 128} 129 130// createCommunity indexes a new community from the firehose 131func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 132 if commit.Record == nil { 133 return fmt.Errorf("community profile create event missing record data") 134 } 135 136 // Parse the community profile record 137 profile, err := parseCommunityProfile(commit.Record) 138 if err != nil { 139 return fmt.Errorf("failed to parse community profile: %w", err) 140 } 141 142 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 143 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 144 if profile.Handle == "" { 145 if c.identityResolver != nil { 146 // Production: Resolve handle from PLC (source of truth) 147 // NO FALLBACK - if PLC is down, we fail and backfill later 148 // This prevents creating communities with incorrect handles in federated scenarios 149 identity, err := c.identityResolver.Resolve(ctx, did) 150 if err != nil { 151 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 152 } 153 profile.Handle = identity.Handle 154 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 155 profile.Handle, did, identity.Method) 156 } else { 157 // Test mode only: construct deterministically when no resolver available 158 profile.Handle = constructHandleFromProfile(profile) 159 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 160 profile.Handle, profile.Name, profile.HostedBy) 161 } 162 } 163 164 // SECURITY: Verify hostedBy claim matches handle domain 165 // This prevents malicious instances from claiming to host communities for domains they don't own 166 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil { 167 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err) 168 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy) 169 return fmt.Errorf("hostedBy verification failed: %w", err) 170 } 171 172 // Build AT-URI for this record 173 // V2 Architecture (ONLY): 174 // - 'did' parameter IS the community DID (community owns its own repo) 175 // - rkey MUST be "self" for community profiles 176 // - URI: at://community_did/social.coves.community.profile/self 177 178 // REJECT non-V2 communities (pre-production: no V1 compatibility) 179 if commit.RKey != "self" { 180 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 181 } 182 183 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 184 185 // V2: Community ALWAYS owns itself 186 ownerDID := did 187 188 // Create community entity 189 community := &communities.Community{ 190 DID: did, // V2: Repository DID IS the community DID 191 Handle: profile.Handle, 192 Name: profile.Name, 193 DisplayName: profile.DisplayName, 194 Description: profile.Description, 195 OwnerDID: ownerDID, // V2: same as DID (self-owned) 196 CreatedByDID: profile.CreatedBy, 197 HostedByDID: profile.HostedBy, 198 Visibility: profile.Visibility, 199 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 200 ModerationType: profile.ModerationType, 201 ContentWarnings: profile.ContentWarnings, 202 MemberCount: profile.MemberCount, 203 SubscriberCount: profile.SubscriberCount, 204 FederatedFrom: profile.FederatedFrom, 205 FederatedID: profile.FederatedID, 206 CreatedAt: profile.CreatedAt, 207 UpdatedAt: time.Now(), 208 RecordURI: uri, 209 RecordCID: commit.CID, 210 } 211 212 // Handle blobs (avatar/banner) if present 213 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 214 community.AvatarCID = avatarCID 215 } 216 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 217 community.BannerCID = bannerCID 218 } 219 220 // Handle description facets (rich text) 221 if profile.DescriptionFacets != nil { 222 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 223 if marshalErr == nil { 224 community.DescriptionFacets = facetsJSON 225 } 226 } 227 228 // Index in AppView database 229 _, err = c.repo.Create(ctx, community) 230 if err != nil { 231 // Check if it already exists (idempotency) 232 if communities.IsConflict(err) { 233 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 234 return nil 235 } 236 return fmt.Errorf("failed to index community: %w", err) 237 } 238 239 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 240 return nil 241} 242 243// updateCommunity updates an existing community from the firehose 244func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 245 if commit.Record == nil { 246 return fmt.Errorf("community profile update event missing record data") 247 } 248 249 // REJECT non-V2 communities (pre-production: no V1 compatibility) 250 if commit.RKey != "self" { 251 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 252 } 253 254 // Parse profile 255 profile, err := parseCommunityProfile(commit.Record) 256 if err != nil { 257 return fmt.Errorf("failed to parse community profile: %w", err) 258 } 259 260 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 261 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 262 if profile.Handle == "" { 263 if c.identityResolver != nil { 264 // Production: Resolve handle from PLC (source of truth) 265 // NO FALLBACK - if PLC is down, we fail and backfill later 266 // This prevents creating communities with incorrect handles in federated scenarios 267 identity, err := c.identityResolver.Resolve(ctx, did) 268 if err != nil { 269 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 270 } 271 profile.Handle = identity.Handle 272 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 273 profile.Handle, did, identity.Method) 274 } else { 275 // Test mode only: construct deterministically when no resolver available 276 profile.Handle = constructHandleFromProfile(profile) 277 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 278 profile.Handle, profile.Name, profile.HostedBy) 279 } 280 } 281 282 // V2: Repository DID IS the community DID 283 // Get existing community using the repo DID 284 existing, err := c.repo.GetByDID(ctx, did) 285 if err != nil { 286 if communities.IsNotFound(err) { 287 // Community doesn't exist yet - treat as create 288 log.Printf("Community not found for update, creating: %s", did) 289 return c.createCommunity(ctx, did, commit) 290 } 291 return fmt.Errorf("failed to get existing community: %w", err) 292 } 293 294 // Update fields 295 existing.Handle = profile.Handle 296 existing.Name = profile.Name 297 existing.DisplayName = profile.DisplayName 298 existing.Description = profile.Description 299 existing.Visibility = profile.Visibility 300 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 301 existing.ModerationType = profile.ModerationType 302 existing.ContentWarnings = profile.ContentWarnings 303 existing.RecordCID = commit.CID 304 305 // Update blobs 306 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 307 existing.AvatarCID = avatarCID 308 } 309 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 310 existing.BannerCID = bannerCID 311 } 312 313 // Update description facets 314 if profile.DescriptionFacets != nil { 315 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 316 if marshalErr == nil { 317 existing.DescriptionFacets = facetsJSON 318 } 319 } 320 321 // Save updates 322 _, err = c.repo.Update(ctx, existing) 323 if err != nil { 324 return fmt.Errorf("failed to update community: %w", err) 325 } 326 327 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 328 return nil 329} 330 331// deleteCommunity removes a community from the index 332func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 333 err := c.repo.Delete(ctx, did) 334 if err != nil { 335 if communities.IsNotFound(err) { 336 log.Printf("Community already deleted: %s", did) 337 return nil 338 } 339 return fmt.Errorf("failed to delete community: %w", err) 340 } 341 342 log.Printf("Deleted community: %s", did) 343 return nil 344} 345 346// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain 347// This prevents malicious instances from claiming to host communities for domains they don't own 348func (c *CommunityEventConsumer) verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error { 349 // Skip verification in dev mode 350 if c.skipVerification { 351 return nil 352 } 353 354 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback) 355 ctx, cancel := context.WithTimeout(ctx, 15*time.Second) 356 defer cancel() 357 358 // Verify hostedByDID is did:web format 359 if !strings.HasPrefix(hostedByDID, "did:web:") { 360 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID) 361 } 362 363 // Extract domain from did:web DID 364 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:") 365 366 // Extract domain from community handle 367 // Handle format examples: 368 // - "!gaming@coves.social" → domain: "coves.social" 369 // - "gaming.community.coves.social" → domain: "coves.social" 370 handleDomain := extractDomainFromHandle(handle) 371 if handleDomain == "" { 372 return fmt.Errorf("failed to extract domain from handle: %s", handle) 373 } 374 375 // Verify handle domain matches hostedBy domain 376 if handleDomain != hostedByDomain { 377 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain) 378 } 379 380 // Optional: Verify DID document exists and is valid 381 // This provides cryptographic proof of domain ownership 382 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil { 383 // Soft-fail: Log warning but don't reject the community 384 // This allows operation during network issues or .well-known misconfiguration 385 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err) 386 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified") 387 } 388 389 return nil 390} 391 392// verifyDIDDocument fetches and validates the DID document from .well-known/did.json 393// This provides cryptographic proof that the instance controls the domain 394// Results are cached with TTL and rate-limited to prevent DoS attacks 395func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error { 396 // Skip verification in dev mode 397 if c.skipVerification { 398 return nil 399 } 400 401 // Check bounded LRU cache first (thread-safe, no locks needed) 402 if cached, ok := c.didCache.Get(did); ok { 403 // Check if cache entry is still valid (not expired) 404 if time.Now().Before(cached.expiresAt) { 405 if !cached.valid { 406 return fmt.Errorf("cached verification failure for %s", did) 407 } 408 log.Printf("✓ DID document verification (cached): %s", domain) 409 return nil 410 } 411 // Cache entry expired - remove it to free up space for fresh entries 412 c.didCache.Remove(did) 413 } 414 415 // Rate limit .well-known fetches to prevent DoS 416 if err := c.wellKnownLimiter.Wait(ctx); err != nil { 417 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err) 418 } 419 420 // Construct .well-known URL 421 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain) 422 423 // Create HTTP request with timeout 424 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 425 if err != nil { 426 // Cache the failure 427 c.cacheVerificationResult(did, false, 5*time.Minute) 428 return fmt.Errorf("failed to create request: %w", err) 429 } 430 431 // Fetch DID document using shared HTTP client 432 resp, err := c.httpClient.Do(req) 433 if err != nil { 434 // Cache the failure (shorter TTL for network errors) 435 c.cacheVerificationResult(did, false, 5*time.Minute) 436 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err) 437 } 438 defer func() { 439 if closeErr := resp.Body.Close(); closeErr != nil { 440 log.Printf("Failed to close response body: %v", closeErr) 441 } 442 }() 443 444 // Verify HTTP status 445 if resp.StatusCode != http.StatusOK { 446 // Cache the failure 447 c.cacheVerificationResult(did, false, 5*time.Minute) 448 return fmt.Errorf("DID document returned HTTP %d from %s", resp.StatusCode, didDocURL) 449 } 450 451 // Parse DID document 452 var didDoc struct { 453 ID string `json:"id"` 454 } 455 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 456 // Cache the failure 457 c.cacheVerificationResult(did, false, 5*time.Minute) 458 return fmt.Errorf("failed to parse DID document JSON: %w", err) 459 } 460 461 // Verify DID document ID matches claimed DID 462 if didDoc.ID != did { 463 // Cache the failure 464 c.cacheVerificationResult(did, false, 5*time.Minute) 465 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did) 466 } 467 468 // Cache the success (1 hour TTL) 469 c.cacheVerificationResult(did, true, 1*time.Hour) 470 471 log.Printf("✓ DID document verified: %s", domain) 472 return nil 473} 474 475// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL 476// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full 477func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) { 478 c.didCache.Add(did, cachedDIDDoc{ 479 valid: valid, 480 expiresAt: time.Now().Add(ttl), 481 }) 482} 483 484// extractDomainFromHandle extracts the registrable domain from a community handle 485// Handles both formats: 486// - Bluesky-style: "!gaming@coves.social" → "coves.social" 487// - DNS-style: "gaming.community.coves.social" → "coves.social" 488// 489// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs: 490// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk") 491// - "gaming.community.example.com.au" → "example.com.au" (not "com.au") 492func extractDomainFromHandle(handle string) string { 493 // Remove leading ! if present 494 handle = strings.TrimPrefix(handle, "!") 495 496 // Check for @-separated format (e.g., "gaming@coves.social") 497 if strings.Contains(handle, "@") { 498 parts := strings.Split(handle, "@") 499 if len(parts) == 2 { 500 domain := parts[1] 501 // Validate and extract eTLD+1 from the @-domain part 502 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain) 503 if err != nil { 504 // If publicsuffix fails, fall back to returning the full domain part 505 // This handles edge cases like localhost, IP addresses, etc. 506 return domain 507 } 508 return registrable 509 } 510 return "" 511 } 512 513 // For DNS-style handles (e.g., "gaming.community.coves.social") 514 // Extract the registrable domain (eTLD+1) using publicsuffix 515 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc. 516 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle) 517 if err != nil { 518 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address) 519 // fall back to naive extraction (last 2 parts) 520 // This maintains backward compatibility for edge cases 521 parts := strings.Split(handle, ".") 522 if len(parts) < 2 { 523 return "" // Invalid handle 524 } 525 return strings.Join(parts[len(parts)-2:], ".") 526 } 527 528 return registrable 529} 530 531// handleSubscription processes subscription create/delete events 532// CREATE operation = user subscribed to community 533// DELETE operation = user unsubscribed from community 534func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 535 switch commit.Operation { 536 case "create": 537 return c.createSubscription(ctx, userDID, commit) 538 case "delete": 539 return c.deleteSubscription(ctx, userDID, commit) 540 default: 541 // Update operations shouldn't happen on subscriptions, but ignore gracefully 542 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 543 commit.Operation, userDID, commit.RKey) 544 return nil 545 } 546} 547 548// createSubscription indexes a new subscription with retry logic 549func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 550 if commit.Record == nil { 551 return fmt.Errorf("subscription create event missing record data") 552 } 553 554 // Extract community DID from record's subject field (following atProto conventions) 555 communityDID, ok := commit.Record["subject"].(string) 556 if !ok { 557 return fmt.Errorf("subscription record missing subject field") 558 } 559 560 // Extract contentVisibility with clamping and default value 561 contentVisibility := extractContentVisibility(commit.Record) 562 563 // Build AT-URI for subscription record 564 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint 565 // The record lives in the USER's repository, but uses the communities namespace 566 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 567 568 // Create subscription entity 569 // Parse createdAt from record to preserve chronological ordering during replays 570 subscription := &communities.Subscription{ 571 UserDID: userDID, 572 CommunityDID: communityDID, 573 ContentVisibility: contentVisibility, 574 SubscribedAt: utils.ParseCreatedAt(commit.Record), 575 RecordURI: uri, 576 RecordCID: commit.CID, 577 } 578 579 // Use transactional method to ensure subscription and count are atomically updated 580 // This is idempotent - safe for Jetstream replays 581 _, err := c.repo.SubscribeWithCount(ctx, subscription) 582 if err != nil { 583 // If already exists, that's fine (idempotency) 584 if communities.IsConflict(err) { 585 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 586 userDID, communityDID, contentVisibility) 587 return nil 588 } 589 return fmt.Errorf("failed to index subscription: %w", err) 590 } 591 592 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 593 userDID, communityDID, contentVisibility) 594 return nil 595} 596 597// deleteSubscription removes a subscription from the index 598// DELETE operations don't include record data, so we need to look up the subscription 599// by its URI to find which community the user unsubscribed from 600func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 601 // Build AT-URI from the rkey 602 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 603 604 // Look up the subscription to get the community DID 605 // (DELETE operations don't include record data in Jetstream) 606 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 607 if err != nil { 608 if communities.IsNotFound(err) { 609 // Already deleted - this is fine (idempotency) 610 log.Printf("Subscription already deleted: %s", uri) 611 return nil 612 } 613 return fmt.Errorf("failed to find subscription for deletion: %w", err) 614 } 615 616 // Use transactional method to ensure unsubscribe and count are atomically updated 617 // This is idempotent - safe for Jetstream replays 618 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 619 if err != nil { 620 if communities.IsNotFound(err) { 621 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 622 return nil 623 } 624 return fmt.Errorf("failed to remove subscription: %w", err) 625 } 626 627 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 628 return nil 629} 630 631// handleBlock processes block create/delete events 632// CREATE operation = user blocked a community 633// DELETE operation = user unblocked a community 634func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 635 switch commit.Operation { 636 case "create": 637 return c.createBlock(ctx, userDID, commit) 638 case "delete": 639 return c.deleteBlock(ctx, userDID, commit) 640 default: 641 // Update operations shouldn't happen on blocks, but ignore gracefully 642 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 643 commit.Operation, userDID, commit.RKey) 644 return nil 645 } 646} 647 648// createBlock indexes a new block 649func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 650 if commit.Record == nil { 651 return fmt.Errorf("block create event missing record data") 652 } 653 654 // Extract community DID from record's subject field (following atProto conventions) 655 communityDID, ok := commit.Record["subject"].(string) 656 if !ok { 657 return fmt.Errorf("block record missing subject field") 658 } 659 660 // Build AT-URI for block record 661 // The record lives in the USER's repository 662 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 663 664 // Create block entity 665 // Parse createdAt from record to preserve chronological ordering during replays 666 block := &communities.CommunityBlock{ 667 UserDID: userDID, 668 CommunityDID: communityDID, 669 BlockedAt: utils.ParseCreatedAt(commit.Record), 670 RecordURI: uri, 671 RecordCID: commit.CID, 672 } 673 674 // Index the block 675 // This is idempotent - safe for Jetstream replays 676 _, err := c.repo.BlockCommunity(ctx, block) 677 if err != nil { 678 // If already exists, that's fine (idempotency) 679 if communities.IsConflict(err) { 680 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 681 return nil 682 } 683 return fmt.Errorf("failed to index block: %w", err) 684 } 685 686 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 687 return nil 688} 689 690// deleteBlock removes a block from the index 691// DELETE operations don't include record data, so we need to look up the block 692// by its URI to find which community the user unblocked 693func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 694 // Build AT-URI from the rkey 695 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 696 697 // Look up the block to get the community DID 698 // (DELETE operations don't include record data in Jetstream) 699 block, err := c.repo.GetBlockByURI(ctx, uri) 700 if err != nil { 701 if communities.IsNotFound(err) { 702 // Already deleted - this is fine (idempotency) 703 log.Printf("Block already deleted: %s", uri) 704 return nil 705 } 706 return fmt.Errorf("failed to find block for deletion: %w", err) 707 } 708 709 // Remove the block from the index 710 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 711 if err != nil { 712 if communities.IsNotFound(err) { 713 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID) 714 return nil 715 } 716 return fmt.Errorf("failed to remove block: %w", err) 717 } 718 719 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 720 return nil 721} 722 723// Helper types and functions 724 725type CommunityProfile struct { 726 CreatedAt time.Time `json:"createdAt"` 727 Avatar map[string]interface{} `json:"avatar"` 728 Banner map[string]interface{} `json:"banner"` 729 CreatedBy string `json:"createdBy"` 730 Visibility string `json:"visibility"` 731 AtprotoHandle string `json:"atprotoHandle"` 732 DisplayName string `json:"displayName"` 733 Name string `json:"name"` 734 Handle string `json:"handle"` 735 HostedBy string `json:"hostedBy"` 736 Description string `json:"description"` 737 FederatedID string `json:"federatedId"` 738 ModerationType string `json:"moderationType"` 739 FederatedFrom string `json:"federatedFrom"` 740 ContentWarnings []string `json:"contentWarnings"` 741 DescriptionFacets []interface{} `json:"descriptionFacets"` 742 MemberCount int `json:"memberCount"` 743 SubscriberCount int `json:"subscriberCount"` 744 Federation FederationConfig `json:"federation"` 745} 746 747type FederationConfig struct { 748 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 749} 750 751// parseCommunityProfile converts a raw record map to a CommunityProfile 752func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 753 recordJSON, err := json.Marshal(record) 754 if err != nil { 755 return nil, fmt.Errorf("failed to marshal record: %w", err) 756 } 757 758 var profile CommunityProfile 759 if err := json.Unmarshal(recordJSON, &profile); err != nil { 760 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 761 } 762 763 return &profile, nil 764} 765 766// constructHandleFromProfile constructs a deterministic handle from profile data 767// Format: {name}.community.{instanceDomain} 768// Example: gaming.community.coves.social 769// This is ONLY used in test mode (when identity resolver is nil) 770// Production MUST resolve handles from PLC (source of truth) 771// Returns empty string if hostedBy is not did:web format (caller will fail validation) 772func constructHandleFromProfile(profile *CommunityProfile) string { 773 if !strings.HasPrefix(profile.HostedBy, "did:web:") { 774 // hostedBy must be did:web format for handle construction 775 // Return empty to trigger validation error in repository 776 return "" 777 } 778 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:") 779 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain) 780} 781 782// extractContentVisibility extracts contentVisibility from subscription record with clamping 783// Returns default value of 3 if missing or invalid 784func extractContentVisibility(record map[string]interface{}) int { 785 const defaultVisibility = 3 786 787 cv, ok := record["contentVisibility"] 788 if !ok { 789 // Field missing - use default 790 return defaultVisibility 791 } 792 793 // JSON numbers decode as float64 794 cvFloat, ok := cv.(float64) 795 if !ok { 796 // Try int (shouldn't happen but handle gracefully) 797 if cvInt, isInt := cv.(int); isInt { 798 return clampContentVisibility(cvInt) 799 } 800 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 801 return defaultVisibility 802 } 803 804 // Convert and clamp 805 clamped := clampContentVisibility(int(cvFloat)) 806 if clamped != int(cvFloat) { 807 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 808 } 809 return clamped 810} 811 812// clampContentVisibility ensures value is within valid range (1-5) 813func clampContentVisibility(value int) int { 814 if value < 1 { 815 return 1 816 } 817 if value > 5 { 818 return 5 819 } 820 return value 821} 822 823// extractBlobCID extracts the CID from a blob reference 824// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 825func extractBlobCID(blob map[string]interface{}) (string, bool) { 826 if blob == nil { 827 return "", false 828 } 829 830 // Check if it's a blob type 831 blobType, ok := blob["$type"].(string) 832 if !ok || blobType != "blob" { 833 return "", false 834 } 835 836 // Extract ref 837 ref, ok := blob["ref"].(map[string]interface{}) 838 if !ok { 839 return "", false 840 } 841 842 // Extract $link (the CID) 843 link, ok := ref["$link"].(string) 844 if !ok { 845 return "", false 846 } 847 848 return link, true 849}