A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "Coves/internal/core/aggregators" 11) 12 13// AggregatorEventConsumer consumes aggregator-related events from Jetstream 14// Following Bluesky's pattern: feed generators (app.bsky.feed.generator) and labelers (app.bsky.labeler.service) 15type AggregatorEventConsumer struct { 16 repo aggregators.Repository // Repository for aggregator operations 17} 18 19// NewAggregatorEventConsumer creates a new Jetstream consumer for aggregator events 20func NewAggregatorEventConsumer(repo aggregators.Repository) *AggregatorEventConsumer { 21 return &AggregatorEventConsumer{ 22 repo: repo, 23 } 24} 25 26// HandleEvent processes a Jetstream event for aggregator records 27// This is called by the main Jetstream consumer when it receives commit events 28func (c *AggregatorEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 29 // We only care about commit events for aggregator records 30 if event.Kind != "commit" || event.Commit == nil { 31 return nil 32 } 33 34 commit := event.Commit 35 36 // Route to appropriate handler based on collection 37 // IMPORTANT: Collection names refer to RECORD TYPES in repositories 38 // - social.coves.aggregator.service: Service declaration (in aggregator's own repo, rkey="self") 39 // - social.coves.aggregator.authorization: Authorization (in community's repo, any rkey) 40 switch commit.Collection { 41 case "social.coves.aggregator.service": 42 return c.handleServiceDeclaration(ctx, event.Did, commit) 43 case "social.coves.aggregator.authorization": 44 return c.handleAuthorization(ctx, event.Did, commit) 45 default: 46 // Not an aggregator-related collection 47 return nil 48 } 49} 50 51// handleServiceDeclaration processes aggregator service declaration events 52// Service declarations are stored at: at://aggregator_did/social.coves.aggregator.service/self 53func (c *AggregatorEventConsumer) handleServiceDeclaration(ctx 
context.Context, did string, commit *CommitEvent) error { 54 switch commit.Operation { 55 case "create", "update": 56 // Both create and update are handled the same way (upsert) 57 return c.upsertAggregator(ctx, did, commit) 58 case "delete": 59 return c.deleteAggregator(ctx, did) 60 default: 61 log.Printf("Unknown operation for aggregator service: %s", commit.Operation) 62 return nil 63 } 64} 65 66// handleAuthorization processes authorization record events 67// Authorizations are stored at: at://community_did/social.coves.aggregator.authorization/{rkey} 68func (c *AggregatorEventConsumer) handleAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 69 switch commit.Operation { 70 case "create", "update": 71 // Both create and update are handled the same way (upsert) 72 return c.upsertAuthorization(ctx, communityDID, commit) 73 case "delete": 74 return c.deleteAuthorization(ctx, communityDID, commit) 75 default: 76 log.Printf("Unknown operation for aggregator authorization: %s", commit.Operation) 77 return nil 78 } 79} 80 81// upsertAggregator indexes or updates an aggregator service declaration 82func (c *AggregatorEventConsumer) upsertAggregator(ctx context.Context, did string, commit *CommitEvent) error { 83 if commit.Record == nil { 84 return fmt.Errorf("aggregator service event missing record data") 85 } 86 87 // Verify rkey is "self" (canonical location for service declaration) 88 // Following Bluesky's pattern: app.bsky.feed.generator and app.bsky.labeler.service use /self 89 if commit.RKey != "self" { 90 return fmt.Errorf("invalid aggregator service rkey: expected 'self', got '%s'", commit.RKey) 91 } 92 93 // Parse the service declaration record 94 service, err := parseAggregatorService(commit.Record) 95 if err != nil { 96 return fmt.Errorf("failed to parse aggregator service: %w", err) 97 } 98 99 // Validate DID matches repo DID (security check) 100 if service.DID != "" && service.DID != did { 101 return fmt.Errorf("service 
record DID (%s) does not match repo DID (%s)", service.DID, did) 102 } 103 104 // Build AT-URI for this record 105 uri := fmt.Sprintf("at://%s/social.coves.aggregator.service/self", did) 106 107 // Parse createdAt from service record 108 var createdAt time.Time 109 if service.CreatedAt != "" { 110 createdAt, err = time.Parse(time.RFC3339, service.CreatedAt) 111 if err != nil { 112 createdAt = time.Now() // Fallback 113 log.Printf("Warning: invalid createdAt format for aggregator %s: %v", did, err) 114 } 115 } else { 116 createdAt = time.Now() 117 } 118 119 // Extract avatar CID from blob if present 120 var avatarCID string 121 if service.Avatar != nil { 122 if cid, ok := extractBlobCID(service.Avatar); ok { 123 avatarCID = cid 124 } 125 } 126 127 // Build aggregator domain model 128 agg := &aggregators.Aggregator{ 129 DID: did, 130 DisplayName: service.DisplayName, 131 Description: service.Description, 132 AvatarURL: avatarCID, // Now contains the CID from blob 133 MaintainerDID: service.MaintainerDID, 134 SourceURL: service.SourceURL, 135 CreatedAt: createdAt, 136 IndexedAt: time.Now(), 137 RecordURI: uri, 138 RecordCID: commit.CID, 139 } 140 141 // Handle config schema (JSONB) 142 if service.ConfigSchema != nil { 143 schemaBytes, err := json.Marshal(service.ConfigSchema) 144 if err != nil { 145 return fmt.Errorf("failed to marshal config schema: %w", err) 146 } 147 agg.ConfigSchema = schemaBytes 148 } 149 150 // Create or update in database 151 if err := c.repo.CreateAggregator(ctx, agg); err != nil { 152 return fmt.Errorf("failed to index aggregator: %w", err) 153 } 154 155 log.Printf("[AGGREGATOR-CONSUMER] Indexed service: %s (%s)", agg.DisplayName, did) 156 return nil 157} 158 159// deleteAggregator removes an aggregator from the index 160func (c *AggregatorEventConsumer) deleteAggregator(ctx context.Context, did string) error { 161 // Delete from database (cascade deletes authorizations and posts via FK) 162 if err := c.repo.DeleteAggregator(ctx, did); err != 
nil { 163 // Log but don't fail if not found (idempotent delete) 164 if aggregators.IsNotFound(err) { 165 log.Printf("[AGGREGATOR-CONSUMER] Aggregator not found for deletion: %s (already deleted?)", did) 166 return nil 167 } 168 return fmt.Errorf("failed to delete aggregator: %w", err) 169 } 170 171 log.Printf("[AGGREGATOR-CONSUMER] Deleted aggregator: %s", did) 172 return nil 173} 174 175// upsertAuthorization indexes or updates an authorization record 176func (c *AggregatorEventConsumer) upsertAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 177 if commit.Record == nil { 178 return fmt.Errorf("authorization event missing record data") 179 } 180 181 // Parse the authorization record 182 authRecord, err := parseAggregatorAuthorization(commit.Record) 183 if err != nil { 184 return fmt.Errorf("failed to parse authorization: %w", err) 185 } 186 187 // Validate communityDid matches repo DID (security check) 188 if authRecord.CommunityDid != "" && authRecord.CommunityDid != communityDID { 189 return fmt.Errorf("authorization record communityDid (%s) does not match repo DID (%s)", 190 authRecord.CommunityDid, communityDID) 191 } 192 193 // Build AT-URI for this record 194 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey) 195 196 // Parse createdAt from authorization record 197 var createdAt time.Time 198 if authRecord.CreatedAt != "" { 199 createdAt, err = time.Parse(time.RFC3339, authRecord.CreatedAt) 200 if err != nil { 201 createdAt = time.Now() // Fallback 202 log.Printf("Warning: invalid createdAt format for authorization %s: %v", uri, err) 203 } 204 } else { 205 createdAt = time.Now() 206 } 207 208 // Parse disabledAt from authorization record (optional, for modlog/audit) 209 var disabledAt *time.Time 210 if authRecord.DisabledAt != "" { 211 parsed, err := time.Parse(time.RFC3339, authRecord.DisabledAt) 212 if err != nil { 213 log.Printf("Warning: invalid disabledAt format for 
authorization %s: %v", uri, err) 214 } else { 215 disabledAt = &parsed 216 } 217 } 218 219 // Build authorization domain model 220 auth := &aggregators.Authorization{ 221 AggregatorDID: authRecord.Aggregator, 222 CommunityDID: communityDID, 223 Enabled: authRecord.Enabled, 224 CreatedBy: authRecord.CreatedBy, 225 DisabledBy: authRecord.DisabledBy, 226 DisabledAt: disabledAt, 227 CreatedAt: createdAt, 228 IndexedAt: time.Now(), 229 RecordURI: uri, 230 RecordCID: commit.CID, 231 } 232 233 // Handle config (JSONB) 234 if authRecord.Config != nil { 235 configBytes, err := json.Marshal(authRecord.Config) 236 if err != nil { 237 return fmt.Errorf("failed to marshal config: %w", err) 238 } 239 auth.Config = configBytes 240 } 241 242 // Create or update in database 243 if err := c.repo.CreateAuthorization(ctx, auth); err != nil { 244 return fmt.Errorf("failed to index authorization: %w", err) 245 } 246 247 log.Printf("[AGGREGATOR-CONSUMER] Indexed authorization: community=%s, aggregator=%s, enabled=%v", 248 communityDID, authRecord.Aggregator, authRecord.Enabled) 249 return nil 250} 251 252// deleteAuthorization removes an authorization from the index 253func (c *AggregatorEventConsumer) deleteAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 254 // Build AT-URI to find the authorization 255 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey) 256 257 // Delete from database 258 if err := c.repo.DeleteAuthorizationByURI(ctx, uri); err != nil { 259 // Log but don't fail if not found (idempotent delete) 260 if aggregators.IsNotFound(err) { 261 log.Printf("[AGGREGATOR-CONSUMER] Authorization not found for deletion: %s (already deleted?)", uri) 262 return nil 263 } 264 return fmt.Errorf("failed to delete authorization: %w", err) 265 } 266 267 log.Printf("[AGGREGATOR-CONSUMER] Deleted authorization: %s", uri) 268 return nil 269} 270 271// ===== Record Parsing Functions ===== 272 273// 
AggregatorServiceRecord represents the service declaration record structure 274type AggregatorServiceRecord struct { 275 Type string `json:"$type"` 276 DID string `json:"did"` // DID of aggregator (must match repo DID) 277 DisplayName string `json:"displayName"` 278 Description string `json:"description,omitempty"` 279 Avatar map[string]interface{} `json:"avatar,omitempty"` // Blob reference (CID will be extracted) 280 ConfigSchema map[string]interface{} `json:"configSchema,omitempty"` // JSON Schema 281 MaintainerDID string `json:"maintainer,omitempty"` // Fixed: was maintainerDid 282 SourceURL string `json:"sourceUrl,omitempty"` // Fixed: was homepageUrl 283 CreatedAt string `json:"createdAt"` 284} 285 286// parseAggregatorService parses an aggregator service record 287func parseAggregatorService(record interface{}) (*AggregatorServiceRecord, error) { 288 recordBytes, err := json.Marshal(record) 289 if err != nil { 290 return nil, fmt.Errorf("failed to marshal record: %w", err) 291 } 292 293 var service AggregatorServiceRecord 294 if err := json.Unmarshal(recordBytes, &service); err != nil { 295 return nil, fmt.Errorf("failed to unmarshal service record: %w", err) 296 } 297 298 // Validate required fields 299 if service.DisplayName == "" { 300 return nil, fmt.Errorf("displayName is required") 301 } 302 303 return &service, nil 304} 305 306// Note: extractBlobCID is defined in community_consumer.go and shared across consumers 307 308// AggregatorAuthorizationRecord represents the authorization record structure 309type AggregatorAuthorizationRecord struct { 310 Config map[string]interface{} `json:"config,omitempty"` 311 Type string `json:"$type"` 312 Aggregator string `json:"aggregatorDid"` 313 CommunityDid string `json:"communityDid"` 314 CreatedBy string `json:"createdBy"` 315 DisabledBy string `json:"disabledBy,omitempty"` 316 DisabledAt string `json:"disabledAt,omitempty"` 317 CreatedAt string `json:"createdAt"` 318 Enabled bool `json:"enabled"` 319} 320 321// 
parseAggregatorAuthorization parses an aggregator authorization record 322func parseAggregatorAuthorization(record interface{}) (*AggregatorAuthorizationRecord, error) { 323 recordBytes, err := json.Marshal(record) 324 if err != nil { 325 return nil, fmt.Errorf("failed to marshal record: %w", err) 326 } 327 328 var auth AggregatorAuthorizationRecord 329 if err := json.Unmarshal(recordBytes, &auth); err != nil { 330 return nil, fmt.Errorf("failed to unmarshal authorization record: %w", err) 331 } 332 333 // Validate required fields per lexicon 334 if auth.Aggregator == "" { 335 return nil, fmt.Errorf("aggregatorDid is required") 336 } 337 if auth.CommunityDid == "" { 338 return nil, fmt.Errorf("communityDid is required") 339 } 340 if auth.CreatedAt == "" { 341 return nil, fmt.Errorf("createdAt is required") 342 } 343 if auth.CreatedBy == "" { 344 return nil, fmt.Errorf("createdBy is required") 345 } 346 347 return &auth, nil 348}