A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/identity"
7 "Coves/internal/atproto/jetstream"
8 "Coves/internal/atproto/oauth"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/blobs"
11 "Coves/internal/core/comments"
12 "Coves/internal/core/communities"
13 "Coves/internal/core/communityFeeds"
14 "Coves/internal/core/discover"
15 "Coves/internal/core/posts"
16 "Coves/internal/core/timeline"
17 "Coves/internal/core/unfurl"
18 "Coves/internal/core/users"
19 "Coves/internal/core/votes"
20 "bytes"
21 "context"
22 "crypto/rand"
23 "database/sql"
24 "encoding/base64"
25 "encoding/json"
26 "fmt"
27 "io"
28 "log"
29 "net/http"
30 "os"
31 "os/signal"
32 "strings"
33 "syscall"
34 "time"
35
36 "github.com/go-chi/chi/v5"
37 chiMiddleware "github.com/go-chi/chi/v5/middleware"
38 _ "github.com/lib/pq"
39 "github.com/pressly/goose/v3"
40
41 commentsAPI "Coves/internal/api/handlers/comments"
42
43 postgresRepo "Coves/internal/db/postgres"
44)
45
46func main() {
47 // Database configuration (AppView database)
48 dbURL := os.Getenv("DATABASE_URL")
49 if dbURL == "" {
50 // Use dev database from .env.dev
51 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
52 }
53
54 // Default PDS URL for this Coves instance (supports self-hosting)
55 defaultPDS := os.Getenv("PDS_URL")
56 if defaultPDS == "" {
57 defaultPDS = "http://localhost:3001" // Local dev PDS
58 }
59
60 // Cursor secret for HMAC signing (prevents cursor manipulation)
61 cursorSecret := os.Getenv("CURSOR_SECRET")
62 if cursorSecret == "" {
63 // Generate a random secret if not set (dev mode)
64 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
65 cursorSecret = "dev-cursor-secret-change-in-production"
66 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
67 }
68
69 db, err := sql.Open("postgres", dbURL)
70 if err != nil {
71 log.Fatal("Failed to connect to database:", err)
72 }
73 defer func() {
74 if closeErr := db.Close(); closeErr != nil {
75 log.Printf("Failed to close database connection: %v", closeErr)
76 }
77 }()
78
79 if err = db.Ping(); err != nil {
80 log.Fatal("Failed to ping database:", err)
81 }
82
83 log.Println("Connected to AppView database")
84
85 // Run migrations
86 if err = goose.SetDialect("postgres"); err != nil {
87 log.Fatal("Failed to set goose dialect:", err)
88 }
89
90 if err = goose.Up(db, "internal/db/migrations"); err != nil {
91 log.Fatal("Failed to run migrations:", err)
92 }
93
94 log.Println("Migrations completed successfully")
95
96 r := chi.NewRouter()
97
98 r.Use(chiMiddleware.Logger)
99 r.Use(chiMiddleware.Recoverer)
100 r.Use(chiMiddleware.RequestID)
101
102 // Rate limiting: 100 requests per minute per IP
103 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
104 r.Use(rateLimiter.Middleware)
105
106 // Initialize identity resolver
107 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
108 // directory as DID registration to ensure E2E tests work without hitting
109 // the production plc.directory
110 identityConfig := identity.DefaultConfig()
111
112 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
113 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
114 if plcDirectoryURL == "" {
115 plcDirectoryURL = "https://plc.directory" // Default to production PLC
116 }
117
118 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
119 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
120 if isDevEnv {
121 identityConfig.PLCURL = plcDirectoryURL
122 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
123 } else {
124 // Production: Allow separate IDENTITY_PLC_URL for read operations
125 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
126 identityConfig.PLCURL = identityPLCURL
127 } else {
128 identityConfig.PLCURL = plcDirectoryURL
129 }
130 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
131 }
132
133 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
134 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
135 identityConfig.CacheTTL = duration
136 }
137 }
138
139 identityResolver := identity.NewResolver(db, identityConfig)
140
141 // Get PLC URL for OAuth and other services
142 plcURL := os.Getenv("PLC_DIRECTORY_URL")
143 if plcURL == "" {
144 plcURL = "https://plc.directory"
145 }
146 log.Printf("🔐 OAuth will use PLC directory: %s", plcURL)
147
148 // Initialize OAuth client for sealed session tokens
149 // Mobile apps authenticate via OAuth flow and receive sealed session tokens
150 // These tokens are encrypted references to OAuth sessions stored in the database
151 oauthSealSecret := os.Getenv("OAUTH_SEAL_SECRET")
152 if oauthSealSecret == "" {
153 if os.Getenv("IS_DEV_ENV") != "true" {
154 log.Fatal("OAUTH_SEAL_SECRET is required in production mode")
155 }
156 // Generate RANDOM secret for dev mode
157 randomBytes := make([]byte, 32)
158 if _, err := rand.Read(randomBytes); err != nil {
159 log.Fatal("Failed to generate random seal secret: ", err)
160 }
161 oauthSealSecret = base64.StdEncoding.EncodeToString(randomBytes)
162 log.Println("⚠️ DEV MODE: Generated random OAuth seal secret (won't persist across restarts)")
163 }
164
165 isDevMode := os.Getenv("IS_DEV_ENV") == "true"
166 pdsURL := os.Getenv("PDS_URL") // For dev mode: resolve handles via local PDS
167 oauthConfig := &oauth.OAuthConfig{
168 PublicURL: os.Getenv("APPVIEW_PUBLIC_URL"),
169 SealSecret: oauthSealSecret,
170 Scopes: []string{"atproto", "transition:generic"},
171 DevMode: isDevMode,
172 AllowPrivateIPs: isDevMode, // Allow private IPs only in dev mode
173 PLCURL: plcURL,
174 PDSURL: pdsURL, // For dev mode handle resolution
175 // SessionTTL and SealedTokenTTL will use defaults if not set (7 days and 14 days)
176 }
177
178 // Create PostgreSQL-backed OAuth session store (using default 7-day TTL)
179 baseOAuthStore := oauth.NewPostgresOAuthStore(db, 0)
180 // Wrap with MobileAwareStoreWrapper to capture OAuth state for mobile CSRF validation.
181 // This intercepts SaveAuthRequestInfo to save mobile CSRF data when present in context.
182 oauthStore := oauth.NewMobileAwareStoreWrapper(baseOAuthStore)
183
184 if oauthConfig.PublicURL == "" {
185 oauthConfig.PublicURL = "http://localhost:8080"
186 oauthConfig.DevMode = true // Force dev mode for localhost
187 }
188
189 oauthClient, err := oauth.NewOAuthClient(oauthConfig, oauthStore)
190 if err != nil {
191 log.Fatalf("Failed to initialize OAuth client: %v", err)
192 }
193
194 // Create OAuth handler for HTTP endpoints
195 oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore)
196
197 // Create OAuth auth middleware
198 // Validates sealed session tokens and loads OAuth sessions from database
199 authMiddleware := middleware.NewOAuthAuthMiddleware(oauthClient, oauthStore)
200 log.Println("✅ OAuth auth middleware initialized (sealed session tokens)")
201
202 // Initialize repositories and services
203 userRepo := postgresRepo.NewUserRepository(db)
204 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
205
206 communityRepo := postgresRepo.NewCommunityRepository(db)
207
208 // V2.0: PDS-managed DID generation
209 // Community DIDs and keys are generated entirely by the PDS
210 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
211
212 instanceDID := os.Getenv("INSTANCE_DID")
213 if instanceDID == "" {
214 instanceDID = "did:web:coves.social" // Default for development
215 }
216
217 // V2: Extract instance domain for community handles
218 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
219 // We cannot allow arbitrary domains to prevent impersonation attacks
220 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
221 //
222 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
223 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
224 // Communities with mismatched hostedBy domains are rejected during indexing
225 var instanceDomain string
226 if strings.HasPrefix(instanceDID, "did:web:") {
227 // Extract domain from did:web (this is the authoritative source)
228 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
229 } else {
230 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
231 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
232 if instanceDomain == "" {
233 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
234 }
235 }
236
237 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
238
239 // Community creation restriction - if set, only these DIDs can create communities
240 var allowedCommunityCreators []string
241 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" {
242 for _, did := range strings.Split(communityCreators, ",") {
243 did = strings.TrimSpace(did)
244 if did != "" {
245 allowedCommunityCreators = append(allowedCommunityCreators, did)
246 }
247 }
248 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators))
249 } else {
250 log.Println("Community creation open to all authenticated users")
251 }
252
253 // V2.0: Initialize PDS account provisioner for communities (simplified)
254 // PDS handles all DID and key generation - no Coves-side cryptography needed
255 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
256 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
257 log.Printf(" - Communities will be created at: %s", defaultPDS)
258 log.Printf(" - PDS will generate and manage all DIDs and keys")
259
260 // Initialize community service (no longer needs didGenerator directly)
261 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
262
263 // Authenticate Coves instance with PDS to enable community record writes
264 // The instance needs a PDS account to write community records it owns
265 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
266 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
267 if pdsHandle != "" && pdsPassword != "" {
268 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
269 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
270 if authErr != nil {
271 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
272 log.Println("Community creation will fail until PDS authentication is configured")
273 } else {
274 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
275 svc.SetPDSAccessToken(accessToken)
276 log.Println("✓ Coves instance authenticated with PDS")
277 }
278 }
279 } else {
280 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
281 log.Println("Community creation via write-forward is disabled")
282 }
283
284 // Start Jetstream consumer for read-forward user indexing
285 jetstreamURL := os.Getenv("JETSTREAM_URL")
286 if jetstreamURL == "" {
287 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
288 }
289
290 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
291
292 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
293 ctx := context.Background()
294 go func() {
295 if startErr := userConsumer.Start(ctx); startErr != nil {
296 log.Printf("Jetstream consumer stopped: %v", startErr)
297 }
298 }()
299
300 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
301
302 // Start Jetstream consumer for community events (profiles and subscriptions)
303 // This consumer indexes:
304 // 1. Community profiles (social.coves.community.profile) - in community's own repo
305 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
306 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
307 if communityJetstreamURL == "" {
308 // Local Jetstream for communities - filter to our instance's collections
309 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
310 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
311 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
312 }
313
314 // Initialize community event consumer with did:web verification
315 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
316 if skipDIDWebVerification {
317 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
318 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
319 }
320
321 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
322 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
323 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
324
325 go func() {
326 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
327 log.Printf("Community Jetstream consumer stopped: %v", startErr)
328 }
329 }()
330
331 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
332 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
333 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
334
335 // Start OAuth session cleanup background job with cancellable context
336 cleanupCtx, cleanupCancel := context.WithCancel(context.Background())
337 go func() {
338 ticker := time.NewTicker(1 * time.Hour)
339 defer ticker.Stop()
340 for {
341 select {
342 case <-cleanupCtx.Done():
343 log.Println("OAuth cleanup job stopped")
344 return
345 case <-ticker.C:
346 // Check if store implements cleanup methods
347 // Use UnwrapPostgresStore to get the underlying store from the wrapper
348 if cleanupStore := oauthStore.UnwrapPostgresStore(); cleanupStore != nil {
349 sessions, sessErr := cleanupStore.CleanupExpiredSessions(cleanupCtx)
350 if sessErr != nil {
351 log.Printf("Error cleaning up expired OAuth sessions: %v", sessErr)
352 }
353 requests, reqErr := cleanupStore.CleanupExpiredAuthRequests(cleanupCtx)
354 if reqErr != nil {
355 log.Printf("Error cleaning up expired OAuth auth requests: %v", reqErr)
356 }
357 if sessions > 0 || requests > 0 {
358 log.Printf("OAuth cleanup: removed %d expired sessions, %d expired auth requests", sessions, requests)
359 }
360 }
361 }
362 }
363 }()
364
365 log.Println("Started OAuth session cleanup background job (runs hourly)")
366
367 // Initialize aggregator service
368 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
369 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
370 log.Println("✅ Aggregator service initialized")
371
372 // Initialize unfurl cache repository
373 unfurlRepo := unfurl.NewRepository(db)
374
375 // Initialize blob upload service
376 blobService := blobs.NewBlobService(defaultPDS)
377
378 // Initialize unfurl service with configuration
379 unfurlService := unfurl.NewService(
380 unfurlRepo,
381 unfurl.WithTimeout(10*time.Second),
382 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"),
383 unfurl.WithCacheTTL(24*time.Hour),
384 )
385 log.Println("✅ Unfurl and blob services initialized")
386
387 // Initialize post service (with aggregator support)
388 postRepo := postgresRepo.NewPostRepository(db)
389 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS)
390
391 // Initialize vote repository (used by Jetstream consumer for indexing)
392 voteRepo := postgresRepo.NewVoteRepository(db)
393 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
394
395 // Initialize comment repository (used by Jetstream consumer for indexing)
396 commentRepo := postgresRepo.NewCommentRepository(db)
397 log.Println("✅ Comment repository initialized (Jetstream indexing only)")
398
399 // Initialize vote cache (stores user votes from PDS to avoid eventual consistency issues)
400 // TTL of 10 minutes - cache is also updated on vote create/delete
401 voteCache := votes.NewVoteCache(10*time.Minute, nil)
402 log.Println("✅ Vote cache initialized (10 minute TTL)")
403
404 // Initialize vote service (for XRPC API endpoints)
405 // Note: We don't validate subject existence - the vote goes to the user's PDS regardless.
406 // The Jetstream consumer handles orphaned votes correctly by only updating counts for
407 // non-deleted subjects. This avoids race conditions and eventual consistency issues.
408 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, voteCache, nil)
409 log.Println("✅ Vote service initialized (with OAuth authentication and vote cache)")
410
411 // Initialize comment service (for query API)
412 // Requires user and community repos for proper author/community hydration per lexicon
413 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
414 log.Println("✅ Comment service initialized (with author/community hydration)")
415
416 // Initialize feed service
417 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret)
418 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
419 log.Println("✅ Feed service initialized")
420
421 // Initialize timeline service (home feed from subscribed communities)
422 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
423 timelineService := timeline.NewTimelineService(timelineRepo)
424 log.Println("✅ Timeline service initialized")
425
426 // Initialize discover service (public feed from all communities)
427 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
428 discoverService := discover.NewDiscoverService(discoverRepo)
429 log.Println("✅ Discover service initialized")
430
431 // Start Jetstream consumer for posts
432 // This consumer indexes posts created in community repositories via the firehose
433 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
434 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
435 if postJetstreamURL == "" {
436 // Listen to post record creation events
437 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
438 }
439
440 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
441 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
442
443 go func() {
444 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
445 log.Printf("Post Jetstream consumer stopped: %v", startErr)
446 }
447 }()
448
449 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
450 log.Println(" - Indexing: social.coves.community.post CREATE operations")
451 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
452
453 // Start Jetstream consumer for aggregators
454 // This consumer indexes aggregator service declarations and authorization records
455 // Following Bluesky's pattern for feed generators and labelers
456 // NOTE: Uses the same Jetstream as communities, just filtering different collections
457 aggregatorJetstreamURL := communityJetstreamURL
458 // Override if specific URL needed for testing
459 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
460 aggregatorJetstreamURL = envURL
461 } else if aggregatorJetstreamURL == "" {
462 // Fallback if community URL also not set
463 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
464 }
465
466 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
467 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
468
469 go func() {
470 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
471 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
472 }
473 }()
474
475 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
476 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
477 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
478
479 // Start Jetstream consumer for votes
480 // This consumer indexes votes from user repositories and updates post vote counts
481 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
482 if voteJetstreamURL == "" {
483 // Listen to vote record CREATE/DELETE events from user repositories
484 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
485 }
486
487 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
488 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
489
490 go func() {
491 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
492 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
493 }
494 }()
495
496 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
497 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
498 log.Println(" - Updating: Post vote counts atomically")
499
500 // Start Jetstream consumer for comments
501 // This consumer indexes comments from user repositories and updates parent counts
502 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
503 if commentJetstreamURL == "" {
504 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories
505 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment"
506 }
507
508 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
509 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
510
511 go func() {
512 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
513 log.Printf("Comment Jetstream consumer stopped: %v", startErr)
514 }
515 }()
516
517 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
518 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations")
519 log.Println(" - Updating: Post comment counts and comment reply counts atomically")
520
521 // Register XRPC routes
522 routes.RegisterUserRoutes(r, userService)
523 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators)
524 log.Println("Community XRPC endpoints registered with OAuth authentication")
525
526 routes.RegisterPostRoutes(r, postService, authMiddleware)
527 log.Println("Post XRPC endpoints registered with OAuth authentication")
528
529 routes.RegisterVoteRoutes(r, voteService, authMiddleware)
530 log.Println("Vote XRPC endpoints registered with OAuth authentication")
531
532 routes.RegisterCommunityFeedRoutes(r, feedService, voteService, authMiddleware)
533 log.Println("Feed XRPC endpoints registered (public with optional auth for viewer vote state)")
534
535 routes.RegisterTimelineRoutes(r, timelineService, voteService, authMiddleware)
536 log.Println("Timeline XRPC endpoints registered (requires authentication, includes viewer vote state)")
537
538 routes.RegisterDiscoverRoutes(r, discoverService)
539 log.Println("Discover XRPC endpoints registered (public, no auth required)")
540
541 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver)
542 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)")
543
544 // Comment query API - supports optional authentication for viewer state
545 // Stricter rate limiting for expensive nested comment queries
546 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute)
547 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService)
548 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter)
549 r.Handle(
550 "/xrpc/social.coves.community.comment.getComments",
551 commentRateLimiter.Middleware(
552 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments),
553 ),
554 )
555 log.Println("✅ Comment query API registered (20 req/min rate limit)")
556 log.Println(" - GET /xrpc/social.coves.community.comment.getComments")
557
558 // Configure allowed CORS origins for OAuth callback
559 // SECURITY: Never use wildcard "*" with credentials - only allow specific origins
560 var oauthAllowedOrigins []string
561 appviewPublicURL := os.Getenv("APPVIEW_PUBLIC_URL")
562 if appviewPublicURL == "" {
563 appviewPublicURL = "http://localhost:8080"
564 }
565 oauthAllowedOrigins = append(oauthAllowedOrigins, appviewPublicURL)
566
567 // In dev mode, also allow common localhost origins for testing
568 if oauthConfig.DevMode {
569 oauthAllowedOrigins = append(oauthAllowedOrigins,
570 "http://localhost:3000",
571 "http://localhost:3001",
572 "http://localhost:5173",
573 "http://127.0.0.1:8080",
574 "http://127.0.0.1:3000",
575 "http://127.0.0.1:3001",
576 "http://127.0.0.1:5173",
577 )
578 log.Printf("🧪 DEV MODE: OAuth CORS allows localhost origins for testing")
579 }
580 log.Printf("OAuth CORS allowed origins: %v", oauthAllowedOrigins)
581
582 // Register OAuth routes for authentication flow
583 routes.RegisterOAuthRoutes(r, oauthHandler, oauthAllowedOrigins)
584 log.Println("✅ OAuth endpoints registered")
585 log.Println(" - GET /oauth/client-metadata.json")
586 log.Println(" - GET /oauth/jwks.json")
587 log.Println(" - GET /oauth/login")
588 log.Println(" - GET /oauth/mobile/login")
589 log.Println(" - GET /oauth/callback")
590 log.Println(" - POST /oauth/logout")
591 log.Println(" - POST /oauth/refresh")
592
593 // Register well-known routes for mobile app deep linking
594 routes.RegisterWellKnownRoutes(r)
595 log.Println("✅ Well-known endpoints registered (mobile Universal Links & App Links)")
596 log.Println(" - GET /.well-known/apple-app-site-association (iOS Universal Links)")
597 log.Println(" - GET /.well-known/assetlinks.json (Android App Links)")
598
599 // Health check endpoints
600 healthHandler := func(w http.ResponseWriter, r *http.Request) {
601 w.WriteHeader(http.StatusOK)
602 if _, err := w.Write([]byte("OK")); err != nil {
603 log.Printf("Failed to write health check response: %v", err)
604 }
605 }
606 r.Get("/health", healthHandler)
607 r.Get("/xrpc/_health", healthHandler)
608
609 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy)
610 port := os.Getenv("PORT")
611 if port == "" {
612 port = os.Getenv("APPVIEW_PORT")
613 }
614 if port == "" {
615 port = "8080"
616 }
617
618 // Create HTTP server for graceful shutdown
619 server := &http.Server{
620 Addr: ":" + port,
621 Handler: r,
622 }
623
624 // Channel to listen for shutdown signals
625 stop := make(chan os.Signal, 1)
626 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
627
628 // Start server in goroutine
629 go func() {
630 fmt.Printf("Coves AppView starting on port %s\n", port)
631 fmt.Printf("Default PDS: %s\n", defaultPDS)
632 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
633 log.Fatalf("Server error: %v", err)
634 }
635 }()
636
637 // Wait for shutdown signal
638 <-stop
639 log.Println("Shutting down server...")
640
641 // Graceful shutdown with timeout
642 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
643 defer cancel()
644
645 // Stop OAuth cleanup background job
646 cleanupCancel()
647
648 if err := server.Shutdown(ctx); err != nil {
649 log.Fatalf("Server shutdown error: %v", err)
650 }
651 log.Println("Server stopped gracefully")
652}
653
// authenticateWithPDS creates a session on the PDS by POSTing the given
// credentials to com.atproto.server.createSession and returns the access
// token (accessJwt) from the response.
//
// pdsURL is the base URL of the PDS; trailing slashes are ignored. handle is
// sent as the session identifier together with password.
//
// An error is returned when the request cannot be built or sent (including
// when the PDS does not answer within the request timeout), when the PDS
// replies with a non-200 status (the error includes the status and a bounded
// prefix of the response body), when the response body is not valid JSON, or
// when the response carries no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	const (
		// Bounds the whole exchange so an unresponsive PDS cannot stall startup.
		requestTimeout = 30 * time.Second
		// Caps how much of an error response body is copied into the returned error.
		maxErrorBodyBytes = 4 << 10
	)

	type CreateSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type CreateSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(CreateSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// http.Post would use http.DefaultClient, which has no timeout.
	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Post(
		// Trim trailing slashes so "http://host/" does not yield "//xrpc/...".
		strings.TrimRight(pdsURL, "/")+"/xrpc/com.atproto.server.createSession",
		"application/json",
		bytes.NewReader(reqBody),
	)
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session CreateSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 response without a token would otherwise be reported as success
	// and hand the caller an unusable empty credential.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}