A community-based topic aggregation platform built on atproto
1package communities 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log" 11 "net/http" 12 "regexp" 13 "strings" 14 "sync" 15 "time" 16 17 "Coves/internal/atproto/utils" 18) 19 20// Community handle validation regex (DNS-valid handle: name.community.instance.com) 21// Matches standard DNS hostname format (RFC 1035) 22var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 23 24// DNS label validation (RFC 1035: 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen) 25var dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 26 27// Domain validation (simplified - checks for valid DNS hostname structure) 28var domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 29 30type communityService struct { 31 // Interfaces and pointers first (better alignment) 32 repo Repository 33 provisioner *PDSAccountProvisioner 34 35 // Token refresh concurrency control 36 // Each community gets its own mutex to prevent concurrent refresh attempts 37 refreshMutexes map[string]*sync.Mutex 38 39 // Strings 40 pdsURL string 41 instanceDID string 42 instanceDomain string 43 pdsAccessToken string 44 45 // Sync primitives last 46 mapMutex sync.RWMutex // Protects refreshMutexes map itself 47} 48 49const ( 50 // Maximum recommended size for mutex cache (warning threshold, not hard limit) 51 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead) 52 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable 53 maxMutexCacheSize = 10000 54) 55 56// NewCommunityService creates a new community service 57func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service { 58 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 59 // This 
catches honest configuration mistakes but NOT malicious code modifications 60 // Full verification (Phase 2) requires fetching DID document from domain 61 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification" 62 if strings.HasPrefix(instanceDID, "did:web:") { 63 didDomain := strings.TrimPrefix(instanceDID, "did:web:") 64 if didDomain != instanceDomain { 65 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)", 66 didDomain, instanceDomain) 67 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt") 68 log.Printf(" Communities will be hosted by: %s", instanceDID) 69 } 70 } 71 72 return &communityService{ 73 repo: repo, 74 pdsURL: pdsURL, 75 instanceDID: instanceDID, 76 instanceDomain: instanceDomain, 77 provisioner: provisioner, 78 refreshMutexes: make(map[string]*sync.Mutex), 79 } 80} 81 82// SetPDSAccessToken sets the PDS access token for authentication 83// This should be called after creating a session for the Coves instance DID on the PDS 84func (s *communityService) SetPDSAccessToken(token string) { 85 s.pdsAccessToken = token 86} 87 88// CreateCommunity creates a new community via write-forward to PDS 89// V2 Flow: 90// 1. Service creates PDS account for community (PDS generates signing keypair) 91// 2. Service writes community profile to COMMUNITY's own repository 92// 3. Firehose emits event 93// 4. 
Consumer indexes to AppView DB 94// 95// V2 Architecture: 96// - Community owns its own repository (at://community_did/social.coves.community.profile/self) 97// - PDS manages the signing keypair (we never see it) 98// - We store PDS credentials to act on behalf of the community 99// - Community can migrate to other instances (future V2.1 with rotation keys) 100func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 101 // Apply defaults before validation 102 if req.Visibility == "" { 103 req.Visibility = "public" 104 } 105 106 // SECURITY: Auto-populate hostedByDID from instance configuration 107 // Clients MUST NOT provide this field - it's derived from the instance receiving the request 108 // This prevents malicious instances from claiming to host communities for domains they don't own 109 req.HostedByDID = s.instanceDID 110 111 // Validate request 112 if err := s.validateCreateRequest(req); err != nil { 113 return nil, err 114 } 115 116 // V2: Provision a real PDS account for this community 117 // This calls com.atproto.server.createAccount internally 118 // The PDS will: 119 // 1. Generate a signing keypair (stored in PDS, we never see it) 120 // 2. Create a DID (did:plc:xxx) 121 // 3. 
Return credentials (DID, tokens) 122 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name) 123 if err != nil { 124 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err) 125 } 126 127 // Validate the atProto handle 128 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil { 129 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr) 130 } 131 132 // Build community profile record 133 profile := map[string]interface{}{ 134 "$type": "social.coves.community.profile", 135 "name": req.Name, // Short name for !mentions (e.g., "gaming") 136 "visibility": req.Visibility, 137 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns 138 "createdBy": req.CreatedByDID, 139 "createdAt": time.Now().Format(time.RFC3339), 140 "federation": map[string]interface{}{ 141 "allowExternalDiscovery": req.AllowExternalDiscovery, 142 }, 143 } 144 145 // Add optional fields 146 if req.DisplayName != "" { 147 profile["displayName"] = req.DisplayName 148 } 149 if req.Description != "" { 150 profile["description"] = req.Description 151 } 152 if len(req.Rules) > 0 { 153 profile["rules"] = req.Rules 154 } 155 if len(req.Categories) > 0 { 156 profile["categories"] = req.Categories 157 } 158 if req.Language != "" { 159 profile["language"] = req.Language 160 } 161 162 // TODO: Handle avatar and banner blobs 163 // For now, we'll skip blob uploads. This would require: 164 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob 165 // 2. Get blob ref (CID) 166 // 3. Add to profile record 167 168 // V2: Write to COMMUNITY's own repository (not instance repo!) 169 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 170 // Authenticate using community's access token 171 recordURI, recordCID, err := s.createRecordOnPDSAs( 172 ctx, 173 pdsAccount.DID, // repo = community's DID (community owns its repo!) 
174 "social.coves.community.profile", 175 "self", // canonical rkey for profile 176 profile, 177 pdsAccount.AccessToken, // authenticate as the community 178 ) 179 if err != nil { 180 return nil, fmt.Errorf("failed to create community profile record: %w", err) 181 } 182 183 // Build Community object with PDS credentials AND cryptographic keys 184 community := &Community{ 185 DID: pdsAccount.DID, // Community's DID (owns the repo!) 186 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social) 187 Name: req.Name, 188 DisplayName: req.DisplayName, 189 Description: req.Description, 190 OwnerDID: pdsAccount.DID, // V2: Community owns itself 191 CreatedByDID: req.CreatedByDID, 192 HostedByDID: req.HostedByDID, 193 PDSEmail: pdsAccount.Email, 194 PDSPassword: pdsAccount.Password, 195 PDSAccessToken: pdsAccount.AccessToken, 196 PDSRefreshToken: pdsAccount.RefreshToken, 197 PDSURL: pdsAccount.PDSURL, 198 Visibility: req.Visibility, 199 AllowExternalDiscovery: req.AllowExternalDiscovery, 200 MemberCount: 0, 201 SubscriberCount: 0, 202 CreatedAt: time.Now(), 203 UpdatedAt: time.Now(), 204 RecordURI: recordURI, 205 RecordCID: recordCID, 206 // V2: Cryptographic keys for portability (will be encrypted by repository) 207 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration 208 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations 209 } 210 211 // CRITICAL: Persist PDS credentials immediately to database 212 // The Jetstream consumer will eventually index the community profile from the firehose, 213 // but it won't have the PDS credentials. We must store them now so we can: 214 // 1. Update the community profile later (using its own credentials) 215 // 2. 
Re-authenticate if access tokens expire 216 _, err = s.repo.Create(ctx, community) 217 if err != nil { 218 return nil, fmt.Errorf("failed to persist community with credentials: %w", err) 219 } 220 221 return community, nil 222} 223 224// GetCommunity retrieves a community from AppView DB 225// identifier can be either a DID or handle 226func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 227 if identifier == "" { 228 return nil, ErrInvalidInput 229 } 230 231 // Determine if identifier is DID or handle 232 if strings.HasPrefix(identifier, "did:") { 233 return s.repo.GetByDID(ctx, identifier) 234 } 235 236 if strings.HasPrefix(identifier, "!") { 237 return s.repo.GetByHandle(ctx, identifier) 238 } 239 240 return nil, NewValidationError("identifier", "must be a DID or handle") 241} 242 243// GetByDID retrieves a community by its DID 244// Exported for use by post service when validating community references 245func (s *communityService) GetByDID(ctx context.Context, did string) (*Community, error) { 246 if did == "" { 247 return nil, ErrInvalidInput 248 } 249 250 if !strings.HasPrefix(did, "did:") { 251 return nil, NewValidationError("did", "must be a valid DID") 252 } 253 254 return s.repo.GetByDID(ctx, did) 255} 256 257// UpdateCommunity updates a community via write-forward to PDS 258func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) { 259 if req.CommunityDID == "" { 260 return nil, NewValidationError("communityDid", "required") 261 } 262 263 if req.UpdatedByDID == "" { 264 return nil, NewValidationError("updatedByDid", "required") 265 } 266 267 // Get existing community 268 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 269 if err != nil { 270 return nil, err 271 } 272 273 // CRITICAL: Ensure fresh PDS access token before write operation 274 // Community PDS tokens expire every ~2 hours and must be refreshed 275 existing, err = 
s.EnsureFreshToken(ctx, existing) 276 if err != nil { 277 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err) 278 } 279 280 // Authorization: verify user is the creator 281 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 282 if existing.CreatedByDID != req.UpdatedByDID { 283 return nil, ErrUnauthorized 284 } 285 286 // Build updated profile record (start with existing) 287 profile := map[string]interface{}{ 288 "$type": "social.coves.community.profile", 289 "name": existing.Name, 290 "owner": existing.OwnerDID, 291 "createdBy": existing.CreatedByDID, 292 "hostedBy": existing.HostedByDID, 293 "createdAt": existing.CreatedAt.Format(time.RFC3339), 294 } 295 296 // Apply updates 297 if req.DisplayName != nil { 298 profile["displayName"] = *req.DisplayName 299 } else { 300 profile["displayName"] = existing.DisplayName 301 } 302 303 if req.Description != nil { 304 profile["description"] = *req.Description 305 } else { 306 profile["description"] = existing.Description 307 } 308 309 if req.Visibility != nil { 310 profile["visibility"] = *req.Visibility 311 } else { 312 profile["visibility"] = existing.Visibility 313 } 314 315 if req.AllowExternalDiscovery != nil { 316 profile["federation"] = map[string]interface{}{ 317 "allowExternalDiscovery": *req.AllowExternalDiscovery, 318 } 319 } else { 320 profile["federation"] = map[string]interface{}{ 321 "allowExternalDiscovery": existing.AllowExternalDiscovery, 322 } 323 } 324 325 // Preserve moderation settings (even if empty) 326 // These fields are optional but should not be erased on update 327 if req.ModerationType != nil { 328 profile["moderationType"] = *req.ModerationType 329 } else if existing.ModerationType != "" { 330 profile["moderationType"] = existing.ModerationType 331 } 332 333 if len(req.ContentWarnings) > 0 { 334 profile["contentWarnings"] = req.ContentWarnings 335 } else if len(existing.ContentWarnings) > 0 { 336 profile["contentWarnings"] = 
existing.ContentWarnings 337 } 338 339 // V2: Community profiles always use "self" as rkey 340 // (No need to extract from URI - it's always "self" for V2 communities) 341 342 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials 343 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self 344 // Authenticate as the community (not as instance!) 345 if existing.PDSAccessToken == "" { 346 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID) 347 } 348 349 recordURI, recordCID, err := s.putRecordOnPDSAs( 350 ctx, 351 existing.DID, // repo = community's own DID (V2!) 352 "social.coves.community.profile", 353 "self", // V2: always "self" 354 profile, 355 existing.PDSAccessToken, // authenticate as the community 356 ) 357 if err != nil { 358 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 359 } 360 361 // Return updated community representation 362 // Actual AppView DB update happens via Jetstream consumer 363 updated := *existing 364 if req.DisplayName != nil { 365 updated.DisplayName = *req.DisplayName 366 } 367 if req.Description != nil { 368 updated.Description = *req.Description 369 } 370 if req.Visibility != nil { 371 updated.Visibility = *req.Visibility 372 } 373 if req.AllowExternalDiscovery != nil { 374 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 375 } 376 if req.ModerationType != nil { 377 updated.ModerationType = *req.ModerationType 378 } 379 if len(req.ContentWarnings) > 0 { 380 updated.ContentWarnings = req.ContentWarnings 381 } 382 updated.RecordURI = recordURI 383 updated.RecordCID = recordCID 384 updated.UpdatedAt = time.Now() 385 386 return &updated, nil 387} 388 389// getOrCreateRefreshMutex returns a mutex for the given community DID 390// Thread-safe with read-lock fast path for existing entries 391// SAFETY: Does NOT evict entries to avoid race condition where: 392// 1. Thread A holds mutex for community-123 393// 2. 
Thread B evicts community-123 from map 394// 3. Thread C creates NEW mutex for community-123 395// 4. Now two threads can refresh community-123 concurrently (mutex defeated!) 396func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex { 397 // Fast path: check if mutex already exists (read lock) 398 s.mapMutex.RLock() 399 mutex, exists := s.refreshMutexes[did] 400 s.mapMutex.RUnlock() 401 402 if exists { 403 return mutex 404 } 405 406 // Slow path: create new mutex (write lock) 407 s.mapMutex.Lock() 408 defer s.mapMutex.Unlock() 409 410 // Double-check after acquiring write lock (another goroutine might have created it) 411 mutex, exists = s.refreshMutexes[did] 412 if exists { 413 return mutex 414 } 415 416 // Create new mutex 417 mutex = &sync.Mutex{} 418 s.refreshMutexes[did] = mutex 419 420 // SAFETY: No eviction to prevent race condition 421 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes 422 if len(s.refreshMutexes) > maxMutexCacheSize { 423 memoryKB := len(s.refreshMutexes) * 16 / 1024 424 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. 
Memory usage: ~%d KB", 425 len(s.refreshMutexes), maxMutexCacheSize, memoryKB) 426 } 427 428 return mutex 429} 430 431// ensureFreshToken checks if a community's access token needs refresh and updates if needed 432// Returns updated community with fresh credentials (or original if no refresh needed) 433// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts 434// EnsureFreshToken ensures the community's PDS access token is valid 435// Exported for use by post service when writing posts to community repos 436func (s *communityService) EnsureFreshToken(ctx context.Context, community *Community) (*Community, error) { 437 // Get or create mutex for this specific community DID 438 mutex := s.getOrCreateRefreshMutex(community.DID) 439 440 // Lock for this specific community (allows other communities to refresh concurrently) 441 mutex.Lock() 442 defer mutex.Unlock() 443 444 // Re-fetch community from DB (another goroutine might have already refreshed it) 445 fresh, err := s.repo.GetByDID(ctx, community.DID) 446 if err != nil { 447 return nil, fmt.Errorf("failed to re-fetch community: %w", err) 448 } 449 450 // Check if token needs refresh (5-minute buffer before expiration) 451 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken) 452 if err != nil { 453 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err) 454 return nil, fmt.Errorf("failed to check token expiration: %w", err) 455 } 456 457 if !needsRefresh { 458 // Token still valid, no refresh needed 459 return fresh, nil 460 } 461 462 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID) 463 464 // Attempt token refresh using refresh token 465 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken) 466 if err != nil { 467 // Check if refresh token expired (need password fallback) 468 // Match both "ExpiredToken" and 
"Token has expired" error messages 469 if strings.Contains(strings.ToLower(err.Error()), "expired") { 470 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID) 471 472 // Fallback: Re-authenticate with stored password 473 newAccessToken, newRefreshToken, err = reauthenticateWithPassword( 474 ctx, 475 fresh.PDSURL, 476 fresh.PDSEmail, 477 fresh.PDSPassword, // Retrieved decrypted from DB 478 ) 479 if err != nil { 480 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err) 481 return nil, fmt.Errorf("failed to re-authenticate community: %w", err) 482 } 483 484 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID) 485 } else { 486 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err) 487 return nil, fmt.Errorf("failed to refresh token: %w", err) 488 } 489 } 490 491 // CRITICAL: Update database with new tokens immediately 492 // Refresh tokens are SINGLE-USE - old one is now invalid 493 // Use retry logic to handle transient DB failures 494 const maxRetries = 3 495 var updateErr error 496 for attempt := 0; attempt < maxRetries; attempt++ { 497 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken) 498 if updateErr == nil { 499 break // Success 500 } 501 502 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v", 503 fresh.DID, attempt+1, maxRetries, updateErr) 504 505 if attempt < maxRetries-1 { 506 // Exponential backoff: 100ms, 200ms, 400ms 507 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond 508 time.Sleep(backoff) 509 } 510 } 511 512 if updateErr != nil { 513 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved 514 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials 
after %d retries: %v", 515 fresh.DID, maxRetries, updateErr) 516 // TODO: Send alert to monitoring system (add in Beta) 517 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w", 518 maxRetries, updateErr) 519 } 520 521 // Return updated community object with fresh tokens 522 updatedCommunity := *fresh 523 updatedCommunity.PDSAccessToken = newAccessToken 524 updatedCommunity.PDSRefreshToken = newRefreshToken 525 526 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID) 527 528 return &updatedCommunity, nil 529} 530 531// ListCommunities queries AppView DB for communities with filters 532func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) { 533 // Set defaults 534 if req.Limit <= 0 || req.Limit > 100 { 535 req.Limit = 50 536 } 537 538 return s.repo.List(ctx, req) 539} 540 541// SearchCommunities performs fuzzy search in AppView DB 542func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) { 543 if req.Query == "" { 544 return nil, 0, NewValidationError("query", "search query is required") 545 } 546 547 // Set defaults 548 if req.Limit <= 0 || req.Limit > 100 { 549 req.Limit = 50 550 } 551 552 return s.repo.Search(ctx, req) 553} 554 555// SubscribeToCommunity creates a subscription via write-forward to PDS 556func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) { 557 if userDID == "" { 558 return nil, NewValidationError("userDid", "required") 559 } 560 if userAccessToken == "" { 561 return nil, NewValidationError("userAccessToken", "required") 562 } 563 564 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid 565 if contentVisibility <= 0 || contentVisibility > 5 { 566 
contentVisibility = 3 567 } 568 569 // Resolve community identifier to DID 570 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 571 if err != nil { 572 return nil, err 573 } 574 575 // Verify community exists 576 community, err := s.repo.GetByDID(ctx, communityDID) 577 if err != nil { 578 return nil, err 579 } 580 581 // Check visibility - can't subscribe to private communities without invitation (TODO) 582 if community.Visibility == "private" { 583 return nil, ErrUnauthorized 584 } 585 586 // Build subscription record 587 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure) 588 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid} 589 // Following atProto conventions, we use "subject" field to reference the community 590 subRecord := map[string]interface{}{ 591 "$type": "social.coves.community.subscription", 592 "subject": communityDID, // atProto convention: "subject" for entity references 593 "createdAt": time.Now().Format(time.RFC3339), 594 "contentVisibility": contentVisibility, 595 } 596 597 // Write-forward: create subscription record in user's repo using their access token 598 // The collection parameter refers to the record type in the repository 599 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken) 600 if err != nil { 601 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 602 } 603 604 // Return subscription representation 605 subscription := &Subscription{ 606 UserDID: userDID, 607 CommunityDID: communityDID, 608 ContentVisibility: contentVisibility, 609 SubscribedAt: time.Now(), 610 RecordURI: recordURI, 611 RecordCID: recordCID, 612 } 613 614 return subscription, nil 615} 616 617// UnsubscribeFromCommunity removes a subscription via PDS delete 618func (s *communityService) 
UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 619 if userDID == "" { 620 return NewValidationError("userDid", "required") 621 } 622 if userAccessToken == "" { 623 return NewValidationError("userAccessToken", "required") 624 } 625 626 // Resolve community identifier 627 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 628 if err != nil { 629 return err 630 } 631 632 // Get the subscription from AppView to find the record key 633 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 634 if err != nil { 635 return err 636 } 637 638 // Extract rkey from record URI (at://did/collection/rkey) 639 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 640 if rkey == "" { 641 return fmt.Errorf("invalid subscription record URI") 642 } 643 644 // Write-forward: delete record from PDS using user's access token 645 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe 646 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil { 647 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 648 } 649 650 return nil 651} 652 653// GetUserSubscriptions queries AppView DB for user's subscriptions 654func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 655 if limit <= 0 || limit > 100 { 656 limit = 50 657 } 658 659 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 660} 661 662// GetCommunitySubscribers queries AppView DB for community subscribers 663func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 664 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 665 if err != nil { 666 return nil, err 667 } 668 669 if limit <= 0 || limit > 100 { 
670 limit = 50 671 } 672 673 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 674} 675 676// GetMembership retrieves membership info from AppView DB 677func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) { 678 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 679 if err != nil { 680 return nil, err 681 } 682 683 return s.repo.GetMembership(ctx, userDID, communityDID) 684} 685 686// ListCommunityMembers queries AppView DB for members 687func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 688 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 689 if err != nil { 690 return nil, err 691 } 692 693 if limit <= 0 || limit > 100 { 694 limit = 50 695 } 696 697 return s.repo.ListMembers(ctx, communityDID, limit, offset) 698} 699 700// BlockCommunity blocks a community via write-forward to PDS 701func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) { 702 if userDID == "" { 703 return nil, NewValidationError("userDid", "required") 704 } 705 if userAccessToken == "" { 706 return nil, NewValidationError("userAccessToken", "required") 707 } 708 709 // Resolve community identifier (also verifies community exists) 710 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 711 if err != nil { 712 return nil, err 713 } 714 715 // Build block record 716 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE) 717 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid} 718 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern 719 blockRecord := map[string]interface{}{ 720 "$type": "social.coves.community.block", 721 "subject": communityDID, // DID of community being blocked 
722 "createdAt": time.Now().Format(time.RFC3339), 723 } 724 725 // Write-forward: create block record in user's repo using their access token 726 // Note: We don't check for existing blocks first because: 727 // 1. The PDS may reject duplicates (depending on implementation) 728 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING 729 // 3. This avoids a race condition where two concurrent requests both pass the check 730 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken) 731 if err != nil { 732 // Check if this is a duplicate/conflict error from PDS 733 // PDS should return 409 Conflict for duplicate records, but we also check common error messages 734 // for compatibility with different PDS implementations 735 errMsg := err.Error() 736 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict 737 strings.Contains(errMsg, "duplicate") || 738 strings.Contains(errMsg, "already exists") || 739 strings.Contains(errMsg, "AlreadyExists") 740 741 if isDuplicate { 742 // Fetch and return existing block from our indexed view 743 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID) 744 if getErr == nil { 745 // Block exists in our index - return it 746 return existingBlock, nil 747 } 748 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition) 749 // Any other error (DB outage, connection failure, etc.) 
should bubble up 750 if errors.Is(getErr, ErrBlockNotFound) { 751 // Race condition: PDS has the block but Jetstream hasn't indexed it yet 752 // Return typed conflict error so handler can return 409 instead of 500 753 // This is normal in eventually-consistent systems 754 return nil, ErrBlockAlreadyExists 755 } 756 // Real datastore error - bubble it up so operators see the failure 757 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr) 758 } 759 return nil, fmt.Errorf("failed to create block on PDS: %w", err) 760 } 761 762 // Return block representation 763 block := &CommunityBlock{ 764 UserDID: userDID, 765 CommunityDID: communityDID, 766 BlockedAt: time.Now(), 767 RecordURI: recordURI, 768 RecordCID: recordCID, 769 } 770 771 return block, nil 772} 773 774// UnblockCommunity removes a block via PDS delete 775func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 776 if userDID == "" { 777 return NewValidationError("userDid", "required") 778 } 779 if userAccessToken == "" { 780 return NewValidationError("userAccessToken", "required") 781 } 782 783 // Resolve community identifier 784 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 785 if err != nil { 786 return err 787 } 788 789 // Get the block from AppView to find the record key 790 block, err := s.repo.GetBlock(ctx, userDID, communityDID) 791 if err != nil { 792 return err 793 } 794 795 // Extract rkey from record URI (at://did/collection/rkey) 796 rkey := utils.ExtractRKeyFromURI(block.RecordURI) 797 if rkey == "" { 798 return fmt.Errorf("invalid block record URI") 799 } 800 801 // Write-forward: delete record from PDS using user's access token 802 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil { 803 return fmt.Errorf("failed to delete block on PDS: %w", err) 804 } 805 806 return nil 807} 808 809// 
GetBlockedCommunities queries AppView DB for user's blocks 810func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) { 811 if limit <= 0 || limit > 100 { 812 limit = 50 813 } 814 815 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset) 816} 817 818// IsBlocked checks if a user has blocked a community 819func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) { 820 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 821 if err != nil { 822 return false, err 823 } 824 825 return s.repo.IsBlocked(ctx, userDID, communityDID) 826} 827 828// ValidateHandle checks if a community handle is valid 829func (s *communityService) ValidateHandle(handle string) error { 830 if handle == "" { 831 return NewValidationError("handle", "required") 832 } 833 834 if !communityHandleRegex.MatchString(handle) { 835 return ErrInvalidHandle 836 } 837 838 return nil 839} 840 841// ResolveCommunityIdentifier converts a community identifier to a DID 842// Following Bluesky's pattern with Coves extensions: 843// 844// Accepts (like Bluesky's at-identifier): 845// 1. DID: did:plc:abc123 (pass through) 846// 2. Canonical handle: gardening.community.coves.social (atProto standard) 847// 3. At-identifier: @gardening.community.coves.social (strip @ prefix) 848// 849// Coves-specific extensions: 850// 4. Scoped format: !gardening@coves.social (parse and resolve) 851// 852// Returns: DID string 853func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 854 identifier = strings.TrimSpace(identifier) 855 856 if identifier == "" { 857 return "", ErrInvalidInput 858 } 859 860 // 1. 
DID - verify it exists and return (Bluesky standard) 861 if strings.HasPrefix(identifier, "did:") { 862 _, err := s.repo.GetByDID(ctx, identifier) 863 if err != nil { 864 if IsNotFound(err) { 865 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err) 866 } 867 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err) 868 } 869 return identifier, nil 870 } 871 872 // 2. Scoped format: !name@instance (Coves-specific) 873 if strings.HasPrefix(identifier, "!") { 874 return s.resolveScopedIdentifier(ctx, identifier) 875 } 876 877 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix) 878 identifier = strings.TrimPrefix(identifier, "@") 879 880 // 4. Canonical handle: name.community.instance.com (Bluesky standard) 881 if strings.Contains(identifier, ".") { 882 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier)) 883 if err != nil { 884 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err) 885 } 886 return community.DID, nil 887 } 888 889 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)") 890} 891 892// resolveScopedIdentifier handles Coves-specific !name@instance format 893// Formats accepted: 894// 895// !gardening@coves.social -> gardening.community.coves.social 896func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) { 897 // Remove ! 
prefix 898 scoped = strings.TrimPrefix(scoped, "!") 899 900 var name string 901 var instanceDomain string 902 903 // Parse !name@instance 904 if !strings.Contains(scoped, "@") { 905 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)") 906 } 907 908 parts := strings.SplitN(scoped, "@", 2) 909 name = strings.TrimSpace(parts[0]) 910 instanceDomain = strings.TrimSpace(parts[1]) 911 912 // Validate name format 913 if name == "" { 914 return "", NewValidationError("identifier", "community name cannot be empty") 915 } 916 917 // Validate name is a valid DNS label (RFC 1035) 918 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen 919 if !isValidDNSLabel(name) { 920 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)") 921 } 922 923 // Validate instance domain format 924 if !isValidDomain(instanceDomain) { 925 return "", NewValidationError("identifier", "invalid instance domain format") 926 } 927 928 // Normalize domain to lowercase (DNS is case-insensitive) 929 // This fixes the bug where !gardening@Coves.social would fail lookup 930 instanceDomain = strings.ToLower(instanceDomain) 931 932 // Validate the instance matches this server 933 if !s.isLocalInstance(instanceDomain) { 934 return "", NewValidationError("identifier", 935 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain)) 936 } 937 938 // Construct canonical handle: {name}.community.{instanceDomain} 939 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup 940 canonicalHandle := fmt.Sprintf("%s.community.%s", 941 strings.ToLower(name), 942 instanceDomain) // Already normalized to lowercase on line 923 943 944 // Look up by canonical handle 945 community, err := s.repo.GetByHandle(ctx, canonicalHandle) 946 if err != nil { 947 return "", fmt.Errorf("community not found 
for scoped identifier !%s@%s: %w", name, instanceDomain, err) 948 } 949 950 return community.DID, nil 951} 952 953// isLocalInstance checks if the provided domain matches this instance 954func (s *communityService) isLocalInstance(domain string) bool { 955 // Normalize both domains 956 domain = strings.ToLower(strings.TrimSpace(domain)) 957 instanceDomain := strings.ToLower(s.instanceDomain) 958 959 // Direct match 960 return domain == instanceDomain 961} 962 963// Validation helpers 964 965// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035 966// - 1-63 characters 967// - Alphanumeric and hyphens only 968// - Cannot start or end with hyphen 969func isValidDNSLabel(label string) bool { 970 return dnsLabelRegex.MatchString(label) 971} 972 973// isValidDomain validates that a string is a valid domain name 974// Simplified validation - checks basic DNS hostname structure 975func isValidDomain(domain string) bool { 976 if domain == "" || len(domain) > 253 { 977 return false 978 } 979 return domainRegex.MatchString(domain) 980} 981 982func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 983 if req.Name == "" { 984 return NewValidationError("name", "required") 985 } 986 987 // DNS label limit: 63 characters per label 988 // Community handle format: {name}.community.{instanceDomain} 989 // The first label is just req.Name, so it must be <= 63 chars 990 if len(req.Name) > 63 { 991 return NewValidationError("name", "must be 63 characters or less (DNS label limit)") 992 } 993 994 // Name can only contain alphanumeric and hyphens 995 // Must start and end with alphanumeric (not hyphen) 996 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 997 if !nameRegex.MatchString(req.Name) { 998 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 999 } 1000 1001 if req.Description != "" && len(req.Description) > 3000 { 1002 return 
NewValidationError("description", "must be 3000 characters or less") 1003 } 1004 1005 // Visibility should already be set with default in CreateCommunity 1006 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 1007 return ErrInvalidVisibility 1008 } 1009 1010 if req.CreatedByDID == "" { 1011 return NewValidationError("createdByDid", "required") 1012 } 1013 1014 // hostedByDID is auto-populated by the service layer, no validation needed 1015 // The handler ensures clients cannot provide this field 1016 1017 return nil 1018} 1019 1020// PDS write-forward helpers 1021 1022// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth) 1023func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1024 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 1025 1026 payload := map[string]interface{}{ 1027 "repo": repoDID, 1028 "collection": collection, 1029 "record": record, 1030 } 1031 1032 if rkey != "" { 1033 payload["rkey"] = rkey 1034 } 1035 1036 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1037} 1038 1039// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth) 1040func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 1041 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 1042 1043 payload := map[string]interface{}{ 1044 "repo": repoDID, 1045 "collection": collection, 1046 "rkey": rkey, 1047 "record": record, 1048 } 1049 1050 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1051} 1052 1053// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions) 
1054func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error { 1055 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 1056 1057 payload := map[string]interface{}{ 1058 "repo": repoDID, 1059 "collection": collection, 1060 "rkey": rkey, 1061 } 1062 1063 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 1064 return err 1065} 1066 1067// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication) 1068func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 1069 jsonData, err := json.Marshal(payload) 1070 if err != nil { 1071 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 1072 } 1073 1074 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 1075 if err != nil { 1076 return "", "", fmt.Errorf("failed to create request: %w", err) 1077 } 1078 req.Header.Set("Content-Type", "application/json") 1079 1080 // Add authentication with provided access token 1081 if accessToken != "" { 1082 req.Header.Set("Authorization", "Bearer "+accessToken) 1083 } 1084 1085 // Dynamic timeout based on operation type 1086 // Write operations (createAccount, createRecord, putRecord) are slower due to: 1087 // - Keypair generation 1088 // - DID PLC registration 1089 // - Database writes on PDS 1090 timeout := 10 * time.Second // Default for read operations 1091 if strings.Contains(endpoint, "createAccount") || 1092 strings.Contains(endpoint, "createRecord") || 1093 strings.Contains(endpoint, "putRecord") { 1094 timeout = 30 * time.Second // Extended timeout for write operations 1095 } 1096 1097 client := &http.Client{Timeout: timeout} 1098 resp, err := client.Do(req) 1099 if err != nil { 1100 return "", "", fmt.Errorf("failed to call PDS: %w", err) 1101 } 1102 
defer func() { 1103 if closeErr := resp.Body.Close(); closeErr != nil { 1104 log.Printf("Failed to close response body: %v", closeErr) 1105 } 1106 }() 1107 1108 body, err := io.ReadAll(resp.Body) 1109 if err != nil { 1110 return "", "", fmt.Errorf("failed to read response: %w", err) 1111 } 1112 1113 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 1114 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 1115 } 1116 1117 // Parse response to extract URI and CID 1118 var result struct { 1119 URI string `json:"uri"` 1120 CID string `json:"cid"` 1121 } 1122 if err := json.Unmarshal(body, &result); err != nil { 1123 // For delete operations, there might not be a response body 1124 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 1125 return "", "", nil 1126 } 1127 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 1128 } 1129 1130 return result.URI, result.CID, nil 1131} 1132 1133// Helper functions