A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/identity" 7 "Coves/internal/atproto/jetstream" 8 "Coves/internal/atproto/oauth" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/blobs" 11 "Coves/internal/core/comments" 12 "Coves/internal/core/communities" 13 "Coves/internal/core/communityFeeds" 14 "Coves/internal/core/discover" 15 "Coves/internal/core/posts" 16 "Coves/internal/core/timeline" 17 "Coves/internal/core/unfurl" 18 "Coves/internal/core/users" 19 "bytes" 20 "context" 21 "crypto/rand" 22 "database/sql" 23 "encoding/base64" 24 "encoding/json" 25 "fmt" 26 "io" 27 "log" 28 "net/http" 29 "os" 30 "os/signal" 31 "strings" 32 "syscall" 33 "time" 34 35 "github.com/go-chi/chi/v5" 36 chiMiddleware "github.com/go-chi/chi/v5/middleware" 37 _ "github.com/lib/pq" 38 "github.com/pressly/goose/v3" 39 40 commentsAPI "Coves/internal/api/handlers/comments" 41 42 postgresRepo "Coves/internal/db/postgres" 43) 44 45func main() { 46 // Database configuration (AppView database) 47 dbURL := os.Getenv("DATABASE_URL") 48 if dbURL == "" { 49 // Use dev database from .env.dev 50 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 51 } 52 53 // Default PDS URL for this Coves instance (supports self-hosting) 54 defaultPDS := os.Getenv("PDS_URL") 55 if defaultPDS == "" { 56 defaultPDS = "http://localhost:3001" // Local dev PDS 57 } 58 59 // Cursor secret for HMAC signing (prevents cursor manipulation) 60 cursorSecret := os.Getenv("CURSOR_SECRET") 61 if cursorSecret == "" { 62 // Generate a random secret if not set (dev mode) 63 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 64 cursorSecret = "dev-cursor-secret-change-in-production" 65 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!") 66 } 67 68 db, err := sql.Open("postgres", dbURL) 69 if err != nil { 70 log.Fatal("Failed to connect to database:", err) 71 } 72 defer func() { 73 if closeErr := db.Close(); closeErr != nil { 74 log.Printf("Failed to close database connection: %v", closeErr) 75 } 76 }() 77 78 if err = db.Ping(); err != nil { 79 log.Fatal("Failed to ping database:", err) 80 } 81 82 log.Println("Connected to AppView database") 83 84 // Run migrations 85 if err = goose.SetDialect("postgres"); err != nil { 86 log.Fatal("Failed to set goose dialect:", err) 87 } 88 89 if err = goose.Up(db, "internal/db/migrations"); err != nil { 90 log.Fatal("Failed to run migrations:", err) 91 } 92 93 log.Println("Migrations completed successfully") 94 95 r := chi.NewRouter() 96 97 r.Use(chiMiddleware.Logger) 98 r.Use(chiMiddleware.Recoverer) 99 r.Use(chiMiddleware.RequestID) 100 101 // Rate limiting: 100 requests per minute per IP 102 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 103 r.Use(rateLimiter.Middleware) 104 105 // Initialize identity resolver 106 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 107 // directory as DID registration to ensure E2E tests work without hitting 108 // the production plc.directory 109 identityConfig := identity.DefaultConfig() 110 111 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 112 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 113 if plcDirectoryURL == "" { 114 plcDirectoryURL = "https://plc.directory" // Default to production PLC 115 } 116 117 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 118 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 119 if isDevEnv { 120 identityConfig.PLCURL = plcDirectoryURL 121 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 122 } else { 123 // Production: Allow separate IDENTITY_PLC_URL for read operations 124 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 125 identityConfig.PLCURL = identityPLCURL 126 } else { 127 identityConfig.PLCURL = plcDirectoryURL 128 } 129 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 130 } 131 132 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 133 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 134 identityConfig.CacheTTL = duration 135 } 136 } 137 138 identityResolver := identity.NewResolver(db, identityConfig) 139 140 // Get PLC URL for OAuth and other services 141 plcURL := os.Getenv("PLC_DIRECTORY_URL") 142 if plcURL == "" { 143 plcURL = "https://plc.directory" 144 } 145 146 // Initialize OAuth client for sealed session tokens 147 // Mobile apps authenticate via OAuth flow and receive sealed session tokens 148 // These tokens are encrypted references to OAuth sessions stored in the database 149 oauthSealSecret := os.Getenv("OAUTH_SEAL_SECRET") 150 if oauthSealSecret == "" { 151 if os.Getenv("IS_DEV_ENV") != "true" { 152 log.Fatal("OAUTH_SEAL_SECRET is required in production mode") 153 } 154 // Generate RANDOM secret for dev mode 155 randomBytes := make([]byte, 32) 156 if _, err := rand.Read(randomBytes); err != nil { 157 log.Fatal("Failed to generate random seal secret: ", err) 158 } 159 oauthSealSecret = base64.StdEncoding.EncodeToString(randomBytes) 160 log.Println("⚠️ DEV MODE: Generated random OAuth seal secret (won't persist across restarts)") 161 } 162 163 isDevMode := os.Getenv("IS_DEV_ENV") == "true" 164 oauthConfig := &oauth.OAuthConfig{ 165 PublicURL: os.Getenv("APPVIEW_PUBLIC_URL"), 166 SealSecret: oauthSealSecret, 167 Scopes: []string{"atproto", "transition:generic"}, 168 DevMode: isDevMode, 169 AllowPrivateIPs: isDevMode, // Allow private IPs only in dev mode 170 PLCURL: plcURL, 171 // SessionTTL and SealedTokenTTL will use defaults if not set (7 days and 14 days) 172 } 173 174 // Create PostgreSQL-backed OAuth session store (using default 7-day TTL) 175 baseOAuthStore := oauth.NewPostgresOAuthStore(db, 0) 176 // Wrap with MobileAwareStoreWrapper to capture OAuth state for mobile CSRF validation. 177 // This intercepts SaveAuthRequestInfo to save mobile CSRF data when present in context. 178 oauthStore := oauth.NewMobileAwareStoreWrapper(baseOAuthStore) 179 180 if oauthConfig.PublicURL == "" { 181 oauthConfig.PublicURL = "http://localhost:8080" 182 oauthConfig.DevMode = true // Force dev mode for localhost 183 } 184 185 oauthClient, err := oauth.NewOAuthClient(oauthConfig, oauthStore) 186 if err != nil { 187 log.Fatalf("Failed to initialize OAuth client: %v", err) 188 } 189 190 // Create OAuth handler for HTTP endpoints 191 oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore) 192 193 // Create OAuth auth middleware 194 // Validates sealed session tokens and loads OAuth sessions from database 195 authMiddleware := middleware.NewOAuthAuthMiddleware(oauthClient, oauthStore) 196 log.Println("✅ OAuth auth middleware initialized (sealed session tokens)") 197 198 // Initialize repositories and services 199 userRepo := postgresRepo.NewUserRepository(db) 200 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 201 202 communityRepo := postgresRepo.NewCommunityRepository(db) 203 204 // V2.0: PDS-managed DID generation 205 // Community DIDs and keys are generated entirely by the PDS 206 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 207 208 instanceDID := os.Getenv("INSTANCE_DID") 209 if instanceDID == "" { 210 instanceDID = "did:web:coves.social" // Default for development 211 } 212 213 // V2: Extract instance domain for community handles 214 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 215 // We cannot allow arbitrary domains to prevent impersonation attacks 216 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 217 // 218 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 219 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 220 // Communities with mismatched hostedBy domains are rejected during indexing 221 var instanceDomain string 222 if strings.HasPrefix(instanceDID, "did:web:") { 223 // Extract domain from did:web (this is the authoritative source) 224 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 225 } else { 226 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 227 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 228 if instanceDomain == "" { 229 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 230 } 231 } 232 233 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 234 235 // Community creation restriction - if set, only these DIDs can create communities 236 var allowedCommunityCreators []string 237 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 238 for _, did := range strings.Split(communityCreators, ",") { 239 did = strings.TrimSpace(did) 240 if did != "" { 241 allowedCommunityCreators = append(allowedCommunityCreators, did) 242 } 243 } 244 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 245 } else { 246 log.Println("Community creation open to all authenticated users") 247 } 248 249 // V2.0: Initialize PDS account provisioner for communities (simplified) 250 // PDS handles all DID and key generation - no Coves-side cryptography needed 251 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 252 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 253 log.Printf(" - Communities will be created at: %s", defaultPDS) 254 log.Printf(" - PDS will generate and manage all DIDs and keys") 255 256 // Initialize community service (no longer needs didGenerator directly) 257 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 258 259 // Authenticate Coves instance with PDS to enable community record writes 260 // The instance needs a PDS account to write community records it owns 261 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 262 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 263 if pdsHandle != "" && pdsPassword != "" { 264 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 265 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 266 if authErr != nil { 267 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 268 log.Println("Community creation will fail until PDS authentication is configured") 269 } else { 270 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 271 svc.SetPDSAccessToken(accessToken) 272 log.Println("✓ Coves instance authenticated with PDS") 273 } 274 } 275 } else { 276 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 277 log.Println("Community creation via write-forward is disabled") 278 } 279 280 // Start Jetstream consumer for read-forward user indexing 281 jetstreamURL := os.Getenv("JETSTREAM_URL") 282 if jetstreamURL == "" { 283 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 284 } 285 286 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 287 288 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 289 ctx := context.Background() 290 go func() { 291 if startErr := userConsumer.Start(ctx); startErr != nil { 292 log.Printf("Jetstream consumer stopped: %v", startErr) 293 } 294 }() 295 296 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 297 298 // Start Jetstream consumer for community events (profiles and subscriptions) 299 // This consumer indexes: 300 // 1. Community profiles (social.coves.community.profile) - in community's own repo 301 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 302 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 303 if communityJetstreamURL == "" { 304 // Local Jetstream for communities - filter to our instance's collections 305 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 306 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 307 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 308 } 309 310 // Initialize community event consumer with did:web verification 311 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 312 if skipDIDWebVerification { 313 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 314 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 315 } 316 317 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 318 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 319 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 320 321 go func() { 322 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 323 log.Printf("Community Jetstream consumer stopped: %v", startErr) 324 } 325 }() 326 327 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 328 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 329 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 330 331 // Start OAuth session cleanup background job with cancellable context 332 cleanupCtx, cleanupCancel := context.WithCancel(context.Background()) 333 go func() { 334 ticker := time.NewTicker(1 * time.Hour) 335 defer ticker.Stop() 336 for { 337 select { 338 case <-cleanupCtx.Done(): 339 log.Println("OAuth cleanup job stopped") 340 return 341 case <-ticker.C: 342 // Check if store implements cleanup methods 343 // Use UnwrapPostgresStore to get the underlying store from the wrapper 344 if cleanupStore := oauthStore.UnwrapPostgresStore(); cleanupStore != nil { 345 sessions, sessErr := cleanupStore.CleanupExpiredSessions(cleanupCtx) 346 if sessErr != nil { 347 log.Printf("Error cleaning up expired OAuth sessions: %v", sessErr) 348 } 349 requests, reqErr := cleanupStore.CleanupExpiredAuthRequests(cleanupCtx) 350 if reqErr != nil { 351 log.Printf("Error cleaning up expired OAuth auth requests: %v", reqErr) 352 } 353 if sessions > 0 || requests > 0 { 354 log.Printf("OAuth cleanup: removed %d expired sessions, %d expired auth requests", sessions, requests) 355 } 356 } 357 } 358 } 359 }() 360 361 log.Println("Started OAuth session cleanup background job (runs hourly)") 362 363 // Initialize aggregator service 364 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 365 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 366 log.Println("✅ Aggregator service initialized") 367 368 // Initialize unfurl cache repository 369 unfurlRepo := unfurl.NewRepository(db) 370 371 // Initialize blob upload service 372 blobService := blobs.NewBlobService(defaultPDS) 373 374 // Initialize unfurl service with configuration 375 unfurlService := unfurl.NewService( 376 unfurlRepo, 377 unfurl.WithTimeout(10*time.Second), 378 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 379 unfurl.WithCacheTTL(24*time.Hour), 380 ) 381 log.Println("✅ Unfurl and blob services initialized") 382 383 // Initialize post service (with aggregator support) 384 postRepo := postgresRepo.NewPostRepository(db) 385 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 386 387 // Initialize vote repository (used by Jetstream consumer for indexing) 388 voteRepo := postgresRepo.NewVoteRepository(db) 389 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 390 391 // Initialize comment repository (used by Jetstream consumer for indexing) 392 commentRepo := postgresRepo.NewCommentRepository(db) 393 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 394 395 // Initialize comment service (for query API) 396 // Requires user and community repos for proper author/community hydration per lexicon 397 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 398 log.Println("✅ Comment service initialized (with author/community hydration)") 399 400 // Initialize feed service 401 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 402 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 403 log.Println("✅ Feed service initialized") 404 405 // Initialize timeline service (home feed from subscribed communities) 406 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 407 timelineService := timeline.NewTimelineService(timelineRepo) 408 log.Println("✅ Timeline service initialized") 409 410 // Initialize discover service (public feed from all communities) 411 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 412 discoverService := discover.NewDiscoverService(discoverRepo) 413 log.Println("✅ Discover service initialized") 414 415 // Start Jetstream consumer for posts 416 // This consumer indexes posts created in community repositories via the firehose 417 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 418 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 419 if postJetstreamURL == "" { 420 // Listen to post record creation events 421 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 422 } 423 424 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 425 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 426 427 go func() { 428 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 429 log.Printf("Post Jetstream consumer stopped: %v", startErr) 430 } 431 }() 432 433 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 434 log.Println(" - Indexing: social.coves.community.post CREATE operations") 435 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 436 437 // Start Jetstream consumer for aggregators 438 // This consumer indexes aggregator service declarations and authorization records 439 // Following Bluesky's pattern for feed generators and labelers 440 // NOTE: Uses the same Jetstream as communities, just filtering different collections 441 aggregatorJetstreamURL := communityJetstreamURL 442 // Override if specific URL needed for testing 443 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 444 aggregatorJetstreamURL = envURL 445 } else if aggregatorJetstreamURL == "" { 446 // Fallback if community URL also not set 447 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 448 } 449 450 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 451 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 452 453 go func() { 454 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 455 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 456 } 457 }() 458 459 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 460 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 461 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 462 463 // Start Jetstream consumer for votes 464 // This consumer indexes votes from user repositories and updates post vote counts 465 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 466 if voteJetstreamURL == "" { 467 // Listen to vote record CREATE/DELETE events from user repositories 468 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 469 } 470 471 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 472 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 473 474 go func() { 475 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 476 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 477 } 478 }() 479 480 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 481 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 482 log.Println(" - Updating: Post vote counts atomically") 483 484 // Start Jetstream consumer for comments 485 // This consumer indexes comments from user repositories and updates parent counts 486 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 487 if commentJetstreamURL == "" { 488 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 489 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 490 } 491 492 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 493 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 494 495 go func() { 496 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 497 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 498 } 499 }() 500 501 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 502 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations") 503 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 504 505 // Register XRPC routes 506 routes.RegisterUserRoutes(r, userService) 507 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 508 log.Println("Community XRPC endpoints registered with OAuth authentication") 509 510 routes.RegisterPostRoutes(r, postService, authMiddleware) 511 log.Println("Post XRPC endpoints registered with OAuth authentication") 512 513 // Vote write endpoints removed - clients write directly to their PDS 514 // The AppView indexes votes from Jetstream (see vote consumer above) 515 516 routes.RegisterCommunityFeedRoutes(r, feedService) 517 log.Println("Feed XRPC endpoints registered (public, no auth required)") 518 519 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 520 log.Println("Timeline XRPC endpoints registered (requires authentication)") 521 522 routes.RegisterDiscoverRoutes(r, discoverService) 523 log.Println("Discover XRPC endpoints registered (public, no auth required)") 524 525 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 526 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 527 528 // Comment query API - supports optional authentication for viewer state 529 // Stricter rate limiting for expensive nested comment queries 530 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 531 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 532 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 533 r.Handle( 534 "/xrpc/social.coves.community.comment.getComments", 535 commentRateLimiter.Middleware( 536 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 537 ), 538 ) 539 log.Println("✅ Comment query API registered (20 req/min rate limit)") 540 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 541 542 // Configure allowed CORS origins for OAuth callback 543 // SECURITY: Never use wildcard "*" with credentials - only allow specific origins 544 var oauthAllowedOrigins []string 545 appviewPublicURL := os.Getenv("APPVIEW_PUBLIC_URL") 546 if appviewPublicURL == "" { 547 appviewPublicURL = "http://localhost:8080" 548 } 549 oauthAllowedOrigins = append(oauthAllowedOrigins, appviewPublicURL) 550 551 // In dev mode, also allow common localhost origins for testing 552 if oauthConfig.DevMode { 553 oauthAllowedOrigins = append(oauthAllowedOrigins, 554 "http://localhost:3000", 555 "http://localhost:3001", 556 "http://localhost:5173", 557 "http://127.0.0.1:8080", 558 "http://127.0.0.1:3000", 559 "http://127.0.0.1:3001", 560 "http://127.0.0.1:5173", 561 ) 562 log.Printf("🧪 DEV MODE: OAuth CORS allows localhost origins for testing") 563 } 564 log.Printf("OAuth CORS allowed origins: %v", oauthAllowedOrigins) 565 566 // Register OAuth routes for authentication flow 567 routes.RegisterOAuthRoutes(r, oauthHandler, oauthAllowedOrigins) 568 log.Println("✅ OAuth endpoints registered") 569 log.Println(" - GET /oauth/client-metadata.json") 570 log.Println(" - GET /oauth/jwks.json") 571 log.Println(" - GET /oauth/login") 572 log.Println(" - GET /oauth/mobile/login") 573 log.Println(" - GET /oauth/callback") 574 log.Println(" - POST /oauth/logout") 575 log.Println(" - POST /oauth/refresh") 576 577 // Register well-known routes for mobile app deep linking 578 routes.RegisterWellKnownRoutes(r) 579 log.Println("✅ Well-known endpoints registered (mobile Universal Links & App Links)") 580 log.Println(" - GET /.well-known/apple-app-site-association (iOS Universal Links)") 581 log.Println(" - GET /.well-known/assetlinks.json (Android App Links)") 582 583 // Health check endpoints 584 healthHandler := func(w http.ResponseWriter, r *http.Request) { 585 w.WriteHeader(http.StatusOK) 586 if _, err := w.Write([]byte("OK")); err != nil { 587 log.Printf("Failed to write health check response: %v", err) 588 } 589 } 590 r.Get("/health", healthHandler) 591 r.Get("/xrpc/_health", healthHandler) 592 593 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 594 port := os.Getenv("PORT") 595 if port == "" { 596 port = os.Getenv("APPVIEW_PORT") 597 } 598 if port == "" { 599 port = "8080" 600 } 601 602 // Create HTTP server for graceful shutdown 603 server := &http.Server{ 604 Addr: ":" + port, 605 Handler: r, 606 } 607 608 // Channel to listen for shutdown signals 609 stop := make(chan os.Signal, 1) 610 signal.Notify(stop, os.Interrupt, syscall.SIGTERM) 611 612 // Start server in goroutine 613 go func() { 614 fmt.Printf("Coves AppView starting on port %s\n", port) 615 fmt.Printf("Default PDS: %s\n", defaultPDS) 616 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { 617 log.Fatalf("Server error: %v", err) 618 } 619 }() 620 621 // Wait for shutdown signal 622 <-stop 623 log.Println("Shutting down server...") 624 625 // Graceful shutdown with timeout 626 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 627 defer cancel() 628 629 // Stop OAuth cleanup background job 630 cleanupCancel() 631 632 if err := server.Shutdown(ctx); err != nil { 633 log.Fatalf("Server shutdown error: %v", err) 634 } 635 log.Println("Server stopped gracefully") 636} 637 638// authenticateWithPDS creates a session on the PDS and returns an access token 639func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 640 type CreateSessionRequest struct { 641 Identifier string `json:"identifier"` 642 Password string `json:"password"` 643 } 644 645 type CreateSessionResponse struct { 646 DID string `json:"did"` 647 Handle string `json:"handle"` 648 AccessJwt string `json:"accessJwt"` 649 } 650 651 reqBody, err := json.Marshal(CreateSessionRequest{ 652 Identifier: handle, 653 Password: password, 654 }) 655 if err != nil { 656 return "", fmt.Errorf("failed to marshal request: %w", err) 657 } 658 659 resp, err := http.Post( 660 pdsURL+"/xrpc/com.atproto.server.createSession", 661 "application/json", 662 bytes.NewReader(reqBody), 663 ) 664 if err != nil { 665 return "", fmt.Errorf("failed to call PDS: %w", err) 666 } 667 defer func() { 668 if closeErr := resp.Body.Close(); closeErr != nil { 669 log.Printf("Failed to close response body: %v", closeErr) 670 } 671 }() 672 673 if resp.StatusCode != http.StatusOK { 674 body, readErr := io.ReadAll(resp.Body) 675 if readErr != nil { 676 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 677 } 678 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 679 } 680 681 var session CreateSessionResponse 682 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 683 return "", fmt.Errorf("failed to decode response: %w", err) 684 } 685 686 return session.AccessJwt, nil 687}