A community based topic aggregation platform built on atproto
1package communities
2
3import (
4 "Coves/internal/atproto/utils"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log"
12 "net/http"
13 "regexp"
14 "strings"
15 "sync"
16 "time"
17)
18
19// Community handle validation regex (DNS-valid handle: name.community.instance.com)
20// Matches standard DNS hostname format (RFC 1035)
21var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
22
23// DNS label validation (RFC 1035: 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen)
24var dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
25
26// Domain validation (simplified - checks for valid DNS hostname structure)
27var domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
28
29type communityService struct {
30 // Interfaces and pointers first (better alignment)
31 repo Repository
32 provisioner *PDSAccountProvisioner
33
34 // Token refresh concurrency control
35 // Each community gets its own mutex to prevent concurrent refresh attempts
36 refreshMutexes map[string]*sync.Mutex
37
38 // Strings
39 pdsURL string
40 instanceDID string
41 instanceDomain string
42 pdsAccessToken string
43
44 // Sync primitives last
45 mapMutex sync.RWMutex // Protects refreshMutexes map itself
46}
47
48const (
49 // Maximum recommended size for mutex cache (warning threshold, not hard limit)
50 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead)
51 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable
52 maxMutexCacheSize = 10000
53)
54
55// NewCommunityService creates a new community service
56func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
57 // SECURITY: Basic validation that did:web domain matches configured instanceDomain
58 // This catches honest configuration mistakes but NOT malicious code modifications
59 // Full verification (Phase 2) requires fetching DID document from domain
60 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification"
61 if strings.HasPrefix(instanceDID, "did:web:") {
62 didDomain := strings.TrimPrefix(instanceDID, "did:web:")
63 if didDomain != instanceDomain {
64 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)",
65 didDomain, instanceDomain)
66 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt")
67 log.Printf(" Communities will be hosted by: %s", instanceDID)
68 }
69 }
70
71 return &communityService{
72 repo: repo,
73 pdsURL: pdsURL,
74 instanceDID: instanceDID,
75 instanceDomain: instanceDomain,
76 provisioner: provisioner,
77 refreshMutexes: make(map[string]*sync.Mutex),
78 }
79}
80
81// SetPDSAccessToken sets the PDS access token for authentication
82// This should be called after creating a session for the Coves instance DID on the PDS
83func (s *communityService) SetPDSAccessToken(token string) {
84 s.pdsAccessToken = token
85}
86
87// CreateCommunity creates a new community via write-forward to PDS
88// V2 Flow:
89// 1. Service creates PDS account for community (PDS generates signing keypair)
90// 2. Service writes community profile to COMMUNITY's own repository
91// 3. Firehose emits event
92// 4. Consumer indexes to AppView DB
93//
94// V2 Architecture:
95// - Community owns its own repository (at://community_did/social.coves.community.profile/self)
96// - PDS manages the signing keypair (we never see it)
97// - We store PDS credentials to act on behalf of the community
98// - Community can migrate to other instances (future V2.1 with rotation keys)
99func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) {
100 // Apply defaults before validation
101 if req.Visibility == "" {
102 req.Visibility = "public"
103 }
104
105 // SECURITY: Auto-populate hostedByDID from instance configuration
106 // Clients MUST NOT provide this field - it's derived from the instance receiving the request
107 // This prevents malicious instances from claiming to host communities for domains they don't own
108 req.HostedByDID = s.instanceDID
109
110 // Validate request
111 if err := s.validateCreateRequest(req); err != nil {
112 return nil, err
113 }
114
115 // V2: Provision a real PDS account for this community
116 // This calls com.atproto.server.createAccount internally
117 // The PDS will:
118 // 1. Generate a signing keypair (stored in PDS, we never see it)
119 // 2. Create a DID (did:plc:xxx)
120 // 3. Return credentials (DID, tokens)
121 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name)
122 if err != nil {
123 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err)
124 }
125
126 // Validate the atProto handle
127 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil {
128 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr)
129 }
130
131 // Build community profile record
132 profile := map[string]interface{}{
133 "$type": "social.coves.community.profile",
134 "handle": pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social)
135 "name": req.Name, // Short name for !mentions (e.g., "gaming")
136 "visibility": req.Visibility,
137 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns
138 "createdBy": req.CreatedByDID,
139 "createdAt": time.Now().Format(time.RFC3339),
140 "federation": map[string]interface{}{
141 "allowExternalDiscovery": req.AllowExternalDiscovery,
142 },
143 }
144
145 // Add optional fields
146 if req.DisplayName != "" {
147 profile["displayName"] = req.DisplayName
148 }
149 if req.Description != "" {
150 profile["description"] = req.Description
151 }
152 if len(req.Rules) > 0 {
153 profile["rules"] = req.Rules
154 }
155 if len(req.Categories) > 0 {
156 profile["categories"] = req.Categories
157 }
158 if req.Language != "" {
159 profile["language"] = req.Language
160 }
161
162 // Initialize counts
163 profile["memberCount"] = 0
164 profile["subscriberCount"] = 0
165
166 // TODO: Handle avatar and banner blobs
167 // For now, we'll skip blob uploads. This would require:
168 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob
169 // 2. Get blob ref (CID)
170 // 3. Add to profile record
171
172 // V2: Write to COMMUNITY's own repository (not instance repo!)
173 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
174 // Authenticate using community's access token
175 recordURI, recordCID, err := s.createRecordOnPDSAs(
176 ctx,
177 pdsAccount.DID, // repo = community's DID (community owns its repo!)
178 "social.coves.community.profile",
179 "self", // canonical rkey for profile
180 profile,
181 pdsAccount.AccessToken, // authenticate as the community
182 )
183 if err != nil {
184 return nil, fmt.Errorf("failed to create community profile record: %w", err)
185 }
186
187 // Build Community object with PDS credentials AND cryptographic keys
188 community := &Community{
189 DID: pdsAccount.DID, // Community's DID (owns the repo!)
190 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social)
191 Name: req.Name,
192 DisplayName: req.DisplayName,
193 Description: req.Description,
194 OwnerDID: pdsAccount.DID, // V2: Community owns itself
195 CreatedByDID: req.CreatedByDID,
196 HostedByDID: req.HostedByDID,
197 PDSEmail: pdsAccount.Email,
198 PDSPassword: pdsAccount.Password,
199 PDSAccessToken: pdsAccount.AccessToken,
200 PDSRefreshToken: pdsAccount.RefreshToken,
201 PDSURL: pdsAccount.PDSURL,
202 Visibility: req.Visibility,
203 AllowExternalDiscovery: req.AllowExternalDiscovery,
204 MemberCount: 0,
205 SubscriberCount: 0,
206 CreatedAt: time.Now(),
207 UpdatedAt: time.Now(),
208 RecordURI: recordURI,
209 RecordCID: recordCID,
210 // V2: Cryptographic keys for portability (will be encrypted by repository)
211 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration
212 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations
213 }
214
215 // CRITICAL: Persist PDS credentials immediately to database
216 // The Jetstream consumer will eventually index the community profile from the firehose,
217 // but it won't have the PDS credentials. We must store them now so we can:
218 // 1. Update the community profile later (using its own credentials)
219 // 2. Re-authenticate if access tokens expire
220 _, err = s.repo.Create(ctx, community)
221 if err != nil {
222 return nil, fmt.Errorf("failed to persist community with credentials: %w", err)
223 }
224
225 return community, nil
226}
227
228// GetCommunity retrieves a community from AppView DB
229// identifier can be either a DID or handle
230func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) {
231 if identifier == "" {
232 return nil, ErrInvalidInput
233 }
234
235 // Determine if identifier is DID or handle
236 if strings.HasPrefix(identifier, "did:") {
237 return s.repo.GetByDID(ctx, identifier)
238 }
239
240 if strings.HasPrefix(identifier, "!") {
241 return s.repo.GetByHandle(ctx, identifier)
242 }
243
244 return nil, NewValidationError("identifier", "must be a DID or handle")
245}
246
247// GetByDID retrieves a community by its DID
248// Exported for use by post service when validating community references
249func (s *communityService) GetByDID(ctx context.Context, did string) (*Community, error) {
250 if did == "" {
251 return nil, ErrInvalidInput
252 }
253
254 if !strings.HasPrefix(did, "did:") {
255 return nil, NewValidationError("did", "must be a valid DID")
256 }
257
258 return s.repo.GetByDID(ctx, did)
259}
260
261// UpdateCommunity updates a community via write-forward to PDS
262func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) {
263 if req.CommunityDID == "" {
264 return nil, NewValidationError("communityDid", "required")
265 }
266
267 if req.UpdatedByDID == "" {
268 return nil, NewValidationError("updatedByDid", "required")
269 }
270
271 // Get existing community
272 existing, err := s.repo.GetByDID(ctx, req.CommunityDID)
273 if err != nil {
274 return nil, err
275 }
276
277 // CRITICAL: Ensure fresh PDS access token before write operation
278 // Community PDS tokens expire every ~2 hours and must be refreshed
279 existing, err = s.EnsureFreshToken(ctx, existing)
280 if err != nil {
281 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err)
282 }
283
284 // Authorization: verify user is the creator
285 // TODO(Communities-Auth): Add moderator check when moderation system is implemented
286 if existing.CreatedByDID != req.UpdatedByDID {
287 return nil, ErrUnauthorized
288 }
289
290 // Build updated profile record (start with existing)
291 profile := map[string]interface{}{
292 "$type": "social.coves.community.profile",
293 "handle": existing.Handle,
294 "name": existing.Name,
295 "owner": existing.OwnerDID,
296 "createdBy": existing.CreatedByDID,
297 "hostedBy": existing.HostedByDID,
298 "createdAt": existing.CreatedAt.Format(time.RFC3339),
299 }
300
301 // Apply updates
302 if req.DisplayName != nil {
303 profile["displayName"] = *req.DisplayName
304 } else {
305 profile["displayName"] = existing.DisplayName
306 }
307
308 if req.Description != nil {
309 profile["description"] = *req.Description
310 } else {
311 profile["description"] = existing.Description
312 }
313
314 if req.Visibility != nil {
315 profile["visibility"] = *req.Visibility
316 } else {
317 profile["visibility"] = existing.Visibility
318 }
319
320 if req.AllowExternalDiscovery != nil {
321 profile["federation"] = map[string]interface{}{
322 "allowExternalDiscovery": *req.AllowExternalDiscovery,
323 }
324 } else {
325 profile["federation"] = map[string]interface{}{
326 "allowExternalDiscovery": existing.AllowExternalDiscovery,
327 }
328 }
329
330 // Preserve moderation settings (even if empty)
331 // These fields are optional but should not be erased on update
332 if req.ModerationType != nil {
333 profile["moderationType"] = *req.ModerationType
334 } else if existing.ModerationType != "" {
335 profile["moderationType"] = existing.ModerationType
336 }
337
338 if len(req.ContentWarnings) > 0 {
339 profile["contentWarnings"] = req.ContentWarnings
340 } else if len(existing.ContentWarnings) > 0 {
341 profile["contentWarnings"] = existing.ContentWarnings
342 }
343
344 // Preserve counts
345 profile["memberCount"] = existing.MemberCount
346 profile["subscriberCount"] = existing.SubscriberCount
347
348 // V2: Community profiles always use "self" as rkey
349 // (No need to extract from URI - it's always "self" for V2 communities)
350
351 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials
352 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
353 // Authenticate as the community (not as instance!)
354 if existing.PDSAccessToken == "" {
355 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID)
356 }
357
358 recordURI, recordCID, err := s.putRecordOnPDSAs(
359 ctx,
360 existing.DID, // repo = community's own DID (V2!)
361 "social.coves.community.profile",
362 "self", // V2: always "self"
363 profile,
364 existing.PDSAccessToken, // authenticate as the community
365 )
366 if err != nil {
367 return nil, fmt.Errorf("failed to update community on PDS: %w", err)
368 }
369
370 // Return updated community representation
371 // Actual AppView DB update happens via Jetstream consumer
372 updated := *existing
373 if req.DisplayName != nil {
374 updated.DisplayName = *req.DisplayName
375 }
376 if req.Description != nil {
377 updated.Description = *req.Description
378 }
379 if req.Visibility != nil {
380 updated.Visibility = *req.Visibility
381 }
382 if req.AllowExternalDiscovery != nil {
383 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery
384 }
385 if req.ModerationType != nil {
386 updated.ModerationType = *req.ModerationType
387 }
388 if len(req.ContentWarnings) > 0 {
389 updated.ContentWarnings = req.ContentWarnings
390 }
391 updated.RecordURI = recordURI
392 updated.RecordCID = recordCID
393 updated.UpdatedAt = time.Now()
394
395 return &updated, nil
396}
397
398// getOrCreateRefreshMutex returns a mutex for the given community DID
399// Thread-safe with read-lock fast path for existing entries
400// SAFETY: Does NOT evict entries to avoid race condition where:
401// 1. Thread A holds mutex for community-123
402// 2. Thread B evicts community-123 from map
403// 3. Thread C creates NEW mutex for community-123
404// 4. Now two threads can refresh community-123 concurrently (mutex defeated!)
405func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex {
406 // Fast path: check if mutex already exists (read lock)
407 s.mapMutex.RLock()
408 mutex, exists := s.refreshMutexes[did]
409 s.mapMutex.RUnlock()
410
411 if exists {
412 return mutex
413 }
414
415 // Slow path: create new mutex (write lock)
416 s.mapMutex.Lock()
417 defer s.mapMutex.Unlock()
418
419 // Double-check after acquiring write lock (another goroutine might have created it)
420 mutex, exists = s.refreshMutexes[did]
421 if exists {
422 return mutex
423 }
424
425 // Create new mutex
426 mutex = &sync.Mutex{}
427 s.refreshMutexes[did] = mutex
428
429 // SAFETY: No eviction to prevent race condition
430 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes
431 if len(s.refreshMutexes) > maxMutexCacheSize {
432 memoryKB := len(s.refreshMutexes) * 16 / 1024
433 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB",
434 len(s.refreshMutexes), maxMutexCacheSize, memoryKB)
435 }
436
437 return mutex
438}
439
440// ensureFreshToken checks if a community's access token needs refresh and updates if needed
441// Returns updated community with fresh credentials (or original if no refresh needed)
442// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts
443// EnsureFreshToken ensures the community's PDS access token is valid
444// Exported for use by post service when writing posts to community repos
445func (s *communityService) EnsureFreshToken(ctx context.Context, community *Community) (*Community, error) {
446 // Get or create mutex for this specific community DID
447 mutex := s.getOrCreateRefreshMutex(community.DID)
448
449 // Lock for this specific community (allows other communities to refresh concurrently)
450 mutex.Lock()
451 defer mutex.Unlock()
452
453 // Re-fetch community from DB (another goroutine might have already refreshed it)
454 fresh, err := s.repo.GetByDID(ctx, community.DID)
455 if err != nil {
456 return nil, fmt.Errorf("failed to re-fetch community: %w", err)
457 }
458
459 // Check if token needs refresh (5-minute buffer before expiration)
460 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken)
461 if err != nil {
462 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err)
463 return nil, fmt.Errorf("failed to check token expiration: %w", err)
464 }
465
466 if !needsRefresh {
467 // Token still valid, no refresh needed
468 return fresh, nil
469 }
470
471 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID)
472
473 // Attempt token refresh using refresh token
474 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken)
475 if err != nil {
476 // Check if refresh token expired (need password fallback)
477 // Match both "ExpiredToken" and "Token has expired" error messages
478 if strings.Contains(strings.ToLower(err.Error()), "expired") {
479 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID)
480
481 // Fallback: Re-authenticate with stored password
482 newAccessToken, newRefreshToken, err = reauthenticateWithPassword(
483 ctx,
484 fresh.PDSURL,
485 fresh.PDSEmail,
486 fresh.PDSPassword, // Retrieved decrypted from DB
487 )
488 if err != nil {
489 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err)
490 return nil, fmt.Errorf("failed to re-authenticate community: %w", err)
491 }
492
493 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID)
494 } else {
495 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err)
496 return nil, fmt.Errorf("failed to refresh token: %w", err)
497 }
498 }
499
500 // CRITICAL: Update database with new tokens immediately
501 // Refresh tokens are SINGLE-USE - old one is now invalid
502 // Use retry logic to handle transient DB failures
503 const maxRetries = 3
504 var updateErr error
505 for attempt := 0; attempt < maxRetries; attempt++ {
506 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken)
507 if updateErr == nil {
508 break // Success
509 }
510
511 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v",
512 fresh.DID, attempt+1, maxRetries, updateErr)
513
514 if attempt < maxRetries-1 {
515 // Exponential backoff: 100ms, 200ms, 400ms
516 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
517 time.Sleep(backoff)
518 }
519 }
520
521 if updateErr != nil {
522 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved
523 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v",
524 fresh.DID, maxRetries, updateErr)
525 // TODO: Send alert to monitoring system (add in Beta)
526 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w",
527 maxRetries, updateErr)
528 }
529
530 // Return updated community object with fresh tokens
531 updatedCommunity := *fresh
532 updatedCommunity.PDSAccessToken = newAccessToken
533 updatedCommunity.PDSRefreshToken = newRefreshToken
534
535 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID)
536
537 return &updatedCommunity, nil
538}
539
540// ListCommunities queries AppView DB for communities with filters
541func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) {
542 // Set defaults
543 if req.Limit <= 0 || req.Limit > 100 {
544 req.Limit = 50
545 }
546
547 return s.repo.List(ctx, req)
548}
549
550// SearchCommunities performs fuzzy search in AppView DB
551func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) {
552 if req.Query == "" {
553 return nil, 0, NewValidationError("query", "search query is required")
554 }
555
556 // Set defaults
557 if req.Limit <= 0 || req.Limit > 100 {
558 req.Limit = 50
559 }
560
561 return s.repo.Search(ctx, req)
562}
563
564// SubscribeToCommunity creates a subscription via write-forward to PDS
565func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) {
566 if userDID == "" {
567 return nil, NewValidationError("userDid", "required")
568 }
569 if userAccessToken == "" {
570 return nil, NewValidationError("userAccessToken", "required")
571 }
572
573 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid
574 if contentVisibility <= 0 || contentVisibility > 5 {
575 contentVisibility = 3
576 }
577
578 // Resolve community identifier to DID
579 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
580 if err != nil {
581 return nil, err
582 }
583
584 // Verify community exists
585 community, err := s.repo.GetByDID(ctx, communityDID)
586 if err != nil {
587 return nil, err
588 }
589
590 // Check visibility - can't subscribe to private communities without invitation (TODO)
591 if community.Visibility == "private" {
592 return nil, ErrUnauthorized
593 }
594
595 // Build subscription record
596 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure)
597 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid}
598 // Following atProto conventions, we use "subject" field to reference the community
599 subRecord := map[string]interface{}{
600 "$type": "social.coves.community.subscription",
601 "subject": communityDID, // atProto convention: "subject" for entity references
602 "createdAt": time.Now().Format(time.RFC3339),
603 "contentVisibility": contentVisibility,
604 }
605
606 // Write-forward: create subscription record in user's repo using their access token
607 // The collection parameter refers to the record type in the repository
608 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken)
609 if err != nil {
610 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
611 }
612
613 // Return subscription representation
614 subscription := &Subscription{
615 UserDID: userDID,
616 CommunityDID: communityDID,
617 ContentVisibility: contentVisibility,
618 SubscribedAt: time.Now(),
619 RecordURI: recordURI,
620 RecordCID: recordCID,
621 }
622
623 return subscription, nil
624}
625
626// UnsubscribeFromCommunity removes a subscription via PDS delete
627func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
628 if userDID == "" {
629 return NewValidationError("userDid", "required")
630 }
631 if userAccessToken == "" {
632 return NewValidationError("userAccessToken", "required")
633 }
634
635 // Resolve community identifier
636 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
637 if err != nil {
638 return err
639 }
640
641 // Get the subscription from AppView to find the record key
642 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID)
643 if err != nil {
644 return err
645 }
646
647 // Extract rkey from record URI (at://did/collection/rkey)
648 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI)
649 if rkey == "" {
650 return fmt.Errorf("invalid subscription record URI")
651 }
652
653 // Write-forward: delete record from PDS using user's access token
654 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe
655 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil {
656 return fmt.Errorf("failed to delete subscription on PDS: %w", err)
657 }
658
659 return nil
660}
661
662// GetUserSubscriptions queries AppView DB for user's subscriptions
663func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) {
664 if limit <= 0 || limit > 100 {
665 limit = 50
666 }
667
668 return s.repo.ListSubscriptions(ctx, userDID, limit, offset)
669}
670
671// GetCommunitySubscribers queries AppView DB for community subscribers
672func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) {
673 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
674 if err != nil {
675 return nil, err
676 }
677
678 if limit <= 0 || limit > 100 {
679 limit = 50
680 }
681
682 return s.repo.ListSubscribers(ctx, communityDID, limit, offset)
683}
684
685// GetMembership retrieves membership info from AppView DB
686func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) {
687 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
688 if err != nil {
689 return nil, err
690 }
691
692 return s.repo.GetMembership(ctx, userDID, communityDID)
693}
694
695// ListCommunityMembers queries AppView DB for members
696func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) {
697 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
698 if err != nil {
699 return nil, err
700 }
701
702 if limit <= 0 || limit > 100 {
703 limit = 50
704 }
705
706 return s.repo.ListMembers(ctx, communityDID, limit, offset)
707}
708
709// BlockCommunity blocks a community via write-forward to PDS
710func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) {
711 if userDID == "" {
712 return nil, NewValidationError("userDid", "required")
713 }
714 if userAccessToken == "" {
715 return nil, NewValidationError("userAccessToken", "required")
716 }
717
718 // Resolve community identifier (also verifies community exists)
719 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
720 if err != nil {
721 return nil, err
722 }
723
724 // Build block record
725 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE)
726 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid}
727 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern
728 blockRecord := map[string]interface{}{
729 "$type": "social.coves.community.block",
730 "subject": communityDID, // DID of community being blocked
731 "createdAt": time.Now().Format(time.RFC3339),
732 }
733
734 // Write-forward: create block record in user's repo using their access token
735 // Note: We don't check for existing blocks first because:
736 // 1. The PDS may reject duplicates (depending on implementation)
737 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING
738 // 3. This avoids a race condition where two concurrent requests both pass the check
739 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken)
740 if err != nil {
741 // Check if this is a duplicate/conflict error from PDS
742 // PDS should return 409 Conflict for duplicate records, but we also check common error messages
743 // for compatibility with different PDS implementations
744 errMsg := err.Error()
745 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict
746 strings.Contains(errMsg, "duplicate") ||
747 strings.Contains(errMsg, "already exists") ||
748 strings.Contains(errMsg, "AlreadyExists")
749
750 if isDuplicate {
751 // Fetch and return existing block from our indexed view
752 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID)
753 if getErr == nil {
754 // Block exists in our index - return it
755 return existingBlock, nil
756 }
757 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition)
758 // Any other error (DB outage, connection failure, etc.) should bubble up
759 if errors.Is(getErr, ErrBlockNotFound) {
760 // Race condition: PDS has the block but Jetstream hasn't indexed it yet
761 // Return typed conflict error so handler can return 409 instead of 500
762 // This is normal in eventually-consistent systems
763 return nil, ErrBlockAlreadyExists
764 }
765 // Real datastore error - bubble it up so operators see the failure
766 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr)
767 }
768 return nil, fmt.Errorf("failed to create block on PDS: %w", err)
769 }
770
771 // Return block representation
772 block := &CommunityBlock{
773 UserDID: userDID,
774 CommunityDID: communityDID,
775 BlockedAt: time.Now(),
776 RecordURI: recordURI,
777 RecordCID: recordCID,
778 }
779
780 return block, nil
781}
782
783// UnblockCommunity removes a block via PDS delete
784func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
785 if userDID == "" {
786 return NewValidationError("userDid", "required")
787 }
788 if userAccessToken == "" {
789 return NewValidationError("userAccessToken", "required")
790 }
791
792 // Resolve community identifier
793 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
794 if err != nil {
795 return err
796 }
797
798 // Get the block from AppView to find the record key
799 block, err := s.repo.GetBlock(ctx, userDID, communityDID)
800 if err != nil {
801 return err
802 }
803
804 // Extract rkey from record URI (at://did/collection/rkey)
805 rkey := utils.ExtractRKeyFromURI(block.RecordURI)
806 if rkey == "" {
807 return fmt.Errorf("invalid block record URI")
808 }
809
810 // Write-forward: delete record from PDS using user's access token
811 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil {
812 return fmt.Errorf("failed to delete block on PDS: %w", err)
813 }
814
815 return nil
816}
817
818// GetBlockedCommunities queries AppView DB for user's blocks
819func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) {
820 if limit <= 0 || limit > 100 {
821 limit = 50
822 }
823
824 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset)
825}
826
827// IsBlocked checks if a user has blocked a community
828func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) {
829 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
830 if err != nil {
831 return false, err
832 }
833
834 return s.repo.IsBlocked(ctx, userDID, communityDID)
835}
836
837// ValidateHandle checks if a community handle is valid
838func (s *communityService) ValidateHandle(handle string) error {
839 if handle == "" {
840 return NewValidationError("handle", "required")
841 }
842
843 if !communityHandleRegex.MatchString(handle) {
844 return ErrInvalidHandle
845 }
846
847 return nil
848}
849
850// ResolveCommunityIdentifier converts a community identifier to a DID
851// Following Bluesky's pattern with Coves extensions:
852//
853// Accepts (like Bluesky's at-identifier):
854// 1. DID: did:plc:abc123 (pass through)
855// 2. Canonical handle: gardening.community.coves.social (atProto standard)
856// 3. At-identifier: @gardening.community.coves.social (strip @ prefix)
857//
858// Coves-specific extensions:
859// 4. Scoped format: !gardening@coves.social (parse and resolve)
860//
861// Returns: DID string
862func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {
863 identifier = strings.TrimSpace(identifier)
864
865 if identifier == "" {
866 return "", ErrInvalidInput
867 }
868
869 // 1. DID - verify it exists and return (Bluesky standard)
870 if strings.HasPrefix(identifier, "did:") {
871 _, err := s.repo.GetByDID(ctx, identifier)
872 if err != nil {
873 if IsNotFound(err) {
874 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err)
875 }
876 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err)
877 }
878 return identifier, nil
879 }
880
881 // 2. Scoped format: !name@instance (Coves-specific)
882 if strings.HasPrefix(identifier, "!") {
883 return s.resolveScopedIdentifier(ctx, identifier)
884 }
885
886 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix)
887 identifier = strings.TrimPrefix(identifier, "@")
888
889 // 4. Canonical handle: name.community.instance.com (Bluesky standard)
890 if strings.Contains(identifier, ".") {
891 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier))
892 if err != nil {
893 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err)
894 }
895 return community.DID, nil
896 }
897
898 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)")
899}
900
901// resolveScopedIdentifier handles Coves-specific !name@instance format
902// Formats accepted:
903//
904// !gardening@coves.social -> gardening.community.coves.social
905func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) {
906 // Remove ! prefix
907 scoped = strings.TrimPrefix(scoped, "!")
908
909 var name string
910 var instanceDomain string
911
912 // Parse !name@instance
913 if !strings.Contains(scoped, "@") {
914 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)")
915 }
916
917 parts := strings.SplitN(scoped, "@", 2)
918 name = strings.TrimSpace(parts[0])
919 instanceDomain = strings.TrimSpace(parts[1])
920
921 // Validate name format
922 if name == "" {
923 return "", NewValidationError("identifier", "community name cannot be empty")
924 }
925
926 // Validate name is a valid DNS label (RFC 1035)
927 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen
928 if !isValidDNSLabel(name) {
929 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)")
930 }
931
932 // Validate instance domain format
933 if !isValidDomain(instanceDomain) {
934 return "", NewValidationError("identifier", "invalid instance domain format")
935 }
936
937 // Normalize domain to lowercase (DNS is case-insensitive)
938 // This fixes the bug where !gardening@Coves.social would fail lookup
939 instanceDomain = strings.ToLower(instanceDomain)
940
941 // Validate the instance matches this server
942 if !s.isLocalInstance(instanceDomain) {
943 return "", NewValidationError("identifier",
944 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain))
945 }
946
947 // Construct canonical handle: {name}.community.{instanceDomain}
948 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup
949 canonicalHandle := fmt.Sprintf("%s.community.%s",
950 strings.ToLower(name),
951 instanceDomain) // Already normalized to lowercase on line 923
952
953 // Look up by canonical handle
954 community, err := s.repo.GetByHandle(ctx, canonicalHandle)
955 if err != nil {
956 return "", fmt.Errorf("community not found for scoped identifier !%s@%s: %w", name, instanceDomain, err)
957 }
958
959 return community.DID, nil
960}
961
962// isLocalInstance checks if the provided domain matches this instance
963func (s *communityService) isLocalInstance(domain string) bool {
964 // Normalize both domains
965 domain = strings.ToLower(strings.TrimSpace(domain))
966 instanceDomain := strings.ToLower(s.instanceDomain)
967
968 // Direct match
969 return domain == instanceDomain
970}
971
972// Validation helpers
973
974// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035
975// - 1-63 characters
976// - Alphanumeric and hyphens only
977// - Cannot start or end with hyphen
978func isValidDNSLabel(label string) bool {
979 return dnsLabelRegex.MatchString(label)
980}
981
982// isValidDomain validates that a string is a valid domain name
983// Simplified validation - checks basic DNS hostname structure
984func isValidDomain(domain string) bool {
985 if domain == "" || len(domain) > 253 {
986 return false
987 }
988 return domainRegex.MatchString(domain)
989}
990
991func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error {
992 if req.Name == "" {
993 return NewValidationError("name", "required")
994 }
995
996 // DNS label limit: 63 characters per label
997 // Community handle format: {name}.community.{instanceDomain}
998 // The first label is just req.Name, so it must be <= 63 chars
999 if len(req.Name) > 63 {
1000 return NewValidationError("name", "must be 63 characters or less (DNS label limit)")
1001 }
1002
1003 // Name can only contain alphanumeric and hyphens
1004 // Must start and end with alphanumeric (not hyphen)
1005 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
1006 if !nameRegex.MatchString(req.Name) {
1007 return NewValidationError("name", "must contain only alphanumeric characters and hyphens")
1008 }
1009
1010 if req.Description != "" && len(req.Description) > 3000 {
1011 return NewValidationError("description", "must be 3000 characters or less")
1012 }
1013
1014 // Visibility should already be set with default in CreateCommunity
1015 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" {
1016 return ErrInvalidVisibility
1017 }
1018
1019 if req.CreatedByDID == "" {
1020 return NewValidationError("createdByDid", "required")
1021 }
1022
1023 // hostedByDID is auto-populated by the service layer, no validation needed
1024 // The handler ensures clients cannot provide this field
1025
1026 return nil
1027}
1028
1029// PDS write-forward helpers
1030
1031// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth)
1032func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1033 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
1034
1035 payload := map[string]interface{}{
1036 "repo": repoDID,
1037 "collection": collection,
1038 "record": record,
1039 }
1040
1041 if rkey != "" {
1042 payload["rkey"] = rkey
1043 }
1044
1045 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1046}
1047
1048// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth)
1049func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1050 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/"))
1051
1052 payload := map[string]interface{}{
1053 "repo": repoDID,
1054 "collection": collection,
1055 "rkey": rkey,
1056 "record": record,
1057 }
1058
1059 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1060}
1061
1062// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions)
1063func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
1064 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
1065
1066 payload := map[string]interface{}{
1067 "repo": repoDID,
1068 "collection": collection,
1069 "rkey": rkey,
1070 }
1071
1072 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1073 return err
1074}
1075
1076// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication)
1077func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
1078 jsonData, err := json.Marshal(payload)
1079 if err != nil {
1080 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
1081 }
1082
1083 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
1084 if err != nil {
1085 return "", "", fmt.Errorf("failed to create request: %w", err)
1086 }
1087 req.Header.Set("Content-Type", "application/json")
1088
1089 // Add authentication with provided access token
1090 if accessToken != "" {
1091 req.Header.Set("Authorization", "Bearer "+accessToken)
1092 }
1093
1094 // Dynamic timeout based on operation type
1095 // Write operations (createAccount, createRecord, putRecord) are slower due to:
1096 // - Keypair generation
1097 // - DID PLC registration
1098 // - Database writes on PDS
1099 timeout := 10 * time.Second // Default for read operations
1100 if strings.Contains(endpoint, "createAccount") ||
1101 strings.Contains(endpoint, "createRecord") ||
1102 strings.Contains(endpoint, "putRecord") {
1103 timeout = 30 * time.Second // Extended timeout for write operations
1104 }
1105
1106 client := &http.Client{Timeout: timeout}
1107 resp, err := client.Do(req)
1108 if err != nil {
1109 return "", "", fmt.Errorf("failed to call PDS: %w", err)
1110 }
1111 defer func() {
1112 if closeErr := resp.Body.Close(); closeErr != nil {
1113 log.Printf("Failed to close response body: %v", closeErr)
1114 }
1115 }()
1116
1117 body, err := io.ReadAll(resp.Body)
1118 if err != nil {
1119 return "", "", fmt.Errorf("failed to read response: %w", err)
1120 }
1121
1122 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
1123 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
1124 }
1125
1126 // Parse response to extract URI and CID
1127 var result struct {
1128 URI string `json:"uri"`
1129 CID string `json:"cid"`
1130 }
1131 if err := json.Unmarshal(body, &result); err != nil {
1132 // For delete operations, there might not be a response body
1133 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
1134 return "", "", nil
1135 }
1136 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
1137 }
1138
1139 return result.URI, result.CID, nil
1140}
1141
1142// Helper functions