A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log"
11 "net/http"
12 "os"
13 "strings"
14 "time"
15
16 "Coves/internal/api/middleware"
17 "Coves/internal/api/routes"
18 "Coves/internal/atproto/auth"
19 "Coves/internal/atproto/identity"
20 "Coves/internal/atproto/jetstream"
21 "Coves/internal/core/aggregators"
22 "Coves/internal/core/comments"
23 "Coves/internal/core/communities"
24 "Coves/internal/core/communityFeeds"
25 "Coves/internal/core/discover"
26 "Coves/internal/core/posts"
27 "Coves/internal/core/timeline"
28 "Coves/internal/core/users"
29
30 "github.com/go-chi/chi/v5"
31 chiMiddleware "github.com/go-chi/chi/v5/middleware"
32 _ "github.com/lib/pq"
33 "github.com/pressly/goose/v3"
34
35 commentsAPI "Coves/internal/api/handlers/comments"
36
37 postgresRepo "Coves/internal/db/postgres"
38)
39
40func main() {
41 // Database configuration (AppView database)
42 dbURL := os.Getenv("DATABASE_URL")
43 if dbURL == "" {
44 // Use dev database from .env.dev
45 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
46 }
47
48 // Default PDS URL for this Coves instance (supports self-hosting)
49 defaultPDS := os.Getenv("PDS_URL")
50 if defaultPDS == "" {
51 defaultPDS = "http://localhost:3001" // Local dev PDS
52 }
53
54 // Cursor secret for HMAC signing (prevents cursor manipulation)
55 cursorSecret := os.Getenv("CURSOR_SECRET")
56 if cursorSecret == "" {
57 // Generate a random secret if not set (dev mode)
58 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
59 cursorSecret = "dev-cursor-secret-change-in-production"
60 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
61 }
62
63 db, err := sql.Open("postgres", dbURL)
64 if err != nil {
65 log.Fatal("Failed to connect to database:", err)
66 }
67 defer func() {
68 if closeErr := db.Close(); closeErr != nil {
69 log.Printf("Failed to close database connection: %v", closeErr)
70 }
71 }()
72
73 if err = db.Ping(); err != nil {
74 log.Fatal("Failed to ping database:", err)
75 }
76
77 log.Println("Connected to AppView database")
78
79 // Run migrations
80 if err = goose.SetDialect("postgres"); err != nil {
81 log.Fatal("Failed to set goose dialect:", err)
82 }
83
84 if err = goose.Up(db, "internal/db/migrations"); err != nil {
85 log.Fatal("Failed to run migrations:", err)
86 }
87
88 log.Println("Migrations completed successfully")
89
90 r := chi.NewRouter()
91
92 r.Use(chiMiddleware.Logger)
93 r.Use(chiMiddleware.Recoverer)
94 r.Use(chiMiddleware.RequestID)
95
96 // Rate limiting: 100 requests per minute per IP
97 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
98 r.Use(rateLimiter.Middleware)
99
100 // Initialize identity resolver
101 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
102 // directory as DID registration to ensure E2E tests work without hitting
103 // the production plc.directory
104 identityConfig := identity.DefaultConfig()
105
106 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
107 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
108 if plcDirectoryURL == "" {
109 plcDirectoryURL = "https://plc.directory" // Default to production PLC
110 }
111
112 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
113 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
114 if isDevEnv {
115 identityConfig.PLCURL = plcDirectoryURL
116 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
117 } else {
118 // Production: Allow separate IDENTITY_PLC_URL for read operations
119 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
120 identityConfig.PLCURL = identityPLCURL
121 } else {
122 identityConfig.PLCURL = plcDirectoryURL
123 }
124 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
125 }
126
127 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
128 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
129 identityConfig.CacheTTL = duration
130 }
131 }
132
133 identityResolver := identity.NewResolver(db, identityConfig)
134
135 // Initialize atProto auth middleware for JWT validation
136 // Phase 1: Set skipVerify=true to test JWT parsing only
137 // Phase 2: Set skipVerify=false to enable full signature verification
138 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
139 if skipVerify {
140 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
141 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
142 }
143
144 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
145 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
146 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
147 log.Println("✅ atProto auth middleware initialized")
148
149 // Initialize repositories and services
150 userRepo := postgresRepo.NewUserRepository(db)
151 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
152
153 communityRepo := postgresRepo.NewCommunityRepository(db)
154
155 // V2.0: PDS-managed DID generation
156 // Community DIDs and keys are generated entirely by the PDS
157 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
158
159 instanceDID := os.Getenv("INSTANCE_DID")
160 if instanceDID == "" {
161 instanceDID = "did:web:coves.social" // Default for development
162 }
163
164 // V2: Extract instance domain for community handles
165 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
166 // We cannot allow arbitrary domains to prevent impersonation attacks
167 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
168 //
169 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
170 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
171 // Communities with mismatched hostedBy domains are rejected during indexing
172 var instanceDomain string
173 if strings.HasPrefix(instanceDID, "did:web:") {
174 // Extract domain from did:web (this is the authoritative source)
175 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
176 } else {
177 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
178 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
179 if instanceDomain == "" {
180 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
181 }
182 }
183
184 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
185
186 // V2.0: Initialize PDS account provisioner for communities (simplified)
187 // PDS handles all DID and key generation - no Coves-side cryptography needed
188 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
189 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
190 log.Printf(" - Communities will be created at: %s", defaultPDS)
191 log.Printf(" - PDS will generate and manage all DIDs and keys")
192
193 // Initialize community service (no longer needs didGenerator directly)
194 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
195
196 // Authenticate Coves instance with PDS to enable community record writes
197 // The instance needs a PDS account to write community records it owns
198 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
199 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
200 if pdsHandle != "" && pdsPassword != "" {
201 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
202 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
203 if authErr != nil {
204 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
205 log.Println("Community creation will fail until PDS authentication is configured")
206 } else {
207 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
208 svc.SetPDSAccessToken(accessToken)
209 log.Println("✓ Coves instance authenticated with PDS")
210 }
211 }
212 } else {
213 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
214 log.Println("Community creation via write-forward is disabled")
215 }
216
217 // Start Jetstream consumer for read-forward user indexing
218 jetstreamURL := os.Getenv("JETSTREAM_URL")
219 if jetstreamURL == "" {
220 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
221 }
222
223 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
224
225 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
226 ctx := context.Background()
227 go func() {
228 if startErr := userConsumer.Start(ctx); startErr != nil {
229 log.Printf("Jetstream consumer stopped: %v", startErr)
230 }
231 }()
232
233 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
234
235 // Start Jetstream consumer for community events (profiles and subscriptions)
236 // This consumer indexes:
237 // 1. Community profiles (social.coves.community.profile) - in community's own repo
238 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
239 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
240 if communityJetstreamURL == "" {
241 // Local Jetstream for communities - filter to our instance's collections
242 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
243 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
244 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
245 }
246
247 // Initialize community event consumer with did:web verification
248 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
249 if skipDIDWebVerification {
250 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
251 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
252 }
253
254 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
255 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
256 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
257
258 go func() {
259 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
260 log.Printf("Community Jetstream consumer stopped: %v", startErr)
261 }
262 }()
263
264 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
265 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
266 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
267
268 // Start JWKS cache cleanup background job
269 go func() {
270 ticker := time.NewTicker(1 * time.Hour)
271 defer ticker.Stop()
272 for range ticker.C {
273 jwksFetcher.CleanupExpiredCache()
274 log.Println("JWKS cache cleanup completed")
275 }
276 }()
277
278 log.Println("Started JWKS cache cleanup background job (runs hourly)")
279
280 // Initialize aggregator service
281 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
282 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
283 log.Println("✅ Aggregator service initialized")
284
285 // Initialize post service (with aggregator support)
286 postRepo := postgresRepo.NewPostRepository(db)
287 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS)
288
289 // Initialize vote repository (used by Jetstream consumer for indexing)
290 voteRepo := postgresRepo.NewVoteRepository(db)
291 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
292
293 // Initialize comment repository (used by Jetstream consumer for indexing)
294 commentRepo := postgresRepo.NewCommentRepository(db)
295 log.Println("✅ Comment repository initialized (Jetstream indexing only)")
296
297 // Initialize comment service (for query API)
298 // Requires user and community repos for proper author/community hydration per lexicon
299 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
300 log.Println("✅ Comment service initialized (with author/community hydration)")
301
302 // Initialize feed service
303 feedRepo := postgresRepo.NewCommunityFeedRepository(db)
304 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
305 log.Println("✅ Feed service initialized")
306
307 // Initialize timeline service (home feed from subscribed communities)
308 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
309 timelineService := timeline.NewTimelineService(timelineRepo)
310 log.Println("✅ Timeline service initialized")
311
312 // Initialize discover service (public feed from all communities)
313 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
314 discoverService := discover.NewDiscoverService(discoverRepo)
315 log.Println("✅ Discover service initialized")
316
317 // Start Jetstream consumer for posts
318 // This consumer indexes posts created in community repositories via the firehose
319 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
320 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
321 if postJetstreamURL == "" {
322 // Listen to post record creation events
323 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
324 }
325
326 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
327 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
328
329 go func() {
330 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
331 log.Printf("Post Jetstream consumer stopped: %v", startErr)
332 }
333 }()
334
335 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
336 log.Println(" - Indexing: social.coves.community.post CREATE operations")
337 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
338
339 // Start Jetstream consumer for aggregators
340 // This consumer indexes aggregator service declarations and authorization records
341 // Following Bluesky's pattern for feed generators and labelers
342 // NOTE: Uses the same Jetstream as communities, just filtering different collections
343 aggregatorJetstreamURL := communityJetstreamURL
344 // Override if specific URL needed for testing
345 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
346 aggregatorJetstreamURL = envURL
347 } else if aggregatorJetstreamURL == "" {
348 // Fallback if community URL also not set
349 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
350 }
351
352 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
353 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
354
355 go func() {
356 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
357 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
358 }
359 }()
360
361 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
362 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
363 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
364
365 // Start Jetstream consumer for votes
366 // This consumer indexes votes from user repositories and updates post vote counts
367 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
368 if voteJetstreamURL == "" {
369 // Listen to vote record CREATE/DELETE events from user repositories
370 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
371 }
372
373 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
374 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
375
376 go func() {
377 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
378 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
379 }
380 }()
381
382 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
383 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
384 log.Println(" - Updating: Post vote counts atomically")
385
386 // Start Jetstream consumer for comments
387 // This consumer indexes comments from user repositories and updates parent counts
388 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
389 if commentJetstreamURL == "" {
390 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories
391 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment"
392 }
393
394 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
395 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
396
397 go func() {
398 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
399 log.Printf("Comment Jetstream consumer stopped: %v", startErr)
400 }
401 }()
402
403 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
404 log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations")
405 log.Println(" - Updating: Post comment counts and comment reply counts atomically")
406
407 // Register XRPC routes
408 routes.RegisterUserRoutes(r, userService)
409 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
410 log.Println("Community XRPC endpoints registered with OAuth authentication")
411
412 routes.RegisterPostRoutes(r, postService, authMiddleware)
413 log.Println("Post XRPC endpoints registered with OAuth authentication")
414
415 // Vote write endpoints removed - clients write directly to their PDS
416 // The AppView indexes votes from Jetstream (see vote consumer above)
417
418 routes.RegisterCommunityFeedRoutes(r, feedService)
419 log.Println("Feed XRPC endpoints registered (public, no auth required)")
420
421 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
422 log.Println("Timeline XRPC endpoints registered (requires authentication)")
423
424 routes.RegisterDiscoverRoutes(r, discoverService)
425 log.Println("Discover XRPC endpoints registered (public, no auth required)")
426
427 routes.RegisterAggregatorRoutes(r, aggregatorService)
428 log.Println("Aggregator XRPC endpoints registered (query endpoints public)")
429
430 // Comment query API - supports optional authentication for viewer state
431 // Stricter rate limiting for expensive nested comment queries
432 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute)
433 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService)
434 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter)
435 r.Handle(
436 "/xrpc/social.coves.community.comment.getComments",
437 commentRateLimiter.Middleware(
438 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments),
439 ),
440 )
441 log.Println("✅ Comment query API registered (20 req/min rate limit)")
442 log.Println(" - GET /xrpc/social.coves.community.comment.getComments")
443
444 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
445 w.WriteHeader(http.StatusOK)
446 if _, err := w.Write([]byte("OK")); err != nil {
447 log.Printf("Failed to write health check response: %v", err)
448 }
449 })
450
451 port := os.Getenv("APPVIEW_PORT")
452 if port == "" {
453 port = "8081" // Match .env.dev default
454 }
455
456 fmt.Printf("Coves AppView starting on port %s\n", port)
457 fmt.Printf("Default PDS: %s\n", defaultPDS)
458 log.Fatal(http.ListenAndServe(":"+port, r))
459}
460
// authenticateWithPDS creates a session on the PDS and returns an access token.
//
// It POSTs a JSON body containing handle (as "identifier") and password to
// <pdsURL>/xrpc/com.atproto.server.createSession and returns the "accessJwt"
// field of the JSON response. A trailing slash on pdsURL is tolerated.
//
// An error is returned when the request cannot be built or sent (including
// when the PDS does not answer within the client timeout), when the PDS
// replies with any status other than 200 OK (the error text includes up to
// 64 KiB of the response body), when the response body is not valid JSON, or
// when the response carries no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	const (
		// requestTimeout bounds the whole exchange so an unresponsive PDS
		// cannot block the caller forever.
		requestTimeout = 30 * time.Second
		// maxErrorBody caps how much of a failure response is read into memory.
		maxErrorBody = 64 << 10
	)

	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// http.Post would use http.DefaultClient, which has no timeout.
	client := &http.Client{Timeout: requestTimeout}
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 response without a token is not a usable session; report it
	// rather than handing the caller an empty token as if it had succeeded.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS session response for %q did not include accessJwt", handle)
	}

	return session.AccessJwt, nil
}