A community based topic aggregation platform built on atproto

feat(server): integrate post creation endpoints and consumer

Wire up post creation feature in main server:

1. Initialize post service
- Create post repository (PostgreSQL)
- Create post service with community service integration
- Configure with default PDS URL

2. Register XRPC routes
- POST /xrpc/social.coves.post.create
- Requires OAuth authentication via middleware

3. Start Jetstream consumer for posts
- WebSocket URL: ws://localhost:6008/subscribe
- Collection filter: social.coves.post.record
- Runs in background goroutine
- Indexes CREATE operations (UPDATE/DELETE deferred)

Environment variables:
- POST_JETSTREAM_URL: Override default Jetstream endpoint

Initialization order ensures postRepo created before consumer.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+30
cmd
server
+30
cmd/server/main.go
···
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
"Coves/internal/core/communities"
+
"Coves/internal/core/posts"
"Coves/internal/core/users"
"bytes"
"context"
···
log.Println("Started JWKS cache cleanup background job (runs hourly)")
+
// Initialize post service
+
postRepo := postgresRepo.NewPostRepository(db)
+
postService := posts.NewPostService(postRepo, communityService, defaultPDS)
+
+
// Start Jetstream consumer for posts
+
// This consumer indexes posts created in community repositories via the firehose
+
// Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
+
postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
+
if postJetstreamURL == "" {
+
// Listen to post record creation events
+
postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record"
+
}
+
+
postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
+
postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
+
+
go func() {
+
if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
+
log.Printf("Post Jetstream consumer stopped: %v", startErr)
+
}
+
}()
+
+
log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
+
log.Println(" - Indexing: social.coves.post.record CREATE operations")
+
log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
+
// Register XRPC routes
routes.RegisterUserRoutes(r, userService)
routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
log.Println("Community XRPC endpoints registered with OAuth authentication")
+
+
routes.RegisterPostRoutes(r, postService, authMiddleware)
+
log.Println("Post XRPC endpoints registered with OAuth authentication")
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)