A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "strings" 9 "time" 10 11 "Coves/internal/core/communities" 12) 13 14// CommunityEventConsumer consumes community-related events from Jetstream 15type CommunityEventConsumer struct { 16 repo communities.Repository 17} 18 19// NewCommunityEventConsumer creates a new Jetstream consumer for community events 20func NewCommunityEventConsumer(repo communities.Repository) *CommunityEventConsumer { 21 return &CommunityEventConsumer{ 22 repo: repo, 23 } 24} 25 26// HandleEvent processes a Jetstream event for community records 27// This is called by the main Jetstream consumer when it receives commit events 28func (c *CommunityEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 29 // We only care about commit events for community records 30 if event.Kind != "commit" || event.Commit == nil { 31 return nil 32 } 33 34 commit := event.Commit 35 36 // Route to appropriate handler based on collection 37 switch commit.Collection { 38 case "social.coves.community.profile": 39 return c.handleCommunityProfile(ctx, event.Did, commit) 40 case "social.coves.community.subscribe": 41 return c.handleSubscription(ctx, event.Did, commit) 42 case "social.coves.community.unsubscribe": 43 return c.handleUnsubscribe(ctx, event.Did, commit) 44 default: 45 // Not a community-related collection 46 return nil 47 } 48} 49 50// handleCommunityProfile processes community profile create/update/delete events 51func (c *CommunityEventConsumer) handleCommunityProfile(ctx context.Context, did string, commit *CommitEvent) error { 52 switch commit.Operation { 53 case "create": 54 return c.createCommunity(ctx, did, commit) 55 case "update": 56 return c.updateCommunity(ctx, did, commit) 57 case "delete": 58 return c.deleteCommunity(ctx, did) 59 default: 60 log.Printf("Unknown operation for community profile: %s", commit.Operation) 61 return nil 62 } 63} 64 65// createCommunity indexes a new community from the 
firehose 66func (c *CommunityEventConsumer) createCommunity(ctx context.Context, did string, commit *CommitEvent) error { 67 if commit.Record == nil { 68 return fmt.Errorf("community profile create event missing record data") 69 } 70 71 // Parse the community profile record 72 profile, err := parseCommunityProfile(commit.Record) 73 if err != nil { 74 return fmt.Errorf("failed to parse community profile: %w", err) 75 } 76 77 // Build AT-URI for this record 78 // IMPORTANT: 'did' parameter is the repository owner (instance DID) 79 // The community's DID comes from profile.Did field in the record 80 uri := fmt.Sprintf("at://%s/social.coves.community.profile/%s", did, commit.RKey) 81 82 // Create community entity 83 community := &communities.Community{ 84 DID: profile.Did, // Community's unique DID from record, not repo owner! 85 Handle: profile.Handle, 86 Name: profile.Name, 87 DisplayName: profile.DisplayName, 88 Description: profile.Description, 89 OwnerDID: profile.Owner, 90 CreatedByDID: profile.CreatedBy, 91 HostedByDID: profile.HostedBy, 92 Visibility: profile.Visibility, 93 AllowExternalDiscovery: profile.Federation.AllowExternalDiscovery, 94 ModerationType: profile.ModerationType, 95 ContentWarnings: profile.ContentWarnings, 96 MemberCount: profile.MemberCount, 97 SubscriberCount: profile.SubscriberCount, 98 FederatedFrom: profile.FederatedFrom, 99 FederatedID: profile.FederatedID, 100 CreatedAt: profile.CreatedAt, 101 UpdatedAt: time.Now(), 102 RecordURI: uri, 103 RecordCID: commit.CID, 104 } 105 106 // Handle blobs (avatar/banner) if present 107 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 108 community.AvatarCID = avatarCID 109 } 110 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 111 community.BannerCID = bannerCID 112 } 113 114 // Handle description facets (rich text) 115 if profile.DescriptionFacets != nil { 116 facetsJSON, err := json.Marshal(profile.DescriptionFacets) 117 if err == nil { 118 community.DescriptionFacets = facetsJSON 
119 } 120 } 121 122 // Index in AppView database 123 _, err = c.repo.Create(ctx, community) 124 if err != nil { 125 // Check if it already exists (idempotency) 126 if communities.IsConflict(err) { 127 log.Printf("Community already indexed: %s (%s)", community.Handle, community.DID) 128 return nil 129 } 130 return fmt.Errorf("failed to index community: %w", err) 131 } 132 133 log.Printf("Indexed new community: %s (%s)", community.Handle, community.DID) 134 return nil 135} 136 137// updateCommunity updates an existing community from the firehose 138func (c *CommunityEventConsumer) updateCommunity(ctx context.Context, did string, commit *CommitEvent) error { 139 if commit.Record == nil { 140 return fmt.Errorf("community profile update event missing record data") 141 } 142 143 // Parse profile to get the community DID 144 profile, err := parseCommunityProfile(commit.Record) 145 if err != nil { 146 return fmt.Errorf("failed to parse community profile: %w", err) 147 } 148 149 // Get existing community using the community DID from the record, not repo owner 150 existing, err := c.repo.GetByDID(ctx, profile.Did) 151 if err != nil { 152 if communities.IsNotFound(err) { 153 // Community doesn't exist yet - treat as create 154 log.Printf("Community not found for update, creating: %s", profile.Did) 155 return c.createCommunity(ctx, did, commit) 156 } 157 return fmt.Errorf("failed to get existing community: %w", err) 158 } 159 160 // Update fields 161 existing.Handle = profile.Handle 162 existing.Name = profile.Name 163 existing.DisplayName = profile.DisplayName 164 existing.Description = profile.Description 165 existing.Visibility = profile.Visibility 166 existing.AllowExternalDiscovery = profile.Federation.AllowExternalDiscovery 167 existing.ModerationType = profile.ModerationType 168 existing.ContentWarnings = profile.ContentWarnings 169 existing.RecordCID = commit.CID 170 171 // Update blobs 172 if avatarCID, ok := extractBlobCID(profile.Avatar); ok { 173 existing.AvatarCID 
= avatarCID 174 } 175 if bannerCID, ok := extractBlobCID(profile.Banner); ok { 176 existing.BannerCID = bannerCID 177 } 178 179 // Update description facets 180 if profile.DescriptionFacets != nil { 181 facetsJSON, err := json.Marshal(profile.DescriptionFacets) 182 if err == nil { 183 existing.DescriptionFacets = facetsJSON 184 } 185 } 186 187 // Save updates 188 _, err = c.repo.Update(ctx, existing) 189 if err != nil { 190 return fmt.Errorf("failed to update community: %w", err) 191 } 192 193 log.Printf("Updated community: %s (%s)", existing.Handle, existing.DID) 194 return nil 195} 196 197// deleteCommunity removes a community from the index 198func (c *CommunityEventConsumer) deleteCommunity(ctx context.Context, did string) error { 199 err := c.repo.Delete(ctx, did) 200 if err != nil { 201 if communities.IsNotFound(err) { 202 log.Printf("Community already deleted: %s", did) 203 return nil 204 } 205 return fmt.Errorf("failed to delete community: %w", err) 206 } 207 208 log.Printf("Deleted community: %s", did) 209 return nil 210} 211 212// handleSubscription indexes a subscription event 213func (c *CommunityEventConsumer) handleSubscription(ctx context.Context, userDID string, commit *CommitEvent) error { 214 if commit.Operation != "create" { 215 return nil // Subscriptions are only created, not updated 216 } 217 218 if commit.Record == nil { 219 return fmt.Errorf("subscription event missing record data") 220 } 221 222 // Extract community DID from record 223 communityDID, ok := commit.Record["community"].(string) 224 if !ok { 225 return fmt.Errorf("subscription record missing community field") 226 } 227 228 // Build AT-URI for subscription record 229 uri := fmt.Sprintf("at://%s/social.coves.community.subscribe/%s", userDID, commit.RKey) 230 231 // Create subscription 232 subscription := &communities.Subscription{ 233 UserDID: userDID, 234 CommunityDID: communityDID, 235 SubscribedAt: time.Now(), 236 RecordURI: uri, 237 RecordCID: commit.CID, 238 } 239 240 // Use 
transactional method to ensure subscription and count are atomically updated 241 // This is idempotent - safe for Jetstream replays 242 _, err := c.repo.SubscribeWithCount(ctx, subscription) 243 if err != nil { 244 return fmt.Errorf("failed to index subscription: %w", err) 245 } 246 247 log.Printf("Indexed subscription: %s -> %s", userDID, communityDID) 248 return nil 249} 250 251// handleUnsubscribe removes a subscription 252func (c *CommunityEventConsumer) handleUnsubscribe(ctx context.Context, userDID string, commit *CommitEvent) error { 253 if commit.Operation != "delete" { 254 return nil 255 } 256 257 // For unsubscribe, we need to extract the community DID from the record key or metadata 258 // This might need adjustment based on actual Jetstream structure 259 if commit.Record == nil { 260 return fmt.Errorf("unsubscribe event missing record data") 261 } 262 263 communityDID, ok := commit.Record["community"].(string) 264 if !ok { 265 return fmt.Errorf("unsubscribe record missing community field") 266 } 267 268 // Use transactional method to ensure unsubscribe and count are atomically updated 269 // This is idempotent - safe for Jetstream replays 270 err := c.repo.UnsubscribeWithCount(ctx, userDID, communityDID) 271 if err != nil { 272 return fmt.Errorf("failed to remove subscription: %w", err) 273 } 274 275 log.Printf("Removed subscription: %s -> %s", userDID, communityDID) 276 return nil 277} 278 279// Helper types and functions 280 281type CommunityProfile struct { 282 Did string `json:"did"` // Community's unique DID 283 Handle string `json:"handle"` 284 Name string `json:"name"` 285 DisplayName string `json:"displayName"` 286 Description string `json:"description"` 287 DescriptionFacets []interface{} `json:"descriptionFacets"` 288 Avatar map[string]interface{} `json:"avatar"` 289 Banner map[string]interface{} `json:"banner"` 290 Owner string `json:"owner"` 291 CreatedBy string `json:"createdBy"` 292 HostedBy string `json:"hostedBy"` 293 Visibility string 
`json:"visibility"` 294 Federation FederationConfig `json:"federation"` 295 ModerationType string `json:"moderationType"` 296 ContentWarnings []string `json:"contentWarnings"` 297 MemberCount int `json:"memberCount"` 298 SubscriberCount int `json:"subscriberCount"` 299 FederatedFrom string `json:"federatedFrom"` 300 FederatedID string `json:"federatedId"` 301 CreatedAt time.Time `json:"createdAt"` 302} 303 304type FederationConfig struct { 305 AllowExternalDiscovery bool `json:"allowExternalDiscovery"` 306} 307 308// parseCommunityProfile converts a raw record map to a CommunityProfile 309func parseCommunityProfile(record map[string]interface{}) (*CommunityProfile, error) { 310 recordJSON, err := json.Marshal(record) 311 if err != nil { 312 return nil, fmt.Errorf("failed to marshal record: %w", err) 313 } 314 315 var profile CommunityProfile 316 if err := json.Unmarshal(recordJSON, &profile); err != nil { 317 return nil, fmt.Errorf("failed to unmarshal profile: %w", err) 318 } 319 320 return &profile, nil 321} 322 323// extractBlobCID extracts the CID from a blob reference 324// Blob format: {"$type": "blob", "ref": {"$link": "cid"}, "mimeType": "...", "size": 123} 325func extractBlobCID(blob map[string]interface{}) (string, bool) { 326 if blob == nil { 327 return "", false 328 } 329 330 // Check if it's a blob type 331 blobType, ok := blob["$type"].(string) 332 if !ok || blobType != "blob" { 333 return "", false 334 } 335 336 // Extract ref 337 ref, ok := blob["ref"].(map[string]interface{}) 338 if !ok { 339 return "", false 340 } 341 342 // Extract $link (the CID) 343 link, ok := ref["$link"].(string) 344 if !ok { 345 return "", false 346 } 347 348 return link, true 349} 350 351// validateHandle checks if a handle matches expected format (!name@instance) 352func validateHandle(handle string) bool { 353 if !strings.HasPrefix(handle, "!") { 354 return false 355 } 356 357 parts := strings.Split(handle, "@") 358 if len(parts) != 2 { 359 return false 360 } 361 362 
return true 363}