A community based topic aggregation platform built on atproto
1package communities
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log"
11 "net/http"
12 "regexp"
13 "strings"
14 "sync"
15 "time"
16
17 "Coves/internal/atproto/utils"
18)
19
20// Community handle validation regex (DNS-valid handle: name.community.instance.com)
21// Matches standard DNS hostname format (RFC 1035)
22var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
23
24// DNS label validation (RFC 1035: 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen)
25var dnsLabelRegex = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
26
27// Domain validation (simplified - checks for valid DNS hostname structure)
28var domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
29
30type communityService struct {
31 // Interfaces and pointers first (better alignment)
32 repo Repository
33 provisioner *PDSAccountProvisioner
34
35 // Token refresh concurrency control
36 // Each community gets its own mutex to prevent concurrent refresh attempts
37 refreshMutexes map[string]*sync.Mutex
38
39 // Strings
40 pdsURL string
41 instanceDID string
42 instanceDomain string
43 pdsAccessToken string
44
45 // Sync primitives last
46 mapMutex sync.RWMutex // Protects refreshMutexes map itself
47}
48
49const (
50 // Maximum recommended size for mutex cache (warning threshold, not hard limit)
51 // At 10,000 entries × 16 bytes = ~160KB memory (negligible overhead)
52 // Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable
53 maxMutexCacheSize = 10000
54)
55
56// NewCommunityService creates a new community service
57func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
58 // SECURITY: Basic validation that did:web domain matches configured instanceDomain
59 // This catches honest configuration mistakes but NOT malicious code modifications
60 // Full verification (Phase 2) requires fetching DID document from domain
61 // See: docs/PRD_BACKLOG.md - "did:web Domain Verification"
62 if strings.HasPrefix(instanceDID, "did:web:") {
63 didDomain := strings.TrimPrefix(instanceDID, "did:web:")
64 if didDomain != instanceDomain {
65 log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)",
66 didDomain, instanceDomain)
67 log.Printf(" This could indicate a configuration error or potential domain spoofing attempt")
68 log.Printf(" Communities will be hosted by: %s", instanceDID)
69 }
70 }
71
72 return &communityService{
73 repo: repo,
74 pdsURL: pdsURL,
75 instanceDID: instanceDID,
76 instanceDomain: instanceDomain,
77 provisioner: provisioner,
78 refreshMutexes: make(map[string]*sync.Mutex),
79 }
80}
81
82// SetPDSAccessToken sets the PDS access token for authentication
83// This should be called after creating a session for the Coves instance DID on the PDS
84func (s *communityService) SetPDSAccessToken(token string) {
85 s.pdsAccessToken = token
86}
87
88// CreateCommunity creates a new community via write-forward to PDS
89// V2 Flow:
90// 1. Service creates PDS account for community (PDS generates signing keypair)
91// 2. Service writes community profile to COMMUNITY's own repository
92// 3. Firehose emits event
93// 4. Consumer indexes to AppView DB
94//
95// V2 Architecture:
96// - Community owns its own repository (at://community_did/social.coves.community.profile/self)
97// - PDS manages the signing keypair (we never see it)
98// - We store PDS credentials to act on behalf of the community
99// - Community can migrate to other instances (future V2.1 with rotation keys)
100func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) {
101 // Apply defaults before validation
102 if req.Visibility == "" {
103 req.Visibility = "public"
104 }
105
106 // SECURITY: Auto-populate hostedByDID from instance configuration
107 // Clients MUST NOT provide this field - it's derived from the instance receiving the request
108 // This prevents malicious instances from claiming to host communities for domains they don't own
109 req.HostedByDID = s.instanceDID
110
111 // Validate request
112 if err := s.validateCreateRequest(req); err != nil {
113 return nil, err
114 }
115
116 // V2: Provision a real PDS account for this community
117 // This calls com.atproto.server.createAccount internally
118 // The PDS will:
119 // 1. Generate a signing keypair (stored in PDS, we never see it)
120 // 2. Create a DID (did:plc:xxx)
121 // 3. Return credentials (DID, tokens)
122 pdsAccount, err := s.provisioner.ProvisionCommunityAccount(ctx, req.Name)
123 if err != nil {
124 return nil, fmt.Errorf("failed to provision PDS account for community: %w", err)
125 }
126
127 // Validate the atProto handle
128 if validateErr := s.ValidateHandle(pdsAccount.Handle); validateErr != nil {
129 return nil, fmt.Errorf("generated atProto handle is invalid: %w", validateErr)
130 }
131
132 // Build community profile record
133 profile := map[string]interface{}{
134 "$type": "social.coves.community.profile",
135 "name": req.Name, // Short name for !mentions (e.g., "gaming")
136 "visibility": req.Visibility,
137 "hostedBy": s.instanceDID, // V2: Instance hosts, community owns
138 "createdBy": req.CreatedByDID,
139 "createdAt": time.Now().Format(time.RFC3339),
140 "federation": map[string]interface{}{
141 "allowExternalDiscovery": req.AllowExternalDiscovery,
142 },
143 }
144
145 // Add optional fields
146 if req.DisplayName != "" {
147 profile["displayName"] = req.DisplayName
148 }
149 if req.Description != "" {
150 profile["description"] = req.Description
151 }
152 if len(req.Rules) > 0 {
153 profile["rules"] = req.Rules
154 }
155 if len(req.Categories) > 0 {
156 profile["categories"] = req.Categories
157 }
158 if req.Language != "" {
159 profile["language"] = req.Language
160 }
161
162 // TODO: Handle avatar and banner blobs
163 // For now, we'll skip blob uploads. This would require:
164 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob
165 // 2. Get blob ref (CID)
166 // 3. Add to profile record
167
168 // V2: Write to COMMUNITY's own repository (not instance repo!)
169 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
170 // Authenticate using community's access token
171 recordURI, recordCID, err := s.createRecordOnPDSAs(
172 ctx,
173 pdsAccount.DID, // repo = community's DID (community owns its repo!)
174 "social.coves.community.profile",
175 "self", // canonical rkey for profile
176 profile,
177 pdsAccount.AccessToken, // authenticate as the community
178 )
179 if err != nil {
180 return nil, fmt.Errorf("failed to create community profile record: %w", err)
181 }
182
183 // Build Community object with PDS credentials AND cryptographic keys
184 community := &Community{
185 DID: pdsAccount.DID, // Community's DID (owns the repo!)
186 Handle: pdsAccount.Handle, // atProto handle (e.g., gaming.community.coves.social)
187 Name: req.Name,
188 DisplayName: req.DisplayName,
189 Description: req.Description,
190 OwnerDID: pdsAccount.DID, // V2: Community owns itself
191 CreatedByDID: req.CreatedByDID,
192 HostedByDID: req.HostedByDID,
193 PDSEmail: pdsAccount.Email,
194 PDSPassword: pdsAccount.Password,
195 PDSAccessToken: pdsAccount.AccessToken,
196 PDSRefreshToken: pdsAccount.RefreshToken,
197 PDSURL: pdsAccount.PDSURL,
198 Visibility: req.Visibility,
199 AllowExternalDiscovery: req.AllowExternalDiscovery,
200 MemberCount: 0,
201 SubscriberCount: 0,
202 CreatedAt: time.Now(),
203 UpdatedAt: time.Now(),
204 RecordURI: recordURI,
205 RecordCID: recordCID,
206 // V2: Cryptographic keys for portability (will be encrypted by repository)
207 RotationKeyPEM: pdsAccount.RotationKeyPEM, // CRITICAL: Enables DID migration
208 SigningKeyPEM: pdsAccount.SigningKeyPEM, // For atproto operations
209 }
210
211 // CRITICAL: Persist PDS credentials immediately to database
212 // The Jetstream consumer will eventually index the community profile from the firehose,
213 // but it won't have the PDS credentials. We must store them now so we can:
214 // 1. Update the community profile later (using its own credentials)
215 // 2. Re-authenticate if access tokens expire
216 _, err = s.repo.Create(ctx, community)
217 if err != nil {
218 return nil, fmt.Errorf("failed to persist community with credentials: %w", err)
219 }
220
221 return community, nil
222}
223
224// GetCommunity retrieves a community from AppView DB
225// identifier can be either a DID or handle
226func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) {
227 if identifier == "" {
228 return nil, ErrInvalidInput
229 }
230
231 // Determine if identifier is DID or handle
232 if strings.HasPrefix(identifier, "did:") {
233 return s.repo.GetByDID(ctx, identifier)
234 }
235
236 if strings.HasPrefix(identifier, "!") {
237 return s.repo.GetByHandle(ctx, identifier)
238 }
239
240 return nil, NewValidationError("identifier", "must be a DID or handle")
241}
242
243// GetByDID retrieves a community by its DID
244// Exported for use by post service when validating community references
245func (s *communityService) GetByDID(ctx context.Context, did string) (*Community, error) {
246 if did == "" {
247 return nil, ErrInvalidInput
248 }
249
250 if !strings.HasPrefix(did, "did:") {
251 return nil, NewValidationError("did", "must be a valid DID")
252 }
253
254 return s.repo.GetByDID(ctx, did)
255}
256
257// UpdateCommunity updates a community via write-forward to PDS
258func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) {
259 if req.CommunityDID == "" {
260 return nil, NewValidationError("communityDid", "required")
261 }
262
263 if req.UpdatedByDID == "" {
264 return nil, NewValidationError("updatedByDid", "required")
265 }
266
267 // Get existing community
268 existing, err := s.repo.GetByDID(ctx, req.CommunityDID)
269 if err != nil {
270 return nil, err
271 }
272
273 // CRITICAL: Ensure fresh PDS access token before write operation
274 // Community PDS tokens expire every ~2 hours and must be refreshed
275 existing, err = s.EnsureFreshToken(ctx, existing)
276 if err != nil {
277 return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err)
278 }
279
280 // Authorization: verify user is the creator
281 // TODO(Communities-Auth): Add moderator check when moderation system is implemented
282 if existing.CreatedByDID != req.UpdatedByDID {
283 return nil, ErrUnauthorized
284 }
285
286 // Build updated profile record (start with existing)
287 profile := map[string]interface{}{
288 "$type": "social.coves.community.profile",
289 "name": existing.Name,
290 "owner": existing.OwnerDID,
291 "createdBy": existing.CreatedByDID,
292 "hostedBy": existing.HostedByDID,
293 "createdAt": existing.CreatedAt.Format(time.RFC3339),
294 }
295
296 // Apply updates
297 if req.DisplayName != nil {
298 profile["displayName"] = *req.DisplayName
299 } else {
300 profile["displayName"] = existing.DisplayName
301 }
302
303 if req.Description != nil {
304 profile["description"] = *req.Description
305 } else {
306 profile["description"] = existing.Description
307 }
308
309 if req.Visibility != nil {
310 profile["visibility"] = *req.Visibility
311 } else {
312 profile["visibility"] = existing.Visibility
313 }
314
315 if req.AllowExternalDiscovery != nil {
316 profile["federation"] = map[string]interface{}{
317 "allowExternalDiscovery": *req.AllowExternalDiscovery,
318 }
319 } else {
320 profile["federation"] = map[string]interface{}{
321 "allowExternalDiscovery": existing.AllowExternalDiscovery,
322 }
323 }
324
325 // Preserve moderation settings (even if empty)
326 // These fields are optional but should not be erased on update
327 if req.ModerationType != nil {
328 profile["moderationType"] = *req.ModerationType
329 } else if existing.ModerationType != "" {
330 profile["moderationType"] = existing.ModerationType
331 }
332
333 if len(req.ContentWarnings) > 0 {
334 profile["contentWarnings"] = req.ContentWarnings
335 } else if len(existing.ContentWarnings) > 0 {
336 profile["contentWarnings"] = existing.ContentWarnings
337 }
338
339 // V2: Community profiles always use "self" as rkey
340 // (No need to extract from URI - it's always "self" for V2 communities)
341
342 // V2 CRITICAL FIX: Write-forward using COMMUNITY's own DID and credentials
343 // Repository: at://COMMUNITY_DID/social.coves.community.profile/self
344 // Authenticate as the community (not as instance!)
345 if existing.PDSAccessToken == "" {
346 return nil, fmt.Errorf("community %s missing PDS credentials - cannot update", existing.DID)
347 }
348
349 recordURI, recordCID, err := s.putRecordOnPDSAs(
350 ctx,
351 existing.DID, // repo = community's own DID (V2!)
352 "social.coves.community.profile",
353 "self", // V2: always "self"
354 profile,
355 existing.PDSAccessToken, // authenticate as the community
356 )
357 if err != nil {
358 return nil, fmt.Errorf("failed to update community on PDS: %w", err)
359 }
360
361 // Return updated community representation
362 // Actual AppView DB update happens via Jetstream consumer
363 updated := *existing
364 if req.DisplayName != nil {
365 updated.DisplayName = *req.DisplayName
366 }
367 if req.Description != nil {
368 updated.Description = *req.Description
369 }
370 if req.Visibility != nil {
371 updated.Visibility = *req.Visibility
372 }
373 if req.AllowExternalDiscovery != nil {
374 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery
375 }
376 if req.ModerationType != nil {
377 updated.ModerationType = *req.ModerationType
378 }
379 if len(req.ContentWarnings) > 0 {
380 updated.ContentWarnings = req.ContentWarnings
381 }
382 updated.RecordURI = recordURI
383 updated.RecordCID = recordCID
384 updated.UpdatedAt = time.Now()
385
386 return &updated, nil
387}
388
389// getOrCreateRefreshMutex returns a mutex for the given community DID
390// Thread-safe with read-lock fast path for existing entries
391// SAFETY: Does NOT evict entries to avoid race condition where:
392// 1. Thread A holds mutex for community-123
393// 2. Thread B evicts community-123 from map
394// 3. Thread C creates NEW mutex for community-123
395// 4. Now two threads can refresh community-123 concurrently (mutex defeated!)
396func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex {
397 // Fast path: check if mutex already exists (read lock)
398 s.mapMutex.RLock()
399 mutex, exists := s.refreshMutexes[did]
400 s.mapMutex.RUnlock()
401
402 if exists {
403 return mutex
404 }
405
406 // Slow path: create new mutex (write lock)
407 s.mapMutex.Lock()
408 defer s.mapMutex.Unlock()
409
410 // Double-check after acquiring write lock (another goroutine might have created it)
411 mutex, exists = s.refreshMutexes[did]
412 if exists {
413 return mutex
414 }
415
416 // Create new mutex
417 mutex = &sync.Mutex{}
418 s.refreshMutexes[did] = mutex
419
420 // SAFETY: No eviction to prevent race condition
421 // Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes
422 if len(s.refreshMutexes) > maxMutexCacheSize {
423 memoryKB := len(s.refreshMutexes) * 16 / 1024
424 log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB",
425 len(s.refreshMutexes), maxMutexCacheSize, memoryKB)
426 }
427
428 return mutex
429}
430
431// ensureFreshToken checks if a community's access token needs refresh and updates if needed
432// Returns updated community with fresh credentials (or original if no refresh needed)
433// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts
434// EnsureFreshToken ensures the community's PDS access token is valid
435// Exported for use by post service when writing posts to community repos
436func (s *communityService) EnsureFreshToken(ctx context.Context, community *Community) (*Community, error) {
437 // Get or create mutex for this specific community DID
438 mutex := s.getOrCreateRefreshMutex(community.DID)
439
440 // Lock for this specific community (allows other communities to refresh concurrently)
441 mutex.Lock()
442 defer mutex.Unlock()
443
444 // Re-fetch community from DB (another goroutine might have already refreshed it)
445 fresh, err := s.repo.GetByDID(ctx, community.DID)
446 if err != nil {
447 return nil, fmt.Errorf("failed to re-fetch community: %w", err)
448 }
449
450 // Check if token needs refresh (5-minute buffer before expiration)
451 needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken)
452 if err != nil {
453 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err)
454 return nil, fmt.Errorf("failed to check token expiration: %w", err)
455 }
456
457 if !needsRefresh {
458 // Token still valid, no refresh needed
459 return fresh, nil
460 }
461
462 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID)
463
464 // Attempt token refresh using refresh token
465 newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken)
466 if err != nil {
467 // Check if refresh token expired (need password fallback)
468 // Match both "ExpiredToken" and "Token has expired" error messages
469 if strings.Contains(strings.ToLower(err.Error()), "expired") {
470 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID)
471
472 // Fallback: Re-authenticate with stored password
473 newAccessToken, newRefreshToken, err = reauthenticateWithPassword(
474 ctx,
475 fresh.PDSURL,
476 fresh.PDSEmail,
477 fresh.PDSPassword, // Retrieved decrypted from DB
478 )
479 if err != nil {
480 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err)
481 return nil, fmt.Errorf("failed to re-authenticate community: %w", err)
482 }
483
484 log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID)
485 } else {
486 log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err)
487 return nil, fmt.Errorf("failed to refresh token: %w", err)
488 }
489 }
490
491 // CRITICAL: Update database with new tokens immediately
492 // Refresh tokens are SINGLE-USE - old one is now invalid
493 // Use retry logic to handle transient DB failures
494 const maxRetries = 3
495 var updateErr error
496 for attempt := 0; attempt < maxRetries; attempt++ {
497 updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken)
498 if updateErr == nil {
499 break // Success
500 }
501
502 log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v",
503 fresh.DID, attempt+1, maxRetries, updateErr)
504
505 if attempt < maxRetries-1 {
506 // Exponential backoff: 100ms, 200ms, 400ms
507 backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
508 time.Sleep(backoff)
509 }
510 }
511
512 if updateErr != nil {
513 // CRITICAL: Community is now locked out - old refresh token invalid, new one not saved
514 log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v",
515 fresh.DID, maxRetries, updateErr)
516 // TODO: Send alert to monitoring system (add in Beta)
517 return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w",
518 maxRetries, updateErr)
519 }
520
521 // Return updated community object with fresh tokens
522 updatedCommunity := *fresh
523 updatedCommunity.PDSAccessToken = newAccessToken
524 updatedCommunity.PDSRefreshToken = newRefreshToken
525
526 log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID)
527
528 return &updatedCommunity, nil
529}
530
531// ListCommunities queries AppView DB for communities with filters
532func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) {
533 // Set defaults
534 if req.Limit <= 0 || req.Limit > 100 {
535 req.Limit = 50
536 }
537
538 return s.repo.List(ctx, req)
539}
540
541// SearchCommunities performs fuzzy search in AppView DB
542func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) {
543 if req.Query == "" {
544 return nil, 0, NewValidationError("query", "search query is required")
545 }
546
547 // Set defaults
548 if req.Limit <= 0 || req.Limit > 100 {
549 req.Limit = 50
550 }
551
552 return s.repo.Search(ctx, req)
553}
554
555// SubscribeToCommunity creates a subscription via write-forward to PDS
556func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) {
557 if userDID == "" {
558 return nil, NewValidationError("userDid", "required")
559 }
560 if userAccessToken == "" {
561 return nil, NewValidationError("userAccessToken", "required")
562 }
563
564 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid
565 if contentVisibility <= 0 || contentVisibility > 5 {
566 contentVisibility = 3
567 }
568
569 // Resolve community identifier to DID
570 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
571 if err != nil {
572 return nil, err
573 }
574
575 // Verify community exists
576 community, err := s.repo.GetByDID(ctx, communityDID)
577 if err != nil {
578 return nil, err
579 }
580
581 // Check visibility - can't subscribe to private communities without invitation (TODO)
582 if community.Visibility == "private" {
583 return nil, ErrUnauthorized
584 }
585
586 // Build subscription record
587 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure)
588 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid}
589 // Following atProto conventions, we use "subject" field to reference the community
590 subRecord := map[string]interface{}{
591 "$type": "social.coves.community.subscription",
592 "subject": communityDID, // atProto convention: "subject" for entity references
593 "createdAt": time.Now().Format(time.RFC3339),
594 "contentVisibility": contentVisibility,
595 }
596
597 // Write-forward: create subscription record in user's repo using their access token
598 // The collection parameter refers to the record type in the repository
599 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken)
600 if err != nil {
601 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
602 }
603
604 // Return subscription representation
605 subscription := &Subscription{
606 UserDID: userDID,
607 CommunityDID: communityDID,
608 ContentVisibility: contentVisibility,
609 SubscribedAt: time.Now(),
610 RecordURI: recordURI,
611 RecordCID: recordCID,
612 }
613
614 return subscription, nil
615}
616
617// UnsubscribeFromCommunity removes a subscription via PDS delete
618func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
619 if userDID == "" {
620 return NewValidationError("userDid", "required")
621 }
622 if userAccessToken == "" {
623 return NewValidationError("userAccessToken", "required")
624 }
625
626 // Resolve community identifier
627 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
628 if err != nil {
629 return err
630 }
631
632 // Get the subscription from AppView to find the record key
633 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID)
634 if err != nil {
635 return err
636 }
637
638 // Extract rkey from record URI (at://did/collection/rkey)
639 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI)
640 if rkey == "" {
641 return fmt.Errorf("invalid subscription record URI")
642 }
643
644 // Write-forward: delete record from PDS using user's access token
645 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe
646 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil {
647 return fmt.Errorf("failed to delete subscription on PDS: %w", err)
648 }
649
650 return nil
651}
652
653// GetUserSubscriptions queries AppView DB for user's subscriptions
654func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) {
655 if limit <= 0 || limit > 100 {
656 limit = 50
657 }
658
659 return s.repo.ListSubscriptions(ctx, userDID, limit, offset)
660}
661
662// GetCommunitySubscribers queries AppView DB for community subscribers
663func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) {
664 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
665 if err != nil {
666 return nil, err
667 }
668
669 if limit <= 0 || limit > 100 {
670 limit = 50
671 }
672
673 return s.repo.ListSubscribers(ctx, communityDID, limit, offset)
674}
675
676// GetMembership retrieves membership info from AppView DB
677func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) {
678 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
679 if err != nil {
680 return nil, err
681 }
682
683 return s.repo.GetMembership(ctx, userDID, communityDID)
684}
685
686// ListCommunityMembers queries AppView DB for members
687func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) {
688 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
689 if err != nil {
690 return nil, err
691 }
692
693 if limit <= 0 || limit > 100 {
694 limit = 50
695 }
696
697 return s.repo.ListMembers(ctx, communityDID, limit, offset)
698}
699
700// BlockCommunity blocks a community via write-forward to PDS
701func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) {
702 if userDID == "" {
703 return nil, NewValidationError("userDid", "required")
704 }
705 if userAccessToken == "" {
706 return nil, NewValidationError("userAccessToken", "required")
707 }
708
709 // Resolve community identifier (also verifies community exists)
710 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
711 if err != nil {
712 return nil, err
713 }
714
715 // Build block record
716 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE)
717 // This record will be created in the USER's repository: at://user_did/social.coves.community.block/{tid}
718 // Following atProto conventions and Bluesky's app.bsky.graph.block pattern
719 blockRecord := map[string]interface{}{
720 "$type": "social.coves.community.block",
721 "subject": communityDID, // DID of community being blocked
722 "createdAt": time.Now().Format(time.RFC3339),
723 }
724
725 // Write-forward: create block record in user's repo using their access token
726 // Note: We don't check for existing blocks first because:
727 // 1. The PDS may reject duplicates (depending on implementation)
728 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING
729 // 3. This avoids a race condition where two concurrent requests both pass the check
730 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken)
731 if err != nil {
732 // Check if this is a duplicate/conflict error from PDS
733 // PDS should return 409 Conflict for duplicate records, but we also check common error messages
734 // for compatibility with different PDS implementations
735 errMsg := err.Error()
736 isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict
737 strings.Contains(errMsg, "duplicate") ||
738 strings.Contains(errMsg, "already exists") ||
739 strings.Contains(errMsg, "AlreadyExists")
740
741 if isDuplicate {
742 // Fetch and return existing block from our indexed view
743 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID)
744 if getErr == nil {
745 // Block exists in our index - return it
746 return existingBlock, nil
747 }
748 // Only treat as "already exists" if the error is ErrBlockNotFound (race condition)
749 // Any other error (DB outage, connection failure, etc.) should bubble up
750 if errors.Is(getErr, ErrBlockNotFound) {
751 // Race condition: PDS has the block but Jetstream hasn't indexed it yet
752 // Return typed conflict error so handler can return 409 instead of 500
753 // This is normal in eventually-consistent systems
754 return nil, ErrBlockAlreadyExists
755 }
756 // Real datastore error - bubble it up so operators see the failure
757 return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr)
758 }
759 return nil, fmt.Errorf("failed to create block on PDS: %w", err)
760 }
761
762 // Return block representation
763 block := &CommunityBlock{
764 UserDID: userDID,
765 CommunityDID: communityDID,
766 BlockedAt: time.Now(),
767 RecordURI: recordURI,
768 RecordCID: recordCID,
769 }
770
771 return block, nil
772}
773
774// UnblockCommunity removes a block via PDS delete
775func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
776 if userDID == "" {
777 return NewValidationError("userDid", "required")
778 }
779 if userAccessToken == "" {
780 return NewValidationError("userAccessToken", "required")
781 }
782
783 // Resolve community identifier
784 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
785 if err != nil {
786 return err
787 }
788
789 // Get the block from AppView to find the record key
790 block, err := s.repo.GetBlock(ctx, userDID, communityDID)
791 if err != nil {
792 return err
793 }
794
795 // Extract rkey from record URI (at://did/collection/rkey)
796 rkey := utils.ExtractRKeyFromURI(block.RecordURI)
797 if rkey == "" {
798 return fmt.Errorf("invalid block record URI")
799 }
800
801 // Write-forward: delete record from PDS using user's access token
802 if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil {
803 return fmt.Errorf("failed to delete block on PDS: %w", err)
804 }
805
806 return nil
807}
808
809// GetBlockedCommunities queries AppView DB for user's blocks
810func (s *communityService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) {
811 if limit <= 0 || limit > 100 {
812 limit = 50
813 }
814
815 return s.repo.ListBlockedCommunities(ctx, userDID, limit, offset)
816}
817
818// IsBlocked checks if a user has blocked a community
819func (s *communityService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) {
820 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
821 if err != nil {
822 return false, err
823 }
824
825 return s.repo.IsBlocked(ctx, userDID, communityDID)
826}
827
828// ValidateHandle checks if a community handle is valid
829func (s *communityService) ValidateHandle(handle string) error {
830 if handle == "" {
831 return NewValidationError("handle", "required")
832 }
833
834 if !communityHandleRegex.MatchString(handle) {
835 return ErrInvalidHandle
836 }
837
838 return nil
839}
840
841// ResolveCommunityIdentifier converts a community identifier to a DID
842// Following Bluesky's pattern with Coves extensions:
843//
844// Accepts (like Bluesky's at-identifier):
845// 1. DID: did:plc:abc123 (pass through)
846// 2. Canonical handle: gardening.community.coves.social (atProto standard)
847// 3. At-identifier: @gardening.community.coves.social (strip @ prefix)
848//
849// Coves-specific extensions:
850// 4. Scoped format: !gardening@coves.social (parse and resolve)
851//
852// Returns: DID string
853func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {
854 identifier = strings.TrimSpace(identifier)
855
856 if identifier == "" {
857 return "", ErrInvalidInput
858 }
859
860 // 1. DID - verify it exists and return (Bluesky standard)
861 if strings.HasPrefix(identifier, "did:") {
862 _, err := s.repo.GetByDID(ctx, identifier)
863 if err != nil {
864 if IsNotFound(err) {
865 return "", fmt.Errorf("community not found for DID %s: %w", identifier, err)
866 }
867 return "", fmt.Errorf("failed to verify community DID %s: %w", identifier, err)
868 }
869 return identifier, nil
870 }
871
872 // 2. Scoped format: !name@instance (Coves-specific)
873 if strings.HasPrefix(identifier, "!") {
874 return s.resolveScopedIdentifier(ctx, identifier)
875 }
876
877 // 3. At-identifier format: @handle (Bluesky standard - strip @ prefix)
878 identifier = strings.TrimPrefix(identifier, "@")
879
880 // 4. Canonical handle: name.community.instance.com (Bluesky standard)
881 if strings.Contains(identifier, ".") {
882 community, err := s.repo.GetByHandle(ctx, strings.ToLower(identifier))
883 if err != nil {
884 return "", fmt.Errorf("community not found for handle %s: %w", identifier, err)
885 }
886 return community.DID, nil
887 }
888
889 return "", NewValidationError("identifier", "must be a DID, handle, or scoped identifier (!name@instance)")
890}
891
892// resolveScopedIdentifier handles Coves-specific !name@instance format
893// Formats accepted:
894//
895// !gardening@coves.social -> gardening.community.coves.social
896func (s *communityService) resolveScopedIdentifier(ctx context.Context, scoped string) (string, error) {
897 // Remove ! prefix
898 scoped = strings.TrimPrefix(scoped, "!")
899
900 var name string
901 var instanceDomain string
902
903 // Parse !name@instance
904 if !strings.Contains(scoped, "@") {
905 return "", NewValidationError("identifier", "scoped identifier must include @ symbol (!name@instance)")
906 }
907
908 parts := strings.SplitN(scoped, "@", 2)
909 name = strings.TrimSpace(parts[0])
910 instanceDomain = strings.TrimSpace(parts[1])
911
912 // Validate name format
913 if name == "" {
914 return "", NewValidationError("identifier", "community name cannot be empty")
915 }
916
917 // Validate name is a valid DNS label (RFC 1035)
918 // Must be 1-63 chars, alphanumeric + hyphen, can't start/end with hyphen
919 if !isValidDNSLabel(name) {
920 return "", NewValidationError("identifier", "community name must be valid DNS label (alphanumeric and hyphens only, 1-63 chars, cannot start or end with hyphen)")
921 }
922
923 // Validate instance domain format
924 if !isValidDomain(instanceDomain) {
925 return "", NewValidationError("identifier", "invalid instance domain format")
926 }
927
928 // Normalize domain to lowercase (DNS is case-insensitive)
929 // This fixes the bug where !gardening@Coves.social would fail lookup
930 instanceDomain = strings.ToLower(instanceDomain)
931
932 // Validate the instance matches this server
933 if !s.isLocalInstance(instanceDomain) {
934 return "", NewValidationError("identifier",
935 fmt.Sprintf("community is not hosted on this instance (expected @%s)", s.instanceDomain))
936 }
937
938 // Construct canonical handle: {name}.community.{instanceDomain}
939 // Both name and instanceDomain are normalized to lowercase for consistent DB lookup
940 canonicalHandle := fmt.Sprintf("%s.community.%s",
941 strings.ToLower(name),
942 instanceDomain) // Already normalized to lowercase on line 923
943
944 // Look up by canonical handle
945 community, err := s.repo.GetByHandle(ctx, canonicalHandle)
946 if err != nil {
947 return "", fmt.Errorf("community not found for scoped identifier !%s@%s: %w", name, instanceDomain, err)
948 }
949
950 return community.DID, nil
951}
952
953// isLocalInstance checks if the provided domain matches this instance
954func (s *communityService) isLocalInstance(domain string) bool {
955 // Normalize both domains
956 domain = strings.ToLower(strings.TrimSpace(domain))
957 instanceDomain := strings.ToLower(s.instanceDomain)
958
959 // Direct match
960 return domain == instanceDomain
961}
962
963// Validation helpers
964
965// isValidDNSLabel validates that a string is a valid DNS label per RFC 1035
966// - 1-63 characters
967// - Alphanumeric and hyphens only
968// - Cannot start or end with hyphen
969func isValidDNSLabel(label string) bool {
970 return dnsLabelRegex.MatchString(label)
971}
972
973// isValidDomain validates that a string is a valid domain name
974// Simplified validation - checks basic DNS hostname structure
975func isValidDomain(domain string) bool {
976 if domain == "" || len(domain) > 253 {
977 return false
978 }
979 return domainRegex.MatchString(domain)
980}
981
982func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error {
983 if req.Name == "" {
984 return NewValidationError("name", "required")
985 }
986
987 // DNS label limit: 63 characters per label
988 // Community handle format: {name}.community.{instanceDomain}
989 // The first label is just req.Name, so it must be <= 63 chars
990 if len(req.Name) > 63 {
991 return NewValidationError("name", "must be 63 characters or less (DNS label limit)")
992 }
993
994 // Name can only contain alphanumeric and hyphens
995 // Must start and end with alphanumeric (not hyphen)
996 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
997 if !nameRegex.MatchString(req.Name) {
998 return NewValidationError("name", "must contain only alphanumeric characters and hyphens")
999 }
1000
1001 if req.Description != "" && len(req.Description) > 3000 {
1002 return NewValidationError("description", "must be 3000 characters or less")
1003 }
1004
1005 // Visibility should already be set with default in CreateCommunity
1006 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" {
1007 return ErrInvalidVisibility
1008 }
1009
1010 if req.CreatedByDID == "" {
1011 return NewValidationError("createdByDid", "required")
1012 }
1013
1014 // hostedByDID is auto-populated by the service layer, no validation needed
1015 // The handler ensures clients cannot provide this field
1016
1017 return nil
1018}
1019
1020// PDS write-forward helpers
1021
1022// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth)
1023func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1024 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
1025
1026 payload := map[string]interface{}{
1027 "repo": repoDID,
1028 "collection": collection,
1029 "record": record,
1030 }
1031
1032 if rkey != "" {
1033 payload["rkey"] = rkey
1034 }
1035
1036 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1037}
1038
1039// putRecordOnPDSAs updates a record with a specific access token (for V2 community auth)
1040func (s *communityService) putRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
1041 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/"))
1042
1043 payload := map[string]interface{}{
1044 "repo": repoDID,
1045 "collection": collection,
1046 "rkey": rkey,
1047 "record": record,
1048 }
1049
1050 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1051}
1052
1053// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions)
1054func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
1055 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
1056
1057 payload := map[string]interface{}{
1058 "repo": repoDID,
1059 "collection": collection,
1060 "rkey": rkey,
1061 }
1062
1063 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
1064 return err
1065}
1066
1067// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication)
1068func (s *communityService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
1069 jsonData, err := json.Marshal(payload)
1070 if err != nil {
1071 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
1072 }
1073
1074 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
1075 if err != nil {
1076 return "", "", fmt.Errorf("failed to create request: %w", err)
1077 }
1078 req.Header.Set("Content-Type", "application/json")
1079
1080 // Add authentication with provided access token
1081 if accessToken != "" {
1082 req.Header.Set("Authorization", "Bearer "+accessToken)
1083 }
1084
1085 // Dynamic timeout based on operation type
1086 // Write operations (createAccount, createRecord, putRecord) are slower due to:
1087 // - Keypair generation
1088 // - DID PLC registration
1089 // - Database writes on PDS
1090 timeout := 10 * time.Second // Default for read operations
1091 if strings.Contains(endpoint, "createAccount") ||
1092 strings.Contains(endpoint, "createRecord") ||
1093 strings.Contains(endpoint, "putRecord") {
1094 timeout = 30 * time.Second // Extended timeout for write operations
1095 }
1096
1097 client := &http.Client{Timeout: timeout}
1098 resp, err := client.Do(req)
1099 if err != nil {
1100 return "", "", fmt.Errorf("failed to call PDS: %w", err)
1101 }
1102 defer func() {
1103 if closeErr := resp.Body.Close(); closeErr != nil {
1104 log.Printf("Failed to close response body: %v", closeErr)
1105 }
1106 }()
1107
1108 body, err := io.ReadAll(resp.Body)
1109 if err != nil {
1110 return "", "", fmt.Errorf("failed to read response: %w", err)
1111 }
1112
1113 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
1114 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
1115 }
1116
1117 // Parse response to extract URI and CID
1118 var result struct {
1119 URI string `json:"uri"`
1120 CID string `json:"cid"`
1121 }
1122 if err := json.Unmarshal(body, &result); err != nil {
1123 // For delete operations, there might not be a response body
1124 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
1125 return "", "", nil
1126 }
1127 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
1128 }
1129
1130 return result.URI, result.CID, nil
1131}
1132
1133// Helper functions