A community-based topic aggregation platform built on atproto
1package communities 2 3import ( 4 "Coves/internal/atproto/utils" 5 "bytes" 6 "context" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io" 11 "log" 12 "net/http" 13 "regexp" 14 "strings" 15 "sync" 16 "time" 17) 18 19// Community handle validation regex (DNS-valid handle: name.community.instance.com) 20// Matches standard DNS hostname format (RFC 1035) 21var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 22 23// DNS label validation (RFC 1035: 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen) 24var dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 25 26// Domain validation (simplified - checks for valid DNS hostname structure) 27var domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 28 29type communityService struct { 30 // Interfaces and pointers first (better alignment) 31 repo Repository 32 provisioner *PDSAccountProvisioner 33 34 // Token refresh concurrency control 35 // Each community gets its own mutex to prevent concurrent refresh attempts 36 refreshMutexes map[string]*sync.Mutex 37 38 // Strings 39 pdsURL string 40 instanceDID string 41 instanceDomain string 42 pdsAccessToken string 43 44 // Sync primitives last 45 mapMutex sync.RWMutex // Protects refreshMutexes map itself 46} 47 48const ( 49 // Maximum recommended size for mutex cache (warning threshold, not hard limit) 50 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead) 51 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable 52 maxMutexCacheSize = 10000 53) 54 55// NewCommunityService creates a new community service 56func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service { 57 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 58 // This catches 
honest configuration mistakes but NOT malicious code modifications 59 // Full verification (Phase 2) requires fetching DID document from domain 60 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification" 61 if strings.HasPrefix(instanceDID, "did:web:") { 62 didDomain := strings.TrimPrefix(instanceDID, "did:web:") 63 if didDomain != instanceDomain { 64 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)", 65 didDomain, instanceDomain) 66 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt") 67 log.Printf(" Communities will be hosted by: %s", instanceDID) 68 } 69 } 70 71 return &communityService{ 72 repo: repo, 73 pdsURL: pdsURL, 74 instanceDID: instanceDID, 75 instanceDomain: instanceDomain, 76 provisioner: provisioner, 77 refreshMutexes: make(map[string]*sync.Mutex), 78 } 79} 80 81// SetPDSAccessToken sets the PDS access token for authentication 82// This should be called after creating a session for the Coves instance DID on the PDS 83func (s *communityService) SetPDSAccessToken(token string) { 84 s.pdsAccessToken = token 85} 86 87// CreateCommunity creates a new community via write-forward to PDS 88// V2 Flow: 89// 1. Service creates PDS account for community (PDS generates signing keypair) 90// 2. Service writes community profile to COMMUNITY's own repository 91// 3. Firehose emits event 92// 4. 
Consumer indexes to AppView DB 93// 94// V2 Architecture: 95// - Community owns its own repository (at://community_did/social.coves.community.profile/self) 96// - PDS manages the signing keypair (we never see it) 97// - We store PDS credentials to act on behalf of the community 98// - Community can migrate to other instances (future V2.1 with rotation keys) 99func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 100 // Apply defaults before validation 101 if req.Visibility == "" { 102 req.Visibility = "public" 103 } 104 105 // SECURITY: Auto-populate hostedByDID from instance configuration 106 // Clients MUST NOT provide this field - it's derived from the instance receiving the request 107 // This prevents malicious instances from claiming to host communities for domains they don't own 108 req.HostedByDID = s.instanceDID 109 110 // Validate request 111 if err := s.validateCreateRequest(req); err != nil { 112 return nil, err 113 } 114 115 // V2: Provision a real PDS account for this community 116 // This calls com.atproto.server.createAccount internally 117 // The PDS will: 118 // 1. Generate a signing keypair (stored in PDS, we never see it) 119 // 2. Create a DID (did:plc:xxx) 120 // 3. 
Return credentials (DID, tokens) 121 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name) 122 if err != nil { 123 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err) 124 } 125 126 // Validate the atProto handle 127 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil { 128 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr) 129 } 130 131 // Build community profile record 132 profile := map[string]interface{}{ 133 "$type": "social.coves.community.profile", 134 "name": req.Name, // Short name for !mentions (e.g., "gaming") 135 "visibility": req.Visibility, 136 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns 137 "createdBy": req.CreatedByDID, 138 "createdAt": time.Now().Format(time.RFC3339), 139 "federation": map[string]interface{}{ 140 "allowExternalDiscovery": req.AllowExternalDiscovery, 141 }, 142 } 143 144 // Add optional fields 145 if req.DisplayName != "" { 146 profile["displayName"] = req.DisplayName 147 } 148 if req.Description != "" { 149 profile["description"] = req.Description 150 } 151 if len(req.Rules) > 0 { 152 profile["rules"] = req.Rules 153 } 154 if len(req.Categories) > 0 { 155 profile["categories"] = req.Categories 156 } 157 if req.Language != "" { 158 profile["language"] = req.Language 159 } 160 161 // TODO: Handle avatar and banner blobs 162 // For now, we'll skip blob uploads. This would require: 163 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob 164 // 2. Get blob ref (CID) 165 // 3. Add to profile record 166 167 // V2: Write to COMMUNITY's own repository (not instance repo!) 168 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 169 // Authenticate using community's access token 170 recordURI, recordCID, err := s.createRecordOnPDSAs( 171 ctx, 172 pdsAccount.DID, // repo = community's DID (community owns its repo!) 
173 "social.coves.community.profile", 174 "self", // canonical rkey for profile 175 profile, 176 pdsAccount.AccessToken, // authenticate as the community 177 ) 178 if err != nil { 179 return nil, fmt.Errorf("failed to create community profile record: %w", err) 180 } 181 182 // Build Community object with PDS credentials AND cryptographic keys 183 community := &Community{ 184 DID: pdsAccount.DID, // Community's DID (owns the repo!) 185 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social) 186 Name: req.Name, 187 DisplayName: req.DisplayName, 188 Description: req.Description, 189 OwnerDID: pdsAccount.DID, // V2: Community owns itself 190 CreatedByDID: req.CreatedByDID, 191 HostedByDID: req.HostedByDID, 192 PDSEmail: pdsAccount.Email, 193 PDSPassword: pdsAccount.Password, 194 PDSAccessToken: pdsAccount.AccessToken, 195 PDSRefreshToken: pdsAccount.RefreshToken, 196 PDSURL: pdsAccount.PDSURL, 197 Visibility: req.Visibility, 198 AllowExternalDiscovery: req.AllowExternalDiscovery, 199 MemberCount: 0, 200 SubscriberCount: 0, 201 CreatedAt: time.Now(), 202 UpdatedAt: time.Now(), 203 RecordURI: recordURI, 204 RecordCID: recordCID, 205 // V2: Cryptographic keys for portability (will be encrypted by repository) 206 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration 207 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations 208 } 209 210 // CRITICAL: Persist PDS credentials immediately to database 211 // The Jetstream consumer will eventually index the community profile from the firehose, 212 // but it won't have the PDS credentials. We must store them now so we can: 213 // 1. Update the community profile later (using its own credentials) 214 // 2. 
Re-authenticate if access tokens expire 215 _, err = s.repo.Create(ctx, community) 216 if err != nil { 217 return nil, fmt.Errorf("failed to persist community with credentials: %w", err) 218 } 219 220 return community, nil 221} 222 223// GetCommunity retrieves a community from AppView DB 224// identifier can be either a DID or handle 225func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 226 if identifier == "" { 227 return nil, ErrInvalidInput 228 } 229 230 // Determine if identifier is DID or handle 231 if strings.HasPrefix(identifier, "did:") { 232 return s.repo.GetByDID(ctx, identifier) 233 } 234 235 if strings.HasPrefix(identifier, "!") { 236 return s.repo.GetByHandle(ctx, identifier) 237 } 238 239 return nil, NewValidationError("identifier", "must be a DID or handle") 240} 241 242// GetByDID retrieves a community by its DID 243// Exported for use by post service when validating community references 244func (s *communityService) GetByDID(ctx context.Context, did string) (*Community, error) { 245 if did == "" { 246 return nil, ErrInvalidInput 247 } 248 249 if !strings.HasPrefix(did, "did:") { 250 return nil, NewValidationError("did", "must be a valid DID") 251 } 252 253 return s.repo.GetByDID(ctx, did) 254} 255 256// UpdateCommunity updates a community via write-forward to PDS 257func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) { 258 if req.CommunityDID == "" { 259 return nil, NewValidationError("communityDid", "required") 260 } 261 262 if req.UpdatedByDID == "" { 263 return nil, NewValidationError("updatedByDid", "required") 264 } 265 266 // Get existing community 267 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 268 if err != nil { 269 return nil, err 270 } 271 272 // CRITICAL: Ensure fresh PDS access token before write operation 273 // Community PDS tokens expire every ~2 hours and must be refreshed 274 existing, err = 
s.EnsureFreshToken(ctx, existing) 275 if err != nil { 276 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err) 277 } 278 279 // Authorization: verify user is the creator 280 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 281 if existing.CreatedByDID != req.UpdatedByDID { 282 return nil, ErrUnauthorized 283 } 284 285 // Build updated profile record (start with existing) 286 profile := map[string]interface{}{ 287 "$type": "social.coves.community.profile", 288 "name": existing.Name, 289 "owner": existing.OwnerDID, 290 "createdBy": existing.CreatedByDID, 291 "hostedBy": existing.HostedByDID, 292 "createdAt": existing.CreatedAt.Format(time.RFC3339), 293 } 294 295 // Apply updates 296 if req.DisplayName != nil { 297 profile["displayName"] = *req.DisplayName 298 } else { 299 profile["displayName"] = existing.DisplayName 300 } 301 302 if req.Description != nil { 303 profile["description"] = *req.Description 304 } else { 305 profile["description"] = existing.Description 306 } 307 308 if req.Visibility != nil { 309 profile["visibility"] = *req.Visibility 310 } else { 311 profile["visibility"] = existing.Visibility 312 } 313 314 if req.AllowExternalDiscovery != nil { 315 profile["federation"] = map[string]interface{}{ 316 "allowExternalDiscovery": *req.AllowExternalDiscovery, 317 } 318 } else { 319 profile["federation"] = map[string]interface{}{ 320 "allowExternalDiscovery": existing.AllowExternalDiscovery, 321 } 322 } 323 324 // Preserve moderation settings (even if empty) 325 // These fields are optional but should not be erased on update 326 if req.ModerationType != nil { 327 profile["moderationType"] = *req.ModerationType 328 } else if existing.ModerationType != "" { 329 profile["moderationType"] = existing.ModerationType 330 } 331 332 if len(req.ContentWarnings) > 0 { 333 profile["contentWarnings"] = req.ContentWarnings 334 } else if len(existing.ContentWarnings) > 0 { 335 profile["contentWarnings"] = 
existing.ContentWarnings 336 } 337 338 // V2: Community profiles always use "self" as rkey 339 // (No need to extract from URI - it's always "self" for V2 communities) 340 341 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials 342 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 343 // Authenticate as the community (not as instance!) 344 if existing.PDSAccessToken == "" { 345 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID) 346 } 347 348 recordURI, recordCID, err := s.putRecordOnPDSAs( 349 ctx, 350 existing.DID, // repo = community's own DID (V2!) 351 "social.coves.community.profile", 352 "self", // V2: always "self" 353 profile, 354 existing.PDSAccessToken, // authenticate as the community 355 ) 356 if err != nil { 357 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 358 } 359 360 // Return updated community representation 361 // Actual AppView DB update happens via Jetstream consumer 362 updated := *existing 363 if req.DisplayName != nil { 364 updated.DisplayName = *req.DisplayName 365 } 366 if req.Description != nil { 367 updated.Description = *req.Description 368 } 369 if req.Visibility != nil { 370 updated.Visibility = *req.Visibility 371 } 372 if req.AllowExternalDiscovery != nil { 373 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 374 } 375 if req.ModerationType != nil { 376 updated.ModerationType = *req.ModerationType 377 } 378 if len(req.ContentWarnings) > 0 { 379 updated.ContentWarnings = req.ContentWarnings 380 } 381 updated.RecordURI = recordURI 382 updated.RecordCID = recordCID 383 updated.UpdatedAt = time.Now() 384 385 return &updated, nil 386} 387 388// getOrCreateRefreshMutex returns a mutex for the given community DID 389// Thread-safe with read-lock fast path for existing entries 390// SAFETY: Does NOT evict entries to avoid race condition where: 391// 1. Thread A holds mutex for community-123 392// 2. 
Thread B evicts community-123 from map 393// 3. Thread C creates NEW mutex for community-123 394// 4. Now two threads can refresh community-123 concurrently (mutex defeated!) 395func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex { 396 // Fast path: check if mutex already exists (read lock) 397 s.mapMutex.RLock() 398 mutex, exists := s.refreshMutexes[did] 399 s.mapMutex.RUnlock() 400 401 if exists { 402 return mutex 403 } 404 405 // Slow path: create new mutex (write lock) 406 s.mapMutex.Lock() 407 defer s.mapMutex.Unlock() 408 409 // Double-check after acquiring write lock (another goroutine might have created it) 410 mutex, exists = s.refreshMutexes[did] 411 if exists { 412 return mutex 413 } 414 415 // Create new mutex 416 mutex = &sync.Mutex{} 417 s.refreshMutexes[did] = mutex 418 419 // SAFETY: No eviction to prevent race condition 420 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes 421 if len(s.refreshMutexes) > maxMutexCacheSize { 422 memoryKB := len(s.refreshMutexes) * 16 / 1024 423 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. 
Memory usage: ~%d KB", 424 len(s.refreshMutexes), maxMutexCacheSize, memoryKB) 425 } 426 427 return mutex 428} 429 430// ensureFreshToken checks if a community's access token needs refresh and updates if needed 431// Returns updated community with fresh credentials (or original if no refresh needed) 432// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts 433// EnsureFreshToken ensures the community's PDS access token is valid 434// Exported for use by post service when writing posts to community repos 435func (s *communityService) EnsureFreshToken(ctx context.Context, community *Community) (*Community, error) { 436 // Get or create mutex for this specific community DID 437 mutex := s.getOrCreateRefreshMutex(community.DID) 438 439 // Lock for this specific community (allows other communities to refresh concurrently) 440 mutex.Lock() 441 defer mutex.Unlock() 442 443 // Re-fetch community from DB (another goroutine might have already refreshed it) 444 fresh, err := s.repo.GetByDID(ctx, community.DID) 445 if err != nil { 446 return nil, fmt.Errorf("failed to re-fetch community: %w", err) 447 } 448 449 // Check if token needs refresh (5-minute buffer before expiration) 450 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken) 451 if err != nil { 452 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err) 453 return nil, fmt.Errorf("failed to check token expiration: %w", err) 454 } 455 456 if !needsRefresh { 457 // Token still valid, no refresh needed 458 return fresh, nil 459 } 460 461 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID) 462 463 // Attempt token refresh using refresh token 464 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken) 465 if err != nil { 466 // Check if refresh token expired (need password fallback) 467 // Match both "ExpiredToken" and 
"Token has expired" error messages 468 if strings.Contains(strings.ToLower(err.Error()), "expired") { 469 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID) 470 471 // Fallback: Re-authenticate with stored password 472 newAccessToken, newRefreshToken, err = reauthenticateWithPassword( 473 ctx, 474 fresh.PDSURL, 475 fresh.PDSEmail, 476 fresh.PDSPassword, // Retrieved decrypted from DB 477 ) 478 if err != nil { 479 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err) 480 return nil, fmt.Errorf("failed to re-authenticate community: %w", err) 481 } 482 483 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID) 484 } else { 485 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err) 486 return nil, fmt.Errorf("failed to refresh token: %w", err) 487 } 488 } 489 490 // CRITICAL: Update database with new tokens immediately 491 // Refresh tokens are SINGLE-USE - old one is now invalid 492 // Use retry logic to handle transient DB failures 493 const maxRetries = 3 494 var updateErr error 495 for attempt := 0; attempt < maxRetries; attempt++ { 496 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken) 497 if updateErr == nil { 498 break // Success 499 } 500 501 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v", 502 fresh.DID, attempt+1, maxRetries, updateErr) 503 504 if attempt < maxRetries-1 { 505 // Exponential backoff: 100ms, 200ms, 400ms 506 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond 507 time.Sleep(backoff) 508 } 509 } 510 511 if updateErr != nil { 512 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved 513 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials 
after %d retries: %v", 514 fresh.DID, maxRetries, updateErr) 515 // TODO: Send alert to monitoring system (add in Beta) 516 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w", 517 maxRetries, updateErr) 518 } 519 520 // Return updated community object with fresh tokens 521 updatedCommunity := *fresh 522 updatedCommunity.PDSAccessToken = newAccessToken 523 updatedCommunity.PDSRefreshToken = newRefreshToken 524 525 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID) 526 527 return &updatedCommunity, nil 528} 529 530// ListCommunities queries AppView DB for communities with filters 531func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, error) { 532 // Set defaults 533 if req.Limit <= 0 || req.Limit > 100 { 534 req.Limit = 50 535 } 536 537 return s.repo.List(ctx, req) 538} 539 540// SearchCommunities performs fuzzy search in AppView DB 541func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) { 542 if req.Query == "" { 543 return nil, 0, NewValidationError("query", "search query is required") 544 } 545 546 // Set defaults 547 if req.Limit <= 0 || req.Limit > 100 { 548 req.Limit = 50 549 } 550 551 return s.repo.Search(ctx, req) 552} 553 554// SubscribeToCommunity creates a subscription via write-forward to PDS 555func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) { 556 if userDID == "" { 557 return nil, NewValidationError("userDid", "required") 558 } 559 if userAccessToken == "" { 560 return nil, NewValidationError("userAccessToken", "required") 561 } 562 563 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid 564 if contentVisibility <= 0 || contentVisibility > 5 { 565 
contentVisibility = 3 566 } 567 568 // Resolve community identifier to DID 569 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 570 if err != nil { 571 return nil, err 572 } 573 574 // Verify community exists 575 community, err := s.repo.GetByDID(ctx, communityDID) 576 if err != nil { 577 return nil, err 578 } 579 580 // Check visibility - can't subscribe to private communities without invitation (TODO) 581 if community.Visibility == "private" { 582 return nil, ErrUnauthorized 583 } 584 585 // Build subscription record 586 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure) 587 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid} 588 // Following atProto conventions, we use "subject" field to reference the community 589 subRecord := map[string]interface{}{ 590 "$type": "social.coves.community.subscription", 591 "subject": communityDID, // atProto convention: "subject" for entity references 592 "createdAt": time.Now().Format(time.RFC3339), 593 "contentVisibility": contentVisibility, 594 } 595 596 // Write-forward: create subscription record in user's repo using their access token 597 // The collection parameter refers to the record type in the repository 598 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken) 599 if err != nil { 600 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 601 } 602 603 // Return subscription representation 604 subscription := &Subscription{ 605 UserDID: userDID, 606 CommunityDID: communityDID, 607 ContentVisibility: contentVisibility, 608 SubscribedAt: time.Now(), 609 RecordURI: recordURI, 610 RecordCID: recordCID, 611 } 612 613 return subscription, nil 614} 615 616// UnsubscribeFromCommunity removes a subscription via PDS delete 617func (s *communityService) 
UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 618 if userDID == "" { 619 return NewValidationError("userDid", "required") 620 } 621 if userAccessToken == "" { 622 return NewValidationError("userAccessToken", "required") 623 } 624 625 // Resolve community identifier 626 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 627 if err != nil { 628 return err 629 } 630 631 // Get the subscription from AppView to find the record key 632 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 633 if err != nil { 634 return err 635 } 636 637 // Extract rkey from record URI (at://did/collection/rkey) 638 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 639 if rkey == "" { 640 return fmt.Errorf("invalid subscription record URI") 641 } 642 643 // Write-forward: delete record from PDS using user's access token 644 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe 645 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil { 646 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 647 } 648 649 return nil 650} 651 652// GetUserSubscriptions queries AppView DB for user's subscriptions 653func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 654 if limit <= 0 || limit > 100 { 655 limit = 50 656 } 657 658 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 659} 660 661// GetCommunitySubscribers queries AppView DB for community subscribers 662func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 663 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 664 if err != nil { 665 return nil, err 666 } 667 668 if limit <= 0 || limit > 100 { 
669 limit = 50 670 } 671 672 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 673} 674 675// GetMembership retrieves membership info from AppView DB 676func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) { 677 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 678 if err != nil { 679 return nil, err 680 } 681 682 return s.repo.GetMembership(ctx, userDID, communityDID) 683} 684 685// ListCommunityMembers queries AppView DB for members 686func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 687 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 688 if err != nil { 689 return nil, err 690 } 691 692 if limit <= 0 || limit > 100 { 693 limit = 50 694 } 695 696 return s.repo.ListMembers(ctx, communityDID, limit, offset) 697} 698 699// BlockCommunity blocks a community via write-forward to PDS 700func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) { 701 if userDID == "" { 702 return nil, NewValidationError("userDid", "required") 703 } 704 if userAccessToken == "" { 705 return nil, NewValidationError("userAccessToken", "required") 706 } 707 708 // Resolve community identifier (also verifies community exists) 709 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 710 if err != nil { 711 return nil, err 712 } 713 714 // Build block record 715 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE) 716 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid} 717 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern 718 blockRecord := map[string]interface{}{ 719 "$type": "social.coves.community.block", 720 "subject": communityDID, // DID of community being blocked 
721 "createdAt": time.Now().Format(time.RFC3339), 722 } 723 724 // Write-forward: create block record in user's repo using their access token 725 // Note: We don't check for existing blocks first because: 726 // 1. The PDS may reject duplicates (depending on implementation) 727 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING 728 // 3. This avoids a race condition where two concurrent requests both pass the check 729 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken) 730 if err != nil { 731 // Check if this is a duplicate/conflict error from PDS 732 // PDS should return 409 Conflict for duplicate records, but we also check common error messages 733 // for compatibility with different PDS implementations 734 errMsg := err.Error() 735 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict 736 strings.Contains(errMsg, "duplicate") || 737 strings.Contains(errMsg, "already exists") || 738 strings.Contains(errMsg, "AlreadyExists") 739 740 if isDuplicate { 741 // Fetch and return existing block from our indexed view 742 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID) 743 if getErr == nil { 744 // Block exists in our index - return it 745 return existingBlock, nil 746 } 747 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition) 748 // Any other error (DB outage, connection failure, etc.) 
should bubble up 749 if errors.Is(getErr, ErrBlockNotFound) { 750 // Race condition: PDS has the block but Jetstream hasn't indexed it yet 751 // Return typed conflict error so handler can return 409 instead of 500 752 // This is normal in eventually-consistent systems 753 return nil, ErrBlockAlreadyExists 754 } 755 // Real datastore error - bubble it up so operators see the failure 756 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr) 757 } 758 return nil, fmt.Errorf("failed to create block on PDS: %w", err) 759 } 760 761 // Return block representation 762 block := &CommunityBlock{ 763 UserDID: userDID, 764 CommunityDID: communityDID, 765 BlockedAt: time.Now(), 766 RecordURI: recordURI, 767 RecordCID: recordCID, 768 } 769 770 return block, nil 771} 772 773// UnblockCommunity removes a block via PDS delete 774func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 775 if userDID == "" { 776 return NewValidationError("userDid", "required") 777 } 778 if userAccessToken == "" { 779 return NewValidationError("userAccessToken", "required") 780 } 781 782 // Resolve community identifier 783 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 784 if err != nil { 785 return err 786 } 787 788 // Get the block from AppView to find the record key 789 block, err := s.repo.GetBlock(ctx, userDID, communityDID) 790 if err != nil { 791 return err 792 } 793 794 // Extract rkey from record URI (at://did/collection/rkey) 795 rkey := utils.ExtractRKeyFromURI(block.RecordURI) 796 if rkey == "" { 797 return fmt.Errorf("invalid block record URI") 798 } 799 800 // Write-forward: delete record from PDS using user's access token 801 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil { 802 return fmt.Errorf("failed to delete block on PDS: %w", err) 803 } 804 805 return nil 806} 807 808// 
GetBlockedCommunities queries AppView DB for user's blocks 809func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) { 810 if limit <= 0 || limit > 100 { 811 limit = 50 812 } 813 814 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset) 815} 816 817// IsBlocked checks if a user has blocked a community 818func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) { 819 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 820 if err != nil { 821 return false, err 822 } 823 824 return s.repo.IsBlocked(ctx, userDID, communityDID) 825} 826 827// ValidateHandle checks if a community handle is valid 828func (s *communityService) ValidateHandle(handle string) error { 829 if handle == "" { 830 return NewValidationError("handle", "required") 831 } 832 833 if !communityHandleRegex.MatchString(handle) { 834 return ErrInvalidHandle 835 } 836 837 return nil 838} 839 840// ResolveCommunityIdentifier converts a community identifier to a DID 841// Following Bluesky's pattern with Coves extensions: 842// 843// Accepts (like Bluesky's at-identifier): 844// 1. DID: did:plc:abc123 (pass through) 845// 2. Canonical handle: gardening.community.coves.social (atProto standard) 846// 3. At-identifier: @gardening.community.coves.social (strip @ prefix) 847// 848// Coves-specific extensions: 849// 4. Scoped format: !gardening@coves.social (parse and resolve) 850// 851// Returns: DID string 852func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 853 identifier = strings.TrimSpace(identifier) 854 855 if identifier == "" { 856 return "", ErrInvalidInput 857 } 858 859 // 1. 
DID - verify it exists and return (Bluesky standard) 860 if strings.HasPrefix(identifier, "did:") { 861 _, err := s.repo.GetByDID(ctx, identifier) 862 if err != nil { 863 if IsNotFound(err) { 864 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err) 865 } 866 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err) 867 } 868 return identifier, nil 869 } 870 871 // 2. Scoped format: !name@instance (Coves-specific) 872 if strings.HasPrefix(identifier, "!") { 873 return s.resolveScopedIdentifier(ctx, identifier) 874 } 875 876 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix) 877 identifier = strings.TrimPrefix(identifier, "@") 878 879 // 4. Canonical handle: name.community.instance.com (Bluesky standard) 880 if strings.Contains(identifier, ".") { 881 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier)) 882 if err != nil { 883 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err) 884 } 885 return community.DID, nil 886 } 887 888 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)") 889} 890 891// resolveScopedIdentifier handles Coves-specific !name@instance format 892// Formats accepted: 893// 894// !gardening@coves.social -> gardening.community.coves.social 895func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) { 896 // Remove ! 
prefix 897 scoped = strings.TrimPrefix(scoped, "!") 898 899 var name string 900 var instanceDomain string 901 902 // Parse !name@instance 903 if !strings.Contains(scoped, "@") { 904 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)") 905 } 906 907 parts := strings.SplitN(scoped, "@", 2) 908 name = strings.TrimSpace(parts[0]) 909 instanceDomain = strings.TrimSpace(parts[1]) 910 911 // Validate name format 912 if name == "" { 913 return "", NewValidationError("identifier", "community name cannot be empty") 914 } 915 916 // Validate name is a valid DNS label (RFC 1035) 917 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen 918 if !isValidDNSLabel(name) { 919 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)") 920 } 921 922 // Validate instance domain format 923 if !isValidDomain(instanceDomain) { 924 return "", NewValidationError("identifier", "invalid instance domain format") 925 } 926 927 // Normalize domain to lowercase (DNS is case-insensitive) 928 // This fixes the bug where !gardening@Coves.social would fail lookup 929 instanceDomain = strings.ToLower(instanceDomain) 930 931 // Validate the instance matches this server 932 if !s.isLocalInstance(instanceDomain) { 933 return "", NewValidationError("identifier", 934 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain)) 935 } 936 937 // Construct canonical handle: {name}.community.{instanceDomain} 938 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup 939 canonicalHandle := fmt.Sprintf("%s.community.%s", 940 strings.ToLower(name), 941 instanceDomain) // Already normalized to lowercase on line 923 942 943 // Look up by canonical handle 944 community, err := s.repo.GetByHandle(ctx, canonicalHandle) 945 if err != nil { 946 return "", fmt.Errorf("community not found 
for scoped identifier !%s@%s: %w", name, instanceDomain, err) 947 } 948 949 return community.DID, nil 950} 951 952// isLocalInstance checks if the provided domain matches this instance 953func (s *communityService) isLocalInstance(domain string) bool { 954 // Normalize both domains 955 domain = strings.ToLower(strings.TrimSpace(domain)) 956 instanceDomain := strings.ToLower(s.instanceDomain) 957 958 // Direct match 959 return domain == instanceDomain 960} 961 962// Validation helpers 963 964// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035 965// - 1-63 characters 966// - Alphanumeric and hyphens only 967// - Cannot start or end with hyphen 968func isValidDNSLabel(label string) bool { 969 return dnsLabelRegex.MatchString(label) 970} 971 972// isValidDomain validates that a string is a valid domain name 973// Simplified validation - checks basic DNS hostname structure 974func isValidDomain(domain string) bool { 975 if domain == "" || len(domain) > 253 { 976 return false 977 } 978 return domainRegex.MatchString(domain) 979} 980 981func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 982 if req.Name == "" { 983 return NewValidationError("name", "required") 984 } 985 986 // DNS label limit: 63 characters per label 987 // Community handle format: {name}.community.{instanceDomain} 988 // The first label is just req.Name, so it must be <= 63 chars 989 if len(req.Name) > 63 { 990 return NewValidationError("name", "must be 63 characters or less (DNS label limit)") 991 } 992 993 // Name can only contain alphanumeric and hyphens 994 // Must start and end with alphanumeric (not hyphen) 995 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 996 if !nameRegex.MatchString(req.Name) { 997 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 998 } 999 1000 if req.Description != "" && len(req.Description) > 3000 { 1001 return NewValidationError("description", 
"must be 3000 characters or less") 1002 } 1003 1004 // Visibility should already be set with default in CreateCommunity 1005 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 1006 return ErrInvalidVisibility 1007 } 1008 1009 if req.CreatedByDID == "" { 1010 return NewValidationError("createdByDid", "required") 1011 } 1012 1013 // hostedByDID is auto-populated by the service layer, no validation needed 1014 // The handler ensures clients cannot provide this field 1015 1016 return nil 1017} 1018 1019// PDS write-forward helpers 1020 1021// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 1022func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1023 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 1024 1025 payload := map[string]interface{}{ 1026 "repo": repoDID, 1027 "collection": collection, 1028 "record": record, 1029 } 1030 1031 if rkey != "" { 1032 payload["rkey"] = rkey 1033 } 1034 1035 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1036} 1037 1038// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth) 1039func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1040 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 1041 1042 payload := map[string]interface{}{ 1043 "repo": repoDID, 1044 "collection": collection, 1045 "rkey": rkey, 1046 "record": record, 1047 } 1048 1049 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1050} 1051 1052// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions) 1053func (s *communityService) 
deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error { 1054 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 1055 1056 payload := map[string]interface{}{ 1057 "repo": repoDID, 1058 "collection": collection, 1059 "rkey": rkey, 1060 } 1061 1062 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1063 return err 1064} 1065 1066// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) 1067func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 1068 jsonData, err := json.Marshal(payload) 1069 if err != nil { 1070 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 1071 } 1072 1073 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 1074 if err != nil { 1075 return "", "", fmt.Errorf("failed to create request: %w", err) 1076 } 1077 req.Header.Set("Content-Type", "application/json") 1078 1079 // Add authentication with provided access token 1080 if accessToken != "" { 1081 req.Header.Set("Authorization", "Bearer "+accessToken) 1082 } 1083 1084 // Dynamic timeout based on operation type 1085 // Write operations (createAccount, createRecord, putRecord) are slower due to: 1086 // - Keypair generation 1087 // - DID PLC registration 1088 // - Database writes on PDS 1089 timeout := 10 * time.Second // Default for read operations 1090 if strings.Contains(endpoint, "createAccount") || 1091 strings.Contains(endpoint, "createRecord") || 1092 strings.Contains(endpoint, "putRecord") { 1093 timeout = 30 * time.Second // Extended timeout for write operations 1094 } 1095 1096 client := &http.Client{Timeout: timeout} 1097 resp, err := client.Do(req) 1098 if err != nil { 1099 return "", "", fmt.Errorf("failed to call PDS: %w", err) 1100 } 1101 defer func() { 1102 if closeErr 
:= resp.Body.Close(); closeErr != nil { 1103 log.Printf("Failed to close response body: %v", closeErr) 1104 } 1105 }() 1106 1107 body, err := io.ReadAll(resp.Body) 1108 if err != nil { 1109 return "", "", fmt.Errorf("failed to read response: %w", err) 1110 } 1111 1112 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 1113 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 1114 } 1115 1116 // Parse response to extract URI and CID 1117 var result struct { 1118 URI string `json:"uri"` 1119 CID string `json:"cid"` 1120 } 1121 if err := json.Unmarshal(body, &result); err != nil { 1122 // For delete operations, there might not be a response body 1123 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 1124 return "", "", nil 1125 } 1126 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 1127 } 1128 1129 return result.URI, result.CID, nil 1130} 1131 1132// Helper functions