A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/identity" 7 "Coves/internal/atproto/jetstream" 8 "Coves/internal/atproto/oauth" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/blobs" 11 "Coves/internal/core/comments" 12 "Coves/internal/core/communities" 13 "Coves/internal/core/communityFeeds" 14 "Coves/internal/core/discover" 15 "Coves/internal/core/posts" 16 "Coves/internal/core/timeline" 17 "Coves/internal/core/unfurl" 18 "Coves/internal/core/users" 19 "bytes" 20 "context" 21 "crypto/rand" 22 "database/sql" 23 "encoding/base64" 24 "encoding/json" 25 "fmt" 26 "io" 27 "log" 28 "net/http" 29 "os" 30 "os/signal" 31 "strings" 32 "syscall" 33 "time" 34 35 "github.com/go-chi/chi/v5" 36 chiMiddleware "github.com/go-chi/chi/v5/middleware" 37 _ "github.com/lib/pq" 38 "github.com/pressly/goose/v3" 39 40 commentsAPI "Coves/internal/api/handlers/comments" 41 42 postgresRepo "Coves/internal/db/postgres" 43) 44 45func main() { 46 // Database configuration (AppView database) 47 dbURL := os.Getenv("DATABASE_URL") 48 if dbURL == "" { 49 // Use dev database from .env.dev 50 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 51 } 52 53 // Default PDS URL for this Coves instance (supports self-hosting) 54 defaultPDS := os.Getenv("PDS_URL") 55 if defaultPDS == "" { 56 defaultPDS = "http://localhost:3001" // Local dev PDS 57 } 58 59 // Cursor secret for HMAC signing (prevents cursor manipulation) 60 cursorSecret := os.Getenv("CURSOR_SECRET") 61 if cursorSecret == "" { 62 // Generate a random secret if not set (dev mode) 63 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 64 cursorSecret = "dev-cursor-secret-change-in-production" 65 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!") 66 } 67 68 db, err := sql.Open("postgres", dbURL) 69 if err != nil { 70 log.Fatal("Failed to connect to database:", err) 71 } 72 defer func() { 73 if closeErr := db.Close(); closeErr != nil { 74 log.Printf("Failed to close database connection: %v", closeErr) 75 } 76 }() 77 78 if err = db.Ping(); err != nil { 79 log.Fatal("Failed to ping database:", err) 80 } 81 82 log.Println("Connected to AppView database") 83 84 // Run migrations 85 if err = goose.SetDialect("postgres"); err != nil { 86 log.Fatal("Failed to set goose dialect:", err) 87 } 88 89 if err = goose.Up(db, "internal/db/migrations"); err != nil { 90 log.Fatal("Failed to run migrations:", err) 91 } 92 93 log.Println("Migrations completed successfully") 94 95 r := chi.NewRouter() 96 97 r.Use(chiMiddleware.Logger) 98 r.Use(chiMiddleware.Recoverer) 99 r.Use(chiMiddleware.RequestID) 100 101 // Rate limiting: 100 requests per minute per IP 102 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 103 r.Use(rateLimiter.Middleware) 104 105 // Initialize identity resolver 106 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 107 // directory as DID registration to ensure E2E tests work without hitting 108 // the production plc.directory 109 identityConfig := identity.DefaultConfig() 110 111 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 112 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 113 if plcDirectoryURL == "" { 114 plcDirectoryURL = "https://plc.directory" // Default to production PLC 115 } 116 117 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 118 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 119 if isDevEnv { 120 identityConfig.PLCURL = plcDirectoryURL 121 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 122 } else { 123 // Production: Allow separate IDENTITY_PLC_URL for read operations 124 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 125 identityConfig.PLCURL = identityPLCURL 126 } else { 127 identityConfig.PLCURL = plcDirectoryURL 128 } 129 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 130 } 131 132 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 133 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 134 identityConfig.CacheTTL = duration 135 } 136 } 137 138 identityResolver := identity.NewResolver(db, identityConfig) 139 140 // Get PLC URL for OAuth and other services 141 plcURL := os.Getenv("PLC_DIRECTORY_URL") 142 if plcURL == "" { 143 plcURL = "https://plc.directory" 144 } 145 146 // Initialize OAuth client for sealed session tokens 147 // Mobile apps authenticate via OAuth flow and receive sealed session tokens 148 // These tokens are encrypted references to OAuth sessions stored in the database 149 oauthSealSecret := os.Getenv("OAUTH_SEAL_SECRET") 150 if oauthSealSecret == "" { 151 if os.Getenv("IS_DEV_ENV") != "true" { 152 log.Fatal("OAUTH_SEAL_SECRET is required in production mode") 153 } 154 // Generate RANDOM secret for dev mode 155 randomBytes := make([]byte, 32) 156 if _, err := rand.Read(randomBytes); err != nil { 157 log.Fatal("Failed to generate random seal secret: ", err) 158 } 159 oauthSealSecret = base64.StdEncoding.EncodeToString(randomBytes) 160 log.Println("⚠️ DEV MODE: Generated random OAuth seal secret (won't persist across restarts)") 161 } 162 163 isDevMode := os.Getenv("IS_DEV_ENV") == "true" 164 oauthConfig := &oauth.OAuthConfig{ 165 PublicURL: os.Getenv("APPVIEW_PUBLIC_URL"), 166 SealSecret: oauthSealSecret, 167 Scopes: []string{"atproto", "transition:generic"}, 168 DevMode: isDevMode, 169 AllowPrivateIPs: isDevMode, // Allow private IPs only in dev mode 170 PLCURL: plcURL, 171 // SessionTTL and SealedTokenTTL will use defaults if not set (7 days and 14 days) 172 } 173 174 // Create PostgreSQL-backed OAuth session store (using default 7-day TTL) 175 baseOAuthStore := oauth.NewPostgresOAuthStore(db, 0) 176 // Wrap with MobileAwareStoreWrapper to capture OAuth state for mobile CSRF validation. 177 // This intercepts SaveAuthRequestInfo to save mobile CSRF data when present in context. 178 oauthStore := oauth.NewMobileAwareStoreWrapper(baseOAuthStore) 179 180 if oauthConfig.PublicURL == "" { 181 oauthConfig.PublicURL = "http://localhost:8080" 182 oauthConfig.DevMode = true // Force dev mode for localhost 183 } 184 185 // Optional: confidential client secret for production 186 oauthConfig.ClientSecret = os.Getenv("OAUTH_CLIENT_SECRET") 187 oauthConfig.ClientKID = os.Getenv("OAUTH_CLIENT_KID") 188 189 oauthClient, err := oauth.NewOAuthClient(oauthConfig, oauthStore) 190 if err != nil { 191 log.Fatalf("Failed to initialize OAuth client: %v", err) 192 } 193 194 // Create OAuth handler for HTTP endpoints 195 oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore) 196 197 // Create OAuth auth middleware 198 // Validates sealed session tokens and loads OAuth sessions from database 199 authMiddleware := middleware.NewOAuthAuthMiddleware(oauthClient, oauthStore) 200 log.Println("✅ OAuth auth middleware initialized (sealed session tokens)") 201 202 // Initialize repositories and services 203 userRepo := postgresRepo.NewUserRepository(db) 204 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 205 206 communityRepo := postgresRepo.NewCommunityRepository(db) 207 208 // V2.0: PDS-managed DID generation 209 // Community DIDs and keys are generated entirely by the PDS 210 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 211 212 instanceDID := os.Getenv("INSTANCE_DID") 213 if instanceDID == "" { 214 instanceDID = "did:web:coves.social" // Default for development 215 } 216 217 // V2: Extract instance domain for community handles 218 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 219 // We cannot allow arbitrary domains to prevent impersonation attacks 220 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 221 // 222 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 223 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 224 // Communities with mismatched hostedBy domains are rejected during indexing 225 var instanceDomain string 226 if strings.HasPrefix(instanceDID, "did:web:") { 227 // Extract domain from did:web (this is the authoritative source) 228 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 229 } else { 230 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 231 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 232 if instanceDomain == "" { 233 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 234 } 235 } 236 237 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 238 239 // Community creation restriction - if set, only these DIDs can create communities 240 var allowedCommunityCreators []string 241 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 242 for _, did := range strings.Split(communityCreators, ",") { 243 did = strings.TrimSpace(did) 244 if did != "" { 245 allowedCommunityCreators = append(allowedCommunityCreators, did) 246 } 247 } 248 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 249 } else { 250 log.Println("Community creation open to all authenticated users") 251 } 252 253 // V2.0: Initialize PDS account provisioner for communities (simplified) 254 // PDS handles all DID and key generation - no Coves-side cryptography needed 255 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 256 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 257 log.Printf(" - Communities will be created at: %s", defaultPDS) 258 log.Printf(" - PDS will generate and manage all DIDs and keys") 259 260 // Initialize community service (no longer needs didGenerator directly) 261 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 262 263 // Authenticate Coves instance with PDS to enable community record writes 264 // The instance needs a PDS account to write community records it owns 265 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 266 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 267 if pdsHandle != "" && pdsPassword != "" { 268 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 269 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 270 if authErr != nil { 271 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 272 log.Println("Community creation will fail until PDS authentication is configured") 273 } else { 274 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 275 svc.SetPDSAccessToken(accessToken) 276 log.Println("✓ Coves instance authenticated with PDS") 277 } 278 } 279 } else { 280 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 281 log.Println("Community creation via write-forward is disabled") 282 } 283 284 // Start Jetstream consumer for read-forward user indexing 285 jetstreamURL := os.Getenv("JETSTREAM_URL") 286 if jetstreamURL == "" { 287 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 288 } 289 290 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 291 292 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 293 ctx := context.Background() 294 go func() { 295 if startErr := userConsumer.Start(ctx); startErr != nil { 296 log.Printf("Jetstream consumer stopped: %v", startErr) 297 } 298 }() 299 300 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 301 302 // Start Jetstream consumer for community events (profiles and subscriptions) 303 // This consumer indexes: 304 // 1. Community profiles (social.coves.community.profile) - in community's own repo 305 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 306 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 307 if communityJetstreamURL == "" { 308 // Local Jetstream for communities - filter to our instance's collections 309 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 310 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 311 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 312 } 313 314 // Initialize community event consumer with did:web verification 315 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 316 if skipDIDWebVerification { 317 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 318 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 319 } 320 321 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 322 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 323 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 324 325 go func() { 326 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 327 log.Printf("Community Jetstream consumer stopped: %v", startErr) 328 } 329 }() 330 331 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 332 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 333 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 334 335 // Start OAuth session cleanup background job with cancellable context 336 cleanupCtx, cleanupCancel := context.WithCancel(context.Background()) 337 go func() { 338 ticker := time.NewTicker(1 * time.Hour) 339 defer ticker.Stop() 340 for { 341 select { 342 case <-cleanupCtx.Done(): 343 log.Println("OAuth cleanup job stopped") 344 return 345 case <-ticker.C: 346 // Check if store implements cleanup methods 347 // Use UnwrapPostgresStore to get the underlying store from the wrapper 348 if cleanupStore := oauthStore.UnwrapPostgresStore(); cleanupStore != nil { 349 sessions, sessErr := cleanupStore.CleanupExpiredSessions(cleanupCtx) 350 if sessErr != nil { 351 log.Printf("Error cleaning up expired OAuth sessions: %v", sessErr) 352 } 353 requests, reqErr := cleanupStore.CleanupExpiredAuthRequests(cleanupCtx) 354 if reqErr != nil { 355 log.Printf("Error cleaning up expired OAuth auth requests: %v", reqErr) 356 } 357 if sessions > 0 || requests > 0 { 358 log.Printf("OAuth cleanup: removed %d expired sessions, %d expired auth requests", sessions, requests) 359 } 360 } 361 } 362 } 363 }() 364 365 log.Println("Started OAuth session cleanup background job (runs hourly)") 366 367 // Initialize aggregator service 368 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 369 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 370 log.Println("✅ Aggregator service initialized") 371 372 // Initialize unfurl cache repository 373 unfurlRepo := unfurl.NewRepository(db) 374 375 // Initialize blob upload service 376 blobService := blobs.NewBlobService(defaultPDS) 377 378 // Initialize unfurl service with configuration 379 unfurlService := unfurl.NewService( 380 unfurlRepo, 381 unfurl.WithTimeout(10*time.Second), 382 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 383 unfurl.WithCacheTTL(24*time.Hour), 384 ) 385 log.Println("✅ Unfurl and blob services initialized") 386 387 // Initialize post service (with aggregator support) 388 postRepo := postgresRepo.NewPostRepository(db) 389 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 390 391 // Initialize vote repository (used by Jetstream consumer for indexing) 392 voteRepo := postgresRepo.NewVoteRepository(db) 393 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 394 395 // Initialize comment repository (used by Jetstream consumer for indexing) 396 commentRepo := postgresRepo.NewCommentRepository(db) 397 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 398 399 // Initialize comment service (for query API) 400 // Requires user and community repos for proper author/community hydration per lexicon 401 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 402 log.Println("✅ Comment service initialized (with author/community hydration)") 403 404 // Initialize feed service 405 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 406 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 407 log.Println("✅ Feed service initialized") 408 409 // Initialize timeline service (home feed from subscribed communities) 410 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 411 timelineService := timeline.NewTimelineService(timelineRepo) 412 log.Println("✅ Timeline service initialized") 413 414 // Initialize discover service (public feed from all communities) 415 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 416 discoverService := discover.NewDiscoverService(discoverRepo) 417 log.Println("✅ Discover service initialized") 418 419 // Start Jetstream consumer for posts 420 // This consumer indexes posts created in community repositories via the firehose 421 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 422 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 423 if postJetstreamURL == "" { 424 // Listen to post record creation events 425 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 426 } 427 428 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 429 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 430 431 go func() { 432 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 433 log.Printf("Post Jetstream consumer stopped: %v", startErr) 434 } 435 }() 436 437 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 438 log.Println(" - Indexing: social.coves.community.post CREATE operations") 439 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 440 441 // Start Jetstream consumer for aggregators 442 // This consumer indexes aggregator service declarations and authorization records 443 // Following Bluesky's pattern for feed generators and labelers 444 // NOTE: Uses the same Jetstream as communities, just filtering different collections 445 aggregatorJetstreamURL := communityJetstreamURL 446 // Override if specific URL needed for testing 447 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 448 aggregatorJetstreamURL = envURL 449 } else if aggregatorJetstreamURL == "" { 450 // Fallback if community URL also not set 451 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 452 } 453 454 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 455 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 456 457 go func() { 458 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 459 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 460 } 461 }() 462 463 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 464 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 465 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 466 467 // Start Jetstream consumer for votes 468 // This consumer indexes votes from user repositories and updates post vote counts 469 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 470 if voteJetstreamURL == "" { 471 // Listen to vote record CREATE/DELETE events from user repositories 472 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 473 } 474 475 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 476 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 477 478 go func() { 479 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 480 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 481 } 482 }() 483 484 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 485 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 486 log.Println(" - Updating: Post vote counts atomically") 487 488 // Start Jetstream consumer for comments 489 // This consumer indexes comments from user repositories and updates parent counts 490 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 491 if commentJetstreamURL == "" { 492 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 493 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 494 } 495 496 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 497 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 498 499 go func() { 500 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 501 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 502 } 503 }() 504 505 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 506 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations") 507 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 508 509 // Register XRPC routes 510 routes.RegisterUserRoutes(r, userService) 511 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 512 log.Println("Community XRPC endpoints registered with OAuth authentication") 513 514 routes.RegisterPostRoutes(r, postService, authMiddleware) 515 log.Println("Post XRPC endpoints registered with OAuth authentication") 516 517 // Vote write endpoints removed - clients write directly to their PDS 518 // The AppView indexes votes from Jetstream (see vote consumer above) 519 520 routes.RegisterCommunityFeedRoutes(r, feedService) 521 log.Println("Feed XRPC endpoints registered (public, no auth required)") 522 523 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 524 log.Println("Timeline XRPC endpoints registered (requires authentication)") 525 526 routes.RegisterDiscoverRoutes(r, discoverService) 527 log.Println("Discover XRPC endpoints registered (public, no auth required)") 528 529 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 530 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 531 532 // Comment query API - supports optional authentication for viewer state 533 // Stricter rate limiting for expensive nested comment queries 534 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 535 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 536 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 537 r.Handle( 538 "/xrpc/social.coves.community.comment.getComments", 539 commentRateLimiter.Middleware( 540 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 541 ), 542 ) 543 log.Println("✅ Comment query API registered (20 req/min rate limit)") 544 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 545 546 // Configure allowed CORS origins for OAuth callback 547 // SECURITY: Never use wildcard "*" with credentials - only allow specific origins 548 var oauthAllowedOrigins []string 549 appviewPublicURL := os.Getenv("APPVIEW_PUBLIC_URL") 550 if appviewPublicURL == "" { 551 appviewPublicURL = "http://localhost:8080" 552 } 553 oauthAllowedOrigins = append(oauthAllowedOrigins, appviewPublicURL) 554 555 // In dev mode, also allow common localhost origins for testing 556 if oauthConfig.DevMode { 557 oauthAllowedOrigins = append(oauthAllowedOrigins, 558 "http://localhost:3000", 559 "http://localhost:3001", 560 "http://localhost:5173", 561 "http://127.0.0.1:8080", 562 "http://127.0.0.1:3000", 563 "http://127.0.0.1:3001", 564 "http://127.0.0.1:5173", 565 ) 566 log.Printf("🧪 DEV MODE: OAuth CORS allows localhost origins for testing") 567 } 568 log.Printf("OAuth CORS allowed origins: %v", oauthAllowedOrigins) 569 570 // Register OAuth routes for authentication flow 571 routes.RegisterOAuthRoutes(r, oauthHandler, oauthAllowedOrigins) 572 log.Println("✅ OAuth endpoints registered") 573 log.Println(" - GET /oauth/client-metadata.json") 574 log.Println(" - GET /oauth/jwks.json") 575 log.Println(" - GET /oauth/login") 576 log.Println(" - GET /oauth/mobile/login") 577 log.Println(" - GET /oauth/callback") 578 log.Println(" - POST /oauth/logout") 579 log.Println(" - POST /oauth/refresh") 580 581 // Register well-known routes for mobile app deep linking 582 routes.RegisterWellKnownRoutes(r) 583 log.Println("✅ Well-known endpoints registered (mobile Universal Links & App Links)") 584 log.Println(" - GET /.well-known/apple-app-site-association (iOS Universal Links)") 585 log.Println(" - GET /.well-known/assetlinks.json (Android App Links)") 586 587 // Health check endpoints 588 healthHandler := func(w http.ResponseWriter, r *http.Request) { 589 w.WriteHeader(http.StatusOK) 590 if _, err := w.Write([]byte("OK")); err != nil { 591 log.Printf("Failed to write health check response: %v", err) 592 } 593 } 594 r.Get("/health", healthHandler) 595 r.Get("/xrpc/_health", healthHandler) 596 597 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 598 port := os.Getenv("PORT") 599 if port == "" { 600 port = os.Getenv("APPVIEW_PORT") 601 } 602 if port == "" { 603 port = "8080" 604 } 605 606 // Create HTTP server for graceful shutdown 607 server := &http.Server{ 608 Addr: ":" + port, 609 Handler: r, 610 } 611 612 // Channel to listen for shutdown signals 613 stop := make(chan os.Signal, 1) 614 signal.Notify(stop, os.Interrupt, syscall.SIGTERM) 615 616 // Start server in goroutine 617 go func() { 618 fmt.Printf("Coves AppView starting on port %s\n", port) 619 fmt.Printf("Default PDS: %s\n", defaultPDS) 620 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { 621 log.Fatalf("Server error: %v", err) 622 } 623 }() 624 625 // Wait for shutdown signal 626 <-stop 627 log.Println("Shutting down server...") 628 629 // Graceful shutdown with timeout 630 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 631 defer cancel() 632 633 // Stop OAuth cleanup background job 634 cleanupCancel() 635 636 if err := server.Shutdown(ctx); err != nil { 637 log.Fatalf("Server shutdown error: %v", err) 638 } 639 log.Println("Server stopped gracefully") 640} 641 642// authenticateWithPDS creates a session on the PDS and returns an access token 643func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 644 type CreateSessionRequest struct { 645 Identifier string `json:"identifier"` 646 Password string `json:"password"` 647 } 648 649 type CreateSessionResponse struct { 650 DID string `json:"did"` 651 Handle string `json:"handle"` 652 AccessJwt string `json:"accessJwt"` 653 } 654 655 reqBody, err := json.Marshal(CreateSessionRequest{ 656 Identifier: handle, 657 Password: password, 658 }) 659 if err != nil { 660 return "", fmt.Errorf("failed to marshal request: %w", err) 661 } 662 663 resp, err := http.Post( 664 pdsURL+"/xrpc/com.atproto.server.createSession", 665 "application/json", 666 bytes.NewReader(reqBody), 667 ) 668 if err != nil { 669 return "", fmt.Errorf("failed to call PDS: %w", err) 670 } 671 defer func() { 672 if closeErr := resp.Body.Close(); closeErr != nil { 673 log.Printf("Failed to close response body: %v", closeErr) 674 } 675 }() 676 677 if resp.StatusCode != http.StatusOK { 678 body, readErr := io.ReadAll(resp.Body) 679 if readErr != nil { 680 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 681 } 682 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 683 } 684 685 var session CreateSessionResponse 686 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 687 return "", fmt.Errorf("failed to decode response: %w", err) 688 } 689 690 return session.AccessJwt, nil 691}