A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/aggregators" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log" 9 "time" 10) 11 12// AggregatorEventConsumer consumes aggregator-related events from Jetstream 13// Following Bluesky's pattern: feed generators (app.bsky.feed.generator) and labelers (app.bsky.labeler.service) 14type AggregatorEventConsumer struct { 15 repo aggregators.Repository // Repository for aggregator operations 16} 17 18// NewAggregatorEventConsumer creates a new Jetstream consumer for aggregator events 19func NewAggregatorEventConsumer(repo aggregators.Repository) *AggregatorEventConsumer { 20 return &AggregatorEventConsumer{ 21 repo: repo, 22 } 23} 24 25// HandleEvent processes a Jetstream event for aggregator records 26// This is called by the main Jetstream consumer when it receives commit events 27func (c *AggregatorEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 28 // We only care about commit events for aggregator records 29 if event.Kind != "commit" || event.Commit == nil { 30 return nil 31 } 32 33 commit := event.Commit 34 35 // Route to appropriate handler based on collection 36 // IMPORTANT: Collection names refer to RECORD TYPES in repositories 37 // - social.coves.aggregator.service: Service declaration (in aggregator's own repo, rkey="self") 38 // - social.coves.aggregator.authorization: Authorization (in community's repo, any rkey) 39 switch commit.Collection { 40 case "social.coves.aggregator.service": 41 return c.handleServiceDeclaration(ctx, event.Did, commit) 42 case "social.coves.aggregator.authorization": 43 return c.handleAuthorization(ctx, event.Did, commit) 44 default: 45 // Not an aggregator-related collection 46 return nil 47 } 48} 49 50// handleServiceDeclaration processes aggregator service declaration events 51// Service declarations are stored at: at://aggregator_did/social.coves.aggregator.service/self 52func (c *AggregatorEventConsumer) handleServiceDeclaration(ctx context.Context, 
did string, commit *CommitEvent) error { 53 switch commit.Operation { 54 case "create", "update": 55 // Both create and update are handled the same way (upsert) 56 return c.upsertAggregator(ctx, did, commit) 57 case "delete": 58 return c.deleteAggregator(ctx, did) 59 default: 60 log.Printf("Unknown operation for aggregator service: %s", commit.Operation) 61 return nil 62 } 63} 64 65// handleAuthorization processes authorization record events 66// Authorizations are stored at: at://community_did/social.coves.aggregator.authorization/{rkey} 67func (c *AggregatorEventConsumer) handleAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 68 switch commit.Operation { 69 case "create", "update": 70 // Both create and update are handled the same way (upsert) 71 return c.upsertAuthorization(ctx, communityDID, commit) 72 case "delete": 73 return c.deleteAuthorization(ctx, communityDID, commit) 74 default: 75 log.Printf("Unknown operation for aggregator authorization: %s", commit.Operation) 76 return nil 77 } 78} 79 80// upsertAggregator indexes or updates an aggregator service declaration 81func (c *AggregatorEventConsumer) upsertAggregator(ctx context.Context, did string, commit *CommitEvent) error { 82 if commit.Record == nil { 83 return fmt.Errorf("aggregator service event missing record data") 84 } 85 86 // Verify rkey is "self" (canonical location for service declaration) 87 // Following Bluesky's pattern: app.bsky.feed.generator and app.bsky.labeler.service use /self 88 if commit.RKey != "self" { 89 return fmt.Errorf("invalid aggregator service rkey: expected 'self', got '%s'", commit.RKey) 90 } 91 92 // Parse the service declaration record 93 service, err := parseAggregatorService(commit.Record) 94 if err != nil { 95 return fmt.Errorf("failed to parse aggregator service: %w", err) 96 } 97 98 // Validate DID matches repo DID (security check) 99 if service.DID != "" && service.DID != did { 100 return fmt.Errorf("service record DID (%s) 
does not match repo DID (%s)", service.DID, did) 101 } 102 103 // Build AT-URI for this record 104 uri := fmt.Sprintf("at://%s/social.coves.aggregator.service/self", did) 105 106 // Parse createdAt from service record 107 var createdAt time.Time 108 if service.CreatedAt != "" { 109 createdAt, err = time.Parse(time.RFC3339, service.CreatedAt) 110 if err != nil { 111 createdAt = time.Now() // Fallback 112 log.Printf("Warning: invalid createdAt format for aggregator %s: %v", did, err) 113 } 114 } else { 115 createdAt = time.Now() 116 } 117 118 // Extract avatar CID from blob if present 119 var avatarCID string 120 if service.Avatar != nil { 121 if cid, ok := extractBlobCID(service.Avatar); ok { 122 avatarCID = cid 123 } 124 } 125 126 // Build aggregator domain model 127 agg := &aggregators.Aggregator{ 128 DID: did, 129 DisplayName: service.DisplayName, 130 Description: service.Description, 131 AvatarURL: avatarCID, // Now contains the CID from blob 132 MaintainerDID: service.MaintainerDID, 133 SourceURL: service.SourceURL, 134 CreatedAt: createdAt, 135 IndexedAt: time.Now(), 136 RecordURI: uri, 137 RecordCID: commit.CID, 138 } 139 140 // Handle config schema (JSONB) 141 if service.ConfigSchema != nil { 142 schemaBytes, err := json.Marshal(service.ConfigSchema) 143 if err != nil { 144 return fmt.Errorf("failed to marshal config schema: %w", err) 145 } 146 agg.ConfigSchema = schemaBytes 147 } 148 149 // Create or update in database 150 if err := c.repo.CreateAggregator(ctx, agg); err != nil { 151 return fmt.Errorf("failed to index aggregator: %w", err) 152 } 153 154 log.Printf("[AGGREGATOR-CONSUMER] Indexed service: %s (%s)", agg.DisplayName, did) 155 return nil 156} 157 158// deleteAggregator removes an aggregator from the index 159func (c *AggregatorEventConsumer) deleteAggregator(ctx context.Context, did string) error { 160 // Delete from database (cascade deletes authorizations and posts via FK) 161 if err := c.repo.DeleteAggregator(ctx, did); err != nil { 162 // 
Log but don't fail if not found (idempotent delete) 163 if aggregators.IsNotFound(err) { 164 log.Printf("[AGGREGATOR-CONSUMER] Aggregator not found for deletion: %s (already deleted?)", did) 165 return nil 166 } 167 return fmt.Errorf("failed to delete aggregator: %w", err) 168 } 169 170 log.Printf("[AGGREGATOR-CONSUMER] Deleted aggregator: %s", did) 171 return nil 172} 173 174// upsertAuthorization indexes or updates an authorization record 175func (c *AggregatorEventConsumer) upsertAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 176 if commit.Record == nil { 177 return fmt.Errorf("authorization event missing record data") 178 } 179 180 // Parse the authorization record 181 authRecord, err := parseAggregatorAuthorization(commit.Record) 182 if err != nil { 183 return fmt.Errorf("failed to parse authorization: %w", err) 184 } 185 186 // Validate communityDid matches repo DID (security check) 187 if authRecord.CommunityDid != "" && authRecord.CommunityDid != communityDID { 188 return fmt.Errorf("authorization record communityDid (%s) does not match repo DID (%s)", 189 authRecord.CommunityDid, communityDID) 190 } 191 192 // Build AT-URI for this record 193 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey) 194 195 // Parse createdAt from authorization record 196 var createdAt time.Time 197 if authRecord.CreatedAt != "" { 198 createdAt, err = time.Parse(time.RFC3339, authRecord.CreatedAt) 199 if err != nil { 200 createdAt = time.Now() // Fallback 201 log.Printf("Warning: invalid createdAt format for authorization %s: %v", uri, err) 202 } 203 } else { 204 createdAt = time.Now() 205 } 206 207 // Parse disabledAt from authorization record (optional, for modlog/audit) 208 var disabledAt *time.Time 209 if authRecord.DisabledAt != "" { 210 parsed, err := time.Parse(time.RFC3339, authRecord.DisabledAt) 211 if err != nil { 212 log.Printf("Warning: invalid disabledAt format for authorization %s: 
%v", uri, err) 213 } else { 214 disabledAt = &parsed 215 } 216 } 217 218 // Build authorization domain model 219 auth := &aggregators.Authorization{ 220 AggregatorDID: authRecord.Aggregator, 221 CommunityDID: communityDID, 222 Enabled: authRecord.Enabled, 223 CreatedBy: authRecord.CreatedBy, 224 DisabledBy: authRecord.DisabledBy, 225 DisabledAt: disabledAt, 226 CreatedAt: createdAt, 227 IndexedAt: time.Now(), 228 RecordURI: uri, 229 RecordCID: commit.CID, 230 } 231 232 // Handle config (JSONB) 233 if authRecord.Config != nil { 234 configBytes, err := json.Marshal(authRecord.Config) 235 if err != nil { 236 return fmt.Errorf("failed to marshal config: %w", err) 237 } 238 auth.Config = configBytes 239 } 240 241 // Create or update in database 242 if err := c.repo.CreateAuthorization(ctx, auth); err != nil { 243 return fmt.Errorf("failed to index authorization: %w", err) 244 } 245 246 log.Printf("[AGGREGATOR-CONSUMER] Indexed authorization: community=%s, aggregator=%s, enabled=%v", 247 communityDID, authRecord.Aggregator, authRecord.Enabled) 248 return nil 249} 250 251// deleteAuthorization removes an authorization from the index 252func (c *AggregatorEventConsumer) deleteAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 253 // Build AT-URI to find the authorization 254 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey) 255 256 // Delete from database 257 if err := c.repo.DeleteAuthorizationByURI(ctx, uri); err != nil { 258 // Log but don't fail if not found (idempotent delete) 259 if aggregators.IsNotFound(err) { 260 log.Printf("[AGGREGATOR-CONSUMER] Authorization not found for deletion: %s (already deleted?)", uri) 261 return nil 262 } 263 return fmt.Errorf("failed to delete authorization: %w", err) 264 } 265 266 log.Printf("[AGGREGATOR-CONSUMER] Deleted authorization: %s", uri) 267 return nil 268} 269 270// ===== Record Parsing Functions ===== 271 272// AggregatorServiceRecord 
// AggregatorServiceRecord mirrors the JSON layout of a service declaration
// record (collection social.coves.aggregator.service) as it arrives in a commit.
type AggregatorServiceRecord struct {
	Type          string                 `json:"$type"`
	DID           string                 `json:"did"` // DID of aggregator (must match repo DID)
	DisplayName   string                 `json:"displayName"`
	Description   string                 `json:"description,omitempty"`
	Avatar        map[string]interface{} `json:"avatar,omitempty"`       // Blob reference (CID will be extracted)
	ConfigSchema  map[string]interface{} `json:"configSchema,omitempty"` // JSON Schema
	MaintainerDID string                 `json:"maintainer,omitempty"`   // Fixed: was maintainerDid
	SourceURL     string                 `json:"sourceUrl,omitempty"`    // Fixed: was homepageUrl
	CreatedAt     string                 `json:"createdAt"`
}

// parseAggregatorService converts a loosely-typed commit record into an
// AggregatorServiceRecord by round-tripping it through JSON.
//
// An error is returned when the record cannot be encoded, when the encoded
// form does not decode into the struct, or when displayName is empty.
func parseAggregatorService(record interface{}) (*AggregatorServiceRecord, error) {
	raw, marshalErr := json.Marshal(record)
	if marshalErr != nil {
		return nil, fmt.Errorf("failed to marshal record: %w", marshalErr)
	}

	parsed := new(AggregatorServiceRecord)
	if decodeErr := json.Unmarshal(raw, parsed); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal service record: %w", decodeErr)
	}

	// displayName is the only field this parser treats as mandatory.
	if len(parsed.DisplayName) == 0 {
		return nil, fmt.Errorf("displayName is required")
	}

	return parsed, nil
}

// Note: extractBlobCID is defined in community_consumer.go and shared across consumers

// AggregatorAuthorizationRecord mirrors the JSON layout of an authorization
// record (collection social.coves.aggregator.authorization).
type AggregatorAuthorizationRecord struct {
	Config       map[string]interface{} `json:"config,omitempty"`
	Type         string                 `json:"$type"`
	Aggregator   string                 `json:"aggregatorDid"`
	CommunityDid string                 `json:"communityDid"`
	CreatedBy    string                 `json:"createdBy"`
	DisabledBy   string                 `json:"disabledBy,omitempty"`
	DisabledAt   string                 `json:"disabledAt,omitempty"`
	CreatedAt    string                 `json:"createdAt"`
	Enabled      bool                   `json:"enabled"`
}
parseAggregatorAuthorization parses an aggregator authorization record 321func parseAggregatorAuthorization(record interface{}) (*AggregatorAuthorizationRecord, error) { 322 recordBytes, err := json.Marshal(record) 323 if err != nil { 324 return nil, fmt.Errorf("failed to marshal record: %w", err) 325 } 326 327 var auth AggregatorAuthorizationRecord 328 if err := json.Unmarshal(recordBytes, &auth); err != nil { 329 return nil, fmt.Errorf("failed to unmarshal authorization record: %w", err) 330 } 331 332 // Validate required fields per lexicon 333 if auth.Aggregator == "" { 334 return nil, fmt.Errorf("aggregatorDid is required") 335 } 336 if auth.CommunityDid == "" { 337 return nil, fmt.Errorf("communityDid is required") 338 } 339 if auth.CreatedAt == "" { 340 return nil, fmt.Errorf("createdAt is required") 341 } 342 if auth.CreatedBy == "" { 343 return nil, fmt.Errorf("createdBy is required") 344 } 345 346 return &auth, nil 347}