// A community-based topic aggregation platform built on atproto.
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/communities"
11 "Coves/internal/core/communityFeeds"
12 "Coves/internal/core/discover"
13 "Coves/internal/core/posts"
14 "Coves/internal/core/timeline"
15 "Coves/internal/core/users"
16 "bytes"
17 "context"
18 "database/sql"
19 "encoding/json"
20 "fmt"
21 "io"
22 "log"
23 "net/http"
24 "os"
25 "strings"
26 "time"
27
28 "github.com/go-chi/chi/v5"
29 chiMiddleware "github.com/go-chi/chi/v5/middleware"
30 _ "github.com/lib/pq"
31 "github.com/pressly/goose/v3"
32
33 commentsAPI "Coves/internal/api/handlers/comments"
34 "Coves/internal/core/comments"
35 postgresRepo "Coves/internal/db/postgres"
36)
37
38func main() {
39 // Database configuration (AppView database)
40 dbURL := os.Getenv("DATABASE_URL")
41 if dbURL == "" {
42 // Use dev database from .env.dev
43 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
44 }
45
46 // Default PDS URL for this Coves instance (supports self-hosting)
47 defaultPDS := os.Getenv("PDS_URL")
48 if defaultPDS == "" {
49 defaultPDS = "http://localhost:3001" // Local dev PDS
50 }
51
52 // Cursor secret for HMAC signing (prevents cursor manipulation)
53 cursorSecret := os.Getenv("CURSOR_SECRET")
54 if cursorSecret == "" {
55 // Generate a random secret if not set (dev mode)
56 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
57 cursorSecret = "dev-cursor-secret-change-in-production"
58 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
59 }
60
61 db, err := sql.Open("postgres", dbURL)
62 if err != nil {
63 log.Fatal("Failed to connect to database:", err)
64 }
65 defer func() {
66 if closeErr := db.Close(); closeErr != nil {
67 log.Printf("Failed to close database connection: %v", closeErr)
68 }
69 }()
70
71 if err = db.Ping(); err != nil {
72 log.Fatal("Failed to ping database:", err)
73 }
74
75 log.Println("Connected to AppView database")
76
77 // Run migrations
78 if err = goose.SetDialect("postgres"); err != nil {
79 log.Fatal("Failed to set goose dialect:", err)
80 }
81
82 if err = goose.Up(db, "internal/db/migrations"); err != nil {
83 log.Fatal("Failed to run migrations:", err)
84 }
85
86 log.Println("Migrations completed successfully")
87
88 r := chi.NewRouter()
89
90 r.Use(chiMiddleware.Logger)
91 r.Use(chiMiddleware.Recoverer)
92 r.Use(chiMiddleware.RequestID)
93
94 // Rate limiting: 100 requests per minute per IP
95 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
96 r.Use(rateLimiter.Middleware)
97
98 // Initialize identity resolver
99 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
100 // directory as DID registration to ensure E2E tests work without hitting
101 // the production plc.directory
102 identityConfig := identity.DefaultConfig()
103
104 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
105 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
106 if plcDirectoryURL == "" {
107 plcDirectoryURL = "https://plc.directory" // Default to production PLC
108 }
109
110 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
111 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
112 if isDevEnv {
113 identityConfig.PLCURL = plcDirectoryURL
114 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
115 } else {
116 // Production: Allow separate IDENTITY_PLC_URL for read operations
117 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
118 identityConfig.PLCURL = identityPLCURL
119 } else {
120 identityConfig.PLCURL = plcDirectoryURL
121 }
122 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
123 }
124
125 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
126 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
127 identityConfig.CacheTTL = duration
128 }
129 }
130
131 identityResolver := identity.NewResolver(db, identityConfig)
132
133 // Initialize atProto auth middleware for JWT validation
134 // Phase 1: Set skipVerify=true to test JWT parsing only
135 // Phase 2: Set skipVerify=false to enable full signature verification
136 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
137 if skipVerify {
138 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
139 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
140 }
141
142 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
143 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
144 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
145 log.Println("✅ atProto auth middleware initialized")
146
147 // Initialize repositories and services
148 userRepo := postgresRepo.NewUserRepository(db)
149 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
150
151 communityRepo := postgresRepo.NewCommunityRepository(db)
152
153 // V2.0: PDS-managed DID generation
154 // Community DIDs and keys are generated entirely by the PDS
155 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
156
157 instanceDID := os.Getenv("INSTANCE_DID")
158 if instanceDID == "" {
159 instanceDID = "did:web:coves.social" // Default for development
160 }
161
162 // V2: Extract instance domain for community handles
163 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
164 // We cannot allow arbitrary domains to prevent impersonation attacks
165 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
166 //
167 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
168 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
169 // Communities with mismatched hostedBy domains are rejected during indexing
170 var instanceDomain string
171 if strings.HasPrefix(instanceDID, "did:web:") {
172 // Extract domain from did:web (this is the authoritative source)
173 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
174 } else {
175 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
176 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
177 if instanceDomain == "" {
178 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
179 }
180 }
181
182 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
183
184 // V2.0: Initialize PDS account provisioner for communities (simplified)
185 // PDS handles all DID and key generation - no Coves-side cryptography needed
186 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
187 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
188 log.Printf(" - Communities will be created at: %s", defaultPDS)
189 log.Printf(" - PDS will generate and manage all DIDs and keys")
190
191 // Initialize community service (no longer needs didGenerator directly)
192 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
193
194 // Authenticate Coves instance with PDS to enable community record writes
195 // The instance needs a PDS account to write community records it owns
196 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
197 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
198 if pdsHandle != "" && pdsPassword != "" {
199 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
200 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
201 if authErr != nil {
202 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
203 log.Println("Community creation will fail until PDS authentication is configured")
204 } else {
205 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
206 svc.SetPDSAccessToken(accessToken)
207 log.Println("✓ Coves instance authenticated with PDS")
208 }
209 }
210 } else {
211 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
212 log.Println("Community creation via write-forward is disabled")
213 }
214
215 // Start Jetstream consumer for read-forward user indexing
216 jetstreamURL := os.Getenv("JETSTREAM_URL")
217 if jetstreamURL == "" {
218 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
219 }
220
221 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
222
223 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
224 ctx := context.Background()
225 go func() {
226 if startErr := userConsumer.Start(ctx); startErr != nil {
227 log.Printf("Jetstream consumer stopped: %v", startErr)
228 }
229 }()
230
231 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
232
233 // Start Jetstream consumer for community events (profiles and subscriptions)
234 // This consumer indexes:
235 // 1. Community profiles (social.coves.community.profile) - in community's own repo
236 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
237 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
238 if communityJetstreamURL == "" {
239 // Local Jetstream for communities - filter to our instance's collections
240 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
241 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
242 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
243 }
244
245 // Initialize community event consumer with did:web verification
246 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
247 if skipDIDWebVerification {
248 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
249 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
250 }
251
252 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
253 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
254 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
255
256 go func() {
257 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
258 log.Printf("Community Jetstream consumer stopped: %v", startErr)
259 }
260 }()
261
262 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
263 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
264 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
265
266 // Start JWKS cache cleanup background job
267 go func() {
268 ticker := time.NewTicker(1 * time.Hour)
269 defer ticker.Stop()
270 for range ticker.C {
271 jwksFetcher.CleanupExpiredCache()
272 log.Println("JWKS cache cleanup completed")
273 }
274 }()
275
276 log.Println("Started JWKS cache cleanup background job (runs hourly)")
277
278 // Initialize aggregator service
279 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
280 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
281 log.Println("✅ Aggregator service initialized")
282
283 // Initialize post service (with aggregator support)
284 postRepo := postgresRepo.NewPostRepository(db)
285 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS)
286
287 // Initialize vote repository (used by Jetstream consumer for indexing)
288 voteRepo := postgresRepo.NewVoteRepository(db)
289 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
290
291 // Initialize comment repository (used by Jetstream consumer for indexing)
292 commentRepo := postgresRepo.NewCommentRepository(db)
293 log.Println("✅ Comment repository initialized (Jetstream indexing only)")
294
295 // Initialize comment service (for query API)
296 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo)
297 log.Println("✅ Comment service initialized")
298
299 // Initialize feed service
300 feedRepo := postgresRepo.NewCommunityFeedRepository(db)
301 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
302 log.Println("✅ Feed service initialized")
303
304 // Initialize timeline service (home feed from subscribed communities)
305 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
306 timelineService := timeline.NewTimelineService(timelineRepo)
307 log.Println("✅ Timeline service initialized")
308
309 // Initialize discover service (public feed from all communities)
310 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
311 discoverService := discover.NewDiscoverService(discoverRepo)
312 log.Println("✅ Discover service initialized")
313
314 // Start Jetstream consumer for posts
315 // This consumer indexes posts created in community repositories via the firehose
316 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
317 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
318 if postJetstreamURL == "" {
319 // Listen to post record creation events
320 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
321 }
322
323 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
324 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
325
326 go func() {
327 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
328 log.Printf("Post Jetstream consumer stopped: %v", startErr)
329 }
330 }()
331
332 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
333 log.Println(" - Indexing: social.coves.community.post CREATE operations")
334 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
335
336 // Start Jetstream consumer for aggregators
337 // This consumer indexes aggregator service declarations and authorization records
338 // Following Bluesky's pattern for feed generators and labelers
339 // NOTE: Uses the same Jetstream as communities, just filtering different collections
340 aggregatorJetstreamURL := communityJetstreamURL
341 // Override if specific URL needed for testing
342 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
343 aggregatorJetstreamURL = envURL
344 } else if aggregatorJetstreamURL == "" {
345 // Fallback if community URL also not set
346 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
347 }
348
349 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
350 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
351
352 go func() {
353 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
354 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
355 }
356 }()
357
358 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
359 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
360 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
361
362 // Start Jetstream consumer for votes
363 // This consumer indexes votes from user repositories and updates post vote counts
364 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
365 if voteJetstreamURL == "" {
366 // Listen to vote record CREATE/DELETE events from user repositories
367 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
368 }
369
370 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
371 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
372
373 go func() {
374 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
375 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
376 }
377 }()
378
379 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
380 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
381 log.Println(" - Updating: Post vote counts atomically")
382
383 // Start Jetstream consumer for comments
384 // This consumer indexes comments from user repositories and updates parent counts
385 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
386 if commentJetstreamURL == "" {
387 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories
388 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment"
389 }
390
391 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
392 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
393
394 go func() {
395 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
396 log.Printf("Comment Jetstream consumer stopped: %v", startErr)
397 }
398 }()
399
400 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
401 log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations")
402 log.Println(" - Updating: Post comment counts and comment reply counts atomically")
403
404 // Register XRPC routes
405 routes.RegisterUserRoutes(r, userService)
406 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
407 log.Println("Community XRPC endpoints registered with OAuth authentication")
408
409 routes.RegisterPostRoutes(r, postService, authMiddleware)
410 log.Println("Post XRPC endpoints registered with OAuth authentication")
411
412 // Vote write endpoints removed - clients write directly to their PDS
413 // The AppView indexes votes from Jetstream (see vote consumer above)
414
415 routes.RegisterCommunityFeedRoutes(r, feedService)
416 log.Println("Feed XRPC endpoints registered (public, no auth required)")
417
418 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
419 log.Println("Timeline XRPC endpoints registered (requires authentication)")
420
421 routes.RegisterDiscoverRoutes(r, discoverService)
422 log.Println("Discover XRPC endpoints registered (public, no auth required)")
423
424 routes.RegisterAggregatorRoutes(r, aggregatorService)
425 log.Println("Aggregator XRPC endpoints registered (query endpoints public)")
426
427 // Comment query API - supports optional authentication for viewer state
428 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService)
429 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter)
430 r.Handle(
431 "/xrpc/social.coves.community.comment.getComments",
432 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments),
433 )
434 log.Println("✅ Comment query API registered")
435 log.Println(" - GET /xrpc/social.coves.community.comment.getComments")
436
437 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
438 w.WriteHeader(http.StatusOK)
439 if _, err := w.Write([]byte("OK")); err != nil {
440 log.Printf("Failed to write health check response: %v", err)
441 }
442 })
443
444 port := os.Getenv("APPVIEW_PORT")
445 if port == "" {
446 port = "8081" // Match .env.dev default
447 }
448
449 fmt.Printf("Coves AppView starting on port %s\n", port)
450 fmt.Printf("Default PDS: %s\n", defaultPDS)
451 log.Fatal(http.ListenAndServe(":"+port, r))
452}
453
// authenticateWithPDS creates a session on the PDS and returns an access token.
//
// It POSTs the handle and password as JSON to
// <pdsURL>/xrpc/com.atproto.server.createSession and returns the accessJwt
// field of the response. Trailing slashes on pdsURL are tolerated.
//
// An error is returned when the request cannot be built or sent (including
// when the PDS does not answer within the request timeout), when the PDS
// replies with a status other than 200 (the error carries the status code and
// a bounded prefix of the response body), when the response body is not valid
// JSON, or when the response contains no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	const (
		// requestTimeout bounds the whole call so an unresponsive PDS
		// cannot block the caller forever.
		requestTimeout = 30 * time.Second
		// maxErrorBody caps how much of a non-200 response is read into the
		// returned error message.
		maxErrorBody = 4 << 10
	)

	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// Normalise the base URL so "http://pds/" does not yield "//xrpc/...".
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 without a token is not a usable session; report it here rather
	// than handing the caller an empty credential.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS createSession response did not contain an accessJwt")
	}

	return session.AccessJwt, nil
}