A community-based topic aggregation platform built on atproto
1package communities 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "net/http" 11 "regexp" 12 "strings" 13 "time" 14) 15 16// Community handle validation regex (DNS-valid handle: name.communities.instance.com) 17// Matches standard DNS hostname format (RFC 1035) 18var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 19 20type communityService struct { 21 repo Repository 22 provisioner *PDSAccountProvisioner 23 pdsURL string 24 instanceDID string 25 instanceDomain string 26 pdsAccessToken string 27} 28 29// NewCommunityService creates a new community service 30func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service { 31 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 32 // This catches honest configuration mistakes but NOT malicious code modifications 33 // Full verification (Phase 2) requires fetching DID document from domain 34 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification" 35 if strings.HasPrefix(instanceDID, "did:web:") { 36 didDomain := strings.TrimPrefix(instanceDID, "did:web:") 37 if didDomain != instanceDomain { 38 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)", 39 didDomain, instanceDomain) 40 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt") 41 log.Printf(" Communities will be hosted by: %s", instanceDID) 42 } 43 } 44 45 return &communityService{ 46 repo: repo, 47 pdsURL: pdsURL, 48 instanceDID: instanceDID, 49 instanceDomain: instanceDomain, 50 provisioner: provisioner, 51 } 52} 53 54// SetPDSAccessToken sets the PDS access token for authentication 55// This should be called after creating a session for the Coves instance DID on the PDS 56func (s *communityService) SetPDSAccessToken(token string) { 57 s.pdsAccessToken = token 
58} 59 60// CreateCommunity creates a new community via write-forward to PDS 61// V2 Flow: 62// 1. Service creates PDS account for community (PDS generates signing keypair) 63// 2. Service writes community profile to COMMUNITY's own repository 64// 3. Firehose emits event 65// 4. Consumer indexes to AppView DB 66// 67// V2 Architecture: 68// - Community owns its own repository (at://community_did/social.coves.community.profile/self) 69// - PDS manages the signing keypair (we never see it) 70// - We store PDS credentials to act on behalf of the community 71// - Community can migrate to other instances (future V2.1 with rotation keys) 72func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 73 // Apply defaults before validation 74 if req.Visibility == "" { 75 req.Visibility = "public" 76 } 77 78 // SECURITY: Auto-populate hostedByDID from instance configuration 79 // Clients MUST NOT provide this field - it's derived from the instance receiving the request 80 // This prevents malicious instances from claiming to host communities for domains they don't own 81 req.HostedByDID = s.instanceDID 82 83 // Validate request 84 if err := s.validateCreateRequest(req); err != nil { 85 return nil, err 86 } 87 88 // V2: Provision a real PDS account for this community 89 // This calls com.atproto.server.createAccount internally 90 // The PDS will: 91 // 1. Generate a signing keypair (stored in PDS, we never see it) 92 // 2. Create a DID (did:plc:xxx) 93 // 3. 
Return credentials (DID, tokens) 94 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name) 95 if err != nil { 96 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err) 97 } 98 99 // Validate the atProto handle 100 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil { 101 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr) 102 } 103 104 // Build community profile record 105 profile := map[string]interface{}{ 106 "$type": "social.coves.community.profile", 107 "handle": pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social) 108 "name": req.Name, // Short name for !mentions (e.g., "gaming") 109 "visibility": req.Visibility, 110 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns 111 "createdBy": req.CreatedByDID, 112 "createdAt": time.Now().Format(time.RFC3339), 113 "federation": map[string]interface{}{ 114 "allowExternalDiscovery": req.AllowExternalDiscovery, 115 }, 116 } 117 118 // Add optional fields 119 if req.DisplayName != "" { 120 profile["displayName"] = req.DisplayName 121 } 122 if req.Description != "" { 123 profile["description"] = req.Description 124 } 125 if len(req.Rules) > 0 { 126 profile["rules"] = req.Rules 127 } 128 if len(req.Categories) > 0 { 129 profile["categories"] = req.Categories 130 } 131 if req.Language != "" { 132 profile["language"] = req.Language 133 } 134 135 // Initialize counts 136 profile["memberCount"] = 0 137 profile["subscriberCount"] = 0 138 139 // TODO: Handle avatar and banner blobs 140 // For now, we'll skip blob uploads. This would require: 141 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob 142 // 2. Get blob ref (CID) 143 // 3. Add to profile record 144 145 // V2: Write to COMMUNITY's own repository (not instance repo!) 
146 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 147 // Authenticate using community's access token 148 recordURI, recordCID, err := s.createRecordOnPDSAs( 149 ctx, 150 pdsAccount.DID, // repo = community's DID (community owns its repo!) 151 "social.coves.community.profile", 152 "self", // canonical rkey for profile 153 profile, 154 pdsAccount.AccessToken, // authenticate as the community 155 ) 156 if err != nil { 157 return nil, fmt.Errorf("failed to create community profile record: %w", err) 158 } 159 160 // Build Community object with PDS credentials AND cryptographic keys 161 community := &Community{ 162 DID: pdsAccount.DID, // Community's DID (owns the repo!) 163 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social) 164 Name: req.Name, 165 DisplayName: req.DisplayName, 166 Description: req.Description, 167 OwnerDID: pdsAccount.DID, // V2: Community owns itself 168 CreatedByDID: req.CreatedByDID, 169 HostedByDID: req.HostedByDID, 170 PDSEmail: pdsAccount.Email, 171 PDSPassword: pdsAccount.Password, 172 PDSAccessToken: pdsAccount.AccessToken, 173 PDSRefreshToken: pdsAccount.RefreshToken, 174 PDSURL: pdsAccount.PDSURL, 175 Visibility: req.Visibility, 176 AllowExternalDiscovery: req.AllowExternalDiscovery, 177 MemberCount: 0, 178 SubscriberCount: 0, 179 CreatedAt: time.Now(), 180 UpdatedAt: time.Now(), 181 RecordURI: recordURI, 182 RecordCID: recordCID, 183 // V2: Cryptographic keys for portability (will be encrypted by repository) 184 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration 185 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations 186 } 187 188 // CRITICAL: Persist PDS credentials immediately to database 189 // The Jetstream consumer will eventually index the community profile from the firehose, 190 // but it won't have the PDS credentials. We must store them now so we can: 191 // 1. Update the community profile later (using its own credentials) 192 // 2. 
Re-authenticate if access tokens expire 193 _, err = s.repo.Create(ctx, community) 194 if err != nil { 195 return nil, fmt.Errorf("failed to persist community with credentials: %w", err) 196 } 197 198 return community, nil 199} 200 201// GetCommunity retrieves a community from AppView DB 202// identifier can be either a DID or handle 203func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 204 if identifier == "" { 205 return nil, ErrInvalidInput 206 } 207 208 // Determine if identifier is DID or handle 209 if strings.HasPrefix(identifier, "did:") { 210 return s.repo.GetByDID(ctx, identifier) 211 } 212 213 if strings.HasPrefix(identifier, "!") { 214 return s.repo.GetByHandle(ctx, identifier) 215 } 216 217 return nil, NewValidationError("identifier", "must be a DID or handle") 218} 219 220// UpdateCommunity updates a community via write-forward to PDS 221func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) { 222 if req.CommunityDID == "" { 223 return nil, NewValidationError("communityDid", "required") 224 } 225 226 if req.UpdatedByDID == "" { 227 return nil, NewValidationError("updatedByDid", "required") 228 } 229 230 // Get existing community 231 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 232 if err != nil { 233 return nil, err 234 } 235 236 // Authorization: verify user is the creator 237 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 238 if existing.CreatedByDID != req.UpdatedByDID { 239 return nil, ErrUnauthorized 240 } 241 242 // Build updated profile record (start with existing) 243 profile := map[string]interface{}{ 244 "$type": "social.coves.community.profile", 245 "handle": existing.Handle, 246 "name": existing.Name, 247 "owner": existing.OwnerDID, 248 "createdBy": existing.CreatedByDID, 249 "hostedBy": existing.HostedByDID, 250 "createdAt": existing.CreatedAt.Format(time.RFC3339), 251 } 252 253 // 
Apply updates 254 if req.DisplayName != nil { 255 profile["displayName"] = *req.DisplayName 256 } else { 257 profile["displayName"] = existing.DisplayName 258 } 259 260 if req.Description != nil { 261 profile["description"] = *req.Description 262 } else { 263 profile["description"] = existing.Description 264 } 265 266 if req.Visibility != nil { 267 profile["visibility"] = *req.Visibility 268 } else { 269 profile["visibility"] = existing.Visibility 270 } 271 272 if req.AllowExternalDiscovery != nil { 273 profile["federation"] = map[string]interface{}{ 274 "allowExternalDiscovery": *req.AllowExternalDiscovery, 275 } 276 } else { 277 profile["federation"] = map[string]interface{}{ 278 "allowExternalDiscovery": existing.AllowExternalDiscovery, 279 } 280 } 281 282 // Preserve moderation settings (even if empty) 283 // These fields are optional but should not be erased on update 284 if req.ModerationType != nil { 285 profile["moderationType"] = *req.ModerationType 286 } else if existing.ModerationType != "" { 287 profile["moderationType"] = existing.ModerationType 288 } 289 290 if len(req.ContentWarnings) > 0 { 291 profile["contentWarnings"] = req.ContentWarnings 292 } else if len(existing.ContentWarnings) > 0 { 293 profile["contentWarnings"] = existing.ContentWarnings 294 } 295 296 // Preserve counts 297 profile["memberCount"] = existing.MemberCount 298 profile["subscriberCount"] = existing.SubscriberCount 299 300 // V2: Community profiles always use "self" as rkey 301 // (No need to extract from URI - it's always "self" for V2 communities) 302 303 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials 304 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 305 // Authenticate as the community (not as instance!) 
306 if existing.PDSAccessToken == "" { 307 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID) 308 } 309 310 recordURI, recordCID, err := s.putRecordOnPDSAs( 311 ctx, 312 existing.DID, // repo = community's own DID (V2!) 313 "social.coves.community.profile", 314 "self", // V2: always "self" 315 profile, 316 existing.PDSAccessToken, // authenticate as the community 317 ) 318 if err != nil { 319 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 320 } 321 322 // Return updated community representation 323 // Actual AppView DB update happens via Jetstream consumer 324 updated := *existing 325 if req.DisplayName != nil { 326 updated.DisplayName = *req.DisplayName 327 } 328 if req.Description != nil { 329 updated.Description = *req.Description 330 } 331 if req.Visibility != nil { 332 updated.Visibility = *req.Visibility 333 } 334 if req.AllowExternalDiscovery != nil { 335 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 336 } 337 if req.ModerationType != nil { 338 updated.ModerationType = *req.ModerationType 339 } 340 if len(req.ContentWarnings) > 0 { 341 updated.ContentWarnings = req.ContentWarnings 342 } 343 updated.RecordURI = recordURI 344 updated.RecordCID = recordCID 345 updated.UpdatedAt = time.Now() 346 347 return &updated, nil 348} 349 350// ListCommunities queries AppView DB for communities with filters 351func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) { 352 // Set defaults 353 if req.Limit <= 0 || req.Limit > 100 { 354 req.Limit = 50 355 } 356 357 return s.repo.List(ctx, req) 358} 359 360// SearchCommunities performs fuzzy search in AppView DB 361func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) { 362 if req.Query == "" { 363 return nil, 0, NewValidationError("query", "search query is required") 364 } 365 366 // Set defaults 367 if req.Limit <= 0 
|| req.Limit > 100 { 368 req.Limit = 50 369 } 370 371 return s.repo.Search(ctx, req) 372} 373 374// SubscribeToCommunity creates a subscription via write-forward to PDS 375func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) { 376 if userDID == "" { 377 return nil, NewValidationError("userDid", "required") 378 } 379 if userAccessToken == "" { 380 return nil, NewValidationError("userAccessToken", "required") 381 } 382 383 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid 384 if contentVisibility <= 0 || contentVisibility > 5 { 385 contentVisibility = 3 386 } 387 388 // Resolve community identifier to DID 389 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 390 if err != nil { 391 return nil, err 392 } 393 394 // Verify community exists 395 community, err := s.repo.GetByDID(ctx, communityDID) 396 if err != nil { 397 return nil, err 398 } 399 400 // Check visibility - can't subscribe to private communities without invitation (TODO) 401 if community.Visibility == "private" { 402 return nil, ErrUnauthorized 403 } 404 405 // Build subscription record 406 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure) 407 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid} 408 // Following atProto conventions, we use "subject" field to reference the community 409 subRecord := map[string]interface{}{ 410 "$type": "social.coves.community.subscription", 411 "subject": communityDID, // atProto convention: "subject" for entity references 412 "createdAt": time.Now().Format(time.RFC3339), 413 "contentVisibility": contentVisibility, 414 } 415 416 // Write-forward: create subscription record in user's repo using their access token 417 // The collection parameter refers to the record 
type in the repository 418 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken) 419 if err != nil { 420 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 421 } 422 423 // Return subscription representation 424 subscription := &Subscription{ 425 UserDID: userDID, 426 CommunityDID: communityDID, 427 ContentVisibility: contentVisibility, 428 SubscribedAt: time.Now(), 429 RecordURI: recordURI, 430 RecordCID: recordCID, 431 } 432 433 return subscription, nil 434} 435 436// UnsubscribeFromCommunity removes a subscription via PDS delete 437func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 438 if userDID == "" { 439 return NewValidationError("userDid", "required") 440 } 441 if userAccessToken == "" { 442 return NewValidationError("userAccessToken", "required") 443 } 444 445 // Resolve community identifier 446 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 447 if err != nil { 448 return err 449 } 450 451 // Get the subscription from AppView to find the record key 452 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 453 if err != nil { 454 return err 455 } 456 457 // Extract rkey from record URI (at://did/collection/rkey) 458 rkey := extractRKeyFromURI(subscription.RecordURI) 459 if rkey == "" { 460 return fmt.Errorf("invalid subscription record URI") 461 } 462 463 // Write-forward: delete record from PDS using user's access token 464 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe 465 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil { 466 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 467 } 468 469 return nil 470} 471 472// GetUserSubscriptions queries AppView DB for user's 
subscriptions 473func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 474 if limit <= 0 || limit > 100 { 475 limit = 50 476 } 477 478 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 479} 480 481// GetCommunitySubscribers queries AppView DB for community subscribers 482func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 483 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 484 if err != nil { 485 return nil, err 486 } 487 488 if limit <= 0 || limit > 100 { 489 limit = 50 490 } 491 492 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 493} 494 495// GetMembership retrieves membership info from AppView DB 496func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) { 497 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 498 if err != nil { 499 return nil, err 500 } 501 502 return s.repo.GetMembership(ctx, userDID, communityDID) 503} 504 505// ListCommunityMembers queries AppView DB for members 506func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 507 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 508 if err != nil { 509 return nil, err 510 } 511 512 if limit <= 0 || limit > 100 { 513 limit = 50 514 } 515 516 return s.repo.ListMembers(ctx, communityDID, limit, offset) 517} 518 519// ValidateHandle checks if a community handle is valid 520func (s *communityService) ValidateHandle(handle string) error { 521 if handle == "" { 522 return NewValidationError("handle", "required") 523 } 524 525 if !communityHandleRegex.MatchString(handle) { 526 return ErrInvalidHandle 527 } 528 529 return nil 530} 531 532// ResolveCommunityIdentifier converts a handle 
or DID to a DID 533func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 534 if identifier == "" { 535 return "", ErrInvalidInput 536 } 537 538 // If it's already a DID, return it 539 if strings.HasPrefix(identifier, "did:") { 540 return identifier, nil 541 } 542 543 // If it's a handle, look it up in AppView DB 544 if strings.HasPrefix(identifier, "!") { 545 community, err := s.repo.GetByHandle(ctx, identifier) 546 if err != nil { 547 return "", err 548 } 549 return community.DID, nil 550 } 551 552 return "", NewValidationError("identifier", "must be a DID or handle") 553} 554 555// Validation helpers 556 557func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 558 if req.Name == "" { 559 return NewValidationError("name", "required") 560 } 561 562 // DNS label limit: 63 characters per label 563 // Community handle format: {name}.communities.{instanceDomain} 564 // The first label is just req.Name, so it must be <= 63 chars 565 if len(req.Name) > 63 { 566 return NewValidationError("name", "must be 63 characters or less (DNS label limit)") 567 } 568 569 // Name can only contain alphanumeric and hyphens 570 // Must start and end with alphanumeric (not hyphen) 571 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 572 if !nameRegex.MatchString(req.Name) { 573 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 574 } 575 576 if req.Description != "" && len(req.Description) > 3000 { 577 return NewValidationError("description", "must be 3000 characters or less") 578 } 579 580 // Visibility should already be set with default in CreateCommunity 581 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 582 return ErrInvalidVisibility 583 } 584 585 if req.CreatedByDID == "" { 586 return NewValidationError("createdByDid", "required") 587 } 588 589 // hostedByDID is auto-populated 
by the service layer, no validation needed 590 // The handler ensures clients cannot provide this field 591 592 return nil 593} 594 595// PDS write-forward helpers 596 597func (s *communityService) createRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) { 598 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 599 600 payload := map[string]interface{}{ 601 "repo": repoDID, 602 "collection": collection, 603 "record": record, 604 } 605 606 if rkey != "" { 607 payload["rkey"] = rkey 608 } 609 610 return s.callPDS(ctx, "POST", endpoint, payload) 611} 612 613// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 614func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 615 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 616 617 payload := map[string]interface{}{ 618 "repo": repoDID, 619 "collection": collection, 620 "record": record, 621 } 622 623 if rkey != "" { 624 payload["rkey"] = rkey 625 } 626 627 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 628} 629 630// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth) 631func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 632 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 633 634 payload := map[string]interface{}{ 635 "repo": repoDID, 636 "collection": collection, 637 "rkey": rkey, 638 "record": record, 639 } 640 641 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 642} 643 644func (s *communityService) deleteRecordOnPDS(ctx context.Context, 
repoDID, collection, rkey string) error { 645 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 646 647 payload := map[string]interface{}{ 648 "repo": repoDID, 649 "collection": collection, 650 "rkey": rkey, 651 } 652 653 _, _, err := s.callPDS(ctx, "POST", endpoint, payload) 654 return err 655} 656 657// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions) 658func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, accessToken string) error { 659 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 660 661 payload := map[string]interface{}{ 662 "repo": repoDID, 663 "collection": collection, 664 "rkey": rkey, 665 } 666 667 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 668 return err 669} 670 671func (s *communityService) callPDS(ctx context.Context, method, endpoint string, payload map[string]interface{}) (string, string, error) { 672 // Use instance's access token 673 return s.callPDSWithAuth(ctx, method, endpoint, payload, s.pdsAccessToken) 674} 675 676// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) 677func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 678 jsonData, err := json.Marshal(payload) 679 if err != nil { 680 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 681 } 682 683 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 684 if err != nil { 685 return "", "", fmt.Errorf("failed to create request: %w", err) 686 } 687 req.Header.Set("Content-Type", "application/json") 688 689 // Add authentication with provided access token 690 if accessToken != "" { 691 req.Header.Set("Authorization", "Bearer "+accessToken) 692 } 693 694 // 
Dynamic timeout based on operation type 695 // Write operations (createAccount, createRecord, putRecord) are slower due to: 696 // - Keypair generation 697 // - DID PLC registration 698 // - Database writes on PDS 699 timeout := 10 * time.Second // Default for read operations 700 if strings.Contains(endpoint, "createAccount") || 701 strings.Contains(endpoint, "createRecord") || 702 strings.Contains(endpoint, "putRecord") { 703 timeout = 30 * time.Second // Extended timeout for write operations 704 } 705 706 client := &http.Client{Timeout: timeout} 707 resp, err := client.Do(req) 708 if err != nil { 709 return "", "", fmt.Errorf("failed to call PDS: %w", err) 710 } 711 defer func() { 712 if closeErr := resp.Body.Close(); closeErr != nil { 713 log.Printf("Failed to close response body: %v", closeErr) 714 } 715 }() 716 717 body, err := io.ReadAll(resp.Body) 718 if err != nil { 719 return "", "", fmt.Errorf("failed to read response: %w", err) 720 } 721 722 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 723 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 724 } 725 726 // Parse response to extract URI and CID 727 var result struct { 728 URI string `json:"uri"` 729 CID string `json:"cid"` 730 } 731 if err := json.Unmarshal(body, &result); err != nil { 732 // For delete operations, there might not be a response body 733 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 734 return "", "", nil 735 } 736 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 737 } 738 739 return result.URI, result.CID, nil 740} 741 742// Helper functions 743 744func extractRKeyFromURI(uri string) string { 745 // at://did/collection/rkey -> rkey 746 parts := strings.Split(uri, "/") 747 if len(parts) >= 4 { 748 return parts[len(parts)-1] 749 } 750 return "" 751}