A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/communities" 6 "context" 7 "encoding/json" 8 "fmt" 9 "log" 10 "time" 11) 12 13// CommunityEventConsumer consumes community-related events from Jetstream 14type CommunityEventConsumer struct { 15 repo communities.Repository 16} 17 18// NewCommunityEventConsumer creates a new Jetstream consumer for community events 19func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer { 20 return &CommunityEventConsumer{ 21 repo: repo, 22 } 23} 24 25// HandleEvent processes a Jetstream event for community records 26// This is called by the main Jetstream consumer when it receives commit events 27func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 28 // We only care about commit events for community records 29 if event.Kind != "commit" || event.Commit == nil { 30 return nil 31 } 32 33 commit := event.Commit 34 35 // Route to appropriate handler based on collection 36 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 37 // - social.coves.community.profile: Community profile records (in community's own repo) 38 // - social.coves.community.subscription: Subscription records (in user's repo) 39 // - social.coves.community.block: Block records (in user's repo) 40 // 41 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 42 // that CREATE or DELETE records in these collections 43 switch commit.Collection { 44 case "social.coves.community.profile": 45 return c.handleCommunityProfile(ctx, event.Did, commit) 46 case "social.coves.community.subscription": 47 // Handle both create (subscribe) and delete (unsubscribe) operations 48 return c.handleSubscription(ctx, event.Did, commit) 49 case "social.coves.community.block": 50 // Handle both create (block) and delete (unblock) operations 51 return c.handleBlock(ctx, event.Did, commit) 52 default: 53 // Not a community-related collection 54 return nil 55 } 56} 57 58// handleCommunityProfile processes community profile create/update/delete events 59func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 60 switch commit.Operation { 61 case "create": 62 return c.createCommunity(ctx, did, commit) 63 case "update": 64 return c.updateCommunity(ctx, did, commit) 65 case "delete": 66 return c.deleteCommunity(ctx, did) 67 default: 68 log.Printf("Unknown operation for community profile: %s", commit.Operation) 69 return nil 70 } 71} 72 73// createCommunity indexes a new community from the firehose 74func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 75 if commit.Record == nil { 76 return fmt.Errorf("community profile create event missing record data") 77 } 78 79 // Parse the community profile record 80 profile, err := parseCommunityProfile(commit.Record) 81 if err != nil { 82 return fmt.Errorf("failed to parse community profile: %w", err) 83 } 84 85 // Build AT-URI for this record 86 // V2 Architecture (ONLY): 87 // - 'did' parameter IS the community DID (community owns its own repo) 88 // - rkey MUST be "self" for community profiles 89 // - URI: at://community_did/social.coves.community.profile/self 90 91 // REJECT non-V2 communities (pre-production: no V1 compatibility) 92 if commit.RKey != "self" { 93 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 94 } 95 96 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 97 98 // V2: Community ALWAYS owns itself 99 ownerDID := did 100 101 // Create community entity 102 community := &communities.Community{ 103 DID: did, // V2: Repository DID IS the community DID 104 Handle: profile.Handle, 105 Name: profile.Name, 106 DisplayName: profile.DisplayName, 107 Description: profile.Description, 108 OwnerDID: ownerDID, // V2: same as DID (self-owned) 109 CreatedByDID: profile.CreatedBy, 110 HostedByDID: profile.HostedBy, 111 Visibility: profile.Visibility, 112 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 113 ModerationType: profile.ModerationType, 114 ContentWarnings: profile.ContentWarnings, 115 MemberCount: profile.MemberCount, 116 SubscriberCount: profile.SubscriberCount, 117 FederatedFrom: profile.FederatedFrom, 118 FederatedID: profile.FederatedID, 119 CreatedAt: profile.CreatedAt, 120 UpdatedAt: time.Now(), 121 RecordURI: uri, 122 RecordCID: commit.CID, 123 } 124 125 // Handle blobs (avatar/banner) if present 126 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 127 community.AvatarCID = avatarCID 128 } 129 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 130 community.BannerCID = bannerCID 131 } 132 133 // Handle description facets (rich text) 134 if profile.DescriptionFacets != nil { 135 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 136 if marshalErr == nil { 137 community.DescriptionFacets = facetsJSON 138 } 139 } 140 141 // Index in AppView database 142 _, err = c.repo.Create(ctx, community) 143 if err != nil { 144 // Check if it already exists (idempotency) 145 if communities.IsConflict(err) { 146 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 147 return nil 148 } 149 return fmt.Errorf("failed to index community: %w", err) 150 } 151 152 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 153 return nil 154} 155 156// updateCommunity updates an existing community from the firehose 157func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 158 if commit.Record == nil { 159 return fmt.Errorf("community profile update event missing record data") 160 } 161 162 // REJECT non-V2 communities (pre-production: no V1 compatibility) 163 if commit.RKey != "self" { 164 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 165 } 166 167 // Parse profile 168 profile, err := parseCommunityProfile(commit.Record) 169 if err != nil { 170 return fmt.Errorf("failed to parse community profile: %w", err) 171 } 172 173 // V2: Repository DID IS the community DID 174 // Get existing community using the repo DID 175 existing, err := c.repo.GetByDID(ctx, did) 176 if err != nil { 177 if communities.IsNotFound(err) { 178 // Community doesn't exist yet - treat as create 179 log.Printf("Community not found for update, creating: %s", did) 180 return c.createCommunity(ctx, did, commit) 181 } 182 return fmt.Errorf("failed to get existing community: %w", err) 183 } 184 185 // Update fields 186 existing.Handle = profile.Handle 187 existing.Name = profile.Name 188 existing.DisplayName = profile.DisplayName 189 existing.Description = profile.Description 190 existing.Visibility = profile.Visibility 191 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 192 existing.ModerationType = profile.ModerationType 193 existing.ContentWarnings = profile.ContentWarnings 194 existing.RecordCID = commit.CID 195 196 // Update blobs 197 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 198 existing.AvatarCID = avatarCID 199 } 200 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 201 existing.BannerCID = bannerCID 202 } 203 204 // Update description facets 205 if profile.DescriptionFacets != nil { 206 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 207 if marshalErr == nil { 208 existing.DescriptionFacets = facetsJSON 209 } 210 } 211 212 // Save updates 213 _, err = c.repo.Update(ctx, existing) 214 if err != nil { 215 return fmt.Errorf("failed to update community: %w", err) 216 } 217 218 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 219 return nil 220} 221 222// deleteCommunity removes a community from the index 223func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 224 err := c.repo.Delete(ctx, did) 225 if err != nil { 226 if communities.IsNotFound(err) { 227 log.Printf("Community already deleted: %s", did) 228 return nil 229 } 230 return fmt.Errorf("failed to delete community: %w", err) 231 } 232 233 log.Printf("Deleted community: %s", did) 234 return nil 235} 236 237// handleSubscription processes subscription create/delete events 238// CREATE operation = user subscribed to community 239// DELETE operation = user unsubscribed from community 240func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 241 switch commit.Operation { 242 case "create": 243 return c.createSubscription(ctx, userDID, commit) 244 case "delete": 245 return c.deleteSubscription(ctx, userDID, commit) 246 default: 247 // Update operations shouldn't happen on subscriptions, but ignore gracefully 248 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 249 commit.Operation, userDID, commit.RKey) 250 return nil 251 } 252} 253 254// createSubscription indexes a new subscription with retry logic 255func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 256 if commit.Record == nil { 257 return fmt.Errorf("subscription create event missing record data") 258 } 259 260 // Extract community DID from record's subject field (following atProto conventions) 261 communityDID, ok := commit.Record["subject"].(string) 262 if !ok { 263 return fmt.Errorf("subscription record missing subject field") 264 } 265 266 // Extract contentVisibility with clamping and default value 267 contentVisibility := extractContentVisibility(commit.Record) 268 269 // Build AT-URI for subscription record 270 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint 271 // The record lives in the USER's repository, but uses the communities namespace 272 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 273 274 // Create subscription entity 275 // Parse createdAt from record to preserve chronological ordering during replays 276 subscription := &communities.Subscription{ 277 UserDID: userDID, 278 CommunityDID: communityDID, 279 ContentVisibility: contentVisibility, 280 SubscribedAt: utils.ParseCreatedAt(commit.Record), 281 RecordURI: uri, 282 RecordCID: commit.CID, 283 } 284 285 // Use transactional method to ensure subscription and count are atomically updated 286 // This is idempotent - safe for Jetstream replays 287 _, err := c.repo.SubscribeWithCount(ctx, subscription) 288 if err != nil { 289 // If already exists, that's fine (idempotency) 290 if communities.IsConflict(err) { 291 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 292 userDID, communityDID, contentVisibility) 293 return nil 294 } 295 return fmt.Errorf("failed to index subscription: %w", err) 296 } 297 298 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 299 userDID, communityDID, contentVisibility) 300 return nil 301} 302 303// deleteSubscription removes a subscription from the index 304// DELETE operations don't include record data, so we need to look up the subscription 305// by its URI to find which community the user unsubscribed from 306func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 307 // Build AT-URI from the rkey 308 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 309 310 // Look up the subscription to get the community DID 311 // (DELETE operations don't include record data in Jetstream) 312 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 313 if err != nil { 314 if communities.IsNotFound(err) { 315 // Already deleted - this is fine (idempotency) 316 log.Printf("Subscription already deleted: %s", uri) 317 return nil 318 } 319 return fmt.Errorf("failed to find subscription for deletion: %w", err) 320 } 321 322 // Use transactional method to ensure unsubscribe and count are atomically updated 323 // This is idempotent - safe for Jetstream replays 324 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 325 if err != nil { 326 if communities.IsNotFound(err) { 327 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 328 return nil 329 } 330 return fmt.Errorf("failed to remove subscription: %w", err) 331 } 332 333 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 334 return nil 335} 336 337// handleBlock processes block create/delete events 338// CREATE operation = user blocked a community 339// DELETE operation = user unblocked a community 340func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 341 switch commit.Operation { 342 case "create": 343 return c.createBlock(ctx, userDID, commit) 344 case "delete": 345 return c.deleteBlock(ctx, userDID, commit) 346 default: 347 // Update operations shouldn't happen on blocks, but ignore gracefully 348 log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)", 349 commit.Operation, userDID, commit.RKey) 350 return nil 351 } 352} 353 354// createBlock indexes a new block 355func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 356 if commit.Record == nil { 357 return fmt.Errorf("block create event missing record data") 358 } 359 360 // Extract community DID from record's subject field (following atProto conventions) 361 communityDID, ok := commit.Record["subject"].(string) 362 if !ok { 363 return fmt.Errorf("block record missing subject field") 364 } 365 366 // Build AT-URI for block record 367 // The record lives in the USER's repository 368 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 369 370 // Create block entity 371 // Parse createdAt from record to preserve chronological ordering during replays 372 block := &communities.CommunityBlock{ 373 UserDID: userDID, 374 CommunityDID: communityDID, 375 BlockedAt: utils.ParseCreatedAt(commit.Record), 376 RecordURI: uri, 377 RecordCID: commit.CID, 378 } 379 380 // Index the block 381 // This is idempotent - safe for Jetstream replays 382 _, err := c.repo.BlockCommunity(ctx, block) 383 if err != nil { 384 // If already exists, that's fine (idempotency) 385 if communities.IsConflict(err) { 386 log.Printf("Block already indexed: %s -> %s", userDID, communityDID) 387 return nil 388 } 389 return fmt.Errorf("failed to index block: %w", err) 390 } 391 392 log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID) 393 return nil 394} 395 396// deleteBlock removes a block from the index 397// DELETE operations don't include record data, so we need to look up the block 398// by its URI to find which community the user unblocked 399func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 400 // Build AT-URI from the rkey 401 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey) 402 403 // Look up the block to get the community DID 404 // (DELETE operations don't include record data in Jetstream) 405 block, err := c.repo.GetBlockByURI(ctx, uri) 406 if err != nil { 407 if communities.IsNotFound(err) { 408 // Already deleted - this is fine (idempotency) 409 log.Printf("Block already deleted: %s", uri) 410 return nil 411 } 412 return fmt.Errorf("failed to find block for deletion: %w", err) 413 } 414 415 // Remove the block from the index 416 err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID) 417 if err != nil { 418 if communities.IsNotFound(err) { 419 log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID) 420 return nil 421 } 422 return fmt.Errorf("failed to remove block: %w", err) 423 } 424 425 log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID) 426 return nil 427} 428 429// Helper types and functions 430 431type CommunityProfile struct { 432 CreatedAt time.Time `json:"createdAt"` 433 Avatar map[string]interface{} `json:"avatar"` 434 Banner map[string]interface{} `json:"banner"` 435 CreatedBy string `json:"createdBy"` 436 Visibility string `json:"visibility"` 437 AtprotoHandle string `json:"atprotoHandle"` 438 DisplayName string `json:"displayName"` 439 Name string `json:"name"` 440 Handle string `json:"handle"` 441 HostedBy string `json:"hostedBy"` 442 Description string `json:"description"` 443 FederatedID string `json:"federatedId"` 444 ModerationType string `json:"moderationType"` 445 FederatedFrom string `json:"federatedFrom"` 446 ContentWarnings []string `json:"contentWarnings"` 447 DescriptionFacets []interface{} `json:"descriptionFacets"` 448 MemberCount int `json:"memberCount"` 449 SubscriberCount int `json:"subscriberCount"` 450 Federation FederationConfig `json:"federation"` 451} 452 453type FederationConfig struct { 454 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 455} 456 457// parseCommunityProfile converts a raw record map to a CommunityProfile 458func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 459 recordJSON, err := json.Marshal(record) 460 if err != nil { 461 return nil, fmt.Errorf("failed to marshal record: %w", err) 462 } 463 464 var profile CommunityProfile 465 if err := json.Unmarshal(recordJSON, &profile); err != nil { 466 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 467 } 468 469 return &profile, nil 470} 471 472// extractContentVisibility extracts contentVisibility from subscription record with clamping 473// Returns default value of 3 if missing or invalid 474func extractContentVisibility(record map[string]interface{}) int { 475 const defaultVisibility = 3 476 477 cv, ok := record["contentVisibility"] 478 if !ok { 479 // Field missing - use default 480 return defaultVisibility 481 } 482 483 // JSON numbers decode as float64 484 cvFloat, ok := cv.(float64) 485 if !ok { 486 // Try int (shouldn't happen but handle gracefully) 487 if cvInt, isInt := cv.(int); isInt { 488 return clampContentVisibility(cvInt) 489 } 490 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 491 return defaultVisibility 492 } 493 494 // Convert and clamp 495 clamped := clampContentVisibility(int(cvFloat)) 496 if clamped != int(cvFloat) { 497 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 498 } 499 return clamped 500} 501 502// clampContentVisibility ensures value is within valid range (1-5) 503func clampContentVisibility(value int) int { 504 if value < 1 { 505 return 1 506 } 507 if value > 5 { 508 return 5 509 } 510 return value 511} 512 513// extractBlobCID extracts the CID from a blob reference 514// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 515func extractBlobCID(blob map[string]interface{}) (string, bool) { 516 if blob == nil { 517 return "", false 518 } 519 520 // Check if it's a blob type 521 blobType, ok := blob["$type"].(string) 522 if !ok || blobType != "blob" { 523 return "", false 524 } 525 526 // Extract ref 527 ref, ok := blob["ref"].(map[string]interface{}) 528 if !ok { 529 return "", false 530 } 531 532 // Extract $link (the CID) 533 link, ok := ref["$link"].(string) 534 if !ok { 535 return "", false 536 } 537 538 return link, true 539}