···
"Coves/internal/atproto/auth"
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
9
+
"Coves/internal/core/aggregators"
"Coves/internal/core/communities"
"Coves/internal/core/communityFeeds"
"Coves/internal/core/posts"
···
log.Println("Started JWKS cache cleanup background job (runs hourly)")
263
-
// Initialize post service
264
+
// Initialize aggregator service
265
+
aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
266
+
aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
267
+
log.Println("✅ Aggregator service initialized")
269
+
// Initialize post service (with aggregator support)
postRepo := postgresRepo.NewPostRepository(db)
265
-
postService := posts.NewPostService(postRepo, communityService, defaultPDS)
271
+
postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS)
// Initialize feed service
feedRepo := postgresRepo.NewCommunityFeedRepository(db)
···
log.Println(" - Indexing: social.coves.post.record CREATE operations")
log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
300
+
// Start Jetstream consumer for aggregators
301
+
// This consumer indexes aggregator service declarations and authorization records
302
+
// Following Bluesky's pattern for feed generators and labelers
303
+
// NOTE: Uses the same Jetstream as communities, just filtering different collections
304
+
aggregatorJetstreamURL := communityJetstreamURL
305
+
// Override if specific URL needed for testing
306
+
if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
307
+
aggregatorJetstreamURL = envURL
308
+
} else if aggregatorJetstreamURL == "" {
309
+
// Fallback if community URL also not set
310
+
aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
313
+
aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
314
+
aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
317
+
if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
318
+
log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
322
+
log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
323
+
log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
324
+
log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
routes.RegisterUserRoutes(r, userService)
routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
···
routes.RegisterCommunityFeedRoutes(r, feedService)
log.Println("Feed XRPC endpoints registered (public, no auth required)")
337
+
routes.RegisterAggregatorRoutes(r, aggregatorService)
338
+
log.Println("Aggregator XRPC endpoints registered (query endpoints public)")
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)