···
···
var communityHandleRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
type communityService struct {
24
-
provisioner *PDSAccountProvisioner
24
+
// Interfaces and pointers first (better alignment)
26
+
provisioner *PDSAccountProvisioner
28
+
// Token refresh concurrency control
29
+
// Each community gets its own mutex to prevent concurrent refresh attempts
30
+
refreshMutexes map[string]*sync.Mutex
38
+
// Sync primitives last
39
+
mapMutex sync.RWMutex // Protects refreshMutexes map itself
43
+
// Maximum recommended size for mutex cache (warning threshold, not hard limit)
44
+
// At 10,000 entries ร 16 bytes = ~160KB memory (negligible overhead)
45
+
// Map can grow larger in production - even 100,000 entries = 1.6MB is acceptable
46
+
maxMutexCacheSize = 10000
// NewCommunityService creates a new community service
func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
···
instanceDID: instanceDID,
instanceDomain: instanceDomain,
provisioner: provisioner,
71
+
refreshMutexes: make(map[string]*sync.Mutex),
···
257
+
// CRITICAL: Ensure fresh PDS access token before write operation
258
+
// Community PDS tokens expire every ~2 hours and must be refreshed
259
+
existing, err = s.ensureFreshToken(ctx, existing)
261
+
return nil, fmt.Errorf("failed to ensure fresh credentials: %w", err)
// Authorization: verify user is the creator
// TODO(Communities-Auth): Add moderator check when moderation system is implemented
if existing.CreatedByDID != req.UpdatedByDID {
···
updated.UpdatedAt = time.Now()
378
+
// getOrCreateRefreshMutex returns a mutex for the given community DID
379
+
// Thread-safe with read-lock fast path for existing entries
380
+
// SAFETY: Does NOT evict entries to avoid race condition where:
381
+
// 1. Thread A holds mutex for community-123
382
+
// 2. Thread B evicts community-123 from map
383
+
// 3. Thread C creates NEW mutex for community-123
384
+
// 4. Now two threads can refresh community-123 concurrently (mutex defeated!)
385
+
func (s *communityService) getOrCreateRefreshMutex(did string) *sync.Mutex {
386
+
// Fast path: check if mutex already exists (read lock)
388
+
mutex, exists := s.refreshMutexes[did]
389
+
s.mapMutex.RUnlock()
395
+
// Slow path: create new mutex (write lock)
397
+
defer s.mapMutex.Unlock()
399
+
// Double-check after acquiring write lock (another goroutine might have created it)
400
+
mutex, exists = s.refreshMutexes[did]
405
+
// Create new mutex
406
+
mutex = &sync.Mutex{}
407
+
s.refreshMutexes[did] = mutex
409
+
// SAFETY: No eviction to prevent race condition
410
+
// Map will grow beyond maxMutexCacheSize but this is safer than evicting in-use mutexes
411
+
if len(s.refreshMutexes) > maxMutexCacheSize {
412
+
memoryKB := len(s.refreshMutexes) * 16 / 1024
413
+
log.Printf("[TOKEN-REFRESH] WARN: Mutex cache size (%d) exceeds recommended limit (%d) - this is safe but may indicate high community churn. Memory usage: ~%d KB",
414
+
len(s.refreshMutexes), maxMutexCacheSize, memoryKB)
420
+
// ensureFreshToken checks if a community's access token needs refresh and updates if needed
421
+
// Returns updated community with fresh credentials (or original if no refresh needed)
422
+
// Thread-safe: Uses per-community mutex to prevent concurrent refresh attempts
423
+
func (s *communityService) ensureFreshToken(ctx context.Context, community *Community) (*Community, error) {
424
+
// Get or create mutex for this specific community DID
425
+
mutex := s.getOrCreateRefreshMutex(community.DID)
427
+
// Lock for this specific community (allows other communities to refresh concurrently)
429
+
defer mutex.Unlock()
431
+
// Re-fetch community from DB (another goroutine might have already refreshed it)
432
+
fresh, err := s.repo.GetByDID(ctx, community.DID)
434
+
return nil, fmt.Errorf("failed to re-fetch community: %w", err)
437
+
// Check if token needs refresh (5-minute buffer before expiration)
438
+
needsRefresh, err := NeedsRefresh(fresh.PDSAccessToken)
440
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_parse_failed, Error: %v", fresh.DID, err)
441
+
return nil, fmt.Errorf("failed to check token expiration: %w", err)
445
+
// Token still valid, no refresh needed
449
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refresh_started, Message: Access token expiring soon", fresh.DID)
451
+
// Attempt token refresh using refresh token
452
+
newAccessToken, newRefreshToken, err := refreshPDSToken(ctx, fresh.PDSURL, fresh.PDSAccessToken, fresh.PDSRefreshToken)
454
+
// Check if refresh token expired (need password fallback)
455
+
if strings.Contains(err.Error(), "expired or invalid") {
456
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_token_expired, Message: Re-authenticating with password", fresh.DID)
458
+
// Fallback: Re-authenticate with stored password
459
+
newAccessToken, newRefreshToken, err = reauthenticateWithPassword(
463
+
fresh.PDSPassword, // Retrieved decrypted from DB
466
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_auth_failed, Error: %v", fresh.DID, err)
467
+
return nil, fmt.Errorf("failed to re-authenticate community: %w", err)
470
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: password_fallback_success, Message: Re-authenticated after refresh token expiry", fresh.DID)
472
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: refresh_failed, Error: %v", fresh.DID, err)
473
+
return nil, fmt.Errorf("failed to refresh token: %w", err)
477
+
// CRITICAL: Update database with new tokens immediately
478
+
// Refresh tokens are SINGLE-USE - old one is now invalid
479
+
// Use retry logic to handle transient DB failures
480
+
const maxRetries = 3
481
+
var updateErr error
482
+
for attempt := 0; attempt < maxRetries; attempt++ {
483
+
updateErr = s.repo.UpdateCredentials(ctx, fresh.DID, newAccessToken, newRefreshToken)
484
+
if updateErr == nil {
488
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: db_update_retry, Attempt: %d/%d, Error: %v",
489
+
fresh.DID, attempt+1, maxRetries, updateErr)
491
+
if attempt < maxRetries-1 {
492
+
// Exponential backoff: 100ms, 200ms, 400ms
493
+
backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
494
+
time.Sleep(backoff)
498
+
if updateErr != nil {
499
+
// CRITICAL: Community is now locked out - old refresh token invalid, new one not saved
500
+
log.Printf("[TOKEN-REFRESH] CRITICAL: Community %s LOCKED OUT - failed to persist credentials after %d retries: %v",
501
+
fresh.DID, maxRetries, updateErr)
502
+
// TODO: Send alert to monitoring system (add in Beta)
503
+
return nil, fmt.Errorf("failed to persist refreshed credentials after %d retries (COMMUNITY LOCKED OUT): %w",
504
+
maxRetries, updateErr)
507
+
// Return updated community object with fresh tokens
508
+
updatedCommunity := *fresh
509
+
updatedCommunity.PDSAccessToken = newAccessToken
510
+
updatedCommunity.PDSRefreshToken = newRefreshToken
512
+
log.Printf("[TOKEN-REFRESH] Community: %s, Event: token_refreshed, Message: Access token refreshed successfully", fresh.DID)
514
+
return &updatedCommunity, nil
// ListCommunities queries AppView DB for communities with filters