A community based topic aggregation platform built on atproto
1package e2e
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/core/users"
7 "Coves/internal/db/postgres"
8 "bytes"
9 "context"
10 "database/sql"
11 "encoding/json"
12 "fmt"
13 "net/http"
14 "os"
15 "testing"
16 "time"
17
18 _ "github.com/lib/pq"
19 "github.com/pressly/goose/v3"
20)
21
22// TestE2E_UserSignup tests the full user signup flow:
23// Third-party client → social.coves.actor.signup XRPC → PDS account creation → Jetstream → AppView indexing
24//
25// This tests the same code path that a third-party client or UI would use.
26//
27// Prerequisites:
28// - AppView running on localhost:8081
29// - PDS running on localhost:3001
30// - Jetstream running on localhost:6008 (consuming from PDS)
31// - Test database on localhost:5434
32//
33// Run with:
34//
35// make e2e-up # Start infrastructure
36// go run ./cmd/server & # Start AppView
37// go test ./tests/integration -run TestE2E_UserSignup -v
38func TestE2E_UserSignup(t *testing.T) {
39 if testing.Short() {
40 t.Skip("Skipping E2E test in short mode")
41 }
42
43 // Check if AppView is available
44 if !isAppViewAvailable(t) {
45 t.Skip("AppView not available at localhost:8081 - run 'go run ./cmd/server' first")
46 }
47
48 // Check if PDS is available
49 if !isPDSAvailable(t) {
50 t.Skip("PDS not available at localhost:3001 - run 'make e2e-up' first")
51 }
52
53 // Check if Jetstream is available
54 if !isJetstreamAvailable(t) {
55 t.Skip("Jetstream not available at localhost:6008 - run 'make e2e-up' first")
56 }
57
58 db := setupTestDB(t)
59 defer func() {
60 if err := db.Close(); err != nil {
61 t.Logf("Failed to close database: %v", err)
62 }
63 }()
64
65 // Set up services
66 userRepo := postgres.NewUserRepository(db)
67 resolver := identity.NewResolver(db, identity.DefaultConfig())
68 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
69
70 // Start Jetstream consumer
71 consumer := jetstream.NewUserEventConsumer(
72 userService,
73 resolver,
74 "ws://localhost:6008/subscribe",
75 "", // No PDS filter
76 )
77
78 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
79 defer cancel()
80
81 // Start consumer in background
82 consumerDone := make(chan error, 1)
83 go func() {
84 consumerDone <- consumer.Start(ctx)
85 }()
86
87 // Give Jetstream consumer a moment to connect (no need to wait long)
88 t.Log("Waiting for Jetstream consumer to connect...")
89 time.Sleep(500 * time.Millisecond)
90
91 // Test 1: Create account on PDS
92 t.Run("Create account on PDS and verify indexing", func(t *testing.T) {
93 handle := fmt.Sprintf("alice-%d.local.coves.dev", time.Now().Unix())
94 email := fmt.Sprintf("alice-%d@test.com", time.Now().Unix())
95
96 t.Logf("Creating account: %s", handle)
97
98 // Create account via UserService (what UI would call)
99 did, err := createPDSAccount(t, userService, handle, email, "test1234")
100 if err != nil {
101 t.Fatalf("Failed to create PDS account: %v", err)
102 }
103
104 t.Logf("Account created with DID: %s", did)
105
106 // Wait for Jetstream to process and AppView to index (with retry)
107 t.Log("Waiting for Jetstream → AppView indexing...")
108 var user *users.User
109 deadline := time.Now().Add(10 * time.Second)
110 for time.Now().Before(deadline) {
111 user, err = userService.GetUserByDID(ctx, did)
112 if err == nil {
113 break // Successfully indexed!
114 }
115 time.Sleep(500 * time.Millisecond)
116 }
117 if err != nil {
118 t.Fatalf("User not indexed in AppView after 10s: %v", err)
119 }
120
121 if user.Handle != handle {
122 t.Errorf("Expected handle %s, got %s", handle, user.Handle)
123 }
124
125 if user.DID != did {
126 t.Errorf("Expected DID %s, got %s", did, user.DID)
127 }
128
129 t.Logf("✅ User successfully indexed: %s → %s", handle, did)
130 })
131
132 // Test 2: Idempotency
133 t.Run("Idempotent indexing on duplicate events", func(t *testing.T) {
134 handle := fmt.Sprintf("bob-%d.local.coves.dev", time.Now().Unix())
135 email := fmt.Sprintf("bob-%d@test.com", time.Now().Unix())
136
137 // Create account via UserService
138 did, err := createPDSAccount(t, userService, handle, email, "test1234")
139 if err != nil {
140 t.Fatalf("Failed to create PDS account: %v", err)
141 }
142
143 // Wait for indexing (with retry)
144 var user1 *users.User
145 deadline := time.Now().Add(10 * time.Second)
146 for time.Now().Before(deadline) {
147 user1, err = userService.GetUserByDID(ctx, did)
148 if err == nil {
149 break
150 }
151 time.Sleep(500 * time.Millisecond)
152 }
153 if err != nil {
154 t.Fatalf("User not indexed after 10s: %v", err)
155 }
156
157 // Manually trigger duplicate indexing (simulates Jetstream replay)
158 _, err = userService.CreateUser(ctx, users.CreateUserRequest{
159 DID: did,
160 Handle: handle,
161 PDSURL: "http://localhost:3001",
162 })
163 if err != nil {
164 t.Fatalf("Idempotent CreateUser should not error: %v", err)
165 }
166
167 // Verify still only one user
168 user2, err := userService.GetUserByDID(ctx, did)
169 if err != nil {
170 t.Fatalf("Failed to get user after duplicate: %v", err)
171 }
172
173 if user1.CreatedAt != user2.CreatedAt {
174 t.Errorf("Duplicate indexing created new user (timestamps differ)")
175 }
176
177 t.Logf("✅ Idempotency verified: duplicate events handled gracefully")
178 })
179
180 // Test 3: Multiple users
181 t.Run("Index multiple users concurrently", func(t *testing.T) {
182 const numUsers = 3
183 dids := make([]string, numUsers)
184
185 for i := 0; i < numUsers; i++ {
186 handle := fmt.Sprintf("user%d-%d.local.coves.dev", i, time.Now().Unix())
187 email := fmt.Sprintf("user%d-%d@test.com", i, time.Now().Unix())
188
189 did, err := createPDSAccount(t, userService, handle, email, "test1234")
190 if err != nil {
191 t.Fatalf("Failed to create account %d: %v", i, err)
192 }
193 dids[i] = did
194 t.Logf("Created user %d: %s", i, did)
195
196 // Small delay between creations
197 time.Sleep(500 * time.Millisecond)
198 }
199
200 // Verify all indexed (with retry for each user)
201 t.Log("Waiting for all users to be indexed...")
202 for i, did := range dids {
203 var user *users.User
204 var err error
205 deadline := time.Now().Add(15 * time.Second)
206 for time.Now().Before(deadline) {
207 user, err = userService.GetUserByDID(ctx, did)
208 if err == nil {
209 break
210 }
211 time.Sleep(500 * time.Millisecond)
212 }
213 if err != nil {
214 t.Errorf("User %d not indexed after 15s: %v", i, err)
215 continue
216 }
217 t.Logf("✅ User %d indexed: %s", i, user.Handle)
218 }
219 })
220
221 // Clean shutdown
222 cancel()
223 select {
224 case err := <-consumerDone:
225 if err != nil && err != context.Canceled {
226 t.Logf("Consumer stopped with error: %v", err)
227 }
228 case <-time.After(5 * time.Second):
229 t.Log("Consumer shutdown timeout")
230 }
231}
232
233// generateInviteCode generates a single-use invite code via PDS admin API
234func generateInviteCode(t *testing.T) (string, error) {
235 payload := map[string]int{
236 "useCount": 1,
237 }
238
239 jsonData, err := json.Marshal(payload)
240 if err != nil {
241 return "", fmt.Errorf("failed to marshal request: %w", err)
242 }
243
244 req, err := http.NewRequest(
245 "POST",
246 "http://localhost:3001/xrpc/com.atproto.server.createInviteCode",
247 bytes.NewBuffer(jsonData),
248 )
249 if err != nil {
250 return "", fmt.Errorf("failed to create request: %w", err)
251 }
252
253 // PDS admin authentication
254 req.SetBasicAuth("admin", "admin")
255 req.Header.Set("Content-Type", "application/json")
256
257 resp, err := http.DefaultClient.Do(req)
258 if err != nil {
259 return "", fmt.Errorf("failed to create invite code: %w", err)
260 }
261 defer func() { _ = resp.Body.Close() }()
262
263 if resp.StatusCode != http.StatusOK {
264 var errorResp map[string]interface{}
265 if err := json.NewDecoder(resp.Body).Decode(&errorResp); err != nil {
266 return "", fmt.Errorf("PDS admin API returned status %d (failed to decode error: %w)", resp.StatusCode, err)
267 }
268 return "", fmt.Errorf("PDS admin API returned status %d: %v", resp.StatusCode, errorResp)
269 }
270
271 var result struct {
272 Code string `json:"code"`
273 }
274
275 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
276 return "", fmt.Errorf("failed to decode response: %w", err)
277 }
278
279 t.Logf("Generated invite code: %s", result.Code)
280 return result.Code, nil
281}
282
283// createPDSAccount creates an account via the coves.user.signup XRPC endpoint
284// This is the same code path that a third-party client or UI would use
285func createPDSAccount(t *testing.T, userService users.UserService, handle, email, password string) (string, error) {
286 // Generate fresh invite code for each account
287 inviteCode, err := generateInviteCode(t)
288 if err != nil {
289 return "", fmt.Errorf("failed to generate invite code: %w", err)
290 }
291
292 // Call our XRPC endpoint (what a third-party client would call)
293 payload := map[string]string{
294 "handle": handle,
295 "email": email,
296 "password": password,
297 "inviteCode": inviteCode,
298 }
299
300 jsonData, err := json.Marshal(payload)
301 if err != nil {
302 return "", fmt.Errorf("failed to marshal request: %w", err)
303 }
304
305 resp, err := http.Post(
306 "http://localhost:8081/xrpc/social.coves.actor.signup",
307 "application/json",
308 bytes.NewBuffer(jsonData),
309 )
310 if err != nil {
311 return "", fmt.Errorf("failed to call signup endpoint: %w", err)
312 }
313 defer func() { _ = resp.Body.Close() }()
314
315 if resp.StatusCode != http.StatusOK {
316 var errorResp map[string]interface{}
317 if err := json.NewDecoder(resp.Body).Decode(&errorResp); err != nil {
318 return "", fmt.Errorf("signup endpoint returned status %d (failed to decode error: %w)", resp.StatusCode, err)
319 }
320 return "", fmt.Errorf("signup endpoint returned status %d: %v", resp.StatusCode, errorResp)
321 }
322
323 var result struct {
324 DID string `json:"did"`
325 Handle string `json:"handle"`
326 AccessJwt string `json:"accessJwt"`
327 RefreshJwt string `json:"refreshJwt"`
328 }
329
330 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
331 return "", fmt.Errorf("failed to decode response: %w", err)
332 }
333
334 t.Logf("Account created via XRPC endpoint: %s → %s", result.Handle, result.DID)
335
336 return result.DID, nil
337}
338
339// isPDSAvailable checks if PDS is running
340func isPDSAvailable(t *testing.T) bool {
341 resp, err := http.Get("http://localhost:3001/xrpc/_health")
342 if err != nil {
343 t.Logf("PDS not available: %v", err)
344 return false
345 }
346 defer func() { _ = resp.Body.Close() }()
347 return resp.StatusCode == http.StatusOK
348}
349
350// isJetstreamAvailable checks if Jetstream is running
351func isJetstreamAvailable(t *testing.T) bool {
352 // Use 127.0.0.1 instead of localhost to force IPv4
353 resp, err := http.Get("http://127.0.0.1:6009/metrics")
354 if err != nil {
355 t.Logf("Jetstream not available: %v", err)
356 return false
357 }
358 defer func() { _ = resp.Body.Close() }()
359 return resp.StatusCode == http.StatusOK
360}
361
362// isAppViewAvailable checks if AppView is running
363func isAppViewAvailable(t *testing.T) bool {
364 resp, err := http.Get("http://localhost:8081/health")
365 if err != nil {
366 t.Logf("AppView not available: %v", err)
367 return false
368 }
369 defer func() { _ = resp.Body.Close() }()
370 return resp.StatusCode == http.StatusOK
371}
372
373// setupTestDB connects to test database and runs migrations
374func setupTestDB(t *testing.T) *sql.DB {
375 // Build connection string from environment variables (set by .env.dev)
376 testUser := os.Getenv("POSTGRES_TEST_USER")
377 testPassword := os.Getenv("POSTGRES_TEST_PASSWORD")
378 testPort := os.Getenv("POSTGRES_TEST_PORT")
379 testDB := os.Getenv("POSTGRES_TEST_DB")
380
381 // Fallback to defaults if not set
382 if testUser == "" {
383 testUser = "test_user"
384 }
385 if testPassword == "" {
386 testPassword = "test_password"
387 }
388 if testPort == "" {
389 testPort = "5434"
390 }
391 if testDB == "" {
392 testDB = "coves_test"
393 }
394
395 dbURL := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable",
396 testUser, testPassword, testPort, testDB)
397
398 db, err := sql.Open("postgres", dbURL)
399 if err != nil {
400 t.Fatalf("Failed to connect to test database: %v", err)
401 }
402
403 if pingErr := db.Ping(); pingErr != nil {
404 t.Fatalf("Failed to ping test database: %v", pingErr)
405 }
406
407 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil {
408 t.Fatalf("Failed to set goose dialect: %v", dialectErr)
409 }
410
411 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil {
412 t.Fatalf("Failed to run migrations: %v", migrateErr)
413 }
414
415 // Clean up any existing test data
416 _, err = db.Exec("DELETE FROM users WHERE handle LIKE '%.test' OR handle LIKE '%.local.coves.dev'")
417 if err != nil {
418 t.Logf("Warning: Failed to clean up test data: %v", err)
419 }
420
421 return db
422}