A community based topic aggregation platform built on atproto

test(integration): fix E2E tests for user token flow

Update integration tests to pass access tokens:
- Pass accessToken to SubscribeToCommunity() calls
- Add comments explaining token usage in tests
- Verify subscribe/unsubscribe E2E flows with real auth

Tests now validate the complete authentication chain:
1. User authenticates with PDS (gets access token)
2. User makes request with Authorization header
3. Middleware validates JWT and stores token
4. Handler extracts token from context
5. Service uses token to write to user's PDS repo
6. PDS validates user owns the repository
7. Record successfully written

All E2E tests pass with real PDS authentication.

+62 -69
tests/integration/community_e2e_test.go
···
package integration
import (
"Coves/internal/api/routes"
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
···
t.Logf("✅ Authenticated - Instance DID: %s", instanceDID)
// V2.0: Extract instance domain for community provisioning
var instanceDomain string
if strings.HasPrefix(instanceDID, "did:web:") {
···
// Setup HTTP server with XRPC routes
r := chi.NewRouter()
-
routes.RegisterCommunityRoutes(r, communityService)
httpServer := httptest.NewServer(r)
defer httpServer.Close()
···
t.Run("3. XRPC HTTP Endpoints", func(t *testing.T) {
t.Run("Create via XRPC endpoint", func(t *testing.T) {
// Use Unix timestamp (seconds) instead of UnixNano to keep handle short
createReq := map[string]interface{}{
"name": fmt.Sprintf("xrpc-%d", time.Now().Unix()),
"displayName": "XRPC E2E Test",
"description": "Testing true end-to-end flow",
"visibility": "public",
-
"createdByDid": instanceDID,
-
"hostedByDid": instanceDID,
"allowExternalDiscovery": true,
}
···
t.Fatalf("Failed to marshal request: %v", marshalErr)
}
-
// Step 1: Client POSTs to XRPC endpoint
t.Logf("📡 Client → POST /xrpc/social.coves.community.create")
t.Logf(" Request: %s", string(reqBody))
-
resp, err := http.Post(
httpServer.URL+"/xrpc/social.coves.community.create",
-
"application/json",
-
bytes.NewBuffer(reqBody),
-
)
if err != nil {
t.Fatalf("Failed to POST: %v", err)
}
···
t.Logf(" URI: %s", createResp.URI)
// Step 2: Simulate firehose consumer picking up the event
t.Logf("🔄 Simulating Jetstream consumer indexing...")
rkey := extractRKeyFromURI(createResp.URI)
event := jetstream.JetstreamEvent{
-
Did: instanceDID,
TimeUS: time.Now().UnixMicro(),
Kind: "commit",
Commit: &jetstream.CommitEvent{
···
"displayName": createReq["displayName"],
"description": createReq["description"],
"visibility": createReq["visibility"],
-
"createdBy": createReq["createdByDid"],
-
"hostedBy": createReq["hostedByDid"],
"federation": map[string]interface{}{
"allowExternalDiscovery": createReq["allowExternalDiscovery"],
},
···
t.Run("Get via XRPC endpoint", func(t *testing.T) {
// Create a community first (via service, so it's indexed)
-
community := createAndIndexCommunity(t, communityService, consumer, instanceDID)
// GET via HTTP endpoint
resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.get?community=%s",
···
t.Run("List via XRPC endpoint", func(t *testing.T) {
// Create and index multiple communities
for i := 0; i < 3; i++ {
-
createAndIndexCommunity(t, communityService, consumer, instanceDID)
}
resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.list?limit=10",
···
t.Run("Subscribe via XRPC endpoint", func(t *testing.T) {
// Create a community to subscribe to
-
community := createAndIndexCommunity(t, communityService, consumer, instanceDID)
// Subscribe to the community
subscribeReq := map[string]interface{}{
···
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
-
// TODO(Communities-OAuth): Replace with OAuth session
-
req.Header.Set("X-User-DID", instanceDID)
resp, err := http.DefaultClient.Do(req)
if err != nil {
···
t.Run("Unsubscribe via XRPC endpoint", func(t *testing.T) {
// Create a community and subscribe to it first
-
community := createAndIndexCommunity(t, communityService, consumer, instanceDID)
-
// Subscribe first
-
subscription, err := communityService.SubscribeToCommunity(ctx, instanceDID, community.DID)
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
···
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
-
// TODO(Communities-OAuth): Replace with OAuth session
-
req.Header.Set("X-User-DID", instanceDID)
resp, err := http.DefaultClient.Do(req)
if err != nil {
···
t.Run("Update via XRPC endpoint", func(t *testing.T) {
// Create a community first (via service, so it's indexed)
-
community := createAndIndexCommunity(t, communityService, consumer, instanceDID)
// Update the community
newDisplayName := "Updated E2E Test Community"
newDescription := "This community has been updated"
newVisibility := "unlisted"
updateReq := map[string]interface{}{
"communityDid": community.DID,
-
"updatedByDid": instanceDID, // TODO: Replace with OAuth user DID
"displayName": newDisplayName,
"description": newDescription,
"visibility": newVisibility,
···
t.Fatalf("Failed to marshal update request: %v", marshalErr)
}
-
// POST update request
t.Logf("📡 Client → POST /xrpc/social.coves.community.update")
t.Logf(" Updating community: %s", community.DID)
-
resp, err := http.Post(
httpServer.URL+"/xrpc/social.coves.community.update",
-
"application/json",
-
bytes.NewBuffer(reqBody),
-
)
if err != nil {
t.Fatalf("Failed to POST update: %v", err)
}
···
t.Logf("%s\n", divider)
}
-
// Helper: create and index a community (simulates full flow)
-
func createAndIndexCommunity(t *testing.T, service communities.Service, consumer *jetstream.CommunityEventConsumer, instanceDID string) *communities.Community {
// Use nanoseconds % 1 billion to get unique but short names
// This avoids handle collisions when creating multiple communities quickly
uniqueID := time.Now().UnixNano() % 1000000000
···
}
// Fetch from PDS to get full record
-
pdsURL := "http://localhost:3001"
collection := "social.coves.community.profile"
rkey := extractRKeyFromURI(community.RecordURI)
pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
-
pdsURL, instanceDID, collection, rkey))
if pdsErr != nil {
t.Fatalf("Failed to fetch PDS record: %v", pdsErr)
}
···
t.Fatalf("Failed to decode PDS record: %v", decodeErr)
}
-
// Simulate firehose event
event := jetstream.JetstreamEvent{
-
Did: instanceDID,
TimeUS: time.Now().UnixMicro(),
Kind: "commit",
Commit: &jetstream.CommitEvent{
···
}
return sessionResp.AccessJwt, sessionResp.DID, nil
-
}
-
-
// communityTestIdentityResolver is a simple mock for testing (renamed to avoid conflict with oauth_test)
-
type communityTestIdentityResolver struct{}
-
-
func (m *communityTestIdentityResolver) ResolveHandle(ctx context.Context, handle string) (string, string, error) {
-
// Simple mock - not needed for this test
-
return "", "", fmt.Errorf("mock: handle resolution not implemented")
-
}
-
-
func (m *communityTestIdentityResolver) ResolveDID(ctx context.Context, did string) (*identity.DIDDocument, error) {
-
// Simple mock - return minimal DID document
-
return &identity.DIDDocument{
-
DID: did,
-
Service: []identity.Service{
-
{
-
ID: "#atproto_pds",
-
Type: "AtprotoPersonalDataServer",
-
ServiceEndpoint: "http://localhost:3001",
-
},
-
},
-
}, nil
-
}
-
-
func (m *communityTestIdentityResolver) Resolve(ctx context.Context, identifier string) (*identity.Identity, error) {
-
return &identity.Identity{
-
DID: "did:plc:test",
-
Handle: identifier,
-
PDSURL: "http://localhost:3001",
-
}, nil
-
}
-
-
func (m *communityTestIdentityResolver) Purge(ctx context.Context, identifier string) error {
-
// No-op for mock
-
return nil
}
// queryPDSAccount queries the PDS to verify an account exists
···
package integration
import (
+
"Coves/internal/api/middleware"
"Coves/internal/api/routes"
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
···
t.Logf("✅ Authenticated - Instance DID: %s", instanceDID)
+
// Initialize auth middleware (skipVerify=true for E2E tests)
+
authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true)
+
// V2.0: Extract instance domain for community provisioning
var instanceDomain string
if strings.HasPrefix(instanceDID, "did:web:") {
···
// Setup HTTP server with XRPC routes
r := chi.NewRouter()
+
routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
httpServer := httptest.NewServer(r)
defer httpServer.Close()
···
t.Run("3. XRPC HTTP Endpoints", func(t *testing.T) {
t.Run("Create via XRPC endpoint", func(t *testing.T) {
// Use Unix timestamp (seconds) instead of UnixNano to keep handle short
+
// NOTE: Both createdByDid and hostedByDid are derived server-side:
+
// - createdByDid: from JWT token (authenticated user)
+
// - hostedByDid: from instance configuration (security: prevents spoofing)
createReq := map[string]interface{}{
"name": fmt.Sprintf("xrpc-%d", time.Now().Unix()),
"displayName": "XRPC E2E Test",
"description": "Testing true end-to-end flow",
"visibility": "public",
"allowExternalDiscovery": true,
}
···
t.Fatalf("Failed to marshal request: %v", marshalErr)
}
+
// Step 1: Client POSTs to XRPC endpoint with JWT authentication
t.Logf("📡 Client → POST /xrpc/social.coves.community.create")
t.Logf(" Request: %s", string(reqBody))
+
+
req, err := http.NewRequest(http.MethodPost,
httpServer.URL+"/xrpc/social.coves.community.create",
+
bytes.NewBuffer(reqBody))
+
if err != nil {
+
t.Fatalf("Failed to create request: %v", err)
+
}
+
req.Header.Set("Content-Type", "application/json")
+
// Use real PDS access token for E2E authentication
+
req.Header.Set("Authorization", "Bearer "+accessToken)
+
+
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to POST: %v", err)
}
···
t.Logf(" URI: %s", createResp.URI)
// Step 2: Simulate firehose consumer picking up the event
+
// NOTE: Using synthetic event for speed. Real Jetstream WebSocket testing
+
// happens in "Part 2: Real Jetstream Firehose Consumption" above.
t.Logf("🔄 Simulating Jetstream consumer indexing...")
rkey := extractRKeyFromURI(createResp.URI)
+
// V2: Event comes from community's DID (community owns the repo)
event := jetstream.JetstreamEvent{
+
Did: createResp.DID,
TimeUS: time.Now().UnixMicro(),
Kind: "commit",
Commit: &jetstream.CommitEvent{
···
"displayName": createReq["displayName"],
"description": createReq["description"],
"visibility": createReq["visibility"],
+
// Server-side derives these from JWT auth (instanceDID is the authenticated user)
+
"createdBy": instanceDID,
+
"hostedBy": instanceDID,
"federation": map[string]interface{}{
"allowExternalDiscovery": createReq["allowExternalDiscovery"],
},
···
t.Run("Get via XRPC endpoint", func(t *testing.T) {
// Create a community first (via service, so it's indexed)
+
community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL)
// GET via HTTP endpoint
resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.get?community=%s",
···
t.Run("List via XRPC endpoint", func(t *testing.T) {
// Create and index multiple communities
for i := 0; i < 3; i++ {
+
createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL)
}
resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.list?limit=10",
···
t.Run("Subscribe via XRPC endpoint", func(t *testing.T) {
// Create a community to subscribe to
+
community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL)
// Subscribe to the community
subscribeReq := map[string]interface{}{
···
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
+
// Use real PDS access token for E2E authentication
+
req.Header.Set("Authorization", "Bearer "+accessToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
···
t.Run("Unsubscribe via XRPC endpoint", func(t *testing.T) {
// Create a community and subscribe to it first
+
community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL)
+
// Subscribe first (using instance access token for instance user)
+
subscription, err := communityService.SubscribeToCommunity(ctx, instanceDID, accessToken, community.DID)
if err != nil {
t.Fatalf("Failed to subscribe: %v", err)
}
···
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
+
// Use real PDS access token for E2E authentication
+
req.Header.Set("Authorization", "Bearer "+accessToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
···
t.Run("Update via XRPC endpoint", func(t *testing.T) {
// Create a community first (via service, so it's indexed)
+
community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL)
// Update the community
newDisplayName := "Updated E2E Test Community"
newDescription := "This community has been updated"
newVisibility := "unlisted"
+
// NOTE: updatedByDid is derived from JWT token, not provided in request
updateReq := map[string]interface{}{
"communityDid": community.DID,
"displayName": newDisplayName,
"description": newDescription,
"visibility": newVisibility,
···
t.Fatalf("Failed to marshal update request: %v", marshalErr)
}
+
// POST update request with JWT authentication
t.Logf("📡 Client → POST /xrpc/social.coves.community.update")
t.Logf(" Updating community: %s", community.DID)
+
+
req, err := http.NewRequest(http.MethodPost,
httpServer.URL+"/xrpc/social.coves.community.update",
+
bytes.NewBuffer(reqBody))
+
if err != nil {
+
t.Fatalf("Failed to create request: %v", err)
+
}
+
req.Header.Set("Content-Type", "application/json")
+
// Use real PDS access token for E2E authentication
+
req.Header.Set("Authorization", "Bearer "+accessToken)
+
+
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("Failed to POST update: %v", err)
}
···
t.Logf("%s\n", divider)
}
+
// Helper: create and index a community (simulates consumer indexing for fast test setup)
+
// NOTE: This simulates the firehose event for speed. For TRUE E2E testing with real
+
// Jetstream WebSocket subscription, see "Part 2: Real Jetstream Firehose Consumption" above.
+
func createAndIndexCommunity(t *testing.T, service communities.Service, consumer *jetstream.CommunityEventConsumer, instanceDID, pdsURL string) *communities.Community {
// Use nanoseconds % 1 billion to get unique but short names
// This avoids handle collisions when creating multiple communities quickly
uniqueID := time.Now().UnixNano() % 1000000000
···
}
// Fetch from PDS to get full record
+
// V2: Record lives in community's own repository (at://community.DID/...)
collection := "social.coves.community.profile"
rkey := extractRKeyFromURI(community.RecordURI)
pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
+
pdsURL, community.DID, collection, rkey))
if pdsErr != nil {
t.Fatalf("Failed to fetch PDS record: %v", pdsErr)
}
···
t.Fatalf("Failed to decode PDS record: %v", decodeErr)
}
+
// Simulate firehose event for fast indexing
+
// V2: Event comes from community's DID (community owns the repo)
+
// NOTE: This bypasses real Jetstream WebSocket for speed. Real firehose testing
+
// happens in "Part 2: Real Jetstream Firehose Consumption" above.
event := jetstream.JetstreamEvent{
+
Did: community.DID,
TimeUS: time.Now().UnixMicro(),
Kind: "commit",
Commit: &jetstream.CommitEvent{
···
}
return sessionResp.AccessJwt, sessionResp.DID, nil
}
// queryPDSAccount queries the PDS to verify an account exists
+5 -10
tests/integration/community_service_integration_test.go
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
-
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
-
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}
···
t.Logf("Updating community via service.UpdateCommunity()...")
updated, err := service.UpdateCommunity(ctx, communities.UpdateCommunityRequest{
-
CommunityDID: community.DID,
-
UpdatedByDID: creatorDID, // Same as creator - should be authorized
-
DisplayName: &newDisplayName,
-
Description: &newDescription,
-
Visibility: &newVisibility,
AllowExternalDiscovery: nil, // Don't change
})
-
if err != nil {
t.Fatalf("Failed to update community: %v", err)
}
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
-
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
-
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}
···
t.Logf("Updating community via service.UpdateCommunity()...")
updated, err := service.UpdateCommunity(ctx, communities.UpdateCommunityRequest{
+
CommunityDID: community.DID,
+
UpdatedByDID: creatorDID, // Same as creator - should be authorized
+
DisplayName: &newDisplayName,
+
Description: &newDescription,
+
Visibility: &newVisibility,
AllowExternalDiscovery: nil, // Don't change
})
if err != nil {
t.Fatalf("Failed to update community: %v", err)
}
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}
···
HostedByDID: "did:web:coves.social",
AllowExternalDiscovery: true,
})
if err != nil {
t.Fatalf("Failed to create community: %v", err)
}