A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/identity" 7 "Coves/internal/atproto/jetstream" 8 "Coves/internal/atproto/oauth" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/blobs" 11 "Coves/internal/core/comments" 12 "Coves/internal/core/communities" 13 "Coves/internal/core/communityFeeds" 14 "Coves/internal/core/discover" 15 "Coves/internal/core/posts" 16 "Coves/internal/core/timeline" 17 "Coves/internal/core/unfurl" 18 "Coves/internal/core/users" 19 "Coves/internal/core/votes" 20 "bytes" 21 "context" 22 "crypto/rand" 23 "database/sql" 24 "encoding/base64" 25 "encoding/json" 26 "fmt" 27 "io" 28 "log" 29 "net/http" 30 "os" 31 "os/signal" 32 "strings" 33 "syscall" 34 "time" 35 36 "github.com/go-chi/chi/v5" 37 chiMiddleware "github.com/go-chi/chi/v5/middleware" 38 _ "github.com/lib/pq" 39 "github.com/pressly/goose/v3" 40 41 commentsAPI "Coves/internal/api/handlers/comments" 42 43 postgresRepo "Coves/internal/db/postgres" 44) 45 46func main() { 47 // Database configuration (AppView database) 48 dbURL := os.Getenv("DATABASE_URL") 49 if dbURL == "" { 50 // Use dev database from .env.dev 51 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 52 } 53 54 // Default PDS URL for this Coves instance (supports self-hosting) 55 defaultPDS := os.Getenv("PDS_URL") 56 if defaultPDS == "" { 57 defaultPDS = "http://localhost:3001" // Local dev PDS 58 } 59 60 // Cursor secret for HMAC signing (prevents cursor manipulation) 61 cursorSecret := os.Getenv("CURSOR_SECRET") 62 if cursorSecret == "" { 63 // Generate a random secret if not set (dev mode) 64 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 65 cursorSecret = "dev-cursor-secret-change-in-production" 66 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!") 67 } 68 69 db, err := sql.Open("postgres", dbURL) 70 if err != nil { 71 log.Fatal("Failed to connect to database:", err) 72 } 73 defer func() { 74 if closeErr := db.Close(); closeErr != nil { 75 log.Printf("Failed to close database connection: %v", closeErr) 76 } 77 }() 78 79 if err = db.Ping(); err != nil { 80 log.Fatal("Failed to ping database:", err) 81 } 82 83 log.Println("Connected to AppView database") 84 85 // Run migrations 86 if err = goose.SetDialect("postgres"); err != nil { 87 log.Fatal("Failed to set goose dialect:", err) 88 } 89 90 if err = goose.Up(db, "internal/db/migrations"); err != nil { 91 log.Fatal("Failed to run migrations:", err) 92 } 93 94 log.Println("Migrations completed successfully") 95 96 r := chi.NewRouter() 97 98 r.Use(chiMiddleware.Logger) 99 r.Use(chiMiddleware.Recoverer) 100 r.Use(chiMiddleware.RequestID) 101 102 // Rate limiting: 100 requests per minute per IP 103 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 104 r.Use(rateLimiter.Middleware) 105 106 // Initialize identity resolver 107 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 108 // directory as DID registration to ensure E2E tests work without hitting 109 // the production plc.directory 110 identityConfig := identity.DefaultConfig() 111 112 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 113 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 114 if plcDirectoryURL == "" { 115 plcDirectoryURL = "https://plc.directory" // Default to production PLC 116 } 117 118 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 119 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 120 if isDevEnv { 121 identityConfig.PLCURL = plcDirectoryURL 122 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 123 } else { 124 // Production: Allow separate IDENTITY_PLC_URL for read operations 125 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 126 identityConfig.PLCURL = identityPLCURL 127 } else { 128 identityConfig.PLCURL = plcDirectoryURL 129 } 130 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 131 } 132 133 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 134 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 135 identityConfig.CacheTTL = duration 136 } 137 } 138 139 identityResolver := identity.NewResolver(db, identityConfig) 140 141 // Get PLC URL for OAuth and other services 142 plcURL := os.Getenv("PLC_DIRECTORY_URL") 143 if plcURL == "" { 144 plcURL = "https://plc.directory" 145 } 146 147 // Initialize OAuth client for sealed session tokens 148 // Mobile apps authenticate via OAuth flow and receive sealed session tokens 149 // These tokens are encrypted references to OAuth sessions stored in the database 150 oauthSealSecret := os.Getenv("OAUTH_SEAL_SECRET") 151 if oauthSealSecret == "" { 152 if os.Getenv("IS_DEV_ENV") != "true" { 153 log.Fatal("OAUTH_SEAL_SECRET is required in production mode") 154 } 155 // Generate RANDOM secret for dev mode 156 randomBytes := make([]byte, 32) 157 if _, err := rand.Read(randomBytes); err != nil { 158 log.Fatal("Failed to generate random seal secret: ", err) 159 } 160 oauthSealSecret = base64.StdEncoding.EncodeToString(randomBytes) 161 log.Println("⚠️ DEV MODE: Generated random OAuth seal secret (won't persist across restarts)") 162 } 163 164 isDevMode := os.Getenv("IS_DEV_ENV") == "true" 165 oauthConfig := &oauth.OAuthConfig{ 166 PublicURL: os.Getenv("APPVIEW_PUBLIC_URL"), 167 SealSecret: oauthSealSecret, 168 Scopes: []string{"atproto", "transition:generic"}, 169 DevMode: isDevMode, 170 AllowPrivateIPs: isDevMode, // Allow private IPs only in dev mode 171 PLCURL: plcURL, 172 // SessionTTL and SealedTokenTTL will use defaults if not set (7 days and 14 days) 173 } 174 175 // Create PostgreSQL-backed OAuth session store (using default 7-day TTL) 176 baseOAuthStore := oauth.NewPostgresOAuthStore(db, 0) 177 // Wrap with MobileAwareStoreWrapper to capture OAuth state for mobile CSRF validation. 178 // This intercepts SaveAuthRequestInfo to save mobile CSRF data when present in context. 179 oauthStore := oauth.NewMobileAwareStoreWrapper(baseOAuthStore) 180 181 if oauthConfig.PublicURL == "" { 182 oauthConfig.PublicURL = "http://localhost:8080" 183 oauthConfig.DevMode = true // Force dev mode for localhost 184 } 185 186 oauthClient, err := oauth.NewOAuthClient(oauthConfig, oauthStore) 187 if err != nil { 188 log.Fatalf("Failed to initialize OAuth client: %v", err) 189 } 190 191 // Create OAuth handler for HTTP endpoints 192 oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore) 193 194 // Create OAuth auth middleware 195 // Validates sealed session tokens and loads OAuth sessions from database 196 authMiddleware := middleware.NewOAuthAuthMiddleware(oauthClient, oauthStore) 197 log.Println("✅ OAuth auth middleware initialized (sealed session tokens)") 198 199 // Initialize repositories and services 200 userRepo := postgresRepo.NewUserRepository(db) 201 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 202 203 communityRepo := postgresRepo.NewCommunityRepository(db) 204 205 // V2.0: PDS-managed DID generation 206 // Community DIDs and keys are generated entirely by the PDS 207 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 208 209 instanceDID := os.Getenv("INSTANCE_DID") 210 if instanceDID == "" { 211 instanceDID = "did:web:coves.social" // Default for development 212 } 213 214 // V2: Extract instance domain for community handles 215 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 216 // We cannot allow arbitrary domains to prevent impersonation attacks 217 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 218 // 219 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 220 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 221 // Communities with mismatched hostedBy domains are rejected during indexing 222 var instanceDomain string 223 if strings.HasPrefix(instanceDID, "did:web:") { 224 // Extract domain from did:web (this is the authoritative source) 225 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 226 } else { 227 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 228 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 229 if instanceDomain == "" { 230 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 231 } 232 } 233 234 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 235 236 // Community creation restriction - if set, only these DIDs can create communities 237 var allowedCommunityCreators []string 238 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 239 for _, did := range strings.Split(communityCreators, ",") { 240 did = strings.TrimSpace(did) 241 if did != "" { 242 allowedCommunityCreators = append(allowedCommunityCreators, did) 243 } 244 } 245 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 246 } else { 247 log.Println("Community creation open to all authenticated users") 248 } 249 250 // V2.0: Initialize PDS account provisioner for communities (simplified) 251 // PDS handles all DID and key generation - no Coves-side cryptography needed 252 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 253 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 254 log.Printf(" - Communities will be created at: %s", defaultPDS) 255 log.Printf(" - PDS will generate and manage all DIDs and keys") 256 257 // Initialize community service (no longer needs didGenerator directly) 258 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 259 260 // Authenticate Coves instance with PDS to enable community record writes 261 // The instance needs a PDS account to write community records it owns 262 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 263 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 264 if pdsHandle != "" && pdsPassword != "" { 265 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 266 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 267 if authErr != nil { 268 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 269 log.Println("Community creation will fail until PDS authentication is configured") 270 } else { 271 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 272 svc.SetPDSAccessToken(accessToken) 273 log.Println("✓ Coves instance authenticated with PDS") 274 } 275 } 276 } else { 277 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 278 log.Println("Community creation via write-forward is disabled") 279 } 280 281 // Start Jetstream consumer for read-forward user indexing 282 jetstreamURL := os.Getenv("JETSTREAM_URL") 283 if jetstreamURL == "" { 284 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 285 } 286 287 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 288 289 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 290 ctx := context.Background() 291 go func() { 292 if startErr := userConsumer.Start(ctx); startErr != nil { 293 log.Printf("Jetstream consumer stopped: %v", startErr) 294 } 295 }() 296 297 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 298 299 // Start Jetstream consumer for community events (profiles and subscriptions) 300 // This consumer indexes: 301 // 1. Community profiles (social.coves.community.profile) - in community's own repo 302 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 303 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 304 if communityJetstreamURL == "" { 305 // Local Jetstream for communities - filter to our instance's collections 306 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 307 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 308 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 309 } 310 311 // Initialize community event consumer with did:web verification 312 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 313 if skipDIDWebVerification { 314 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 315 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 316 } 317 318 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 319 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 320 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 321 322 go func() { 323 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 324 log.Printf("Community Jetstream consumer stopped: %v", startErr) 325 } 326 }() 327 328 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 329 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 330 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 331 332 // Start OAuth session cleanup background job with cancellable context 333 cleanupCtx, cleanupCancel := context.WithCancel(context.Background()) 334 go func() { 335 ticker := time.NewTicker(1 * time.Hour) 336 defer ticker.Stop() 337 for { 338 select { 339 case <-cleanupCtx.Done(): 340 log.Println("OAuth cleanup job stopped") 341 return 342 case <-ticker.C: 343 // Check if store implements cleanup methods 344 // Use UnwrapPostgresStore to get the underlying store from the wrapper 345 if cleanupStore := oauthStore.UnwrapPostgresStore(); cleanupStore != nil { 346 sessions, sessErr := cleanupStore.CleanupExpiredSessions(cleanupCtx) 347 if sessErr != nil { 348 log.Printf("Error cleaning up expired OAuth sessions: %v", sessErr) 349 } 350 requests, reqErr := cleanupStore.CleanupExpiredAuthRequests(cleanupCtx) 351 if reqErr != nil { 352 log.Printf("Error cleaning up expired OAuth auth requests: %v", reqErr) 353 } 354 if sessions > 0 || requests > 0 { 355 log.Printf("OAuth cleanup: removed %d expired sessions, %d expired auth requests", sessions, requests) 356 } 357 } 358 } 359 } 360 }() 361 362 log.Println("Started OAuth session cleanup background job (runs hourly)") 363 364 // Initialize aggregator service 365 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 366 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 367 log.Println("✅ Aggregator service initialized") 368 369 // Initialize unfurl cache repository 370 unfurlRepo := unfurl.NewRepository(db) 371 372 // Initialize blob upload service 373 blobService := blobs.NewBlobService(defaultPDS) 374 375 // Initialize unfurl service with configuration 376 unfurlService := unfurl.NewService( 377 unfurlRepo, 378 unfurl.WithTimeout(10*time.Second), 379 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 380 unfurl.WithCacheTTL(24*time.Hour), 381 ) 382 log.Println("✅ Unfurl and blob services initialized") 383 384 // Initialize post service (with aggregator support) 385 postRepo := postgresRepo.NewPostRepository(db) 386 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 387 388 // Initialize vote repository (used by Jetstream consumer for indexing) 389 voteRepo := postgresRepo.NewVoteRepository(db) 390 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 391 392 // Initialize comment repository (used by Jetstream consumer for indexing) 393 commentRepo := postgresRepo.NewCommentRepository(db) 394 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 395 396 // Initialize vote service (for XRPC API endpoints) 397 // Note: We don't validate subject existence - the vote goes to the user's PDS regardless. 398 // The Jetstream consumer handles orphaned votes correctly by only updating counts for 399 // non-deleted subjects. This avoids race conditions and eventual consistency issues. 400 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 401 log.Println("✅ Vote service initialized (with OAuth authentication)") 402 403 // Initialize comment service (for query API) 404 // Requires user and community repos for proper author/community hydration per lexicon 405 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 406 log.Println("✅ Comment service initialized (with author/community hydration)") 407 408 // Initialize feed service 409 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 410 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 411 log.Println("✅ Feed service initialized") 412 413 // Initialize timeline service (home feed from subscribed communities) 414 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 415 timelineService := timeline.NewTimelineService(timelineRepo) 416 log.Println("✅ Timeline service initialized") 417 418 // Initialize discover service (public feed from all communities) 419 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 420 discoverService := discover.NewDiscoverService(discoverRepo) 421 log.Println("✅ Discover service initialized") 422 423 // Start Jetstream consumer for posts 424 // This consumer indexes posts created in community repositories via the firehose 425 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 426 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 427 if postJetstreamURL == "" { 428 // Listen to post record creation events 429 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 430 } 431 432 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 433 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 434 435 go func() { 436 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 437 log.Printf("Post Jetstream consumer stopped: %v", startErr) 438 } 439 }() 440 441 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 442 log.Println(" - Indexing: social.coves.community.post CREATE operations") 443 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 444 445 // Start Jetstream consumer for aggregators 446 // This consumer indexes aggregator service declarations and authorization records 447 // Following Bluesky's pattern for feed generators and labelers 448 // NOTE: Uses the same Jetstream as communities, just filtering different collections 449 aggregatorJetstreamURL := communityJetstreamURL 450 // Override if specific URL needed for testing 451 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 452 aggregatorJetstreamURL = envURL 453 } else if aggregatorJetstreamURL == "" { 454 // Fallback if community URL also not set 455 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 456 } 457 458 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 459 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 460 461 go func() { 462 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 463 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 464 } 465 }() 466 467 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 468 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 469 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 470 471 // Start Jetstream consumer for votes 472 // This consumer indexes votes from user repositories and updates post vote counts 473 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 474 if voteJetstreamURL == "" { 475 // Listen to vote record CREATE/DELETE events from user repositories 476 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 477 } 478 479 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 480 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 481 482 go func() { 483 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 484 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 485 } 486 }() 487 488 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 489 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 490 log.Println(" - Updating: Post vote counts atomically") 491 492 // Start Jetstream consumer for comments 493 // This consumer indexes comments from user repositories and updates parent counts 494 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 495 if commentJetstreamURL == "" { 496 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 497 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 498 } 499 500 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 501 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 502 503 go func() { 504 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 505 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 506 } 507 }() 508 509 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 510 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations") 511 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 512 513 // Register XRPC routes 514 routes.RegisterUserRoutes(r, userService) 515 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 516 log.Println("Community XRPC endpoints registered with OAuth authentication") 517 518 routes.RegisterPostRoutes(r, postService, authMiddleware) 519 log.Println("Post XRPC endpoints registered with OAuth authentication") 520 521 routes.RegisterVoteRoutes(r, voteService, authMiddleware) 522 log.Println("Vote XRPC endpoints registered with OAuth authentication") 523 524 routes.RegisterCommunityFeedRoutes(r, feedService) 525 log.Println("Feed XRPC endpoints registered (public, no auth required)") 526 527 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 528 log.Println("Timeline XRPC endpoints registered (requires authentication)") 529 530 routes.RegisterDiscoverRoutes(r, discoverService) 531 log.Println("Discover XRPC endpoints registered (public, no auth required)") 532 533 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 534 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 535 536 // Comment query API - supports optional authentication for viewer state 537 // Stricter rate limiting for expensive nested comment queries 538 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 539 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 540 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 541 r.Handle( 542 "/xrpc/social.coves.community.comment.getComments", 543 commentRateLimiter.Middleware( 544 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 545 ), 546 ) 547 log.Println("✅ Comment query API registered (20 req/min rate limit)") 548 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 549 550 // Configure allowed CORS origins for OAuth callback 551 // SECURITY: Never use wildcard "*" with credentials - only allow specific origins 552 var oauthAllowedOrigins []string 553 appviewPublicURL := os.Getenv("APPVIEW_PUBLIC_URL") 554 if appviewPublicURL == "" { 555 appviewPublicURL = "http://localhost:8080" 556 } 557 oauthAllowedOrigins = append(oauthAllowedOrigins, appviewPublicURL) 558 559 // In dev mode, also allow common localhost origins for testing 560 if oauthConfig.DevMode { 561 oauthAllowedOrigins = append(oauthAllowedOrigins, 562 "http://localhost:3000", 563 "http://localhost:3001", 564 "http://localhost:5173", 565 "http://127.0.0.1:8080", 566 "http://127.0.0.1:3000", 567 "http://127.0.0.1:3001", 568 "http://127.0.0.1:5173", 569 ) 570 log.Printf("🧪 DEV MODE: OAuth CORS allows localhost origins for testing") 571 } 572 log.Printf("OAuth CORS allowed origins: %v", oauthAllowedOrigins) 573 574 // Register OAuth routes for authentication flow 575 routes.RegisterOAuthRoutes(r, oauthHandler, oauthAllowedOrigins) 576 log.Println("✅ OAuth endpoints registered") 577 log.Println(" - GET /oauth/client-metadata.json") 578 log.Println(" - GET /oauth/jwks.json") 579 log.Println(" - GET /oauth/login") 580 log.Println(" - GET /oauth/mobile/login") 581 log.Println(" - GET /oauth/callback") 582 log.Println(" - POST /oauth/logout") 583 log.Println(" - POST /oauth/refresh") 584 585 // Register well-known routes for mobile app deep linking 586 routes.RegisterWellKnownRoutes(r) 587 log.Println("✅ Well-known endpoints registered (mobile Universal Links & App Links)") 588 log.Println(" - GET /.well-known/apple-app-site-association (iOS Universal Links)") 589 log.Println(" - GET /.well-known/assetlinks.json (Android App Links)") 590 591 // Health check endpoints 592 healthHandler := func(w http.ResponseWriter, r *http.Request) { 593 w.WriteHeader(http.StatusOK) 594 if _, err := w.Write([]byte("OK")); err != nil { 595 log.Printf("Failed to write health check response: %v", err) 596 } 597 } 598 r.Get("/health", healthHandler) 599 r.Get("/xrpc/_health", healthHandler) 600 601 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 602 port := os.Getenv("PORT") 603 if port == "" { 604 port = os.Getenv("APPVIEW_PORT") 605 } 606 if port == "" { 607 port = "8080" 608 } 609 610 // Create HTTP server for graceful shutdown 611 server := &http.Server{ 612 Addr: ":" + port, 613 Handler: r, 614 } 615 616 // Channel to listen for shutdown signals 617 stop := make(chan os.Signal, 1) 618 signal.Notify(stop, os.Interrupt, syscall.SIGTERM) 619 620 // Start server in goroutine 621 go func() { 622 fmt.Printf("Coves AppView starting on port %s\n", port) 623 fmt.Printf("Default PDS: %s\n", defaultPDS) 624 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { 625 log.Fatalf("Server error: %v", err) 626 } 627 }() 628 629 // Wait for shutdown signal 630 <-stop 631 log.Println("Shutting down server...") 632 633 // Graceful shutdown with timeout 634 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 635 defer cancel() 636 637 // Stop OAuth cleanup background job 638 cleanupCancel() 639 640 if err := server.Shutdown(ctx); err != nil { 641 log.Fatalf("Server shutdown error: %v", err) 642 } 643 log.Println("Server stopped gracefully") 644} 645 646// authenticateWithPDS creates a session on the PDS and returns an access token 647func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 648 type CreateSessionRequest struct { 649 Identifier string `json:"identifier"` 650 Password string `json:"password"` 651 } 652 653 type CreateSessionResponse struct { 654 DID string `json:"did"` 655 Handle string `json:"handle"` 656 AccessJwt string `json:"accessJwt"` 657 } 658 659 reqBody, err := json.Marshal(CreateSessionRequest{ 660 Identifier: handle, 661 Password: password, 662 }) 663 if err != nil { 664 return "", fmt.Errorf("failed to marshal request: %w", err) 665 } 666 667 resp, err := http.Post( 668 pdsURL+"/xrpc/com.atproto.server.createSession", 669 "application/json", 670 bytes.NewReader(reqBody), 671 ) 672 if err != nil { 673 return "", fmt.Errorf("failed to call PDS: %w", err) 674 } 675 defer func() { 676 if closeErr := resp.Body.Close(); closeErr != nil { 677 log.Printf("Failed to close response body: %v", closeErr) 678 } 679 }() 680 681 if resp.StatusCode != http.StatusOK { 682 body, readErr := io.ReadAll(resp.Body) 683 if readErr != nil { 684 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 685 } 686 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 687 } 688 689 var session CreateSessionResponse 690 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 691 return "", fmt.Errorf("failed to decode response: %w", err) 692 } 693 694 return session.AccessJwt, nil 695}