A community-based topic aggregation platform built on atproto.
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/blobs" 11 "Coves/internal/core/comments" 12 "Coves/internal/core/communities" 13 "Coves/internal/core/communityFeeds" 14 "Coves/internal/core/discover" 15 "Coves/internal/core/posts" 16 "Coves/internal/core/timeline" 17 "Coves/internal/core/unfurl" 18 "Coves/internal/core/users" 19 "bytes" 20 "context" 21 "database/sql" 22 "encoding/json" 23 "fmt" 24 "io" 25 "log" 26 "net/http" 27 "os" 28 "os/signal" 29 "strings" 30 "syscall" 31 "time" 32 33 "github.com/go-chi/chi/v5" 34 chiMiddleware "github.com/go-chi/chi/v5/middleware" 35 _ "github.com/lib/pq" 36 "github.com/pressly/goose/v3" 37 38 commentsAPI "Coves/internal/api/handlers/comments" 39 40 postgresRepo "Coves/internal/db/postgres" 41 42 indigoIdentity "github.com/bluesky-social/indigo/atproto/identity" 43) 44 45func main() { 46 // Database configuration (AppView database) 47 dbURL := os.Getenv("DATABASE_URL") 48 if dbURL == "" { 49 // Use dev database from .env.dev 50 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 51 } 52 53 // Default PDS URL for this Coves instance (supports self-hosting) 54 defaultPDS := os.Getenv("PDS_URL") 55 if defaultPDS == "" { 56 defaultPDS = "http://localhost:3001" // Local dev PDS 57 } 58 59 // Cursor secret for HMAC signing (prevents cursor manipulation) 60 cursorSecret := os.Getenv("CURSOR_SECRET") 61 if cursorSecret == "" { 62 // Generate a random secret if not set (dev mode) 63 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 64 cursorSecret = "dev-cursor-secret-change-in-production" 65 log.Println("⚠️ WARNING: Using default cursor secret. 
Set CURSOR_SECRET env var in production!") 66 } 67 68 db, err := sql.Open("postgres", dbURL) 69 if err != nil { 70 log.Fatal("Failed to connect to database:", err) 71 } 72 defer func() { 73 if closeErr := db.Close(); closeErr != nil { 74 log.Printf("Failed to close database connection: %v", closeErr) 75 } 76 }() 77 78 if err = db.Ping(); err != nil { 79 log.Fatal("Failed to ping database:", err) 80 } 81 82 log.Println("Connected to AppView database") 83 84 // Run migrations 85 if err = goose.SetDialect("postgres"); err != nil { 86 log.Fatal("Failed to set goose dialect:", err) 87 } 88 89 if err = goose.Up(db, "internal/db/migrations"); err != nil { 90 log.Fatal("Failed to run migrations:", err) 91 } 92 93 log.Println("Migrations completed successfully") 94 95 r := chi.NewRouter() 96 97 r.Use(chiMiddleware.Logger) 98 r.Use(chiMiddleware.Recoverer) 99 r.Use(chiMiddleware.RequestID) 100 101 // Rate limiting: 100 requests per minute per IP 102 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 103 r.Use(rateLimiter.Middleware) 104 105 // Initialize identity resolver 106 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 107 // directory as DID registration to ensure E2E tests work without hitting 108 // the production plc.directory 109 identityConfig := identity.DefaultConfig() 110 111 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 112 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 113 if plcDirectoryURL == "" { 114 plcDirectoryURL = "https://plc.directory" // Default to production PLC 115 } 116 117 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 118 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 119 if isDevEnv { 120 identityConfig.PLCURL = plcDirectoryURL 121 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 122 } else { 123 // Production: Allow separate IDENTITY_PLC_URL for read operations 124 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); 
identityPLCURL != "" { 125 identityConfig.PLCURL = identityPLCURL 126 } else { 127 identityConfig.PLCURL = plcDirectoryURL 128 } 129 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 130 } 131 132 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 133 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 134 identityConfig.CacheTTL = duration 135 } 136 } 137 138 identityResolver := identity.NewResolver(db, identityConfig) 139 140 // Initialize atProto auth middleware for JWT validation 141 // Phase 1: Set skipVerify=true to test JWT parsing only 142 // Phase 2: Set skipVerify=false to enable full signature verification 143 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 144 if skipVerify { 145 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 146 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 147 } 148 149 // Initialize Indigo directory for DID resolution (used by auth) 150 plcURL := os.Getenv("PLC_DIRECTORY_URL") 151 if plcURL == "" { 152 plcURL = "https://plc.directory" 153 } 154 indigoDir := &indigoIdentity.BaseDirectory{ 155 PLCURL: plcURL, 156 HTTPClient: http.Client{Timeout: 10 * time.Second}, 157 } 158 159 // Initialize JWT config early to cache HS256_ISSUERS and PDS_JWT_SECRET 160 // This avoids reading env vars on every request 161 auth.InitJWTConfig() 162 163 // Create combined key fetcher for both DID and URL issuers 164 // - DID issuers (did:plc:, did:web:) → resolved via DID document keys (ES256) 165 // - URL issuers → JWKS endpoint (fallback for legacy tokens) 166 jwksCacheTTL := 1 * time.Hour 167 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 168 keyFetcher := auth.NewCombinedKeyFetcher(indigoDir, jwksFetcher) 169 170 authMiddleware := middleware.NewAtProtoAuthMiddleware(keyFetcher, skipVerify) 171 log.Println("✅ atProto auth middleware initialized (DID + JWKS key resolution)") 172 173 // Initialize repositories and 
services 174 userRepo := postgresRepo.NewUserRepository(db) 175 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 176 177 communityRepo := postgresRepo.NewCommunityRepository(db) 178 179 // V2.0: PDS-managed DID generation 180 // Community DIDs and keys are generated entirely by the PDS 181 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 182 183 instanceDID := os.Getenv("INSTANCE_DID") 184 if instanceDID == "" { 185 instanceDID = "did:web:coves.social" // Default for development 186 } 187 188 // V2: Extract instance domain for community handles 189 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 190 // We cannot allow arbitrary domains to prevent impersonation attacks 191 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 192 // 193 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 194 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 195 // Communities with mismatched hostedBy domains are rejected during indexing 196 var instanceDomain string 197 if strings.HasPrefix(instanceDID, "did:web:") { 198 // Extract domain from did:web (this is the authoritative source) 199 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 200 } else { 201 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 202 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 203 if instanceDomain == "" { 204 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 205 } 206 } 207 208 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 209 210 // Community creation restriction - if set, only these DIDs can create communities 211 var allowedCommunityCreators []string 212 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 213 for _, did := range strings.Split(communityCreators, ",") { 214 did = strings.TrimSpace(did) 215 if did != "" { 216 
allowedCommunityCreators = append(allowedCommunityCreators, did) 217 } 218 } 219 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 220 } else { 221 log.Println("Community creation open to all authenticated users") 222 } 223 224 // V2.0: Initialize PDS account provisioner for communities (simplified) 225 // PDS handles all DID and key generation - no Coves-side cryptography needed 226 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 227 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 228 log.Printf(" - Communities will be created at: %s", defaultPDS) 229 log.Printf(" - PDS will generate and manage all DIDs and keys") 230 231 // Initialize community service (no longer needs didGenerator directly) 232 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 233 234 // Authenticate Coves instance with PDS to enable community record writes 235 // The instance needs a PDS account to write community records it owns 236 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 237 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 238 if pdsHandle != "" && pdsPassword != "" { 239 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 240 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 241 if authErr != nil { 242 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 243 log.Println("Community creation will fail until PDS authentication is configured") 244 } else { 245 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 246 svc.SetPDSAccessToken(accessToken) 247 log.Println("✓ Coves instance authenticated with PDS") 248 } 249 } 250 } else { 251 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 252 log.Println("Community creation via write-forward is disabled") 253 } 254 255 // Start Jetstream consumer for read-forward user 
indexing 256 jetstreamURL := os.Getenv("JETSTREAM_URL") 257 if jetstreamURL == "" { 258 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 259 } 260 261 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 262 263 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 264 ctx := context.Background() 265 go func() { 266 if startErr := userConsumer.Start(ctx); startErr != nil { 267 log.Printf("Jetstream consumer stopped: %v", startErr) 268 } 269 }() 270 271 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 272 273 // Start Jetstream consumer for community events (profiles and subscriptions) 274 // This consumer indexes: 275 // 1. Community profiles (social.coves.community.profile) - in community's own repo 276 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 277 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 278 if communityJetstreamURL == "" { 279 // Local Jetstream for communities - filter to our instance's collections 280 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 281 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 282 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 283 } 284 285 // Initialize community event consumer with did:web verification 286 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 287 if skipDIDWebVerification { 288 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 289 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 290 } 291 292 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 293 communityEventConsumer := 
jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 294 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 295 296 go func() { 297 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 298 log.Printf("Community Jetstream consumer stopped: %v", startErr) 299 } 300 }() 301 302 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 303 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 304 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 305 306 // Start JWKS cache cleanup background job 307 go func() { 308 ticker := time.NewTicker(1 * time.Hour) 309 defer ticker.Stop() 310 for range ticker.C { 311 jwksFetcher.CleanupExpiredCache() 312 log.Println("JWKS cache cleanup completed") 313 } 314 }() 315 316 log.Println("Started JWKS cache cleanup background job (runs hourly)") 317 318 // Initialize aggregator service 319 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 320 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 321 log.Println("✅ Aggregator service initialized") 322 323 // Initialize unfurl cache repository 324 unfurlRepo := unfurl.NewRepository(db) 325 326 // Initialize blob upload service 327 blobService := blobs.NewBlobService(defaultPDS) 328 329 // Initialize unfurl service with configuration 330 unfurlService := unfurl.NewService( 331 unfurlRepo, 332 unfurl.WithTimeout(10*time.Second), 333 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 334 unfurl.WithCacheTTL(24*time.Hour), 335 ) 336 log.Println("✅ Unfurl and blob services initialized") 337 338 // Initialize post service (with aggregator support) 339 postRepo := postgresRepo.NewPostRepository(db) 340 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 
341 342 // Initialize vote repository (used by Jetstream consumer for indexing) 343 voteRepo := postgresRepo.NewVoteRepository(db) 344 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 345 346 // Initialize comment repository (used by Jetstream consumer for indexing) 347 commentRepo := postgresRepo.NewCommentRepository(db) 348 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 349 350 // Initialize comment service (for query API) 351 // Requires user and community repos for proper author/community hydration per lexicon 352 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 353 log.Println("✅ Comment service initialized (with author/community hydration)") 354 355 // Initialize feed service 356 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 357 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 358 log.Println("✅ Feed service initialized") 359 360 // Initialize timeline service (home feed from subscribed communities) 361 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 362 timelineService := timeline.NewTimelineService(timelineRepo) 363 log.Println("✅ Timeline service initialized") 364 365 // Initialize discover service (public feed from all communities) 366 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 367 discoverService := discover.NewDiscoverService(discoverRepo) 368 log.Println("✅ Discover service initialized") 369 370 // Start Jetstream consumer for posts 371 // This consumer indexes posts created in community repositories via the firehose 372 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 373 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 374 if postJetstreamURL == "" { 375 // Listen to post record creation events 376 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 377 } 378 379 
postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 380 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 381 382 go func() { 383 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 384 log.Printf("Post Jetstream consumer stopped: %v", startErr) 385 } 386 }() 387 388 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 389 log.Println(" - Indexing: social.coves.community.post CREATE operations") 390 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 391 392 // Start Jetstream consumer for aggregators 393 // This consumer indexes aggregator service declarations and authorization records 394 // Following Bluesky's pattern for feed generators and labelers 395 // NOTE: Uses the same Jetstream as communities, just filtering different collections 396 aggregatorJetstreamURL := communityJetstreamURL 397 // Override if specific URL needed for testing 398 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 399 aggregatorJetstreamURL = envURL 400 } else if aggregatorJetstreamURL == "" { 401 // Fallback if community URL also not set 402 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 403 } 404 405 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 406 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 407 408 go func() { 409 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 410 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 411 } 412 }() 413 414 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 415 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 416 log.Println(" - Indexing: 
social.coves.aggregator.authorization (authorization records)") 417 418 // Start Jetstream consumer for votes 419 // This consumer indexes votes from user repositories and updates post vote counts 420 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 421 if voteJetstreamURL == "" { 422 // Listen to vote record CREATE/DELETE events from user repositories 423 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 424 } 425 426 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 427 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 428 429 go func() { 430 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 431 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 432 } 433 }() 434 435 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 436 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 437 log.Println(" - Updating: Post vote counts atomically") 438 439 // Start Jetstream consumer for comments 440 // This consumer indexes comments from user repositories and updates parent counts 441 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 442 if commentJetstreamURL == "" { 443 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 444 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 445 } 446 447 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 448 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 449 450 go func() { 451 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 452 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 453 } 454 }() 455 456 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 457 log.Println(" - Indexing: social.coves.community.comment 
CREATE/UPDATE/DELETE operations") 458 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 459 460 // Register XRPC routes 461 routes.RegisterUserRoutes(r, userService) 462 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 463 log.Println("Community XRPC endpoints registered with OAuth authentication") 464 465 routes.RegisterPostRoutes(r, postService, authMiddleware) 466 log.Println("Post XRPC endpoints registered with OAuth authentication") 467 468 // Vote write endpoints removed - clients write directly to their PDS 469 // The AppView indexes votes from Jetstream (see vote consumer above) 470 471 routes.RegisterCommunityFeedRoutes(r, feedService) 472 log.Println("Feed XRPC endpoints registered (public, no auth required)") 473 474 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 475 log.Println("Timeline XRPC endpoints registered (requires authentication)") 476 477 routes.RegisterDiscoverRoutes(r, discoverService) 478 log.Println("Discover XRPC endpoints registered (public, no auth required)") 479 480 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 481 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 482 483 // Comment query API - supports optional authentication for viewer state 484 // Stricter rate limiting for expensive nested comment queries 485 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 486 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 487 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 488 r.Handle( 489 "/xrpc/social.coves.community.comment.getComments", 490 commentRateLimiter.Middleware( 491 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 492 ), 493 ) 494 log.Println("✅ Comment query API registered (20 req/min rate limit)") 495 log.Println(" - GET 
/xrpc/social.coves.community.comment.getComments") 496 497 // Health check endpoints 498 healthHandler := func(w http.ResponseWriter, r *http.Request) { 499 w.WriteHeader(http.StatusOK) 500 if _, err := w.Write([]byte("OK")); err != nil { 501 log.Printf("Failed to write health check response: %v", err) 502 } 503 } 504 r.Get("/health", healthHandler) 505 r.Get("/xrpc/_health", healthHandler) 506 507 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 508 port := os.Getenv("PORT") 509 if port == "" { 510 port = os.Getenv("APPVIEW_PORT") 511 } 512 if port == "" { 513 port = "8080" 514 } 515 516 // Create HTTP server for graceful shutdown 517 server := &http.Server{ 518 Addr: ":" + port, 519 Handler: r, 520 } 521 522 // Channel to listen for shutdown signals 523 stop := make(chan os.Signal, 1) 524 signal.Notify(stop, os.Interrupt, syscall.SIGTERM) 525 526 // Start server in goroutine 527 go func() { 528 fmt.Printf("Coves AppView starting on port %s\n", port) 529 fmt.Printf("Default PDS: %s\n", defaultPDS) 530 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { 531 log.Fatalf("Server error: %v", err) 532 } 533 }() 534 535 // Wait for shutdown signal 536 <-stop 537 log.Println("Shutting down server...") 538 539 // Graceful shutdown with timeout 540 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 541 defer cancel() 542 543 // Stop auth middleware background goroutines (DPoP replay cache cleanup) 544 authMiddleware.Stop() 545 log.Println("Auth middleware stopped") 546 547 if err := server.Shutdown(ctx); err != nil { 548 log.Fatalf("Server shutdown error: %v", err) 549 } 550 log.Println("Server stopped gracefully") 551} 552 553// authenticateWithPDS creates a session on the PDS and returns an access token 554func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 555 type CreateSessionRequest struct { 556 Identifier string `json:"identifier"` 557 Password string `json:"password"` 558 } 559 
560 type CreateSessionResponse struct { 561 DID string `json:"did"` 562 Handle string `json:"handle"` 563 AccessJwt string `json:"accessJwt"` 564 } 565 566 reqBody, err := json.Marshal(CreateSessionRequest{ 567 Identifier: handle, 568 Password: password, 569 }) 570 if err != nil { 571 return "", fmt.Errorf("failed to marshal request: %w", err) 572 } 573 574 resp, err := http.Post( 575 pdsURL+"/xrpc/com.atproto.server.createSession", 576 "application/json", 577 bytes.NewReader(reqBody), 578 ) 579 if err != nil { 580 return "", fmt.Errorf("failed to call PDS: %w", err) 581 } 582 defer func() { 583 if closeErr := resp.Body.Close(); closeErr != nil { 584 log.Printf("Failed to close response body: %v", closeErr) 585 } 586 }() 587 588 if resp.StatusCode != http.StatusOK { 589 body, readErr := io.ReadAll(resp.Body) 590 if readErr != nil { 591 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 592 } 593 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 594 } 595 596 var session CreateSessionResponse 597 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 598 return "", fmt.Errorf("failed to decode response: %w", err) 599 } 600 601 return session.AccessJwt, nil 602}