A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log" 9 "time" 10) 11 12// CommunityEventConsumer consumes community-related events from Jetstream 13type CommunityEventConsumer struct { 14 repo communities.Repository 15} 16 17// NewCommunityEventConsumer creates a new Jetstream consumer for community events 18func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer { 19 return &CommunityEventConsumer{ 20 repo: repo, 21 } 22} 23 24// HandleEvent processes a Jetstream event for community records 25// This is called by the main Jetstream consumer when it receives commit events 26func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 27 // We only care about commit events for community records 28 if event.Kind != "commit" || event.Commit == nil { 29 return nil 30 } 31 32 commit := event.Commit 33 34 // Route to appropriate handler based on collection 35 // IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures 36 // - social.coves.community.profile: Community profile records (in community's own repo) 37 // - social.coves.community.subscription: Subscription records (in user's repo) 38 // 39 // XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints 40 // that CREATE or DELETE records in these collections 41 switch commit.Collection { 42 case "social.coves.community.profile": 43 return c.handleCommunityProfile(ctx, event.Did, commit) 44 case "social.coves.community.subscription": 45 // Handle both create (subscribe) and delete (unsubscribe) operations 46 return c.handleSubscription(ctx, event.Did, commit) 47 default: 48 // Not a community-related collection 49 return nil 50 } 51} 52 53// handleCommunityProfile processes community profile create/update/delete events 54func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 55 switch commit.Operation { 56 case "create": 57 return c.createCommunity(ctx, did, commit) 58 case "update": 59 return c.updateCommunity(ctx, did, commit) 60 case "delete": 61 return c.deleteCommunity(ctx, did) 62 default: 63 log.Printf("Unknown operation for community profile: %s", commit.Operation) 64 return nil 65 } 66} 67 68// createCommunity indexes a new community from the firehose 69func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 70 if commit.Record == nil { 71 return fmt.Errorf("community profile create event missing record data") 72 } 73 74 // Parse the community profile record 75 profile, err := parseCommunityProfile(commit.Record) 76 if err != nil { 77 return fmt.Errorf("failed to parse community profile: %w", err) 78 } 79 80 // Build AT-URI for this record 81 // V2 Architecture (ONLY): 82 // - 'did' parameter IS the community DID (community owns its own repo) 83 // - rkey MUST be "self" for community profiles 84 // - URI: at://community_did/social.coves.community.profile/self 85 86 // REJECT non-V2 communities (pre-production: no V1 compatibility) 87 if commit.RKey != "self" { 88 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 89 } 90 91 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 92 93 // V2: Community ALWAYS owns itself 94 ownerDID := did 95 96 // Create community entity 97 community := &communities.Community{ 98 DID: did, // V2: Repository DID IS the community DID 99 Handle: profile.Handle, 100 Name: profile.Name, 101 DisplayName: profile.DisplayName, 102 Description: profile.Description, 103 OwnerDID: ownerDID, // V2: same as DID (self-owned) 104 CreatedByDID: profile.CreatedBy, 105 HostedByDID: profile.HostedBy, 106 Visibility: profile.Visibility, 107 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 108 ModerationType: profile.ModerationType, 109 ContentWarnings: profile.ContentWarnings, 110 MemberCount: profile.MemberCount, 111 SubscriberCount: profile.SubscriberCount, 112 FederatedFrom: profile.FederatedFrom, 113 FederatedID: profile.FederatedID, 114 CreatedAt: profile.CreatedAt, 115 UpdatedAt: time.Now(), 116 RecordURI: uri, 117 RecordCID: commit.CID, 118 } 119 120 // Handle blobs (avatar/banner) if present 121 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 122 community.AvatarCID = avatarCID 123 } 124 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 125 community.BannerCID = bannerCID 126 } 127 128 // Handle description facets (rich text) 129 if profile.DescriptionFacets != nil { 130 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 131 if marshalErr == nil { 132 community.DescriptionFacets = facetsJSON 133 } 134 } 135 136 // Index in AppView database 137 _, err = c.repo.Create(ctx, community) 138 if err != nil { 139 // Check if it already exists (idempotency) 140 if communities.IsConflict(err) { 141 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 142 return nil 143 } 144 return fmt.Errorf("failed to index community: %w", err) 145 } 146 147 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 148 return nil 149} 150 151// updateCommunity updates an existing community from the firehose 152func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 153 if commit.Record == nil { 154 return fmt.Errorf("community profile update event missing record data") 155 } 156 157 // REJECT non-V2 communities (pre-production: no V1 compatibility) 158 if commit.RKey != "self" { 159 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 160 } 161 162 // Parse profile 163 profile, err := parseCommunityProfile(commit.Record) 164 if err != nil { 165 return fmt.Errorf("failed to parse community profile: %w", err) 166 } 167 168 // V2: Repository DID IS the community DID 169 // Get existing community using the repo DID 170 existing, err := c.repo.GetByDID(ctx, did) 171 if err != nil { 172 if communities.IsNotFound(err) { 173 // Community doesn't exist yet - treat as create 174 log.Printf("Community not found for update, creating: %s", did) 175 return c.createCommunity(ctx, did, commit) 176 } 177 return fmt.Errorf("failed to get existing community: %w", err) 178 } 179 180 // Update fields 181 existing.Handle = profile.Handle 182 existing.Name = profile.Name 183 existing.DisplayName = profile.DisplayName 184 existing.Description = profile.Description 185 existing.Visibility = profile.Visibility 186 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 187 existing.ModerationType = profile.ModerationType 188 existing.ContentWarnings = profile.ContentWarnings 189 existing.RecordCID = commit.CID 190 191 // Update blobs 192 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 193 existing.AvatarCID = avatarCID 194 } 195 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 196 existing.BannerCID = bannerCID 197 } 198 199 // Update description facets 200 if profile.DescriptionFacets != nil { 201 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 202 if marshalErr == nil { 203 existing.DescriptionFacets = facetsJSON 204 } 205 } 206 207 // Save updates 208 _, err = c.repo.Update(ctx, existing) 209 if err != nil { 210 return fmt.Errorf("failed to update community: %w", err) 211 } 212 213 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 214 return nil 215} 216 217// deleteCommunity removes a community from the index 218func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 219 err := c.repo.Delete(ctx, did) 220 if err != nil { 221 if communities.IsNotFound(err) { 222 log.Printf("Community already deleted: %s", did) 223 return nil 224 } 225 return fmt.Errorf("failed to delete community: %w", err) 226 } 227 228 log.Printf("Deleted community: %s", did) 229 return nil 230} 231 232// handleSubscription processes subscription create/delete events 233// CREATE operation = user subscribed to community 234// DELETE operation = user unsubscribed from community 235func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 236 switch commit.Operation { 237 case "create": 238 return c.createSubscription(ctx, userDID, commit) 239 case "delete": 240 return c.deleteSubscription(ctx, userDID, commit) 241 default: 242 // Update operations shouldn't happen on subscriptions, but ignore gracefully 243 log.Printf("Ignoring unexpected operation on subscription: %s (userDID=%s, rkey=%s)", 244 commit.Operation, userDID, commit.RKey) 245 return nil 246 } 247} 248 249// createSubscription indexes a new subscription with retry logic 250func (c *CommunityEventConsumer) createSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 251 if commit.Record == nil { 252 return fmt.Errorf("subscription create event missing record data") 253 } 254 255 // Extract community DID from record's subject field (following atProto conventions) 256 communityDID, ok := commit.Record["subject"].(string) 257 if !ok { 258 return fmt.Errorf("subscription record missing subject field") 259 } 260 261 // Extract contentVisibility with clamping and default value 262 contentVisibility := extractContentVisibility(commit.Record) 263 264 // Build AT-URI for subscription record 265 // IMPORTANT: Collection is social.coves.community.subscription (record type), not the XRPC endpoint 266 // The record lives in the USER's repository, but uses the communities namespace 267 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 268 269 // Create subscription entity 270 subscription := &communities.Subscription{ 271 UserDID: userDID, 272 CommunityDID: communityDID, 273 ContentVisibility: contentVisibility, 274 SubscribedAt: time.Now(), 275 RecordURI: uri, 276 RecordCID: commit.CID, 277 } 278 279 // Use transactional method to ensure subscription and count are atomically updated 280 // This is idempotent - safe for Jetstream replays 281 _, err := c.repo.SubscribeWithCount(ctx, subscription) 282 if err != nil { 283 // If already exists, that's fine (idempotency) 284 if communities.IsConflict(err) { 285 log.Printf("Subscription already indexed: %s -> %s (visibility: %d)", 286 userDID, communityDID, contentVisibility) 287 return nil 288 } 289 return fmt.Errorf("failed to index subscription: %w", err) 290 } 291 292 log.Printf("✓ Indexed subscription: %s -> %s (visibility: %d)", 293 userDID, communityDID, contentVisibility) 294 return nil 295} 296 297// deleteSubscription removes a subscription from the index 298// DELETE operations don't include record data, so we need to look up the subscription 299// by its URI to find which community the user unsubscribed from 300func (c *CommunityEventConsumer) deleteSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 301 // Build AT-URI from the rkey 302 uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey) 303 304 // Look up the subscription to get the community DID 305 // (DELETE operations don't include record data in Jetstream) 306 subscription, err := c.repo.GetSubscriptionByURI(ctx, uri) 307 if err != nil { 308 if communities.IsNotFound(err) { 309 // Already deleted - this is fine (idempotency) 310 log.Printf("Subscription already deleted: %s", uri) 311 return nil 312 } 313 return fmt.Errorf("failed to find subscription for deletion: %w", err) 314 } 315 316 // Use transactional method to ensure unsubscribe and count are atomically updated 317 // This is idempotent - safe for Jetstream replays 318 err = c.repo.UnsubscribeWithCount(ctx, userDID, subscription.CommunityDID) 319 if err != nil { 320 if communities.IsNotFound(err) { 321 log.Printf("Subscription already removed: %s -> %s", userDID, subscription.CommunityDID) 322 return nil 323 } 324 return fmt.Errorf("failed to remove subscription: %w", err) 325 } 326 327 log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID) 328 return nil 329} 330 331// Helper types and functions 332 333type CommunityProfile struct { 334 CreatedAt time.Time `json:"createdAt"` 335 Avatar map[string]interface{} `json:"avatar"` 336 Banner map[string]interface{} `json:"banner"` 337 CreatedBy string `json:"createdBy"` 338 Visibility string `json:"visibility"` 339 AtprotoHandle string `json:"atprotoHandle"` 340 DisplayName string `json:"displayName"` 341 Name string `json:"name"` 342 Handle string `json:"handle"` 343 HostedBy string `json:"hostedBy"` 344 Description string `json:"description"` 345 FederatedID string `json:"federatedId"` 346 ModerationType string `json:"moderationType"` 347 FederatedFrom string `json:"federatedFrom"` 348 ContentWarnings []string `json:"contentWarnings"` 349 DescriptionFacets []interface{} `json:"descriptionFacets"` 350 MemberCount int `json:"memberCount"` 351 SubscriberCount int `json:"subscriberCount"` 352 Federation FederationConfig `json:"federation"` 353} 354 355type FederationConfig struct { 356 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 357} 358 359// parseCommunityProfile converts a raw record map to a CommunityProfile 360func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 361 recordJSON, err := json.Marshal(record) 362 if err != nil { 363 return nil, fmt.Errorf("failed to marshal record: %w", err) 364 } 365 366 var profile CommunityProfile 367 if err := json.Unmarshal(recordJSON, &profile); err != nil { 368 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 369 } 370 371 return &profile, nil 372} 373 374// extractContentVisibility extracts contentVisibility from subscription record with clamping 375// Returns default value of 3 if missing or invalid 376func extractContentVisibility(record map[string]interface{}) int { 377 const defaultVisibility = 3 378 379 cv, ok := record["contentVisibility"] 380 if !ok { 381 // Field missing - use default 382 return defaultVisibility 383 } 384 385 // JSON numbers decode as float64 386 cvFloat, ok := cv.(float64) 387 if !ok { 388 // Try int (shouldn't happen but handle gracefully) 389 if cvInt, isInt := cv.(int); isInt { 390 return clampContentVisibility(cvInt) 391 } 392 log.Printf("WARNING: contentVisibility has unexpected type %T, using default", cv) 393 return defaultVisibility 394 } 395 396 // Convert and clamp 397 clamped := clampContentVisibility(int(cvFloat)) 398 if clamped != int(cvFloat) { 399 log.Printf("WARNING: Clamped contentVisibility from %d to %d", int(cvFloat), clamped) 400 } 401 return clamped 402} 403 404// clampContentVisibility ensures value is within valid range (1-5) 405func clampContentVisibility(value int) int { 406 if value < 1 { 407 return 1 408 } 409 if value > 5 { 410 return 5 411 } 412 return value 413} 414 415// extractBlobCID extracts the CID from a blob reference 416// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 417func extractBlobCID(blob map[string]interface{}) (string, bool) { 418 if blob == nil { 419 return "", false 420 } 421 422 // Check if it's a blob type 423 blobType, ok := blob["$type"].(string) 424 if !ok || blobType != "blob" { 425 return "", false 426 } 427 428 // Extract ref 429 ref, ok := blob["ref"].(map[string]interface{}) 430 if !ok { 431 return "", false 432 } 433 434 // Extract $link (the CID) 435 link, ok := ref["$link"].(string) 436 if !ok { 437 return "", false 438 } 439 440 return link, true 441}