A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/blobs" 11 "Coves/internal/core/comments" 12 "Coves/internal/core/communities" 13 "Coves/internal/core/communityFeeds" 14 "Coves/internal/core/discover" 15 "Coves/internal/core/posts" 16 "Coves/internal/core/timeline" 17 "Coves/internal/core/unfurl" 18 "Coves/internal/core/users" 19 "bytes" 20 "context" 21 "database/sql" 22 "encoding/json" 23 "fmt" 24 "io" 25 "log" 26 "net/http" 27 "os" 28 "strings" 29 "time" 30 31 "github.com/go-chi/chi/v5" 32 chiMiddleware "github.com/go-chi/chi/v5/middleware" 33 _ "github.com/lib/pq" 34 "github.com/pressly/goose/v3" 35 36 commentsAPI "Coves/internal/api/handlers/comments" 37 38 postgresRepo "Coves/internal/db/postgres" 39 40 indigoIdentity "github.com/bluesky-social/indigo/atproto/identity" 41) 42 43func main() { 44 // Database configuration (AppView database) 45 dbURL := os.Getenv("DATABASE_URL") 46 if dbURL == "" { 47 // Use dev database from .env.dev 48 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 49 } 50 51 // Default PDS URL for this Coves instance (supports self-hosting) 52 defaultPDS := os.Getenv("PDS_URL") 53 if defaultPDS == "" { 54 defaultPDS = "http://localhost:3001" // Local dev PDS 55 } 56 57 // Cursor secret for HMAC signing (prevents cursor manipulation) 58 cursorSecret := os.Getenv("CURSOR_SECRET") 59 if cursorSecret == "" { 60 // Generate a random secret if not set (dev mode) 61 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 62 cursorSecret = "dev-cursor-secret-change-in-production" 63 log.Println("⚠️ WARNING: Using default cursor secret. 
Set CURSOR_SECRET env var in production!") 64 } 65 66 db, err := sql.Open("postgres", dbURL) 67 if err != nil { 68 log.Fatal("Failed to connect to database:", err) 69 } 70 defer func() { 71 if closeErr := db.Close(); closeErr != nil { 72 log.Printf("Failed to close database connection: %v", closeErr) 73 } 74 }() 75 76 if err = db.Ping(); err != nil { 77 log.Fatal("Failed to ping database:", err) 78 } 79 80 log.Println("Connected to AppView database") 81 82 // Run migrations 83 if err = goose.SetDialect("postgres"); err != nil { 84 log.Fatal("Failed to set goose dialect:", err) 85 } 86 87 if err = goose.Up(db, "internal/db/migrations"); err != nil { 88 log.Fatal("Failed to run migrations:", err) 89 } 90 91 log.Println("Migrations completed successfully") 92 93 r := chi.NewRouter() 94 95 r.Use(chiMiddleware.Logger) 96 r.Use(chiMiddleware.Recoverer) 97 r.Use(chiMiddleware.RequestID) 98 99 // Rate limiting: 100 requests per minute per IP 100 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 101 r.Use(rateLimiter.Middleware) 102 103 // Initialize identity resolver 104 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 105 // directory as DID registration to ensure E2E tests work without hitting 106 // the production plc.directory 107 identityConfig := identity.DefaultConfig() 108 109 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 110 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 111 if plcDirectoryURL == "" { 112 plcDirectoryURL = "https://plc.directory" // Default to production PLC 113 } 114 115 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 116 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 117 if isDevEnv { 118 identityConfig.PLCURL = plcDirectoryURL 119 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 120 } else { 121 // Production: Allow separate IDENTITY_PLC_URL for read operations 122 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL 
!= "" { 123 identityConfig.PLCURL = identityPLCURL 124 } else { 125 identityConfig.PLCURL = plcDirectoryURL 126 } 127 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 128 } 129 130 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 131 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 132 identityConfig.CacheTTL = duration 133 } 134 } 135 136 identityResolver := identity.NewResolver(db, identityConfig) 137 138 // Initialize atProto auth middleware for JWT validation 139 // Phase 1: Set skipVerify=true to test JWT parsing only 140 // Phase 2: Set skipVerify=false to enable full signature verification 141 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 142 if skipVerify { 143 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 144 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 145 } 146 147 // Initialize Indigo directory for DID resolution (used by auth) 148 plcURL := os.Getenv("PLC_DIRECTORY_URL") 149 if plcURL == "" { 150 plcURL = "https://plc.directory" 151 } 152 indigoDir := &indigoIdentity.BaseDirectory{ 153 PLCURL: plcURL, 154 HTTPClient: http.Client{Timeout: 10 * time.Second}, 155 } 156 157 // Initialize JWT config early to cache HS256_ISSUERS and PDS_JWT_SECRET 158 // This avoids reading env vars on every request 159 auth.InitJWTConfig() 160 161 // Create combined key fetcher for both DID and URL issuers 162 // - DID issuers (did:plc:, did:web:) → resolved via DID document keys (ES256) 163 // - URL issuers → JWKS endpoint (fallback for legacy tokens) 164 jwksCacheTTL := 1 * time.Hour 165 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 166 keyFetcher := auth.NewCombinedKeyFetcher(indigoDir, jwksFetcher) 167 168 authMiddleware := middleware.NewAtProtoAuthMiddleware(keyFetcher, skipVerify) 169 log.Println("✅ atProto auth middleware initialized (DID + JWKS key resolution)") 170 171 // Initialize repositories and services 172 userRepo 
:= postgresRepo.NewUserRepository(db) 173 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 174 175 communityRepo := postgresRepo.NewCommunityRepository(db) 176 177 // V2.0: PDS-managed DID generation 178 // Community DIDs and keys are generated entirely by the PDS 179 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 180 181 instanceDID := os.Getenv("INSTANCE_DID") 182 if instanceDID == "" { 183 instanceDID = "did:web:coves.social" // Default for development 184 } 185 186 // V2: Extract instance domain for community handles 187 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 188 // We cannot allow arbitrary domains to prevent impersonation attacks 189 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 190 // 191 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 192 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 193 // Communities with mismatched hostedBy domains are rejected during indexing 194 var instanceDomain string 195 if strings.HasPrefix(instanceDID, "did:web:") { 196 // Extract domain from did:web (this is the authoritative source) 197 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 198 } else { 199 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 200 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 201 if instanceDomain == "" { 202 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 203 } 204 } 205 206 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 207 208 // Community creation restriction - if set, only these DIDs can create communities 209 var allowedCommunityCreators []string 210 if communityCreators := os.Getenv("COMMUNITY_CREATORS"); communityCreators != "" { 211 for _, did := range strings.Split(communityCreators, ",") { 212 did = strings.TrimSpace(did) 213 if did != "" { 214 allowedCommunityCreators = 
append(allowedCommunityCreators, did) 215 } 216 } 217 log.Printf("Community creation restricted to %d DIDs", len(allowedCommunityCreators)) 218 } else { 219 log.Println("Community creation open to all authenticated users") 220 } 221 222 // V2.0: Initialize PDS account provisioner for communities (simplified) 223 // PDS handles all DID and key generation - no Coves-side cryptography needed 224 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 225 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 226 log.Printf(" - Communities will be created at: %s", defaultPDS) 227 log.Printf(" - PDS will generate and manage all DIDs and keys") 228 229 // Initialize community service (no longer needs didGenerator directly) 230 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 231 232 // Authenticate Coves instance with PDS to enable community record writes 233 // The instance needs a PDS account to write community records it owns 234 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 235 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 236 if pdsHandle != "" && pdsPassword != "" { 237 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 238 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 239 if authErr != nil { 240 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 241 log.Println("Community creation will fail until PDS authentication is configured") 242 } else { 243 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 244 svc.SetPDSAccessToken(accessToken) 245 log.Println("✓ Coves instance authenticated with PDS") 246 } 247 } 248 } else { 249 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 250 log.Println("Community creation via write-forward is disabled") 251 } 252 253 // Start Jetstream consumer for read-forward user indexing 254 jetstreamURL := 
os.Getenv("JETSTREAM_URL") 255 if jetstreamURL == "" { 256 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 257 } 258 259 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 260 261 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 262 ctx := context.Background() 263 go func() { 264 if startErr := userConsumer.Start(ctx); startErr != nil { 265 log.Printf("Jetstream consumer stopped: %v", startErr) 266 } 267 }() 268 269 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 270 271 // Start Jetstream consumer for community events (profiles and subscriptions) 272 // This consumer indexes: 273 // 1. Community profiles (social.coves.community.profile) - in community's own repo 274 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 275 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 276 if communityJetstreamURL == "" { 277 // Local Jetstream for communities - filter to our instance's collections 278 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 279 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 280 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 281 } 282 283 // Initialize community event consumer with did:web verification 284 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 285 if skipDIDWebVerification { 286 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 287 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 288 } 289 290 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 291 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, 
skipDIDWebVerification, identityResolver) 292 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 293 294 go func() { 295 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 296 log.Printf("Community Jetstream consumer stopped: %v", startErr) 297 } 298 }() 299 300 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 301 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 302 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 303 304 // Start JWKS cache cleanup background job 305 go func() { 306 ticker := time.NewTicker(1 * time.Hour) 307 defer ticker.Stop() 308 for range ticker.C { 309 jwksFetcher.CleanupExpiredCache() 310 log.Println("JWKS cache cleanup completed") 311 } 312 }() 313 314 log.Println("Started JWKS cache cleanup background job (runs hourly)") 315 316 // Initialize aggregator service 317 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 318 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 319 log.Println("✅ Aggregator service initialized") 320 321 // Initialize unfurl cache repository 322 unfurlRepo := unfurl.NewRepository(db) 323 324 // Initialize blob upload service 325 blobService := blobs.NewBlobService(defaultPDS) 326 327 // Initialize unfurl service with configuration 328 unfurlService := unfurl.NewService( 329 unfurlRepo, 330 unfurl.WithTimeout(10*time.Second), 331 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"), 332 unfurl.WithCacheTTL(24*time.Hour), 333 ) 334 log.Println("✅ Unfurl and blob services initialized") 335 336 // Initialize post service (with aggregator support) 337 postRepo := postgresRepo.NewPostRepository(db) 338 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS) 339 340 // Initialize vote repository (used by Jetstream consumer 
for indexing) 341 voteRepo := postgresRepo.NewVoteRepository(db) 342 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 343 344 // Initialize comment repository (used by Jetstream consumer for indexing) 345 commentRepo := postgresRepo.NewCommentRepository(db) 346 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 347 348 // Initialize comment service (for query API) 349 // Requires user and community repos for proper author/community hydration per lexicon 350 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 351 log.Println("✅ Comment service initialized (with author/community hydration)") 352 353 // Initialize feed service 354 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret) 355 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 356 log.Println("✅ Feed service initialized") 357 358 // Initialize timeline service (home feed from subscribed communities) 359 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 360 timelineService := timeline.NewTimelineService(timelineRepo) 361 log.Println("✅ Timeline service initialized") 362 363 // Initialize discover service (public feed from all communities) 364 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 365 discoverService := discover.NewDiscoverService(discoverRepo) 366 log.Println("✅ Discover service initialized") 367 368 // Start Jetstream consumer for posts 369 // This consumer indexes posts created in community repositories via the firehose 370 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 371 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 372 if postJetstreamURL == "" { 373 // Listen to post record creation events 374 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 375 } 376 377 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, 
communityRepo, userService, db) 378 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 379 380 go func() { 381 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 382 log.Printf("Post Jetstream consumer stopped: %v", startErr) 383 } 384 }() 385 386 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 387 log.Println(" - Indexing: social.coves.community.post CREATE operations") 388 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 389 390 // Start Jetstream consumer for aggregators 391 // This consumer indexes aggregator service declarations and authorization records 392 // Following Bluesky's pattern for feed generators and labelers 393 // NOTE: Uses the same Jetstream as communities, just filtering different collections 394 aggregatorJetstreamURL := communityJetstreamURL 395 // Override if specific URL needed for testing 396 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 397 aggregatorJetstreamURL = envURL 398 } else if aggregatorJetstreamURL == "" { 399 // Fallback if community URL also not set 400 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 401 } 402 403 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 404 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 405 406 go func() { 407 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 408 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 409 } 410 }() 411 412 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 413 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 414 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 415 416 // 
Start Jetstream consumer for votes 417 // This consumer indexes votes from user repositories and updates post vote counts 418 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 419 if voteJetstreamURL == "" { 420 // Listen to vote record CREATE/DELETE events from user repositories 421 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 422 } 423 424 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 425 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 426 427 go func() { 428 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 429 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 430 } 431 }() 432 433 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 434 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 435 log.Println(" - Updating: Post vote counts atomically") 436 437 // Start Jetstream consumer for comments 438 // This consumer indexes comments from user repositories and updates parent counts 439 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 440 if commentJetstreamURL == "" { 441 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 442 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.comment" 443 } 444 445 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 446 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 447 448 go func() { 449 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 450 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 451 } 452 }() 453 454 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 455 log.Println(" - Indexing: social.coves.community.comment CREATE/UPDATE/DELETE operations") 456 log.Println(" - Updating: Post comment counts and 
comment reply counts atomically") 457 458 // Register XRPC routes 459 routes.RegisterUserRoutes(r, userService) 460 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, allowedCommunityCreators) 461 log.Println("Community XRPC endpoints registered with OAuth authentication") 462 463 routes.RegisterPostRoutes(r, postService, authMiddleware) 464 log.Println("Post XRPC endpoints registered with OAuth authentication") 465 466 // Vote write endpoints removed - clients write directly to their PDS 467 // The AppView indexes votes from Jetstream (see vote consumer above) 468 469 routes.RegisterCommunityFeedRoutes(r, feedService) 470 log.Println("Feed XRPC endpoints registered (public, no auth required)") 471 472 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 473 log.Println("Timeline XRPC endpoints registered (requires authentication)") 474 475 routes.RegisterDiscoverRoutes(r, discoverService) 476 log.Println("Discover XRPC endpoints registered (public, no auth required)") 477 478 routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver) 479 log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)") 480 481 // Comment query API - supports optional authentication for viewer state 482 // Stricter rate limiting for expensive nested comment queries 483 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute) 484 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 485 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 486 r.Handle( 487 "/xrpc/social.coves.community.comment.getComments", 488 commentRateLimiter.Middleware( 489 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 490 ), 491 ) 492 log.Println("✅ Comment query API registered (20 req/min rate limit)") 493 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 494 495 // Health check endpoints 496 healthHandler 
:= func(w http.ResponseWriter, r *http.Request) { 497 w.WriteHeader(http.StatusOK) 498 if _, err := w.Write([]byte("OK")); err != nil { 499 log.Printf("Failed to write health check response: %v", err) 500 } 501 } 502 r.Get("/health", healthHandler) 503 r.Get("/xrpc/_health", healthHandler) 504 505 // Check PORT first (docker-compose), then APPVIEW_PORT (legacy) 506 port := os.Getenv("PORT") 507 if port == "" { 508 port = os.Getenv("APPVIEW_PORT") 509 } 510 if port == "" { 511 port = "8080" 512 } 513 514 fmt.Printf("Coves AppView starting on port %s\n", port) 515 fmt.Printf("Default PDS: %s\n", defaultPDS) 516 log.Fatal(http.ListenAndServe(":"+port, r)) 517} 518 519// authenticateWithPDS creates a session on the PDS and returns an access token 520func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 521 type CreateSessionRequest struct { 522 Identifier string `json:"identifier"` 523 Password string `json:"password"` 524 } 525 526 type CreateSessionResponse struct { 527 DID string `json:"did"` 528 Handle string `json:"handle"` 529 AccessJwt string `json:"accessJwt"` 530 } 531 532 reqBody, err := json.Marshal(CreateSessionRequest{ 533 Identifier: handle, 534 Password: password, 535 }) 536 if err != nil { 537 return "", fmt.Errorf("failed to marshal request: %w", err) 538 } 539 540 resp, err := http.Post( 541 pdsURL+"/xrpc/com.atproto.server.createSession", 542 "application/json", 543 bytes.NewReader(reqBody), 544 ) 545 if err != nil { 546 return "", fmt.Errorf("failed to call PDS: %w", err) 547 } 548 defer func() { 549 if closeErr := resp.Body.Close(); closeErr != nil { 550 log.Printf("Failed to close response body: %v", closeErr) 551 } 552 }() 553 554 if resp.StatusCode != http.StatusOK { 555 body, readErr := io.ReadAll(resp.Body) 556 if readErr != nil { 557 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 558 } 559 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, 
string(body)) 560 } 561 562 var session CreateSessionResponse 563 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 564 return "", fmt.Errorf("failed to decode response: %w", err) 565 } 566 567 return session.AccessJwt, nil 568}