A community based topic aggregation platform built on atproto
1package communities 2 3import ( 4 "Coves/internal/atproto/utils" 5 "bytes" 6 "context" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io" 11 "log" 12 "net/http" 13 "regexp" 14 "strings" 15 "sync" 16 "time" 17) 18 19// Community handle validation regex (DNS-valid handle: name.community.instance.com) 20// Matches standard DNS hostname format (RFC 1035) 21var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 22 23// DNS label validation (RFC 1035: 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen) 24var dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 25 26// Domain validation (simplified - checks for valid DNS hostname structure) 27var domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 28 29type communityService struct { 30 // Interfaces and pointers first (better alignment) 31 repo Repository 32 provisioner *PDSAccountProvisioner 33 34 // Token refresh concurrency control 35 // Each community gets its own mutex to prevent concurrent refresh attempts 36 refreshMutexes map[string]*sync.Mutex 37 38 // Strings 39 pdsURL string 40 instanceDID string 41 instanceDomain string 42 pdsAccessToken string 43 44 // Sync primitives last 45 mapMutex sync.RWMutex // Protects refreshMutexes map itself 46} 47 48const ( 49 // Maximum recommended size for mutex cache (warning threshold, not hard limit) 50 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead) 51 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable 52 maxMutexCacheSize = 10000 53) 54 55// NewCommunityService creates a new community service 56func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service { 57 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 58 // This catches honest configuration mistakes but NOT malicious code modifications 59 // Full verification (Phase 2) requires fetching DID document from domain 60 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification" 61 if strings.HasPrefix(instanceDID, "did:web:") { 62 didDomain := strings.TrimPrefix(instanceDID, "did:web:") 63 if didDomain != instanceDomain { 64 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)", 65 didDomain, instanceDomain) 66 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt") 67 log.Printf(" Communities will be hosted by: %s", instanceDID) 68 } 69 } 70 71 return &communityService{ 72 repo: repo, 73 pdsURL: pdsURL, 74 instanceDID: instanceDID, 75 instanceDomain: instanceDomain, 76 provisioner: provisioner, 77 refreshMutexes: make(map[string]*sync.Mutex), 78 } 79} 80 81// SetPDSAccessToken sets the PDS access token for authentication 82// This should be called after creating a session for the Coves instance DID on the PDS 83func (s *communityService) SetPDSAccessToken(token string) { 84 s.pdsAccessToken = token 85} 86 87// CreateCommunity creates a new community via write-forward to PDS 88// V2 Flow: 89// 1. Service creates PDS account for community (PDS generates signing keypair) 90// 2. Service writes community profile to COMMUNITY's own repository 91// 3. Firehose emits event 92// 4. Consumer indexes to AppView DB 93// 94// V2 Architecture: 95// - Community owns its own repository (at://community_did/social.coves.community.profile/self) 96// - PDS manages the signing keypair (we never see it) 97// - We store PDS credentials to act on behalf of the community 98// - Community can migrate to other instances (future V2.1 with rotation keys) 99func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 100 // Apply defaults before validation 101 if req.Visibility == "" { 102 req.Visibility = "public" 103 } 104 105 // SECURITY: Auto-populate hostedByDID from instance configuration 106 // Clients MUST NOT provide this field - it's derived from the instance receiving the request 107 // This prevents malicious instances from claiming to host communities for domains they don't own 108 req.HostedByDID = s.instanceDID 109 110 // Validate request 111 if err := s.validateCreateRequest(req); err != nil { 112 return nil, err 113 } 114 115 // V2: Provision a real PDS account for this community 116 // This calls com.atproto.server.createAccount internally 117 // The PDS will: 118 // 1. Generate a signing keypair (stored in PDS, we never see it) 119 // 2. Create a DID (did:plc:xxx) 120 // 3. Return credentials (DID, tokens) 121 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name) 122 if err != nil { 123 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err) 124 } 125 126 // Validate the atProto handle 127 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil { 128 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr) 129 } 130 131 // Build community profile record 132 profile := map[string]interface{}{ 133 "$type": "social.coves.community.profile", 134 "handle": pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social) 135 "name": req.Name, // Short name for !mentions (e.g., "gaming") 136 "visibility": req.Visibility, 137 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns 138 "createdBy": req.CreatedByDID, 139 "createdAt": time.Now().Format(time.RFC3339), 140 "federation": map[string]interface{}{ 141 "allowExternalDiscovery": req.AllowExternalDiscovery, 142 }, 143 } 144 145 // Add optional fields 146 if req.DisplayName != "" { 147 profile["displayName"] = req.DisplayName 148 } 149 if req.Description != "" { 150 profile["description"] = req.Description 151 } 152 if len(req.Rules) > 0 { 153 profile["rules"] = req.Rules 154 } 155 if len(req.Categories) > 0 { 156 profile["categories"] = req.Categories 157 } 158 if req.Language != "" { 159 profile["language"] = req.Language 160 } 161 162 // Initialize counts 163 profile["memberCount"] = 0 164 profile["subscriberCount"] = 0 165 166 // TODO: Handle avatar and banner blobs 167 // For now, we'll skip blob uploads. This would require: 168 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob 169 // 2. Get blob ref (CID) 170 // 3. Add to profile record 171 172 // V2: Write to COMMUNITY's own repository (not instance repo!) 173 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 174 // Authenticate using community's access token 175 recordURI, recordCID, err := s.createRecordOnPDSAs( 176 ctx, 177 pdsAccount.DID, // repo = community's DID (community owns its repo!) 178 "social.coves.community.profile", 179 "self", // canonical rkey for profile 180 profile, 181 pdsAccount.AccessToken, // authenticate as the community 182 ) 183 if err != nil { 184 return nil, fmt.Errorf("failed to create community profile record: %w", err) 185 } 186 187 // Build Community object with PDS credentials AND cryptographic keys 188 community := &Community{ 189 DID: pdsAccount.DID, // Community's DID (owns the repo!) 190 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social) 191 Name: req.Name, 192 DisplayName: req.DisplayName, 193 Description: req.Description, 194 OwnerDID: pdsAccount.DID, // V2: Community owns itself 195 CreatedByDID: req.CreatedByDID, 196 HostedByDID: req.HostedByDID, 197 PDSEmail: pdsAccount.Email, 198 PDSPassword: pdsAccount.Password, 199 PDSAccessToken: pdsAccount.AccessToken, 200 PDSRefreshToken: pdsAccount.RefreshToken, 201 PDSURL: pdsAccount.PDSURL, 202 Visibility: req.Visibility, 203 AllowExternalDiscovery: req.AllowExternalDiscovery, 204 MemberCount: 0, 205 SubscriberCount: 0, 206 CreatedAt: time.Now(), 207 UpdatedAt: time.Now(), 208 RecordURI: recordURI, 209 RecordCID: recordCID, 210 // V2: Cryptographic keys for portability (will be encrypted by repository) 211 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration 212 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations 213 } 214 215 // CRITICAL: Persist PDS credentials immediately to database 216 // The Jetstream consumer will eventually index the community profile from the firehose, 217 // but it won't have the PDS credentials. We must store them now so we can: 218 // 1. Update the community profile later (using its own credentials) 219 // 2. Re-authenticate if access tokens expire 220 _, err = s.repo.Create(ctx, community) 221 if err != nil { 222 return nil, fmt.Errorf("failed to persist community with credentials: %w", err) 223 } 224 225 return community, nil 226} 227 228// GetCommunity retrieves a community from AppView DB 229// identifier can be either a DID or handle 230func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 231 if identifier == "" { 232 return nil, ErrInvalidInput 233 } 234 235 // Determine if identifier is DID or handle 236 if strings.HasPrefix(identifier, "did:") { 237 return s.repo.GetByDID(ctx, identifier) 238 } 239 240 if strings.HasPrefix(identifier, "!") { 241 return s.repo.GetByHandle(ctx, identifier) 242 } 243 244 return nil, NewValidationError("identifier", "must be a DID or handle") 245} 246 247// UpdateCommunity updates a community via write-forward to PDS 248func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) { 249 if req.CommunityDID == "" { 250 return nil, NewValidationError("communityDid", "required") 251 } 252 253 if req.UpdatedByDID == "" { 254 return nil, NewValidationError("updatedByDid", "required") 255 } 256 257 // Get existing community 258 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 259 if err != nil { 260 return nil, err 261 } 262 263 // CRITICAL: Ensure fresh PDS access token before write operation 264 // Community PDS tokens expire every ~2 hours and must be refreshed 265 existing, err = s.ensureFreshToken(ctx, existing) 266 if err != nil { 267 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err) 268 } 269 270 // Authorization: verify user is the creator 271 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 272 if existing.CreatedByDID != req.UpdatedByDID { 273 return nil, ErrUnauthorized 274 } 275 276 // Build updated profile record (start with existing) 277 profile := map[string]interface{}{ 278 "$type": "social.coves.community.profile", 279 "handle": existing.Handle, 280 "name": existing.Name, 281 "owner": existing.OwnerDID, 282 "createdBy": existing.CreatedByDID, 283 "hostedBy": existing.HostedByDID, 284 "createdAt": existing.CreatedAt.Format(time.RFC3339), 285 } 286 287 // Apply updates 288 if req.DisplayName != nil { 289 profile["displayName"] = *req.DisplayName 290 } else { 291 profile["displayName"] = existing.DisplayName 292 } 293 294 if req.Description != nil { 295 profile["description"] = *req.Description 296 } else { 297 profile["description"] = existing.Description 298 } 299 300 if req.Visibility != nil { 301 profile["visibility"] = *req.Visibility 302 } else { 303 profile["visibility"] = existing.Visibility 304 } 305 306 if req.AllowExternalDiscovery != nil { 307 profile["federation"] = map[string]interface{}{ 308 "allowExternalDiscovery": *req.AllowExternalDiscovery, 309 } 310 } else { 311 profile["federation"] = map[string]interface{}{ 312 "allowExternalDiscovery": existing.AllowExternalDiscovery, 313 } 314 } 315 316 // Preserve moderation settings (even if empty) 317 // These fields are optional but should not be erased on update 318 if req.ModerationType != nil { 319 profile["moderationType"] = *req.ModerationType 320 } else if existing.ModerationType != "" { 321 profile["moderationType"] = existing.ModerationType 322 } 323 324 if len(req.ContentWarnings) > 0 { 325 profile["contentWarnings"] = req.ContentWarnings 326 } else if len(existing.ContentWarnings) > 0 { 327 profile["contentWarnings"] = existing.ContentWarnings 328 } 329 330 // Preserve counts 331 profile["memberCount"] = existing.MemberCount 332 profile["subscriberCount"] = existing.SubscriberCount 333 334 // V2: Community profiles always use "self" as rkey 335 // (No need to extract from URI - it's always "self" for V2 communities) 336 337 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials 338 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 339 // Authenticate as the community (not as instance!) 340 if existing.PDSAccessToken == "" { 341 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID) 342 } 343 344 recordURI, recordCID, err := s.putRecordOnPDSAs( 345 ctx, 346 existing.DID, // repo = community's own DID (V2!) 347 "social.coves.community.profile", 348 "self", // V2: always "self" 349 profile, 350 existing.PDSAccessToken, // authenticate as the community 351 ) 352 if err != nil { 353 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 354 } 355 356 // Return updated community representation 357 // Actual AppView DB update happens via Jetstream consumer 358 updated := *existing 359 if req.DisplayName != nil { 360 updated.DisplayName = *req.DisplayName 361 } 362 if req.Description != nil { 363 updated.Description = *req.Description 364 } 365 if req.Visibility != nil { 366 updated.Visibility = *req.Visibility 367 } 368 if req.AllowExternalDiscovery != nil { 369 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 370 } 371 if req.ModerationType != nil { 372 updated.ModerationType = *req.ModerationType 373 } 374 if len(req.ContentWarnings) > 0 { 375 updated.ContentWarnings = req.ContentWarnings 376 } 377 updated.RecordURI = recordURI 378 updated.RecordCID = recordCID 379 updated.UpdatedAt = time.Now() 380 381 return &updated, nil 382} 383 384// getOrCreateRefreshMutex returns a mutex for the given community DID 385// Thread-safe with read-lock fast path for existing entries 386// SAFETY: Does NOT evict entries to avoid race condition where: 387// 1. Thread A holds mutex for community-123 388// 2. Thread B evicts community-123 from map 389// 3. Thread C creates NEW mutex for community-123 390// 4. Now two threads can refresh community-123 concurrently (mutex defeated!) 391func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex { 392 // Fast path: check if mutex already exists (read lock) 393 s.mapMutex.RLock() 394 mutex, exists := s.refreshMutexes[did] 395 s.mapMutex.RUnlock() 396 397 if exists { 398 return mutex 399 } 400 401 // Slow path: create new mutex (write lock) 402 s.mapMutex.Lock() 403 defer s.mapMutex.Unlock() 404 405 // Double-check after acquiring write lock (another goroutine might have created it) 406 mutex, exists = s.refreshMutexes[did] 407 if exists { 408 return mutex 409 } 410 411 // Create new mutex 412 mutex = &sync.Mutex{} 413 s.refreshMutexes[did] = mutex 414 415 // SAFETY: No eviction to prevent race condition 416 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes 417 if len(s.refreshMutexes) > maxMutexCacheSize { 418 memoryKB := len(s.refreshMutexes) * 16 / 1024 419 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB", 420 len(s.refreshMutexes), maxMutexCacheSize, memoryKB) 421 } 422 423 return mutex 424} 425 426// ensureFreshToken checks if a community's access token needs refresh and updates if needed 427// Returns updated community with fresh credentials (or original if no refresh needed) 428// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts 429func (s *communityService) ensureFreshToken(ctx context.Context, community *Community) (*Community, error) { 430 // Get or create mutex for this specific community DID 431 mutex := s.getOrCreateRefreshMutex(community.DID) 432 433 // Lock for this specific community (allows other communities to refresh concurrently) 434 mutex.Lock() 435 defer mutex.Unlock() 436 437 // Re-fetch community from DB (another goroutine might have already refreshed it) 438 fresh, err := s.repo.GetByDID(ctx, community.DID) 439 if err != nil { 440 return nil, fmt.Errorf("failed to re-fetch community: %w", err) 441 } 442 443 // Check if token needs refresh (5-minute buffer before expiration) 444 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken) 445 if err != nil { 446 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err) 447 return nil, fmt.Errorf("failed to check token expiration: %w", err) 448 } 449 450 if !needsRefresh { 451 // Token still valid, no refresh needed 452 return fresh, nil 453 } 454 455 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID) 456 457 // Attempt token refresh using refresh token 458 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken) 459 if err != nil { 460 // Check if refresh token expired (need password fallback) 461 if strings.Contains(err.Error(), "expired or invalid") { 462 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID) 463 464 // Fallback: Re-authenticate with stored password 465 newAccessToken, newRefreshToken, err = reauthenticateWithPassword( 466 ctx, 467 fresh.PDSURL, 468 fresh.PDSEmail, 469 fresh.PDSPassword, // Retrieved decrypted from DB 470 ) 471 if err != nil { 472 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err) 473 return nil, fmt.Errorf("failed to re-authenticate community: %w", err) 474 } 475 476 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID) 477 } else { 478 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err) 479 return nil, fmt.Errorf("failed to refresh token: %w", err) 480 } 481 } 482 483 // CRITICAL: Update database with new tokens immediately 484 // Refresh tokens are SINGLE-USE - old one is now invalid 485 // Use retry logic to handle transient DB failures 486 const maxRetries = 3 487 var updateErr error 488 for attempt := 0; attempt < maxRetries; attempt++ { 489 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken) 490 if updateErr == nil { 491 break // Success 492 } 493 494 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v", 495 fresh.DID, attempt+1, maxRetries, updateErr) 496 497 if attempt < maxRetries-1 { 498 // Exponential backoff: 100ms, 200ms, 400ms 499 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond 500 time.Sleep(backoff) 501 } 502 } 503 504 if updateErr != nil { 505 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved 506 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v", 507 fresh.DID, maxRetries, updateErr) 508 // TODO: Send alert to monitoring system (add in Beta) 509 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w", 510 maxRetries, updateErr) 511 } 512 513 // Return updated community object with fresh tokens 514 updatedCommunity := *fresh 515 updatedCommunity.PDSAccessToken = newAccessToken 516 updatedCommunity.PDSRefreshToken = newRefreshToken 517 518 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID) 519 520 return &updatedCommunity, nil 521} 522 523// ListCommunities queries AppView DB for communities with filters 524func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) { 525 // Set defaults 526 if req.Limit <= 0 || req.Limit > 100 { 527 req.Limit = 50 528 } 529 530 return s.repo.List(ctx, req) 531} 532 533// SearchCommunities performs fuzzy search in AppView DB 534func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) { 535 if req.Query == "" { 536 return nil, 0, NewValidationError("query", "search query is required") 537 } 538 539 // Set defaults 540 if req.Limit <= 0 || req.Limit > 100 { 541 req.Limit = 50 542 } 543 544 return s.repo.Search(ctx, req) 545} 546 547// SubscribeToCommunity creates a subscription via write-forward to PDS 548func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) { 549 if userDID == "" { 550 return nil, NewValidationError("userDid", "required") 551 } 552 if userAccessToken == "" { 553 return nil, NewValidationError("userAccessToken", "required") 554 } 555 556 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid 557 if contentVisibility <= 0 || contentVisibility > 5 { 558 contentVisibility = 3 559 } 560 561 // Resolve community identifier to DID 562 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 563 if err != nil { 564 return nil, err 565 } 566 567 // Verify community exists 568 community, err := s.repo.GetByDID(ctx, communityDID) 569 if err != nil { 570 return nil, err 571 } 572 573 // Check visibility - can't subscribe to private communities without invitation (TODO) 574 if community.Visibility == "private" { 575 return nil, ErrUnauthorized 576 } 577 578 // Build subscription record 579 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure) 580 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid} 581 // Following atProto conventions, we use "subject" field to reference the community 582 subRecord := map[string]interface{}{ 583 "$type": "social.coves.community.subscription", 584 "subject": communityDID, // atProto convention: "subject" for entity references 585 "createdAt": time.Now().Format(time.RFC3339), 586 "contentVisibility": contentVisibility, 587 } 588 589 // Write-forward: create subscription record in user's repo using their access token 590 // The collection parameter refers to the record type in the repository 591 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken) 592 if err != nil { 593 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 594 } 595 596 // Return subscription representation 597 subscription := &Subscription{ 598 UserDID: userDID, 599 CommunityDID: communityDID, 600 ContentVisibility: contentVisibility, 601 SubscribedAt: time.Now(), 602 RecordURI: recordURI, 603 RecordCID: recordCID, 604 } 605 606 return subscription, nil 607} 608 609// UnsubscribeFromCommunity removes a subscription via PDS delete 610func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 611 if userDID == "" { 612 return NewValidationError("userDid", "required") 613 } 614 if userAccessToken == "" { 615 return NewValidationError("userAccessToken", "required") 616 } 617 618 // Resolve community identifier 619 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 620 if err != nil { 621 return err 622 } 623 624 // Get the subscription from AppView to find the record key 625 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 626 if err != nil { 627 return err 628 } 629 630 // Extract rkey from record URI (at://did/collection/rkey) 631 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 632 if rkey == "" { 633 return fmt.Errorf("invalid subscription record URI") 634 } 635 636 // Write-forward: delete record from PDS using user's access token 637 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe 638 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil { 639 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 640 } 641 642 return nil 643} 644 645// GetUserSubscriptions queries AppView DB for user's subscriptions 646func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 647 if limit <= 0 || limit > 100 { 648 limit = 50 649 } 650 651 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 652} 653 654// GetCommunitySubscribers queries AppView DB for community subscribers 655func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 656 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 657 if err != nil { 658 return nil, err 659 } 660 661 if limit <= 0 || limit > 100 { 662 limit = 50 663 } 664 665 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 666} 667 668// GetMembership retrieves membership info from AppView DB 669func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) { 670 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 671 if err != nil { 672 return nil, err 673 } 674 675 return s.repo.GetMembership(ctx, userDID, communityDID) 676} 677 678// ListCommunityMembers queries AppView DB for members 679func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 680 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 681 if err != nil { 682 return nil, err 683 } 684 685 if limit <= 0 || limit > 100 { 686 limit = 50 687 } 688 689 return s.repo.ListMembers(ctx, communityDID, limit, offset) 690} 691 692// BlockCommunity blocks a community via write-forward to PDS 693func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) { 694 if userDID == "" { 695 return nil, NewValidationError("userDid", "required") 696 } 697 if userAccessToken == "" { 698 return nil, NewValidationError("userAccessToken", "required") 699 } 700 701 // Resolve community identifier (also verifies community exists) 702 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 703 if err != nil { 704 return nil, err 705 } 706 707 // Build block record 708 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE) 709 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid} 710 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern 711 blockRecord := map[string]interface{}{ 712 "$type": "social.coves.community.block", 713 "subject": communityDID, // DID of community being blocked 714 "createdAt": time.Now().Format(time.RFC3339), 715 } 716 717 // Write-forward: create block record in user's repo using their access token 718 // Note: We don't check for existing blocks first because: 719 // 1. The PDS may reject duplicates (depending on implementation) 720 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING 721 // 3. This avoids a race condition where two concurrent requests both pass the check 722 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken) 723 if err != nil { 724 // Check if this is a duplicate/conflict error from PDS 725 // PDS should return 409 Conflict for duplicate records, but we also check common error messages 726 // for compatibility with different PDS implementations 727 errMsg := err.Error() 728 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict 729 strings.Contains(errMsg, "duplicate") || 730 strings.Contains(errMsg, "already exists") || 731 strings.Contains(errMsg, "AlreadyExists") 732 733 if isDuplicate { 734 // Fetch and return existing block from our indexed view 735 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID) 736 if getErr == nil { 737 // Block exists in our index - return it 738 return existingBlock, nil 739 } 740 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition) 741 // Any other error (DB outage, connection failure, etc.) should bubble up 742 if errors.Is(getErr, ErrBlockNotFound) { 743 // Race condition: PDS has the block but Jetstream hasn't indexed it yet 744 // Return typed conflict error so handler can return 409 instead of 500 745 // This is normal in eventually-consistent systems 746 return nil, ErrBlockAlreadyExists 747 } 748 // Real datastore error - bubble it up so operators see the failure 749 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr) 750 } 751 return nil, fmt.Errorf("failed to create block on PDS: %w", err) 752 } 753 754 // Return block representation 755 block := &CommunityBlock{ 756 UserDID: userDID, 757 CommunityDID: communityDID, 758 BlockedAt: time.Now(), 759 RecordURI: recordURI, 760 RecordCID: recordCID, 761 } 762 763 return block, nil 764} 765 766// UnblockCommunity removes a block via PDS delete 767func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 768 if userDID == "" { 769 return NewValidationError("userDid", "required") 770 } 771 if userAccessToken == "" { 772 return NewValidationError("userAccessToken", "required") 773 } 774 775 // Resolve community identifier 776 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 777 if err != nil { 778 return err 779 } 780 781 // Get the block from AppView to find the record key 782 block, err := s.repo.GetBlock(ctx, userDID, communityDID) 783 if err != nil { 784 return err 785 } 786 787 // Extract rkey from record URI (at://did/collection/rkey) 788 rkey := utils.ExtractRKeyFromURI(block.RecordURI) 789 if rkey == "" { 790 return fmt.Errorf("invalid block record URI") 791 } 792 793 // Write-forward: delete record from PDS using user's access token 794 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil { 795 return fmt.Errorf("failed to delete block on PDS: %w", err) 796 } 797 798 return nil 799} 800 801// GetBlockedCommunities queries AppView DB for user's blocks 802func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) { 803 if limit <= 0 || limit > 100 { 804 limit = 50 805 } 806 807 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset) 808} 809 810// IsBlocked checks if a user has blocked a community 811func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) { 812 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 813 if err != nil { 814 return false, err 815 } 816 817 return s.repo.IsBlocked(ctx, userDID, communityDID) 818} 819 820// ValidateHandle checks if a community handle is valid 821func (s *communityService) ValidateHandle(handle string) error { 822 if handle == "" { 823 return NewValidationError("handle", "required") 824 } 825 826 if !communityHandleRegex.MatchString(handle) { 827 return ErrInvalidHandle 828 } 829 830 return nil 831} 832 833// ResolveCommunityIdentifier converts a community identifier to a DID 834// Following Bluesky's pattern with Coves extensions: 835// 836// Accepts (like Bluesky's at-identifier): 837// 1. DID: did:plc:abc123 (pass through) 838// 2. Canonical handle: gardening.community.coves.social (atProto standard) 839// 3. At-identifier: @gardening.community.coves.social (strip @ prefix) 840// 841// Coves-specific extensions: 842// 4. Scoped format: !gardening@coves.social (parse and resolve) 843// 844// Returns: DID string 845func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 846 identifier = strings.TrimSpace(identifier) 847 848 if identifier == "" { 849 return "", ErrInvalidInput 850 } 851 852 // 1. DID - verify it exists and return (Bluesky standard) 853 if strings.HasPrefix(identifier, "did:") { 854 _, err := s.repo.GetByDID(ctx, identifier) 855 if err != nil { 856 if IsNotFound(err) { 857 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err) 858 } 859 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err) 860 } 861 return identifier, nil 862 } 863 864 // 2. Scoped format: !name@instance (Coves-specific) 865 if strings.HasPrefix(identifier, "!") { 866 return s.resolveScopedIdentifier(ctx, identifier) 867 } 868 869 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix) 870 if strings.HasPrefix(identifier, "@") { 871 identifier = strings.TrimPrefix(identifier, "@") 872 } 873 874 // 4. Canonical handle: name.community.instance.com (Bluesky standard) 875 if strings.Contains(identifier, ".") { 876 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier)) 877 if err != nil { 878 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err) 879 } 880 return community.DID, nil 881 } 882 883 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)") 884} 885 886// resolveScopedIdentifier handles Coves-specific !name@instance format 887// Formats accepted: 888// !gardening@coves.social -> gardening.community.coves.social 889func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) { 890 // Remove ! prefix 891 scoped = strings.TrimPrefix(scoped, "!") 892 893 var name string 894 var instanceDomain string 895 896 // Parse !name@instance 897 if !strings.Contains(scoped, "@") { 898 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)") 899 } 900 901 parts := strings.SplitN(scoped, "@", 2) 902 name = strings.TrimSpace(parts[0]) 903 instanceDomain = strings.TrimSpace(parts[1]) 904 905 // Validate name format 906 if name == "" { 907 return "", NewValidationError("identifier", "community name cannot be empty") 908 } 909 910 // Validate name is a valid DNS label (RFC 1035) 911 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen 912 if !isValidDNSLabel(name) { 913 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)") 914 } 915 916 // Validate instance domain format 917 if !isValidDomain(instanceDomain) { 918 return "", NewValidationError("identifier", "invalid instance domain format") 919 } 920 921 // Normalize domain to lowercase (DNS is case-insensitive) 922 // This fixes the bug where !gardening@Coves.social would fail lookup 923 instanceDomain = strings.ToLower(instanceDomain) 924 925 // Validate the instance matches this server 926 if !s.isLocalInstance(instanceDomain) { 927 return "", NewValidationError("identifier", 928 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain)) 929 } 930 931 // Construct canonical handle: {name}.community.{instanceDomain} 932 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup 933 canonicalHandle := fmt.Sprintf("%s.community.%s", 934 strings.ToLower(name), 935 instanceDomain) // Already normalized to lowercase on line 923 936 937 // Look up by canonical handle 938 community, err := s.repo.GetByHandle(ctx, canonicalHandle) 939 if err != nil { 940 return "", fmt.Errorf("community not found for scoped identifier !%s@%s: %w", name, instanceDomain, err) 941 } 942 943 return community.DID, nil 944} 945 946// isLocalInstance checks if the provided domain matches this instance 947func (s *communityService) isLocalInstance(domain string) bool { 948 // Normalize both domains 949 domain = strings.ToLower(strings.TrimSpace(domain)) 950 instanceDomain := strings.ToLower(s.instanceDomain) 951 952 // Direct match 953 return domain == instanceDomain 954} 955 956// Validation helpers 957 958// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035 959// - 1-63 characters 960// - Alphanumeric and hyphens only 961// - Cannot start or end with hyphen 962func isValidDNSLabel(label string) bool { 963 return dnsLabelRegex.MatchString(label) 964} 965 966// isValidDomain validates that a string is a valid domain name 967// Simplified validation - checks basic DNS hostname structure 968func isValidDomain(domain string) bool { 969 if domain == "" || len(domain) > 253 { 970 return false 971 } 972 return domainRegex.MatchString(domain) 973} 974 975func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 976 if req.Name == "" { 977 return NewValidationError("name", "required") 978 } 979 980 // DNS label limit: 63 characters per label 981 // Community handle format: {name}.community.{instanceDomain} 982 // The first label is just req.Name, so it must be <= 63 chars 983 if len(req.Name) > 63 { 984 return NewValidationError("name", "must be 63 characters or less (DNS label limit)") 985 } 986 987 // Name can only contain alphanumeric and hyphens 988 // Must start and end with alphanumeric (not hyphen) 989 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 990 if !nameRegex.MatchString(req.Name) { 991 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 992 } 993 994 if req.Description != "" && len(req.Description) > 3000 { 995 return NewValidationError("description", "must be 3000 characters or less") 996 } 997 998 // Visibility should already be set with default in CreateCommunity 999 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 1000 return ErrInvalidVisibility 1001 } 1002 1003 if req.CreatedByDID == "" { 1004 return NewValidationError("createdByDid", "required") 1005 } 1006 1007 // hostedByDID is auto-populated by the service layer, no validation needed 1008 // The handler ensures clients cannot provide this field 1009 1010 return nil 1011} 1012 1013// PDS write-forward helpers 1014 1015// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 1016func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1017 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 1018 1019 payload := map[string]interface{}{ 1020 "repo": repoDID, 1021 "collection": collection, 1022 "record": record, 1023 } 1024 1025 if rkey != "" { 1026 payload["rkey"] = rkey 1027 } 1028 1029 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1030} 1031 1032// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth) 1033func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1034 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 1035 1036 payload := map[string]interface{}{ 1037 "repo": repoDID, 1038 "collection": collection, 1039 "rkey": rkey, 1040 "record": record, 1041 } 1042 1043 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1044} 1045 1046// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions) 1047func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error { 1048 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 1049 1050 payload := map[string]interface{}{ 1051 "repo": repoDID, 1052 "collection": collection, 1053 "rkey": rkey, 1054 } 1055 1056 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1057 return err 1058} 1059 1060// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) 1061func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 1062 jsonData, err := json.Marshal(payload) 1063 if err != nil { 1064 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 1065 } 1066 1067 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 1068 if err != nil { 1069 return "", "", fmt.Errorf("failed to create request: %w", err) 1070 } 1071 req.Header.Set("Content-Type", "application/json") 1072 1073 // Add authentication with provided access token 1074 if accessToken != "" { 1075 req.Header.Set("Authorization", "Bearer "+accessToken) 1076 } 1077 1078 // Dynamic timeout based on operation type 1079 // Write operations (createAccount, createRecord, putRecord) are slower due to: 1080 // - Keypair generation 1081 // - DID PLC registration 1082 // - Database writes on PDS 1083 timeout := 10 * time.Second // Default for read operations 1084 if strings.Contains(endpoint, "createAccount") || 1085 strings.Contains(endpoint, "createRecord") || 1086 strings.Contains(endpoint, "putRecord") { 1087 timeout = 30 * time.Second // Extended timeout for write operations 1088 } 1089 1090 client := &http.Client{Timeout: timeout} 1091 resp, err := client.Do(req) 1092 if err != nil { 1093 return "", "", fmt.Errorf("failed to call PDS: %w", err) 1094 } 1095 defer func() { 1096 if closeErr := resp.Body.Close(); closeErr != nil { 1097 log.Printf("Failed to close response body: %v", closeErr) 1098 } 1099 }() 1100 1101 body, err := io.ReadAll(resp.Body) 1102 if err != nil { 1103 return "", "", fmt.Errorf("failed to read response: %w", err) 1104 } 1105 1106 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 1107 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 1108 } 1109 1110 // Parse response to extract URI and CID 1111 var result struct { 1112 URI string `json:"uri"` 1113 CID string `json:"cid"` 1114 } 1115 if err := json.Unmarshal(body, &result); err != nil { 1116 // For delete operations, there might not be a response body 1117 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 1118 return "", "", nil 1119 } 1120 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 1121 } 1122 1123 return result.URI, result.CID, nil 1124} 1125 1126// Helper functions