A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "io" 10 "net/http" 11 "net/http/httptest" 12 "os" 13 "strings" 14 "testing" 15 "time" 16 17 "github.com/go-chi/chi/v5" 18 _ "github.com/lib/pq" 19 "github.com/pressly/goose/v3" 20 21 "Coves/internal/api/routes" 22 "Coves/internal/atproto/did" 23 "Coves/internal/atproto/jetstream" 24 "Coves/internal/core/communities" 25 "Coves/internal/db/postgres" 26) 27 28// TestCommunity_E2E is a comprehensive end-to-end test covering: 29// 1. Write-forward to PDS (service layer) 30// 2. Firehose consumer indexing 31// 3. XRPC HTTP endpoints (create, get, list) 32func TestCommunity_E2E(t *testing.T) { 33 // Skip in short mode since this requires real PDS 34 if testing.Short() { 35 t.Skip("Skipping E2E test in short mode") 36 } 37 38 // Setup test database 39 dbURL := os.Getenv("TEST_DATABASE_URL") 40 if dbURL == "" { 41 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 42 } 43 44 db, err := sql.Open("postgres", dbURL) 45 if err != nil { 46 t.Fatalf("Failed to connect to test database: %v", err) 47 } 48 defer db.Close() 49 50 // Run migrations 51 if err := goose.SetDialect("postgres"); err != nil { 52 t.Fatalf("Failed to set goose dialect: %v", err) 53 } 54 if err := goose.Up(db, "../../internal/db/migrations"); err != nil { 55 t.Fatalf("Failed to run migrations: %v", err) 56 } 57 58 // Check if PDS is running 59 pdsURL := os.Getenv("PDS_URL") 60 if pdsURL == "" { 61 pdsURL = "http://localhost:3001" 62 } 63 64 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 65 if err != nil { 66 t.Skipf("PDS not running at %s: %v", pdsURL, err) 67 } 68 healthResp.Body.Close() 69 70 // Setup dependencies 71 communityRepo := postgres.NewCommunityRepository(db) 72 didGen := did.NewGenerator(true, "https://plc.directory") 73 74 // Get instance credentials 75 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 76 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 77 if instanceHandle == "" { 78 instanceHandle = "testuser123.local.coves.dev" 79 } 80 if instancePassword == "" { 81 instancePassword = "test-password-123" 82 } 83 84 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle) 85 86 // Authenticate to get instance DID 87 accessToken, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 88 if err != nil { 89 t.Fatalf("Failed to authenticate with PDS: %v", err) 90 } 91 92 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID) 93 94 // Create service and consumer 95 communityService := communities.NewCommunityService(communityRepo, didGen, pdsURL, instanceDID) 96 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 97 svc.SetPDSAccessToken(accessToken) 98 } 99 100 consumer := jetstream.NewCommunityEventConsumer(communityRepo) 101 102 // Setup HTTP server with XRPC routes 103 r := chi.NewRouter() 104 routes.RegisterCommunityRoutes(r, communityService) 105 httpServer := httptest.NewServer(r) 106 defer httpServer.Close() 107 108 ctx := context.Background() 109 110 // ==================================================================================== 111 // Part 1: Write-Forward to PDS (Service Layer) 112 // ==================================================================================== 113 t.Run("1. Write-Forward to PDS", func(t *testing.T) { 114 communityName := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) 115 116 createReq := communities.CreateCommunityRequest{ 117 Name: communityName, 118 DisplayName: "E2E Test Community", 119 Description: "Testing full E2E flow", 120 Visibility: "public", 121 CreatedByDID: instanceDID, 122 HostedByDID: instanceDID, 123 AllowExternalDiscovery: true, 124 } 125 126 t.Logf("\n📝 Creating community via service: %s", communityName) 127 community, err := communityService.CreateCommunity(ctx, createReq) 128 if err != nil { 129 t.Fatalf("Failed to create community: %v", err) 130 } 131 132 t.Logf("✅ Service returned:") 133 t.Logf(" DID: %s", community.DID) 134 t.Logf(" Handle: %s", community.Handle) 135 t.Logf(" RecordURI: %s", community.RecordURI) 136 t.Logf(" RecordCID: %s", community.RecordCID) 137 138 // Verify DID format 139 if community.DID[:8] != "did:plc:" { 140 t.Errorf("Expected did:plc DID, got: %s", community.DID) 141 } 142 143 // Verify record exists in PDS 144 t.Logf("\n📡 Querying PDS for the record...") 145 146 collection := "social.coves.community.profile" 147 rkey := extractRKeyFromURI(community.RecordURI) 148 149 getRecordURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 150 pdsURL, instanceDID, collection, rkey) 151 152 pdsResp, err := http.Get(getRecordURL) 153 if err != nil { 154 t.Fatalf("Failed to query PDS: %v", err) 155 } 156 defer pdsResp.Body.Close() 157 158 if pdsResp.StatusCode != http.StatusOK { 159 body, _ := io.ReadAll(pdsResp.Body) 160 t.Fatalf("PDS returned status %d: %s", pdsResp.StatusCode, string(body)) 161 } 162 163 var pdsRecord struct { 164 URI string `json:"uri"` 165 CID string `json:"cid"` 166 Value map[string]interface{} `json:"value"` 167 } 168 169 if err := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); err != nil { 170 t.Fatalf("Failed to decode PDS response: %v", err) 171 } 172 173 t.Logf("✅ Record found in PDS!") 174 t.Logf(" URI: %s", pdsRecord.URI) 175 t.Logf(" CID: %s", pdsRecord.CID) 176 177 // Verify record has correct DIDs 178 if pdsRecord.Value["did"] != community.DID { 179 t.Errorf("Community DID mismatch in PDS record: expected %s, got %v", 180 community.DID, pdsRecord.Value["did"]) 181 } 182 183 // ==================================================================================== 184 // Part 2: Firehose Consumer Indexing 185 // ==================================================================================== 186 t.Run("2. Firehose Consumer Indexing", func(t *testing.T) { 187 t.Logf("\n🔄 Simulating Jetstream firehose event...") 188 189 // Simulate firehose event (in production, this comes from Jetstream) 190 firehoseEvent := jetstream.JetstreamEvent{ 191 Did: instanceDID, // Repository owner (instance DID, not community DID!) 192 TimeUS: time.Now().UnixMicro(), 193 Kind: "commit", 194 Commit: &jetstream.CommitEvent{ 195 Rev: "test-rev", 196 Operation: "create", 197 Collection: collection, 198 RKey: rkey, 199 CID: pdsRecord.CID, 200 Record: pdsRecord.Value, 201 }, 202 } 203 204 err := consumer.HandleEvent(ctx, &firehoseEvent) 205 if err != nil { 206 t.Fatalf("Failed to process firehose event: %v", err) 207 } 208 209 t.Logf("✅ Consumer processed event") 210 211 // Verify indexed in AppView database 212 t.Logf("\n🔍 Querying AppView database...") 213 214 indexed, err := communityRepo.GetByDID(ctx, community.DID) 215 if err != nil { 216 t.Fatalf("Community not indexed in AppView: %v", err) 217 } 218 219 t.Logf("✅ Community indexed in AppView:") 220 t.Logf(" DID: %s", indexed.DID) 221 t.Logf(" Handle: %s", indexed.Handle) 222 t.Logf(" DisplayName: %s", indexed.DisplayName) 223 t.Logf(" RecordURI: %s", indexed.RecordURI) 224 225 // Verify record_uri points to instance repo (not community repo) 226 if indexed.RecordURI[:len("at://"+instanceDID)] != "at://"+instanceDID { 227 t.Errorf("record_uri should point to instance repo, got: %s", indexed.RecordURI) 228 } 229 230 t.Logf("\n✅ Part 1 & 2 Complete: Write-Forward → PDS → Firehose → AppView ✓") 231 }) 232 }) 233 234 // ==================================================================================== 235 // Part 3: XRPC HTTP Endpoints 236 // ==================================================================================== 237 t.Run("3. XRPC HTTP Endpoints", func(t *testing.T) { 238 239 t.Run("Create via XRPC endpoint", func(t *testing.T) { 240 createReq := map[string]interface{}{ 241 "name": fmt.Sprintf("xrpc-%d", time.Now().UnixNano()), 242 "displayName": "XRPC E2E Test", 243 "description": "Testing true end-to-end flow", 244 "visibility": "public", 245 "createdByDid": instanceDID, 246 "hostedByDid": instanceDID, 247 "allowExternalDiscovery": true, 248 } 249 250 reqBody, _ := json.Marshal(createReq) 251 252 // Step 1: Client POSTs to XRPC endpoint 253 t.Logf("📡 Client → POST /xrpc/social.coves.community.create") 254 resp, err := http.Post( 255 httpServer.URL+"/xrpc/social.coves.community.create", 256 "application/json", 257 bytes.NewBuffer(reqBody), 258 ) 259 if err != nil { 260 t.Fatalf("Failed to POST: %v", err) 261 } 262 defer resp.Body.Close() 263 264 if resp.StatusCode != http.StatusOK { 265 body, _ := io.ReadAll(resp.Body) 266 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 267 } 268 269 var createResp struct { 270 URI string `json:"uri"` 271 CID string `json:"cid"` 272 DID string `json:"did"` 273 Handle string `json:"handle"` 274 } 275 276 json.NewDecoder(resp.Body).Decode(&createResp) 277 278 t.Logf("✅ XRPC response received:") 279 t.Logf(" DID: %s", createResp.DID) 280 t.Logf(" Handle: %s", createResp.Handle) 281 t.Logf(" URI: %s", createResp.URI) 282 283 // Step 2: Simulate firehose consumer picking up the event 284 t.Logf("🔄 Simulating Jetstream consumer indexing...") 285 rkey := extractRKeyFromURI(createResp.URI) 286 event := jetstream.JetstreamEvent{ 287 Did: instanceDID, 288 TimeUS: time.Now().UnixMicro(), 289 Kind: "commit", 290 Commit: &jetstream.CommitEvent{ 291 Rev: "test-rev", 292 Operation: "create", 293 Collection: "social.coves.community.profile", 294 RKey: rkey, 295 Record: map[string]interface{}{ 296 "did": createResp.DID, // Community's DID from response 297 "handle": createResp.Handle, // Community's handle from response 298 "name": createReq["name"], 299 "displayName": createReq["displayName"], 300 "description": createReq["description"], 301 "visibility": createReq["visibility"], 302 "createdBy": createReq["createdByDid"], 303 "hostedBy": createReq["hostedByDid"], 304 "federation": map[string]interface{}{ 305 "allowExternalDiscovery": createReq["allowExternalDiscovery"], 306 }, 307 "createdAt": time.Now().Format(time.RFC3339), 308 }, 309 CID: createResp.CID, 310 }, 311 } 312 consumer.HandleEvent(context.Background(), &event) 313 314 // Step 3: Verify it's indexed in AppView 315 t.Logf("🔍 Querying AppView to verify indexing...") 316 var indexedCommunity communities.Community 317 err = db.QueryRow(` 318 SELECT did, handle, display_name, description 319 FROM communities 320 WHERE did = $1 321 `, createResp.DID).Scan( 322 &indexedCommunity.DID, 323 &indexedCommunity.Handle, 324 &indexedCommunity.DisplayName, 325 &indexedCommunity.Description, 326 ) 327 if err != nil { 328 t.Fatalf("Community not indexed in AppView: %v", err) 329 } 330 331 t.Logf("✅ TRUE E2E FLOW COMPLETE:") 332 t.Logf(" Client → XRPC → PDS → Firehose → AppView ✓") 333 t.Logf(" Indexed community: %s (%s)", indexedCommunity.Handle, indexedCommunity.DisplayName) 334 }) 335 336 t.Run("Get via XRPC endpoint", func(t *testing.T) { 337 // Create a community first (via service, so it's indexed) 338 community := createAndIndexCommunity(t, communityService, consumer, instanceDID) 339 340 // GET via HTTP endpoint 341 resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.get?community=%s", 342 httpServer.URL, community.DID)) 343 if err != nil { 344 t.Fatalf("Failed to GET: %v", err) 345 } 346 defer resp.Body.Close() 347 348 if resp.StatusCode != http.StatusOK { 349 body, _ := io.ReadAll(resp.Body) 350 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 351 } 352 353 var getCommunity communities.Community 354 json.NewDecoder(resp.Body).Decode(&getCommunity) 355 356 t.Logf("✅ Retrieved via XRPC HTTP endpoint:") 357 t.Logf(" DID: %s", getCommunity.DID) 358 t.Logf(" DisplayName: %s", getCommunity.DisplayName) 359 360 if getCommunity.DID != community.DID { 361 t.Errorf("DID mismatch: expected %s, got %s", community.DID, getCommunity.DID) 362 } 363 }) 364 365 t.Run("List via XRPC endpoint", func(t *testing.T) { 366 // Create and index multiple communities 367 for i := 0; i < 3; i++ { 368 createAndIndexCommunity(t, communityService, consumer, instanceDID) 369 } 370 371 resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.list?limit=10", 372 httpServer.URL)) 373 if err != nil { 374 t.Fatalf("Failed to GET list: %v", err) 375 } 376 defer resp.Body.Close() 377 378 if resp.StatusCode != http.StatusOK { 379 body, _ := io.ReadAll(resp.Body) 380 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 381 } 382 383 var listResp struct { 384 Communities []communities.Community `json:"communities"` 385 Total int `json:"total"` 386 } 387 388 json.NewDecoder(resp.Body).Decode(&listResp) 389 390 t.Logf("✅ Listed %d communities via XRPC", len(listResp.Communities)) 391 392 if len(listResp.Communities) < 3 { 393 t.Errorf("Expected at least 3 communities, got %d", len(listResp.Communities)) 394 } 395 }) 396 397 t.Logf("\n✅ Part 3 Complete: All XRPC HTTP endpoints working ✓") 398 }) 399 400 divider := strings.Repeat("=", 70) 401 t.Logf("\n%s", divider) 402 t.Logf("✅ COMPREHENSIVE E2E TEST COMPLETE!") 403 t.Logf("%s", divider) 404 t.Logf("✓ Write-forward to PDS") 405 t.Logf("✓ Record stored with correct DIDs (community vs instance)") 406 t.Logf("✓ Firehose consumer indexes to AppView") 407 t.Logf("✓ XRPC create endpoint (HTTP)") 408 t.Logf("✓ XRPC get endpoint (HTTP)") 409 t.Logf("✓ XRPC list endpoint (HTTP)") 410 t.Logf("%s", divider) 411} 412 413// Helper: create and index a community (simulates full flow) 414func createAndIndexCommunity(t *testing.T, service communities.Service, consumer *jetstream.CommunityEventConsumer, instanceDID string) *communities.Community { 415 req := communities.CreateCommunityRequest{ 416 Name: fmt.Sprintf("test-%d", time.Now().UnixNano()), 417 DisplayName: "Test Community", 418 Description: "Test", 419 Visibility: "public", 420 CreatedByDID: instanceDID, 421 HostedByDID: instanceDID, 422 AllowExternalDiscovery: true, 423 } 424 425 community, err := service.CreateCommunity(context.Background(), req) 426 if err != nil { 427 t.Fatalf("Failed to create: %v", err) 428 } 429 430 // Fetch from PDS to get full record 431 pdsURL := "http://localhost:3001" 432 collection := "social.coves.community.profile" 433 rkey := extractRKeyFromURI(community.RecordURI) 434 435 pdsResp, _ := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 436 pdsURL, instanceDID, collection, rkey)) 437 defer pdsResp.Body.Close() 438 439 var pdsRecord struct { 440 CID string `json:"cid"` 441 Value map[string]interface{} `json:"value"` 442 } 443 json.NewDecoder(pdsResp.Body).Decode(&pdsRecord) 444 445 // Simulate firehose event 446 event := jetstream.JetstreamEvent{ 447 Did: instanceDID, 448 TimeUS: time.Now().UnixMicro(), 449 Kind: "commit", 450 Commit: &jetstream.CommitEvent{ 451 Rev: "test", 452 Operation: "create", 453 Collection: collection, 454 RKey: rkey, 455 CID: pdsRecord.CID, 456 Record: pdsRecord.Value, 457 }, 458 } 459 460 consumer.HandleEvent(context.Background(), &event) 461 462 return community 463} 464 465func extractRKeyFromURI(uri string) string { 466 // at://did/collection/rkey -> rkey 467 parts := strings.Split(uri, "/") 468 if len(parts) >= 4 { 469 return parts[len(parts)-1] 470 } 471 return "" 472} 473 474// authenticateWithPDS authenticates with the PDS and returns access token and DID 475func authenticateWithPDS(pdsURL, handle, password string) (string, string, error) { 476 // Call com.atproto.server.createSession 477 sessionReq := map[string]string{ 478 "identifier": handle, 479 "password": password, 480 } 481 482 reqBody, _ := json.Marshal(sessionReq) 483 resp, err := http.Post( 484 pdsURL+"/xrpc/com.atproto.server.createSession", 485 "application/json", 486 bytes.NewBuffer(reqBody), 487 ) 488 if err != nil { 489 return "", "", fmt.Errorf("failed to create session: %w", err) 490 } 491 defer resp.Body.Close() 492 493 if resp.StatusCode != http.StatusOK { 494 body, _ := io.ReadAll(resp.Body) 495 return "", "", fmt.Errorf("PDS auth failed (status %d): %s", resp.StatusCode, string(body)) 496 } 497 498 var sessionResp struct { 499 AccessJwt string `json:"accessJwt"` 500 DID string `json:"did"` 501 } 502 503 if err := json.NewDecoder(resp.Body).Decode(&sessionResp); err != nil { 504 return "", "", fmt.Errorf("failed to decode session response: %w", err) 505 } 506 507 return sessionResp.AccessJwt, sessionResp.DID, nil 508}