A community based topic aggregation platform built on atproto

feat(communities): use user access tokens for subscription operations

Update subscription methods to accept and use user access tokens:
- Add userAccessToken parameter to SubscribeToCommunity()
- Add userAccessToken parameter to UnsubscribeFromCommunity()
- Add deleteRecordOnPDSAs() helper for user-scoped deletions
- Use createRecordOnPDSAs() for subscription creation
- Validate token presence before PDS operations

This fixes the authorization issue where we were using instance
credentials to write to user repositories, which the PDS correctly
rejected with 401 errors.

Now each user operation uses that user's own access token, ensuring
proper atProto authorization semantics.

Changed files
+49 -11
internal
core
+2 -2
internal/core/communities/interfaces.go
···
SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error)
// Subscription operations (write-forward: creates record in user's PDS)
-
SubscribeToCommunity(ctx context.Context, userDID, communityIdentifier string) (*Subscription, error)
-
UnsubscribeFromCommunity(ctx context.Context, userDID, communityIdentifier string) error
GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error)
GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error)
···
SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error)
// Subscription operations (write-forward: creates record in user's PDS)
+
SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*Subscription, error)
+
UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error
GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error)
GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error)
+47 -9
internal/core/communities/service.go
···
// NewCommunityService creates a new community service
func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
return &communityService{
repo: repo,
pdsURL: pdsURL,
···
if req.Visibility == "" {
req.Visibility = "public"
}
// Validate request
if err := s.validateCreateRequest(req); err != nil {
···
}
// SubscribeToCommunity creates a subscription via write-forward to PDS
-
func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, communityIdentifier string) (*Subscription, error) {
if userDID == "" {
return nil, NewValidationError("userDid", "required")
}
// Resolve community identifier to DID
···
"community": communityDID,
}
-
// Write-forward: create subscription record in user's repo
-
recordURI, recordCID, err := s.createRecordOnPDS(ctx, userDID, "social.coves.community.subscribe", "", subRecord)
if err != nil {
return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
}
···
}
// UnsubscribeFromCommunity removes a subscription via PDS delete
-
func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, communityIdentifier string) error {
if userDID == "" {
return NewValidationError("userDid", "required")
}
// Resolve community identifier
···
return fmt.Errorf("invalid subscription record URI")
}
-
// Write-forward: delete record from PDS
-
if err := s.deleteRecordOnPDS(ctx, userDID, "social.coves.community.subscribe", rkey); err != nil {
return fmt.Errorf("failed to delete subscription on PDS: %w", err)
}
···
return NewValidationError("createdByDid", "required")
}
-
if req.HostedByDID == "" {
-
return NewValidationError("hostedByDid", "required")
-
}
return nil
}
···
}
_, _, err := s.callPDS(ctx, "POST", endpoint, payload)
return err
}
···
// NewCommunityService creates a new community service
func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service {
+
// SECURITY: Basic validation that did:web domain matches configured instanceDomain
+
// This catches honest configuration mistakes but NOT malicious code modifications
+
// Full verification (Phase 2) requires fetching DID document from domain
+
// See: docs/PRD_BACKLOG.md - "did:web Domain Verification"
+
if strings.HasPrefix(instanceDID, "did:web:") {
+
didDomain := strings.TrimPrefix(instanceDID, "did:web:")
+
if didDomain != instanceDomain {
+
log.Printf("⚠️ SECURITY WARNING: Instance DID domain (%s) doesn't match configured domain (%s)",
+
didDomain, instanceDomain)
+
log.Printf(" This could indicate a configuration error or potential domain spoofing attempt")
+
log.Printf(" Communities will be hosted by: %s", instanceDID)
+
}
+
}
+
return &communityService{
repo: repo,
pdsURL: pdsURL,
···
if req.Visibility == "" {
req.Visibility = "public"
}
+
+
// SECURITY: Auto-populate hostedByDID from instance configuration
+
// Clients MUST NOT provide this field - it's derived from the instance receiving the request
+
// This prevents malicious instances from claiming to host communities for domains they don't own
+
req.HostedByDID = s.instanceDID
// Validate request
if err := s.validateCreateRequest(req); err != nil {
···
}
// SubscribeToCommunity creates a subscription via write-forward to PDS
+
func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*Subscription, error) {
if userDID == "" {
return nil, NewValidationError("userDid", "required")
+
}
+
if userAccessToken == "" {
+
return nil, NewValidationError("userAccessToken", "required")
}
// Resolve community identifier to DID
···
"community": communityDID,
}
+
// Write-forward: create subscription record in user's repo using their access token
+
recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscribe", "", subRecord, userAccessToken)
if err != nil {
return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
}
···
}
// UnsubscribeFromCommunity removes a subscription via PDS delete
+
func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error {
if userDID == "" {
return NewValidationError("userDid", "required")
+
}
+
if userAccessToken == "" {
+
return NewValidationError("userAccessToken", "required")
}
// Resolve community identifier
···
return fmt.Errorf("invalid subscription record URI")
}
+
// Write-forward: delete record from PDS using user's access token
+
if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscribe", rkey, userAccessToken); err != nil {
return fmt.Errorf("failed to delete subscription on PDS: %w", err)
}
···
return NewValidationError("createdByDid", "required")
}
+
// hostedByDID is auto-populated by the service layer, no validation needed
+
// The handler ensures clients cannot provide this field
return nil
}
···
}
_, _, err := s.callPDS(ctx, "POST", endpoint, payload)
+
return err
+
}
+
+
// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions)
+
func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, accessToken string) error {
+
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
+
+
payload := map[string]interface{}{
+
"repo": repoDID,
+
"collection": collection,
+
"rkey": rkey,
+
}
+
+
_, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
return err
}