A community-based topic aggregation platform built on atproto
1package e2e 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "net/http" 10 "os" 11 "testing" 12 "time" 13 14 "Coves/internal/atproto/identity" 15 "Coves/internal/atproto/jetstream" 16 "Coves/internal/core/users" 17 "Coves/internal/db/postgres" 18 _ "github.com/lib/pq" 19 "github.com/pressly/goose/v3" 20) 21 22// TestE2E_UserSignup tests the full user signup flow: 23// Third-party client → social.coves.actor.signup XRPC → PDS account creation → Jetstream → AppView indexing 24// 25// This tests the same code path that a third-party client or UI would use. 26// 27// Prerequisites: 28// - AppView running on localhost:8081 29// - PDS running on localhost:3001 30// - Jetstream running on localhost:6008 (consuming from PDS) 31// - Test database on localhost:5434 32// 33// Run with: 34// make e2e-up # Start infrastructure 35// go run ./cmd/server & # Start AppView 36// go test ./tests/integration -run TestE2E_UserSignup -v 37func TestE2E_UserSignup(t *testing.T) { 38 if testing.Short() { 39 t.Skip("Skipping E2E test in short mode") 40 } 41 42 // Check if AppView is available 43 if !isAppViewAvailable(t) { 44 t.Skip("AppView not available at localhost:8081 - run 'go run ./cmd/server' first") 45 } 46 47 // Check if PDS is available 48 if !isPDSAvailable(t) { 49 t.Skip("PDS not available at localhost:3001 - run 'make e2e-up' first") 50 } 51 52 // Check if Jetstream is available 53 if !isJetstreamAvailable(t) { 54 t.Skip("Jetstream not available at localhost:6008 - run 'make e2e-up' first") 55 } 56 57 db := setupTestDB(t) 58 defer db.Close() 59 60 // Set up services 61 userRepo := postgres.NewUserRepository(db) 62 resolver := identity.NewResolver(db, identity.DefaultConfig()) 63 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 64 65 // Start Jetstream consumer 66 consumer := jetstream.NewUserEventConsumer( 67 userService, 68 resolver, 69 "ws://localhost:6008/subscribe", 70 "", // No PDS filter 71 ) 72 73 ctx, 
cancel := context.WithTimeout(context.Background(), 60*time.Second) 74 defer cancel() 75 76 // Start consumer in background 77 consumerDone := make(chan error, 1) 78 go func() { 79 consumerDone <- consumer.Start(ctx) 80 }() 81 82 // Give Jetstream consumer a moment to connect (no need to wait long) 83 t.Log("Waiting for Jetstream consumer to connect...") 84 time.Sleep(500 * time.Millisecond) 85 86 // Test 1: Create account on PDS 87 t.Run("Create account on PDS and verify indexing", func(t *testing.T) { 88 handle := fmt.Sprintf("alice-%d.local.coves.dev", time.Now().Unix()) 89 email := fmt.Sprintf("alice-%d@test.com", time.Now().Unix()) 90 91 t.Logf("Creating account: %s", handle) 92 93 // Create account via UserService (what UI would call) 94 did, err := createPDSAccount(t, userService, handle, email, "test1234") 95 if err != nil { 96 t.Fatalf("Failed to create PDS account: %v", err) 97 } 98 99 t.Logf("Account created with DID: %s", did) 100 101 // Wait for Jetstream to process and AppView to index (with retry) 102 t.Log("Waiting for Jetstream → AppView indexing...") 103 var user *users.User 104 deadline := time.Now().Add(10 * time.Second) 105 for time.Now().Before(deadline) { 106 user, err = userService.GetUserByDID(ctx, did) 107 if err == nil { 108 break // Successfully indexed! 
109 } 110 time.Sleep(500 * time.Millisecond) 111 } 112 if err != nil { 113 t.Fatalf("User not indexed in AppView after 10s: %v", err) 114 } 115 116 if user.Handle != handle { 117 t.Errorf("Expected handle %s, got %s", handle, user.Handle) 118 } 119 120 if user.DID != did { 121 t.Errorf("Expected DID %s, got %s", did, user.DID) 122 } 123 124 t.Logf("✅ User successfully indexed: %s → %s", handle, did) 125 }) 126 127 // Test 2: Idempotency 128 t.Run("Idempotent indexing on duplicate events", func(t *testing.T) { 129 handle := fmt.Sprintf("bob-%d.local.coves.dev", time.Now().Unix()) 130 email := fmt.Sprintf("bob-%d@test.com", time.Now().Unix()) 131 132 // Create account via UserService 133 did, err := createPDSAccount(t, userService, handle, email, "test1234") 134 if err != nil { 135 t.Fatalf("Failed to create PDS account: %v", err) 136 } 137 138 // Wait for indexing (with retry) 139 var user1 *users.User 140 deadline := time.Now().Add(10 * time.Second) 141 for time.Now().Before(deadline) { 142 user1, err = userService.GetUserByDID(ctx, did) 143 if err == nil { 144 break 145 } 146 time.Sleep(500 * time.Millisecond) 147 } 148 if err != nil { 149 t.Fatalf("User not indexed after 10s: %v", err) 150 } 151 152 // Manually trigger duplicate indexing (simulates Jetstream replay) 153 _, err = userService.CreateUser(ctx, users.CreateUserRequest{ 154 DID: did, 155 Handle: handle, 156 PDSURL: "http://localhost:3001", 157 }) 158 if err != nil { 159 t.Fatalf("Idempotent CreateUser should not error: %v", err) 160 } 161 162 // Verify still only one user 163 user2, err := userService.GetUserByDID(ctx, did) 164 if err != nil { 165 t.Fatalf("Failed to get user after duplicate: %v", err) 166 } 167 168 if user1.CreatedAt != user2.CreatedAt { 169 t.Errorf("Duplicate indexing created new user (timestamps differ)") 170 } 171 172 t.Logf("✅ Idempotency verified: duplicate events handled gracefully") 173 }) 174 175 // Test 3: Multiple users 176 t.Run("Index multiple users concurrently", func(t 
*testing.T) { 177 const numUsers = 3 178 dids := make([]string, numUsers) 179 180 for i := 0; i < numUsers; i++ { 181 handle := fmt.Sprintf("user%d-%d.local.coves.dev", i, time.Now().Unix()) 182 email := fmt.Sprintf("user%d-%d@test.com", i, time.Now().Unix()) 183 184 did, err := createPDSAccount(t, userService, handle, email, "test1234") 185 if err != nil { 186 t.Fatalf("Failed to create account %d: %v", i, err) 187 } 188 dids[i] = did 189 t.Logf("Created user %d: %s", i, did) 190 191 // Small delay between creations 192 time.Sleep(500 * time.Millisecond) 193 } 194 195 // Verify all indexed (with retry for each user) 196 t.Log("Waiting for all users to be indexed...") 197 for i, did := range dids { 198 var user *users.User 199 var err error 200 deadline := time.Now().Add(15 * time.Second) 201 for time.Now().Before(deadline) { 202 user, err = userService.GetUserByDID(ctx, did) 203 if err == nil { 204 break 205 } 206 time.Sleep(500 * time.Millisecond) 207 } 208 if err != nil { 209 t.Errorf("User %d not indexed after 15s: %v", i, err) 210 continue 211 } 212 t.Logf("✅ User %d indexed: %s", i, user.Handle) 213 } 214 }) 215 216 // Clean shutdown 217 cancel() 218 select { 219 case err := <-consumerDone: 220 if err != nil && err != context.Canceled { 221 t.Logf("Consumer stopped with error: %v", err) 222 } 223 case <-time.After(5 * time.Second): 224 t.Log("Consumer shutdown timeout") 225 } 226} 227 228// generateInviteCode generates a single-use invite code via PDS admin API 229func generateInviteCode(t *testing.T) (string, error) { 230 payload := map[string]int{ 231 "useCount": 1, 232 } 233 234 jsonData, err := json.Marshal(payload) 235 if err != nil { 236 return "", fmt.Errorf("failed to marshal request: %w", err) 237 } 238 239 req, err := http.NewRequest( 240 "POST", 241 "http://localhost:3001/xrpc/com.atproto.server.createInviteCode", 242 bytes.NewBuffer(jsonData), 243 ) 244 if err != nil { 245 return "", fmt.Errorf("failed to create request: %w", err) 246 } 247 248 // 
PDS admin authentication 249 req.SetBasicAuth("admin", "admin") 250 req.Header.Set("Content-Type", "application/json") 251 252 resp, err := http.DefaultClient.Do(req) 253 if err != nil { 254 return "", fmt.Errorf("failed to create invite code: %w", err) 255 } 256 defer resp.Body.Close() 257 258 if resp.StatusCode != http.StatusOK { 259 var errorResp map[string]interface{} 260 json.NewDecoder(resp.Body).Decode(&errorResp) 261 return "", fmt.Errorf("PDS admin API returned status %d: %v", resp.StatusCode, errorResp) 262 } 263 264 var result struct { 265 Code string `json:"code"` 266 } 267 268 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 269 return "", fmt.Errorf("failed to decode response: %w", err) 270 } 271 272 t.Logf("Generated invite code: %s", result.Code) 273 return result.Code, nil 274} 275 276// createPDSAccount creates an account via the coves.user.signup XRPC endpoint 277// This is the same code path that a third-party client or UI would use 278func createPDSAccount(t *testing.T, userService users.UserService, handle, email, password string) (string, error) { 279 // Generate fresh invite code for each account 280 inviteCode, err := generateInviteCode(t) 281 if err != nil { 282 return "", fmt.Errorf("failed to generate invite code: %w", err) 283 } 284 285 // Call our XRPC endpoint (what a third-party client would call) 286 payload := map[string]string{ 287 "handle": handle, 288 "email": email, 289 "password": password, 290 "inviteCode": inviteCode, 291 } 292 293 jsonData, err := json.Marshal(payload) 294 if err != nil { 295 return "", fmt.Errorf("failed to marshal request: %w", err) 296 } 297 298 resp, err := http.Post( 299 "http://localhost:8081/xrpc/social.coves.actor.signup", 300 "application/json", 301 bytes.NewBuffer(jsonData), 302 ) 303 if err != nil { 304 return "", fmt.Errorf("failed to call signup endpoint: %w", err) 305 } 306 defer resp.Body.Close() 307 308 if resp.StatusCode != http.StatusOK { 309 var errorResp 
map[string]interface{} 310 json.NewDecoder(resp.Body).Decode(&errorResp) 311 return "", fmt.Errorf("signup endpoint returned status %d: %v", resp.StatusCode, errorResp) 312 } 313 314 var result struct { 315 DID string `json:"did"` 316 Handle string `json:"handle"` 317 AccessJwt string `json:"accessJwt"` 318 RefreshJwt string `json:"refreshJwt"` 319 } 320 321 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 322 return "", fmt.Errorf("failed to decode response: %w", err) 323 } 324 325 t.Logf("Account created via XRPC endpoint: %s → %s", result.Handle, result.DID) 326 327 return result.DID, nil 328} 329 330// isPDSAvailable checks if PDS is running 331func isPDSAvailable(t *testing.T) bool { 332 resp, err := http.Get("http://localhost:3001/xrpc/_health") 333 if err != nil { 334 t.Logf("PDS not available: %v", err) 335 return false 336 } 337 defer resp.Body.Close() 338 return resp.StatusCode == http.StatusOK 339} 340 341// isJetstreamAvailable checks if Jetstream is running 342func isJetstreamAvailable(t *testing.T) bool { 343 // Use 127.0.0.1 instead of localhost to force IPv4 344 resp, err := http.Get("http://127.0.0.1:6009/metrics") 345 if err != nil { 346 t.Logf("Jetstream not available: %v", err) 347 return false 348 } 349 defer resp.Body.Close() 350 return resp.StatusCode == http.StatusOK 351} 352 353// isAppViewAvailable checks if AppView is running 354func isAppViewAvailable(t *testing.T) bool { 355 resp, err := http.Get("http://localhost:8081/health") 356 if err != nil { 357 t.Logf("AppView not available: %v", err) 358 return false 359 } 360 defer resp.Body.Close() 361 return resp.StatusCode == http.StatusOK 362} 363 364// setupTestDB connects to test database and runs migrations 365func setupTestDB(t *testing.T) *sql.DB { 366 // Build connection string from environment variables (set by .env.dev) 367 testUser := os.Getenv("POSTGRES_TEST_USER") 368 testPassword := os.Getenv("POSTGRES_TEST_PASSWORD") 369 testPort := 
os.Getenv("POSTGRES_TEST_PORT") 370 testDB := os.Getenv("POSTGRES_TEST_DB") 371 372 // Fallback to defaults if not set 373 if testUser == "" { 374 testUser = "test_user" 375 } 376 if testPassword == "" { 377 testPassword = "test_password" 378 } 379 if testPort == "" { 380 testPort = "5434" 381 } 382 if testDB == "" { 383 testDB = "coves_test" 384 } 385 386 dbURL := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", 387 testUser, testPassword, testPort, testDB) 388 389 db, err := sql.Open("postgres", dbURL) 390 if err != nil { 391 t.Fatalf("Failed to connect to test database: %v", err) 392 } 393 394 if err := db.Ping(); err != nil { 395 t.Fatalf("Failed to ping test database: %v", err) 396 } 397 398 if err := goose.SetDialect("postgres"); err != nil { 399 t.Fatalf("Failed to set goose dialect: %v", err) 400 } 401 402 if err := goose.Up(db, "../../internal/db/migrations"); err != nil { 403 t.Fatalf("Failed to run migrations: %v", err) 404 } 405 406 // Clean up any existing test data 407 _, err = db.Exec("DELETE FROM users WHERE handle LIKE '%.test' OR handle LIKE '%.local.coves.dev'") 408 if err != nil { 409 t.Logf("Warning: Failed to clean up test data: %v", err) 410 } 411 412 return db 413}