A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "io"
10 "net/http"
11 "net/http/httptest"
12 "os"
13 "strings"
14 "testing"
15 "time"
16
17 "github.com/go-chi/chi/v5"
18 _ "github.com/lib/pq"
19 "github.com/pressly/goose/v3"
20
21 "Coves/internal/api/routes"
22 "Coves/internal/atproto/did"
23 "Coves/internal/atproto/jetstream"
24 "Coves/internal/core/communities"
25 "Coves/internal/db/postgres"
26)
27
28// TestCommunity_E2E is a comprehensive end-to-end test covering:
29// 1. Write-forward to PDS (service layer)
30// 2. Firehose consumer indexing
31// 3. XRPC HTTP endpoints (create, get, list)
32func TestCommunity_E2E(t *testing.T) {
33 // Skip in short mode since this requires real PDS
34 if testing.Short() {
35 t.Skip("Skipping E2E test in short mode")
36 }
37
38 // Setup test database
39 dbURL := os.Getenv("TEST_DATABASE_URL")
40 if dbURL == "" {
41 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
42 }
43
44 db, err := sql.Open("postgres", dbURL)
45 if err != nil {
46 t.Fatalf("Failed to connect to test database: %v", err)
47 }
48 defer db.Close()
49
50 // Run migrations
51 if err := goose.SetDialect("postgres"); err != nil {
52 t.Fatalf("Failed to set goose dialect: %v", err)
53 }
54 if err := goose.Up(db, "../../internal/db/migrations"); err != nil {
55 t.Fatalf("Failed to run migrations: %v", err)
56 }
57
58 // Check if PDS is running
59 pdsURL := os.Getenv("PDS_URL")
60 if pdsURL == "" {
61 pdsURL = "http://localhost:3001"
62 }
63
64 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
65 if err != nil {
66 t.Skipf("PDS not running at %s: %v", pdsURL, err)
67 }
68 healthResp.Body.Close()
69
70 // Setup dependencies
71 communityRepo := postgres.NewCommunityRepository(db)
72 didGen := did.NewGenerator(true, "https://plc.directory")
73
74 // Get instance credentials
75 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
76 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
77 if instanceHandle == "" {
78 instanceHandle = "testuser123.local.coves.dev"
79 }
80 if instancePassword == "" {
81 instancePassword = "test-password-123"
82 }
83
84 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle)
85
86 // Authenticate to get instance DID
87 accessToken, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
88 if err != nil {
89 t.Fatalf("Failed to authenticate with PDS: %v", err)
90 }
91
92 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID)
93
94 // Create service and consumer
95 communityService := communities.NewCommunityService(communityRepo, didGen, pdsURL, instanceDID)
96 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
97 svc.SetPDSAccessToken(accessToken)
98 }
99
100 consumer := jetstream.NewCommunityEventConsumer(communityRepo)
101
102 // Setup HTTP server with XRPC routes
103 r := chi.NewRouter()
104 routes.RegisterCommunityRoutes(r, communityService)
105 httpServer := httptest.NewServer(r)
106 defer httpServer.Close()
107
108 ctx := context.Background()
109
110 // ====================================================================================
111 // Part 1: Write-Forward to PDS (Service Layer)
112 // ====================================================================================
113 t.Run("1. Write-Forward to PDS", func(t *testing.T) {
114 communityName := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano())
115
116 createReq := communities.CreateCommunityRequest{
117 Name: communityName,
118 DisplayName: "E2E Test Community",
119 Description: "Testing full E2E flow",
120 Visibility: "public",
121 CreatedByDID: instanceDID,
122 HostedByDID: instanceDID,
123 AllowExternalDiscovery: true,
124 }
125
126 t.Logf("\n📝 Creating community via service: %s", communityName)
127 community, err := communityService.CreateCommunity(ctx, createReq)
128 if err != nil {
129 t.Fatalf("Failed to create community: %v", err)
130 }
131
132 t.Logf("✅ Service returned:")
133 t.Logf(" DID: %s", community.DID)
134 t.Logf(" Handle: %s", community.Handle)
135 t.Logf(" RecordURI: %s", community.RecordURI)
136 t.Logf(" RecordCID: %s", community.RecordCID)
137
138 // Verify DID format
139 if community.DID[:8] != "did:plc:" {
140 t.Errorf("Expected did:plc DID, got: %s", community.DID)
141 }
142
143 // Verify record exists in PDS
144 t.Logf("\n📡 Querying PDS for the record...")
145
146 collection := "social.coves.community.profile"
147 rkey := extractRKeyFromURI(community.RecordURI)
148
149 getRecordURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
150 pdsURL, instanceDID, collection, rkey)
151
152 pdsResp, err := http.Get(getRecordURL)
153 if err != nil {
154 t.Fatalf("Failed to query PDS: %v", err)
155 }
156 defer pdsResp.Body.Close()
157
158 if pdsResp.StatusCode != http.StatusOK {
159 body, _ := io.ReadAll(pdsResp.Body)
160 t.Fatalf("PDS returned status %d: %s", pdsResp.StatusCode, string(body))
161 }
162
163 var pdsRecord struct {
164 URI string `json:"uri"`
165 CID string `json:"cid"`
166 Value map[string]interface{} `json:"value"`
167 }
168
169 if err := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); err != nil {
170 t.Fatalf("Failed to decode PDS response: %v", err)
171 }
172
173 t.Logf("✅ Record found in PDS!")
174 t.Logf(" URI: %s", pdsRecord.URI)
175 t.Logf(" CID: %s", pdsRecord.CID)
176
177 // Verify record has correct DIDs
178 if pdsRecord.Value["did"] != community.DID {
179 t.Errorf("Community DID mismatch in PDS record: expected %s, got %v",
180 community.DID, pdsRecord.Value["did"])
181 }
182
183 // ====================================================================================
184 // Part 2: Firehose Consumer Indexing
185 // ====================================================================================
186 t.Run("2. Firehose Consumer Indexing", func(t *testing.T) {
187 t.Logf("\n🔄 Simulating Jetstream firehose event...")
188
189 // Simulate firehose event (in production, this comes from Jetstream)
190 firehoseEvent := jetstream.JetstreamEvent{
191 Did: instanceDID, // Repository owner (instance DID, not community DID!)
192 TimeUS: time.Now().UnixMicro(),
193 Kind: "commit",
194 Commit: &jetstream.CommitEvent{
195 Rev: "test-rev",
196 Operation: "create",
197 Collection: collection,
198 RKey: rkey,
199 CID: pdsRecord.CID,
200 Record: pdsRecord.Value,
201 },
202 }
203
204 err := consumer.HandleEvent(ctx, &firehoseEvent)
205 if err != nil {
206 t.Fatalf("Failed to process firehose event: %v", err)
207 }
208
209 t.Logf("✅ Consumer processed event")
210
211 // Verify indexed in AppView database
212 t.Logf("\n🔍 Querying AppView database...")
213
214 indexed, err := communityRepo.GetByDID(ctx, community.DID)
215 if err != nil {
216 t.Fatalf("Community not indexed in AppView: %v", err)
217 }
218
219 t.Logf("✅ Community indexed in AppView:")
220 t.Logf(" DID: %s", indexed.DID)
221 t.Logf(" Handle: %s", indexed.Handle)
222 t.Logf(" DisplayName: %s", indexed.DisplayName)
223 t.Logf(" RecordURI: %s", indexed.RecordURI)
224
225 // Verify record_uri points to instance repo (not community repo)
226 if indexed.RecordURI[:len("at://"+instanceDID)] != "at://"+instanceDID {
227 t.Errorf("record_uri should point to instance repo, got: %s", indexed.RecordURI)
228 }
229
230 t.Logf("\n✅ Part 1 & 2 Complete: Write-Forward → PDS → Firehose → AppView ✓")
231 })
232 })
233
234 // ====================================================================================
235 // Part 3: XRPC HTTP Endpoints
236 // ====================================================================================
237 t.Run("3. XRPC HTTP Endpoints", func(t *testing.T) {
238
239 t.Run("Create via XRPC endpoint", func(t *testing.T) {
240 createReq := map[string]interface{}{
241 "name": fmt.Sprintf("xrpc-%d", time.Now().UnixNano()),
242 "displayName": "XRPC E2E Test",
243 "description": "Testing true end-to-end flow",
244 "visibility": "public",
245 "createdByDid": instanceDID,
246 "hostedByDid": instanceDID,
247 "allowExternalDiscovery": true,
248 }
249
250 reqBody, _ := json.Marshal(createReq)
251
252 // Step 1: Client POSTs to XRPC endpoint
253 t.Logf("📡 Client → POST /xrpc/social.coves.community.create")
254 resp, err := http.Post(
255 httpServer.URL+"/xrpc/social.coves.community.create",
256 "application/json",
257 bytes.NewBuffer(reqBody),
258 )
259 if err != nil {
260 t.Fatalf("Failed to POST: %v", err)
261 }
262 defer resp.Body.Close()
263
264 if resp.StatusCode != http.StatusOK {
265 body, _ := io.ReadAll(resp.Body)
266 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
267 }
268
269 var createResp struct {
270 URI string `json:"uri"`
271 CID string `json:"cid"`
272 DID string `json:"did"`
273 Handle string `json:"handle"`
274 }
275
276 json.NewDecoder(resp.Body).Decode(&createResp)
277
278 t.Logf("✅ XRPC response received:")
279 t.Logf(" DID: %s", createResp.DID)
280 t.Logf(" Handle: %s", createResp.Handle)
281 t.Logf(" URI: %s", createResp.URI)
282
283 // Step 2: Simulate firehose consumer picking up the event
284 t.Logf("🔄 Simulating Jetstream consumer indexing...")
285 rkey := extractRKeyFromURI(createResp.URI)
286 event := jetstream.JetstreamEvent{
287 Did: instanceDID,
288 TimeUS: time.Now().UnixMicro(),
289 Kind: "commit",
290 Commit: &jetstream.CommitEvent{
291 Rev: "test-rev",
292 Operation: "create",
293 Collection: "social.coves.community.profile",
294 RKey: rkey,
295 Record: map[string]interface{}{
296 "did": createResp.DID, // Community's DID from response
297 "handle": createResp.Handle, // Community's handle from response
298 "name": createReq["name"],
299 "displayName": createReq["displayName"],
300 "description": createReq["description"],
301 "visibility": createReq["visibility"],
302 "createdBy": createReq["createdByDid"],
303 "hostedBy": createReq["hostedByDid"],
304 "federation": map[string]interface{}{
305 "allowExternalDiscovery": createReq["allowExternalDiscovery"],
306 },
307 "createdAt": time.Now().Format(time.RFC3339),
308 },
309 CID: createResp.CID,
310 },
311 }
312 consumer.HandleEvent(context.Background(), &event)
313
314 // Step 3: Verify it's indexed in AppView
315 t.Logf("🔍 Querying AppView to verify indexing...")
316 var indexedCommunity communities.Community
317 err = db.QueryRow(`
318 SELECT did, handle, display_name, description
319 FROM communities
320 WHERE did = $1
321 `, createResp.DID).Scan(
322 &indexedCommunity.DID,
323 &indexedCommunity.Handle,
324 &indexedCommunity.DisplayName,
325 &indexedCommunity.Description,
326 )
327 if err != nil {
328 t.Fatalf("Community not indexed in AppView: %v", err)
329 }
330
331 t.Logf("✅ TRUE E2E FLOW COMPLETE:")
332 t.Logf(" Client → XRPC → PDS → Firehose → AppView ✓")
333 t.Logf(" Indexed community: %s (%s)", indexedCommunity.Handle, indexedCommunity.DisplayName)
334 })
335
336 t.Run("Get via XRPC endpoint", func(t *testing.T) {
337 // Create a community first (via service, so it's indexed)
338 community := createAndIndexCommunity(t, communityService, consumer, instanceDID)
339
340 // GET via HTTP endpoint
341 resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.get?community=%s",
342 httpServer.URL, community.DID))
343 if err != nil {
344 t.Fatalf("Failed to GET: %v", err)
345 }
346 defer resp.Body.Close()
347
348 if resp.StatusCode != http.StatusOK {
349 body, _ := io.ReadAll(resp.Body)
350 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
351 }
352
353 var getCommunity communities.Community
354 json.NewDecoder(resp.Body).Decode(&getCommunity)
355
356 t.Logf("✅ Retrieved via XRPC HTTP endpoint:")
357 t.Logf(" DID: %s", getCommunity.DID)
358 t.Logf(" DisplayName: %s", getCommunity.DisplayName)
359
360 if getCommunity.DID != community.DID {
361 t.Errorf("DID mismatch: expected %s, got %s", community.DID, getCommunity.DID)
362 }
363 })
364
365 t.Run("List via XRPC endpoint", func(t *testing.T) {
366 // Create and index multiple communities
367 for i := 0; i < 3; i++ {
368 createAndIndexCommunity(t, communityService, consumer, instanceDID)
369 }
370
371 resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.list?limit=10",
372 httpServer.URL))
373 if err != nil {
374 t.Fatalf("Failed to GET list: %v", err)
375 }
376 defer resp.Body.Close()
377
378 if resp.StatusCode != http.StatusOK {
379 body, _ := io.ReadAll(resp.Body)
380 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
381 }
382
383 var listResp struct {
384 Communities []communities.Community `json:"communities"`
385 Total int `json:"total"`
386 }
387
388 json.NewDecoder(resp.Body).Decode(&listResp)
389
390 t.Logf("✅ Listed %d communities via XRPC", len(listResp.Communities))
391
392 if len(listResp.Communities) < 3 {
393 t.Errorf("Expected at least 3 communities, got %d", len(listResp.Communities))
394 }
395 })
396
397 t.Logf("\n✅ Part 3 Complete: All XRPC HTTP endpoints working ✓")
398 })
399
400 divider := strings.Repeat("=", 70)
401 t.Logf("\n%s", divider)
402 t.Logf("✅ COMPREHENSIVE E2E TEST COMPLETE!")
403 t.Logf("%s", divider)
404 t.Logf("✓ Write-forward to PDS")
405 t.Logf("✓ Record stored with correct DIDs (community vs instance)")
406 t.Logf("✓ Firehose consumer indexes to AppView")
407 t.Logf("✓ XRPC create endpoint (HTTP)")
408 t.Logf("✓ XRPC get endpoint (HTTP)")
409 t.Logf("✓ XRPC list endpoint (HTTP)")
410 t.Logf("%s", divider)
411}
412
413// Helper: create and index a community (simulates full flow)
414func createAndIndexCommunity(t *testing.T, service communities.Service, consumer *jetstream.CommunityEventConsumer, instanceDID string) *communities.Community {
415 req := communities.CreateCommunityRequest{
416 Name: fmt.Sprintf("test-%d", time.Now().UnixNano()),
417 DisplayName: "Test Community",
418 Description: "Test",
419 Visibility: "public",
420 CreatedByDID: instanceDID,
421 HostedByDID: instanceDID,
422 AllowExternalDiscovery: true,
423 }
424
425 community, err := service.CreateCommunity(context.Background(), req)
426 if err != nil {
427 t.Fatalf("Failed to create: %v", err)
428 }
429
430 // Fetch from PDS to get full record
431 pdsURL := "http://localhost:3001"
432 collection := "social.coves.community.profile"
433 rkey := extractRKeyFromURI(community.RecordURI)
434
435 pdsResp, _ := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
436 pdsURL, instanceDID, collection, rkey))
437 defer pdsResp.Body.Close()
438
439 var pdsRecord struct {
440 CID string `json:"cid"`
441 Value map[string]interface{} `json:"value"`
442 }
443 json.NewDecoder(pdsResp.Body).Decode(&pdsRecord)
444
445 // Simulate firehose event
446 event := jetstream.JetstreamEvent{
447 Did: instanceDID,
448 TimeUS: time.Now().UnixMicro(),
449 Kind: "commit",
450 Commit: &jetstream.CommitEvent{
451 Rev: "test",
452 Operation: "create",
453 Collection: collection,
454 RKey: rkey,
455 CID: pdsRecord.CID,
456 Record: pdsRecord.Value,
457 },
458 }
459
460 consumer.HandleEvent(context.Background(), &event)
461
462 return community
463}
464
// extractRKeyFromURI returns the record key of an AT URI of the form
// at://<did>/<collection>/<rkey>.
//
// It returns "" when uri does not start with "at://", does not have exactly
// three path segments after the scheme, or contains an empty segment. In
// particular a URI that stops at the collection (at://did/collection) yields
// "" rather than the collection name.
func extractRKeyFromURI(uri string) string {
	const scheme = "at://"
	if !strings.HasPrefix(uri, scheme) {
		return ""
	}

	segments := strings.Split(uri[len(scheme):], "/")
	if len(segments) != 3 {
		return ""
	}
	for _, segment := range segments {
		if segment == "" {
			return ""
		}
	}
	return segments[2]
}
473
// authenticateWithPDS authenticates with the PDS and returns access token and DID.
//
// It POSTs handle and password as a JSON body to
// pdsURL + "/xrpc/com.atproto.server.createSession" and reads the "accessJwt"
// and "did" fields of the JSON response.
//
// An error is returned when the request cannot be sent or times out, when the
// PDS answers with a status other than 200 (the response body is included in
// the error), when the response cannot be decoded, or when either field is
// missing from an otherwise successful response.
func authenticateWithPDS(pdsURL, handle, password string) (string, string, error) {
	// Call com.atproto.server.createSession
	reqBody, err := json.Marshal(map[string]string{
		"identifier": handle,
		"password":   password,
	})
	if err != nil {
		return "", "", fmt.Errorf("failed to encode session request: %w", err)
	}

	// http.DefaultClient has no timeout; bound the call so an unresponsive
	// PDS cannot hang the caller indefinitely.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Post(
		pdsURL+"/xrpc/com.atproto.server.createSession",
		"application/json",
		bytes.NewReader(reqBody),
	)
	if err != nil {
		return "", "", fmt.Errorf("failed to create session: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body) // best effort: body is only used for diagnostics
		return "", "", fmt.Errorf("PDS auth failed (status %d): %s", resp.StatusCode, string(body))
	}

	var sessionResp struct {
		AccessJwt string `json:"accessJwt"`
		DID       string `json:"did"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&sessionResp); err != nil {
		return "", "", fmt.Errorf("failed to decode session response: %w", err)
	}

	// A 200 response without credentials would otherwise be reported as a
	// successful login with an empty token and DID.
	if sessionResp.AccessJwt == "" || sessionResp.DID == "" {
		return "", "", fmt.Errorf("PDS session response for %q is missing accessJwt or did", handle)
	}

	return sessionResp.AccessJwt, sessionResp.DID, nil
}