A community based topic aggregation platform built on atproto
1package communities
2
3import (
4 "Coves/internal/atproto/utils"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log"
12 "net/http"
13 "regexp"
14 "strings"
15 "sync"
16 "time"
17)
18
19// Community handle validation regex (DNS-valid handle: name.communities.instance.com)
20// Matches standard DNS hostname format (RFC 1035)
21var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
22
23type communityService struct {
24 // Interfaces and pointers first (better alignment)
25 repo Repository
26 provisioner *PDSAccountProvisioner
27
28 // Token refresh concurrency control
29 // Each community gets its own mutex to prevent concurrent refresh attempts
30 refreshMutexes map[string]*sync.Mutex
31
32 // Strings
33 pdsURL string
34 instanceDID string
35 instanceDomain string
36 pdsAccessToken string
37
38 // Sync primitives last
39 mapMutex sync.RWMutex // Protects refreshMutexes map itself
40}
41
42const (
43 // Maximum recommended size for mutex cache (warning threshold, not hard limit)
44 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead)
45 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable
46 maxMutexCacheSize = 10000
47)
48
49// NewCommunityService creates a new community service
50func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
51 // SECURITY: Basic validation that did:web domain matches configured instanceDomain
52 // This catches honest configuration mistakes but NOT malicious code modifications
53 // Full verification (Phase 2) requires fetching DID document from domain
54 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification"
55 if strings.HasPrefix(instanceDID, "did:web:") {
56 didDomain := strings.TrimPrefix(instanceDID, "did:web:")
57 if didDomain != instanceDomain {
58 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)",
59 didDomain, instanceDomain)
60 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt")
61 log.Printf(" Communities will be hosted by: %s", instanceDID)
62 }
63 }
64
65 return &communityService{
66 repo: repo,
67 pdsURL: pdsURL,
68 instanceDID: instanceDID,
69 instanceDomain: instanceDomain,
70 provisioner: provisioner,
71 refreshMutexes: make(map[string]*sync.Mutex),
72 }
73}
74
75// SetPDSAccessToken sets the PDS access token for authentication
76// This should be called after creating a session for the Coves instance DID on the PDS
77func (s *communityService) SetPDSAccessToken(token string) {
78 s.pdsAccessToken = token
79}
80
81// CreateCommunity creates a new community via write-forward to PDS
82// V2 Flow:
83// 1. Service creates PDS account for community (PDS generates signing keypair)
84// 2. Service writes community profile to COMMUNITY's own repository
85// 3. Firehose emits event
86// 4. Consumer indexes to AppView DB
87//
88// V2 Architecture:
89// - Community owns its own repository (at://community_did/social.coves.community.profile/self)
90// - PDS manages the signing keypair (we never see it)
91// - We store PDS credentials to act on behalf of the community
92// - Community can migrate to other instances (future V2.1 with rotation keys)
93func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) {
94 // Apply defaults before validation
95 if req.Visibility == "" {
96 req.Visibility = "public"
97 }
98
99 // SECURITY: Auto-populate hostedByDID from instance configuration
100 // Clients MUST NOT provide this field - it's derived from the instance receiving the request
101 // This prevents malicious instances from claiming to host communities for domains they don't own
102 req.HostedByDID = s.instanceDID
103
104 // Validate request
105 if err := s.validateCreateRequest(req); err != nil {
106 return nil, err
107 }
108
109 // V2: Provision a real PDS account for this community
110 // This calls com.atproto.server.createAccount internally
111 // The PDS will:
112 // 1. Generate a signing keypair (stored in PDS, we never see it)
113 // 2. Create a DID (did:plc:xxx)
114 // 3. Return credentials (DID, tokens)
115 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name)
116 if err != nil {
117 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err)
118 }
119
120 // Validate the atProto handle
121 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil {
122 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr)
123 }
124
125 // Build community profile record
126 profile := map[string]interface{}{
127 "$type": "social.coves.community.profile",
128 "handle": pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social)
129 "name": req.Name, // Short name for !mentions (e.g., "gaming")
130 "visibility": req.Visibility,
131 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns
132 "createdBy": req.CreatedByDID,
133 "createdAt": time.Now().Format(time.RFC3339),
134 "federation": map[string]interface{}{
135 "allowExternalDiscovery": req.AllowExternalDiscovery,
136 },
137 }
138
139 // Add optional fields
140 if req.DisplayName != "" {
141 profile["displayName"] = req.DisplayName
142 }
143 if req.Description != "" {
144 profile["description"] = req.Description
145 }
146 if len(req.Rules) > 0 {
147 profile["rules"] = req.Rules
148 }
149 if len(req.Categories) > 0 {
150 profile["categories"] = req.Categories
151 }
152 if req.Language != "" {
153 profile["language"] = req.Language
154 }
155
156 // Initialize counts
157 profile["memberCount"] = 0
158 profile["subscriberCount"] = 0
159
160 // TODO: Handle avatar and banner blobs
161 // For now, we'll skip blob uploads. This would require:
162 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob
163 // 2. Get blob ref (CID)
164 // 3. Add to profile record
165
166 // V2: Write to COMMUNITY's own repository (not instance repo!)
167 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
168 // Authenticate using community's access token
169 recordURI, recordCID, err := s.createRecordOnPDSAs(
170 ctx,
171 pdsAccount.DID, // repo = community's DID (community owns its repo!)
172 "social.coves.community.profile",
173 "self", // canonical rkey for profile
174 profile,
175 pdsAccount.AccessToken, // authenticate as the community
176 )
177 if err != nil {
178 return nil, fmt.Errorf("failed to create community profile record: %w", err)
179 }
180
181 // Build Community object with PDS credentials AND cryptographic keys
182 community := &Community{
183 DID: pdsAccount.DID, // Community's DID (owns the repo!)
184 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.communities.coves.social)
185 Name: req.Name,
186 DisplayName: req.DisplayName,
187 Description: req.Description,
188 OwnerDID: pdsAccount.DID, // V2: Community owns itself
189 CreatedByDID: req.CreatedByDID,
190 HostedByDID: req.HostedByDID,
191 PDSEmail: pdsAccount.Email,
192 PDSPassword: pdsAccount.Password,
193 PDSAccessToken: pdsAccount.AccessToken,
194 PDSRefreshToken: pdsAccount.RefreshToken,
195 PDSURL: pdsAccount.PDSURL,
196 Visibility: req.Visibility,
197 AllowExternalDiscovery: req.AllowExternalDiscovery,
198 MemberCount: 0,
199 SubscriberCount: 0,
200 CreatedAt: time.Now(),
201 UpdatedAt: time.Now(),
202 RecordURI: recordURI,
203 RecordCID: recordCID,
204 // V2: Cryptographic keys for portability (will be encrypted by repository)
205 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration
206 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations
207 }
208
209 // CRITICAL: Persist PDS credentials immediately to database
210 // The Jetstream consumer will eventually index the community profile from the firehose,
211 // but it won't have the PDS credentials. We must store them now so we can:
212 // 1. Update the community profile later (using its own credentials)
213 // 2. Re-authenticate if access tokens expire
214 _, err = s.repo.Create(ctx, community)
215 if err != nil {
216 return nil, fmt.Errorf("failed to persist community with credentials: %w", err)
217 }
218
219 return community, nil
220}
221
222// GetCommunity retrieves a community from AppView DB
223// identifier can be either a DID or handle
224func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) {
225 if identifier == "" {
226 return nil, ErrInvalidInput
227 }
228
229 // Determine if identifier is DID or handle
230 if strings.HasPrefix(identifier, "did:") {
231 return s.repo.GetByDID(ctx, identifier)
232 }
233
234 if strings.HasPrefix(identifier, "!") {
235 return s.repo.GetByHandle(ctx, identifier)
236 }
237
238 return nil, NewValidationError("identifier", "must be a DID or handle")
239}
240
241// UpdateCommunity updates a community via write-forward to PDS
242func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) {
243 if req.CommunityDID == "" {
244 return nil, NewValidationError("communityDid", "required")
245 }
246
247 if req.UpdatedByDID == "" {
248 return nil, NewValidationError("updatedByDid", "required")
249 }
250
251 // Get existing community
252 existing, err := s.repo.GetByDID(ctx, req.CommunityDID)
253 if err != nil {
254 return nil, err
255 }
256
257 // CRITICAL: Ensure fresh PDS access token before write operation
258 // Community PDS tokens expire every ~2 hours and must be refreshed
259 existing, err = s.ensureFreshToken(ctx, existing)
260 if err != nil {
261 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err)
262 }
263
264 // Authorization: verify user is the creator
265 // TODO(Communities-Auth): Add moderator check when moderation system is implemented
266 if existing.CreatedByDID != req.UpdatedByDID {
267 return nil, ErrUnauthorized
268 }
269
270 // Build updated profile record (start with existing)
271 profile := map[string]interface{}{
272 "$type": "social.coves.community.profile",
273 "handle": existing.Handle,
274 "name": existing.Name,
275 "owner": existing.OwnerDID,
276 "createdBy": existing.CreatedByDID,
277 "hostedBy": existing.HostedByDID,
278 "createdAt": existing.CreatedAt.Format(time.RFC3339),
279 }
280
281 // Apply updates
282 if req.DisplayName != nil {
283 profile["displayName"] = *req.DisplayName
284 } else {
285 profile["displayName"] = existing.DisplayName
286 }
287
288 if req.Description != nil {
289 profile["description"] = *req.Description
290 } else {
291 profile["description"] = existing.Description
292 }
293
294 if req.Visibility != nil {
295 profile["visibility"] = *req.Visibility
296 } else {
297 profile["visibility"] = existing.Visibility
298 }
299
300 if req.AllowExternalDiscovery != nil {
301 profile["federation"] = map[string]interface{}{
302 "allowExternalDiscovery": *req.AllowExternalDiscovery,
303 }
304 } else {
305 profile["federation"] = map[string]interface{}{
306 "allowExternalDiscovery": existing.AllowExternalDiscovery,
307 }
308 }
309
310 // Preserve moderation settings (even if empty)
311 // These fields are optional but should not be erased on update
312 if req.ModerationType != nil {
313 profile["moderationType"] = *req.ModerationType
314 } else if existing.ModerationType != "" {
315 profile["moderationType"] = existing.ModerationType
316 }
317
318 if len(req.ContentWarnings) > 0 {
319 profile["contentWarnings"] = req.ContentWarnings
320 } else if len(existing.ContentWarnings) > 0 {
321 profile["contentWarnings"] = existing.ContentWarnings
322 }
323
324 // Preserve counts
325 profile["memberCount"] = existing.MemberCount
326 profile["subscriberCount"] = existing.SubscriberCount
327
328 // V2: Community profiles always use "self" as rkey
329 // (No need to extract from URI - it's always "self" for V2 communities)
330
331 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials
332 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
333 // Authenticate as the community (not as instance!)
334 if existing.PDSAccessToken == "" {
335 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID)
336 }
337
338 recordURI, recordCID, err := s.putRecordOnPDSAs(
339 ctx,
340 existing.DID, // repo = community's own DID (V2!)
341 "social.coves.community.profile",
342 "self", // V2: always "self"
343 profile,
344 existing.PDSAccessToken, // authenticate as the community
345 )
346 if err != nil {
347 return nil, fmt.Errorf("failed to update community on PDS: %w", err)
348 }
349
350 // Return updated community representation
351 // Actual AppView DB update happens via Jetstream consumer
352 updated := *existing
353 if req.DisplayName != nil {
354 updated.DisplayName = *req.DisplayName
355 }
356 if req.Description != nil {
357 updated.Description = *req.Description
358 }
359 if req.Visibility != nil {
360 updated.Visibility = *req.Visibility
361 }
362 if req.AllowExternalDiscovery != nil {
363 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery
364 }
365 if req.ModerationType != nil {
366 updated.ModerationType = *req.ModerationType
367 }
368 if len(req.ContentWarnings) > 0 {
369 updated.ContentWarnings = req.ContentWarnings
370 }
371 updated.RecordURI = recordURI
372 updated.RecordCID = recordCID
373 updated.UpdatedAt = time.Now()
374
375 return &updated, nil
376}
377
378// getOrCreateRefreshMutex returns a mutex for the given community DID
379// Thread-safe with read-lock fast path for existing entries
380// SAFETY: Does NOT evict entries to avoid race condition where:
381// 1. Thread A holds mutex for community-123
382// 2. Thread B evicts community-123 from map
383// 3. Thread C creates NEW mutex for community-123
384// 4. Now two threads can refresh community-123 concurrently (mutex defeated!)
385func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex {
386 // Fast path: check if mutex already exists (read lock)
387 s.mapMutex.RLock()
388 mutex, exists := s.refreshMutexes[did]
389 s.mapMutex.RUnlock()
390
391 if exists {
392 return mutex
393 }
394
395 // Slow path: create new mutex (write lock)
396 s.mapMutex.Lock()
397 defer s.mapMutex.Unlock()
398
399 // Double-check after acquiring write lock (another goroutine might have created it)
400 mutex, exists = s.refreshMutexes[did]
401 if exists {
402 return mutex
403 }
404
405 // Create new mutex
406 mutex = &sync.Mutex{}
407 s.refreshMutexes[did] = mutex
408
409 // SAFETY: No eviction to prevent race condition
410 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes
411 if len(s.refreshMutexes) > maxMutexCacheSize {
412 memoryKB := len(s.refreshMutexes) * 16 / 1024
413 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB",
414 len(s.refreshMutexes), maxMutexCacheSize, memoryKB)
415 }
416
417 return mutex
418}
419
420// ensureFreshToken checks if a community's access token needs refresh and updates if needed
421// Returns updated community with fresh credentials (or original if no refresh needed)
422// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts
423func (s *communityService) ensureFreshToken(ctx context.Context, community *Community) (*Community, error) {
424 // Get or create mutex for this specific community DID
425 mutex := s.getOrCreateRefreshMutex(community.DID)
426
427 // Lock for this specific community (allows other communities to refresh concurrently)
428 mutex.Lock()
429 defer mutex.Unlock()
430
431 // Re-fetch community from DB (another goroutine might have already refreshed it)
432 fresh, err := s.repo.GetByDID(ctx, community.DID)
433 if err != nil {
434 return nil, fmt.Errorf("failed to re-fetch community: %w", err)
435 }
436
437 // Check if token needs refresh (5-minute buffer before expiration)
438 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken)
439 if err != nil {
440 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err)
441 return nil, fmt.Errorf("failed to check token expiration: %w", err)
442 }
443
444 if !needsRefresh {
445 // Token still valid, no refresh needed
446 return fresh, nil
447 }
448
449 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID)
450
451 // Attempt token refresh using refresh token
452 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken)
453 if err != nil {
454 // Check if refresh token expired (need password fallback)
455 if strings.Contains(err.Error(), "expired or invalid") {
456 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID)
457
458 // Fallback: Re-authenticate with stored password
459 newAccessToken, newRefreshToken, err = reauthenticateWithPassword(
460 ctx,
461 fresh.PDSURL,
462 fresh.PDSEmail,
463 fresh.PDSPassword, // Retrieved decrypted from DB
464 )
465 if err != nil {
466 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err)
467 return nil, fmt.Errorf("failed to re-authenticate community: %w", err)
468 }
469
470 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID)
471 } else {
472 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err)
473 return nil, fmt.Errorf("failed to refresh token: %w", err)
474 }
475 }
476
477 // CRITICAL: Update database with new tokens immediately
478 // Refresh tokens are SINGLE-USE - old one is now invalid
479 // Use retry logic to handle transient DB failures
480 const maxRetries = 3
481 var updateErr error
482 for attempt := 0; attempt < maxRetries; attempt++ {
483 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken)
484 if updateErr == nil {
485 break // Success
486 }
487
488 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v",
489 fresh.DID, attempt+1, maxRetries, updateErr)
490
491 if attempt < maxRetries-1 {
492 // Exponential backoff: 100ms, 200ms, 400ms
493 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
494 time.Sleep(backoff)
495 }
496 }
497
498 if updateErr != nil {
499 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved
500 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v",
501 fresh.DID, maxRetries, updateErr)
502 // TODO: Send alert to monitoring system (add in Beta)
503 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w",
504 maxRetries, updateErr)
505 }
506
507 // Return updated community object with fresh tokens
508 updatedCommunity := *fresh
509 updatedCommunity.PDSAccessToken = newAccessToken
510 updatedCommunity.PDSRefreshToken = newRefreshToken
511
512 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID)
513
514 return &updatedCommunity, nil
515}
516
517// ListCommunities queries AppView DB for communities with filters
518func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) {
519 // Set defaults
520 if req.Limit <= 0 || req.Limit > 100 {
521 req.Limit = 50
522 }
523
524 return s.repo.List(ctx, req)
525}
526
527// SearchCommunities performs fuzzy search in AppView DB
528func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) {
529 if req.Query == "" {
530 return nil, 0, NewValidationError("query", "search query is required")
531 }
532
533 // Set defaults
534 if req.Limit <= 0 || req.Limit > 100 {
535 req.Limit = 50
536 }
537
538 return s.repo.Search(ctx, req)
539}
540
541// SubscribeToCommunity creates a subscription via write-forward to PDS
542func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) {
543 if userDID == "" {
544 return nil, NewValidationError("userDid", "required")
545 }
546 if userAccessToken == "" {
547 return nil, NewValidationError("userAccessToken", "required")
548 }
549
550 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid
551 if contentVisibility <= 0 || contentVisibility > 5 {
552 contentVisibility = 3
553 }
554
555 // Resolve community identifier to DID
556 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
557 if err != nil {
558 return nil, err
559 }
560
561 // Verify community exists
562 community, err := s.repo.GetByDID(ctx, communityDID)
563 if err != nil {
564 return nil, err
565 }
566
567 // Check visibility - can't subscribe to private communities without invitation (TODO)
568 if community.Visibility == "private" {
569 return nil, ErrUnauthorized
570 }
571
572 // Build subscription record
573 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure)
574 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid}
575 // Following atProto conventions, we use "subject" field to reference the community
576 subRecord := map[string]interface{}{
577 "$type": "social.coves.community.subscription",
578 "subject": communityDID, // atProto convention: "subject" for entity references
579 "createdAt": time.Now().Format(time.RFC3339),
580 "contentVisibility": contentVisibility,
581 }
582
583 // Write-forward: create subscription record in user's repo using their access token
584 // The collection parameter refers to the record type in the repository
585 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken)
586 if err != nil {
587 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
588 }
589
590 // Return subscription representation
591 subscription := &Subscription{
592 UserDID: userDID,
593 CommunityDID: communityDID,
594 ContentVisibility: contentVisibility,
595 SubscribedAt: time.Now(),
596 RecordURI: recordURI,
597 RecordCID: recordCID,
598 }
599
600 return subscription, nil
601}
602
603// UnsubscribeFromCommunity removes a subscription via PDS delete
604func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
605 if userDID == "" {
606 return NewValidationError("userDid", "required")
607 }
608 if userAccessToken == "" {
609 return NewValidationError("userAccessToken", "required")
610 }
611
612 // Resolve community identifier
613 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
614 if err != nil {
615 return err
616 }
617
618 // Get the subscription from AppView to find the record key
619 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID)
620 if err != nil {
621 return err
622 }
623
624 // Extract rkey from record URI (at://did/collection/rkey)
625 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI)
626 if rkey == "" {
627 return fmt.Errorf("invalid subscription record URI")
628 }
629
630 // Write-forward: delete record from PDS using user's access token
631 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe
632 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil {
633 return fmt.Errorf("failed to delete subscription on PDS: %w", err)
634 }
635
636 return nil
637}
638
639// GetUserSubscriptions queries AppView DB for user's subscriptions
640func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) {
641 if limit <= 0 || limit > 100 {
642 limit = 50
643 }
644
645 return s.repo.ListSubscriptions(ctx, userDID, limit, offset)
646}
647
648// GetCommunitySubscribers queries AppView DB for community subscribers
649func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) {
650 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
651 if err != nil {
652 return nil, err
653 }
654
655 if limit <= 0 || limit > 100 {
656 limit = 50
657 }
658
659 return s.repo.ListSubscribers(ctx, communityDID, limit, offset)
660}
661
662// GetMembership retrieves membership info from AppView DB
663func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) {
664 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
665 if err != nil {
666 return nil, err
667 }
668
669 return s.repo.GetMembership(ctx, userDID, communityDID)
670}
671
672// ListCommunityMembers queries AppView DB for members
673func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) {
674 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
675 if err != nil {
676 return nil, err
677 }
678
679 if limit <= 0 || limit > 100 {
680 limit = 50
681 }
682
683 return s.repo.ListMembers(ctx, communityDID, limit, offset)
684}
685
686// BlockCommunity blocks a community via write-forward to PDS
687func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) {
688 if userDID == "" {
689 return nil, NewValidationError("userDid", "required")
690 }
691 if userAccessToken == "" {
692 return nil, NewValidationError("userAccessToken", "required")
693 }
694
695 // Resolve community identifier (also verifies community exists)
696 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
697 if err != nil {
698 return nil, err
699 }
700
701 // Build block record
702 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE)
703 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid}
704 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern
705 blockRecord := map[string]interface{}{
706 "$type": "social.coves.community.block",
707 "subject": communityDID, // DID of community being blocked
708 "createdAt": time.Now().Format(time.RFC3339),
709 }
710
711 // Write-forward: create block record in user's repo using their access token
712 // Note: We don't check for existing blocks first because:
713 // 1. The PDS may reject duplicates (depending on implementation)
714 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING
715 // 3. This avoids a race condition where two concurrent requests both pass the check
716 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken)
717 if err != nil {
718 // Check if this is a duplicate/conflict error from PDS
719 // PDS should return 409 Conflict for duplicate records, but we also check common error messages
720 // for compatibility with different PDS implementations
721 errMsg := err.Error()
722 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict
723 strings.Contains(errMsg, "duplicate") ||
724 strings.Contains(errMsg, "already exists") ||
725 strings.Contains(errMsg, "AlreadyExists")
726
727 if isDuplicate {
728 // Fetch and return existing block from our indexed view
729 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID)
730 if getErr == nil {
731 // Block exists in our index - return it
732 return existingBlock, nil
733 }
734 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition)
735 // Any other error (DB outage, connection failure, etc.) should bubble up
736 if errors.Is(getErr, ErrBlockNotFound) {
737 // Race condition: PDS has the block but Jetstream hasn't indexed it yet
738 // Return typed conflict error so handler can return 409 instead of 500
739 // This is normal in eventually-consistent systems
740 return nil, ErrBlockAlreadyExists
741 }
742 // Real datastore error - bubble it up so operators see the failure
743 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr)
744 }
745 return nil, fmt.Errorf("failed to create block on PDS: %w", err)
746 }
747
748 // Return block representation
749 block := &CommunityBlock{
750 UserDID: userDID,
751 CommunityDID: communityDID,
752 BlockedAt: time.Now(),
753 RecordURI: recordURI,
754 RecordCID: recordCID,
755 }
756
757 return block, nil
758}
759
760// UnblockCommunity removes a block via PDS delete
761func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
762 if userDID == "" {
763 return NewValidationError("userDid", "required")
764 }
765 if userAccessToken == "" {
766 return NewValidationError("userAccessToken", "required")
767 }
768
769 // Resolve community identifier
770 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
771 if err != nil {
772 return err
773 }
774
775 // Get the block from AppView to find the record key
776 block, err := s.repo.GetBlock(ctx, userDID, communityDID)
777 if err != nil {
778 return err
779 }
780
781 // Extract rkey from record URI (at://did/collection/rkey)
782 rkey := utils.ExtractRKeyFromURI(block.RecordURI)
783 if rkey == "" {
784 return fmt.Errorf("invalid block record URI")
785 }
786
787 // Write-forward: delete record from PDS using user's access token
788 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil {
789 return fmt.Errorf("failed to delete block on PDS: %w", err)
790 }
791
792 return nil
793}
794
795// GetBlockedCommunities queries AppView DB for user's blocks
796func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) {
797 if limit <= 0 || limit > 100 {
798 limit = 50
799 }
800
801 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset)
802}
803
804// IsBlocked checks if a user has blocked a community
805func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) {
806 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
807 if err != nil {
808 return false, err
809 }
810
811 return s.repo.IsBlocked(ctx, userDID, communityDID)
812}
813
814// ValidateHandle checks if a community handle is valid
815func (s *communityService) ValidateHandle(handle string) error {
816 if handle == "" {
817 return NewValidationError("handle", "required")
818 }
819
820 if !communityHandleRegex.MatchString(handle) {
821 return ErrInvalidHandle
822 }
823
824 return nil
825}
826
827// ResolveCommunityIdentifier converts a handle or DID to a DID
828func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {
829 if identifier == "" {
830 return "", ErrInvalidInput
831 }
832
833 // If it's already a DID, verify the community exists
834 if strings.HasPrefix(identifier, "did:") {
835 _, err := s.repo.GetByDID(ctx, identifier)
836 if err != nil {
837 if IsNotFound(err) {
838 return "", fmt.Errorf("community not found: %w", err)
839 }
840 return "", fmt.Errorf("failed to verify community DID: %w", err)
841 }
842 return identifier, nil
843 }
844
845 // If it's a handle, look it up in AppView DB
846 if strings.HasPrefix(identifier, "!") {
847 community, err := s.repo.GetByHandle(ctx, identifier)
848 if err != nil {
849 return "", err
850 }
851 return community.DID, nil
852 }
853
854 return "", NewValidationError("identifier", "must be a DID or handle")
855}
856
857// Validation helpers
858
859func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error {
860 if req.Name == "" {
861 return NewValidationError("name", "required")
862 }
863
864 // DNS label limit: 63 characters per label
865 // Community handle format: {name}.communities.{instanceDomain}
866 // The first label is just req.Name, so it must be <= 63 chars
867 if len(req.Name) > 63 {
868 return NewValidationError("name", "must be 63 characters or less (DNS label limit)")
869 }
870
871 // Name can only contain alphanumeric and hyphens
872 // Must start and end with alphanumeric (not hyphen)
873 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
874 if !nameRegex.MatchString(req.Name) {
875 return NewValidationError("name", "must contain only alphanumeric characters and hyphens")
876 }
877
878 if req.Description != "" && len(req.Description) > 3000 {
879 return NewValidationError("description", "must be 3000 characters or less")
880 }
881
882 // Visibility should already be set with default in CreateCommunity
883 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" {
884 return ErrInvalidVisibility
885 }
886
887 if req.CreatedByDID == "" {
888 return NewValidationError("createdByDid", "required")
889 }
890
891 // hostedByDID is auto-populated by the service layer, no validation needed
892 // The handler ensures clients cannot provide this field
893
894 return nil
895}
896
897// PDS write-forward helpers
898
899// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth)
900func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
901 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
902
903 payload := map[string]interface{}{
904 "repo": repoDID,
905 "collection": collection,
906 "record": record,
907 }
908
909 if rkey != "" {
910 payload["rkey"] = rkey
911 }
912
913 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
914}
915
916// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth)
917func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
918 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/"))
919
920 payload := map[string]interface{}{
921 "repo": repoDID,
922 "collection": collection,
923 "rkey": rkey,
924 "record": record,
925 }
926
927 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
928}
929
930// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions)
931func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
932 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
933
934 payload := map[string]interface{}{
935 "repo": repoDID,
936 "collection": collection,
937 "rkey": rkey,
938 }
939
940 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
941 return err
942}
943
944// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication)
945func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
946 jsonData, err := json.Marshal(payload)
947 if err != nil {
948 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
949 }
950
951 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
952 if err != nil {
953 return "", "", fmt.Errorf("failed to create request: %w", err)
954 }
955 req.Header.Set("Content-Type", "application/json")
956
957 // Add authentication with provided access token
958 if accessToken != "" {
959 req.Header.Set("Authorization", "Bearer "+accessToken)
960 }
961
962 // Dynamic timeout based on operation type
963 // Write operations (createAccount, createRecord, putRecord) are slower due to:
964 // - Keypair generation
965 // - DID PLC registration
966 // - Database writes on PDS
967 timeout := 10 * time.Second // Default for read operations
968 if strings.Contains(endpoint, "createAccount") ||
969 strings.Contains(endpoint, "createRecord") ||
970 strings.Contains(endpoint, "putRecord") {
971 timeout = 30 * time.Second // Extended timeout for write operations
972 }
973
974 client := &http.Client{Timeout: timeout}
975 resp, err := client.Do(req)
976 if err != nil {
977 return "", "", fmt.Errorf("failed to call PDS: %w", err)
978 }
979 defer func() {
980 if closeErr := resp.Body.Close(); closeErr != nil {
981 log.Printf("Failed to close response body: %v", closeErr)
982 }
983 }()
984
985 body, err := io.ReadAll(resp.Body)
986 if err != nil {
987 return "", "", fmt.Errorf("failed to read response: %w", err)
988 }
989
990 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
991 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
992 }
993
994 // Parse response to extract URI and CID
995 var result struct {
996 URI string `json:"uri"`
997 CID string `json:"cid"`
998 }
999 if err := json.Unmarshal(body, &result); err != nil {
1000 // For delete operations, there might not be a response body
1001 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
1002 return "", "", nil
1003 }
1004 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
1005 }
1006
1007 return result.URI, result.CID, nil
1008}
1009
1010// Helper functions