A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/blobs" 11 "Coves/internal/core/comments" 12 "Coves/internal/core/communities" 13 "Coves/internal/core/communityFeeds" 14 "Coves/internal/core/discover" 15 "Coves/internal/core/posts" 16 "Coves/internal/core/timeline" 17 "Coves/internal/core/unfurl" 18 "Coves/internal/core/users" 19 "bytes" 20 "context" 21 "database/sql" 22 "encoding/json" 23 "fmt" 24 "io" 25 "log" 26 "net/http" 27 "os" 28 "strings" 29 "time" 30 31 "github.com/go-chi/chi/v5" 32 chiMiddleware "github.com/go-chi/chi/v5/middleware" 33 _ "github.com/lib/pq" 34 "github.com/pressly/goose/v3" 35 36 commentsAPI "Coves/internal/api/handlers/comments" 37 38 postgresRepo "Coves/internal/db/postgres" 39) 40 41func main() { 42 // Database configuration (AppView database) 43 dbURL := os.Getenv("DATABASE_URL") 44 if dbURL == "" { 45 // Use dev database from .env.dev 46 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 47 } 48 49 // Default PDS URL for this Coves instance (supports self-hosting) 50 defaultPDS := os.Getenv("PDS_URL") 51 if defaultPDS == "" { 52 defaultPDS = "http://localhost:3001" // Local dev PDS 53 } 54 55 // Cursor secret for HMAC signing (prevents cursor manipulation) 56 cursorSecret := os.Getenv("CURSOR_SECRET") 57 if cursorSecret == "" { 58 // Generate a random secret if not set (dev mode) 59 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 60 cursorSecret = "dev-cursor-secret-change-in-production" 61 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!") 62 } 63 64 db, err := sql.Open("postgres", dbURL) 65 if err != nil { 66 log.Fatal("Failed to connect to database:", err) 67 } 68 defer func() { 69 if closeErr := db.Close(); closeErr != nil { 70 log.Printf("Failed to close database connection: %v", closeErr) 71 } 72 }() 73 74 if err = db.Ping(); err != nil { 75 log.Fatal("Failed to ping database:", err) 76 } 77 78 log.Println("Connected to AppView database") 79 80 // Run migrations 81 if err = goose.SetDialect("postgres"); err != nil { 82 log.Fatal("Failed to set goose dialect:", err) 83 } 84 85 if err = goose.Up(db, "internal/db/migrations"); err != nil { 86 log.Fatal("Failed to run migrations:", err) 87 } 88 89 log.Println("Migrations completed successfully") 90 91 r := chi.NewRouter() 92 93 r.Use(chiMiddleware.Logger) 94 r.Use(chiMiddleware.Recoverer) 95 r.Use(chiMiddleware.RequestID) 96 97 // Rate limiting: 100 requests per minute per IP 98 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 99 r.Use(rateLimiter.Middleware) 100 101 // Initialize identity resolver 102 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 103 // directory as DID registration to ensure E2E tests work without hitting 104 // the production plc.directory 105 identityConfig := identity.DefaultConfig() 106 107 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 108 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 109 if plcDirectoryURL == "" { 110 plcDirectoryURL = "https://plc.directory" // Default to production PLC 111 } 112 113 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 114 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 115 if isDevEnv { 116 identityConfig.PLCURL = plcDirectoryURL 117 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 118 } else { 119 // Production: Allow separate IDENTITY_PLC_URL for read operations 120 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 121 identityConfig.PLCURL = identityPLCURL 122 } else { 123 identityConfig.PLCURL = plcDirectoryURL 124 } 125 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 126 } 127 128 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 129 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 130 identityConfig.CacheTTL = duration 131 } 132 } 133 134 identityResolver := identity.NewResolver(db, identityConfig) 135 136 // Initialize atProto auth middleware for JWT validation 137 // Phase 1: Set skipVerify=true to test JWT parsing only 138 // Phase 2: Set skipVerify=false to enable full signature verification 139 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 140 if skipVerify { 141 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 142 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 143 } 144 145 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 146 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 147 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 148 log.Println("✅ atProto auth middleware initialized") 149 150 // Initialize repositories and services 151 userRepo := postgresRepo.NewUserRepository(db) 152 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 153 154 communityRepo := postgresRepo.NewCommunityRepository(db) 155 156 // V2.0: PDS-managed DID generation 157 // Community DIDs and keys are generated entirely by the PDS 158 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 159 160 instanceDID := os.Getenv("INSTANCE_DID") 161 if instanceDID == "" { 162 instanceDID = "did:web:coves.social" // Default for development 163 } 164 165 // V2: Extract instance domain for community handles 166 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 167 // We cannot allow arbitrary domains to prevent impersonation attacks 168 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 169 // 170 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 171 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 172 // Communities with mismatched hostedBy domains are rejected during indexing 173 var instanceDomain string 174 if strings.HasPrefix(instanceDID, "did:web:") { 175 // Extract domain from did:web (this is the authoritative source) 176 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 177 } else { 178 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 179 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 180 if instanceDomain == "" { 181 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 182 } 183 } 184 185 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 186 187 // Community creation restriction - if set, only these DIDs can create communities 188 var allowedCommunityCreators []string 189 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 190 for _, did := range strings.Split(communityCreators, ",") { 191 did = strings.TrimSpace(did) 192 if did != "" { 193 allowedCommunityCreators = append(allowedCommunityCreators, did) 194 } 195 } 196 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 197 } else { 198 log.Println("Community creation open to all authenticated users") 199 } 200 201 // V2.0: Initialize PDS account provisioner for communities (simplified) 202 // PDS handles all DID and key generation - no Coves-side cryptography needed 203 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 204 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 205 log.Printf(" - Communities will be created at: %s", defaultPDS) 206 log.Printf(" - PDS will generate and manage all DIDs and keys") 207 208 // Initialize community service (no longer needs didGenerator directly) 209 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 210 211 // Authenticate Coves instance with PDS to enable community record writes 212 // The instance needs a PDS account to write community records it owns 213 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 214 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 215 if pdsHandle != "" && pdsPassword != "" { 216 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 217 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 218 if authErr != nil { 219 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 220 log.Println("Community creation will fail until PDS authentication is configured") 221 } else { 222 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 223 svc.SetPDSAccessToken(accessToken) 224 log.Println("✓ Coves instance authenticated with PDS") 225 } 226 } 227 } else { 228 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 229 log.Println("Community creation via write-forward is disabled") 230 } 231 232 // Start Jetstream consumer for read-forward user indexing 233 jetstreamURL := os.Getenv("JETSTREAM_URL") 234 if jetstreamURL == "" { 235 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 236 } 237 238 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 239 240 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 241 ctx := context.Background() 242 go func() { 243 if startErr := userConsumer.Start(ctx); startErr != nil { 244 log.Printf("Jetstream consumer stopped: %v", startErr) 245 } 246 }() 247 248 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 249 250 // Start Jetstream consumer for community events (profiles and subscriptions) 251 // This consumer indexes: 252 // 1. Community profiles (social.coves.community.profile) - in community's own repo 253 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 254 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 255 if communityJetstreamURL == "" { 256 // Local Jetstream for communities - filter to our instance's collections 257 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 258 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 259 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 260 } 261 262 // Initialize community event consumer with did:web verification 263 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 264 if skipDIDWebVerification { 265 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 266 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 267 } 268 269 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 270 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 271 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 272 273 go func() { 274 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 275 log.Printf("Community Jetstream consumer stopped: %v", startErr) 276 } 277 }() 278 279 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 280 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 281 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 282 283 // Start JWKS cache cleanup background job 284 go func() { 285 ticker := time.NewTicker(1 * time.Hour) 286 defer ticker.Stop() 287 for range ticker.C { 288 jwksFetcher.CleanupExpiredCache() 289 log.Println("JWKS cache cleanup completed") 290 } 291 }() 292 293 log.Println("Started JWKS cache cleanup background job (runs hourly)") 294 295 // Initialize aggregator service 296 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 297 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 298 log.Println("✅ Aggregator service initialized") 299 300 // Initialize unfurl cache repository 301 unfurlRepo := unfurl.NewRepository(db) 302 303 // Initialize blob upload service 304 blobService := blobs.NewBlobService(defaultPDS) 305 306 // Initialize unfurl service with configuration 307 unfurlService := unfurl.NewService( 308 unfurlRepo, 309 unfurl.WithTimeout(10*time.Second), 310 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 311 unfurl.WithCacheTTL(24*time.Hour), 312 ) 313 log.Println("✅ Unfurl and blob services initialized") 314 315 // Initialize post service (with aggregator support) 316 postRepo := postgresRepo.NewPostRepository(db) 317 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 318 319 // Initialize vote repository (used by Jetstream consumer for indexing) 320 voteRepo := postgresRepo.NewVoteRepository(db) 321 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 322 323 // Initialize comment repository (used by Jetstream consumer for indexing) 324 commentRepo := postgresRepo.NewCommentRepository(db) 325 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 326 327 // Initialize comment service (for query API) 328 // Requires user and community repos for proper author/community hydration per lexicon 329 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 330 log.Println("✅ Comment service initialized (with author/community hydration)") 331 332 // Initialize feed service 333 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 334 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 335 log.Println("✅ Feed service initialized") 336 337 // Initialize timeline service (home feed from subscribed communities) 338 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 339 timelineService := timeline.NewTimelineService(timelineRepo) 340 log.Println("✅ Timeline service initialized") 341 342 // Initialize discover service (public feed from all communities) 343 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 344 discoverService := discover.NewDiscoverService(discoverRepo) 345 log.Println("✅ Discover service initialized") 346 347 // Start Jetstream consumer for posts 348 // This consumer indexes posts created in community repositories via the firehose 349 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 350 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 351 if postJetstreamURL == "" { 352 // Listen to post record creation events 353 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 354 } 355 356 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 357 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 358 359 go func() { 360 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 361 log.Printf("Post Jetstream consumer stopped: %v", startErr) 362 } 363 }() 364 365 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 366 log.Println(" - Indexing: social.coves.community.post CREATE operations") 367 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 368 369 // Start Jetstream consumer for aggregators 370 // This consumer indexes aggregator service declarations and authorization records 371 // Following Bluesky's pattern for feed generators and labelers 372 // NOTE: Uses the same Jetstream as communities, just filtering different collections 373 aggregatorJetstreamURL := communityJetstreamURL 374 // Override if specific URL needed for testing 375 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 376 aggregatorJetstreamURL = envURL 377 } else if aggregatorJetstreamURL == "" { 378 // Fallback if community URL also not set 379 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 380 } 381 382 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 383 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 384 385 go func() { 386 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 387 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 388 } 389 }() 390 391 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 392 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 393 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 394 395 // Start Jetstream consumer for votes 396 // This consumer indexes votes from user repositories and updates post vote counts 397 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 398 if voteJetstreamURL == "" { 399 // Listen to vote record CREATE/DELETE events from user repositories 400 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 401 } 402 403 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 404 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 405 406 go func() { 407 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 408 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 409 } 410 }() 411 412 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 413 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 414 log.Println(" - Updating: Post vote counts atomically") 415 416 // Start Jetstream consumer for comments 417 // This consumer indexes comments from user repositories and updates parent counts 418 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 419 if commentJetstreamURL == "" { 420 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 421 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 422 } 423 424 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 425 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 426 427 go func() { 428 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 429 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 430 } 431 }() 432 433 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 434 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations") 435 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 436 437 // Register XRPC routes 438 routes.RegisterUserRoutes(r, userService) 439 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 440 log.Println("Community XRPC endpoints registered with OAuth authentication") 441 442 routes.RegisterPostRoutes(r, postService, authMiddleware) 443 log.Println("Post XRPC endpoints registered with OAuth authentication") 444 445 // Vote write endpoints removed - clients write directly to their PDS 446 // The AppView indexes votes from Jetstream (see vote consumer above) 447 448 routes.RegisterCommunityFeedRoutes(r, feedService) 449 log.Println("Feed XRPC endpoints registered (public, no auth required)") 450 451 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 452 log.Println("Timeline XRPC endpoints registered (requires authentication)") 453 454 routes.RegisterDiscoverRoutes(r, discoverService) 455 log.Println("Discover XRPC endpoints registered (public, no auth required)") 456 457 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 458 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 459 460 // Comment query API - supports optional authentication for viewer state 461 // Stricter rate limiting for expensive nested comment queries 462 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 463 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 464 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 465 r.Handle( 466 "/xrpc/social.coves.community.comment.getComments", 467 commentRateLimiter.Middleware( 468 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 469 ), 470 ) 471 log.Println("✅ Comment query API registered (20 req/min rate limit)") 472 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 473 474 // Health check endpoints 475 healthHandler := func(w http.ResponseWriter, r *http.Request) { 476 w.WriteHeader(http.StatusOK) 477 if _, err := w.Write([]byte("OK")); err != nil { 478 log.Printf("Failed to write health check response: %v", err) 479 } 480 } 481 r.Get("/health", healthHandler) 482 r.Get("/xrpc/_health", healthHandler) 483 484 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 485 port := os.Getenv("PORT") 486 if port == "" { 487 port = os.Getenv("APPVIEW_PORT") 488 } 489 if port == "" { 490 port = "8080" 491 } 492 493 fmt.Printf("Coves AppView starting on port %s\n", port) 494 fmt.Printf("Default PDS: %s\n", defaultPDS) 495 log.Fatal(http.ListenAndServe(":"+port, r)) 496} 497 498// authenticateWithPDS creates a session on the PDS and returns an access token 499func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 500 type CreateSessionRequest struct { 501 Identifier string `json:"identifier"` 502 Password string `json:"password"` 503 } 504 505 type CreateSessionResponse struct { 506 DID string `json:"did"` 507 Handle string `json:"handle"` 508 AccessJwt string `json:"accessJwt"` 509 } 510 511 reqBody, err := json.Marshal(CreateSessionRequest{ 512 Identifier: handle, 513 Password: password, 514 }) 515 if err != nil { 516 return "", fmt.Errorf("failed to marshal request: %w", err) 517 } 518 519 resp, err := http.Post( 520 pdsURL+"/xrpc/com.atproto.server.createSession", 521 "application/json", 522 bytes.NewReader(reqBody), 523 ) 524 if err != nil { 525 return "", fmt.Errorf("failed to call PDS: %w", err) 526 } 527 defer func() { 528 if closeErr := resp.Body.Close(); closeErr != nil { 529 log.Printf("Failed to close response body: %v", closeErr) 530 } 531 }() 532 533 if resp.StatusCode != http.StatusOK { 534 body, readErr := io.ReadAll(resp.Body) 535 if readErr != nil { 536 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 537 } 538 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 539 } 540 541 var session CreateSessionResponse 542 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 543 return "", fmt.Errorf("failed to decode response: %w", err) 544 } 545 546 return session.AccessJwt, nil 547}