A community-based topic aggregation platform built on atproto
1package communities 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "net/http" 11 "regexp" 12 "strings" 13 "time" 14) 15 16// Community handle validation regex (DNS-valid handle: name.communities.instance.com) 17// Matches standard DNS hostname format (RFC 1035) 18var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 19 20type communityService struct { 21 repo Repository 22 provisioner *PDSAccountProvisioner 23 pdsURL string 24 instanceDID string 25 instanceDomain string 26 pdsAccessToken string 27} 28 29// NewCommunityService creates a new community service 30func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service { 31 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 32 // This catches honest configuration mistakes but NOT malicious code modifications 33 // Full verification (Phase 2) requires fetching DID document from domain 34 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification" 35 if strings.HasPrefix(instanceDID, "did:web:") { 36 didDomain := strings.TrimPrefix(instanceDID, "did:web:") 37 if didDomain != instanceDomain { 38 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)", 39 didDomain, instanceDomain) 40 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt") 41 log.Printf(" Communities will be hosted by: %s", instanceDID) 42 } 43 } 44 45 return &communityService{ 46 repo: repo, 47 pdsURL: pdsURL, 48 instanceDID: instanceDID, 49 instanceDomain: instanceDomain, 50 provisioner: provisioner, 51 } 52} 53 54// SetPDSAccessToken sets the PDS access token for authentication 55// This should be called after creating a session for the Coves instance DID on the PDS 56func (s *communityService) SetPDSAccessToken(token string) { 57 s.pdsAccessToken = token 
58} 59 60// CreateCommunity creates a new community via write-forward to PDS 61// V2 Flow: 62// 1. Service creates PDS account for community (PDS generates signing keypair) 63// 2. Service writes community profile to COMMUNITY's own repository 64// 3. Firehose emits event 65// 4. Consumer indexes to AppView DB 66// 67// V2 Architecture: 68// - Community owns its own repository (at://community_did/social.coves.community.profile/self) 69// - PDS manages the signing keypair (we never see it) 70// - We store PDS credentials to act on behalf of the community 71// - Community can migrate to other instances (future V2.1 with rotation keys) 72func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 73 // Apply defaults before validation 74 if req.Visibility == "" { 75 req.Visibility = "public" 76 } 77 78 // SECURITY: Auto-populate hostedByDID from instance configuration 79 // Clients MUST NOT provide this field - it's derived from the instance receiving the request 80 // This prevents malicious instances from claiming to host communities for domains they don't own 81 req.HostedByDID = s.instanceDID 82 83 // Validate request 84 if err := s.validateCreateRequest(req); err != nil { 85 return nil, err 86 } 87 88 // V2: Provision a real PDS account for this community 89 // This calls com.atproto.server.createAccount internally 90 // The PDS will: 91 // 1. Generate a signing keypair (stored in PDS, we never see it) 92 // 2. Create a DID (did:plc:xxx) 93 // 3. 
Return credentials (DID, tokens) 94 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name) 95 if err != nil { 96 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err) 97 } 98 99 // Validate the atProto handle 100 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil { 101 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr) 102 } 103 104 // Build community profile record 105 profile := map[string]interface{}{ 106 "$type": "social.coves.community.profile", 107 "handle": pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social) 108 "name": req.Name, // Short name for !mentions (e.g., "gaming") 109 "visibility": req.Visibility, 110 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns 111 "createdBy": req.CreatedByDID, 112 "createdAt": time.Now().Format(time.RFC3339), 113 "federation": map[string]interface{}{ 114 "allowExternalDiscovery": req.AllowExternalDiscovery, 115 }, 116 } 117 118 // Add optional fields 119 if req.DisplayName != "" { 120 profile["displayName"] = req.DisplayName 121 } 122 if req.Description != "" { 123 profile["description"] = req.Description 124 } 125 if len(req.Rules) > 0 { 126 profile["rules"] = req.Rules 127 } 128 if len(req.Categories) > 0 { 129 profile["categories"] = req.Categories 130 } 131 if req.Language != "" { 132 profile["language"] = req.Language 133 } 134 135 // Initialize counts 136 profile["memberCount"] = 0 137 profile["subscriberCount"] = 0 138 139 // TODO: Handle avatar and banner blobs 140 // For now, we'll skip blob uploads. This would require: 141 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob 142 // 2. Get blob ref (CID) 143 // 3. Add to profile record 144 145 // V2: Write to COMMUNITY's own repository (not instance repo!) 
146 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 147 // Authenticate using community's access token 148 recordURI, recordCID, err := s.createRecordOnPDSAs( 149 ctx, 150 pdsAccount.DID, // repo = community's DID (community owns its repo!) 151 "social.coves.community.profile", 152 "self", // canonical rkey for profile 153 profile, 154 pdsAccount.AccessToken, // authenticate as the community 155 ) 156 if err != nil { 157 return nil, fmt.Errorf("failed to create community profile record: %w", err) 158 } 159 160 // Build Community object with PDS credentials AND cryptographic keys 161 community := &Community{ 162 DID: pdsAccount.DID, // Community's DID (owns the repo!) 163 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social) 164 Name: req.Name, 165 DisplayName: req.DisplayName, 166 Description: req.Description, 167 OwnerDID: pdsAccount.DID, // V2: Community owns itself 168 CreatedByDID: req.CreatedByDID, 169 HostedByDID: req.HostedByDID, 170 PDSEmail: pdsAccount.Email, 171 PDSPassword: pdsAccount.Password, 172 PDSAccessToken: pdsAccount.AccessToken, 173 PDSRefreshToken: pdsAccount.RefreshToken, 174 PDSURL: pdsAccount.PDSURL, 175 Visibility: req.Visibility, 176 AllowExternalDiscovery: req.AllowExternalDiscovery, 177 MemberCount: 0, 178 SubscriberCount: 0, 179 CreatedAt: time.Now(), 180 UpdatedAt: time.Now(), 181 RecordURI: recordURI, 182 RecordCID: recordCID, 183 // V2: Cryptographic keys for portability (will be encrypted by repository) 184 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration 185 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations 186 } 187 188 // CRITICAL: Persist PDS credentials immediately to database 189 // The Jetstream consumer will eventually index the community profile from the firehose, 190 // but it won't have the PDS credentials. We must store them now so we can: 191 // 1. Update the community profile later (using its own credentials) 192 // 2. 
Re-authenticate if access tokens expire 193 _, err = s.repo.Create(ctx, community) 194 if err != nil { 195 return nil, fmt.Errorf("failed to persist community with credentials: %w", err) 196 } 197 198 return community, nil 199} 200 201// GetCommunity retrieves a community from AppView DB 202// identifier can be either a DID or handle 203func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 204 if identifier == "" { 205 return nil, ErrInvalidInput 206 } 207 208 // Determine if identifier is DID or handle 209 if strings.HasPrefix(identifier, "did:") { 210 return s.repo.GetByDID(ctx, identifier) 211 } 212 213 if strings.HasPrefix(identifier, "!") { 214 return s.repo.GetByHandle(ctx, identifier) 215 } 216 217 return nil, NewValidationError("identifier", "must be a DID or handle") 218} 219 220// UpdateCommunity updates a community via write-forward to PDS 221func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) { 222 if req.CommunityDID == "" { 223 return nil, NewValidationError("communityDid", "required") 224 } 225 226 if req.UpdatedByDID == "" { 227 return nil, NewValidationError("updatedByDid", "required") 228 } 229 230 // Get existing community 231 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 232 if err != nil { 233 return nil, err 234 } 235 236 // Authorization: verify user is the creator 237 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 238 if existing.CreatedByDID != req.UpdatedByDID { 239 return nil, ErrUnauthorized 240 } 241 242 // Build updated profile record (start with existing) 243 profile := map[string]interface{}{ 244 "$type": "social.coves.community.profile", 245 "handle": existing.Handle, 246 "name": existing.Name, 247 "owner": existing.OwnerDID, 248 "createdBy": existing.CreatedByDID, 249 "hostedBy": existing.HostedByDID, 250 "createdAt": existing.CreatedAt.Format(time.RFC3339), 251 } 252 253 // 
Apply updates 254 if req.DisplayName != nil { 255 profile["displayName"] = *req.DisplayName 256 } else { 257 profile["displayName"] = existing.DisplayName 258 } 259 260 if req.Description != nil { 261 profile["description"] = *req.Description 262 } else { 263 profile["description"] = existing.Description 264 } 265 266 if req.Visibility != nil { 267 profile["visibility"] = *req.Visibility 268 } else { 269 profile["visibility"] = existing.Visibility 270 } 271 272 if req.AllowExternalDiscovery != nil { 273 profile["federation"] = map[string]interface{}{ 274 "allowExternalDiscovery": *req.AllowExternalDiscovery, 275 } 276 } else { 277 profile["federation"] = map[string]interface{}{ 278 "allowExternalDiscovery": existing.AllowExternalDiscovery, 279 } 280 } 281 282 // Preserve moderation settings (even if empty) 283 // These fields are optional but should not be erased on update 284 if req.ModerationType != nil { 285 profile["moderationType"] = *req.ModerationType 286 } else if existing.ModerationType != "" { 287 profile["moderationType"] = existing.ModerationType 288 } 289 290 if len(req.ContentWarnings) > 0 { 291 profile["contentWarnings"] = req.ContentWarnings 292 } else if len(existing.ContentWarnings) > 0 { 293 profile["contentWarnings"] = existing.ContentWarnings 294 } 295 296 // Preserve counts 297 profile["memberCount"] = existing.MemberCount 298 profile["subscriberCount"] = existing.SubscriberCount 299 300 // V2: Community profiles always use "self" as rkey 301 // (No need to extract from URI - it's always "self" for V2 communities) 302 303 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials 304 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 305 // Authenticate as the community (not as instance!) 
306 if existing.PDSAccessToken == "" { 307 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID) 308 } 309 310 recordURI, recordCID, err := s.putRecordOnPDSAs( 311 ctx, 312 existing.DID, // repo = community's own DID (V2!) 313 "social.coves.community.profile", 314 "self", // V2: always "self" 315 profile, 316 existing.PDSAccessToken, // authenticate as the community 317 ) 318 if err != nil { 319 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 320 } 321 322 // Return updated community representation 323 // Actual AppView DB update happens via Jetstream consumer 324 updated := *existing 325 if req.DisplayName != nil { 326 updated.DisplayName = *req.DisplayName 327 } 328 if req.Description != nil { 329 updated.Description = *req.Description 330 } 331 if req.Visibility != nil { 332 updated.Visibility = *req.Visibility 333 } 334 if req.AllowExternalDiscovery != nil { 335 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 336 } 337 if req.ModerationType != nil { 338 updated.ModerationType = *req.ModerationType 339 } 340 if len(req.ContentWarnings) > 0 { 341 updated.ContentWarnings = req.ContentWarnings 342 } 343 updated.RecordURI = recordURI 344 updated.RecordCID = recordCID 345 updated.UpdatedAt = time.Now() 346 347 return &updated, nil 348} 349 350// ListCommunities queries AppView DB for communities with filters 351func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) { 352 // Set defaults 353 if req.Limit <= 0 || req.Limit > 100 { 354 req.Limit = 50 355 } 356 357 return s.repo.List(ctx, req) 358} 359 360// SearchCommunities performs fuzzy search in AppView DB 361func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) { 362 if req.Query == "" { 363 return nil, 0, NewValidationError("query", "search query is required") 364 } 365 366 // Set defaults 367 if req.Limit <= 0 
|| req.Limit > 100 { 368 req.Limit = 50 369 } 370 371 return s.repo.Search(ctx, req) 372} 373 374// SubscribeToCommunity creates a subscription via write-forward to PDS 375func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*Subscription, error) { 376 if userDID == "" { 377 return nil, NewValidationError("userDid", "required") 378 } 379 if userAccessToken == "" { 380 return nil, NewValidationError("userAccessToken", "required") 381 } 382 383 // Resolve community identifier to DID 384 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 385 if err != nil { 386 return nil, err 387 } 388 389 // Verify community exists 390 community, err := s.repo.GetByDID(ctx, communityDID) 391 if err != nil { 392 return nil, err 393 } 394 395 // Check visibility - can't subscribe to private communities without invitation (TODO) 396 if community.Visibility == "private" { 397 return nil, ErrUnauthorized 398 } 399 400 // Build subscription record 401 subRecord := map[string]interface{}{ 402 "$type": "social.coves.community.subscribe", 403 "community": communityDID, 404 } 405 406 // Write-forward: create subscription record in user's repo using their access token 407 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscribe", "", subRecord, userAccessToken) 408 if err != nil { 409 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 410 } 411 412 // Return subscription representation 413 subscription := &Subscription{ 414 UserDID: userDID, 415 CommunityDID: communityDID, 416 SubscribedAt: time.Now(), 417 RecordURI: recordURI, 418 RecordCID: recordCID, 419 } 420 421 return subscription, nil 422} 423 424// UnsubscribeFromCommunity removes a subscription via PDS delete 425func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 426 if userDID == "" { 427 return 
NewValidationError("userDid", "required") 428 } 429 if userAccessToken == "" { 430 return NewValidationError("userAccessToken", "required") 431 } 432 433 // Resolve community identifier 434 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 435 if err != nil { 436 return err 437 } 438 439 // Get the subscription from AppView to find the record key 440 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 441 if err != nil { 442 return err 443 } 444 445 // Extract rkey from record URI (at://did/collection/rkey) 446 rkey := extractRKeyFromURI(subscription.RecordURI) 447 if rkey == "" { 448 return fmt.Errorf("invalid subscription record URI") 449 } 450 451 // Write-forward: delete record from PDS using user's access token 452 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscribe", rkey, userAccessToken); err != nil { 453 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 454 } 455 456 return nil 457} 458 459// GetUserSubscriptions queries AppView DB for user's subscriptions 460func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 461 if limit <= 0 || limit > 100 { 462 limit = 50 463 } 464 465 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 466} 467 468// GetCommunitySubscribers queries AppView DB for community subscribers 469func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 470 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 471 if err != nil { 472 return nil, err 473 } 474 475 if limit <= 0 || limit > 100 { 476 limit = 50 477 } 478 479 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 480} 481 482// GetMembership retrieves membership info from AppView DB 483func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) 
(*Membership, error) { 484 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 485 if err != nil { 486 return nil, err 487 } 488 489 return s.repo.GetMembership(ctx, userDID, communityDID) 490} 491 492// ListCommunityMembers queries AppView DB for members 493func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 494 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 495 if err != nil { 496 return nil, err 497 } 498 499 if limit <= 0 || limit > 100 { 500 limit = 50 501 } 502 503 return s.repo.ListMembers(ctx, communityDID, limit, offset) 504} 505 506// ValidateHandle checks if a community handle is valid 507func (s *communityService) ValidateHandle(handle string) error { 508 if handle == "" { 509 return NewValidationError("handle", "required") 510 } 511 512 if !communityHandleRegex.MatchString(handle) { 513 return ErrInvalidHandle 514 } 515 516 return nil 517} 518 519// ResolveCommunityIdentifier converts a handle or DID to a DID 520func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 521 if identifier == "" { 522 return "", ErrInvalidInput 523 } 524 525 // If it's already a DID, return it 526 if strings.HasPrefix(identifier, "did:") { 527 return identifier, nil 528 } 529 530 // If it's a handle, look it up in AppView DB 531 if strings.HasPrefix(identifier, "!") { 532 community, err := s.repo.GetByHandle(ctx, identifier) 533 if err != nil { 534 return "", err 535 } 536 return community.DID, nil 537 } 538 539 return "", NewValidationError("identifier", "must be a DID or handle") 540} 541 542// Validation helpers 543 544func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 545 if req.Name == "" { 546 return NewValidationError("name", "required") 547 } 548 549 // DNS label limit: 63 characters per label 550 // Community handle format: 
{name}.communities.{instanceDomain} 551 // The first label is just req.Name, so it must be <= 63 chars 552 if len(req.Name) > 63 { 553 return NewValidationError("name", "must be 63 characters or less (DNS label limit)") 554 } 555 556 // Name can only contain alphanumeric and hyphens 557 // Must start and end with alphanumeric (not hyphen) 558 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 559 if !nameRegex.MatchString(req.Name) { 560 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 561 } 562 563 if req.Description != "" && len(req.Description) > 3000 { 564 return NewValidationError("description", "must be 3000 characters or less") 565 } 566 567 // Visibility should already be set with default in CreateCommunity 568 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 569 return ErrInvalidVisibility 570 } 571 572 if req.CreatedByDID == "" { 573 return NewValidationError("createdByDid", "required") 574 } 575 576 // hostedByDID is auto-populated by the service layer, no validation needed 577 // The handler ensures clients cannot provide this field 578 579 return nil 580} 581 582// PDS write-forward helpers 583 584func (s *communityService) createRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) { 585 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 586 587 payload := map[string]interface{}{ 588 "repo": repoDID, 589 "collection": collection, 590 "record": record, 591 } 592 593 if rkey != "" { 594 payload["rkey"] = rkey 595 } 596 597 return s.callPDS(ctx, "POST", endpoint, payload) 598} 599 600// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 601func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken 
string) (string, string, error) { 602 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 603 604 payload := map[string]interface{}{ 605 "repo": repoDID, 606 "collection": collection, 607 "record": record, 608 } 609 610 if rkey != "" { 611 payload["rkey"] = rkey 612 } 613 614 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 615} 616 617// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth) 618func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 619 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 620 621 payload := map[string]interface{}{ 622 "repo": repoDID, 623 "collection": collection, 624 "rkey": rkey, 625 "record": record, 626 } 627 628 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 629} 630 631func (s *communityService) deleteRecordOnPDS(ctx context.Context, repoDID, collection, rkey string) error { 632 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 633 634 payload := map[string]interface{}{ 635 "repo": repoDID, 636 "collection": collection, 637 "rkey": rkey, 638 } 639 640 _, _, err := s.callPDS(ctx, "POST", endpoint, payload) 641 return err 642} 643 644// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions) 645func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, accessToken string) error { 646 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 647 648 payload := map[string]interface{}{ 649 "repo": repoDID, 650 "collection": collection, 651 "rkey": rkey, 652 } 653 654 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 655 return err 656} 657 658func (s 
*communityService) callPDS(ctx context.Context, method, endpoint string, payload map[string]interface{}) (string, string, error) { 659 // Use instance's access token 660 return s.callPDSWithAuth(ctx, method, endpoint, payload, s.pdsAccessToken) 661} 662 663// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) 664func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 665 jsonData, err := json.Marshal(payload) 666 if err != nil { 667 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 668 } 669 670 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 671 if err != nil { 672 return "", "", fmt.Errorf("failed to create request: %w", err) 673 } 674 req.Header.Set("Content-Type", "application/json") 675 676 // Add authentication with provided access token 677 if accessToken != "" { 678 req.Header.Set("Authorization", "Bearer "+accessToken) 679 } 680 681 // Dynamic timeout based on operation type 682 // Write operations (createAccount, createRecord, putRecord) are slower due to: 683 // - Keypair generation 684 // - DID PLC registration 685 // - Database writes on PDS 686 timeout := 10 * time.Second // Default for read operations 687 if strings.Contains(endpoint, "createAccount") || 688 strings.Contains(endpoint, "createRecord") || 689 strings.Contains(endpoint, "putRecord") { 690 timeout = 30 * time.Second // Extended timeout for write operations 691 } 692 693 client := &http.Client{Timeout: timeout} 694 resp, err := client.Do(req) 695 if err != nil { 696 return "", "", fmt.Errorf("failed to call PDS: %w", err) 697 } 698 defer func() { 699 if closeErr := resp.Body.Close(); closeErr != nil { 700 log.Printf("Failed to close response body: %v", closeErr) 701 } 702 }() 703 704 body, err := io.ReadAll(resp.Body) 705 if err != nil { 706 return "", "", fmt.Errorf("failed to 
read response: %w", err) 707 } 708 709 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 710 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 711 } 712 713 // Parse response to extract URI and CID 714 var result struct { 715 URI string `json:"uri"` 716 CID string `json:"cid"` 717 } 718 if err := json.Unmarshal(body, &result); err != nil { 719 // For delete operations, there might not be a response body 720 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 721 return "", "", nil 722 } 723 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 724 } 725 726 return result.URI, result.CID, nil 727} 728 729// Helper functions 730 731func extractRKeyFromURI(uri string) string { 732 // at://did/collection/rkey -> rkey 733 parts := strings.Split(uri, "/") 734 if len(parts) >= 4 { 735 return parts[len(parts)-1] 736 } 737 return "" 738}