A community based topic aggregation platform built on atproto
1package e2e
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "net/http"
10 "os"
11 "testing"
12 "time"
13
14 "Coves/internal/atproto/identity"
15 "Coves/internal/atproto/jetstream"
16 "Coves/internal/core/users"
17 "Coves/internal/db/postgres"
18 _ "github.com/lib/pq"
19 "github.com/pressly/goose/v3"
20)
21
22// TestE2E_UserSignup tests the full user signup flow:
23// Third-party client → social.coves.actor.signup XRPC → PDS account creation → Jetstream → AppView indexing
24//
25// This tests the same code path that a third-party client or UI would use.
26//
27// Prerequisites:
28// - AppView running on localhost:8081
29// - PDS running on localhost:3001
30// - Jetstream running on localhost:6008 (consuming from PDS)
31// - Test database on localhost:5434
32//
33// Run with:
34// make e2e-up # Start infrastructure
35// go run ./cmd/server & # Start AppView
36// go test ./tests/integration -run TestE2E_UserSignup -v
37func TestE2E_UserSignup(t *testing.T) {
38 if testing.Short() {
39 t.Skip("Skipping E2E test in short mode")
40 }
41
42 // Check if AppView is available
43 if !isAppViewAvailable(t) {
44 t.Skip("AppView not available at localhost:8081 - run 'go run ./cmd/server' first")
45 }
46
47 // Check if PDS is available
48 if !isPDSAvailable(t) {
49 t.Skip("PDS not available at localhost:3001 - run 'make e2e-up' first")
50 }
51
52 // Check if Jetstream is available
53 if !isJetstreamAvailable(t) {
54 t.Skip("Jetstream not available at localhost:6008 - run 'make e2e-up' first")
55 }
56
57 db := setupTestDB(t)
58 defer db.Close()
59
60 // Set up services
61 userRepo := postgres.NewUserRepository(db)
62 resolver := identity.NewResolver(db, identity.DefaultConfig())
63 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
64
65 // Start Jetstream consumer
66 consumer := jetstream.NewUserEventConsumer(
67 userService,
68 resolver,
69 "ws://localhost:6008/subscribe",
70 "", // No PDS filter
71 )
72
73 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
74 defer cancel()
75
76 // Start consumer in background
77 consumerDone := make(chan error, 1)
78 go func() {
79 consumerDone <- consumer.Start(ctx)
80 }()
81
82 // Give Jetstream consumer a moment to connect (no need to wait long)
83 t.Log("Waiting for Jetstream consumer to connect...")
84 time.Sleep(500 * time.Millisecond)
85
86 // Test 1: Create account on PDS
87 t.Run("Create account on PDS and verify indexing", func(t *testing.T) {
88 handle := fmt.Sprintf("alice-%d.local.coves.dev", time.Now().Unix())
89 email := fmt.Sprintf("alice-%d@test.com", time.Now().Unix())
90
91 t.Logf("Creating account: %s", handle)
92
93 // Create account via UserService (what UI would call)
94 did, err := createPDSAccount(t, userService, handle, email, "test1234")
95 if err != nil {
96 t.Fatalf("Failed to create PDS account: %v", err)
97 }
98
99 t.Logf("Account created with DID: %s", did)
100
101 // Wait for Jetstream to process and AppView to index (with retry)
102 t.Log("Waiting for Jetstream → AppView indexing...")
103 var user *users.User
104 deadline := time.Now().Add(10 * time.Second)
105 for time.Now().Before(deadline) {
106 user, err = userService.GetUserByDID(ctx, did)
107 if err == nil {
108 break // Successfully indexed!
109 }
110 time.Sleep(500 * time.Millisecond)
111 }
112 if err != nil {
113 t.Fatalf("User not indexed in AppView after 10s: %v", err)
114 }
115
116 if user.Handle != handle {
117 t.Errorf("Expected handle %s, got %s", handle, user.Handle)
118 }
119
120 if user.DID != did {
121 t.Errorf("Expected DID %s, got %s", did, user.DID)
122 }
123
124 t.Logf("✅ User successfully indexed: %s → %s", handle, did)
125 })
126
127 // Test 2: Idempotency
128 t.Run("Idempotent indexing on duplicate events", func(t *testing.T) {
129 handle := fmt.Sprintf("bob-%d.local.coves.dev", time.Now().Unix())
130 email := fmt.Sprintf("bob-%d@test.com", time.Now().Unix())
131
132 // Create account via UserService
133 did, err := createPDSAccount(t, userService, handle, email, "test1234")
134 if err != nil {
135 t.Fatalf("Failed to create PDS account: %v", err)
136 }
137
138 // Wait for indexing (with retry)
139 var user1 *users.User
140 deadline := time.Now().Add(10 * time.Second)
141 for time.Now().Before(deadline) {
142 user1, err = userService.GetUserByDID(ctx, did)
143 if err == nil {
144 break
145 }
146 time.Sleep(500 * time.Millisecond)
147 }
148 if err != nil {
149 t.Fatalf("User not indexed after 10s: %v", err)
150 }
151
152 // Manually trigger duplicate indexing (simulates Jetstream replay)
153 _, err = userService.CreateUser(ctx, users.CreateUserRequest{
154 DID: did,
155 Handle: handle,
156 PDSURL: "http://localhost:3001",
157 })
158 if err != nil {
159 t.Fatalf("Idempotent CreateUser should not error: %v", err)
160 }
161
162 // Verify still only one user
163 user2, err := userService.GetUserByDID(ctx, did)
164 if err != nil {
165 t.Fatalf("Failed to get user after duplicate: %v", err)
166 }
167
168 if user1.CreatedAt != user2.CreatedAt {
169 t.Errorf("Duplicate indexing created new user (timestamps differ)")
170 }
171
172 t.Logf("✅ Idempotency verified: duplicate events handled gracefully")
173 })
174
175 // Test 3: Multiple users
176 t.Run("Index multiple users concurrently", func(t *testing.T) {
177 const numUsers = 3
178 dids := make([]string, numUsers)
179
180 for i := 0; i < numUsers; i++ {
181 handle := fmt.Sprintf("user%d-%d.local.coves.dev", i, time.Now().Unix())
182 email := fmt.Sprintf("user%d-%d@test.com", i, time.Now().Unix())
183
184 did, err := createPDSAccount(t, userService, handle, email, "test1234")
185 if err != nil {
186 t.Fatalf("Failed to create account %d: %v", i, err)
187 }
188 dids[i] = did
189 t.Logf("Created user %d: %s", i, did)
190
191 // Small delay between creations
192 time.Sleep(500 * time.Millisecond)
193 }
194
195 // Verify all indexed (with retry for each user)
196 t.Log("Waiting for all users to be indexed...")
197 for i, did := range dids {
198 var user *users.User
199 var err error
200 deadline := time.Now().Add(15 * time.Second)
201 for time.Now().Before(deadline) {
202 user, err = userService.GetUserByDID(ctx, did)
203 if err == nil {
204 break
205 }
206 time.Sleep(500 * time.Millisecond)
207 }
208 if err != nil {
209 t.Errorf("User %d not indexed after 15s: %v", i, err)
210 continue
211 }
212 t.Logf("✅ User %d indexed: %s", i, user.Handle)
213 }
214 })
215
216 // Clean shutdown
217 cancel()
218 select {
219 case err := <-consumerDone:
220 if err != nil && err != context.Canceled {
221 t.Logf("Consumer stopped with error: %v", err)
222 }
223 case <-time.After(5 * time.Second):
224 t.Log("Consumer shutdown timeout")
225 }
226}
227
228// generateInviteCode generates a single-use invite code via PDS admin API
229func generateInviteCode(t *testing.T) (string, error) {
230 payload := map[string]int{
231 "useCount": 1,
232 }
233
234 jsonData, err := json.Marshal(payload)
235 if err != nil {
236 return "", fmt.Errorf("failed to marshal request: %w", err)
237 }
238
239 req, err := http.NewRequest(
240 "POST",
241 "http://localhost:3001/xrpc/com.atproto.server.createInviteCode",
242 bytes.NewBuffer(jsonData),
243 )
244 if err != nil {
245 return "", fmt.Errorf("failed to create request: %w", err)
246 }
247
248 // PDS admin authentication
249 req.SetBasicAuth("admin", "admin")
250 req.Header.Set("Content-Type", "application/json")
251
252 resp, err := http.DefaultClient.Do(req)
253 if err != nil {
254 return "", fmt.Errorf("failed to create invite code: %w", err)
255 }
256 defer resp.Body.Close()
257
258 if resp.StatusCode != http.StatusOK {
259 var errorResp map[string]interface{}
260 json.NewDecoder(resp.Body).Decode(&errorResp)
261 return "", fmt.Errorf("PDS admin API returned status %d: %v", resp.StatusCode, errorResp)
262 }
263
264 var result struct {
265 Code string `json:"code"`
266 }
267
268 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
269 return "", fmt.Errorf("failed to decode response: %w", err)
270 }
271
272 t.Logf("Generated invite code: %s", result.Code)
273 return result.Code, nil
274}
275
276// createPDSAccount creates an account via the coves.user.signup XRPC endpoint
277// This is the same code path that a third-party client or UI would use
278func createPDSAccount(t *testing.T, userService users.UserService, handle, email, password string) (string, error) {
279 // Generate fresh invite code for each account
280 inviteCode, err := generateInviteCode(t)
281 if err != nil {
282 return "", fmt.Errorf("failed to generate invite code: %w", err)
283 }
284
285 // Call our XRPC endpoint (what a third-party client would call)
286 payload := map[string]string{
287 "handle": handle,
288 "email": email,
289 "password": password,
290 "inviteCode": inviteCode,
291 }
292
293 jsonData, err := json.Marshal(payload)
294 if err != nil {
295 return "", fmt.Errorf("failed to marshal request: %w", err)
296 }
297
298 resp, err := http.Post(
299 "http://localhost:8081/xrpc/social.coves.actor.signup",
300 "application/json",
301 bytes.NewBuffer(jsonData),
302 )
303 if err != nil {
304 return "", fmt.Errorf("failed to call signup endpoint: %w", err)
305 }
306 defer resp.Body.Close()
307
308 if resp.StatusCode != http.StatusOK {
309 var errorResp map[string]interface{}
310 json.NewDecoder(resp.Body).Decode(&errorResp)
311 return "", fmt.Errorf("signup endpoint returned status %d: %v", resp.StatusCode, errorResp)
312 }
313
314 var result struct {
315 DID string `json:"did"`
316 Handle string `json:"handle"`
317 AccessJwt string `json:"accessJwt"`
318 RefreshJwt string `json:"refreshJwt"`
319 }
320
321 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
322 return "", fmt.Errorf("failed to decode response: %w", err)
323 }
324
325 t.Logf("Account created via XRPC endpoint: %s → %s", result.Handle, result.DID)
326
327 return result.DID, nil
328}
329
330// isPDSAvailable checks if PDS is running
331func isPDSAvailable(t *testing.T) bool {
332 resp, err := http.Get("http://localhost:3001/xrpc/_health")
333 if err != nil {
334 t.Logf("PDS not available: %v", err)
335 return false
336 }
337 defer resp.Body.Close()
338 return resp.StatusCode == http.StatusOK
339}
340
341// isJetstreamAvailable checks if Jetstream is running
342func isJetstreamAvailable(t *testing.T) bool {
343 // Use 127.0.0.1 instead of localhost to force IPv4
344 resp, err := http.Get("http://127.0.0.1:6009/metrics")
345 if err != nil {
346 t.Logf("Jetstream not available: %v", err)
347 return false
348 }
349 defer resp.Body.Close()
350 return resp.StatusCode == http.StatusOK
351}
352
353// isAppViewAvailable checks if AppView is running
354func isAppViewAvailable(t *testing.T) bool {
355 resp, err := http.Get("http://localhost:8081/health")
356 if err != nil {
357 t.Logf("AppView not available: %v", err)
358 return false
359 }
360 defer resp.Body.Close()
361 return resp.StatusCode == http.StatusOK
362}
363
364// setupTestDB connects to test database and runs migrations
365func setupTestDB(t *testing.T) *sql.DB {
366 // Build connection string from environment variables (set by .env.dev)
367 testUser := os.Getenv("POSTGRES_TEST_USER")
368 testPassword := os.Getenv("POSTGRES_TEST_PASSWORD")
369 testPort := os.Getenv("POSTGRES_TEST_PORT")
370 testDB := os.Getenv("POSTGRES_TEST_DB")
371
372 // Fallback to defaults if not set
373 if testUser == "" {
374 testUser = "test_user"
375 }
376 if testPassword == "" {
377 testPassword = "test_password"
378 }
379 if testPort == "" {
380 testPort = "5434"
381 }
382 if testDB == "" {
383 testDB = "coves_test"
384 }
385
386 dbURL := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable",
387 testUser, testPassword, testPort, testDB)
388
389 db, err := sql.Open("postgres", dbURL)
390 if err != nil {
391 t.Fatalf("Failed to connect to test database: %v", err)
392 }
393
394 if err := db.Ping(); err != nil {
395 t.Fatalf("Failed to ping test database: %v", err)
396 }
397
398 if err := goose.SetDialect("postgres"); err != nil {
399 t.Fatalf("Failed to set goose dialect: %v", err)
400 }
401
402 if err := goose.Up(db, "../../internal/db/migrations"); err != nil {
403 t.Fatalf("Failed to run migrations: %v", err)
404 }
405
406 // Clean up any existing test data
407 _, err = db.Exec("DELETE FROM users WHERE handle LIKE '%.test' OR handle LIKE '%.local.coves.dev'")
408 if err != nil {
409 t.Logf("Warning: Failed to clean up test data: %v", err)
410 }
411
412 return db
413}