A community based topic aggregation platform built on atproto
1package communities 2 3import ( 4 "Coves/internal/atproto/utils" 5 "bytes" 6 "context" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io" 11 "log" 12 "net/http" 13 "regexp" 14 "strings" 15 "sync" 16 "time" 17) 18 19// Community handle validation regex (DNS-valid handle: name.communities.instance.com) 20// Matches standard DNS hostname format (RFC 1035) 21var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 22 23type communityService struct { 24 // Interfaces and pointers first (better alignment) 25 repo Repository 26 provisioner *PDSAccountProvisioner 27 28 // Token refresh concurrency control 29 // Each community gets its own mutex to prevent concurrent refresh attempts 30 refreshMutexes map[string]*sync.Mutex 31 32 // Strings 33 pdsURL string 34 instanceDID string 35 instanceDomain string 36 pdsAccessToken string 37 38 // Sync primitives last 39 mapMutex sync.RWMutex // Protects refreshMutexes map itself 40} 41 42const ( 43 // Maximum recommended size for mutex cache (warning threshold, not hard limit) 44 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead) 45 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable 46 maxMutexCacheSize = 10000 47) 48 49// NewCommunityService creates a new community service 50func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service { 51 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 52 // This catches honest configuration mistakes but NOT malicious code modifications 53 // Full verification (Phase 2) requires fetching DID document from domain 54 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification" 55 if strings.HasPrefix(instanceDID, "did:web:") { 56 didDomain := strings.TrimPrefix(instanceDID, "did:web:") 57 if didDomain != instanceDomain { 58 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)", 59 didDomain, instanceDomain) 60 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt") 61 log.Printf(" Communities will be hosted by: %s", instanceDID) 62 } 63 } 64 65 return &communityService{ 66 repo: repo, 67 pdsURL: pdsURL, 68 instanceDID: instanceDID, 69 instanceDomain: instanceDomain, 70 provisioner: provisioner, 71 refreshMutexes: make(map[string]*sync.Mutex), 72 } 73} 74 75// SetPDSAccessToken sets the PDS access token for authentication 76// This should be called after creating a session for the Coves instance DID on the PDS 77func (s *communityService) SetPDSAccessToken(token string) { 78 s.pdsAccessToken = token 79} 80 81// CreateCommunity creates a new community via write-forward to PDS 82// V2 Flow: 83// 1. Service creates PDS account for community (PDS generates signing keypair) 84// 2. Service writes community profile to COMMUNITY's own repository 85// 3. Firehose emits event 86// 4. Consumer indexes to AppView DB 87// 88// V2 Architecture: 89// - Community owns its own repository (at://community_did/social.coves.community.profile/self) 90// - PDS manages the signing keypair (we never see it) 91// - We store PDS credentials to act on behalf of the community 92// - Community can migrate to other instances (future V2.1 with rotation keys) 93func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 94 // Apply defaults before validation 95 if req.Visibility == "" { 96 req.Visibility = "public" 97 } 98 99 // SECURITY: Auto-populate hostedByDID from instance configuration 100 // Clients MUST NOT provide this field - it's derived from the instance receiving the request 101 // This prevents malicious instances from claiming to host communities for domains they don't own 102 req.HostedByDID = s.instanceDID 103 104 // Validate request 105 if err := s.validateCreateRequest(req); err != nil { 106 return nil, err 107 } 108 109 // V2: Provision a real PDS account for this community 110 // This calls com.atproto.server.createAccount internally 111 // The PDS will: 112 // 1. Generate a signing keypair (stored in PDS, we never see it) 113 // 2. Create a DID (did:plc:xxx) 114 // 3. Return credentials (DID, tokens) 115 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name) 116 if err != nil { 117 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err) 118 } 119 120 // Validate the atProto handle 121 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil { 122 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr) 123 } 124 125 // Build community profile record 126 profile := map[string]interface{}{ 127 "$type": "social.coves.community.profile", 128 "handle": pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social) 129 "name": req.Name, // Short name for !mentions (e.g., "gaming") 130 "visibility": req.Visibility, 131 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns 132 "createdBy": req.CreatedByDID, 133 "createdAt": time.Now().Format(time.RFC3339), 134 "federation": map[string]interface{}{ 135 "allowExternalDiscovery": req.AllowExternalDiscovery, 136 }, 137 } 138 139 // Add optional fields 140 if req.DisplayName != "" { 141 profile["displayName"] = req.DisplayName 142 } 143 if req.Description != "" { 144 profile["description"] = req.Description 145 } 146 if len(req.Rules) > 0 { 147 profile["rules"] = req.Rules 148 } 149 if len(req.Categories) > 0 { 150 profile["categories"] = req.Categories 151 } 152 if req.Language != "" { 153 profile["language"] = req.Language 154 } 155 156 // Initialize counts 157 profile["memberCount"] = 0 158 profile["subscriberCount"] = 0 159 160 // TODO: Handle avatar and banner blobs 161 // For now, we'll skip blob uploads. This would require: 162 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob 163 // 2. Get blob ref (CID) 164 // 3. Add to profile record 165 166 // V2: Write to COMMUNITY's own repository (not instance repo!) 167 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 168 // Authenticate using community's access token 169 recordURI, recordCID, err := s.createRecordOnPDSAs( 170 ctx, 171 pdsAccount.DID, // repo = community's DID (community owns its repo!) 172 "social.coves.community.profile", 173 "self", // canonical rkey for profile 174 profile, 175 pdsAccount.AccessToken, // authenticate as the community 176 ) 177 if err != nil { 178 return nil, fmt.Errorf("failed to create community profile record: %w", err) 179 } 180 181 // Build Community object with PDS credentials AND cryptographic keys 182 community := &Community{ 183 DID: pdsAccount.DID, // Community's DID (owns the repo!) 184 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social) 185 Name: req.Name, 186 DisplayName: req.DisplayName, 187 Description: req.Description, 188 OwnerDID: pdsAccount.DID, // V2: Community owns itself 189 CreatedByDID: req.CreatedByDID, 190 HostedByDID: req.HostedByDID, 191 PDSEmail: pdsAccount.Email, 192 PDSPassword: pdsAccount.Password, 193 PDSAccessToken: pdsAccount.AccessToken, 194 PDSRefreshToken: pdsAccount.RefreshToken, 195 PDSURL: pdsAccount.PDSURL, 196 Visibility: req.Visibility, 197 AllowExternalDiscovery: req.AllowExternalDiscovery, 198 MemberCount: 0, 199 SubscriberCount: 0, 200 CreatedAt: time.Now(), 201 UpdatedAt: time.Now(), 202 RecordURI: recordURI, 203 RecordCID: recordCID, 204 // V2: Cryptographic keys for portability (will be encrypted by repository) 205 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration 206 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations 207 } 208 209 // CRITICAL: Persist PDS credentials immediately to database 210 // The Jetstream consumer will eventually index the community profile from the firehose, 211 // but it won't have the PDS credentials. We must store them now so we can: 212 // 1. Update the community profile later (using its own credentials) 213 // 2. Re-authenticate if access tokens expire 214 _, err = s.repo.Create(ctx, community) 215 if err != nil { 216 return nil, fmt.Errorf("failed to persist community with credentials: %w", err) 217 } 218 219 return community, nil 220} 221 222// GetCommunity retrieves a community from AppView DB 223// identifier can be either a DID or handle 224func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 225 if identifier == "" { 226 return nil, ErrInvalidInput 227 } 228 229 // Determine if identifier is DID or handle 230 if strings.HasPrefix(identifier, "did:") { 231 return s.repo.GetByDID(ctx, identifier) 232 } 233 234 if strings.HasPrefix(identifier, "!") { 235 return s.repo.GetByHandle(ctx, identifier) 236 } 237 238 return nil, NewValidationError("identifier", "must be a DID or handle") 239} 240 241// UpdateCommunity updates a community via write-forward to PDS 242func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) { 243 if req.CommunityDID == "" { 244 return nil, NewValidationError("communityDid", "required") 245 } 246 247 if req.UpdatedByDID == "" { 248 return nil, NewValidationError("updatedByDid", "required") 249 } 250 251 // Get existing community 252 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 253 if err != nil { 254 return nil, err 255 } 256 257 // CRITICAL: Ensure fresh PDS access token before write operation 258 // Community PDS tokens expire every ~2 hours and must be refreshed 259 existing, err = s.ensureFreshToken(ctx, existing) 260 if err != nil { 261 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err) 262 } 263 264 // Authorization: verify user is the creator 265 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 266 if existing.CreatedByDID != req.UpdatedByDID { 267 return nil, ErrUnauthorized 268 } 269 270 // Build updated profile record (start with existing) 271 profile := map[string]interface{}{ 272 "$type": "social.coves.community.profile", 273 "handle": existing.Handle, 274 "name": existing.Name, 275 "owner": existing.OwnerDID, 276 "createdBy": existing.CreatedByDID, 277 "hostedBy": existing.HostedByDID, 278 "createdAt": existing.CreatedAt.Format(time.RFC3339), 279 } 280 281 // Apply updates 282 if req.DisplayName != nil { 283 profile["displayName"] = *req.DisplayName 284 } else { 285 profile["displayName"] = existing.DisplayName 286 } 287 288 if req.Description != nil { 289 profile["description"] = *req.Description 290 } else { 291 profile["description"] = existing.Description 292 } 293 294 if req.Visibility != nil { 295 profile["visibility"] = *req.Visibility 296 } else { 297 profile["visibility"] = existing.Visibility 298 } 299 300 if req.AllowExternalDiscovery != nil { 301 profile["federation"] = map[string]interface{}{ 302 "allowExternalDiscovery": *req.AllowExternalDiscovery, 303 } 304 } else { 305 profile["federation"] = map[string]interface{}{ 306 "allowExternalDiscovery": existing.AllowExternalDiscovery, 307 } 308 } 309 310 // Preserve moderation settings (even if empty) 311 // These fields are optional but should not be erased on update 312 if req.ModerationType != nil { 313 profile["moderationType"] = *req.ModerationType 314 } else if existing.ModerationType != "" { 315 profile["moderationType"] = existing.ModerationType 316 } 317 318 if len(req.ContentWarnings) > 0 { 319 profile["contentWarnings"] = req.ContentWarnings 320 } else if len(existing.ContentWarnings) > 0 { 321 profile["contentWarnings"] = existing.ContentWarnings 322 } 323 324 // Preserve counts 325 profile["memberCount"] = existing.MemberCount 326 profile["subscriberCount"] = existing.SubscriberCount 327 328 // V2: Community profiles always use "self" as rkey 329 // (No need to extract from URI - it's always "self" for V2 communities) 330 331 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials 332 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 333 // Authenticate as the community (not as instance!) 334 if existing.PDSAccessToken == "" { 335 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID) 336 } 337 338 recordURI, recordCID, err := s.putRecordOnPDSAs( 339 ctx, 340 existing.DID, // repo = community's own DID (V2!) 341 "social.coves.community.profile", 342 "self", // V2: always "self" 343 profile, 344 existing.PDSAccessToken, // authenticate as the community 345 ) 346 if err != nil { 347 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 348 } 349 350 // Return updated community representation 351 // Actual AppView DB update happens via Jetstream consumer 352 updated := *existing 353 if req.DisplayName != nil { 354 updated.DisplayName = *req.DisplayName 355 } 356 if req.Description != nil { 357 updated.Description = *req.Description 358 } 359 if req.Visibility != nil { 360 updated.Visibility = *req.Visibility 361 } 362 if req.AllowExternalDiscovery != nil { 363 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 364 } 365 if req.ModerationType != nil { 366 updated.ModerationType = *req.ModerationType 367 } 368 if len(req.ContentWarnings) > 0 { 369 updated.ContentWarnings = req.ContentWarnings 370 } 371 updated.RecordURI = recordURI 372 updated.RecordCID = recordCID 373 updated.UpdatedAt = time.Now() 374 375 return &updated, nil 376} 377 378// getOrCreateRefreshMutex returns a mutex for the given community DID 379// Thread-safe with read-lock fast path for existing entries 380// SAFETY: Does NOT evict entries to avoid race condition where: 381// 1. Thread A holds mutex for community-123 382// 2. Thread B evicts community-123 from map 383// 3. Thread C creates NEW mutex for community-123 384// 4. Now two threads can refresh community-123 concurrently (mutex defeated!) 385func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex { 386 // Fast path: check if mutex already exists (read lock) 387 s.mapMutex.RLock() 388 mutex, exists := s.refreshMutexes[did] 389 s.mapMutex.RUnlock() 390 391 if exists { 392 return mutex 393 } 394 395 // Slow path: create new mutex (write lock) 396 s.mapMutex.Lock() 397 defer s.mapMutex.Unlock() 398 399 // Double-check after acquiring write lock (another goroutine might have created it) 400 mutex, exists = s.refreshMutexes[did] 401 if exists { 402 return mutex 403 } 404 405 // Create new mutex 406 mutex = &sync.Mutex{} 407 s.refreshMutexes[did] = mutex 408 409 // SAFETY: No eviction to prevent race condition 410 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes 411 if len(s.refreshMutexes) > maxMutexCacheSize { 412 memoryKB := len(s.refreshMutexes) * 16 / 1024 413 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB", 414 len(s.refreshMutexes), maxMutexCacheSize, memoryKB) 415 } 416 417 return mutex 418} 419 420// ensureFreshToken checks if a community's access token needs refresh and updates if needed 421// Returns updated community with fresh credentials (or original if no refresh needed) 422// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts 423func (s *communityService) ensureFreshToken(ctx context.Context, community *Community) (*Community, error) { 424 // Get or create mutex for this specific community DID 425 mutex := s.getOrCreateRefreshMutex(community.DID) 426 427 // Lock for this specific community (allows other communities to refresh concurrently) 428 mutex.Lock() 429 defer mutex.Unlock() 430 431 // Re-fetch community from DB (another goroutine might have already refreshed it) 432 fresh, err := s.repo.GetByDID(ctx, community.DID) 433 if err != nil { 434 return nil, fmt.Errorf("failed to re-fetch community: %w", err) 435 } 436 437 // Check if token needs refresh (5-minute buffer before expiration) 438 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken) 439 if err != nil { 440 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err) 441 return nil, fmt.Errorf("failed to check token expiration: %w", err) 442 } 443 444 if !needsRefresh { 445 // Token still valid, no refresh needed 446 return fresh, nil 447 } 448 449 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID) 450 451 // Attempt token refresh using refresh token 452 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken) 453 if err != nil { 454 // Check if refresh token expired (need password fallback) 455 if strings.Contains(err.Error(), "expired or invalid") { 456 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID) 457 458 // Fallback: Re-authenticate with stored password 459 newAccessToken, newRefreshToken, err = reauthenticateWithPassword( 460 ctx, 461 fresh.PDSURL, 462 fresh.PDSEmail, 463 fresh.PDSPassword, // Retrieved decrypted from DB 464 ) 465 if err != nil { 466 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err) 467 return nil, fmt.Errorf("failed to re-authenticate community: %w", err) 468 } 469 470 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID) 471 } else { 472 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err) 473 return nil, fmt.Errorf("failed to refresh token: %w", err) 474 } 475 } 476 477 // CRITICAL: Update database with new tokens immediately 478 // Refresh tokens are SINGLE-USE - old one is now invalid 479 // Use retry logic to handle transient DB failures 480 const maxRetries = 3 481 var updateErr error 482 for attempt := 0; attempt < maxRetries; attempt++ { 483 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken) 484 if updateErr == nil { 485 break // Success 486 } 487 488 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v", 489 fresh.DID, attempt+1, maxRetries, updateErr) 490 491 if attempt < maxRetries-1 { 492 // Exponential backoff: 100ms, 200ms, 400ms 493 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond 494 time.Sleep(backoff) 495 } 496 } 497 498 if updateErr != nil { 499 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved 500 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v", 501 fresh.DID, maxRetries, updateErr) 502 // TODO: Send alert to monitoring system (add in Beta) 503 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w", 504 maxRetries, updateErr) 505 } 506 507 // Return updated community object with fresh tokens 508 updatedCommunity := *fresh 509 updatedCommunity.PDSAccessToken = newAccessToken 510 updatedCommunity.PDSRefreshToken = newRefreshToken 511 512 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID) 513 514 return &updatedCommunity, nil 515} 516 517// ListCommunities queries AppView DB for communities with filters 518func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) { 519 // Set defaults 520 if req.Limit <= 0 || req.Limit > 100 { 521 req.Limit = 50 522 } 523 524 return s.repo.List(ctx, req) 525} 526 527// SearchCommunities performs fuzzy search in AppView DB 528func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) { 529 if req.Query == "" { 530 return nil, 0, NewValidationError("query", "search query is required") 531 } 532 533 // Set defaults 534 if req.Limit <= 0 || req.Limit > 100 { 535 req.Limit = 50 536 } 537 538 return s.repo.Search(ctx, req) 539} 540 541// SubscribeToCommunity creates a subscription via write-forward to PDS 542func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) { 543 if userDID == "" { 544 return nil, NewValidationError("userDid", "required") 545 } 546 if userAccessToken == "" { 547 return nil, NewValidationError("userAccessToken", "required") 548 } 549 550 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid 551 if contentVisibility <= 0 || contentVisibility > 5 { 552 contentVisibility = 3 553 } 554 555 // Resolve community identifier to DID 556 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 557 if err != nil { 558 return nil, err 559 } 560 561 // Verify community exists 562 community, err := s.repo.GetByDID(ctx, communityDID) 563 if err != nil { 564 return nil, err 565 } 566 567 // Check visibility - can't subscribe to private communities without invitation (TODO) 568 if community.Visibility == "private" { 569 return nil, ErrUnauthorized 570 } 571 572 // Build subscription record 573 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure) 574 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid} 575 // Following atProto conventions, we use "subject" field to reference the community 576 subRecord := map[string]interface{}{ 577 "$type": "social.coves.community.subscription", 578 "subject": communityDID, // atProto convention: "subject" for entity references 579 "createdAt": time.Now().Format(time.RFC3339), 580 "contentVisibility": contentVisibility, 581 } 582 583 // Write-forward: create subscription record in user's repo using their access token 584 // The collection parameter refers to the record type in the repository 585 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken) 586 if err != nil { 587 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 588 } 589 590 // Return subscription representation 591 subscription := &Subscription{ 592 UserDID: userDID, 593 CommunityDID: communityDID, 594 ContentVisibility: contentVisibility, 595 SubscribedAt: time.Now(), 596 RecordURI: recordURI, 597 RecordCID: recordCID, 598 } 599 600 return subscription, nil 601} 602 603// UnsubscribeFromCommunity removes a subscription via PDS delete 604func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 605 if userDID == "" { 606 return NewValidationError("userDid", "required") 607 } 608 if userAccessToken == "" { 609 return NewValidationError("userAccessToken", "required") 610 } 611 612 // Resolve community identifier 613 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 614 if err != nil { 615 return err 616 } 617 618 // Get the subscription from AppView to find the record key 619 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 620 if err != nil { 621 return err 622 } 623 624 // Extract rkey from record URI (at://did/collection/rkey) 625 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 626 if rkey == "" { 627 return fmt.Errorf("invalid subscription record URI") 628 } 629 630 // Write-forward: delete record from PDS using user's access token 631 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe 632 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil { 633 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 634 } 635 636 return nil 637} 638 639// GetUserSubscriptions queries AppView DB for user's subscriptions 640func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 641 if limit <= 0 || limit > 100 { 642 limit = 50 643 } 644 645 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 646} 647 648// GetCommunitySubscribers queries AppView DB for community subscribers 649func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 650 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 651 if err != nil { 652 return nil, err 653 } 654 655 if limit <= 0 || limit > 100 { 656 limit = 50 657 } 658 659 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 660} 661 662// GetMembership retrieves membership info from AppView DB 663func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) { 664 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 665 if err != nil { 666 return nil, err 667 } 668 669 return s.repo.GetMembership(ctx, userDID, communityDID) 670} 671 672// ListCommunityMembers queries AppView DB for members 673func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 674 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 675 if err != nil { 676 return nil, err 677 } 678 679 if limit <= 0 || limit > 100 { 680 limit = 50 681 } 682 683 return s.repo.ListMembers(ctx, communityDID, limit, offset) 684} 685 686// BlockCommunity blocks a community via write-forward to PDS 687func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) { 688 if userDID == "" { 689 return nil, NewValidationError("userDid", "required") 690 } 691 if userAccessToken == "" { 692 return nil, NewValidationError("userAccessToken", "required") 693 } 694 695 // Resolve community identifier (also verifies community exists) 696 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 697 if err != nil { 698 return nil, err 699 } 700 701 // Build block record 702 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE) 703 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid} 704 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern 705 blockRecord := map[string]interface{}{ 706 "$type": "social.coves.community.block", 707 "subject": communityDID, // DID of community being blocked 708 "createdAt": time.Now().Format(time.RFC3339), 709 } 710 711 // Write-forward: create block record in user's repo using their access token 712 // Note: We don't check for existing blocks first because: 713 // 1. The PDS may reject duplicates (depending on implementation) 714 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING 715 // 3. This avoids a race condition where two concurrent requests both pass the check 716 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken) 717 if err != nil { 718 // Check if this is a duplicate/conflict error from PDS 719 // PDS should return 409 Conflict for duplicate records, but we also check common error messages 720 // for compatibility with different PDS implementations 721 errMsg := err.Error() 722 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict 723 strings.Contains(errMsg, "duplicate") || 724 strings.Contains(errMsg, "already exists") || 725 strings.Contains(errMsg, "AlreadyExists") 726 727 if isDuplicate { 728 // Fetch and return existing block from our indexed view 729 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID) 730 if getErr == nil { 731 // Block exists in our index - return it 732 return existingBlock, nil 733 } 734 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition) 735 // Any other error (DB outage, connection failure, etc.) should bubble up 736 if errors.Is(getErr, ErrBlockNotFound) { 737 // Race condition: PDS has the block but Jetstream hasn't indexed it yet 738 // Return typed conflict error so handler can return 409 instead of 500 739 // This is normal in eventually-consistent systems 740 return nil, ErrBlockAlreadyExists 741 } 742 // Real datastore error - bubble it up so operators see the failure 743 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr) 744 } 745 return nil, fmt.Errorf("failed to create block on PDS: %w", err) 746 } 747 748 // Return block representation 749 block := &CommunityBlock{ 750 UserDID: userDID, 751 CommunityDID: communityDID, 752 BlockedAt: time.Now(), 753 RecordURI: recordURI, 754 RecordCID: recordCID, 755 } 756 757 return block, nil 758} 759 760// UnblockCommunity removes a block via PDS delete 761func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 762 if userDID == "" { 763 return NewValidationError("userDid", "required") 764 } 765 if userAccessToken == "" { 766 return NewValidationError("userAccessToken", "required") 767 } 768 769 // Resolve community identifier 770 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 771 if err != nil { 772 return err 773 } 774 775 // Get the block from AppView to find the record key 776 block, err := s.repo.GetBlock(ctx, userDID, communityDID) 777 if err != nil { 778 return err 779 } 780 781 // Extract rkey from record URI (at://did/collection/rkey) 782 rkey := utils.ExtractRKeyFromURI(block.RecordURI) 783 if rkey == "" { 784 return fmt.Errorf("invalid block record URI") 785 } 786 787 // Write-forward: delete record from PDS using user's access token 788 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil { 789 return fmt.Errorf("failed to delete block on PDS: %w", err) 790 } 791 792 return nil 793} 794 795// GetBlockedCommunities queries AppView DB for user's blocks 796func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) { 797 if limit <= 0 || limit > 100 { 798 limit = 50 799 } 800 801 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset) 802} 803 804// IsBlocked checks if a user has blocked a community 805func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) { 806 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 807 if err != nil { 808 return false, err 809 } 810 811 return s.repo.IsBlocked(ctx, userDID, communityDID) 812} 813 814// ValidateHandle checks if a community handle is valid 815func (s *communityService) ValidateHandle(handle string) error { 816 if handle == "" { 817 return NewValidationError("handle", "required") 818 } 819 820 if !communityHandleRegex.MatchString(handle) { 821 return ErrInvalidHandle 822 } 823 824 return nil 825} 826 827// ResolveCommunityIdentifier converts a handle or DID to a DID 828func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 829 if identifier == "" { 830 return "", ErrInvalidInput 831 } 832 833 // If it's already a DID, verify the community exists 834 if strings.HasPrefix(identifier, "did:") { 835 _, err := s.repo.GetByDID(ctx, identifier) 836 if err != nil { 837 if IsNotFound(err) { 838 return "", fmt.Errorf("community not found: %w", err) 839 } 840 return "", fmt.Errorf("failed to verify community DID: %w", err) 841 } 842 return identifier, nil 843 } 844 845 // If it's a handle, look it up in AppView DB 846 if strings.HasPrefix(identifier, "!") { 847 community, err := s.repo.GetByHandle(ctx, identifier) 848 if err != nil { 849 return "", err 850 } 851 return community.DID, nil 852 } 853 854 return "", NewValidationError("identifier", "must be a DID or handle") 855} 856 857// Validation helpers 858 859func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 860 if req.Name == "" { 861 return NewValidationError("name", "required") 862 } 863 864 // DNS label limit: 63 characters per label 865 // Community handle format: {name}.communities.{instanceDomain} 866 // The first label is just req.Name, so it must be <= 63 chars 867 if len(req.Name) > 63 { 868 return NewValidationError("name", "must be 63 characters or less (DNS label limit)") 869 } 870 871 // Name can only contain alphanumeric and hyphens 872 // Must start and end with alphanumeric (not hyphen) 873 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 874 if !nameRegex.MatchString(req.Name) { 875 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 876 } 877 878 if req.Description != "" && len(req.Description) > 3000 { 879 return NewValidationError("description", "must be 3000 characters or less") 880 } 881 882 // Visibility should already be set with default in CreateCommunity 883 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 884 return ErrInvalidVisibility 885 } 886 887 if req.CreatedByDID == "" { 888 return NewValidationError("createdByDid", "required") 889 } 890 891 // hostedByDID is auto-populated by the service layer, no validation needed 892 // The handler ensures clients cannot provide this field 893 894 return nil 895} 896 897// PDS write-forward helpers 898 899// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 900func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 901 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 902 903 payload := map[string]interface{}{ 904 "repo": repoDID, 905 "collection": collection, 906 "record": record, 907 } 908 909 if rkey != "" { 910 payload["rkey"] = rkey 911 } 912 913 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 914} 915 916// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth) 917func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 918 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 919 920 payload := map[string]interface{}{ 921 "repo": repoDID, 922 "collection": collection, 923 "rkey": rkey, 924 "record": record, 925 } 926 927 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 928} 929 930// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions) 931func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error { 932 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 933 934 payload := map[string]interface{}{ 935 "repo": repoDID, 936 "collection": collection, 937 "rkey": rkey, 938 } 939 940 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 941 return err 942} 943 944// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) 945func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 946 jsonData, err := json.Marshal(payload) 947 if err != nil { 948 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 949 } 950 951 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 952 if err != nil { 953 return "", "", fmt.Errorf("failed to create request: %w", err) 954 } 955 req.Header.Set("Content-Type", "application/json") 956 957 // Add authentication with provided access token 958 if accessToken != "" { 959 req.Header.Set("Authorization", "Bearer "+accessToken) 960 } 961 962 // Dynamic timeout based on operation type 963 // Write operations (createAccount, createRecord, putRecord) are slower due to: 964 // - Keypair generation 965 // - DID PLC registration 966 // - Database writes on PDS 967 timeout := 10 * time.Second // Default for read operations 968 if strings.Contains(endpoint, "createAccount") || 969 strings.Contains(endpoint, "createRecord") || 970 strings.Contains(endpoint, "putRecord") { 971 timeout = 30 * time.Second // Extended timeout for write operations 972 } 973 974 client := &http.Client{Timeout: timeout} 975 resp, err := client.Do(req) 976 if err != nil { 977 return "", "", fmt.Errorf("failed to call PDS: %w", err) 978 } 979 defer func() { 980 if closeErr := resp.Body.Close(); closeErr != nil { 981 log.Printf("Failed to close response body: %v", closeErr) 982 } 983 }() 984 985 body, err := io.ReadAll(resp.Body) 986 if err != nil { 987 return "", "", fmt.Errorf("failed to read response: %w", err) 988 } 989 990 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 991 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 992 } 993 994 // Parse response to extract URI and CID 995 var result struct { 996 URI string `json:"uri"` 997 CID string `json:"cid"` 998 } 999 if err := json.Unmarshal(body, &result); err != nil { 1000 // For delete operations, there might not be a response body 1001 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 1002 return "", "", nil 1003 } 1004 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 1005 } 1006 1007 return result.URI, result.CID, nil 1008} 1009 1010// Helper functions