A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "strings" 9 "time" 10 11 "Coves/internal/core/communities" 12) 13 14// CommunityEventConsumer consumes community-related events from Jetstream 15type CommunityEventConsumer struct { 16 repo communities.Repository 17} 18 19// NewCommunityEventConsumer creates a new Jetstream consumer for community events 20func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer { 21 return &CommunityEventConsumer{ 22 repo: repo, 23 } 24} 25 26// HandleEvent processes a Jetstream event for community records 27// This is called by the main Jetstream consumer when it receives commit events 28func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 29 // We only care about commit events for community records 30 if event.Kind != "commit" || event.Commit == nil { 31 return nil 32 } 33 34 commit := event.Commit 35 36 // Route to appropriate handler based on collection 37 switch commit.Collection { 38 case "social.coves.community.profile": 39 return c.handleCommunityProfile(ctx, event.Did, commit) 40 case "social.coves.community.subscribe": 41 return c.handleSubscription(ctx, event.Did, commit) 42 case "social.coves.community.unsubscribe": 43 return c.handleUnsubscribe(ctx, event.Did, commit) 44 default: 45 // Not a community-related collection 46 return nil 47 } 48} 49 50// handleCommunityProfile processes community profile create/update/delete events 51func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 52 switch commit.Operation { 53 case "create": 54 return c.createCommunity(ctx, did, commit) 55 case "update": 56 return c.updateCommunity(ctx, did, commit) 57 case "delete": 58 return c.deleteCommunity(ctx, did) 59 default: 60 log.Printf("Unknown operation for community profile: %s", commit.Operation) 61 return nil 62 } 63} 64 65// createCommunity indexes a new community from the 
firehose 66func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 67 if commit.Record == nil { 68 return fmt.Errorf("community profile create event missing record data") 69 } 70 71 // Parse the community profile record 72 profile, err := parseCommunityProfile(commit.Record) 73 if err != nil { 74 return fmt.Errorf("failed to parse community profile: %w", err) 75 } 76 77 // Build AT-URI for this record 78 // V2 Architecture (ONLY): 79 // - 'did' parameter IS the community DID (community owns its own repo) 80 // - rkey MUST be "self" for community profiles 81 // - URI: at://community_did/social.coves.community.profile/self 82 83 // REJECT non-V2 communities (pre-production: no V1 compatibility) 84 if commit.RKey != "self" { 85 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 86 } 87 88 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 89 90 // V2: Community ALWAYS owns itself 91 ownerDID := did 92 93 // Create community entity 94 community := &communities.Community{ 95 DID: did, // V2: Repository DID IS the community DID 96 Handle: profile.Handle, 97 Name: profile.Name, 98 DisplayName: profile.DisplayName, 99 Description: profile.Description, 100 OwnerDID: ownerDID, // V2: same as DID (self-owned) 101 CreatedByDID: profile.CreatedBy, 102 HostedByDID: profile.HostedBy, 103 Visibility: profile.Visibility, 104 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 105 ModerationType: profile.ModerationType, 106 ContentWarnings: profile.ContentWarnings, 107 MemberCount: profile.MemberCount, 108 SubscriberCount: profile.SubscriberCount, 109 FederatedFrom: profile.FederatedFrom, 110 FederatedID: profile.FederatedID, 111 CreatedAt: profile.CreatedAt, 112 UpdatedAt: time.Now(), 113 RecordURI: uri, 114 RecordCID: commit.CID, 115 } 116 117 // Handle blobs (avatar/banner) if present 118 if avatarCID, ok := 
extractBlobCID(profile.Avatar); ok { 119 community.AvatarCID = avatarCID 120 } 121 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 122 community.BannerCID = bannerCID 123 } 124 125 // Handle description facets (rich text) 126 if profile.DescriptionFacets != nil { 127 facetsJSON, err := json.Marshal(profile.DescriptionFacets) 128 if err == nil { 129 community.DescriptionFacets = facetsJSON 130 } 131 } 132 133 // Index in AppView database 134 _, err = c.repo.Create(ctx, community) 135 if err != nil { 136 // Check if it already exists (idempotency) 137 if communities.IsConflict(err) { 138 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 139 return nil 140 } 141 return fmt.Errorf("failed to index community: %w", err) 142 } 143 144 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 145 return nil 146} 147 148// updateCommunity updates an existing community from the firehose 149func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 150 if commit.Record == nil { 151 return fmt.Errorf("community profile update event missing record data") 152 } 153 154 // REJECT non-V2 communities (pre-production: no V1 compatibility) 155 if commit.RKey != "self" { 156 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 157 } 158 159 // Parse profile 160 profile, err := parseCommunityProfile(commit.Record) 161 if err != nil { 162 return fmt.Errorf("failed to parse community profile: %w", err) 163 } 164 165 // V2: Repository DID IS the community DID 166 // Get existing community using the repo DID 167 existing, err := c.repo.GetByDID(ctx, did) 168 if err != nil { 169 if communities.IsNotFound(err) { 170 // Community doesn't exist yet - treat as create 171 log.Printf("Community not found for update, creating: %s", did) 172 return c.createCommunity(ctx, did, commit) 173 } 174 return fmt.Errorf("failed 
to get existing community: %w", err) 175 } 176 177 // Update fields 178 existing.Handle = profile.Handle 179 existing.Name = profile.Name 180 existing.DisplayName = profile.DisplayName 181 existing.Description = profile.Description 182 existing.Visibility = profile.Visibility 183 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 184 existing.ModerationType = profile.ModerationType 185 existing.ContentWarnings = profile.ContentWarnings 186 existing.RecordCID = commit.CID 187 188 // Update blobs 189 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 190 existing.AvatarCID = avatarCID 191 } 192 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 193 existing.BannerCID = bannerCID 194 } 195 196 // Update description facets 197 if profile.DescriptionFacets != nil { 198 facetsJSON, err := json.Marshal(profile.DescriptionFacets) 199 if err == nil { 200 existing.DescriptionFacets = facetsJSON 201 } 202 } 203 204 // Save updates 205 _, err = c.repo.Update(ctx, existing) 206 if err != nil { 207 return fmt.Errorf("failed to update community: %w", err) 208 } 209 210 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 211 return nil 212} 213 214// deleteCommunity removes a community from the index 215func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 216 err := c.repo.Delete(ctx, did) 217 if err != nil { 218 if communities.IsNotFound(err) { 219 log.Printf("Community already deleted: %s", did) 220 return nil 221 } 222 return fmt.Errorf("failed to delete community: %w", err) 223 } 224 225 log.Printf("Deleted community: %s", did) 226 return nil 227} 228 229// handleSubscription indexes a subscription event 230func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 231 if commit.Operation != "create" { 232 return nil // Subscriptions are only created, not updated 233 } 234 235 if commit.Record == nil { 236 return 
fmt.Errorf("subscription event missing record data") 237 } 238 239 // Extract community DID from record 240 communityDID, ok := commit.Record["community"].(string) 241 if !ok { 242 return fmt.Errorf("subscription record missing community field") 243 } 244 245 // Build AT-URI for subscription record 246 uri := fmt.Sprintf("at://%s/social.coves.community.subscribe/%s", userDID, commit.RKey) 247 248 // Create subscription 249 subscription := &communities.Subscription{ 250 UserDID: userDID, 251 CommunityDID: communityDID, 252 SubscribedAt: time.Now(), 253 RecordURI: uri, 254 RecordCID: commit.CID, 255 } 256 257 // Use transactional method to ensure subscription and count are atomically updated 258 // This is idempotent - safe for Jetstream replays 259 _, err := c.repo.SubscribeWithCount(ctx, subscription) 260 if err != nil { 261 return fmt.Errorf("failed to index subscription: %w", err) 262 } 263 264 log.Printf("Indexed subscription: %s -> %s", userDID, communityDID) 265 return nil 266} 267 268// handleUnsubscribe removes a subscription 269func (c *CommunityEventConsumer) handleUnsubscribe(ctx context.Context, userDID string, commit *CommitEvent) error { 270 if commit.Operation != "delete" { 271 return nil 272 } 273 274 // For unsubscribe, we need to extract the community DID from the record key or metadata 275 // This might need adjustment based on actual Jetstream structure 276 if commit.Record == nil { 277 return fmt.Errorf("unsubscribe event missing record data") 278 } 279 280 communityDID, ok := commit.Record["community"].(string) 281 if !ok { 282 return fmt.Errorf("unsubscribe record missing community field") 283 } 284 285 // Use transactional method to ensure unsubscribe and count are atomically updated 286 // This is idempotent - safe for Jetstream replays 287 err := c.repo.UnsubscribeWithCount(ctx, userDID, communityDID) 288 if err != nil { 289 return fmt.Errorf("failed to remove subscription: %w", err) 290 } 291 292 log.Printf("Removed subscription: %s -> 
%s", userDID, communityDID) 293 return nil 294} 295 296// Helper types and functions 297 298type CommunityProfile struct { 299 // V2 ONLY: No DID field (repo DID is authoritative) 300 Handle string `json:"handle"` // Scoped handle (!gaming@coves.social) 301 AtprotoHandle string `json:"atprotoHandle"` // Real atProto handle (gaming.communities.coves.social) 302 Name string `json:"name"` 303 DisplayName string `json:"displayName"` 304 Description string `json:"description"` 305 DescriptionFacets []interface{} `json:"descriptionFacets"` 306 Avatar map[string]interface{} `json:"avatar"` 307 Banner map[string]interface{} `json:"banner"` 308 // Owner field removed - V2 communities ALWAYS self-own (owner == repo DID) 309 CreatedBy string `json:"createdBy"` 310 HostedBy string `json:"hostedBy"` 311 Visibility string `json:"visibility"` 312 Federation FederationConfig `json:"federation"` 313 ModerationType string `json:"moderationType"` 314 ContentWarnings []string `json:"contentWarnings"` 315 MemberCount int `json:"memberCount"` 316 SubscriberCount int `json:"subscriberCount"` 317 FederatedFrom string `json:"federatedFrom"` 318 FederatedID string `json:"federatedId"` 319 CreatedAt time.Time `json:"createdAt"` 320} 321 322type FederationConfig struct { 323 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 324} 325 326// parseCommunityProfile converts a raw record map to a CommunityProfile 327func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 328 recordJSON, err := json.Marshal(record) 329 if err != nil { 330 return nil, fmt.Errorf("failed to marshal record: %w", err) 331 } 332 333 var profile CommunityProfile 334 if err := json.Unmarshal(recordJSON, &profile); err != nil { 335 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 336 } 337 338 return &profile, nil 339} 340 341// extractBlobCID extracts the CID from a blob reference 342// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 
// 123}
//
// It returns the value of ref.$link and true only when blob is non-nil, its
// "$type" is the string "blob", "ref" is an object, and "$link" is a string.
// In every other case it returns "" and false.
func extractBlobCID(blob map[string]interface{}) (string, bool) {
	if blob == nil {
		return "", false
	}

	// Check if it's a blob type
	blobType, ok := blob["$type"].(string)
	if !ok || blobType != "blob" {
		return "", false
	}

	// Extract ref
	ref, ok := blob["ref"].(map[string]interface{})
	if !ok {
		return "", false
	}

	// Extract $link (the CID)
	link, ok := ref["$link"].(string)
	if !ok {
		return "", false
	}

	return link, true
}

// validateHandle checks if a handle matches expected format (!name@instance).
//
// The handle must start with "!", contain exactly one "@", and have a
// non-empty name before the "@" and a non-empty instance after it.
func validateHandle(handle string) bool {
	if !strings.HasPrefix(handle, "!") {
		return false
	}

	parts := strings.Split(handle, "@")
	if len(parts) != 2 {
		return false
	}

	// parts[0] still carries the leading "!"; strip it so that "!@instance"
	// is seen as having an empty name. "!name@" likewise has an empty instance.
	name := strings.TrimPrefix(parts[0], "!")
	instance := parts[1]
	return name != "" && instance != ""
}