A community based topic aggregation platform built on atproto
at main 12 kB view raw
1package e2e 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/core/users" 7 "Coves/internal/db/postgres" 8 "bytes" 9 "context" 10 "database/sql" 11 "encoding/json" 12 "fmt" 13 "net/http" 14 "os" 15 "testing" 16 "time" 17 18 _ "github.com/lib/pq" 19 "github.com/pressly/goose/v3" 20) 21 22// TestE2E_UserSignup tests the full user signup flow: 23// Third-party client → social.coves.actor.signup XRPC → PDS account creation → Jetstream → AppView indexing 24// 25// This tests the same code path that a third-party client or UI would use. 26// 27// Prerequisites: 28// - AppView running on localhost:8081 29// - PDS running on localhost:3001 30// - Jetstream running on localhost:6008 (consuming from PDS) 31// - Test database on localhost:5434 32// 33// Run with: 34// 35// make e2e-up # Start infrastructure 36// go run ./cmd/server & # Start AppView 37// go test ./tests/integration -run TestE2E_UserSignup -v 38func TestE2E_UserSignup(t *testing.T) { 39 if testing.Short() { 40 t.Skip("Skipping E2E test in short mode") 41 } 42 43 // Check if AppView is available 44 if !isAppViewAvailable(t) { 45 t.Skip("AppView not available at localhost:8081 - run 'go run ./cmd/server' first") 46 } 47 48 // Check if PDS is available 49 if !isPDSAvailable(t) { 50 t.Skip("PDS not available at localhost:3001 - run 'make e2e-up' first") 51 } 52 53 // Check if Jetstream is available 54 if !isJetstreamAvailable(t) { 55 t.Skip("Jetstream not available at localhost:6008 - run 'make e2e-up' first") 56 } 57 58 db := setupTestDB(t) 59 defer func() { 60 if err := db.Close(); err != nil { 61 t.Logf("Failed to close database: %v", err) 62 } 63 }() 64 65 // Set up services 66 userRepo := postgres.NewUserRepository(db) 67 resolver := identity.NewResolver(db, identity.DefaultConfig()) 68 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 69 70 // Start Jetstream consumer 71 consumer := jetstream.NewUserEventConsumer( 72 userService, 73 resolver, 74 "ws://localhost:6008/subscribe", 75 "", // No PDS filter 76 ) 77 78 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) 79 defer cancel() 80 81 // Start consumer in background 82 consumerDone := make(chan error, 1) 83 go func() { 84 consumerDone <- consumer.Start(ctx) 85 }() 86 87 // Give Jetstream consumer a moment to connect (no need to wait long) 88 t.Log("Waiting for Jetstream consumer to connect...") 89 time.Sleep(500 * time.Millisecond) 90 91 // Test 1: Create account on PDS 92 t.Run("Create account on PDS and verify indexing", func(t *testing.T) { 93 handle := fmt.Sprintf("alice-%d.local.coves.dev", time.Now().Unix()) 94 email := fmt.Sprintf("alice-%d@test.com", time.Now().Unix()) 95 96 t.Logf("Creating account: %s", handle) 97 98 // Create account via UserService (what UI would call) 99 did, err := createPDSAccount(t, userService, handle, email, "test1234") 100 if err != nil { 101 t.Fatalf("Failed to create PDS account: %v", err) 102 } 103 104 t.Logf("Account created with DID: %s", did) 105 106 // Wait for Jetstream to process and AppView to index (with retry) 107 t.Log("Waiting for Jetstream → AppView indexing...") 108 var user *users.User 109 deadline := time.Now().Add(10 * time.Second) 110 for time.Now().Before(deadline) { 111 user, err = userService.GetUserByDID(ctx, did) 112 if err == nil { 113 break // Successfully indexed! 114 } 115 time.Sleep(500 * time.Millisecond) 116 } 117 if err != nil { 118 t.Fatalf("User not indexed in AppView after 10s: %v", err) 119 } 120 121 if user.Handle != handle { 122 t.Errorf("Expected handle %s, got %s", handle, user.Handle) 123 } 124 125 if user.DID != did { 126 t.Errorf("Expected DID %s, got %s", did, user.DID) 127 } 128 129 t.Logf("✅ User successfully indexed: %s → %s", handle, did) 130 }) 131 132 // Test 2: Idempotency 133 t.Run("Idempotent indexing on duplicate events", func(t *testing.T) { 134 handle := fmt.Sprintf("bob-%d.local.coves.dev", time.Now().Unix()) 135 email := fmt.Sprintf("bob-%d@test.com", time.Now().Unix()) 136 137 // Create account via UserService 138 did, err := createPDSAccount(t, userService, handle, email, "test1234") 139 if err != nil { 140 t.Fatalf("Failed to create PDS account: %v", err) 141 } 142 143 // Wait for indexing (with retry) 144 var user1 *users.User 145 deadline := time.Now().Add(10 * time.Second) 146 for time.Now().Before(deadline) { 147 user1, err = userService.GetUserByDID(ctx, did) 148 if err == nil { 149 break 150 } 151 time.Sleep(500 * time.Millisecond) 152 } 153 if err != nil { 154 t.Fatalf("User not indexed after 10s: %v", err) 155 } 156 157 // Manually trigger duplicate indexing (simulates Jetstream replay) 158 _, err = userService.CreateUser(ctx, users.CreateUserRequest{ 159 DID: did, 160 Handle: handle, 161 PDSURL: "http://localhost:3001", 162 }) 163 if err != nil { 164 t.Fatalf("Idempotent CreateUser should not error: %v", err) 165 } 166 167 // Verify still only one user 168 user2, err := userService.GetUserByDID(ctx, did) 169 if err != nil { 170 t.Fatalf("Failed to get user after duplicate: %v", err) 171 } 172 173 if user1.CreatedAt != user2.CreatedAt { 174 t.Errorf("Duplicate indexing created new user (timestamps differ)") 175 } 176 177 t.Logf("✅ Idempotency verified: duplicate events handled gracefully") 178 }) 179 180 // Test 3: Multiple users 181 t.Run("Index multiple users concurrently", func(t *testing.T) { 182 const numUsers = 3 183 dids := make([]string, numUsers) 184 185 for i := 0; i < numUsers; i++ { 186 handle := fmt.Sprintf("user%d-%d.local.coves.dev", i, time.Now().Unix()) 187 email := fmt.Sprintf("user%d-%d@test.com", i, time.Now().Unix()) 188 189 did, err := createPDSAccount(t, userService, handle, email, "test1234") 190 if err != nil { 191 t.Fatalf("Failed to create account %d: %v", i, err) 192 } 193 dids[i] = did 194 t.Logf("Created user %d: %s", i, did) 195 196 // Small delay between creations 197 time.Sleep(500 * time.Millisecond) 198 } 199 200 // Verify all indexed (with retry for each user) 201 t.Log("Waiting for all users to be indexed...") 202 for i, did := range dids { 203 var user *users.User 204 var err error 205 deadline := time.Now().Add(15 * time.Second) 206 for time.Now().Before(deadline) { 207 user, err = userService.GetUserByDID(ctx, did) 208 if err == nil { 209 break 210 } 211 time.Sleep(500 * time.Millisecond) 212 } 213 if err != nil { 214 t.Errorf("User %d not indexed after 15s: %v", i, err) 215 continue 216 } 217 t.Logf("✅ User %d indexed: %s", i, user.Handle) 218 } 219 }) 220 221 // Clean shutdown 222 cancel() 223 select { 224 case err := <-consumerDone: 225 if err != nil && err != context.Canceled { 226 t.Logf("Consumer stopped with error: %v", err) 227 } 228 case <-time.After(5 * time.Second): 229 t.Log("Consumer shutdown timeout") 230 } 231} 232 233// generateInviteCode generates a single-use invite code via PDS admin API 234func generateInviteCode(t *testing.T) (string, error) { 235 payload := map[string]int{ 236 "useCount": 1, 237 } 238 239 jsonData, err := json.Marshal(payload) 240 if err != nil { 241 return "", fmt.Errorf("failed to marshal request: %w", err) 242 } 243 244 req, err := http.NewRequest( 245 "POST", 246 "http://localhost:3001/xrpc/com.atproto.server.createInviteCode", 247 bytes.NewBuffer(jsonData), 248 ) 249 if err != nil { 250 return "", fmt.Errorf("failed to create request: %w", err) 251 } 252 253 // PDS admin authentication 254 req.SetBasicAuth("admin", "admin") 255 req.Header.Set("Content-Type", "application/json") 256 257 resp, err := http.DefaultClient.Do(req) 258 if err != nil { 259 return "", fmt.Errorf("failed to create invite code: %w", err) 260 } 261 defer func() { _ = resp.Body.Close() }() 262 263 if resp.StatusCode != http.StatusOK { 264 var errorResp map[string]interface{} 265 if err := json.NewDecoder(resp.Body).Decode(&errorResp); err != nil { 266 return "", fmt.Errorf("PDS admin API returned status %d (failed to decode error: %w)", resp.StatusCode, err) 267 } 268 return "", fmt.Errorf("PDS admin API returned status %d: %v", resp.StatusCode, errorResp) 269 } 270 271 var result struct { 272 Code string `json:"code"` 273 } 274 275 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 276 return "", fmt.Errorf("failed to decode response: %w", err) 277 } 278 279 t.Logf("Generated invite code: %s", result.Code) 280 return result.Code, nil 281} 282 283// createPDSAccount creates an account via the coves.user.signup XRPC endpoint 284// This is the same code path that a third-party client or UI would use 285func createPDSAccount(t *testing.T, userService users.UserService, handle, email, password string) (string, error) { 286 // Generate fresh invite code for each account 287 inviteCode, err := generateInviteCode(t) 288 if err != nil { 289 return "", fmt.Errorf("failed to generate invite code: %w", err) 290 } 291 292 // Call our XRPC endpoint (what a third-party client would call) 293 payload := map[string]string{ 294 "handle": handle, 295 "email": email, 296 "password": password, 297 "inviteCode": inviteCode, 298 } 299 300 jsonData, err := json.Marshal(payload) 301 if err != nil { 302 return "", fmt.Errorf("failed to marshal request: %w", err) 303 } 304 305 resp, err := http.Post( 306 "http://localhost:8081/xrpc/social.coves.actor.signup", 307 "application/json", 308 bytes.NewBuffer(jsonData), 309 ) 310 if err != nil { 311 return "", fmt.Errorf("failed to call signup endpoint: %w", err) 312 } 313 defer func() { _ = resp.Body.Close() }() 314 315 if resp.StatusCode != http.StatusOK { 316 var errorResp map[string]interface{} 317 if err := json.NewDecoder(resp.Body).Decode(&errorResp); err != nil { 318 return "", fmt.Errorf("signup endpoint returned status %d (failed to decode error: %w)", resp.StatusCode, err) 319 } 320 return "", fmt.Errorf("signup endpoint returned status %d: %v", resp.StatusCode, errorResp) 321 } 322 323 var result struct { 324 DID string `json:"did"` 325 Handle string `json:"handle"` 326 AccessJwt string `json:"accessJwt"` 327 RefreshJwt string `json:"refreshJwt"` 328 } 329 330 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 331 return "", fmt.Errorf("failed to decode response: %w", err) 332 } 333 334 t.Logf("Account created via XRPC endpoint: %s → %s", result.Handle, result.DID) 335 336 return result.DID, nil 337} 338 339// isPDSAvailable checks if PDS is running 340func isPDSAvailable(t *testing.T) bool { 341 resp, err := http.Get("http://localhost:3001/xrpc/_health") 342 if err != nil { 343 t.Logf("PDS not available: %v", err) 344 return false 345 } 346 defer func() { _ = resp.Body.Close() }() 347 return resp.StatusCode == http.StatusOK 348} 349 350// isJetstreamAvailable checks if Jetstream is running 351func isJetstreamAvailable(t *testing.T) bool { 352 // Use 127.0.0.1 instead of localhost to force IPv4 353 resp, err := http.Get("http://127.0.0.1:6009/metrics") 354 if err != nil { 355 t.Logf("Jetstream not available: %v", err) 356 return false 357 } 358 defer func() { _ = resp.Body.Close() }() 359 return resp.StatusCode == http.StatusOK 360} 361 362// isAppViewAvailable checks if AppView is running 363func isAppViewAvailable(t *testing.T) bool { 364 resp, err := http.Get("http://localhost:8081/health") 365 if err != nil { 366 t.Logf("AppView not available: %v", err) 367 return false 368 } 369 defer func() { _ = resp.Body.Close() }() 370 return resp.StatusCode == http.StatusOK 371} 372 373// setupTestDB connects to test database and runs migrations 374func setupTestDB(t *testing.T) *sql.DB { 375 // Build connection string from environment variables (set by .env.dev) 376 testUser := os.Getenv("POSTGRES_TEST_USER") 377 testPassword := os.Getenv("POSTGRES_TEST_PASSWORD") 378 testPort := os.Getenv("POSTGRES_TEST_PORT") 379 testDB := os.Getenv("POSTGRES_TEST_DB") 380 381 // Fallback to defaults if not set 382 if testUser == "" { 383 testUser = "test_user" 384 } 385 if testPassword == "" { 386 testPassword = "test_password" 387 } 388 if testPort == "" { 389 testPort = "5434" 390 } 391 if testDB == "" { 392 testDB = "coves_test" 393 } 394 395 dbURL := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", 396 testUser, testPassword, testPort, testDB) 397 398 db, err := sql.Open("postgres", dbURL) 399 if err != nil { 400 t.Fatalf("Failed to connect to test database: %v", err) 401 } 402 403 if pingErr := db.Ping(); pingErr != nil { 404 t.Fatalf("Failed to ping test database: %v", pingErr) 405 } 406 407 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 408 t.Fatalf("Failed to set goose dialect: %v", dialectErr) 409 } 410 411 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 412 t.Fatalf("Failed to run migrations: %v", migrateErr) 413 } 414 415 // Clean up any existing test data 416 _, err = db.Exec("DELETE FROM users WHERE handle LIKE '%.test' OR handle LIKE '%.local.coves.dev'") 417 if err != nil { 418 t.Logf("Warning: Failed to clean up test data: %v", err) 419 } 420 421 return db 422}