A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/identity"
7 "Coves/internal/atproto/jetstream"
8 "Coves/internal/atproto/oauth"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/blobs"
11 "Coves/internal/core/comments"
12 "Coves/internal/core/communities"
13 "Coves/internal/core/communityFeeds"
14 "Coves/internal/core/discover"
15 "Coves/internal/core/posts"
16 "Coves/internal/core/timeline"
17 "Coves/internal/core/unfurl"
18 "Coves/internal/core/users"
19 "bytes"
20 "context"
21 "crypto/rand"
22 "database/sql"
23 "encoding/base64"
24 "encoding/json"
25 "fmt"
26 "io"
27 "log"
28 "net/http"
29 "os"
30 "os/signal"
31 "strings"
32 "syscall"
33 "time"
34
35 "github.com/go-chi/chi/v5"
36 chiMiddleware "github.com/go-chi/chi/v5/middleware"
37 _ "github.com/lib/pq"
38 "github.com/pressly/goose/v3"
39
40 commentsAPI "Coves/internal/api/handlers/comments"
41
42 postgresRepo "Coves/internal/db/postgres"
43)
44
45func main() {
46 // Database configuration (AppView database)
47 dbURL := os.Getenv("DATABASE_URL")
48 if dbURL == "" {
49 // Use dev database from .env.dev
50 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
51 }
52
53 // Default PDS URL for this Coves instance (supports self-hosting)
54 defaultPDS := os.Getenv("PDS_URL")
55 if defaultPDS == "" {
56 defaultPDS = "http://localhost:3001" // Local dev PDS
57 }
58
59 // Cursor secret for HMAC signing (prevents cursor manipulation)
60 cursorSecret := os.Getenv("CURSOR_SECRET")
61 if cursorSecret == "" {
62 // Generate a random secret if not set (dev mode)
63 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
64 cursorSecret = "dev-cursor-secret-change-in-production"
65 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
66 }
67
68 db, err := sql.Open("postgres", dbURL)
69 if err != nil {
70 log.Fatal("Failed to connect to database:", err)
71 }
72 defer func() {
73 if closeErr := db.Close(); closeErr != nil {
74 log.Printf("Failed to close database connection: %v", closeErr)
75 }
76 }()
77
78 if err = db.Ping(); err != nil {
79 log.Fatal("Failed to ping database:", err)
80 }
81
82 log.Println("Connected to AppView database")
83
84 // Run migrations
85 if err = goose.SetDialect("postgres"); err != nil {
86 log.Fatal("Failed to set goose dialect:", err)
87 }
88
89 if err = goose.Up(db, "internal/db/migrations"); err != nil {
90 log.Fatal("Failed to run migrations:", err)
91 }
92
93 log.Println("Migrations completed successfully")
94
95 r := chi.NewRouter()
96
97 r.Use(chiMiddleware.Logger)
98 r.Use(chiMiddleware.Recoverer)
99 r.Use(chiMiddleware.RequestID)
100
101 // Rate limiting: 100 requests per minute per IP
102 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
103 r.Use(rateLimiter.Middleware)
104
105 // Initialize identity resolver
106 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
107 // directory as DID registration to ensure E2E tests work without hitting
108 // the production plc.directory
109 identityConfig := identity.DefaultConfig()
110
111 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
112 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
113 if plcDirectoryURL == "" {
114 plcDirectoryURL = "https://plc.directory" // Default to production PLC
115 }
116
117 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
118 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
119 if isDevEnv {
120 identityConfig.PLCURL = plcDirectoryURL
121 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
122 } else {
123 // Production: Allow separate IDENTITY_PLC_URL for read operations
124 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
125 identityConfig.PLCURL = identityPLCURL
126 } else {
127 identityConfig.PLCURL = plcDirectoryURL
128 }
129 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
130 }
131
132 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
133 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
134 identityConfig.CacheTTL = duration
135 }
136 }
137
138 identityResolver := identity.NewResolver(db, identityConfig)
139
140 // Get PLC URL for OAuth and other services
141 plcURL := os.Getenv("PLC_DIRECTORY_URL")
142 if plcURL == "" {
143 plcURL = "https://plc.directory"
144 }
145
146 // Initialize OAuth client for sealed session tokens
147 // Mobile apps authenticate via OAuth flow and receive sealed session tokens
148 // These tokens are encrypted references to OAuth sessions stored in the database
149 oauthSealSecret := os.Getenv("OAUTH_SEAL_SECRET")
150 if oauthSealSecret == "" {
151 if os.Getenv("IS_DEV_ENV") != "true" {
152 log.Fatal("OAUTH_SEAL_SECRET is required in production mode")
153 }
154 // Generate RANDOM secret for dev mode
155 randomBytes := make([]byte, 32)
156 if _, err := rand.Read(randomBytes); err != nil {
157 log.Fatal("Failed to generate random seal secret: ", err)
158 }
159 oauthSealSecret = base64.StdEncoding.EncodeToString(randomBytes)
160 log.Println("⚠️ DEV MODE: Generated random OAuth seal secret (won't persist across restarts)")
161 }
162
163 isDevMode := os.Getenv("IS_DEV_ENV") == "true"
164 oauthConfig := &oauth.OAuthConfig{
165 PublicURL: os.Getenv("APPVIEW_PUBLIC_URL"),
166 SealSecret: oauthSealSecret,
167 Scopes: []string{"atproto", "transition:generic"},
168 DevMode: isDevMode,
169 AllowPrivateIPs: isDevMode, // Allow private IPs only in dev mode
170 PLCURL: plcURL,
171 // SessionTTL and SealedTokenTTL will use defaults if not set (7 days and 14 days)
172 }
173
174 // Create PostgreSQL-backed OAuth session store (using default 7-day TTL)
175 baseOAuthStore := oauth.NewPostgresOAuthStore(db, 0)
176 // Wrap with MobileAwareStoreWrapper to capture OAuth state for mobile CSRF validation.
177 // This intercepts SaveAuthRequestInfo to save mobile CSRF data when present in context.
178 oauthStore := oauth.NewMobileAwareStoreWrapper(baseOAuthStore)
179
180 if oauthConfig.PublicURL == "" {
181 oauthConfig.PublicURL = "http://localhost:8080"
182 oauthConfig.DevMode = true // Force dev mode for localhost
183 }
184
185 oauthClient, err := oauth.NewOAuthClient(oauthConfig, oauthStore)
186 if err != nil {
187 log.Fatalf("Failed to initialize OAuth client: %v", err)
188 }
189
190 // Create OAuth handler for HTTP endpoints
191 oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore)
192
193 // Create OAuth auth middleware
194 // Validates sealed session tokens and loads OAuth sessions from database
195 authMiddleware := middleware.NewOAuthAuthMiddleware(oauthClient, oauthStore)
196 log.Println("✅ OAuth auth middleware initialized (sealed session tokens)")
197
198 // Initialize repositories and services
199 userRepo := postgresRepo.NewUserRepository(db)
200 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
201
202 communityRepo := postgresRepo.NewCommunityRepository(db)
203
204 // V2.0: PDS-managed DID generation
205 // Community DIDs and keys are generated entirely by the PDS
206 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
207
208 instanceDID := os.Getenv("INSTANCE_DID")
209 if instanceDID == "" {
210 instanceDID = "did:web:coves.social" // Default for development
211 }
212
213 // V2: Extract instance domain for community handles
214 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
215 // We cannot allow arbitrary domains to prevent impersonation attacks
216 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
217 //
218 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
219 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
220 // Communities with mismatched hostedBy domains are rejected during indexing
221 var instanceDomain string
222 if strings.HasPrefix(instanceDID, "did:web:") {
223 // Extract domain from did:web (this is the authoritative source)
224 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
225 } else {
226 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
227 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
228 if instanceDomain == "" {
229 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
230 }
231 }
232
233 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
234
235 // Community creation restriction - if set, only these DIDs can create communities
236 var allowedCommunityCreators []string
237 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" {
238 for _, did := range strings.Split(communityCreators, ",") {
239 did = strings.TrimSpace(did)
240 if did != "" {
241 allowedCommunityCreators = append(allowedCommunityCreators, did)
242 }
243 }
244 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators))
245 } else {
246 log.Println("Community creation open to all authenticated users")
247 }
248
249 // V2.0: Initialize PDS account provisioner for communities (simplified)
250 // PDS handles all DID and key generation - no Coves-side cryptography needed
251 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
252 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
253 log.Printf(" - Communities will be created at: %s", defaultPDS)
254 log.Printf(" - PDS will generate and manage all DIDs and keys")
255
256 // Initialize community service (no longer needs didGenerator directly)
257 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
258
259 // Authenticate Coves instance with PDS to enable community record writes
260 // The instance needs a PDS account to write community records it owns
261 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
262 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
263 if pdsHandle != "" && pdsPassword != "" {
264 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
265 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
266 if authErr != nil {
267 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
268 log.Println("Community creation will fail until PDS authentication is configured")
269 } else {
270 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
271 svc.SetPDSAccessToken(accessToken)
272 log.Println("✓ Coves instance authenticated with PDS")
273 }
274 }
275 } else {
276 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
277 log.Println("Community creation via write-forward is disabled")
278 }
279
280 // Start Jetstream consumer for read-forward user indexing
281 jetstreamURL := os.Getenv("JETSTREAM_URL")
282 if jetstreamURL == "" {
283 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
284 }
285
286 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
287
288 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
289 ctx := context.Background()
290 go func() {
291 if startErr := userConsumer.Start(ctx); startErr != nil {
292 log.Printf("Jetstream consumer stopped: %v", startErr)
293 }
294 }()
295
296 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
297
298 // Start Jetstream consumer for community events (profiles and subscriptions)
299 // This consumer indexes:
300 // 1. Community profiles (social.coves.community.profile) - in community's own repo
301 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
302 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
303 if communityJetstreamURL == "" {
304 // Local Jetstream for communities - filter to our instance's collections
305 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
306 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
307 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
308 }
309
310 // Initialize community event consumer with did:web verification
311 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
312 if skipDIDWebVerification {
313 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
314 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
315 }
316
317 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
318 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
319 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
320
321 go func() {
322 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
323 log.Printf("Community Jetstream consumer stopped: %v", startErr)
324 }
325 }()
326
327 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
328 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
329 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
330
331 // Start OAuth session cleanup background job with cancellable context
332 cleanupCtx, cleanupCancel := context.WithCancel(context.Background())
333 go func() {
334 ticker := time.NewTicker(1 * time.Hour)
335 defer ticker.Stop()
336 for {
337 select {
338 case <-cleanupCtx.Done():
339 log.Println("OAuth cleanup job stopped")
340 return
341 case <-ticker.C:
342 // Check if store implements cleanup methods
343 // Use UnwrapPostgresStore to get the underlying store from the wrapper
344 if cleanupStore := oauthStore.UnwrapPostgresStore(); cleanupStore != nil {
345 sessions, sessErr := cleanupStore.CleanupExpiredSessions(cleanupCtx)
346 if sessErr != nil {
347 log.Printf("Error cleaning up expired OAuth sessions: %v", sessErr)
348 }
349 requests, reqErr := cleanupStore.CleanupExpiredAuthRequests(cleanupCtx)
350 if reqErr != nil {
351 log.Printf("Error cleaning up expired OAuth auth requests: %v", reqErr)
352 }
353 if sessions > 0 || requests > 0 {
354 log.Printf("OAuth cleanup: removed %d expired sessions, %d expired auth requests", sessions, requests)
355 }
356 }
357 }
358 }
359 }()
360
361 log.Println("Started OAuth session cleanup background job (runs hourly)")
362
363 // Initialize aggregator service
364 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
365 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
366 log.Println("✅ Aggregator service initialized")
367
368 // Initialize unfurl cache repository
369 unfurlRepo := unfurl.NewRepository(db)
370
371 // Initialize blob upload service
372 blobService := blobs.NewBlobService(defaultPDS)
373
374 // Initialize unfurl service with configuration
375 unfurlService := unfurl.NewService(
376 unfurlRepo,
377 unfurl.WithTimeout(10*time.Second),
378 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"),
379 unfurl.WithCacheTTL(24*time.Hour),
380 )
381 log.Println("✅ Unfurl and blob services initialized")
382
383 // Initialize post service (with aggregator support)
384 postRepo := postgresRepo.NewPostRepository(db)
385 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS)
386
387 // Initialize vote repository (used by Jetstream consumer for indexing)
388 voteRepo := postgresRepo.NewVoteRepository(db)
389 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
390
391 // Initialize comment repository (used by Jetstream consumer for indexing)
392 commentRepo := postgresRepo.NewCommentRepository(db)
393 log.Println("✅ Comment repository initialized (Jetstream indexing only)")
394
395 // Initialize comment service (for query API)
396 // Requires user and community repos for proper author/community hydration per lexicon
397 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
398 log.Println("✅ Comment service initialized (with author/community hydration)")
399
400 // Initialize feed service
401 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret)
402 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
403 log.Println("✅ Feed service initialized")
404
405 // Initialize timeline service (home feed from subscribed communities)
406 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
407 timelineService := timeline.NewTimelineService(timelineRepo)
408 log.Println("✅ Timeline service initialized")
409
410 // Initialize discover service (public feed from all communities)
411 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
412 discoverService := discover.NewDiscoverService(discoverRepo)
413 log.Println("✅ Discover service initialized")
414
415 // Start Jetstream consumer for posts
416 // This consumer indexes posts created in community repositories via the firehose
417 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
418 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
419 if postJetstreamURL == "" {
420 // Listen to post record creation events
421 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
422 }
423
424 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
425 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
426
427 go func() {
428 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
429 log.Printf("Post Jetstream consumer stopped: %v", startErr)
430 }
431 }()
432
433 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
434 log.Println(" - Indexing: social.coves.community.post CREATE operations")
435 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
436
437 // Start Jetstream consumer for aggregators
438 // This consumer indexes aggregator service declarations and authorization records
439 // Following Bluesky's pattern for feed generators and labelers
440 // NOTE: Uses the same Jetstream as communities, just filtering different collections
441 aggregatorJetstreamURL := communityJetstreamURL
442 // Override if specific URL needed for testing
443 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
444 aggregatorJetstreamURL = envURL
445 } else if aggregatorJetstreamURL == "" {
446 // Fallback if community URL also not set
447 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
448 }
449
450 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
451 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
452
453 go func() {
454 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
455 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
456 }
457 }()
458
459 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
460 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
461 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
462
463 // Start Jetstream consumer for votes
464 // This consumer indexes votes from user repositories and updates post vote counts
465 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
466 if voteJetstreamURL == "" {
467 // Listen to vote record CREATE/DELETE events from user repositories
468 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
469 }
470
471 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
472 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
473
474 go func() {
475 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
476 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
477 }
478 }()
479
480 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
481 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
482 log.Println(" - Updating: Post vote counts atomically")
483
484 // Start Jetstream consumer for comments
485 // This consumer indexes comments from user repositories and updates parent counts
486 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
487 if commentJetstreamURL == "" {
488 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories
489 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment"
490 }
491
492 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
493 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
494
495 go func() {
496 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
497 log.Printf("Comment Jetstream consumer stopped: %v", startErr)
498 }
499 }()
500
501 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
502 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations")
503 log.Println(" - Updating: Post comment counts and comment reply counts atomically")
504
505 // Register XRPC routes
506 routes.RegisterUserRoutes(r, userService)
507 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators)
508 log.Println("Community XRPC endpoints registered with OAuth authentication")
509
510 routes.RegisterPostRoutes(r, postService, authMiddleware)
511 log.Println("Post XRPC endpoints registered with OAuth authentication")
512
513 // Vote write endpoints removed - clients write directly to their PDS
514 // The AppView indexes votes from Jetstream (see vote consumer above)
515
516 routes.RegisterCommunityFeedRoutes(r, feedService)
517 log.Println("Feed XRPC endpoints registered (public, no auth required)")
518
519 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
520 log.Println("Timeline XRPC endpoints registered (requires authentication)")
521
522 routes.RegisterDiscoverRoutes(r, discoverService)
523 log.Println("Discover XRPC endpoints registered (public, no auth required)")
524
525 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver)
526 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)")
527
528 // Comment query API - supports optional authentication for viewer state
529 // Stricter rate limiting for expensive nested comment queries
530 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute)
531 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService)
532 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter)
533 r.Handle(
534 "/xrpc/social.coves.community.comment.getComments",
535 commentRateLimiter.Middleware(
536 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments),
537 ),
538 )
539 log.Println("✅ Comment query API registered (20 req/min rate limit)")
540 log.Println(" - GET /xrpc/social.coves.community.comment.getComments")
541
542 // Configure allowed CORS origins for OAuth callback
543 // SECURITY: Never use wildcard "*" with credentials - only allow specific origins
544 var oauthAllowedOrigins []string
545 appviewPublicURL := os.Getenv("APPVIEW_PUBLIC_URL")
546 if appviewPublicURL == "" {
547 appviewPublicURL = "http://localhost:8080"
548 }
549 oauthAllowedOrigins = append(oauthAllowedOrigins, appviewPublicURL)
550
551 // In dev mode, also allow common localhost origins for testing
552 if oauthConfig.DevMode {
553 oauthAllowedOrigins = append(oauthAllowedOrigins,
554 "http://localhost:3000",
555 "http://localhost:3001",
556 "http://localhost:5173",
557 "http://127.0.0.1:8080",
558 "http://127.0.0.1:3000",
559 "http://127.0.0.1:3001",
560 "http://127.0.0.1:5173",
561 )
562 log.Printf("🧪 DEV MODE: OAuth CORS allows localhost origins for testing")
563 }
564 log.Printf("OAuth CORS allowed origins: %v", oauthAllowedOrigins)
565
566 // Register OAuth routes for authentication flow
567 routes.RegisterOAuthRoutes(r, oauthHandler, oauthAllowedOrigins)
568 log.Println("✅ OAuth endpoints registered")
569 log.Println(" - GET /oauth/client-metadata.json")
570 log.Println(" - GET /oauth/jwks.json")
571 log.Println(" - GET /oauth/login")
572 log.Println(" - GET /oauth/mobile/login")
573 log.Println(" - GET /oauth/callback")
574 log.Println(" - POST /oauth/logout")
575 log.Println(" - POST /oauth/refresh")
576
577 // Register well-known routes for mobile app deep linking
578 routes.RegisterWellKnownRoutes(r)
579 log.Println("✅ Well-known endpoints registered (mobile Universal Links & App Links)")
580 log.Println(" - GET /.well-known/apple-app-site-association (iOS Universal Links)")
581 log.Println(" - GET /.well-known/assetlinks.json (Android App Links)")
582
583 // Health check endpoints
584 healthHandler := func(w http.ResponseWriter, r *http.Request) {
585 w.WriteHeader(http.StatusOK)
586 if _, err := w.Write([]byte("OK")); err != nil {
587 log.Printf("Failed to write health check response: %v", err)
588 }
589 }
590 r.Get("/health", healthHandler)
591 r.Get("/xrpc/_health", healthHandler)
592
593 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy)
594 port := os.Getenv("PORT")
595 if port == "" {
596 port = os.Getenv("APPVIEW_PORT")
597 }
598 if port == "" {
599 port = "8080"
600 }
601
602 // Create HTTP server for graceful shutdown
603 server := &http.Server{
604 Addr: ":" + port,
605 Handler: r,
606 }
607
608 // Channel to listen for shutdown signals
609 stop := make(chan os.Signal, 1)
610 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
611
612 // Start server in goroutine
613 go func() {
614 fmt.Printf("Coves AppView starting on port %s\n", port)
615 fmt.Printf("Default PDS: %s\n", defaultPDS)
616 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
617 log.Fatalf("Server error: %v", err)
618 }
619 }()
620
621 // Wait for shutdown signal
622 <-stop
623 log.Println("Shutting down server...")
624
625 // Graceful shutdown with timeout
626 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
627 defer cancel()
628
629 // Stop OAuth cleanup background job
630 cleanupCancel()
631
632 if err := server.Shutdown(ctx); err != nil {
633 log.Fatalf("Server shutdown error: %v", err)
634 }
635 log.Println("Server stopped gracefully")
636}
637
// authenticateWithPDS creates a session on the PDS at pdsURL and returns the
// access token (the accessJwt field) from its response.
//
// It POSTs handle and password as JSON to the PDS's
// /xrpc/com.atproto.server.createSession endpoint. A trailing slash on pdsURL
// is tolerated. The whole exchange is bounded by a 30-second timeout so an
// unresponsive PDS cannot block the caller forever.
//
// An error is returned when the request cannot be built or sent, when the PDS
// answers with any status other than 200 (the error includes the status and at
// most the first 4 KiB of the response body), when the response is not valid
// JSON, or when it carries no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	const (
		// requestTimeout covers connecting, sending and reading the response.
		requestTimeout = 30 * time.Second
		// maxErrorBody caps how much of a failure response is copied into the
		// returned error.
		maxErrorBody = 4 << 10
	)

	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// http.DefaultClient has no timeout, so use a dedicated client.
	client := &http.Client{Timeout: requestTimeout}
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 without a token is not a usable session; report it instead of
	// handing the caller an empty string that looks like success.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}