A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/utils" 6 "Coves/internal/core/communities" 7 "context" 8 "encoding/json" 9 "fmt" 10 "log" 11 "net/http" 12 "strings" 13 "time" 14 15 lru "github.com/hashicorp/golang-lru/v2" 16 "golang.org/x/net/publicsuffix" 17 "golang.org/x/time/rate" 18) 19 20// CommunityEventConsumer consumes community-related events from Jetstream 21type CommunityEventConsumer struct { 22 repo communities.Repository // Repository for community operations 23 identityResolver interface{ Resolve(context.Context, string) (*identity.Identity, error) } // For resolving handles from DIDs 24 httpClient *http.Client // Shared HTTP client with connection pooling 25 didCache *lru.Cache[string, cachedDIDDoc] // Bounded LRU cache for .well-known verification results 26 wellKnownLimiter *rate.Limiter // Rate limiter for .well-known fetches 27 instanceDID string // DID of this Coves instance 28 skipVerification bool // Skip did:web verification (for dev mode) 29} 30 31// cachedDIDDoc represents a cached verification result with expiration 32type cachedDIDDoc struct { 33 expiresAt time.Time // When this cache entry expires 34 valid bool // Whether verification passed 35} 36 37// NewCommunityEventConsumer creates a new Jetstream consumer for community events 38// instanceDID: The DID of this Coves instance (for hostedBy verification) 39// skipVerification: Skip did:web verification (for dev mode) 40// identityResolver: Optional resolver for resolving handles from DIDs (can be nil for tests) 41func NewCommunityEventConsumer(repo communities.Repository, instanceDID string, skipVerification bool, identityResolver interface{ Resolve(context.Context, string) (*identity.Identity, error) }) *CommunityEventConsumer { 42 // Create bounded LRU cache for DID document verification results 43 // Max 1000 entries to prevent unbounded memory growth (PR review feedback) 44 // Each entry ~100 bytes → max ~100KB memory overhead 45 
cache, err := lru.New[string, cachedDIDDoc](1000) 46 if err != nil { 47 // This should never happen with a valid size, but handle gracefully 48 log.Printf("WARNING: Failed to create DID cache, verification will be slower: %v", err) 49 // Create minimal cache to avoid nil pointer 50 cache, _ = lru.New[string, cachedDIDDoc](1) 51 } 52 53 return &CommunityEventConsumer{ 54 repo: repo, 55 identityResolver: identityResolver, // Optional - can be nil for tests 56 instanceDID: instanceDID, 57 skipVerification: skipVerification, 58 // Shared HTTP client with connection pooling for .well-known fetches 59 httpClient: &http.Client{ 60 Timeout: 10 * time.Second, 61 Transport: &http.Transport{ 62 MaxIdleConns: 100, 63 MaxIdleConnsPerHost: 10, 64 IdleConnTimeout: 90 * time.Second, 65 }, 66 }, 67 // Bounded LRU cache for .well-known verification results (max 1000 entries) 68 // Automatically evicts least-recently-used entries when full 69 didCache: cache, 70 // Rate limiter: 10 requests per second, burst of 20 71 // Prevents DoS via excessive .well-known fetches 72 wellKnownLimiter: rate.NewLimiter(10, 20), 73 } 74} 75 76// HandleEvent processes a Jetstream event for community records 77// This is called by the main Jetstream consumer when it receives commit events 78func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 79 // We only care about commit events for community records 80 if event.Kind != "commit" || event.Commit == nil { 81 return nil 82 } 83 84 commit := event.Commit 85 86 // Route to appropriate handler based on collection 87 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 88 // - social.coves.community.profile: Community profile records (in community's own repo) 89 // - social.coves.community.subscription: Subscription records (in user's repo) 90 // - social.coves.community.block: Block records (in user's repo) 91 // 92 // XRPC procedures 
(social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 93 // that CREATE or DELETE records in these collections 94 switch commit.Collection { 95 case "social.coves.community.profile": 96 return c.handleCommunityProfile(ctx, event.Did, commit) 97 case "social.coves.community.subscription": 98 // Handle both create (subscribe) and delete (unsubscribe) operations 99 return c.handleSubscription(ctx, event.Did, commit) 100 case "social.coves.community.block": 101 // Handle both create (block) and delete (unblock) operations 102 return c.handleBlock(ctx, event.Did, commit) 103 default: 104 // Not a community-related collection 105 return nil 106 } 107} 108 109// handleCommunityProfile processes community profile create/update/delete events 110func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 111 switch commit.Operation { 112 case "create": 113 return c.createCommunity(ctx, did, commit) 114 case "update": 115 return c.updateCommunity(ctx, did, commit) 116 case "delete": 117 return c.deleteCommunity(ctx, did) 118 default: 119 log.Printf("Unknown operation for community profile: %s", commit.Operation) 120 return nil 121 } 122} 123 124// createCommunity indexes a new community from the firehose 125func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 126 if commit.Record == nil { 127 return fmt.Errorf("community profile create event missing record data") 128 } 129 130 // Parse the community profile record 131 profile, err := parseCommunityProfile(commit.Record) 132 if err != nil { 133 return fmt.Errorf("failed to parse community profile: %w", err) 134 } 135 136 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 137 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 138 if profile.Handle == "" { 139 if c.identityResolver != nil { 140 // Production: 
Resolve handle from PLC (source of truth) 141 // NO FALLBACK - if PLC is down, we fail and backfill later 142 // This prevents creating communities with incorrect handles in federated scenarios 143 identity, err := c.identityResolver.Resolve(ctx, did) 144 if err != nil { 145 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 146 } 147 profile.Handle = identity.Handle 148 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 149 profile.Handle, did, identity.Method) 150 } else { 151 // Test mode only: construct deterministically when no resolver available 152 profile.Handle = constructHandleFromProfile(profile) 153 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 154 profile.Handle, profile.Name, profile.HostedBy) 155 } 156 } 157 158 // SECURITY: Verify hostedBy claim matches handle domain 159 // This prevents malicious instances from claiming to host communities for domains they don't own 160 if err := c.verifyHostedByClaim(ctx, profile.Handle, profile.HostedBy); err != nil { 161 log.Printf("🚨 SECURITY: Rejecting community %s - hostedBy verification failed: %v", did, err) 162 log.Printf(" Handle: %s, HostedBy: %s", profile.Handle, profile.HostedBy) 163 return fmt.Errorf("hostedBy verification failed: %w", err) 164 } 165 166 // Build AT-URI for this record 167 // V2 Architecture (ONLY): 168 // - 'did' parameter IS the community DID (community owns its own repo) 169 // - rkey MUST be "self" for community profiles 170 // - URI: at://community_did/social.coves.community.profile/self 171 172 // REJECT non-V2 communities (pre-production: no V1 compatibility) 173 if commit.RKey != "self" { 174 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 175 } 176 177 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 178 179 // V2: Community ALWAYS owns itself 180 ownerDID := did 181 182 
// Create community entity 183 community := &communities.Community{ 184 DID: did, // V2: Repository DID IS the community DID 185 Handle: profile.Handle, 186 Name: profile.Name, 187 DisplayName: profile.DisplayName, 188 Description: profile.Description, 189 OwnerDID: ownerDID, // V2: same as DID (self-owned) 190 CreatedByDID: profile.CreatedBy, 191 HostedByDID: profile.HostedBy, 192 Visibility: profile.Visibility, 193 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 194 ModerationType: profile.ModerationType, 195 ContentWarnings: profile.ContentWarnings, 196 MemberCount: profile.MemberCount, 197 SubscriberCount: profile.SubscriberCount, 198 FederatedFrom: profile.FederatedFrom, 199 FederatedID: profile.FederatedID, 200 CreatedAt: profile.CreatedAt, 201 UpdatedAt: time.Now(), 202 RecordURI: uri, 203 RecordCID: commit.CID, 204 } 205 206 // Handle blobs (avatar/banner) if present 207 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 208 community.AvatarCID = avatarCID 209 } 210 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 211 community.BannerCID = bannerCID 212 } 213 214 // Handle description facets (rich text) 215 if profile.DescriptionFacets != nil { 216 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 217 if marshalErr == nil { 218 community.DescriptionFacets = facetsJSON 219 } 220 } 221 222 // Index in AppView database 223 _, err = c.repo.Create(ctx, community) 224 if err != nil { 225 // Check if it already exists (idempotency) 226 if communities.IsConflict(err) { 227 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 228 return nil 229 } 230 return fmt.Errorf("failed to index community: %w", err) 231 } 232 233 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 234 return nil 235} 236 237// updateCommunity updates an existing community from the firehose 238func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit 
*CommitEvent) error { 239 if commit.Record == nil { 240 return fmt.Errorf("community profile update event missing record data") 241 } 242 243 // REJECT non-V2 communities (pre-production: no V1 compatibility) 244 if commit.RKey != "self" { 245 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 246 } 247 248 // Parse profile 249 profile, err := parseCommunityProfile(commit.Record) 250 if err != nil { 251 return fmt.Errorf("failed to parse community profile: %w", err) 252 } 253 254 // atProto Best Practice: Handles are NOT stored in records (they're mutable, resolved from DIDs) 255 // If handle is missing from record (new atProto-compliant records), resolve it from PLC/DID 256 if profile.Handle == "" { 257 if c.identityResolver != nil { 258 // Production: Resolve handle from PLC (source of truth) 259 // NO FALLBACK - if PLC is down, we fail and backfill later 260 // This prevents creating communities with incorrect handles in federated scenarios 261 identity, err := c.identityResolver.Resolve(ctx, did) 262 if err != nil { 263 return fmt.Errorf("failed to resolve handle from PLC for %s: %w (no fallback - will retry during backfill)", did, err) 264 } 265 profile.Handle = identity.Handle 266 log.Printf("✓ Resolved handle from PLC: %s (did=%s, method=%s)", 267 profile.Handle, did, identity.Method) 268 } else { 269 // Test mode only: construct deterministically when no resolver available 270 profile.Handle = constructHandleFromProfile(profile) 271 log.Printf("✓ Constructed handle (test mode): %s (name=%s, hostedBy=%s)", 272 profile.Handle, profile.Name, profile.HostedBy) 273 } 274 } 275 276 // V2: Repository DID IS the community DID 277 // Get existing community using the repo DID 278 existing, err := c.repo.GetByDID(ctx, did) 279 if err != nil { 280 if communities.IsNotFound(err) { 281 // Community doesn't exist yet - treat as create 282 log.Printf("Community not found for update, creating: %s", did) 
283 return c.createCommunity(ctx, did, commit) 284 } 285 return fmt.Errorf("failed to get existing community: %w", err) 286 } 287 288 // Update fields 289 existing.Handle = profile.Handle 290 existing.Name = profile.Name 291 existing.DisplayName = profile.DisplayName 292 existing.Description = profile.Description 293 existing.Visibility = profile.Visibility 294 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 295 existing.ModerationType = profile.ModerationType 296 existing.ContentWarnings = profile.ContentWarnings 297 existing.RecordCID = commit.CID 298 299 // Update blobs 300 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 301 existing.AvatarCID = avatarCID 302 } 303 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 304 existing.BannerCID = bannerCID 305 } 306 307 // Update description facets 308 if profile.DescriptionFacets != nil { 309 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 310 if marshalErr == nil { 311 existing.DescriptionFacets = facetsJSON 312 } 313 } 314 315 // Save updates 316 _, err = c.repo.Update(ctx, existing) 317 if err != nil { 318 return fmt.Errorf("failed to update community: %w", err) 319 } 320 321 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 322 return nil 323} 324 325// deleteCommunity removes a community from the index 326func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 327 err := c.repo.Delete(ctx, did) 328 if err != nil { 329 if communities.IsNotFound(err) { 330 log.Printf("Community already deleted: %s", did) 331 return nil 332 } 333 return fmt.Errorf("failed to delete community: %w", err) 334 } 335 336 log.Printf("Deleted community: %s", did) 337 return nil 338} 339 340// verifyHostedByClaim verifies that the community's hostedBy claim matches the handle domain 341// This prevents malicious instances from claiming to host communities for domains they don't own 342func (c *CommunityEventConsumer) 
verifyHostedByClaim(ctx context.Context, handle, hostedByDID string) error { 343 // Skip verification in dev mode 344 if c.skipVerification { 345 return nil 346 } 347 348 // Add 15 second overall timeout to prevent slow verification from blocking consumer (PR review feedback) 349 ctx, cancel := context.WithTimeout(ctx, 15*time.Second) 350 defer cancel() 351 352 // Verify hostedByDID is did:web format 353 if !strings.HasPrefix(hostedByDID, "did:web:") { 354 return fmt.Errorf("hostedByDID must use did:web method, got: %s", hostedByDID) 355 } 356 357 // Extract domain from did:web DID 358 hostedByDomain := strings.TrimPrefix(hostedByDID, "did:web:") 359 360 // Extract domain from community handle 361 // Handle format examples: 362 // - "!gaming@coves.social" → domain: "coves.social" 363 // - "gaming.community.coves.social" → domain: "coves.social" 364 handleDomain := extractDomainFromHandle(handle) 365 if handleDomain == "" { 366 return fmt.Errorf("failed to extract domain from handle: %s", handle) 367 } 368 369 // Verify handle domain matches hostedBy domain 370 if handleDomain != hostedByDomain { 371 return fmt.Errorf("handle domain (%s) doesn't match hostedBy domain (%s)", handleDomain, hostedByDomain) 372 } 373 374 // Optional: Verify DID document exists and is valid 375 // This provides cryptographic proof of domain ownership 376 if err := c.verifyDIDDocument(ctx, hostedByDID, hostedByDomain); err != nil { 377 // Soft-fail: Log warning but don't reject the community 378 // This allows operation during network issues or .well-known misconfiguration 379 log.Printf("⚠️ WARNING: DID document verification failed for %s: %v", hostedByDomain, err) 380 log.Printf(" Community will be indexed, but hostedBy claim cannot be cryptographically verified") 381 } 382 383 return nil 384} 385 386// verifyDIDDocument fetches and validates the DID document from .well-known/did.json 387// This provides cryptographic proof that the instance controls the domain 388// Results are cached 
with TTL and rate-limited to prevent DoS attacks 389func (c *CommunityEventConsumer) verifyDIDDocument(ctx context.Context, did, domain string) error { 390 // Skip verification in dev mode 391 if c.skipVerification { 392 return nil 393 } 394 395 // Check bounded LRU cache first (thread-safe, no locks needed) 396 if cached, ok := c.didCache.Get(did); ok { 397 // Check if cache entry is still valid (not expired) 398 if time.Now().Before(cached.expiresAt) { 399 if !cached.valid { 400 return fmt.Errorf("cached verification failure for %s", did) 401 } 402 log.Printf("✓ DID document verification (cached): %s", domain) 403 return nil 404 } 405 // Cache entry expired - remove it to free up space for fresh entries 406 c.didCache.Remove(did) 407 } 408 409 // Rate limit .well-known fetches to prevent DoS 410 if err := c.wellKnownLimiter.Wait(ctx); err != nil { 411 return fmt.Errorf("rate limit exceeded for .well-known fetch: %w", err) 412 } 413 414 // Construct .well-known URL 415 didDocURL := fmt.Sprintf("https://%s/.well-known/did.json", domain) 416 417 // Create HTTP request with timeout 418 req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 419 if err != nil { 420 // Cache the failure 421 c.cacheVerificationResult(did, false, 5*time.Minute) 422 return fmt.Errorf("failed to create request: %w", err) 423 } 424 425 // Fetch DID document using shared HTTP client 426 resp, err := c.httpClient.Do(req) 427 if err != nil { 428 // Cache the failure (shorter TTL for network errors) 429 c.cacheVerificationResult(did, false, 5*time.Minute) 430 return fmt.Errorf("failed to fetch DID document from %s: %w", didDocURL, err) 431 } 432 defer func() { 433 if closeErr := resp.Body.Close(); closeErr != nil { 434 log.Printf("Failed to close response body: %v", closeErr) 435 } 436 }() 437 438 // Verify HTTP status 439 if resp.StatusCode != http.StatusOK { 440 // Cache the failure 441 c.cacheVerificationResult(did, false, 5*time.Minute) 442 return fmt.Errorf("DID document 
returned HTTP %d from %s", resp.StatusCode, didDocURL) 443 } 444 445 // Parse DID document 446 var didDoc struct { 447 ID string `json:"id"` 448 } 449 if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 450 // Cache the failure 451 c.cacheVerificationResult(did, false, 5*time.Minute) 452 return fmt.Errorf("failed to parse DID document JSON: %w", err) 453 } 454 455 // Verify DID document ID matches claimed DID 456 if didDoc.ID != did { 457 // Cache the failure 458 c.cacheVerificationResult(did, false, 5*time.Minute) 459 return fmt.Errorf("DID document ID (%s) doesn't match claimed DID (%s)", didDoc.ID, did) 460 } 461 462 // Cache the success (1 hour TTL) 463 c.cacheVerificationResult(did, true, 1*time.Hour) 464 465 log.Printf("✓ DID document verified: %s", domain) 466 return nil 467} 468 469// cacheVerificationResult stores a verification result in the bounded LRU cache with the given TTL 470// The LRU cache is thread-safe and automatically evicts least-recently-used entries when full 471func (c *CommunityEventConsumer) cacheVerificationResult(did string, valid bool, ttl time.Duration) { 472 c.didCache.Add(did, cachedDIDDoc{ 473 valid: valid, 474 expiresAt: time.Now().Add(ttl), 475 }) 476} 477 478// extractDomainFromHandle extracts the registrable domain from a community handle 479// Handles both formats: 480// - Bluesky-style: "!gaming@coves.social" → "coves.social" 481// - DNS-style: "gaming.community.coves.social" → "coves.social" 482// 483// Uses golang.org/x/net/publicsuffix to correctly handle multi-part TLDs: 484// - "gaming.community.coves.co.uk" → "coves.co.uk" (not "co.uk") 485// - "gaming.community.example.com.au" → "example.com.au" (not "com.au") 486func extractDomainFromHandle(handle string) string { 487 // Remove leading ! 
if present 488 handle = strings.TrimPrefix(handle, "!") 489 490 // Check for @-separated format (e.g., "gaming@coves.social") 491 if strings.Contains(handle, "@") { 492 parts := strings.Split(handle, "@") 493 if len(parts) == 2 { 494 domain := parts[1] 495 // Validate and extract eTLD+1 from the @-domain part 496 registrable, err := publicsuffix.EffectiveTLDPlusOne(domain) 497 if err != nil { 498 // If publicsuffix fails, fall back to returning the full domain part 499 // This handles edge cases like localhost, IP addresses, etc. 500 return domain 501 } 502 return registrable 503 } 504 return "" 505 } 506 507 // For DNS-style handles (e.g., "gaming.community.coves.social") 508 // Extract the registrable domain (eTLD+1) using publicsuffix 509 // This correctly handles multi-part TLDs like .co.uk, .com.au, etc. 510 registrable, err := publicsuffix.EffectiveTLDPlusOne(handle) 511 if err != nil { 512 // If publicsuffix fails (e.g., invalid TLD, localhost, IP address) 513 // fall back to naive extraction (last 2 parts) 514 // This maintains backward compatibility for edge cases 515 parts := strings.Split(handle, ".") 516 if len(parts) < 2 { 517 return "" // Invalid handle 518 } 519 return strings.Join(parts[len(parts)-2:], ".") 520 } 521 522 return registrable 523} 524 525// handleSubscription processes subscription create/delete events 526// CREATE operation = user subscribed to community 527// DELETE operation = user unsubscribed from community 528func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 529 switch commit.Operation { 530 case "create": 531 return c.createSubscription(ctx, userDID, commit) 532 case "delete": 533 return c.deleteSubscription(ctx, userDID, commit) 534 default: 535 // Update operations shouldn't happen on subscriptions, but ignore gracefully 536 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 537 commit.Operation, userDID, commit.RKey) 538 
return nil 539 } 540} 541 542// createSubscription indexes a new subscription with retry logic 543func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 544 if commit.Record == nil { 545 return fmt.Errorf("subscription create event missing record data") 546 } 547 548 // Extract community DID from record's subject field (following atProto conventions) 549 communityDID, ok := commit.Record["subject"].(string) 550 if !ok { 551 return fmt.Errorf("subscription record missing subject field") 552 } 553 554 // Extract contentVisibility with clamping and default value 555 contentVisibility := extractContentVisibility(commit.Record) 556 557 // Build AT-URI for subscription record 558 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint 559 // The record lives in the USER's repository, but uses the communities namespace 560 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 561 562 // Create subscription entity 563 // Parse createdAt from record to preserve chronological ordering during replays 564 subscription := &communities.Subscription{ 565 UserDID: userDID, 566 CommunityDID: communityDID, 567 ContentVisibility: contentVisibility, 568 SubscribedAt: utils.ParseCreatedAt(commit.Record), 569 RecordURI: uri, 570 RecordCID: commit.CID, 571 } 572 573 // Use transactional method to ensure subscription and count are atomically updated 574 // This is idempotent - safe for Jetstream replays 575 _, err := c.repo.SubscribeWithCount(ctx, subscription) 576 if err != nil { 577 // If already exists, that's fine (idempotency) 578 if communities.IsConflict(err) { 579 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 580 userDID, communityDID, contentVisibility) 581 return nil 582 } 583 return fmt.Errorf("failed to index subscription: %w", err) 584 } 585 586 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 
587 userDID, communityDID, contentVisibility) 588 return nil 589} 590 591// deleteSubscription removes a subscription from the index 592// DELETE operations don't include record data, so we need to look up the subscription 593// by its URI to find which community the user unsubscribed from 594func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 595 // Build AT-URI from the rkey 596 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 597 598 // Look up the subscription to get the community DID 599 // (DELETE operations don't include record data in Jetstream) 600 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 601 if err != nil { 602 if communities.IsNotFound(err) { 603 // Already deleted - this is fine (idempotency) 604 log.Printf("Subscription already deleted: %s", uri) 605 return nil 606 } 607 return fmt.Errorf("failed to find subscription for deletion: %w", err) 608 } 609 610 // Use transactional method to ensure unsubscribe and count are atomically updated 611 // This is idempotent - safe for Jetstream replays 612 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 613 if err != nil { 614 if communities.IsNotFound(err) { 615 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 616 return nil 617 } 618 return fmt.Errorf("failed to remove subscription: %w", err) 619 } 620 621 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 622 return nil 623} 624 625// handleBlock processes block create/delete events 626// CREATE operation = user blocked a community 627// DELETE operation = user unblocked a community 628func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 629 switch commit.Operation { 630 case "create": 631 return c.createBlock(ctx, userDID, commit) 632 case "delete": 633 return c.deleteBlock(ctx, 
userDID, commit) 634 default: 635 // Update operations shouldn't happen on blocks, but ignore gracefully 636 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 637 commit.Operation, userDID, commit.RKey) 638 return nil 639 } 640} 641 642// createBlock indexes a new block 643func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 644 if commit.Record == nil { 645 return fmt.Errorf("block create event missing record data") 646 } 647 648 // Extract community DID from record's subject field (following atProto conventions) 649 communityDID, ok := commit.Record["subject"].(string) 650 if !ok { 651 return fmt.Errorf("block record missing subject field") 652 } 653 654 // Build AT-URI for block record 655 // The record lives in the USER's repository 656 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 657 658 // Create block entity 659 // Parse createdAt from record to preserve chronological ordering during replays 660 block := &communities.CommunityBlock{ 661 UserDID: userDID, 662 CommunityDID: communityDID, 663 BlockedAt: utils.ParseCreatedAt(commit.Record), 664 RecordURI: uri, 665 RecordCID: commit.CID, 666 } 667 668 // Index the block 669 // This is idempotent - safe for Jetstream replays 670 _, err := c.repo.BlockCommunity(ctx, block) 671 if err != nil { 672 // If already exists, that's fine (idempotency) 673 if communities.IsConflict(err) { 674 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 675 return nil 676 } 677 return fmt.Errorf("failed to index block: %w", err) 678 } 679 680 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 681 return nil 682} 683 684// deleteBlock removes a block from the index 685// DELETE operations don't include record data, so we need to look up the block 686// by its URI to find which community the user unblocked 687func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID 
string, commit *CommitEvent) error { 688 // Build AT-URI from the rkey 689 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 690 691 // Look up the block to get the community DID 692 // (DELETE operations don't include record data in Jetstream) 693 block, err := c.repo.GetBlockByURI(ctx, uri) 694 if err != nil { 695 if communities.IsNotFound(err) { 696 // Already deleted - this is fine (idempotency) 697 log.Printf("Block already deleted: %s", uri) 698 return nil 699 } 700 return fmt.Errorf("failed to find block for deletion: %w", err) 701 } 702 703 // Remove the block from the index 704 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 705 if err != nil { 706 if communities.IsNotFound(err) { 707 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID) 708 return nil 709 } 710 return fmt.Errorf("failed to remove block: %w", err) 711 } 712 713 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 714 return nil 715} 716 717// Helper types and functions 718 719type CommunityProfile struct { 720 CreatedAt time.Time `json:"createdAt"` 721 Avatar map[string]interface{} `json:"avatar"` 722 Banner map[string]interface{} `json:"banner"` 723 CreatedBy string `json:"createdBy"` 724 Visibility string `json:"visibility"` 725 AtprotoHandle string `json:"atprotoHandle"` 726 DisplayName string `json:"displayName"` 727 Name string `json:"name"` 728 Handle string `json:"handle"` 729 HostedBy string `json:"hostedBy"` 730 Description string `json:"description"` 731 FederatedID string `json:"federatedId"` 732 ModerationType string `json:"moderationType"` 733 FederatedFrom string `json:"federatedFrom"` 734 ContentWarnings []string `json:"contentWarnings"` 735 DescriptionFacets []interface{} `json:"descriptionFacets"` 736 MemberCount int `json:"memberCount"` 737 SubscriberCount int `json:"subscriberCount"` 738 Federation FederationConfig `json:"federation"` 739} 740 741type FederationConfig struct { 742 
AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 743} 744 745// parseCommunityProfile converts a raw record map to a CommunityProfile 746func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 747 recordJSON, err := json.Marshal(record) 748 if err != nil { 749 return nil, fmt.Errorf("failed to marshal record: %w", err) 750 } 751 752 var profile CommunityProfile 753 if err := json.Unmarshal(recordJSON, &profile); err != nil { 754 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 755 } 756 757 return &profile, nil 758} 759 760// constructHandleFromProfile constructs a deterministic handle from profile data 761// Format: {name}.community.{instanceDomain} 762// Example: gaming.community.coves.social 763// This is ONLY used in test mode (when identity resolver is nil) 764// Production MUST resolve handles from PLC (source of truth) 765// Returns empty string if hostedBy is not did:web format (caller will fail validation) 766func constructHandleFromProfile(profile *CommunityProfile) string { 767 if !strings.HasPrefix(profile.HostedBy, "did:web:") { 768 // hostedBy must be did:web format for handle construction 769 // Return empty to trigger validation error in repository 770 return "" 771 } 772 instanceDomain := strings.TrimPrefix(profile.HostedBy, "did:web:") 773 return fmt.Sprintf("%s.community.%s", profile.Name, instanceDomain) 774} 775 776// extractContentVisibility extracts contentVisibility from subscription record with clamping 777// Returns default value of 3 if missing or invalid 778func extractContentVisibility(record map[string]interface{}) int { 779 const defaultVisibility = 3 780 781 cv, ok := record["contentVisibility"] 782 if !ok { 783 // Field missing - use default 784 return defaultVisibility 785 } 786 787 // JSON numbers decode as float64 788 cvFloat, ok := cv.(float64) 789 if !ok { 790 // Try int (shouldn't happen but handle gracefully) 791 if cvInt, isInt := cv.(int); isInt { 792 return 
clampContentVisibility(cvInt) 793 } 794 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 795 return defaultVisibility 796 } 797 798 // Convert and clamp 799 clamped := clampContentVisibility(int(cvFloat)) 800 if clamped != int(cvFloat) { 801 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 802 } 803 return clamped 804} 805 806// clampContentVisibility ensures value is within valid range (1-5) 807func clampContentVisibility(value int) int { 808 if value < 1 { 809 return 1 810 } 811 if value > 5 { 812 return 5 813 } 814 return value 815} 816 817// extractBlobCID extracts the CID from a blob reference 818// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 819func extractBlobCID(blob map[string]interface{}) (string, bool) { 820 if blob == nil { 821 return "", false 822 } 823 824 // Check if it's a blob type 825 blobType, ok := blob["$type"].(string) 826 if !ok || blobType != "blob" { 827 return "", false 828 } 829 830 // Extract ref 831 ref, ok := blob["ref"].(map[string]interface{}) 832 if !ok { 833 return "", false 834 } 835 836 // Extract $link (the CID) 837 link, ok := ref["$link"].(string) 838 if !ok { 839 return "", false 840 } 841 842 return link, true 843}