A community-based topic aggregation platform built on atproto
1package communities
2
3import (
4 "Coves/internal/atproto/utils"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log"
12 "net/http"
13 "regexp"
14 "strings"
15 "sync"
16 "time"
17)
18
// Hostname-style validation patterns used for community handles and domains.
// Every label is 1-63 characters of letters, digits and hyphens and may not
// start or end with a hyphen.
var (
	// communityHandleRegex matches a full community handle such as
	// name.community.instance.com: one or more dot-terminated labels followed
	// by a final label that must begin with a letter.
	communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)

	// dnsLabelRegex matches exactly one DNS label (no dots).
	dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)

	// domainRegex is a simplified hostname check. Unlike communityHandleRegex
	// the leading dot-terminated labels are optional, so a bare single-label
	// name also matches.
	domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
)
28
29type communityService struct {
30 // Interfaces and pointers first (better alignment)
31 repo Repository
32 provisioner *PDSAccountProvisioner
33
34 // Token refresh concurrency control
35 // Each community gets its own mutex to prevent concurrent refresh attempts
36 refreshMutexes map[string]*sync.Mutex
37
38 // Strings
39 pdsURL string
40 instanceDID string
41 instanceDomain string
42 pdsAccessToken string
43
44 // Sync primitives last
45 mapMutex sync.RWMutex // Protects refreshMutexes map itself
46}
47
// maxMutexCacheSize is the number of per-community refresh mutexes above which
// a warning is logged. It is a soft threshold only: nothing is evicted and the
// map keeps growing past it. Using the 16-bytes-per-entry estimate from the
// warning message, 10,000 entries is roughly 160 KB.
const maxMutexCacheSize = 10_000
54
55// NewCommunityService creates a new community service
56func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
57 // SECURITY: Basic validation that did:web domain matches configured instanceDomain
58 // This catches honest configuration mistakes but NOT malicious code modifications
59 // Full verification (Phase 2) requires fetching DID document from domain
60 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification"
61 if strings.HasPrefix(instanceDID, "did:web:") {
62 didDomain := strings.TrimPrefix(instanceDID, "did:web:")
63 if didDomain != instanceDomain {
64 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)",
65 didDomain, instanceDomain)
66 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt")
67 log.Printf(" Communities will be hosted by: %s", instanceDID)
68 }
69 }
70
71 return &communityService{
72 repo: repo,
73 pdsURL: pdsURL,
74 instanceDID: instanceDID,
75 instanceDomain: instanceDomain,
76 provisioner: provisioner,
77 refreshMutexes: make(map[string]*sync.Mutex),
78 }
79}
80
81// SetPDSAccessToken sets the PDS access token for authentication
82// This should be called after creating a session for the Coves instance DID on the PDS
83func (s *communityService) SetPDSAccessToken(token string) {
84 s.pdsAccessToken = token
85}
86
87// CreateCommunity creates a new community via write-forward to PDS
88// V2 Flow:
89// 1. Service creates PDS account for community (PDS generates signing keypair)
90// 2. Service writes community profile to COMMUNITY's own repository
91// 3. Firehose emits event
92// 4. Consumer indexes to AppView DB
93//
94// V2 Architecture:
95// - Community owns its own repository (at://community_did/social.coves.community.profile/self)
96// - PDS manages the signing keypair (we never see it)
97// - We store PDS credentials to act on behalf of the community
98// - Community can migrate to other instances (future V2.1 with rotation keys)
99func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) {
100 // Apply defaults before validation
101 if req.Visibility == "" {
102 req.Visibility = "public"
103 }
104
105 // SECURITY: Auto-populate hostedByDID from instance configuration
106 // Clients MUST NOT provide this field - it's derived from the instance receiving the request
107 // This prevents malicious instances from claiming to host communities for domains they don't own
108 req.HostedByDID = s.instanceDID
109
110 // Validate request
111 if err := s.validateCreateRequest(req); err != nil {
112 return nil, err
113 }
114
115 // V2: Provision a real PDS account for this community
116 // This calls com.atproto.server.createAccount internally
117 // The PDS will:
118 // 1. Generate a signing keypair (stored in PDS, we never see it)
119 // 2. Create a DID (did:plc:xxx)
120 // 3. Return credentials (DID, tokens)
121 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name)
122 if err != nil {
123 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err)
124 }
125
126 // Validate the atProto handle
127 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil {
128 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr)
129 }
130
131 // Build community profile record
132 profile := map[string]interface{}{
133 "$type": "social.coves.community.profile",
134 "handle": pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social)
135 "name": req.Name, // Short name for !mentions (e.g., "gaming")
136 "visibility": req.Visibility,
137 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns
138 "createdBy": req.CreatedByDID,
139 "createdAt": time.Now().Format(time.RFC3339),
140 "federation": map[string]interface{}{
141 "allowExternalDiscovery": req.AllowExternalDiscovery,
142 },
143 }
144
145 // Add optional fields
146 if req.DisplayName != "" {
147 profile["displayName"] = req.DisplayName
148 }
149 if req.Description != "" {
150 profile["description"] = req.Description
151 }
152 if len(req.Rules) > 0 {
153 profile["rules"] = req.Rules
154 }
155 if len(req.Categories) > 0 {
156 profile["categories"] = req.Categories
157 }
158 if req.Language != "" {
159 profile["language"] = req.Language
160 }
161
162 // Initialize counts
163 profile["memberCount"] = 0
164 profile["subscriberCount"] = 0
165
166 // TODO: Handle avatar and banner blobs
167 // For now, we'll skip blob uploads. This would require:
168 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob
169 // 2. Get blob ref (CID)
170 // 3. Add to profile record
171
172 // V2: Write to COMMUNITY's own repository (not instance repo!)
173 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
174 // Authenticate using community's access token
175 recordURI, recordCID, err := s.createRecordOnPDSAs(
176 ctx,
177 pdsAccount.DID, // repo = community's DID (community owns its repo!)
178 "social.coves.community.profile",
179 "self", // canonical rkey for profile
180 profile,
181 pdsAccount.AccessToken, // authenticate as the community
182 )
183 if err != nil {
184 return nil, fmt.Errorf("failed to create community profile record: %w", err)
185 }
186
187 // Build Community object with PDS credentials AND cryptographic keys
188 community := &Community{
189 DID: pdsAccount.DID, // Community's DID (owns the repo!)
190 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social)
191 Name: req.Name,
192 DisplayName: req.DisplayName,
193 Description: req.Description,
194 OwnerDID: pdsAccount.DID, // V2: Community owns itself
195 CreatedByDID: req.CreatedByDID,
196 HostedByDID: req.HostedByDID,
197 PDSEmail: pdsAccount.Email,
198 PDSPassword: pdsAccount.Password,
199 PDSAccessToken: pdsAccount.AccessToken,
200 PDSRefreshToken: pdsAccount.RefreshToken,
201 PDSURL: pdsAccount.PDSURL,
202 Visibility: req.Visibility,
203 AllowExternalDiscovery: req.AllowExternalDiscovery,
204 MemberCount: 0,
205 SubscriberCount: 0,
206 CreatedAt: time.Now(),
207 UpdatedAt: time.Now(),
208 RecordURI: recordURI,
209 RecordCID: recordCID,
210 // V2: Cryptographic keys for portability (will be encrypted by repository)
211 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration
212 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations
213 }
214
215 // CRITICAL: Persist PDS credentials immediately to database
216 // The Jetstream consumer will eventually index the community profile from the firehose,
217 // but it won't have the PDS credentials. We must store them now so we can:
218 // 1. Update the community profile later (using its own credentials)
219 // 2. Re-authenticate if access tokens expire
220 _, err = s.repo.Create(ctx, community)
221 if err != nil {
222 return nil, fmt.Errorf("failed to persist community with credentials: %w", err)
223 }
224
225 return community, nil
226}
227
228// GetCommunity retrieves a community from AppView DB
229// identifier can be either a DID or handle
230func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) {
231 if identifier == "" {
232 return nil, ErrInvalidInput
233 }
234
235 // Determine if identifier is DID or handle
236 if strings.HasPrefix(identifier, "did:") {
237 return s.repo.GetByDID(ctx, identifier)
238 }
239
240 if strings.HasPrefix(identifier, "!") {
241 return s.repo.GetByHandle(ctx, identifier)
242 }
243
244 return nil, NewValidationError("identifier", "must be a DID or handle")
245}
246
247// UpdateCommunity updates a community via write-forward to PDS
248func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) {
249 if req.CommunityDID == "" {
250 return nil, NewValidationError("communityDid", "required")
251 }
252
253 if req.UpdatedByDID == "" {
254 return nil, NewValidationError("updatedByDid", "required")
255 }
256
257 // Get existing community
258 existing, err := s.repo.GetByDID(ctx, req.CommunityDID)
259 if err != nil {
260 return nil, err
261 }
262
263 // CRITICAL: Ensure fresh PDS access token before write operation
264 // Community PDS tokens expire every ~2 hours and must be refreshed
265 existing, err = s.ensureFreshToken(ctx, existing)
266 if err != nil {
267 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err)
268 }
269
270 // Authorization: verify user is the creator
271 // TODO(Communities-Auth): Add moderator check when moderation system is implemented
272 if existing.CreatedByDID != req.UpdatedByDID {
273 return nil, ErrUnauthorized
274 }
275
276 // Build updated profile record (start with existing)
277 profile := map[string]interface{}{
278 "$type": "social.coves.community.profile",
279 "handle": existing.Handle,
280 "name": existing.Name,
281 "owner": existing.OwnerDID,
282 "createdBy": existing.CreatedByDID,
283 "hostedBy": existing.HostedByDID,
284 "createdAt": existing.CreatedAt.Format(time.RFC3339),
285 }
286
287 // Apply updates
288 if req.DisplayName != nil {
289 profile["displayName"] = *req.DisplayName
290 } else {
291 profile["displayName"] = existing.DisplayName
292 }
293
294 if req.Description != nil {
295 profile["description"] = *req.Description
296 } else {
297 profile["description"] = existing.Description
298 }
299
300 if req.Visibility != nil {
301 profile["visibility"] = *req.Visibility
302 } else {
303 profile["visibility"] = existing.Visibility
304 }
305
306 if req.AllowExternalDiscovery != nil {
307 profile["federation"] = map[string]interface{}{
308 "allowExternalDiscovery": *req.AllowExternalDiscovery,
309 }
310 } else {
311 profile["federation"] = map[string]interface{}{
312 "allowExternalDiscovery": existing.AllowExternalDiscovery,
313 }
314 }
315
316 // Preserve moderation settings (even if empty)
317 // These fields are optional but should not be erased on update
318 if req.ModerationType != nil {
319 profile["moderationType"] = *req.ModerationType
320 } else if existing.ModerationType != "" {
321 profile["moderationType"] = existing.ModerationType
322 }
323
324 if len(req.ContentWarnings) > 0 {
325 profile["contentWarnings"] = req.ContentWarnings
326 } else if len(existing.ContentWarnings) > 0 {
327 profile["contentWarnings"] = existing.ContentWarnings
328 }
329
330 // Preserve counts
331 profile["memberCount"] = existing.MemberCount
332 profile["subscriberCount"] = existing.SubscriberCount
333
334 // V2: Community profiles always use "self" as rkey
335 // (No need to extract from URI - it's always "self" for V2 communities)
336
337 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials
338 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
339 // Authenticate as the community (not as instance!)
340 if existing.PDSAccessToken == "" {
341 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID)
342 }
343
344 recordURI, recordCID, err := s.putRecordOnPDSAs(
345 ctx,
346 existing.DID, // repo = community's own DID (V2!)
347 "social.coves.community.profile",
348 "self", // V2: always "self"
349 profile,
350 existing.PDSAccessToken, // authenticate as the community
351 )
352 if err != nil {
353 return nil, fmt.Errorf("failed to update community on PDS: %w", err)
354 }
355
356 // Return updated community representation
357 // Actual AppView DB update happens via Jetstream consumer
358 updated := *existing
359 if req.DisplayName != nil {
360 updated.DisplayName = *req.DisplayName
361 }
362 if req.Description != nil {
363 updated.Description = *req.Description
364 }
365 if req.Visibility != nil {
366 updated.Visibility = *req.Visibility
367 }
368 if req.AllowExternalDiscovery != nil {
369 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery
370 }
371 if req.ModerationType != nil {
372 updated.ModerationType = *req.ModerationType
373 }
374 if len(req.ContentWarnings) > 0 {
375 updated.ContentWarnings = req.ContentWarnings
376 }
377 updated.RecordURI = recordURI
378 updated.RecordCID = recordCID
379 updated.UpdatedAt = time.Now()
380
381 return &updated, nil
382}
383
384// getOrCreateRefreshMutex returns a mutex for the given community DID
385// Thread-safe with read-lock fast path for existing entries
386// SAFETY: Does NOT evict entries to avoid race condition where:
387// 1. Thread A holds mutex for community-123
388// 2. Thread B evicts community-123 from map
389// 3. Thread C creates NEW mutex for community-123
390// 4. Now two threads can refresh community-123 concurrently (mutex defeated!)
391func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex {
392 // Fast path: check if mutex already exists (read lock)
393 s.mapMutex.RLock()
394 mutex, exists := s.refreshMutexes[did]
395 s.mapMutex.RUnlock()
396
397 if exists {
398 return mutex
399 }
400
401 // Slow path: create new mutex (write lock)
402 s.mapMutex.Lock()
403 defer s.mapMutex.Unlock()
404
405 // Double-check after acquiring write lock (another goroutine might have created it)
406 mutex, exists = s.refreshMutexes[did]
407 if exists {
408 return mutex
409 }
410
411 // Create new mutex
412 mutex = &sync.Mutex{}
413 s.refreshMutexes[did] = mutex
414
415 // SAFETY: No eviction to prevent race condition
416 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes
417 if len(s.refreshMutexes) > maxMutexCacheSize {
418 memoryKB := len(s.refreshMutexes) * 16 / 1024
419 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB",
420 len(s.refreshMutexes), maxMutexCacheSize, memoryKB)
421 }
422
423 return mutex
424}
425
426// ensureFreshToken checks if a community's access token needs refresh and updates if needed
427// Returns updated community with fresh credentials (or original if no refresh needed)
428// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts
429func (s *communityService) ensureFreshToken(ctx context.Context, community *Community) (*Community, error) {
430 // Get or create mutex for this specific community DID
431 mutex := s.getOrCreateRefreshMutex(community.DID)
432
433 // Lock for this specific community (allows other communities to refresh concurrently)
434 mutex.Lock()
435 defer mutex.Unlock()
436
437 // Re-fetch community from DB (another goroutine might have already refreshed it)
438 fresh, err := s.repo.GetByDID(ctx, community.DID)
439 if err != nil {
440 return nil, fmt.Errorf("failed to re-fetch community: %w", err)
441 }
442
443 // Check if token needs refresh (5-minute buffer before expiration)
444 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken)
445 if err != nil {
446 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err)
447 return nil, fmt.Errorf("failed to check token expiration: %w", err)
448 }
449
450 if !needsRefresh {
451 // Token still valid, no refresh needed
452 return fresh, nil
453 }
454
455 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID)
456
457 // Attempt token refresh using refresh token
458 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken)
459 if err != nil {
460 // Check if refresh token expired (need password fallback)
461 if strings.Contains(err.Error(), "expired or invalid") {
462 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID)
463
464 // Fallback: Re-authenticate with stored password
465 newAccessToken, newRefreshToken, err = reauthenticateWithPassword(
466 ctx,
467 fresh.PDSURL,
468 fresh.PDSEmail,
469 fresh.PDSPassword, // Retrieved decrypted from DB
470 )
471 if err != nil {
472 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err)
473 return nil, fmt.Errorf("failed to re-authenticate community: %w", err)
474 }
475
476 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID)
477 } else {
478 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err)
479 return nil, fmt.Errorf("failed to refresh token: %w", err)
480 }
481 }
482
483 // CRITICAL: Update database with new tokens immediately
484 // Refresh tokens are SINGLE-USE - old one is now invalid
485 // Use retry logic to handle transient DB failures
486 const maxRetries = 3
487 var updateErr error
488 for attempt := 0; attempt < maxRetries; attempt++ {
489 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken)
490 if updateErr == nil {
491 break // Success
492 }
493
494 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v",
495 fresh.DID, attempt+1, maxRetries, updateErr)
496
497 if attempt < maxRetries-1 {
498 // Exponential backoff: 100ms, 200ms, 400ms
499 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
500 time.Sleep(backoff)
501 }
502 }
503
504 if updateErr != nil {
505 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved
506 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v",
507 fresh.DID, maxRetries, updateErr)
508 // TODO: Send alert to monitoring system (add in Beta)
509 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w",
510 maxRetries, updateErr)
511 }
512
513 // Return updated community object with fresh tokens
514 updatedCommunity := *fresh
515 updatedCommunity.PDSAccessToken = newAccessToken
516 updatedCommunity.PDSRefreshToken = newRefreshToken
517
518 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID)
519
520 return &updatedCommunity, nil
521}
522
523// ListCommunities queries AppView DB for communities with filters
524func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) {
525 // Set defaults
526 if req.Limit <= 0 || req.Limit > 100 {
527 req.Limit = 50
528 }
529
530 return s.repo.List(ctx, req)
531}
532
533// SearchCommunities performs fuzzy search in AppView DB
534func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) {
535 if req.Query == "" {
536 return nil, 0, NewValidationError("query", "search query is required")
537 }
538
539 // Set defaults
540 if req.Limit <= 0 || req.Limit > 100 {
541 req.Limit = 50
542 }
543
544 return s.repo.Search(ctx, req)
545}
546
547// SubscribeToCommunity creates a subscription via write-forward to PDS
548func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) {
549 if userDID == "" {
550 return nil, NewValidationError("userDid", "required")
551 }
552 if userAccessToken == "" {
553 return nil, NewValidationError("userAccessToken", "required")
554 }
555
556 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid
557 if contentVisibility <= 0 || contentVisibility > 5 {
558 contentVisibility = 3
559 }
560
561 // Resolve community identifier to DID
562 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
563 if err != nil {
564 return nil, err
565 }
566
567 // Verify community exists
568 community, err := s.repo.GetByDID(ctx, communityDID)
569 if err != nil {
570 return nil, err
571 }
572
573 // Check visibility - can't subscribe to private communities without invitation (TODO)
574 if community.Visibility == "private" {
575 return nil, ErrUnauthorized
576 }
577
578 // Build subscription record
579 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure)
580 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid}
581 // Following atProto conventions, we use "subject" field to reference the community
582 subRecord := map[string]interface{}{
583 "$type": "social.coves.community.subscription",
584 "subject": communityDID, // atProto convention: "subject" for entity references
585 "createdAt": time.Now().Format(time.RFC3339),
586 "contentVisibility": contentVisibility,
587 }
588
589 // Write-forward: create subscription record in user's repo using their access token
590 // The collection parameter refers to the record type in the repository
591 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken)
592 if err != nil {
593 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
594 }
595
596 // Return subscription representation
597 subscription := &Subscription{
598 UserDID: userDID,
599 CommunityDID: communityDID,
600 ContentVisibility: contentVisibility,
601 SubscribedAt: time.Now(),
602 RecordURI: recordURI,
603 RecordCID: recordCID,
604 }
605
606 return subscription, nil
607}
608
609// UnsubscribeFromCommunity removes a subscription via PDS delete
610func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
611 if userDID == "" {
612 return NewValidationError("userDid", "required")
613 }
614 if userAccessToken == "" {
615 return NewValidationError("userAccessToken", "required")
616 }
617
618 // Resolve community identifier
619 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
620 if err != nil {
621 return err
622 }
623
624 // Get the subscription from AppView to find the record key
625 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID)
626 if err != nil {
627 return err
628 }
629
630 // Extract rkey from record URI (at://did/collection/rkey)
631 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI)
632 if rkey == "" {
633 return fmt.Errorf("invalid subscription record URI")
634 }
635
636 // Write-forward: delete record from PDS using user's access token
637 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe
638 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil {
639 return fmt.Errorf("failed to delete subscription on PDS: %w", err)
640 }
641
642 return nil
643}
644
645// GetUserSubscriptions queries AppView DB for user's subscriptions
646func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) {
647 if limit <= 0 || limit > 100 {
648 limit = 50
649 }
650
651 return s.repo.ListSubscriptions(ctx, userDID, limit, offset)
652}
653
654// GetCommunitySubscribers queries AppView DB for community subscribers
655func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) {
656 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
657 if err != nil {
658 return nil, err
659 }
660
661 if limit <= 0 || limit > 100 {
662 limit = 50
663 }
664
665 return s.repo.ListSubscribers(ctx, communityDID, limit, offset)
666}
667
668// GetMembership retrieves membership info from AppView DB
669func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) {
670 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
671 if err != nil {
672 return nil, err
673 }
674
675 return s.repo.GetMembership(ctx, userDID, communityDID)
676}
677
678// ListCommunityMembers queries AppView DB for members
679func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) {
680 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
681 if err != nil {
682 return nil, err
683 }
684
685 if limit <= 0 || limit > 100 {
686 limit = 50
687 }
688
689 return s.repo.ListMembers(ctx, communityDID, limit, offset)
690}
691
692// BlockCommunity blocks a community via write-forward to PDS
693func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) {
694 if userDID == "" {
695 return nil, NewValidationError("userDid", "required")
696 }
697 if userAccessToken == "" {
698 return nil, NewValidationError("userAccessToken", "required")
699 }
700
701 // Resolve community identifier (also verifies community exists)
702 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
703 if err != nil {
704 return nil, err
705 }
706
707 // Build block record
708 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE)
709 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid}
710 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern
711 blockRecord := map[string]interface{}{
712 "$type": "social.coves.community.block",
713 "subject": communityDID, // DID of community being blocked
714 "createdAt": time.Now().Format(time.RFC3339),
715 }
716
717 // Write-forward: create block record in user's repo using their access token
718 // Note: We don't check for existing blocks first because:
719 // 1. The PDS may reject duplicates (depending on implementation)
720 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING
721 // 3. This avoids a race condition where two concurrent requests both pass the check
722 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken)
723 if err != nil {
724 // Check if this is a duplicate/conflict error from PDS
725 // PDS should return 409 Conflict for duplicate records, but we also check common error messages
726 // for compatibility with different PDS implementations
727 errMsg := err.Error()
728 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict
729 strings.Contains(errMsg, "duplicate") ||
730 strings.Contains(errMsg, "already exists") ||
731 strings.Contains(errMsg, "AlreadyExists")
732
733 if isDuplicate {
734 // Fetch and return existing block from our indexed view
735 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID)
736 if getErr == nil {
737 // Block exists in our index - return it
738 return existingBlock, nil
739 }
740 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition)
741 // Any other error (DB outage, connection failure, etc.) should bubble up
742 if errors.Is(getErr, ErrBlockNotFound) {
743 // Race condition: PDS has the block but Jetstream hasn't indexed it yet
744 // Return typed conflict error so handler can return 409 instead of 500
745 // This is normal in eventually-consistent systems
746 return nil, ErrBlockAlreadyExists
747 }
748 // Real datastore error - bubble it up so operators see the failure
749 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr)
750 }
751 return nil, fmt.Errorf("failed to create block on PDS: %w", err)
752 }
753
754 // Return block representation
755 block := &CommunityBlock{
756 UserDID: userDID,
757 CommunityDID: communityDID,
758 BlockedAt: time.Now(),
759 RecordURI: recordURI,
760 RecordCID: recordCID,
761 }
762
763 return block, nil
764}
765
766// UnblockCommunity removes a block via PDS delete
767func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
768 if userDID == "" {
769 return NewValidationError("userDid", "required")
770 }
771 if userAccessToken == "" {
772 return NewValidationError("userAccessToken", "required")
773 }
774
775 // Resolve community identifier
776 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
777 if err != nil {
778 return err
779 }
780
781 // Get the block from AppView to find the record key
782 block, err := s.repo.GetBlock(ctx, userDID, communityDID)
783 if err != nil {
784 return err
785 }
786
787 // Extract rkey from record URI (at://did/collection/rkey)
788 rkey := utils.ExtractRKeyFromURI(block.RecordURI)
789 if rkey == "" {
790 return fmt.Errorf("invalid block record URI")
791 }
792
793 // Write-forward: delete record from PDS using user's access token
794 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil {
795 return fmt.Errorf("failed to delete block on PDS: %w", err)
796 }
797
798 return nil
799}
800
801// GetBlockedCommunities queries AppView DB for user's blocks
802func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) {
803 if limit <= 0 || limit > 100 {
804 limit = 50
805 }
806
807 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset)
808}
809
810// IsBlocked checks if a user has blocked a community
811func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) {
812 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
813 if err != nil {
814 return false, err
815 }
816
817 return s.repo.IsBlocked(ctx, userDID, communityDID)
818}
819
820// ValidateHandle checks if a community handle is valid
821func (s *communityService) ValidateHandle(handle string) error {
822 if handle == "" {
823 return NewValidationError("handle", "required")
824 }
825
826 if !communityHandleRegex.MatchString(handle) {
827 return ErrInvalidHandle
828 }
829
830 return nil
831}
832
833// ResolveCommunityIdentifier converts a community identifier to a DID
834// Following Bluesky's pattern with Coves extensions:
835//
836// Accepts (like Bluesky's at-identifier):
837// 1. DID: did:plc:abc123 (pass through)
838// 2. Canonical handle: gardening.community.coves.social (atProto standard)
839// 3. At-identifier: @gardening.community.coves.social (strip @ prefix)
840//
841// Coves-specific extensions:
842// 4. Scoped format: !gardening@coves.social (parse and resolve)
843//
844// Returns: DID string
845func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {
846 identifier = strings.TrimSpace(identifier)
847
848 if identifier == "" {
849 return "", ErrInvalidInput
850 }
851
852 // 1. DID - verify it exists and return (Bluesky standard)
853 if strings.HasPrefix(identifier, "did:") {
854 _, err := s.repo.GetByDID(ctx, identifier)
855 if err != nil {
856 if IsNotFound(err) {
857 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err)
858 }
859 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err)
860 }
861 return identifier, nil
862 }
863
864 // 2. Scoped format: !name@instance (Coves-specific)
865 if strings.HasPrefix(identifier, "!") {
866 return s.resolveScopedIdentifier(ctx, identifier)
867 }
868
869 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix)
870 if strings.HasPrefix(identifier, "@") {
871 identifier = strings.TrimPrefix(identifier, "@")
872 }
873
874 // 4. Canonical handle: name.community.instance.com (Bluesky standard)
875 if strings.Contains(identifier, ".") {
876 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier))
877 if err != nil {
878 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err)
879 }
880 return community.DID, nil
881 }
882
883 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)")
884}
885
886// resolveScopedIdentifier handles Coves-specific !name@instance format
887// Formats accepted:
888// !gardening@coves.social -> gardening.community.coves.social
889func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) {
890 // Remove ! prefix
891 scoped = strings.TrimPrefix(scoped, "!")
892
893 var name string
894 var instanceDomain string
895
896 // Parse !name@instance
897 if !strings.Contains(scoped, "@") {
898 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)")
899 }
900
901 parts := strings.SplitN(scoped, "@", 2)
902 name = strings.TrimSpace(parts[0])
903 instanceDomain = strings.TrimSpace(parts[1])
904
905 // Validate name format
906 if name == "" {
907 return "", NewValidationError("identifier", "community name cannot be empty")
908 }
909
910 // Validate name is a valid DNS label (RFC 1035)
911 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen
912 if !isValidDNSLabel(name) {
913 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)")
914 }
915
916 // Validate instance domain format
917 if !isValidDomain(instanceDomain) {
918 return "", NewValidationError("identifier", "invalid instance domain format")
919 }
920
921 // Normalize domain to lowercase (DNS is case-insensitive)
922 // This fixes the bug where !gardening@Coves.social would fail lookup
923 instanceDomain = strings.ToLower(instanceDomain)
924
925 // Validate the instance matches this server
926 if !s.isLocalInstance(instanceDomain) {
927 return "", NewValidationError("identifier",
928 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain))
929 }
930
931 // Construct canonical handle: {name}.community.{instanceDomain}
932 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup
933 canonicalHandle := fmt.Sprintf("%s.community.%s",
934 strings.ToLower(name),
935 instanceDomain) // Already normalized to lowercase on line 923
936
937 // Look up by canonical handle
938 community, err := s.repo.GetByHandle(ctx, canonicalHandle)
939 if err != nil {
940 return "", fmt.Errorf("community not found for scoped identifier !%s@%s: %w", name, instanceDomain, err)
941 }
942
943 return community.DID, nil
944}
945
946// isLocalInstance checks if the provided domain matches this instance
947func (s *communityService) isLocalInstance(domain string) bool {
948 // Normalize both domains
949 domain = strings.ToLower(strings.TrimSpace(domain))
950 instanceDomain := strings.ToLower(s.instanceDomain)
951
952 // Direct match
953 return domain == instanceDomain
954}
955
956// Validation helpers
957
958// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035
959// - 1-63 characters
960// - Alphanumeric and hyphens only
961// - Cannot start or end with hyphen
962func isValidDNSLabel(label string) bool {
963 return dnsLabelRegex.MatchString(label)
964}
965
966// isValidDomain validates that a string is a valid domain name
967// Simplified validation - checks basic DNS hostname structure
968func isValidDomain(domain string) bool {
969 if domain == "" || len(domain) > 253 {
970 return false
971 }
972 return domainRegex.MatchString(domain)
973}
974
975func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error {
976 if req.Name == "" {
977 return NewValidationError("name", "required")
978 }
979
980 // DNS label limit: 63 characters per label
981 // Community handle format: {name}.community.{instanceDomain}
982 // The first label is just req.Name, so it must be <= 63 chars
983 if len(req.Name) > 63 {
984 return NewValidationError("name", "must be 63 characters or less (DNS label limit)")
985 }
986
987 // Name can only contain alphanumeric and hyphens
988 // Must start and end with alphanumeric (not hyphen)
989 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
990 if !nameRegex.MatchString(req.Name) {
991 return NewValidationError("name", "must contain only alphanumeric characters and hyphens")
992 }
993
994 if req.Description != "" && len(req.Description) > 3000 {
995 return NewValidationError("description", "must be 3000 characters or less")
996 }
997
998 // Visibility should already be set with default in CreateCommunity
999 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" {
1000 return ErrInvalidVisibility
1001 }
1002
1003 if req.CreatedByDID == "" {
1004 return NewValidationError("createdByDid", "required")
1005 }
1006
1007 // hostedByDID is auto-populated by the service layer, no validation needed
1008 // The handler ensures clients cannot provide this field
1009
1010 return nil
1011}
1012
1013// PDS write-forward helpers
1014
1015// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth)
1016func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1017 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
1018
1019 payload := map[string]interface{}{
1020 "repo": repoDID,
1021 "collection": collection,
1022 "record": record,
1023 }
1024
1025 if rkey != "" {
1026 payload["rkey"] = rkey
1027 }
1028
1029 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1030}
1031
1032// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth)
1033func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1034 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/"))
1035
1036 payload := map[string]interface{}{
1037 "repo": repoDID,
1038 "collection": collection,
1039 "rkey": rkey,
1040 "record": record,
1041 }
1042
1043 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1044}
1045
1046// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions)
1047func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
1048 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
1049
1050 payload := map[string]interface{}{
1051 "repo": repoDID,
1052 "collection": collection,
1053 "rkey": rkey,
1054 }
1055
1056 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1057 return err
1058}
1059
1060// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication)
1061func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
1062 jsonData, err := json.Marshal(payload)
1063 if err != nil {
1064 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
1065 }
1066
1067 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
1068 if err != nil {
1069 return "", "", fmt.Errorf("failed to create request: %w", err)
1070 }
1071 req.Header.Set("Content-Type", "application/json")
1072
1073 // Add authentication with provided access token
1074 if accessToken != "" {
1075 req.Header.Set("Authorization", "Bearer "+accessToken)
1076 }
1077
1078 // Dynamic timeout based on operation type
1079 // Write operations (createAccount, createRecord, putRecord) are slower due to:
1080 // - Keypair generation
1081 // - DID PLC registration
1082 // - Database writes on PDS
1083 timeout := 10 * time.Second // Default for read operations
1084 if strings.Contains(endpoint, "createAccount") ||
1085 strings.Contains(endpoint, "createRecord") ||
1086 strings.Contains(endpoint, "putRecord") {
1087 timeout = 30 * time.Second // Extended timeout for write operations
1088 }
1089
1090 client := &http.Client{Timeout: timeout}
1091 resp, err := client.Do(req)
1092 if err != nil {
1093 return "", "", fmt.Errorf("failed to call PDS: %w", err)
1094 }
1095 defer func() {
1096 if closeErr := resp.Body.Close(); closeErr != nil {
1097 log.Printf("Failed to close response body: %v", closeErr)
1098 }
1099 }()
1100
1101 body, err := io.ReadAll(resp.Body)
1102 if err != nil {
1103 return "", "", fmt.Errorf("failed to read response: %w", err)
1104 }
1105
1106 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
1107 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
1108 }
1109
1110 // Parse response to extract URI and CID
1111 var result struct {
1112 URI string `json:"uri"`
1113 CID string `json:"cid"`
1114 }
1115 if err := json.Unmarshal(body, &result); err != nil {
1116 // For delete operations, there might not be a response body
1117 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
1118 return "", "", nil
1119 }
1120 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
1121 }
1122
1123 return result.URI, result.CID, nil
1124}
1125
1126// Helper functions