A community based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/blobs"
11 "Coves/internal/core/comments"
12 "Coves/internal/core/communities"
13 "Coves/internal/core/communityFeeds"
14 "Coves/internal/core/discover"
15 "Coves/internal/core/posts"
16 "Coves/internal/core/timeline"
17 "Coves/internal/core/unfurl"
18 "Coves/internal/core/users"
19 "bytes"
20 "context"
21 "database/sql"
22 "encoding/json"
23 "fmt"
24 "io"
25 "log"
26 "net/http"
27 "os"
28 "os/signal"
29 "strings"
30 "syscall"
31 "time"
32
33 "github.com/go-chi/chi/v5"
34 chiMiddleware "github.com/go-chi/chi/v5/middleware"
35 _ "github.com/lib/pq"
36 "github.com/pressly/goose/v3"
37
38 commentsAPI "Coves/internal/api/handlers/comments"
39
40 postgresRepo "Coves/internal/db/postgres"
41
42 indigoIdentity "github.com/bluesky-social/indigo/atproto/identity"
43)
44
45func main() {
46 // Database configuration (AppView database)
47 dbURL := os.Getenv("DATABASE_URL")
48 if dbURL == "" {
49 // Use dev database from .env.dev
50 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
51 }
52
53 // Default PDS URL for this Coves instance (supports self-hosting)
54 defaultPDS := os.Getenv("PDS_URL")
55 if defaultPDS == "" {
56 defaultPDS = "http://localhost:3001" // Local dev PDS
57 }
58
59 // Cursor secret for HMAC signing (prevents cursor manipulation)
60 cursorSecret := os.Getenv("CURSOR_SECRET")
61 if cursorSecret == "" {
62 // Generate a random secret if not set (dev mode)
63 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
64 cursorSecret = "dev-cursor-secret-change-in-production"
65 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
66 }
67
68 db, err := sql.Open("postgres", dbURL)
69 if err != nil {
70 log.Fatal("Failed to connect to database:", err)
71 }
72 defer func() {
73 if closeErr := db.Close(); closeErr != nil {
74 log.Printf("Failed to close database connection: %v", closeErr)
75 }
76 }()
77
78 if err = db.Ping(); err != nil {
79 log.Fatal("Failed to ping database:", err)
80 }
81
82 log.Println("Connected to AppView database")
83
84 // Run migrations
85 if err = goose.SetDialect("postgres"); err != nil {
86 log.Fatal("Failed to set goose dialect:", err)
87 }
88
89 if err = goose.Up(db, "internal/db/migrations"); err != nil {
90 log.Fatal("Failed to run migrations:", err)
91 }
92
93 log.Println("Migrations completed successfully")
94
95 r := chi.NewRouter()
96
97 r.Use(chiMiddleware.Logger)
98 r.Use(chiMiddleware.Recoverer)
99 r.Use(chiMiddleware.RequestID)
100
101 // Rate limiting: 100 requests per minute per IP
102 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
103 r.Use(rateLimiter.Middleware)
104
105 // Initialize identity resolver
106 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
107 // directory as DID registration to ensure E2E tests work without hitting
108 // the production plc.directory
109 identityConfig := identity.DefaultConfig()
110
111 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
112 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
113 if plcDirectoryURL == "" {
114 plcDirectoryURL = "https://plc.directory" // Default to production PLC
115 }
116
117 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
118 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
119 if isDevEnv {
120 identityConfig.PLCURL = plcDirectoryURL
121 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
122 } else {
123 // Production: Allow separate IDENTITY_PLC_URL for read operations
124 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
125 identityConfig.PLCURL = identityPLCURL
126 } else {
127 identityConfig.PLCURL = plcDirectoryURL
128 }
129 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
130 }
131
132 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
133 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
134 identityConfig.CacheTTL = duration
135 }
136 }
137
138 identityResolver := identity.NewResolver(db, identityConfig)
139
140 // Initialize atProto auth middleware for JWT validation
141 // Phase 1: Set skipVerify=true to test JWT parsing only
142 // Phase 2: Set skipVerify=false to enable full signature verification
143 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
144 if skipVerify {
145 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
146 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
147 }
148
149 // Initialize Indigo directory for DID resolution (used by auth)
150 plcURL := os.Getenv("PLC_DIRECTORY_URL")
151 if plcURL == "" {
152 plcURL = "https://plc.directory"
153 }
154 indigoDir := &indigoIdentity.BaseDirectory{
155 PLCURL: plcURL,
156 HTTPClient: http.Client{Timeout: 10 * time.Second},
157 }
158
159 // Initialize JWT config early to cache HS256_ISSUERS and PDS_JWT_SECRET
160 // This avoids reading env vars on every request
161 auth.InitJWTConfig()
162
163 // Create combined key fetcher for both DID and URL issuers
164 // - DID issuers (did:plc:, did:web:) → resolved via DID document keys (ES256)
165 // - URL issuers → JWKS endpoint (fallback for legacy tokens)
166 jwksCacheTTL := 1 * time.Hour
167 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
168 keyFetcher := auth.NewCombinedKeyFetcher(indigoDir, jwksFetcher)
169
170 authMiddleware := middleware.NewAtProtoAuthMiddleware(keyFetcher, skipVerify)
171 log.Println("✅ atProto auth middleware initialized (DID + JWKS key resolution)")
172
173 // Initialize repositories and services
174 userRepo := postgresRepo.NewUserRepository(db)
175 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
176
177 communityRepo := postgresRepo.NewCommunityRepository(db)
178
179 // V2.0: PDS-managed DID generation
180 // Community DIDs and keys are generated entirely by the PDS
181 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
182
183 instanceDID := os.Getenv("INSTANCE_DID")
184 if instanceDID == "" {
185 instanceDID = "did:web:coves.social" // Default for development
186 }
187
188 // V2: Extract instance domain for community handles
189 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
190 // We cannot allow arbitrary domains to prevent impersonation attacks
191 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
192 //
193 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
194 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
195 // Communities with mismatched hostedBy domains are rejected during indexing
196 var instanceDomain string
197 if strings.HasPrefix(instanceDID, "did:web:") {
198 // Extract domain from did:web (this is the authoritative source)
199 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
200 } else {
201 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
202 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
203 if instanceDomain == "" {
204 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
205 }
206 }
207
208 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
209
210 // Community creation restriction - if set, only these DIDs can create communities
211 var allowedCommunityCreators []string
212 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" {
213 for _, did := range strings.Split(communityCreators, ",") {
214 did = strings.TrimSpace(did)
215 if did != "" {
216 allowedCommunityCreators = append(allowedCommunityCreators, did)
217 }
218 }
219 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators))
220 } else {
221 log.Println("Community creation open to all authenticated users")
222 }
223
224 // V2.0: Initialize PDS account provisioner for communities (simplified)
225 // PDS handles all DID and key generation - no Coves-side cryptography needed
226 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
227 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
228 log.Printf(" - Communities will be created at: %s", defaultPDS)
229 log.Printf(" - PDS will generate and manage all DIDs and keys")
230
231 // Initialize community service (no longer needs didGenerator directly)
232 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
233
234 // Authenticate Coves instance with PDS to enable community record writes
235 // The instance needs a PDS account to write community records it owns
236 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
237 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
238 if pdsHandle != "" && pdsPassword != "" {
239 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
240 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
241 if authErr != nil {
242 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
243 log.Println("Community creation will fail until PDS authentication is configured")
244 } else {
245 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
246 svc.SetPDSAccessToken(accessToken)
247 log.Println("✓ Coves instance authenticated with PDS")
248 }
249 }
250 } else {
251 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
252 log.Println("Community creation via write-forward is disabled")
253 }
254
255 // Start Jetstream consumer for read-forward user indexing
256 jetstreamURL := os.Getenv("JETSTREAM_URL")
257 if jetstreamURL == "" {
258 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
259 }
260
261 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
262
263 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
264 ctx := context.Background()
265 go func() {
266 if startErr := userConsumer.Start(ctx); startErr != nil {
267 log.Printf("Jetstream consumer stopped: %v", startErr)
268 }
269 }()
270
271 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
272
273 // Start Jetstream consumer for community events (profiles and subscriptions)
274 // This consumer indexes:
275 // 1. Community profiles (social.coves.community.profile) - in community's own repo
276 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
277 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
278 if communityJetstreamURL == "" {
279 // Local Jetstream for communities - filter to our instance's collections
280 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
281 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
282 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
283 }
284
285 // Initialize community event consumer with did:web verification
286 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
287 if skipDIDWebVerification {
288 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
289 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
290 }
291
292 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
293 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
294 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
295
296 go func() {
297 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
298 log.Printf("Community Jetstream consumer stopped: %v", startErr)
299 }
300 }()
301
302 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
303 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
304 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
305
306 // Start JWKS cache cleanup background job
307 go func() {
308 ticker := time.NewTicker(1 * time.Hour)
309 defer ticker.Stop()
310 for range ticker.C {
311 jwksFetcher.CleanupExpiredCache()
312 log.Println("JWKS cache cleanup completed")
313 }
314 }()
315
316 log.Println("Started JWKS cache cleanup background job (runs hourly)")
317
318 // Initialize aggregator service
319 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
320 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
321 log.Println("✅ Aggregator service initialized")
322
323 // Initialize unfurl cache repository
324 unfurlRepo := unfurl.NewRepository(db)
325
326 // Initialize blob upload service
327 blobService := blobs.NewBlobService(defaultPDS)
328
329 // Initialize unfurl service with configuration
330 unfurlService := unfurl.NewService(
331 unfurlRepo,
332 unfurl.WithTimeout(10*time.Second),
333 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"),
334 unfurl.WithCacheTTL(24*time.Hour),
335 )
336 log.Println("✅ Unfurl and blob services initialized")
337
338 // Initialize post service (with aggregator support)
339 postRepo := postgresRepo.NewPostRepository(db)
340 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS)
341
342 // Initialize vote repository (used by Jetstream consumer for indexing)
343 voteRepo := postgresRepo.NewVoteRepository(db)
344 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
345
346 // Initialize comment repository (used by Jetstream consumer for indexing)
347 commentRepo := postgresRepo.NewCommentRepository(db)
348 log.Println("✅ Comment repository initialized (Jetstream indexing only)")
349
350 // Initialize comment service (for query API)
351 // Requires user and community repos for proper author/community hydration per lexicon
352 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
353 log.Println("✅ Comment service initialized (with author/community hydration)")
354
355 // Initialize feed service
356 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret)
357 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
358 log.Println("✅ Feed service initialized")
359
360 // Initialize timeline service (home feed from subscribed communities)
361 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
362 timelineService := timeline.NewTimelineService(timelineRepo)
363 log.Println("✅ Timeline service initialized")
364
365 // Initialize discover service (public feed from all communities)
366 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
367 discoverService := discover.NewDiscoverService(discoverRepo)
368 log.Println("✅ Discover service initialized")
369
370 // Start Jetstream consumer for posts
371 // This consumer indexes posts created in community repositories via the firehose
372 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
373 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
374 if postJetstreamURL == "" {
375 // Listen to post record creation events
376 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
377 }
378
379 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
380 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
381
382 go func() {
383 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
384 log.Printf("Post Jetstream consumer stopped: %v", startErr)
385 }
386 }()
387
388 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
389 log.Println(" - Indexing: social.coves.community.post CREATE operations")
390 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
391
392 // Start Jetstream consumer for aggregators
393 // This consumer indexes aggregator service declarations and authorization records
394 // Following Bluesky's pattern for feed generators and labelers
395 // NOTE: Uses the same Jetstream as communities, just filtering different collections
396 aggregatorJetstreamURL := communityJetstreamURL
397 // Override if specific URL needed for testing
398 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
399 aggregatorJetstreamURL = envURL
400 } else if aggregatorJetstreamURL == "" {
401 // Fallback if community URL also not set
402 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
403 }
404
405 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
406 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
407
408 go func() {
409 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
410 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
411 }
412 }()
413
414 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
415 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
416 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
417
418 // Start Jetstream consumer for votes
419 // This consumer indexes votes from user repositories and updates post vote counts
420 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
421 if voteJetstreamURL == "" {
422 // Listen to vote record CREATE/DELETE events from user repositories
423 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
424 }
425
426 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
427 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
428
429 go func() {
430 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
431 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
432 }
433 }()
434
435 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
436 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
437 log.Println(" - Updating: Post vote counts atomically")
438
439 // Start Jetstream consumer for comments
440 // This consumer indexes comments from user repositories and updates parent counts
441 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
442 if commentJetstreamURL == "" {
443 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories
444 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment"
445 }
446
447 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
448 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
449
450 go func() {
451 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
452 log.Printf("Comment Jetstream consumer stopped: %v", startErr)
453 }
454 }()
455
456 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
457 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations")
458 log.Println(" - Updating: Post comment counts and comment reply counts atomically")
459
460 // Register XRPC routes
461 routes.RegisterUserRoutes(r, userService)
462 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators)
463 log.Println("Community XRPC endpoints registered with OAuth authentication")
464
465 routes.RegisterPostRoutes(r, postService, authMiddleware)
466 log.Println("Post XRPC endpoints registered with OAuth authentication")
467
468 // Vote write endpoints removed - clients write directly to their PDS
469 // The AppView indexes votes from Jetstream (see vote consumer above)
470
471 routes.RegisterCommunityFeedRoutes(r, feedService)
472 log.Println("Feed XRPC endpoints registered (public, no auth required)")
473
474 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
475 log.Println("Timeline XRPC endpoints registered (requires authentication)")
476
477 routes.RegisterDiscoverRoutes(r, discoverService)
478 log.Println("Discover XRPC endpoints registered (public, no auth required)")
479
480 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver)
481 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)")
482
483 // Comment query API - supports optional authentication for viewer state
484 // Stricter rate limiting for expensive nested comment queries
485 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute)
486 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService)
487 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter)
488 r.Handle(
489 "/xrpc/social.coves.community.comment.getComments",
490 commentRateLimiter.Middleware(
491 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments),
492 ),
493 )
494 log.Println("✅ Comment query API registered (20 req/min rate limit)")
495 log.Println(" - GET /xrpc/social.coves.community.comment.getComments")
496
497 // Health check endpoints
498 healthHandler := func(w http.ResponseWriter, r *http.Request) {
499 w.WriteHeader(http.StatusOK)
500 if _, err := w.Write([]byte("OK")); err != nil {
501 log.Printf("Failed to write health check response: %v", err)
502 }
503 }
504 r.Get("/health", healthHandler)
505 r.Get("/xrpc/_health", healthHandler)
506
507 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy)
508 port := os.Getenv("PORT")
509 if port == "" {
510 port = os.Getenv("APPVIEW_PORT")
511 }
512 if port == "" {
513 port = "8080"
514 }
515
516 // Create HTTP server for graceful shutdown
517 server := &http.Server{
518 Addr: ":" + port,
519 Handler: r,
520 }
521
522 // Channel to listen for shutdown signals
523 stop := make(chan os.Signal, 1)
524 signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
525
526 // Start server in goroutine
527 go func() {
528 fmt.Printf("Coves AppView starting on port %s\n", port)
529 fmt.Printf("Default PDS: %s\n", defaultPDS)
530 if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
531 log.Fatalf("Server error: %v", err)
532 }
533 }()
534
535 // Wait for shutdown signal
536 <-stop
537 log.Println("Shutting down server...")
538
539 // Graceful shutdown with timeout
540 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
541 defer cancel()
542
543 // Stop auth middleware background goroutines (DPoP replay cache cleanup)
544 authMiddleware.Stop()
545 log.Println("Auth middleware stopped")
546
547 if err := server.Shutdown(ctx); err != nil {
548 log.Fatalf("Server shutdown error: %v", err)
549 }
550 log.Println("Server stopped gracefully")
551}
552
553// authenticateWithPDS creates a session on the PDS and returns an access token
554func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
555 type CreateSessionRequest struct {
556 Identifier string `json:"identifier"`
557 Password string `json:"password"`
558 }
559
560 type CreateSessionResponse struct {
561 DID string `json:"did"`
562 Handle string `json:"handle"`
563 AccessJwt string `json:"accessJwt"`
564 }
565
566 reqBody, err := json.Marshal(CreateSessionRequest{
567 Identifier: handle,
568 Password: password,
569 })
570 if err != nil {
571 return "", fmt.Errorf("failed to marshal request: %w", err)
572 }
573
574 resp, err := http.Post(
575 pdsURL+"/xrpc/com.atproto.server.createSession",
576 "application/json",
577 bytes.NewReader(reqBody),
578 )
579 if err != nil {
580 return "", fmt.Errorf("failed to call PDS: %w", err)
581 }
582 defer func() {
583 if closeErr := resp.Body.Close(); closeErr != nil {
584 log.Printf("Failed to close response body: %v", closeErr)
585 }
586 }()
587
588 if resp.StatusCode != http.StatusOK {
589 body, readErr := io.ReadAll(resp.Body)
590 if readErr != nil {
591 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
592 }
593 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
594 }
595
596 var session CreateSessionResponse
597 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
598 return "", fmt.Errorf("failed to decode response: %w", err)
599 }
600
601 return session.AccessJwt, nil
602}