A community-based topic aggregation platform built on atproto
1package communities
2
3import (
4 "Coves/internal/atproto/utils"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log"
12 "net/http"
13 "regexp"
14 "strings"
15 "sync"
16 "time"
17)
18
// Precompiled hostname patterns used by the community validation helpers.
// Every label follows the RFC 1035 shape: 1-63 characters drawn from letters,
// digits and hyphens, never starting or ending with a hyphen.
var (
	// communityHandleRegex accepts a full community handle such as
	// name.community.instance.com: one or more dot-terminated labels followed
	// by a final label that must begin with a letter.
	communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)

	// dnsLabelRegex accepts exactly one DNS label.
	dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)

	// domainRegex is a simplified hostname check: zero or more dot-terminated
	// labels followed by a final label that begins with a letter, so a
	// single-label name also matches.
	domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
)
28
29type communityService struct {
30 // Interfaces and pointers first (better alignment)
31 repo Repository
32 provisioner *PDSAccountProvisioner
33
34 // Token refresh concurrency control
35 // Each community gets its own mutex to prevent concurrent refresh attempts
36 refreshMutexes map[string]*sync.Mutex
37
38 // Strings
39 pdsURL string
40 instanceDID string
41 instanceDomain string
42 pdsAccessToken string
43
44 // Sync primitives last
45 mapMutex sync.RWMutex // Protects refreshMutexes map itself
46}
47
// maxMutexCacheSize is the number of entries in the refresh-mutex map above
// which a warning is logged. It is a soft threshold, not a hard limit: nothing
// is evicted and the map may keep growing. The accompanying log message
// estimates memory at roughly 16 bytes per entry, i.e. about 160KB at this
// threshold.
const maxMutexCacheSize = 10000
54
55// NewCommunityService creates a new community service
56func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
57 // SECURITY: Basic validation that did:web domain matches configured instanceDomain
58 // This catches honest configuration mistakes but NOT malicious code modifications
59 // Full verification (Phase 2) requires fetching DID document from domain
60 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification"
61 if strings.HasPrefix(instanceDID, "did:web:") {
62 didDomain := strings.TrimPrefix(instanceDID, "did:web:")
63 if didDomain != instanceDomain {
64 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)",
65 didDomain, instanceDomain)
66 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt")
67 log.Printf(" Communities will be hosted by: %s", instanceDID)
68 }
69 }
70
71 return &communityService{
72 repo: repo,
73 pdsURL: pdsURL,
74 instanceDID: instanceDID,
75 instanceDomain: instanceDomain,
76 provisioner: provisioner,
77 refreshMutexes: make(map[string]*sync.Mutex),
78 }
79}
80
81// SetPDSAccessToken sets the PDS access token for authentication
82// This should be called after creating a session for the Coves instance DID on the PDS
83func (s *communityService) SetPDSAccessToken(token string) {
84 s.pdsAccessToken = token
85}
86
87// CreateCommunity creates a new community via write-forward to PDS
88// V2 Flow:
89// 1. Service creates PDS account for community (PDS generates signing keypair)
90// 2. Service writes community profile to COMMUNITY's own repository
91// 3. Firehose emits event
92// 4. Consumer indexes to AppView DB
93//
94// V2 Architecture:
95// - Community owns its own repository (at://community_did/social.coves.community.profile/self)
96// - PDS manages the signing keypair (we never see it)
97// - We store PDS credentials to act on behalf of the community
98// - Community can migrate to other instances (future V2.1 with rotation keys)
99func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) {
100 // Apply defaults before validation
101 if req.Visibility == "" {
102 req.Visibility = "public"
103 }
104
105 // SECURITY: Auto-populate hostedByDID from instance configuration
106 // Clients MUST NOT provide this field - it's derived from the instance receiving the request
107 // This prevents malicious instances from claiming to host communities for domains they don't own
108 req.HostedByDID = s.instanceDID
109
110 // Validate request
111 if err := s.validateCreateRequest(req); err != nil {
112 return nil, err
113 }
114
115 // V2: Provision a real PDS account for this community
116 // This calls com.atproto.server.createAccount internally
117 // The PDS will:
118 // 1. Generate a signing keypair (stored in PDS, we never see it)
119 // 2. Create a DID (did:plc:xxx)
120 // 3. Return credentials (DID, tokens)
121 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name)
122 if err != nil {
123 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err)
124 }
125
126 // Validate the atProto handle
127 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil {
128 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr)
129 }
130
131 // Build community profile record
132 profile := map[string]interface{}{
133 "$type": "social.coves.community.profile",
134 "name": req.Name, // Short name for !mentions (e.g., "gaming")
135 "visibility": req.Visibility,
136 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns
137 "createdBy": req.CreatedByDID,
138 "createdAt": time.Now().Format(time.RFC3339),
139 "federation": map[string]interface{}{
140 "allowExternalDiscovery": req.AllowExternalDiscovery,
141 },
142 }
143
144 // Add optional fields
145 if req.DisplayName != "" {
146 profile["displayName"] = req.DisplayName
147 }
148 if req.Description != "" {
149 profile["description"] = req.Description
150 }
151 if len(req.Rules) > 0 {
152 profile["rules"] = req.Rules
153 }
154 if len(req.Categories) > 0 {
155 profile["categories"] = req.Categories
156 }
157 if req.Language != "" {
158 profile["language"] = req.Language
159 }
160
161 // TODO: Handle avatar and banner blobs
162 // For now, we'll skip blob uploads. This would require:
163 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob
164 // 2. Get blob ref (CID)
165 // 3. Add to profile record
166
167 // V2: Write to COMMUNITY's own repository (not instance repo!)
168 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
169 // Authenticate using community's access token
170 recordURI, recordCID, err := s.createRecordOnPDSAs(
171 ctx,
172 pdsAccount.DID, // repo = community's DID (community owns its repo!)
173 "social.coves.community.profile",
174 "self", // canonical rkey for profile
175 profile,
176 pdsAccount.AccessToken, // authenticate as the community
177 )
178 if err != nil {
179 return nil, fmt.Errorf("failed to create community profile record: %w", err)
180 }
181
182 // Build Community object with PDS credentials AND cryptographic keys
183 community := &Community{
184 DID: pdsAccount.DID, // Community's DID (owns the repo!)
185 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social)
186 Name: req.Name,
187 DisplayName: req.DisplayName,
188 Description: req.Description,
189 OwnerDID: pdsAccount.DID, // V2: Community owns itself
190 CreatedByDID: req.CreatedByDID,
191 HostedByDID: req.HostedByDID,
192 PDSEmail: pdsAccount.Email,
193 PDSPassword: pdsAccount.Password,
194 PDSAccessToken: pdsAccount.AccessToken,
195 PDSRefreshToken: pdsAccount.RefreshToken,
196 PDSURL: pdsAccount.PDSURL,
197 Visibility: req.Visibility,
198 AllowExternalDiscovery: req.AllowExternalDiscovery,
199 MemberCount: 0,
200 SubscriberCount: 0,
201 CreatedAt: time.Now(),
202 UpdatedAt: time.Now(),
203 RecordURI: recordURI,
204 RecordCID: recordCID,
205 // V2: Cryptographic keys for portability (will be encrypted by repository)
206 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration
207 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations
208 }
209
210 // CRITICAL: Persist PDS credentials immediately to database
211 // The Jetstream consumer will eventually index the community profile from the firehose,
212 // but it won't have the PDS credentials. We must store them now so we can:
213 // 1. Update the community profile later (using its own credentials)
214 // 2. Re-authenticate if access tokens expire
215 _, err = s.repo.Create(ctx, community)
216 if err != nil {
217 return nil, fmt.Errorf("failed to persist community with credentials: %w", err)
218 }
219
220 return community, nil
221}
222
223// GetCommunity retrieves a community from AppView DB
224// identifier can be either a DID or handle
225func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) {
226 if identifier == "" {
227 return nil, ErrInvalidInput
228 }
229
230 // Determine if identifier is DID or handle
231 if strings.HasPrefix(identifier, "did:") {
232 return s.repo.GetByDID(ctx, identifier)
233 }
234
235 if strings.HasPrefix(identifier, "!") {
236 return s.repo.GetByHandle(ctx, identifier)
237 }
238
239 return nil, NewValidationError("identifier", "must be a DID or handle")
240}
241
242// GetByDID retrieves a community by its DID
243// Exported for use by post service when validating community references
244func (s *communityService) GetByDID(ctx context.Context, did string) (*Community, error) {
245 if did == "" {
246 return nil, ErrInvalidInput
247 }
248
249 if !strings.HasPrefix(did, "did:") {
250 return nil, NewValidationError("did", "must be a valid DID")
251 }
252
253 return s.repo.GetByDID(ctx, did)
254}
255
256// UpdateCommunity updates a community via write-forward to PDS
257func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) {
258 if req.CommunityDID == "" {
259 return nil, NewValidationError("communityDid", "required")
260 }
261
262 if req.UpdatedByDID == "" {
263 return nil, NewValidationError("updatedByDid", "required")
264 }
265
266 // Get existing community
267 existing, err := s.repo.GetByDID(ctx, req.CommunityDID)
268 if err != nil {
269 return nil, err
270 }
271
272 // CRITICAL: Ensure fresh PDS access token before write operation
273 // Community PDS tokens expire every ~2 hours and must be refreshed
274 existing, err = s.EnsureFreshToken(ctx, existing)
275 if err != nil {
276 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err)
277 }
278
279 // Authorization: verify user is the creator
280 // TODO(Communities-Auth): Add moderator check when moderation system is implemented
281 if existing.CreatedByDID != req.UpdatedByDID {
282 return nil, ErrUnauthorized
283 }
284
285 // Build updated profile record (start with existing)
286 profile := map[string]interface{}{
287 "$type": "social.coves.community.profile",
288 "name": existing.Name,
289 "owner": existing.OwnerDID,
290 "createdBy": existing.CreatedByDID,
291 "hostedBy": existing.HostedByDID,
292 "createdAt": existing.CreatedAt.Format(time.RFC3339),
293 }
294
295 // Apply updates
296 if req.DisplayName != nil {
297 profile["displayName"] = *req.DisplayName
298 } else {
299 profile["displayName"] = existing.DisplayName
300 }
301
302 if req.Description != nil {
303 profile["description"] = *req.Description
304 } else {
305 profile["description"] = existing.Description
306 }
307
308 if req.Visibility != nil {
309 profile["visibility"] = *req.Visibility
310 } else {
311 profile["visibility"] = existing.Visibility
312 }
313
314 if req.AllowExternalDiscovery != nil {
315 profile["federation"] = map[string]interface{}{
316 "allowExternalDiscovery": *req.AllowExternalDiscovery,
317 }
318 } else {
319 profile["federation"] = map[string]interface{}{
320 "allowExternalDiscovery": existing.AllowExternalDiscovery,
321 }
322 }
323
324 // Preserve moderation settings (even if empty)
325 // These fields are optional but should not be erased on update
326 if req.ModerationType != nil {
327 profile["moderationType"] = *req.ModerationType
328 } else if existing.ModerationType != "" {
329 profile["moderationType"] = existing.ModerationType
330 }
331
332 if len(req.ContentWarnings) > 0 {
333 profile["contentWarnings"] = req.ContentWarnings
334 } else if len(existing.ContentWarnings) > 0 {
335 profile["contentWarnings"] = existing.ContentWarnings
336 }
337
338 // V2: Community profiles always use "self" as rkey
339 // (No need to extract from URI - it's always "self" for V2 communities)
340
341 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials
342 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
343 // Authenticate as the community (not as instance!)
344 if existing.PDSAccessToken == "" {
345 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID)
346 }
347
348 recordURI, recordCID, err := s.putRecordOnPDSAs(
349 ctx,
350 existing.DID, // repo = community's own DID (V2!)
351 "social.coves.community.profile",
352 "self", // V2: always "self"
353 profile,
354 existing.PDSAccessToken, // authenticate as the community
355 )
356 if err != nil {
357 return nil, fmt.Errorf("failed to update community on PDS: %w", err)
358 }
359
360 // Return updated community representation
361 // Actual AppView DB update happens via Jetstream consumer
362 updated := *existing
363 if req.DisplayName != nil {
364 updated.DisplayName = *req.DisplayName
365 }
366 if req.Description != nil {
367 updated.Description = *req.Description
368 }
369 if req.Visibility != nil {
370 updated.Visibility = *req.Visibility
371 }
372 if req.AllowExternalDiscovery != nil {
373 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery
374 }
375 if req.ModerationType != nil {
376 updated.ModerationType = *req.ModerationType
377 }
378 if len(req.ContentWarnings) > 0 {
379 updated.ContentWarnings = req.ContentWarnings
380 }
381 updated.RecordURI = recordURI
382 updated.RecordCID = recordCID
383 updated.UpdatedAt = time.Now()
384
385 return &updated, nil
386}
387
388// getOrCreateRefreshMutex returns a mutex for the given community DID
389// Thread-safe with read-lock fast path for existing entries
390// SAFETY: Does NOT evict entries to avoid race condition where:
391// 1. Thread A holds mutex for community-123
392// 2. Thread B evicts community-123 from map
393// 3. Thread C creates NEW mutex for community-123
394// 4. Now two threads can refresh community-123 concurrently (mutex defeated!)
395func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex {
396 // Fast path: check if mutex already exists (read lock)
397 s.mapMutex.RLock()
398 mutex, exists := s.refreshMutexes[did]
399 s.mapMutex.RUnlock()
400
401 if exists {
402 return mutex
403 }
404
405 // Slow path: create new mutex (write lock)
406 s.mapMutex.Lock()
407 defer s.mapMutex.Unlock()
408
409 // Double-check after acquiring write lock (another goroutine might have created it)
410 mutex, exists = s.refreshMutexes[did]
411 if exists {
412 return mutex
413 }
414
415 // Create new mutex
416 mutex = &sync.Mutex{}
417 s.refreshMutexes[did] = mutex
418
419 // SAFETY: No eviction to prevent race condition
420 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes
421 if len(s.refreshMutexes) > maxMutexCacheSize {
422 memoryKB := len(s.refreshMutexes) * 16 / 1024
423 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB",
424 len(s.refreshMutexes), maxMutexCacheSize, memoryKB)
425 }
426
427 return mutex
428}
429
430// ensureFreshToken checks if a community's access token needs refresh and updates if needed
431// Returns updated community with fresh credentials (or original if no refresh needed)
432// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts
433// EnsureFreshToken ensures the community's PDS access token is valid
434// Exported for use by post service when writing posts to community repos
435func (s *communityService) EnsureFreshToken(ctx context.Context, community *Community) (*Community, error) {
436 // Get or create mutex for this specific community DID
437 mutex := s.getOrCreateRefreshMutex(community.DID)
438
439 // Lock for this specific community (allows other communities to refresh concurrently)
440 mutex.Lock()
441 defer mutex.Unlock()
442
443 // Re-fetch community from DB (another goroutine might have already refreshed it)
444 fresh, err := s.repo.GetByDID(ctx, community.DID)
445 if err != nil {
446 return nil, fmt.Errorf("failed to re-fetch community: %w", err)
447 }
448
449 // Check if token needs refresh (5-minute buffer before expiration)
450 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken)
451 if err != nil {
452 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err)
453 return nil, fmt.Errorf("failed to check token expiration: %w", err)
454 }
455
456 if !needsRefresh {
457 // Token still valid, no refresh needed
458 return fresh, nil
459 }
460
461 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID)
462
463 // Attempt token refresh using refresh token
464 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken)
465 if err != nil {
466 // Check if refresh token expired (need password fallback)
467 // Match both "ExpiredToken" and "Token has expired" error messages
468 if strings.Contains(strings.ToLower(err.Error()), "expired") {
469 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID)
470
471 // Fallback: Re-authenticate with stored password
472 newAccessToken, newRefreshToken, err = reauthenticateWithPassword(
473 ctx,
474 fresh.PDSURL,
475 fresh.PDSEmail,
476 fresh.PDSPassword, // Retrieved decrypted from DB
477 )
478 if err != nil {
479 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err)
480 return nil, fmt.Errorf("failed to re-authenticate community: %w", err)
481 }
482
483 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID)
484 } else {
485 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err)
486 return nil, fmt.Errorf("failed to refresh token: %w", err)
487 }
488 }
489
490 // CRITICAL: Update database with new tokens immediately
491 // Refresh tokens are SINGLE-USE - old one is now invalid
492 // Use retry logic to handle transient DB failures
493 const maxRetries = 3
494 var updateErr error
495 for attempt := 0; attempt < maxRetries; attempt++ {
496 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken)
497 if updateErr == nil {
498 break // Success
499 }
500
501 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v",
502 fresh.DID, attempt+1, maxRetries, updateErr)
503
504 if attempt < maxRetries-1 {
505 // Exponential backoff: 100ms, 200ms, 400ms
506 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
507 time.Sleep(backoff)
508 }
509 }
510
511 if updateErr != nil {
512 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved
513 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v",
514 fresh.DID, maxRetries, updateErr)
515 // TODO: Send alert to monitoring system (add in Beta)
516 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w",
517 maxRetries, updateErr)
518 }
519
520 // Return updated community object with fresh tokens
521 updatedCommunity := *fresh
522 updatedCommunity.PDSAccessToken = newAccessToken
523 updatedCommunity.PDSRefreshToken = newRefreshToken
524
525 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID)
526
527 return &updatedCommunity, nil
528}
529
530// ListCommunities queries AppView DB for communities with filters
531func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) {
532 // Set defaults
533 if req.Limit <= 0 || req.Limit > 100 {
534 req.Limit = 50
535 }
536
537 return s.repo.List(ctx, req)
538}
539
540// SearchCommunities performs fuzzy search in AppView DB
541func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) {
542 if req.Query == "" {
543 return nil, 0, NewValidationError("query", "search query is required")
544 }
545
546 // Set defaults
547 if req.Limit <= 0 || req.Limit > 100 {
548 req.Limit = 50
549 }
550
551 return s.repo.Search(ctx, req)
552}
553
554// SubscribeToCommunity creates a subscription via write-forward to PDS
555func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) {
556 if userDID == "" {
557 return nil, NewValidationError("userDid", "required")
558 }
559 if userAccessToken == "" {
560 return nil, NewValidationError("userAccessToken", "required")
561 }
562
563 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid
564 if contentVisibility <= 0 || contentVisibility > 5 {
565 contentVisibility = 3
566 }
567
568 // Resolve community identifier to DID
569 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
570 if err != nil {
571 return nil, err
572 }
573
574 // Verify community exists
575 community, err := s.repo.GetByDID(ctx, communityDID)
576 if err != nil {
577 return nil, err
578 }
579
580 // Check visibility - can't subscribe to private communities without invitation (TODO)
581 if community.Visibility == "private" {
582 return nil, ErrUnauthorized
583 }
584
585 // Build subscription record
586 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure)
587 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid}
588 // Following atProto conventions, we use "subject" field to reference the community
589 subRecord := map[string]interface{}{
590 "$type": "social.coves.community.subscription",
591 "subject": communityDID, // atProto convention: "subject" for entity references
592 "createdAt": time.Now().Format(time.RFC3339),
593 "contentVisibility": contentVisibility,
594 }
595
596 // Write-forward: create subscription record in user's repo using their access token
597 // The collection parameter refers to the record type in the repository
598 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken)
599 if err != nil {
600 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
601 }
602
603 // Return subscription representation
604 subscription := &Subscription{
605 UserDID: userDID,
606 CommunityDID: communityDID,
607 ContentVisibility: contentVisibility,
608 SubscribedAt: time.Now(),
609 RecordURI: recordURI,
610 RecordCID: recordCID,
611 }
612
613 return subscription, nil
614}
615
616// UnsubscribeFromCommunity removes a subscription via PDS delete
617func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
618 if userDID == "" {
619 return NewValidationError("userDid", "required")
620 }
621 if userAccessToken == "" {
622 return NewValidationError("userAccessToken", "required")
623 }
624
625 // Resolve community identifier
626 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
627 if err != nil {
628 return err
629 }
630
631 // Get the subscription from AppView to find the record key
632 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID)
633 if err != nil {
634 return err
635 }
636
637 // Extract rkey from record URI (at://did/collection/rkey)
638 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI)
639 if rkey == "" {
640 return fmt.Errorf("invalid subscription record URI")
641 }
642
643 // Write-forward: delete record from PDS using user's access token
644 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe
645 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil {
646 return fmt.Errorf("failed to delete subscription on PDS: %w", err)
647 }
648
649 return nil
650}
651
652// GetUserSubscriptions queries AppView DB for user's subscriptions
653func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) {
654 if limit <= 0 || limit > 100 {
655 limit = 50
656 }
657
658 return s.repo.ListSubscriptions(ctx, userDID, limit, offset)
659}
660
661// GetCommunitySubscribers queries AppView DB for community subscribers
662func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) {
663 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
664 if err != nil {
665 return nil, err
666 }
667
668 if limit <= 0 || limit > 100 {
669 limit = 50
670 }
671
672 return s.repo.ListSubscribers(ctx, communityDID, limit, offset)
673}
674
675// GetMembership retrieves membership info from AppView DB
676func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) {
677 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
678 if err != nil {
679 return nil, err
680 }
681
682 return s.repo.GetMembership(ctx, userDID, communityDID)
683}
684
685// ListCommunityMembers queries AppView DB for members
686func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) {
687 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
688 if err != nil {
689 return nil, err
690 }
691
692 if limit <= 0 || limit > 100 {
693 limit = 50
694 }
695
696 return s.repo.ListMembers(ctx, communityDID, limit, offset)
697}
698
699// BlockCommunity blocks a community via write-forward to PDS
700func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) {
701 if userDID == "" {
702 return nil, NewValidationError("userDid", "required")
703 }
704 if userAccessToken == "" {
705 return nil, NewValidationError("userAccessToken", "required")
706 }
707
708 // Resolve community identifier (also verifies community exists)
709 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
710 if err != nil {
711 return nil, err
712 }
713
714 // Build block record
715 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE)
716 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid}
717 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern
718 blockRecord := map[string]interface{}{
719 "$type": "social.coves.community.block",
720 "subject": communityDID, // DID of community being blocked
721 "createdAt": time.Now().Format(time.RFC3339),
722 }
723
724 // Write-forward: create block record in user's repo using their access token
725 // Note: We don't check for existing blocks first because:
726 // 1. The PDS may reject duplicates (depending on implementation)
727 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING
728 // 3. This avoids a race condition where two concurrent requests both pass the check
729 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken)
730 if err != nil {
731 // Check if this is a duplicate/conflict error from PDS
732 // PDS should return 409 Conflict for duplicate records, but we also check common error messages
733 // for compatibility with different PDS implementations
734 errMsg := err.Error()
735 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict
736 strings.Contains(errMsg, "duplicate") ||
737 strings.Contains(errMsg, "already exists") ||
738 strings.Contains(errMsg, "AlreadyExists")
739
740 if isDuplicate {
741 // Fetch and return existing block from our indexed view
742 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID)
743 if getErr == nil {
744 // Block exists in our index - return it
745 return existingBlock, nil
746 }
747 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition)
748 // Any other error (DB outage, connection failure, etc.) should bubble up
749 if errors.Is(getErr, ErrBlockNotFound) {
750 // Race condition: PDS has the block but Jetstream hasn't indexed it yet
751 // Return typed conflict error so handler can return 409 instead of 500
752 // This is normal in eventually-consistent systems
753 return nil, ErrBlockAlreadyExists
754 }
755 // Real datastore error - bubble it up so operators see the failure
756 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr)
757 }
758 return nil, fmt.Errorf("failed to create block on PDS: %w", err)
759 }
760
761 // Return block representation
762 block := &CommunityBlock{
763 UserDID: userDID,
764 CommunityDID: communityDID,
765 BlockedAt: time.Now(),
766 RecordURI: recordURI,
767 RecordCID: recordCID,
768 }
769
770 return block, nil
771}
772
773// UnblockCommunity removes a block via PDS delete
774func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
775 if userDID == "" {
776 return NewValidationError("userDid", "required")
777 }
778 if userAccessToken == "" {
779 return NewValidationError("userAccessToken", "required")
780 }
781
782 // Resolve community identifier
783 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
784 if err != nil {
785 return err
786 }
787
788 // Get the block from AppView to find the record key
789 block, err := s.repo.GetBlock(ctx, userDID, communityDID)
790 if err != nil {
791 return err
792 }
793
794 // Extract rkey from record URI (at://did/collection/rkey)
795 rkey := utils.ExtractRKeyFromURI(block.RecordURI)
796 if rkey == "" {
797 return fmt.Errorf("invalid block record URI")
798 }
799
800 // Write-forward: delete record from PDS using user's access token
801 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil {
802 return fmt.Errorf("failed to delete block on PDS: %w", err)
803 }
804
805 return nil
806}
807
808// GetBlockedCommunities queries AppView DB for user's blocks
809func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) {
810 if limit <= 0 || limit > 100 {
811 limit = 50
812 }
813
814 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset)
815}
816
817// IsBlocked checks if a user has blocked a community
818func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) {
819 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
820 if err != nil {
821 return false, err
822 }
823
824 return s.repo.IsBlocked(ctx, userDID, communityDID)
825}
826
827// ValidateHandle checks if a community handle is valid
828func (s *communityService) ValidateHandle(handle string) error {
829 if handle == "" {
830 return NewValidationError("handle", "required")
831 }
832
833 if !communityHandleRegex.MatchString(handle) {
834 return ErrInvalidHandle
835 }
836
837 return nil
838}
839
840// ResolveCommunityIdentifier converts a community identifier to a DID
841// Following Bluesky's pattern with Coves extensions:
842//
843// Accepts (like Bluesky's at-identifier):
844// 1. DID: did:plc:abc123 (pass through)
845// 2. Canonical handle: gardening.community.coves.social (atProto standard)
846// 3. At-identifier: @gardening.community.coves.social (strip @ prefix)
847//
848// Coves-specific extensions:
849// 4. Scoped format: !gardening@coves.social (parse and resolve)
850//
851// Returns: DID string
852func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {
853 identifier = strings.TrimSpace(identifier)
854
855 if identifier == "" {
856 return "", ErrInvalidInput
857 }
858
859 // 1. DID - verify it exists and return (Bluesky standard)
860 if strings.HasPrefix(identifier, "did:") {
861 _, err := s.repo.GetByDID(ctx, identifier)
862 if err != nil {
863 if IsNotFound(err) {
864 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err)
865 }
866 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err)
867 }
868 return identifier, nil
869 }
870
871 // 2. Scoped format: !name@instance (Coves-specific)
872 if strings.HasPrefix(identifier, "!") {
873 return s.resolveScopedIdentifier(ctx, identifier)
874 }
875
876 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix)
877 identifier = strings.TrimPrefix(identifier, "@")
878
879 // 4. Canonical handle: name.community.instance.com (Bluesky standard)
880 if strings.Contains(identifier, ".") {
881 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier))
882 if err != nil {
883 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err)
884 }
885 return community.DID, nil
886 }
887
888 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)")
889}
890
891// resolveScopedIdentifier handles Coves-specific !name@instance format
892// Formats accepted:
893//
894// !gardening@coves.social -> gardening.community.coves.social
895func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) {
896 // Remove ! prefix
897 scoped = strings.TrimPrefix(scoped, "!")
898
899 var name string
900 var instanceDomain string
901
902 // Parse !name@instance
903 if !strings.Contains(scoped, "@") {
904 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)")
905 }
906
907 parts := strings.SplitN(scoped, "@", 2)
908 name = strings.TrimSpace(parts[0])
909 instanceDomain = strings.TrimSpace(parts[1])
910
911 // Validate name format
912 if name == "" {
913 return "", NewValidationError("identifier", "community name cannot be empty")
914 }
915
916 // Validate name is a valid DNS label (RFC 1035)
917 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen
918 if !isValidDNSLabel(name) {
919 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)")
920 }
921
922 // Validate instance domain format
923 if !isValidDomain(instanceDomain) {
924 return "", NewValidationError("identifier", "invalid instance domain format")
925 }
926
927 // Normalize domain to lowercase (DNS is case-insensitive)
928 // This fixes the bug where !gardening@Coves.social would fail lookup
929 instanceDomain = strings.ToLower(instanceDomain)
930
931 // Validate the instance matches this server
932 if !s.isLocalInstance(instanceDomain) {
933 return "", NewValidationError("identifier",
934 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain))
935 }
936
937 // Construct canonical handle: {name}.community.{instanceDomain}
938 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup
939 canonicalHandle := fmt.Sprintf("%s.community.%s",
940 strings.ToLower(name),
941 instanceDomain) // Already normalized to lowercase on line 923
942
943 // Look up by canonical handle
944 community, err := s.repo.GetByHandle(ctx, canonicalHandle)
945 if err != nil {
946 return "", fmt.Errorf("community not found for scoped identifier !%s@%s: %w", name, instanceDomain, err)
947 }
948
949 return community.DID, nil
950}
951
952// isLocalInstance checks if the provided domain matches this instance
953func (s *communityService) isLocalInstance(domain string) bool {
954 // Normalize both domains
955 domain = strings.ToLower(strings.TrimSpace(domain))
956 instanceDomain := strings.ToLower(s.instanceDomain)
957
958 // Direct match
959 return domain == instanceDomain
960}
961
962// Validation helpers
963
964// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035
965// - 1-63 characters
966// - Alphanumeric and hyphens only
967// - Cannot start or end with hyphen
968func isValidDNSLabel(label string) bool {
969 return dnsLabelRegex.MatchString(label)
970}
971
972// isValidDomain validates that a string is a valid domain name
973// Simplified validation - checks basic DNS hostname structure
974func isValidDomain(domain string) bool {
975 if domain == "" || len(domain) > 253 {
976 return false
977 }
978 return domainRegex.MatchString(domain)
979}
980
981func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error {
982 if req.Name == "" {
983 return NewValidationError("name", "required")
984 }
985
986 // DNS label limit: 63 characters per label
987 // Community handle format: {name}.community.{instanceDomain}
988 // The first label is just req.Name, so it must be <= 63 chars
989 if len(req.Name) > 63 {
990 return NewValidationError("name", "must be 63 characters or less (DNS label limit)")
991 }
992
993 // Name can only contain alphanumeric and hyphens
994 // Must start and end with alphanumeric (not hyphen)
995 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
996 if !nameRegex.MatchString(req.Name) {
997 return NewValidationError("name", "must contain only alphanumeric characters and hyphens")
998 }
999
1000 if req.Description != "" && len(req.Description) > 3000 {
1001 return NewValidationError("description", "must be 3000 characters or less")
1002 }
1003
1004 // Visibility should already be set with default in CreateCommunity
1005 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" {
1006 return ErrInvalidVisibility
1007 }
1008
1009 if req.CreatedByDID == "" {
1010 return NewValidationError("createdByDid", "required")
1011 }
1012
1013 // hostedByDID is auto-populated by the service layer, no validation needed
1014 // The handler ensures clients cannot provide this field
1015
1016 return nil
1017}
1018
1019// PDS write-forward helpers
1020
1021// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth)
1022func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1023 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
1024
1025 payload := map[string]interface{}{
1026 "repo": repoDID,
1027 "collection": collection,
1028 "record": record,
1029 }
1030
1031 if rkey != "" {
1032 payload["rkey"] = rkey
1033 }
1034
1035 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1036}
1037
1038// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth)
1039func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1040 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/"))
1041
1042 payload := map[string]interface{}{
1043 "repo": repoDID,
1044 "collection": collection,
1045 "rkey": rkey,
1046 "record": record,
1047 }
1048
1049 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1050}
1051
1052// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions)
1053func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
1054 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
1055
1056 payload := map[string]interface{}{
1057 "repo": repoDID,
1058 "collection": collection,
1059 "rkey": rkey,
1060 }
1061
1062 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1063 return err
1064}
1065
1066// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication)
1067func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
1068 jsonData, err := json.Marshal(payload)
1069 if err != nil {
1070 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
1071 }
1072
1073 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
1074 if err != nil {
1075 return "", "", fmt.Errorf("failed to create request: %w", err)
1076 }
1077 req.Header.Set("Content-Type", "application/json")
1078
1079 // Add authentication with provided access token
1080 if accessToken != "" {
1081 req.Header.Set("Authorization", "Bearer "+accessToken)
1082 }
1083
1084 // Dynamic timeout based on operation type
1085 // Write operations (createAccount, createRecord, putRecord) are slower due to:
1086 // - Keypair generation
1087 // - DID PLC registration
1088 // - Database writes on PDS
1089 timeout := 10 * time.Second // Default for read operations
1090 if strings.Contains(endpoint, "createAccount") ||
1091 strings.Contains(endpoint, "createRecord") ||
1092 strings.Contains(endpoint, "putRecord") {
1093 timeout = 30 * time.Second // Extended timeout for write operations
1094 }
1095
1096 client := &http.Client{Timeout: timeout}
1097 resp, err := client.Do(req)
1098 if err != nil {
1099 return "", "", fmt.Errorf("failed to call PDS: %w", err)
1100 }
1101 defer func() {
1102 if closeErr := resp.Body.Close(); closeErr != nil {
1103 log.Printf("Failed to close response body: %v", closeErr)
1104 }
1105 }()
1106
1107 body, err := io.ReadAll(resp.Body)
1108 if err != nil {
1109 return "", "", fmt.Errorf("failed to read response: %w", err)
1110 }
1111
1112 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
1113 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
1114 }
1115
1116 // Parse response to extract URI and CID
1117 var result struct {
1118 URI string `json:"uri"`
1119 CID string `json:"cid"`
1120 }
1121 if err := json.Unmarshal(body, &result); err != nil {
1122 // For delete operations, there might not be a response body
1123 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
1124 return "", "", nil
1125 }
1126 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
1127 }
1128
1129 return result.URI, result.CID, nil
1130}
1131
1132// Helper functions