A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log" 9 "time" 10) 11 12// CommunityEventConsumer consumes community-related events from Jetstream 13type CommunityEventConsumer struct { 14 repo communities.Repository 15} 16 17// NewCommunityEventConsumer creates a new Jetstream consumer for community events 18func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer { 19 return &CommunityEventConsumer{ 20 repo: repo, 21 } 22} 23 24// HandleEvent processes a Jetstream event for community records 25// This is called by the main Jetstream consumer when it receives commit events 26func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 27 // We only care about commit events for community records 28 if event.Kind != "commit" || event.Commit == nil { 29 return nil 30 } 31 32 commit := event.Commit 33 34 // Route to appropriate handler based on collection 35 switch commit.Collection { 36 case "social.coves.community.profile": 37 return c.handleCommunityProfile(ctx, event.Did, commit) 38 case "social.coves.community.subscribe": 39 return c.handleSubscription(ctx, event.Did, commit) 40 case "social.coves.community.unsubscribe": 41 return c.handleUnsubscribe(ctx, event.Did, commit) 42 default: 43 // Not a community-related collection 44 return nil 45 } 46} 47 48// handleCommunityProfile processes community profile create/update/delete events 49func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 50 switch commit.Operation { 51 case "create": 52 return c.createCommunity(ctx, did, commit) 53 case "update": 54 return c.updateCommunity(ctx, did, commit) 55 case "delete": 56 return c.deleteCommunity(ctx, did) 57 default: 58 log.Printf("Unknown operation for community profile: %s", commit.Operation) 59 return nil 60 } 61} 62 63// createCommunity indexes a new community from the firehose 64func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 65 if commit.Record == nil { 66 return fmt.Errorf("community profile create event missing record data") 67 } 68 69 // Parse the community profile record 70 profile, err := parseCommunityProfile(commit.Record) 71 if err != nil { 72 return fmt.Errorf("failed to parse community profile: %w", err) 73 } 74 75 // Build AT-URI for this record 76 // V2 Architecture (ONLY): 77 // - 'did' parameter IS the community DID (community owns its own repo) 78 // - rkey MUST be "self" for community profiles 79 // - URI: at://community_did/social.coves.community.profile/self 80 81 // REJECT non-V2 communities (pre-production: no V1 compatibility) 82 if commit.RKey != "self" { 83 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 84 } 85 86 uri := fmt.Sprintf("at://%s/social.coves.community.profile/self", did) 87 88 // V2: Community ALWAYS owns itself 89 ownerDID := did 90 91 // Create community entity 92 community := &communities.Community{ 93 DID: did, // V2: Repository DID IS the community DID 94 Handle: profile.Handle, 95 Name: profile.Name, 96 DisplayName: profile.DisplayName, 97 Description: profile.Description, 98 OwnerDID: ownerDID, // V2: same as DID (self-owned) 99 CreatedByDID: profile.CreatedBy, 100 HostedByDID: profile.HostedBy, 101 Visibility: profile.Visibility, 102 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 103 ModerationType: profile.ModerationType, 104 ContentWarnings: profile.ContentWarnings, 105 MemberCount: profile.MemberCount, 106 SubscriberCount: profile.SubscriberCount, 107 FederatedFrom: profile.FederatedFrom, 108 FederatedID: profile.FederatedID, 109 CreatedAt: profile.CreatedAt, 110 UpdatedAt: time.Now(), 111 RecordURI: uri, 112 RecordCID: commit.CID, 113 } 114 115 // Handle blobs (avatar/banner) if present 116 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 117 community.AvatarCID = avatarCID 118 } 119 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 120 community.BannerCID = bannerCID 121 } 122 123 // Handle description facets (rich text) 124 if profile.DescriptionFacets != nil { 125 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 126 if marshalErr == nil { 127 community.DescriptionFacets = facetsJSON 128 } 129 } 130 131 // Index in AppView database 132 _, err = c.repo.Create(ctx, community) 133 if err != nil { 134 // Check if it already exists (idempotency) 135 if communities.IsConflict(err) { 136 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 137 return nil 138 } 139 return fmt.Errorf("failed to index community: %w", err) 140 } 141 142 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 143 return nil 144} 145 146// updateCommunity updates an existing community from the firehose 147func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 148 if commit.Record == nil { 149 return fmt.Errorf("community profile update event missing record data") 150 } 151 152 // REJECT non-V2 communities (pre-production: no V1 compatibility) 153 if commit.RKey != "self" { 154 return fmt.Errorf("invalid community profile rkey: expected 'self', got '%s' (V1 communities not supported)", commit.RKey) 155 } 156 157 // Parse profile 158 profile, err := parseCommunityProfile(commit.Record) 159 if err != nil { 160 return fmt.Errorf("failed to parse community profile: %w", err) 161 } 162 163 // V2: Repository DID IS the community DID 164 // Get existing community using the repo DID 165 existing, err := c.repo.GetByDID(ctx, did) 166 if err != nil { 167 if communities.IsNotFound(err) { 168 // Community doesn't exist yet - treat as create 169 log.Printf("Community not found for update, creating: %s", did) 170 return c.createCommunity(ctx, did, commit) 171 } 172 return fmt.Errorf("failed to get existing community: %w", err) 173 } 174 175 // Update fields 176 existing.Handle = profile.Handle 177 existing.Name = profile.Name 178 existing.DisplayName = profile.DisplayName 179 existing.Description = profile.Description 180 existing.Visibility = profile.Visibility 181 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 182 existing.ModerationType = profile.ModerationType 183 existing.ContentWarnings = profile.ContentWarnings 184 existing.RecordCID = commit.CID 185 186 // Update blobs 187 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 188 existing.AvatarCID = avatarCID 189 } 190 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 191 existing.BannerCID = bannerCID 192 } 193 194 // Update description facets 195 if profile.DescriptionFacets != nil { 196 facetsJSON, marshalErr := json.Marshal(profile.DescriptionFacets) 197 if marshalErr == nil { 198 existing.DescriptionFacets = facetsJSON 199 } 200 } 201 202 // Save updates 203 _, err = c.repo.Update(ctx, existing) 204 if err != nil { 205 return fmt.Errorf("failed to update community: %w", err) 206 } 207 208 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 209 return nil 210} 211 212// deleteCommunity removes a community from the index 213func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 214 err := c.repo.Delete(ctx, did) 215 if err != nil { 216 if communities.IsNotFound(err) { 217 log.Printf("Community already deleted: %s", did) 218 return nil 219 } 220 return fmt.Errorf("failed to delete community: %w", err) 221 } 222 223 log.Printf("Deleted community: %s", did) 224 return nil 225} 226 227// handleSubscription indexes a subscription event 228func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 229 if commit.Operation != "create" { 230 return nil // Subscriptions are only created, not updated 231 } 232 233 if commit.Record == nil { 234 return fmt.Errorf("subscription event missing record data") 235 } 236 237 // Extract community DID from record 238 communityDID, ok := commit.Record["community"].(string) 239 if !ok { 240 return fmt.Errorf("subscription record missing community field") 241 } 242 243 // Build AT-URI for subscription record 244 uri := fmt.Sprintf("at://%s/social.coves.community.subscribe/%s", userDID, commit.RKey) 245 246 // Create subscription 247 subscription := &communities.Subscription{ 248 UserDID: userDID, 249 CommunityDID: communityDID, 250 SubscribedAt: time.Now(), 251 RecordURI: uri, 252 RecordCID: commit.CID, 253 } 254 255 // Use transactional method to ensure subscription and count are atomically updated 256 // This is idempotent - safe for Jetstream replays 257 _, err := c.repo.SubscribeWithCount(ctx, subscription) 258 if err != nil { 259 return fmt.Errorf("failed to index subscription: %w", err) 260 } 261 262 log.Printf("Indexed subscription: %s -> %s", userDID, communityDID) 263 return nil 264} 265 266// handleUnsubscribe removes a subscription 267func (c *CommunityEventConsumer) handleUnsubscribe(ctx context.Context, userDID string, commit *CommitEvent) error { 268 if commit.Operation != "delete" { 269 return nil 270 } 271 272 // For unsubscribe, we need to extract the community DID from the record key or metadata 273 // This might need adjustment based on actual Jetstream structure 274 if commit.Record == nil { 275 return fmt.Errorf("unsubscribe event missing record data") 276 } 277 278 communityDID, ok := commit.Record["community"].(string) 279 if !ok { 280 return fmt.Errorf("unsubscribe record missing community field") 281 } 282 283 // Use transactional method to ensure unsubscribe and count are atomically updated 284 // This is idempotent - safe for Jetstream replays 285 err := c.repo.UnsubscribeWithCount(ctx, userDID, communityDID) 286 if err != nil { 287 return fmt.Errorf("failed to remove subscription: %w", err) 288 } 289 290 log.Printf("Removed subscription: %s -> %s", userDID, communityDID) 291 return nil 292} 293 294// Helper types and functions 295 296type CommunityProfile struct { 297 CreatedAt time.Time `json:"createdAt"` 298 Avatar map[string]interface{} `json:"avatar"` 299 Banner map[string]interface{} `json:"banner"` 300 CreatedBy string `json:"createdBy"` 301 Visibility string `json:"visibility"` 302 AtprotoHandle string `json:"atprotoHandle"` 303 DisplayName string `json:"displayName"` 304 Name string `json:"name"` 305 Handle string `json:"handle"` 306 HostedBy string `json:"hostedBy"` 307 Description string `json:"description"` 308 FederatedID string `json:"federatedId"` 309 ModerationType string `json:"moderationType"` 310 FederatedFrom string `json:"federatedFrom"` 311 ContentWarnings []string `json:"contentWarnings"` 312 DescriptionFacets []interface{} `json:"descriptionFacets"` 313 MemberCount int `json:"memberCount"` 314 SubscriberCount int `json:"subscriberCount"` 315 Federation FederationConfig `json:"federation"` 316} 317 318type FederationConfig struct { 319 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 320} 321 322// parseCommunityProfile converts a raw record map to a CommunityProfile 323func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 324 recordJSON, err := json.Marshal(record) 325 if err != nil { 326 return nil, fmt.Errorf("failed to marshal record: %w", err) 327 } 328 329 var profile CommunityProfile 330 if err := json.Unmarshal(recordJSON, &profile); err != nil { 331 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 332 } 333 334 return &profile, nil 335} 336 337// extractBlobCID extracts the CID from a blob reference 338// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 339func extractBlobCID(blob map[string]interface{}) (string, bool) { 340 if blob == nil { 341 return "", false 342 } 343 344 // Check if it's a blob type 345 blobType, ok := blob["$type"].(string) 346 if !ok || blobType != "blob" { 347 return "", false 348 } 349 350 // Extract ref 351 ref, ok := blob["ref"].(map[string]interface{}) 352 if !ok { 353 return "", false 354 } 355 356 // Extract $link (the CID) 357 link, ok := ref["$link"].(string) 358 if !ok { 359 return "", false 360 } 361 362 return link, true 363}