A community based topic aggregation platform built on atproto
at main 29 kB view raw
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/identity" 7 "Coves/internal/atproto/jetstream" 8 "Coves/internal/atproto/oauth" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/blobs" 11 "Coves/internal/core/comments" 12 "Coves/internal/core/communities" 13 "Coves/internal/core/communityFeeds" 14 "Coves/internal/core/discover" 15 "Coves/internal/core/posts" 16 "Coves/internal/core/timeline" 17 "Coves/internal/core/unfurl" 18 "Coves/internal/core/users" 19 "Coves/internal/core/votes" 20 "bytes" 21 "context" 22 "crypto/rand" 23 "database/sql" 24 "encoding/base64" 25 "encoding/json" 26 "fmt" 27 "io" 28 "log" 29 "net/http" 30 "os" 31 "os/signal" 32 "strings" 33 "syscall" 34 "time" 35 36 "github.com/go-chi/chi/v5" 37 chiMiddleware "github.com/go-chi/chi/v5/middleware" 38 _ "github.com/lib/pq" 39 "github.com/pressly/goose/v3" 40 41 commentsAPI "Coves/internal/api/handlers/comments" 42 43 postgresRepo "Coves/internal/db/postgres" 44) 45 46func main() { 47 // Database configuration (AppView database) 48 dbURL := os.Getenv("DATABASE_URL") 49 if dbURL == "" { 50 // Use dev database from .env.dev 51 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 52 } 53 54 // Default PDS URL for this Coves instance (supports self-hosting) 55 defaultPDS := os.Getenv("PDS_URL") 56 if defaultPDS == "" { 57 defaultPDS = "http://localhost:3001" // Local dev PDS 58 } 59 60 // Cursor secret for HMAC signing (prevents cursor manipulation) 61 cursorSecret := os.Getenv("CURSOR_SECRET") 62 if cursorSecret == "" { 63 // Generate a random secret if not set (dev mode) 64 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 65 cursorSecret = "dev-cursor-secret-change-in-production" 66 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!") 67 } 68 69 db, err := sql.Open("postgres", dbURL) 70 if err != nil { 71 log.Fatal("Failed to connect to database:", err) 72 } 73 defer func() { 74 if closeErr := db.Close(); closeErr != nil { 75 log.Printf("Failed to close database connection: %v", closeErr) 76 } 77 }() 78 79 if err = db.Ping(); err != nil { 80 log.Fatal("Failed to ping database:", err) 81 } 82 83 log.Println("Connected to AppView database") 84 85 // Run migrations 86 if err = goose.SetDialect("postgres"); err != nil { 87 log.Fatal("Failed to set goose dialect:", err) 88 } 89 90 if err = goose.Up(db, "internal/db/migrations"); err != nil { 91 log.Fatal("Failed to run migrations:", err) 92 } 93 94 log.Println("Migrations completed successfully") 95 96 r := chi.NewRouter() 97 98 r.Use(chiMiddleware.Logger) 99 r.Use(chiMiddleware.Recoverer) 100 r.Use(chiMiddleware.RequestID) 101 102 // Rate limiting: 100 requests per minute per IP 103 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 104 r.Use(rateLimiter.Middleware) 105 106 // Initialize identity resolver 107 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 108 // directory as DID registration to ensure E2E tests work without hitting 109 // the production plc.directory 110 identityConfig := identity.DefaultConfig() 111 112 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 113 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 114 if plcDirectoryURL == "" { 115 plcDirectoryURL = "https://plc.directory" // Default to production PLC 116 } 117 118 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 119 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 120 if isDevEnv { 121 identityConfig.PLCURL = plcDirectoryURL 122 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 123 } else { 124 // Production: Allow separate IDENTITY_PLC_URL for read operations 125 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 126 identityConfig.PLCURL = identityPLCURL 127 } else { 128 identityConfig.PLCURL = plcDirectoryURL 129 } 130 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 131 } 132 133 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 134 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 135 identityConfig.CacheTTL = duration 136 } 137 } 138 139 identityResolver := identity.NewResolver(db, identityConfig) 140 141 // Get PLC URL for OAuth and other services 142 plcURL := os.Getenv("PLC_DIRECTORY_URL") 143 if plcURL == "" { 144 plcURL = "https://plc.directory" 145 } 146 log.Printf("🔐 OAuth will use PLC directory: %s", plcURL) 147 148 // Initialize OAuth client for sealed session tokens 149 // Mobile apps authenticate via OAuth flow and receive sealed session tokens 150 // These tokens are encrypted references to OAuth sessions stored in the database 151 oauthSealSecret := os.Getenv("OAUTH_SEAL_SECRET") 152 if oauthSealSecret == "" { 153 if os.Getenv("IS_DEV_ENV") != "true" { 154 log.Fatal("OAUTH_SEAL_SECRET is required in production mode") 155 } 156 // Generate RANDOM secret for dev mode 157 randomBytes := make([]byte, 32) 158 if _, err := rand.Read(randomBytes); err != nil { 159 log.Fatal("Failed to generate random seal secret: ", err) 160 } 161 oauthSealSecret = base64.StdEncoding.EncodeToString(randomBytes) 162 log.Println("⚠️ DEV MODE: Generated random OAuth seal secret (won't persist across restarts)") 163 } 164 165 isDevMode := os.Getenv("IS_DEV_ENV") == "true" 166 pdsURL := os.Getenv("PDS_URL") // For dev mode: resolve handles via local PDS 167 oauthConfig := &oauth.OAuthConfig{ 168 PublicURL: os.Getenv("APPVIEW_PUBLIC_URL"), 169 SealSecret: oauthSealSecret, 170 Scopes: []string{"atproto", "transition:generic"}, 171 DevMode: isDevMode, 172 AllowPrivateIPs: isDevMode, // Allow private IPs only in dev mode 173 PLCURL: plcURL, 174 PDSURL: pdsURL, // For dev mode handle resolution 175 // SessionTTL and SealedTokenTTL will use defaults if not set (7 days and 14 days) 176 } 177 178 // Create PostgreSQL-backed OAuth session store (using default 7-day TTL) 179 baseOAuthStore := oauth.NewPostgresOAuthStore(db, 0) 180 // Wrap with MobileAwareStoreWrapper to capture OAuth state for mobile CSRF validation. 181 // This intercepts SaveAuthRequestInfo to save mobile CSRF data when present in context. 182 oauthStore := oauth.NewMobileAwareStoreWrapper(baseOAuthStore) 183 184 if oauthConfig.PublicURL == "" { 185 oauthConfig.PublicURL = "http://localhost:8080" 186 oauthConfig.DevMode = true // Force dev mode for localhost 187 } 188 189 oauthClient, err := oauth.NewOAuthClient(oauthConfig, oauthStore) 190 if err != nil { 191 log.Fatalf("Failed to initialize OAuth client: %v", err) 192 } 193 194 // Create OAuth handler for HTTP endpoints 195 oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore) 196 197 // Create OAuth auth middleware 198 // Validates sealed session tokens and loads OAuth sessions from database 199 authMiddleware := middleware.NewOAuthAuthMiddleware(oauthClient, oauthStore) 200 log.Println("✅ OAuth auth middleware initialized (sealed session tokens)") 201 202 // Initialize repositories and services 203 userRepo := postgresRepo.NewUserRepository(db) 204 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 205 206 communityRepo := postgresRepo.NewCommunityRepository(db) 207 208 // V2.0: PDS-managed DID generation 209 // Community DIDs and keys are generated entirely by the PDS 210 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 211 212 instanceDID := os.Getenv("INSTANCE_DID") 213 if instanceDID == "" { 214 instanceDID = "did:web:coves.social" // Default for development 215 } 216 217 // V2: Extract instance domain for community handles 218 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 219 // We cannot allow arbitrary domains to prevent impersonation attacks 220 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 221 // 222 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 223 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 224 // Communities with mismatched hostedBy domains are rejected during indexing 225 var instanceDomain string 226 if strings.HasPrefix(instanceDID, "did:web:") { 227 // Extract domain from did:web (this is the authoritative source) 228 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 229 } else { 230 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 231 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 232 if instanceDomain == "" { 233 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 234 } 235 } 236 237 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 238 239 // Community creation restriction - if set, only these DIDs can create communities 240 var allowedCommunityCreators []string 241 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 242 for _, did := range strings.Split(communityCreators, ",") { 243 did = strings.TrimSpace(did) 244 if did != "" { 245 allowedCommunityCreators = append(allowedCommunityCreators, did) 246 } 247 } 248 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 249 } else { 250 log.Println("Community creation open to all authenticated users") 251 } 252 253 // V2.0: Initialize PDS account provisioner for communities (simplified) 254 // PDS handles all DID and key generation - no Coves-side cryptography needed 255 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 256 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 257 log.Printf(" - Communities will be created at: %s", defaultPDS) 258 log.Printf(" - PDS will generate and manage all DIDs and keys") 259 260 // Initialize community service (no longer needs didGenerator directly) 261 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 262 263 // Authenticate Coves instance with PDS to enable community record writes 264 // The instance needs a PDS account to write community records it owns 265 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 266 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 267 if pdsHandle != "" && pdsPassword != "" { 268 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 269 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 270 if authErr != nil { 271 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 272 log.Println("Community creation will fail until PDS authentication is configured") 273 } else { 274 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 275 svc.SetPDSAccessToken(accessToken) 276 log.Println("✓ Coves instance authenticated with PDS") 277 } 278 } 279 } else { 280 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 281 log.Println("Community creation via write-forward is disabled") 282 } 283 284 // Start Jetstream consumer for read-forward user indexing 285 jetstreamURL := os.Getenv("JETSTREAM_URL") 286 if jetstreamURL == "" { 287 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 288 } 289 290 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 291 292 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 293 ctx := context.Background() 294 go func() { 295 if startErr := userConsumer.Start(ctx); startErr != nil { 296 log.Printf("Jetstream consumer stopped: %v", startErr) 297 } 298 }() 299 300 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 301 302 // Start Jetstream consumer for community events (profiles and subscriptions) 303 // This consumer indexes: 304 // 1. Community profiles (social.coves.community.profile) - in community's own repo 305 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 306 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 307 if communityJetstreamURL == "" { 308 // Local Jetstream for communities - filter to our instance's collections 309 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 310 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 311 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 312 } 313 314 // Initialize community event consumer with did:web verification 315 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 316 if skipDIDWebVerification { 317 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 318 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 319 } 320 321 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 322 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 323 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 324 325 go func() { 326 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 327 log.Printf("Community Jetstream consumer stopped: %v", startErr) 328 } 329 }() 330 331 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 332 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 333 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 334 335 // Start OAuth session cleanup background job with cancellable context 336 cleanupCtx, cleanupCancel := context.WithCancel(context.Background()) 337 go func() { 338 ticker := time.NewTicker(1 * time.Hour) 339 defer ticker.Stop() 340 for { 341 select { 342 case <-cleanupCtx.Done(): 343 log.Println("OAuth cleanup job stopped") 344 return 345 case <-ticker.C: 346 // Check if store implements cleanup methods 347 // Use UnwrapPostgresStore to get the underlying store from the wrapper 348 if cleanupStore := oauthStore.UnwrapPostgresStore(); cleanupStore != nil { 349 sessions, sessErr := cleanupStore.CleanupExpiredSessions(cleanupCtx) 350 if sessErr != nil { 351 log.Printf("Error cleaning up expired OAuth sessions: %v", sessErr) 352 } 353 requests, reqErr := cleanupStore.CleanupExpiredAuthRequests(cleanupCtx) 354 if reqErr != nil { 355 log.Printf("Error cleaning up expired OAuth auth requests: %v", reqErr) 356 } 357 if sessions > 0 || requests > 0 { 358 log.Printf("OAuth cleanup: removed %d expired sessions, %d expired auth requests", sessions, requests) 359 } 360 } 361 } 362 } 363 }() 364 365 log.Println("Started OAuth session cleanup background job (runs hourly)") 366 367 // Initialize aggregator service 368 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 369 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 370 log.Println("✅ Aggregator service initialized") 371 372 // Initialize unfurl cache repository 373 unfurlRepo := unfurl.NewRepository(db) 374 375 // Initialize blob upload service 376 blobService := blobs.NewBlobService(defaultPDS) 377 378 // Initialize unfurl service with configuration 379 unfurlService := unfurl.NewService( 380 unfurlRepo, 381 unfurl.WithTimeout(10*time.Second), 382 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 383 unfurl.WithCacheTTL(24*time.Hour), 384 ) 385 log.Println("✅ Unfurl and blob services initialized") 386 387 // Initialize post service (with aggregator support) 388 postRepo := postgresRepo.NewPostRepository(db) 389 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 390 391 // Initialize vote repository (used by Jetstream consumer for indexing) 392 voteRepo := postgresRepo.NewVoteRepository(db) 393 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 394 395 // Initialize comment repository (used by Jetstream consumer for indexing) 396 commentRepo := postgresRepo.NewCommentRepository(db) 397 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 398 399 // Initialize vote cache (stores user votes from PDS to avoid eventual consistency issues) 400 // TTL of 10 minutes - cache is also updated on vote create/delete 401 voteCache := votes.NewVoteCache(10*time.Minute, nil) 402 log.Println("✅ Vote cache initialized (10 minute TTL)") 403 404 // Initialize vote service (for XRPC API endpoints) 405 // Note: We don't validate subject existence - the vote goes to the user's PDS regardless. 406 // The Jetstream consumer handles orphaned votes correctly by only updating counts for 407 // non-deleted subjects. This avoids race conditions and eventual consistency issues. 408 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, voteCache, nil) 409 log.Println("✅ Vote service initialized (with OAuth authentication and vote cache)") 410 411 // Initialize comment service (for query and write APIs) 412 // Requires user and community repos for proper author/community hydration per lexicon 413 // OAuth client and store are needed for write operations (create, update, delete) 414 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo, oauthClient, oauthStore, nil) 415 log.Println("✅ Comment service initialized (with author/community hydration and write support)") 416 417 // Initialize feed service 418 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 419 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 420 log.Println("✅ Feed service initialized") 421 422 // Initialize timeline service (home feed from subscribed communities) 423 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 424 timelineService := timeline.NewTimelineService(timelineRepo) 425 log.Println("✅ Timeline service initialized") 426 427 // Initialize discover service (public feed from all communities) 428 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 429 discoverService := discover.NewDiscoverService(discoverRepo) 430 log.Println("✅ Discover service initialized") 431 432 // Start Jetstream consumer for posts 433 // This consumer indexes posts created in community repositories via the firehose 434 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 435 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 436 if postJetstreamURL == "" { 437 // Listen to post record creation events 438 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 439 } 440 441 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 442 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 443 444 go func() { 445 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 446 log.Printf("Post Jetstream consumer stopped: %v", startErr) 447 } 448 }() 449 450 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 451 log.Println(" - Indexing: social.coves.community.post CREATE operations") 452 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 453 454 // Start Jetstream consumer for aggregators 455 // This consumer indexes aggregator service declarations and authorization records 456 // Following Bluesky's pattern for feed generators and labelers 457 // NOTE: Uses the same Jetstream as communities, just filtering different collections 458 aggregatorJetstreamURL := communityJetstreamURL 459 // Override if specific URL needed for testing 460 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 461 aggregatorJetstreamURL = envURL 462 } else if aggregatorJetstreamURL == "" { 463 // Fallback if community URL also not set 464 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 465 } 466 467 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 468 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 469 470 go func() { 471 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 472 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 473 } 474 }() 475 476 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 477 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 478 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 479 480 // Start Jetstream consumer for votes 481 // This consumer indexes votes from user repositories and updates post vote counts 482 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 483 if voteJetstreamURL == "" { 484 // Listen to vote record CREATE/DELETE events from user repositories 485 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 486 } 487 488 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 489 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 490 491 go func() { 492 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 493 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 494 } 495 }() 496 497 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 498 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 499 log.Println(" - Updating: Post vote counts atomically") 500 501 // Start Jetstream consumer for comments 502 // This consumer indexes comments from user repositories and updates parent counts 503 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 504 if commentJetstreamURL == "" { 505 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 506 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 507 } 508 509 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 510 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 511 512 go func() { 513 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 514 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 515 } 516 }() 517 518 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 519 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations") 520 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 521 522 // Register XRPC routes 523 routes.RegisterUserRoutes(r, userService) 524 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 525 log.Println("Community XRPC endpoints registered with OAuth authentication") 526 527 routes.RegisterPostRoutes(r, postService, authMiddleware) 528 log.Println("Post XRPC endpoints registered with OAuth authentication") 529 530 routes.RegisterVoteRoutes(r, voteService, authMiddleware) 531 log.Println("Vote XRPC endpoints registered with OAuth authentication") 532 533 // Register comment write routes (create, update, delete) 534 routes.RegisterCommentRoutes(r, commentService, authMiddleware) 535 log.Println("Comment write XRPC endpoints registered") 536 log.Println(" - POST /xrpc/social.coves.community.comment.create") 537 log.Println(" - POST /xrpc/social.coves.community.comment.update") 538 log.Println(" - POST /xrpc/social.coves.community.comment.delete") 539 540 routes.RegisterCommunityFeedRoutes(r, feedService, voteService, authMiddleware) 541 log.Println("Feed XRPC endpoints registered (public with optional auth for viewer vote state)") 542 543 routes.RegisterTimelineRoutes(r, timelineService, voteService, authMiddleware) 544 log.Println("Timeline XRPC endpoints registered (requires authentication, includes viewer vote state)") 545 546 routes.RegisterDiscoverRoutes(r, discoverService, voteService, authMiddleware) 547 log.Println("Discover XRPC endpoints registered (public with optional auth for viewer vote state)") 548 549 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 550 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 551 552 // Comment query API - supports optional authentication for viewer state 553 // Stricter rate limiting for expensive nested comment queries 554 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 555 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 556 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 557 r.Handle( 558 "/xrpc/social.coves.community.comment.getComments", 559 commentRateLimiter.Middleware( 560 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 561 ), 562 ) 563 log.Println("✅ Comment query API registered (20 req/min rate limit)") 564 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 565 566 // Configure allowed CORS origins for OAuth callback 567 // SECURITY: Never use wildcard "*" with credentials - only allow specific origins 568 var oauthAllowedOrigins []string 569 appviewPublicURL := os.Getenv("APPVIEW_PUBLIC_URL") 570 if appviewPublicURL == "" { 571 appviewPublicURL = "http://localhost:8080" 572 } 573 oauthAllowedOrigins = append(oauthAllowedOrigins, appviewPublicURL) 574 575 // In dev mode, also allow common localhost origins for testing 576 if oauthConfig.DevMode { 577 oauthAllowedOrigins = append(oauthAllowedOrigins, 578 "http://localhost:3000", 579 "http://localhost:3001", 580 "http://localhost:5173", 581 "http://127.0.0.1:8080", 582 "http://127.0.0.1:3000", 583 "http://127.0.0.1:3001", 584 "http://127.0.0.1:5173", 585 ) 586 log.Printf("🧪 DEV MODE: OAuth CORS allows localhost origins for testing") 587 } 588 log.Printf("OAuth CORS allowed origins: %v", oauthAllowedOrigins) 589 590 // Register OAuth routes for authentication flow 591 routes.RegisterOAuthRoutes(r, oauthHandler, oauthAllowedOrigins) 592 log.Println("✅ OAuth endpoints registered") 593 log.Println(" - GET /oauth/client-metadata.json") 594 log.Println(" - GET /oauth/jwks.json") 595 log.Println(" - GET /oauth/login") 596 log.Println(" - GET /oauth/mobile/login") 597 log.Println(" - GET /oauth/callback") 598 log.Println(" - POST /oauth/logout") 599 log.Println(" - POST /oauth/refresh") 600 601 // Register well-known routes for mobile app deep linking 602 routes.RegisterWellKnownRoutes(r) 603 log.Println("✅ Well-known endpoints registered (mobile Universal Links & App Links)") 604 log.Println(" - GET /.well-known/apple-app-site-association (iOS Universal Links)") 605 log.Println(" - GET /.well-known/assetlinks.json (Android App Links)") 606 607 // Health check endpoints 608 healthHandler := func(w http.ResponseWriter, r *http.Request) { 609 w.WriteHeader(http.StatusOK) 610 if _, err := w.Write([]byte("OK")); err != nil { 611 log.Printf("Failed to write health check response: %v", err) 612 } 613 } 614 r.Get("/health", healthHandler) 615 r.Get("/xrpc/_health", healthHandler) 616 617 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 618 port := os.Getenv("PORT") 619 if port == "" { 620 port = os.Getenv("APPVIEW_PORT") 621 } 622 if port == "" { 623 port = "8080" 624 } 625 626 // Create HTTP server for graceful shutdown 627 server := &http.Server{ 628 Addr: ":" + port, 629 Handler: r, 630 } 631 632 // Channel to listen for shutdown signals 633 stop := make(chan os.Signal, 1) 634 signal.Notify(stop, os.Interrupt, syscall.SIGTERM) 635 636 // Start server in goroutine 637 go func() { 638 fmt.Printf("Coves AppView starting on port %s\n", port) 639 fmt.Printf("Default PDS: %s\n", defaultPDS) 640 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { 641 log.Fatalf("Server error: %v", err) 642 } 643 }() 644 645 // Wait for shutdown signal 646 <-stop 647 log.Println("Shutting down server...") 648 649 // Graceful shutdown with timeout 650 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 651 defer cancel() 652 653 // Stop OAuth cleanup background job 654 cleanupCancel() 655 656 if err := server.Shutdown(ctx); err != nil { 657 log.Fatalf("Server shutdown error: %v", err) 658 } 659 log.Println("Server stopped gracefully") 660} 661 662// authenticateWithPDS creates a session on the PDS and returns an access token 663func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 664 type CreateSessionRequest struct { 665 Identifier string `json:"identifier"` 666 Password string `json:"password"` 667 } 668 669 type CreateSessionResponse struct { 670 DID string `json:"did"` 671 Handle string `json:"handle"` 672 AccessJwt string `json:"accessJwt"` 673 } 674 675 reqBody, err := json.Marshal(CreateSessionRequest{ 676 Identifier: handle, 677 Password: password, 678 }) 679 if err != nil { 680 return "", fmt.Errorf("failed to marshal request: %w", err) 681 } 682 683 resp, err := http.Post( 684 pdsURL+"/xrpc/com.atproto.server.createSession", 685 "application/json", 686 bytes.NewReader(reqBody), 687 ) 688 if err != nil { 689 return "", fmt.Errorf("failed to call PDS: %w", err) 690 } 691 defer func() { 692 if closeErr := resp.Body.Close(); closeErr != nil { 693 log.Printf("Failed to close response body: %v", closeErr) 694 } 695 }() 696 697 if resp.StatusCode != http.StatusOK { 698 body, readErr := io.ReadAll(resp.Body) 699 if readErr != nil { 700 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 701 } 702 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 703 } 704 705 var session CreateSessionResponse 706 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 707 return "", fmt.Errorf("failed to decode response: %w", err) 708 } 709 710 return session.AccessJwt, nil 711}