A community based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/identity"
7 "Coves/internal/atproto/jetstream"
8 "Coves/internal/atproto/oauth"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/blobs"
11 "Coves/internal/core/comments"
12 "Coves/internal/core/communities"
13 "Coves/internal/core/communityFeeds"
14 "Coves/internal/core/discover"
15 "Coves/internal/core/posts"
16 "Coves/internal/core/timeline"
17 "Coves/internal/core/unfurl"
18 "Coves/internal/core/users"
19 "Coves/internal/core/votes"
20 "bytes"
21 "context"
22 "crypto/rand"
23 "database/sql"
24 "encoding/base64"
25 "encoding/json"
26 "fmt"
27 "io"
28 "log"
29 "net/http"
30 "os"
31 "os/signal"
32 "strings"
33 "syscall"
34 "time"
35
36 "github.com/go-chi/chi/v5"
37 chiMiddleware "github.com/go-chi/chi/v5/middleware"
38 _ "github.com/lib/pq"
39 "github.com/pressly/goose/v3"
40
41 commentsAPI "Coves/internal/api/handlers/comments"
42
43 postgresRepo "Coves/internal/db/postgres"
44)
45
46func main() {
47 // Database configuration (AppView database)
48 dbURL := os.Getenv("DATABASE_URL")
49 if dbURL == "" {
50 // Use dev database from .env.dev
51 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
52 }
53
54 // Default PDS URL for this Coves instance (supports self-hosting)
55 defaultPDS := os.Getenv("PDS_URL")
56 if defaultPDS == "" {
57 defaultPDS = "http://localhost:3001" // Local dev PDS
58 }
59
60 // Cursor secret for HMAC signing (prevents cursor manipulation)
61 cursorSecret := os.Getenv("CURSOR_SECRET")
62 if cursorSecret == "" {
63 // Generate a random secret if not set (dev mode)
64 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
65 cursorSecret = "dev-cursor-secret-change-in-production"
66 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
67 }
68
69 db, err := sql.Open("postgres", dbURL)
70 if err != nil {
71 log.Fatal("Failed to connect to database:", err)
72 }
73 defer func() {
74 if closeErr := db.Close(); closeErr != nil {
75 log.Printf("Failed to close database connection: %v", closeErr)
76 }
77 }()
78
79 if err = db.Ping(); err != nil {
80 log.Fatal("Failed to ping database:", err)
81 }
82
83 log.Println("Connected to AppView database")
84
85 // Run migrations
86 if err = goose.SetDialect("postgres"); err != nil {
87 log.Fatal("Failed to set goose dialect:", err)
88 }
89
90 if err = goose.Up(db, "internal/db/migrations"); err != nil {
91 log.Fatal("Failed to run migrations:", err)
92 }
93
94 log.Println("Migrations completed successfully")
95
96 r := chi.NewRouter()
97
98 r.Use(chiMiddleware.Logger)
99 r.Use(chiMiddleware.Recoverer)
100 r.Use(chiMiddleware.RequestID)
101
102 // Rate limiting: 100 requests per minute per IP
103 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
104 r.Use(rateLimiter.Middleware)
105
106 // Initialize identity resolver
107 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
108 // directory as DID registration to ensure E2E tests work without hitting
109 // the production plc.directory
110 identityConfig := identity.DefaultConfig()
111
112 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
113 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
114 if plcDirectoryURL == "" {
115 plcDirectoryURL = "https://plc.directory" // Default to production PLC
116 }
117
118 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
119 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
120 if isDevEnv {
121 identityConfig.PLCURL = plcDirectoryURL
122 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
123 } else {
124 // Production: Allow separate IDENTITY_PLC_URL for read operations
125 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
126 identityConfig.PLCURL = identityPLCURL
127 } else {
128 identityConfig.PLCURL = plcDirectoryURL
129 }
130 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
131 }
132
133 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
134 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
135 identityConfig.CacheTTL = duration
136 }
137 }
138
139 identityResolver := identity.NewResolver(db, identityConfig)
140
141 // Get PLC URL for OAuth and other services
142 plcURL := os.Getenv("PLC_DIRECTORY_URL")
143 if plcURL == "" {
144 plcURL = "https://plc.directory"
145 }
146
147 // Initialize OAuth client for sealed session tokens
148 // Mobile apps authenticate via OAuth flow and receive sealed session tokens
149 // These tokens are encrypted references to OAuth sessions stored in the database
150 oauthSealSecret := os.Getenv("OAUTH_SEAL_SECRET")
151 if oauthSealSecret == "" {
152 if os.Getenv("IS_DEV_ENV") != "true" {
153 log.Fatal("OAUTH_SEAL_SECRET is required in production mode")
154 }
155 // Generate RANDOM secret for dev mode
156 randomBytes := make([]byte, 32)
157 if _, err := rand.Read(randomBytes); err != nil {
158 log.Fatal("Failed to generate random seal secret: ", err)
159 }
160 oauthSealSecret = base64.StdEncoding.EncodeToString(randomBytes)
161 log.Println("⚠️ DEV MODE: Generated random OAuth seal secret (won't persist across restarts)")
162 }
163
164 isDevMode := os.Getenv("IS_DEV_ENV") == "true"
165 oauthConfig := &oauth.OAuthConfig{
166 PublicURL: os.Getenv("APPVIEW_PUBLIC_URL"),
167 SealSecret: oauthSealSecret,
168 Scopes: []string{"atproto", "transition:generic"},
169 DevMode: isDevMode,
170 AllowPrivateIPs: isDevMode, // Allow private IPs only in dev mode
171 PLCURL: plcURL,
172 // SessionTTL and SealedTokenTTL will use defaults if not set (7 days and 14 days)
173 }
174
175 // Create PostgreSQL-backed OAuth session store (using default 7-day TTL)
176 baseOAuthStore := oauth.NewPostgresOAuthStore(db, 0)
177 // Wrap with MobileAwareStoreWrapper to capture OAuth state for mobile CSRF validation.
178 // This intercepts SaveAuthRequestInfo to save mobile CSRF data when present in context.
179 oauthStore := oauth.NewMobileAwareStoreWrapper(baseOAuthStore)
180
181 if oauthConfig.PublicURL == "" {
182 oauthConfig.PublicURL = "http://localhost:8080"
183 oauthConfig.DevMode = true // Force dev mode for localhost
184 }
185
186 oauthClient, err := oauth.NewOAuthClient(oauthConfig, oauthStore)
187 if err != nil {
188 log.Fatalf("Failed to initialize OAuth client: %v", err)
189 }
190
191 // Create OAuth handler for HTTP endpoints
192 oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore)
193
194 // Create OAuth auth middleware
195 // Validates sealed session tokens and loads OAuth sessions from database
196 authMiddleware := middleware.NewOAuthAuthMiddleware(oauthClient, oauthStore)
197 log.Println("✅ OAuth auth middleware initialized (sealed session tokens)")
198
199 // Initialize repositories and services
200 userRepo := postgresRepo.NewUserRepository(db)
201 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
202
203 communityRepo := postgresRepo.NewCommunityRepository(db)
204
205 // V2.0: PDS-managed DID generation
206 // Community DIDs and keys are generated entirely by the PDS
207 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
208
209 instanceDID := os.Getenv("INSTANCE_DID")
210 if instanceDID == "" {
211 instanceDID = "did:web:coves.social" // Default for development
212 }
213
214 // V2: Extract instance domain for community handles
215 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
216 // We cannot allow arbitrary domains to prevent impersonation attacks
217 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
218 //
219 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
220 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
221 // Communities with mismatched hostedBy domains are rejected during indexing
222 var instanceDomain string
223 if strings.HasPrefix(instanceDID, "did:web:") {
224 // Extract domain from did:web (this is the authoritative source)
225 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
226 } else {
227 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
228 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
229 if instanceDomain == "" {
230 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
231 }
232 }
233
234 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
235
236 // Community creation restriction - if set, only these DIDs can create communities
237 var allowedCommunityCreators []string
238 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" {
239 for _, did := range strings.Split(communityCreators, ",") {
240 did = strings.TrimSpace(did)
241 if did != "" {
242 allowedCommunityCreators = append(allowedCommunityCreators, did)
243 }
244 }
245 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators))
246 } else {
247 log.Println("Community creation open to all authenticated users")
248 }
249
250 // V2.0: Initialize PDS account provisioner for communities (simplified)
251 // PDS handles all DID and key generation - no Coves-side cryptography needed
252 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
253 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
254 log.Printf(" - Communities will be created at: %s", defaultPDS)
255 log.Printf(" - PDS will generate and manage all DIDs and keys")
256
257 // Initialize community service (no longer needs didGenerator directly)
258 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
259
260 // Authenticate Coves instance with PDS to enable community record writes
261 // The instance needs a PDS account to write community records it owns
262 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
263 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
264 if pdsHandle != "" && pdsPassword != "" {
265 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
266 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
267 if authErr != nil {
268 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
269 log.Println("Community creation will fail until PDS authentication is configured")
270 } else {
271 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
272 svc.SetPDSAccessToken(accessToken)
273 log.Println("✓ Coves instance authenticated with PDS")
274 }
275 }
276 } else {
277 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
278 log.Println("Community creation via write-forward is disabled")
279 }
280
281 // Start Jetstream consumer for read-forward user indexing
282 jetstreamURL := os.Getenv("JETSTREAM_URL")
283 if jetstreamURL == "" {
284 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
285 }
286
287 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
288
289 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
290 ctx := context.Background()
291 go func() {
292 if startErr := userConsumer.Start(ctx); startErr != nil {
293 log.Printf("Jetstream consumer stopped: %v", startErr)
294 }
295 }()
296
297 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
298
299 // Start Jetstream consumer for community events (profiles and subscriptions)
300 // This consumer indexes:
301 // 1. Community profiles (social.coves.community.profile) - in community's own repo
302 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
303 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
304 if communityJetstreamURL == "" {
305 // Local Jetstream for communities - filter to our instance's collections
306 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
307 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
308 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
309 }
310
311 // Initialize community event consumer with did:web verification
312 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
313 if skipDIDWebVerification {
314 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
315 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
316 }
317
318 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
319 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
320 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
321
322 go func() {
323 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
324 log.Printf("Community Jetstream consumer stopped: %v", startErr)
325 }
326 }()
327
328 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
329 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
330 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
331
332 // Start OAuth session cleanup background job with cancellable context
333 cleanupCtx, cleanupCancel := context.WithCancel(context.Background())
334 go func() {
335 ticker := time.NewTicker(1 * time.Hour)
336 defer ticker.Stop()
337 for {
338 select {
339 case <-cleanupCtx.Done():
340 log.Println("OAuth cleanup job stopped")
341 return
342 case <-ticker.C:
343 // Check if store implements cleanup methods
344 // Use UnwrapPostgresStore to get the underlying store from the wrapper
345 if cleanupStore := oauthStore.UnwrapPostgresStore(); cleanupStore != nil {
346 sessions, sessErr := cleanupStore.CleanupExpiredSessions(cleanupCtx)
347 if sessErr != nil {
348 log.Printf("Error cleaning up expired OAuth sessions: %v", sessErr)
349 }
350 requests, reqErr := cleanupStore.CleanupExpiredAuthRequests(cleanupCtx)
351 if reqErr != nil {
352 log.Printf("Error cleaning up expired OAuth auth requests: %v", reqErr)
353 }
354 if sessions > 0 || requests > 0 {
355 log.Printf("OAuth cleanup: removed %d expired sessions, %d expired auth requests", sessions, requests)
356 }
357 }
358 }
359 }
360 }()
361
362 log.Println("Started OAuth session cleanup background job (runs hourly)")
363
364 // Initialize aggregator service
365 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
366 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
367 log.Println("✅ Aggregator service initialized")
368
369 // Initialize unfurl cache repository
370 unfurlRepo := unfurl.NewRepository(db)
371
372 // Initialize blob upload service
373 blobService := blobs.NewBlobService(defaultPDS)
374
375 // Initialize unfurl service with configuration
376 unfurlService := unfurl.NewService(
377 unfurlRepo,
378 unfurl.WithTimeout(10*time.Second),
379 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"),
380 unfurl.WithCacheTTL(24*time.Hour),
381 )
382 log.Println("✅ Unfurl and blob services initialized")
383
384 // Initialize post service (with aggregator support)
385 postRepo := postgresRepo.NewPostRepository(db)
386 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS)
387
388 // Initialize vote repository (used by Jetstream consumer for indexing)
389 voteRepo := postgresRepo.NewVoteRepository(db)
390 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
391
392 // Initialize comment repository (used by Jetstream consumer for indexing)
393 commentRepo := postgresRepo.NewCommentRepository(db)
394 log.Println("✅ Comment repository initialized (Jetstream indexing only)")
395
396 // Initialize vote service (for XRPC API endpoints)
397 // Note: We don't validate subject existence - the vote goes to the user's PDS regardless.
398 // The Jetstream consumer handles orphaned votes correctly by only updating counts for
399 // non-deleted subjects. This avoids race conditions and eventual consistency issues.
400 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
401 log.Println("✅ Vote service initialized (with OAuth authentication)")
402
403 // Initialize comment service (for query API)
404 // Requires user and community repos for proper author/community hydration per lexicon
405 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
406 log.Println("✅ Comment service initialized (with author/community hydration)")
407
408 // Initialize feed service
409 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret)
410 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
411 log.Println("✅ Feed service initialized")
412
413 // Initialize timeline service (home feed from subscribed communities)
414 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
415 timelineService := timeline.NewTimelineService(timelineRepo)
416 log.Println("✅ Timeline service initialized")
417
418 // Initialize discover service (public feed from all communities)
419 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
420 discoverService := discover.NewDiscoverService(discoverRepo)
421 log.Println("✅ Discover service initialized")
422
423 // Start Jetstream consumer for posts
424 // This consumer indexes posts created in community repositories via the firehose
425 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
426 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
427 if postJetstreamURL == "" {
428 // Listen to post record creation events
429 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
430 }
431
432 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
433 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
434
435 go func() {
436 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
437 log.Printf("Post Jetstream consumer stopped: %v", startErr)
438 }
439 }()
440
441 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
442 log.Println(" - Indexing: social.coves.community.post CREATE operations")
443 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
444
445 // Start Jetstream consumer for aggregators
446 // This consumer indexes aggregator service declarations and authorization records
447 // Following Bluesky's pattern for feed generators and labelers
448 // NOTE: Uses the same Jetstream as communities, just filtering different collections
449 aggregatorJetstreamURL := communityJetstreamURL
450 // Override if specific URL needed for testing
451 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
452 aggregatorJetstreamURL = envURL
453 } else if aggregatorJetstreamURL == "" {
454 // Fallback if community URL also not set
455 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
456 }
457
458 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
459 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
460
461 go func() {
462 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
463 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
464 }
465 }()
466
467 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
468 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
469 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
470
471 // Start Jetstream consumer for votes
472 // This consumer indexes votes from user repositories and updates post vote counts
473 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
474 if voteJetstreamURL == "" {
475 // Listen to vote record CREATE/DELETE events from user repositories
476 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
477 }
478
479 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
480 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
481
482 go func() {
483 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
484 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
485 }
486 }()
487
488 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
489 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
490 log.Println(" - Updating: Post vote counts atomically")
491
492 // Start Jetstream consumer for comments
493 // This consumer indexes comments from user repositories and updates parent counts
494 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
495 if commentJetstreamURL == "" {
496 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories
497 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment"
498 }
499
500 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
501 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
502
503 go func() {
504 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
505 log.Printf("Comment Jetstream consumer stopped: %v", startErr)
506 }
507 }()
508
509 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
510 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations")
511 log.Println(" - Updating: Post comment counts and comment reply counts atomically")
512
513 // Register XRPC routes
514 routes.RegisterUserRoutes(r, userService)
515 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators)
516 log.Println("Community XRPC endpoints registered with OAuth authentication")
517
518 routes.RegisterPostRoutes(r, postService, authMiddleware)
519 log.Println("Post XRPC endpoints registered with OAuth authentication")
520
521 routes.RegisterVoteRoutes(r, voteService, authMiddleware)
522 log.Println("Vote XRPC endpoints registered with OAuth authentication")
523
524 routes.RegisterCommunityFeedRoutes(r, feedService)
525 log.Println("Feed XRPC endpoints registered (public, no auth required)")
526
527 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
528 log.Println("Timeline XRPC endpoints registered (requires authentication)")
529
530 routes.RegisterDiscoverRoutes(r, discoverService)
531 log.Println("Discover XRPC endpoints registered (public, no auth required)")
532
533 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver)
534 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)")
535
536 // Comment query API - supports optional authentication for viewer state
537 // Stricter rate limiting for expensive nested comment queries
538 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute)
539 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService)
540 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter)
541 r.Handle(
542 "/xrpc/social.coves.community.comment.getComments",
543 commentRateLimiter.Middleware(
544 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments),
545 ),
546 )
547 log.Println("✅ Comment query API registered (20 req/min rate limit)")
548 log.Println(" - GET /xrpc/social.coves.community.comment.getComments")
549
550 // Configure allowed CORS origins for OAuth callback
551 // SECURITY: Never use wildcard "*" with credentials - only allow specific origins
552 var oauthAllowedOrigins []string
553 appviewPublicURL := os.Getenv("APPVIEW_PUBLIC_URL")
554 if appviewPublicURL == "" {
555 appviewPublicURL = "http://localhost:8080"
556 }
557 oauthAllowedOrigins = append(oauthAllowedOrigins, appviewPublicURL)
558
559 // In dev mode, also allow common localhost origins for testing
560 if oauthConfig.DevMode {
561 oauthAllowedOrigins = append(oauthAllowedOrigins,
562 "http://localhost:3000",
563 "http://localhost:3001",
564 "http://localhost:5173",
565 "http://127.0.0.1:8080",
566 "http://127.0.0.1:3000",
567 "http://127.0.0.1:3001",
568 "http://127.0.0.1:5173",
569 )
570 log.Printf("🧪 DEV MODE: OAuth CORS allows localhost origins for testing")
571 }
572 log.Printf("OAuth CORS allowed origins: %v", oauthAllowedOrigins)
573
574 // Register OAuth routes for authentication flow
575 routes.RegisterOAuthRoutes(r, oauthHandler, oauthAllowedOrigins)
576 log.Println("✅ OAuth endpoints registered")
577 log.Println(" - GET /oauth/client-metadata.json")
578 log.Println(" - GET /oauth/jwks.json")
579 log.Println(" - GET /oauth/login")
580 log.Println(" - GET /oauth/mobile/login")
581 log.Println(" - GET /oauth/callback")
582 log.Println(" - POST /oauth/logout")
583 log.Println(" - POST /oauth/refresh")
584
585 // Register well-known routes for mobile app deep linking
586 routes.RegisterWellKnownRoutes(r)
587 log.Println("✅ Well-known endpoints registered (mobile Universal Links & App Links)")
588 log.Println(" - GET /.well-known/apple-app-site-association (iOS Universal Links)")
589 log.Println(" - GET /.well-known/assetlinks.json (Android App Links)")
590
591 // Health check endpoints
592 healthHandler := func(w http.ResponseWriter, r *http.Request) {
593 w.WriteHeader(http.StatusOK)
594 if _, err := w.Write([]byte("OK")); err != nil {
595 log.Printf("Failed to write health check response: %v", err)
596 }
597 }
598 r.Get("/health", healthHandler)
599 r.Get("/xrpc/_health", healthHandler)
600
601 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy)
602 port := os.Getenv("PORT")
603 if port == "" {
604 port = os.Getenv("APPVIEW_PORT")
605 }
606 if port == "" {
607 port = "8080"
608 }
609
610 // Create HTTP server for graceful shutdown
611 server := &http.Server{
612 Addr: ":" + port,
613 Handler: r,
614 }
615
616 // Channel to listen for shutdown signals
617 stop := make(chan os.Signal, 1)
618 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
619
620 // Start server in goroutine
621 go func() {
622 fmt.Printf("Coves AppView starting on port %s\n", port)
623 fmt.Printf("Default PDS: %s\n", defaultPDS)
624 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
625 log.Fatalf("Server error: %v", err)
626 }
627 }()
628
629 // Wait for shutdown signal
630 <-stop
631 log.Println("Shutting down server...")
632
633 // Graceful shutdown with timeout
634 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
635 defer cancel()
636
637 // Stop OAuth cleanup background job
638 cleanupCancel()
639
640 if err := server.Shutdown(ctx); err != nil {
641 log.Fatalf("Server shutdown error: %v", err)
642 }
643 log.Println("Server stopped gracefully")
644}
645
646// authenticateWithPDS creates a session on the PDS and returns an access token
647func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
648 type CreateSessionRequest struct {
649 Identifier string `json:"identifier"`
650 Password string `json:"password"`
651 }
652
653 type CreateSessionResponse struct {
654 DID string `json:"did"`
655 Handle string `json:"handle"`
656 AccessJwt string `json:"accessJwt"`
657 }
658
659 reqBody, err := json.Marshal(CreateSessionRequest{
660 Identifier: handle,
661 Password: password,
662 })
663 if err != nil {
664 return "", fmt.Errorf("failed to marshal request: %w", err)
665 }
666
667 resp, err := http.Post(
668 pdsURL+"/xrpc/com.atproto.server.createSession",
669 "application/json",
670 bytes.NewReader(reqBody),
671 )
672 if err != nil {
673 return "", fmt.Errorf("failed to call PDS: %w", err)
674 }
675 defer func() {
676 if closeErr := resp.Body.Close(); closeErr != nil {
677 log.Printf("Failed to close response body: %v", closeErr)
678 }
679 }()
680
681 if resp.StatusCode != http.StatusOK {
682 body, readErr := io.ReadAll(resp.Body)
683 if readErr != nil {
684 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
685 }
686 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
687 }
688
689 var session CreateSessionResponse
690 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
691 return "", fmt.Errorf("failed to decode response: %w", err)
692 }
693
694 return session.AccessJwt, nil
695}