A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log" 11 "net/http" 12 "os" 13 "strings" 14 "time" 15 16 "Coves/internal/api/middleware" 17 "Coves/internal/api/routes" 18 "Coves/internal/atproto/auth" 19 "Coves/internal/atproto/identity" 20 "Coves/internal/atproto/jetstream" 21 "Coves/internal/core/aggregators" 22 "Coves/internal/core/blobs" 23 "Coves/internal/core/comments" 24 "Coves/internal/core/communities" 25 "Coves/internal/core/communityFeeds" 26 "Coves/internal/core/discover" 27 "Coves/internal/core/posts" 28 "Coves/internal/core/timeline" 29 "Coves/internal/core/unfurl" 30 "Coves/internal/core/users" 31 32 "github.com/go-chi/chi/v5" 33 chiMiddleware "github.com/go-chi/chi/v5/middleware" 34 _ "github.com/lib/pq" 35 "github.com/pressly/goose/v3" 36 37 commentsAPI "Coves/internal/api/handlers/comments" 38 39 postgresRepo "Coves/internal/db/postgres" 40) 41 42func main() { 43 // Database configuration (AppView database) 44 dbURL := os.Getenv("DATABASE_URL") 45 if dbURL == "" { 46 // Use dev database from .env.dev 47 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 48 } 49 50 // Default PDS URL for this Coves instance (supports self-hosting) 51 defaultPDS := os.Getenv("PDS_URL") 52 if defaultPDS == "" { 53 defaultPDS = "http://localhost:3001" // Local dev PDS 54 } 55 56 // Cursor secret for HMAC signing (prevents cursor manipulation) 57 cursorSecret := os.Getenv("CURSOR_SECRET") 58 if cursorSecret == "" { 59 // Generate a random secret if not set (dev mode) 60 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 61 cursorSecret = "dev-cursor-secret-change-in-production" 62 log.Println("⚠️ WARNING: Using default cursor secret. 
Set CURSOR_SECRET env var in production!") 63 } 64 65 db, err := sql.Open("postgres", dbURL) 66 if err != nil { 67 log.Fatal("Failed to connect to database:", err) 68 } 69 defer func() { 70 if closeErr := db.Close(); closeErr != nil { 71 log.Printf("Failed to close database connection: %v", closeErr) 72 } 73 }() 74 75 if err = db.Ping(); err != nil { 76 log.Fatal("Failed to ping database:", err) 77 } 78 79 log.Println("Connected to AppView database") 80 81 // Run migrations 82 if err = goose.SetDialect("postgres"); err != nil { 83 log.Fatal("Failed to set goose dialect:", err) 84 } 85 86 if err = goose.Up(db, "internal/db/migrations"); err != nil { 87 log.Fatal("Failed to run migrations:", err) 88 } 89 90 log.Println("Migrations completed successfully") 91 92 r := chi.NewRouter() 93 94 r.Use(chiMiddleware.Logger) 95 r.Use(chiMiddleware.Recoverer) 96 r.Use(chiMiddleware.RequestID) 97 98 // Rate limiting: 100 requests per minute per IP 99 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 100 r.Use(rateLimiter.Middleware) 101 102 // Initialize identity resolver 103 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 104 // directory as DID registration to ensure E2E tests work without hitting 105 // the production plc.directory 106 identityConfig := identity.DefaultConfig() 107 108 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 109 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 110 if plcDirectoryURL == "" { 111 plcDirectoryURL = "https://plc.directory" // Default to production PLC 112 } 113 114 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 115 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 116 if isDevEnv { 117 identityConfig.PLCURL = plcDirectoryURL 118 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 119 } else { 120 // Production: Allow separate IDENTITY_PLC_URL for read operations 121 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL 
!= "" { 122 identityConfig.PLCURL = identityPLCURL 123 } else { 124 identityConfig.PLCURL = plcDirectoryURL 125 } 126 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 127 } 128 129 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 130 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 131 identityConfig.CacheTTL = duration 132 } 133 } 134 135 identityResolver := identity.NewResolver(db, identityConfig) 136 137 // Initialize atProto auth middleware for JWT validation 138 // Phase 1: Set skipVerify=true to test JWT parsing only 139 // Phase 2: Set skipVerify=false to enable full signature verification 140 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 141 if skipVerify { 142 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 143 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 144 } 145 146 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 147 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 148 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 149 log.Println("✅ atProto auth middleware initialized") 150 151 // Initialize repositories and services 152 userRepo := postgresRepo.NewUserRepository(db) 153 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 154 155 communityRepo := postgresRepo.NewCommunityRepository(db) 156 157 // V2.0: PDS-managed DID generation 158 // Community DIDs and keys are generated entirely by the PDS 159 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 160 161 instanceDID := os.Getenv("INSTANCE_DID") 162 if instanceDID == "" { 163 instanceDID = "did:web:coves.social" // Default for development 164 } 165 166 // V2: Extract instance domain for community handles 167 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 168 // We cannot allow arbitrary domains to prevent impersonation attacks 169 // Example 
attack: !leagueoflegends@riotgames.com on a non-Riot instance 170 // 171 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 172 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 173 // Communities with mismatched hostedBy domains are rejected during indexing 174 var instanceDomain string 175 if strings.HasPrefix(instanceDID, "did:web:") { 176 // Extract domain from did:web (this is the authoritative source) 177 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 178 } else { 179 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 180 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 181 if instanceDomain == "" { 182 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 183 } 184 } 185 186 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 187 188 // Community creation restriction - if set, only these DIDs can create communities 189 var allowedCommunityCreators []string 190 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 191 for _, did := range strings.Split(communityCreators, ",") { 192 did = strings.TrimSpace(did) 193 if did != "" { 194 allowedCommunityCreators = append(allowedCommunityCreators, did) 195 } 196 } 197 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 198 } else { 199 log.Println("Community creation open to all authenticated users") 200 } 201 202 // V2.0: Initialize PDS account provisioner for communities (simplified) 203 // PDS handles all DID and key generation - no Coves-side cryptography needed 204 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 205 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 206 log.Printf(" - Communities will be created at: %s", defaultPDS) 207 log.Printf(" - PDS will generate and manage all DIDs and keys") 208 209 // Initialize community service (no longer needs didGenerator 
directly) 210 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 211 212 // Authenticate Coves instance with PDS to enable community record writes 213 // The instance needs a PDS account to write community records it owns 214 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 215 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 216 if pdsHandle != "" && pdsPassword != "" { 217 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 218 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 219 if authErr != nil { 220 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 221 log.Println("Community creation will fail until PDS authentication is configured") 222 } else { 223 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 224 svc.SetPDSAccessToken(accessToken) 225 log.Println("✓ Coves instance authenticated with PDS") 226 } 227 } 228 } else { 229 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 230 log.Println("Community creation via write-forward is disabled") 231 } 232 233 // Start Jetstream consumer for read-forward user indexing 234 jetstreamURL := os.Getenv("JETSTREAM_URL") 235 if jetstreamURL == "" { 236 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 237 } 238 239 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 240 241 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 242 ctx := context.Background() 243 go func() { 244 if startErr := userConsumer.Start(ctx); startErr != nil { 245 log.Printf("Jetstream consumer stopped: %v", startErr) 246 } 247 }() 248 249 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 250 251 // Start Jetstream consumer for community events (profiles and subscriptions) 252 // This consumer indexes: 253 // 1. 
Community profiles (social.coves.community.profile) - in community's own repo 254 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 255 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 256 if communityJetstreamURL == "" { 257 // Local Jetstream for communities - filter to our instance's collections 258 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 259 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 260 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 261 } 262 263 // Initialize community event consumer with did:web verification 264 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 265 if skipDIDWebVerification { 266 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 267 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 268 } 269 270 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 271 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 272 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 273 274 go func() { 275 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 276 log.Printf("Community Jetstream consumer stopped: %v", startErr) 277 } 278 }() 279 280 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 281 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 282 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 283 284 // Start JWKS cache cleanup background job 285 go func() { 286 ticker := time.NewTicker(1 * time.Hour) 287 defer ticker.Stop() 288 for range 
ticker.C { 289 jwksFetcher.CleanupExpiredCache() 290 log.Println("JWKS cache cleanup completed") 291 } 292 }() 293 294 log.Println("Started JWKS cache cleanup background job (runs hourly)") 295 296 // Initialize aggregator service 297 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 298 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 299 log.Println("✅ Aggregator service initialized") 300 301 // Initialize unfurl cache repository 302 unfurlRepo := unfurl.NewRepository(db) 303 304 // Initialize blob upload service 305 blobService := blobs.NewBlobService(defaultPDS) 306 307 // Initialize unfurl service with configuration 308 unfurlService := unfurl.NewService( 309 unfurlRepo, 310 unfurl.WithTimeout(10*time.Second), 311 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 312 unfurl.WithCacheTTL(24*time.Hour), 313 ) 314 log.Println("✅ Unfurl and blob services initialized") 315 316 // Initialize post service (with aggregator support) 317 postRepo := postgresRepo.NewPostRepository(db) 318 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 319 320 // Initialize vote repository (used by Jetstream consumer for indexing) 321 voteRepo := postgresRepo.NewVoteRepository(db) 322 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 323 324 // Initialize comment repository (used by Jetstream consumer for indexing) 325 commentRepo := postgresRepo.NewCommentRepository(db) 326 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 327 328 // Initialize comment service (for query API) 329 // Requires user and community repos for proper author/community hydration per lexicon 330 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 331 log.Println("✅ Comment service initialized (with author/community hydration)") 332 333 // Initialize feed service 334 feedRepo := 
postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 335 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 336 log.Println("✅ Feed service initialized") 337 338 // Initialize timeline service (home feed from subscribed communities) 339 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 340 timelineService := timeline.NewTimelineService(timelineRepo) 341 log.Println("✅ Timeline service initialized") 342 343 // Initialize discover service (public feed from all communities) 344 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 345 discoverService := discover.NewDiscoverService(discoverRepo) 346 log.Println("✅ Discover service initialized") 347 348 // Start Jetstream consumer for posts 349 // This consumer indexes posts created in community repositories via the firehose 350 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 351 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 352 if postJetstreamURL == "" { 353 // Listen to post record creation events 354 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 355 } 356 357 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 358 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 359 360 go func() { 361 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 362 log.Printf("Post Jetstream consumer stopped: %v", startErr) 363 } 364 }() 365 366 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 367 log.Println(" - Indexing: social.coves.community.post CREATE operations") 368 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 369 370 // Start Jetstream consumer for aggregators 371 // This consumer indexes aggregator service declarations and authorization records 372 // Following Bluesky's pattern for feed 
generators and labelers 373 // NOTE: Uses the same Jetstream as communities, just filtering different collections 374 aggregatorJetstreamURL := communityJetstreamURL 375 // Override if specific URL needed for testing 376 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 377 aggregatorJetstreamURL = envURL 378 } else if aggregatorJetstreamURL == "" { 379 // Fallback if community URL also not set 380 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 381 } 382 383 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 384 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 385 386 go func() { 387 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 388 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 389 } 390 }() 391 392 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 393 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 394 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 395 396 // Start Jetstream consumer for votes 397 // This consumer indexes votes from user repositories and updates post vote counts 398 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 399 if voteJetstreamURL == "" { 400 // Listen to vote record CREATE/DELETE events from user repositories 401 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 402 } 403 404 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 405 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 406 407 go func() { 408 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 409 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 410 
} 411 }() 412 413 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 414 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 415 log.Println(" - Updating: Post vote counts atomically") 416 417 // Start Jetstream consumer for comments 418 // This consumer indexes comments from user repositories and updates parent counts 419 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 420 if commentJetstreamURL == "" { 421 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 422 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 423 } 424 425 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 426 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 427 428 go func() { 429 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 430 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 431 } 432 }() 433 434 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 435 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations") 436 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 437 438 // Register XRPC routes 439 routes.RegisterUserRoutes(r, userService) 440 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 441 log.Println("Community XRPC endpoints registered with OAuth authentication") 442 443 routes.RegisterPostRoutes(r, postService, authMiddleware) 444 log.Println("Post XRPC endpoints registered with OAuth authentication") 445 446 // Vote write endpoints removed - clients write directly to their PDS 447 // The AppView indexes votes from Jetstream (see vote consumer above) 448 449 routes.RegisterCommunityFeedRoutes(r, feedService) 450 log.Println("Feed XRPC endpoints registered (public, no auth required)") 451 452 
routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 453 log.Println("Timeline XRPC endpoints registered (requires authentication)") 454 455 routes.RegisterDiscoverRoutes(r, discoverService) 456 log.Println("Discover XRPC endpoints registered (public, no auth required)") 457 458 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 459 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 460 461 // Comment query API - supports optional authentication for viewer state 462 // Stricter rate limiting for expensive nested comment queries 463 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 464 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 465 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 466 r.Handle( 467 "/xrpc/social.coves.community.comment.getComments", 468 commentRateLimiter.Middleware( 469 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 470 ), 471 ) 472 log.Println("✅ Comment query API registered (20 req/min rate limit)") 473 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 474 475 // Health check endpoints 476 healthHandler := func(w http.ResponseWriter, r *http.Request) { 477 w.WriteHeader(http.StatusOK) 478 if _, err := w.Write([]byte("OK")); err != nil { 479 log.Printf("Failed to write health check response: %v", err) 480 } 481 } 482 r.Get("/health", healthHandler) 483 r.Get("/xrpc/_health", healthHandler) 484 485 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 486 port := os.Getenv("PORT") 487 if port == "" { 488 port = os.Getenv("APPVIEW_PORT") 489 } 490 if port == "" { 491 port = "8080" 492 } 493 494 fmt.Printf("Coves AppView starting on port %s\n", port) 495 fmt.Printf("Default PDS: %s\n", defaultPDS) 496 log.Fatal(http.ListenAndServe(":"+port, r)) 497} 498 499// authenticateWithPDS creates a session on the PDS and 
// returns an access token.
//
// It POSTs the handle and password as JSON to
// {pdsURL}/xrpc/com.atproto.server.createSession and returns the accessJwt
// field of the decoded response. A trailing slash on pdsURL is tolerated.
//
// An error is returned when the request cannot be built or sent, when the PDS
// answers with any status other than 200 OK (the error includes a bounded
// prefix of the response body), when the response is not valid JSON, or when
// the response carries no access token. The returned token is always empty
// when the error is non-nil.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	const (
		// requestTimeout bounds the whole exchange. The default HTTP client
		// has no timeout, so an unresponsive PDS would otherwise block the
		// caller forever.
		requestTimeout = 30 * time.Second
		// maxErrorBody caps how much of an error response is read into memory
		// and echoed into the error message.
		maxErrorBody = 4 << 10
	)

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	client := &http.Client{Timeout: requestTimeout}
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 response without a token must not be reported as a successful
	// login: the caller would go on to use an empty credential.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}