A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "io" 10 "net" 11 "net/http" 12 "net/http/httptest" 13 "os" 14 "strings" 15 "testing" 16 "time" 17 18 "Coves/internal/api/middleware" 19 "Coves/internal/api/routes" 20 "Coves/internal/atproto/identity" 21 "Coves/internal/atproto/jetstream" 22 "Coves/internal/atproto/utils" 23 "Coves/internal/core/communities" 24 "Coves/internal/core/users" 25 "Coves/internal/db/postgres" 26 27 "github.com/go-chi/chi/v5" 28 "github.com/gorilla/websocket" 29 _ "github.com/lib/pq" 30 "github.com/pressly/goose/v3" 31) 32 33// TestCommunity_E2E is a TRUE end-to-end test covering the complete flow: 34// 1. HTTP Endpoint → Service Layer → PDS Account Creation → PDS Record Write 35// 2. PDS → REAL Jetstream Firehose → Consumer → AppView DB (TRUE E2E!) 36// 3. AppView DB → XRPC HTTP Endpoints → Client 37// 38// This test verifies: 39// - V2: Community owns its own PDS account and repository 40// - V2: Record URI points to community's repo (at://community_did/...) 41// - Real Jetstream firehose subscription and event consumption 42// - Complete data flow from HTTP write to HTTP read via real infrastructure 43func TestCommunity_E2E(t *testing.T) { 44 // Skip in short mode since this requires real PDS 45 if testing.Short() { 46 t.Skip("Skipping E2E test in short mode") 47 } 48 49 // Setup test database 50 dbURL := os.Getenv("TEST_DATABASE_URL") 51 if dbURL == "" { 52 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 53 } 54 55 db, err := sql.Open("postgres", dbURL) 56 if err != nil { 57 t.Fatalf("Failed to connect to test database: %v", err) 58 } 59 defer func() { 60 if closeErr := db.Close(); closeErr != nil { 61 t.Logf("Failed to close database: %v", closeErr) 62 } 63 }() 64 65 // Run migrations 66 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 67 t.Fatalf("Failed to set goose dialect: %v", dialectErr) 68 } 69 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 70 t.Fatalf("Failed to run migrations: %v", migrateErr) 71 } 72 73 // Check if PDS is running 74 pdsURL := os.Getenv("PDS_URL") 75 if pdsURL == "" { 76 pdsURL = "http://localhost:3001" 77 } 78 79 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 80 if err != nil { 81 t.Skipf("PDS not running at %s: %v", pdsURL, err) 82 } 83 func() { 84 if closeErr := healthResp.Body.Close(); closeErr != nil { 85 t.Logf("Failed to close health response: %v", closeErr) 86 } 87 }() 88 89 // Setup dependencies 90 communityRepo := postgres.NewCommunityRepository(db) 91 92 // Get instance credentials 93 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 94 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 95 if instanceHandle == "" { 96 instanceHandle = "testuser123.local.coves.dev" 97 } 98 if instancePassword == "" { 99 instancePassword = "test-password-123" 100 } 101 102 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle) 103 104 // Authenticate to get instance DID 105 accessToken, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 106 if err != nil { 107 t.Fatalf("Failed to authenticate with PDS: %v", err) 108 } 109 110 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID) 111 112 // Initialize auth middleware (skipVerify=true for E2E tests) 113 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) 114 115 // V2.0: Extract instance domain for community provisioning 116 var instanceDomain string 117 if strings.HasPrefix(instanceDID, "did:web:") { 118 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 119 } else { 120 // Use .social for testing (not .local - that TLD is disallowed by atProto) 121 instanceDomain = "coves.social" 122 } 123 124 // V2.0: Create user service with REAL identity resolution using local PLC 125 plcURL := os.Getenv("PLC_DIRECTORY_URL") 126 if plcURL == "" { 127 plcURL = "http://localhost:3002" // Local PLC directory 128 } 129 userRepo := postgres.NewUserRepository(db) 130 identityConfig := identity.DefaultConfig() 131 identityConfig.PLCURL = plcURL // Use local PLC for identity resolution 132 identityResolver := identity.NewResolver(db, identityConfig) 133 _ = users.NewUserService(userRepo, identityResolver, pdsURL) // Keep for potential future use 134 t.Logf("✅ Identity resolver configured with local PLC: %s", plcURL) 135 136 // V2.0: Initialize PDS account provisioner (simplified - no DID generator needed!) 137 // PDS handles all DID generation and registration automatically 138 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 139 140 // Create service (no longer needs didGen directly - provisioner owns it) 141 communityService := communities.NewCommunityService(communityRepo, pdsURL, instanceDID, instanceDomain, provisioner) 142 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 143 svc.SetPDSAccessToken(accessToken) 144 } 145 146 // Use real identity resolver with local PLC for production-like testing 147 consumer := jetstream.NewCommunityEventConsumer(communityRepo, "did:web:coves.local", true, identityResolver) 148 149 // Setup HTTP server with XRPC routes 150 r := chi.NewRouter() 151 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 152 httpServer := httptest.NewServer(r) 153 defer httpServer.Close() 154 155 ctx := context.Background() 156 157 // ==================================================================================== 158 // Part 1: Write-Forward to PDS (Service Layer) 159 // ==================================================================================== 160 t.Run("1. Write-Forward to PDS", func(t *testing.T) { 161 // Use shorter names to avoid "Handle too long" errors 162 // atProto handles max: 63 chars, format: name.community.coves.social 163 communityName := fmt.Sprintf("e2e-%d", time.Now().Unix()) 164 165 createReq := communities.CreateCommunityRequest{ 166 Name: communityName, 167 DisplayName: "E2E Test Community", 168 Description: "Testing full E2E flow", 169 Visibility: "public", 170 CreatedByDID: instanceDID, 171 HostedByDID: instanceDID, 172 AllowExternalDiscovery: true, 173 } 174 175 t.Logf("\n📝 Creating community via service: %s", communityName) 176 community, err := communityService.CreateCommunity(ctx, createReq) 177 if err != nil { 178 t.Fatalf("Failed to create community: %v", err) 179 } 180 181 t.Logf("✅ Service returned:") 182 t.Logf(" DID: %s", community.DID) 183 t.Logf(" Handle: %s", community.Handle) 184 t.Logf(" RecordURI: %s", community.RecordURI) 185 t.Logf(" RecordCID: %s", community.RecordCID) 186 187 // Verify DID format 188 if community.DID[:8] != "did:plc:" { 189 t.Errorf("Expected did:plc DID, got: %s", community.DID) 190 } 191 192 // V2: Verify PDS account was created for the community 193 t.Logf("\n🔍 V2: Verifying community PDS account exists...") 194 expectedHandle := fmt.Sprintf("%s.community.%s", communityName, instanceDomain) 195 t.Logf(" Expected handle: %s", expectedHandle) 196 t.Logf(" (Using subdomain: *.community.%s)", instanceDomain) 197 198 accountDID, accountHandle, err := queryPDSAccount(pdsURL, expectedHandle) 199 if err != nil { 200 t.Fatalf("❌ V2: Community PDS account not found: %v", err) 201 } 202 203 t.Logf("✅ V2: Community PDS account exists!") 204 t.Logf(" Account DID: %s", accountDID) 205 t.Logf(" Account Handle: %s", accountHandle) 206 207 // Verify the account DID matches the community DID 208 if accountDID != community.DID { 209 t.Errorf("❌ V2: Account DID mismatch! Community DID: %s, PDS Account DID: %s", 210 community.DID, accountDID) 211 } else { 212 t.Logf("✅ V2: Community DID matches PDS account DID (self-owned repository)") 213 } 214 215 // V2: Verify record exists in PDS (in community's own repository) 216 t.Logf("\n📡 V2: Querying PDS for record in community's repository...") 217 218 collection := "social.coves.community.profile" 219 rkey := utils.ExtractRKeyFromURI(community.RecordURI) 220 221 // V2: Query community's repository (not instance repository!) 222 getRecordURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 223 pdsURL, community.DID, collection, rkey) 224 225 t.Logf(" Querying: at://%s/%s/%s", community.DID, collection, rkey) 226 227 pdsResp, err := http.Get(getRecordURL) 228 if err != nil { 229 t.Fatalf("Failed to query PDS: %v", err) 230 } 231 defer func() { _ = pdsResp.Body.Close() }() 232 233 if pdsResp.StatusCode != http.StatusOK { 234 body, readErr := io.ReadAll(pdsResp.Body) 235 if readErr != nil { 236 t.Fatalf("PDS returned status %d (failed to read body: %v)", pdsResp.StatusCode, readErr) 237 } 238 t.Fatalf("PDS returned status %d: %s", pdsResp.StatusCode, string(body)) 239 } 240 241 var pdsRecord struct { 242 Value map[string]interface{} `json:"value"` 243 URI string `json:"uri"` 244 CID string `json:"cid"` 245 } 246 247 if err := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); err != nil { 248 t.Fatalf("Failed to decode PDS response: %v", err) 249 } 250 251 t.Logf("✅ Record found in PDS!") 252 t.Logf(" URI: %s", pdsRecord.URI) 253 t.Logf(" CID: %s", pdsRecord.CID) 254 255 // Print full record for inspection 256 recordJSON, marshalErr := json.MarshalIndent(pdsRecord.Value, " ", " ") 257 if marshalErr != nil { 258 t.Logf(" Failed to marshal record: %v", marshalErr) 259 } else { 260 t.Logf(" Record value:\n %s", string(recordJSON)) 261 } 262 263 // V2: DID and Handle are NOT in the record - they're resolved from the repository URI 264 // The record should have name, hostedBy, createdBy, etc. but no 'did' or 'handle' fields 265 // This matches Bluesky's app.bsky.actor.profile pattern (no handle in record) 266 // Handles are mutable and resolved from DIDs via PLC, so they shouldn't be stored in immutable records 267 268 // ==================================================================================== 269 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer 270 // ==================================================================================== 271 t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) { 272 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...") 273 274 // Get PDS hostname for Jetstream filtering 275 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 276 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 277 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port 278 279 // Build Jetstream URL with filters 280 // Filter to our PDS and social.coves.community.profile collection 281 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.profile", 282 pdsHostname) 283 284 t.Logf(" Jetstream URL: %s", jetstreamURL) 285 t.Logf(" Looking for community DID: %s", community.DID) 286 287 // Channel to receive the event 288 eventChan := make(chan *jetstream.JetstreamEvent, 10) 289 errorChan := make(chan error, 1) 290 done := make(chan bool) 291 292 // Start Jetstream consumer in background 293 go func() { 294 err := subscribeToJetstream(ctx, jetstreamURL, community.DID, consumer, eventChan, errorChan, done) 295 if err != nil { 296 errorChan <- err 297 } 298 }() 299 300 // Wait for event or timeout 301 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 302 303 select { 304 case event := <-eventChan: 305 t.Logf("✅ Received real Jetstream event!") 306 t.Logf(" Event DID: %s", event.Did) 307 t.Logf(" Collection: %s", event.Commit.Collection) 308 t.Logf(" Operation: %s", event.Commit.Operation) 309 t.Logf(" RKey: %s", event.Commit.RKey) 310 311 // Verify it's our community 312 if event.Did != community.DID { 313 t.Errorf("❌ Expected DID %s, got %s", community.DID, event.Did) 314 } 315 316 // Verify indexed in AppView database 317 t.Logf("\n🔍 Querying AppView database...") 318 319 indexed, err := communityRepo.GetByDID(ctx, community.DID) 320 if err != nil { 321 t.Fatalf("Community not indexed in AppView: %v", err) 322 } 323 324 t.Logf("✅ Community indexed in AppView:") 325 t.Logf(" DID: %s", indexed.DID) 326 t.Logf(" Handle: %s", indexed.Handle) 327 t.Logf(" DisplayName: %s", indexed.DisplayName) 328 t.Logf(" RecordURI: %s", indexed.RecordURI) 329 330 // V2: Verify record_uri points to COMMUNITY's own repo 331 expectedURIPrefix := "at://" + community.DID 332 if !strings.HasPrefix(indexed.RecordURI, expectedURIPrefix) { 333 t.Errorf("❌ V2: record_uri should point to community's repo\n Expected prefix: %s\n Got: %s", 334 expectedURIPrefix, indexed.RecordURI) 335 } else { 336 t.Logf("✅ V2: Record URI correctly points to community's own repository") 337 } 338 339 // Signal to stop Jetstream consumer 340 close(done) 341 342 case err := <-errorChan: 343 t.Fatalf("❌ Jetstream error: %v", err) 344 345 case <-time.After(30 * time.Second): 346 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds") 347 } 348 349 t.Logf("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓") 350 }) 351 }) 352 353 // ==================================================================================== 354 // Part 3: XRPC HTTP Endpoints 355 // ==================================================================================== 356 t.Run("3. XRPC HTTP Endpoints", func(t *testing.T) { 357 t.Run("Create via XRPC endpoint", func(t *testing.T) { 358 // Use Unix timestamp (seconds) instead of UnixNano to keep handle short 359 // NOTE: Both createdByDid and hostedByDid are derived server-side: 360 // - createdByDid: from JWT token (authenticated user) 361 // - hostedByDid: from instance configuration (security: prevents spoofing) 362 createReq := map[string]interface{}{ 363 "name": fmt.Sprintf("xrpc-%d", time.Now().Unix()), 364 "displayName": "XRPC E2E Test", 365 "description": "Testing true end-to-end flow", 366 "visibility": "public", 367 "allowExternalDiscovery": true, 368 } 369 370 reqBody, marshalErr := json.Marshal(createReq) 371 if marshalErr != nil { 372 t.Fatalf("Failed to marshal request: %v", marshalErr) 373 } 374 375 // Step 1: Client POSTs to XRPC endpoint with JWT authentication 376 t.Logf("📡 Client → POST /xrpc/social.coves.community.create") 377 t.Logf(" Request: %s", string(reqBody)) 378 379 req, err := http.NewRequest(http.MethodPost, 380 httpServer.URL+"/xrpc/social.coves.community.create", 381 bytes.NewBuffer(reqBody)) 382 if err != nil { 383 t.Fatalf("Failed to create request: %v", err) 384 } 385 req.Header.Set("Content-Type", "application/json") 386 // Use real PDS access token for E2E authentication 387 req.Header.Set("Authorization", "Bearer "+accessToken) 388 389 resp, err := http.DefaultClient.Do(req) 390 if err != nil { 391 t.Fatalf("Failed to POST: %v", err) 392 } 393 defer func() { _ = resp.Body.Close() }() 394 395 if resp.StatusCode != http.StatusOK { 396 body, readErr := io.ReadAll(resp.Body) 397 if readErr != nil { 398 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 399 } 400 t.Logf("❌ XRPC Create Failed") 401 t.Logf(" Status: %d", resp.StatusCode) 402 t.Logf(" Response: %s", string(body)) 403 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 404 } 405 406 var createResp struct { 407 URI string `json:"uri"` 408 CID string `json:"cid"` 409 DID string `json:"did"` 410 Handle string `json:"handle"` 411 } 412 413 if err := json.NewDecoder(resp.Body).Decode(&createResp); err != nil { 414 t.Fatalf("Failed to decode create response: %v", err) 415 } 416 417 t.Logf("✅ XRPC response received:") 418 t.Logf(" DID: %s", createResp.DID) 419 t.Logf(" Handle: %s", createResp.Handle) 420 t.Logf(" URI: %s", createResp.URI) 421 422 // Step 2: Simulate firehose consumer picking up the event 423 // NOTE: Using synthetic event for speed. Real Jetstream WebSocket testing 424 // happens in "Part 2: Real Jetstream Firehose Consumption" above. 425 t.Logf("🔄 Simulating Jetstream consumer indexing...") 426 rkey := utils.ExtractRKeyFromURI(createResp.URI) 427 // V2: Event comes from community's DID (community owns the repo) 428 event := jetstream.JetstreamEvent{ 429 Did: createResp.DID, 430 TimeUS: time.Now().UnixMicro(), 431 Kind: "commit", 432 Commit: &jetstream.CommitEvent{ 433 Rev: "test-rev", 434 Operation: "create", 435 Collection: "social.coves.community.profile", 436 RKey: rkey, 437 Record: map[string]interface{}{ 438 // Note: No 'did' or 'handle' in record (atProto best practice) 439 // These are mutable and resolved from DIDs, not stored in immutable records 440 "name": createReq["name"], 441 "displayName": createReq["displayName"], 442 "description": createReq["description"], 443 "visibility": createReq["visibility"], 444 // Server-side derives these from JWT auth (instanceDID is the authenticated user) 445 "owner": instanceDID, 446 "createdBy": instanceDID, 447 "hostedBy": instanceDID, 448 "federation": map[string]interface{}{ 449 "allowExternalDiscovery": createReq["allowExternalDiscovery"], 450 }, 451 "createdAt": time.Now().Format(time.RFC3339), 452 }, 453 CID: createResp.CID, 454 }, 455 } 456 if handleErr := consumer.HandleEvent(context.Background(), &event); handleErr != nil { 457 t.Logf("Warning: failed to handle event: %v", handleErr) 458 } 459 460 // Step 3: Verify it's indexed in AppView 461 t.Logf("🔍 Querying AppView to verify indexing...") 462 var indexedCommunity communities.Community 463 err = db.QueryRow(` 464 SELECT did, handle, display_name, description 465 FROM communities 466 WHERE did = $1 467 `, createResp.DID).Scan( 468 &indexedCommunity.DID, 469 &indexedCommunity.Handle, 470 &indexedCommunity.DisplayName, 471 &indexedCommunity.Description, 472 ) 473 if err != nil { 474 t.Fatalf("Community not indexed in AppView: %v", err) 475 } 476 477 t.Logf("✅ TRUE E2E FLOW COMPLETE:") 478 t.Logf(" Client → XRPC → PDS → Firehose → AppView ✓") 479 t.Logf(" Indexed community: %s (%s)", indexedCommunity.Handle, indexedCommunity.DisplayName) 480 }) 481 482 t.Run("Get via XRPC endpoint", func(t *testing.T) { 483 // Create a community first (via service, so it's indexed) 484 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 485 486 // GET via HTTP endpoint 487 resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.get?community=%s", 488 httpServer.URL, community.DID)) 489 if err != nil { 490 t.Fatalf("Failed to GET: %v", err) 491 } 492 defer func() { _ = resp.Body.Close() }() 493 494 if resp.StatusCode != http.StatusOK { 495 body, readErr := io.ReadAll(resp.Body) 496 if readErr != nil { 497 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 498 } 499 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 500 } 501 502 var getCommunity communities.Community 503 if err := json.NewDecoder(resp.Body).Decode(&getCommunity); err != nil { 504 t.Fatalf("Failed to decode get response: %v", err) 505 } 506 507 t.Logf("Retrieved via XRPC HTTP endpoint:") 508 t.Logf(" DID: %s", getCommunity.DID) 509 t.Logf(" DisplayName: %s", getCommunity.DisplayName) 510 511 if getCommunity.DID != community.DID { 512 t.Errorf("DID mismatch: expected %s, got %s", community.DID, getCommunity.DID) 513 } 514 }) 515 516 t.Run("List via XRPC endpoint", func(t *testing.T) { 517 // Create and index multiple communities 518 for i := 0; i < 3; i++ { 519 createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 520 } 521 522 resp, err := http.Get(fmt.Sprintf("%s/xrpc/social.coves.community.list?limit=10", 523 httpServer.URL)) 524 if err != nil { 525 t.Fatalf("Failed to GET list: %v", err) 526 } 527 defer func() { _ = resp.Body.Close() }() 528 529 if resp.StatusCode != http.StatusOK { 530 body, readErr := io.ReadAll(resp.Body) 531 if readErr != nil { 532 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 533 } 534 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 535 } 536 537 var listResp struct { 538 Communities []communities.Community `json:"communities"` 539 Total int `json:"total"` 540 } 541 542 if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { 543 t.Fatalf("Failed to decode list response: %v", err) 544 } 545 546 t.Logf("✅ Listed %d communities via XRPC", len(listResp.Communities)) 547 548 if len(listResp.Communities) < 3 { 549 t.Errorf("Expected at least 3 communities, got %d", len(listResp.Communities)) 550 } 551 }) 552 553 t.Run("Subscribe via XRPC endpoint", func(t *testing.T) { 554 // Create a community to subscribe to 555 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 556 557 // Get initial subscriber count 558 initialCommunity, err := communityRepo.GetByDID(ctx, community.DID) 559 if err != nil { 560 t.Fatalf("Failed to get initial community state: %v", err) 561 } 562 initialSubscriberCount := initialCommunity.SubscriberCount 563 t.Logf("Initial subscriber count: %d", initialSubscriberCount) 564 565 // Subscribe to the community with contentVisibility=5 (test max visibility) 566 // NOTE: HTTP API uses "community" field, but atProto record uses "subject" internally 567 subscribeReq := map[string]interface{}{ 568 "community": community.DID, 569 "contentVisibility": 5, // Test with max visibility 570 } 571 572 reqBody, marshalErr := json.Marshal(subscribeReq) 573 if marshalErr != nil { 574 t.Fatalf("Failed to marshal subscribe request: %v", marshalErr) 575 } 576 577 // POST subscribe request 578 t.Logf("📡 Client → POST /xrpc/social.coves.community.subscribe") 579 t.Logf(" Subscribing to community: %s", community.DID) 580 581 req, err := http.NewRequest(http.MethodPost, 582 httpServer.URL+"/xrpc/social.coves.community.subscribe", 583 bytes.NewBuffer(reqBody)) 584 if err != nil { 585 t.Fatalf("Failed to create request: %v", err) 586 } 587 req.Header.Set("Content-Type", "application/json") 588 // Use real PDS access token for E2E authentication 589 req.Header.Set("Authorization", "Bearer "+accessToken) 590 591 resp, err := http.DefaultClient.Do(req) 592 if err != nil { 593 t.Fatalf("Failed to POST subscribe: %v", err) 594 } 595 defer func() { _ = resp.Body.Close() }() 596 597 if resp.StatusCode != http.StatusOK { 598 body, readErr := io.ReadAll(resp.Body) 599 if readErr != nil { 600 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 601 } 602 t.Logf("❌ XRPC Subscribe Failed") 603 t.Logf(" Status: %d", resp.StatusCode) 604 t.Logf(" Response: %s", string(body)) 605 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 606 } 607 608 var subscribeResp struct { 609 URI string `json:"uri"` 610 CID string `json:"cid"` 611 Existing bool `json:"existing"` 612 } 613 614 if err := json.NewDecoder(resp.Body).Decode(&subscribeResp); err != nil { 615 t.Fatalf("Failed to decode subscribe response: %v", err) 616 } 617 618 t.Logf("✅ XRPC subscribe response received:") 619 t.Logf(" URI: %s", subscribeResp.URI) 620 t.Logf(" CID: %s", subscribeResp.CID) 621 t.Logf(" Existing: %v", subscribeResp.Existing) 622 623 // Verify the subscription was written to PDS (in user's repository) 624 t.Logf("🔍 Verifying subscription record on PDS...") 625 pdsURL := os.Getenv("PDS_URL") 626 if pdsURL == "" { 627 pdsURL = "http://localhost:3001" 628 } 629 630 rkey := utils.ExtractRKeyFromURI(subscribeResp.URI) 631 // CRITICAL: Use correct collection name (record type, not XRPC endpoint) 632 collection := "social.coves.community.subscription" 633 634 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 635 pdsURL, instanceDID, collection, rkey)) 636 if pdsErr != nil { 637 t.Fatalf("Failed to fetch subscription record from PDS: %v", pdsErr) 638 } 639 defer func() { 640 if closeErr := pdsResp.Body.Close(); closeErr != nil { 641 t.Logf("Failed to close PDS response: %v", closeErr) 642 } 643 }() 644 645 if pdsResp.StatusCode != http.StatusOK { 646 body, _ := io.ReadAll(pdsResp.Body) 647 t.Fatalf("Subscription record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body)) 648 } 649 650 var pdsRecord struct { 651 Value map[string]interface{} `json:"value"` 652 } 653 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 654 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 655 } 656 657 t.Logf("✅ Subscription record found on PDS:") 658 t.Logf(" Subject (community): %v", pdsRecord.Value["subject"]) 659 t.Logf(" ContentVisibility: %v", pdsRecord.Value["contentVisibility"]) 660 661 // Verify the subject (community) DID matches 662 if pdsRecord.Value["subject"] != community.DID { 663 t.Errorf("Community DID mismatch: expected %s, got %v", community.DID, pdsRecord.Value["subject"]) 664 } 665 666 // Verify contentVisibility was stored correctly 667 if cv, ok := pdsRecord.Value["contentVisibility"].(float64); ok { 668 if int(cv) != 5 { 669 t.Errorf("ContentVisibility mismatch: expected 5, got %v", cv) 670 } 671 } else { 672 t.Errorf("ContentVisibility not found or wrong type in PDS record") 673 } 674 675 // CRITICAL: Simulate Jetstream consumer indexing the subscription 676 // This is the MISSING PIECE - we need to verify the firehose event gets indexed 677 t.Logf("🔄 Simulating Jetstream consumer indexing subscription...") 678 subEvent := jetstream.JetstreamEvent{ 679 Did: instanceDID, 680 TimeUS: time.Now().UnixMicro(), 681 Kind: "commit", 682 Commit: &jetstream.CommitEvent{ 683 Rev: "test-sub-rev", 684 Operation: "create", 685 Collection: "social.coves.community.subscription", // CORRECT collection 686 RKey: rkey, 687 CID: subscribeResp.CID, 688 Record: map[string]interface{}{ 689 "$type": "social.coves.community.subscription", 690 "subject": community.DID, 691 "contentVisibility": float64(5), // JSON numbers are float64 692 "createdAt": time.Now().Format(time.RFC3339), 693 }, 694 }, 695 } 696 if handleErr := consumer.HandleEvent(context.Background(), &subEvent); handleErr != nil { 697 t.Fatalf("Failed to handle subscription event: %v", handleErr) 698 } 699 700 // Verify subscription was indexed in AppView 701 t.Logf("🔍 Verifying subscription indexed in AppView...") 702 indexedSub, err := communityRepo.GetSubscription(ctx, instanceDID, community.DID) 703 if err != nil { 704 t.Fatalf("Subscription not indexed in AppView: %v", err) 705 } 706 707 t.Logf("✅ Subscription indexed in AppView:") 708 t.Logf(" User: %s", indexedSub.UserDID) 709 t.Logf(" Community: %s", indexedSub.CommunityDID) 710 t.Logf(" ContentVisibility: %d", indexedSub.ContentVisibility) 711 t.Logf(" RecordURI: %s", indexedSub.RecordURI) 712 713 // Verify contentVisibility was indexed correctly 714 if indexedSub.ContentVisibility != 5 { 715 t.Errorf("ContentVisibility not indexed correctly: expected 5, got %d", indexedSub.ContentVisibility) 716 } 717 718 // Verify subscriber count was incremented 719 t.Logf("🔍 Verifying subscriber count incremented...") 720 updatedCommunity, err := communityRepo.GetByDID(ctx, community.DID) 721 if err != nil { 722 t.Fatalf("Failed to get updated community: %v", err) 723 } 724 725 expectedCount := initialSubscriberCount + 1 726 if updatedCommunity.SubscriberCount != expectedCount { 727 t.Errorf("Subscriber count not incremented: expected %d, got %d", 728 expectedCount, updatedCommunity.SubscriberCount) 729 } else { 730 t.Logf("✅ Subscriber count incremented: %d → %d", 731 initialSubscriberCount, updatedCommunity.SubscriberCount) 732 } 733 734 t.Logf("✅ TRUE E2E SUBSCRIBE FLOW COMPLETE:") 735 t.Logf(" Client → XRPC Subscribe → PDS (user repo) → Firehose → Consumer → AppView ✓") 736 t.Logf(" ✓ Subscription written to PDS") 737 t.Logf(" ✓ Subscription indexed in AppView") 738 t.Logf(" ✓ ContentVisibility stored and indexed correctly (5)") 739 t.Logf(" ✓ Subscriber count incremented") 740 }) 741 742 t.Run("Unsubscribe via XRPC endpoint", func(t *testing.T) { 743 // Create a community and subscribe to it first 744 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 745 746 // Get initial subscriber count 747 initialCommunity, err := communityRepo.GetByDID(ctx, community.DID) 748 if err != nil { 749 t.Fatalf("Failed to get initial community state: %v", err) 750 } 751 initialSubscriberCount := initialCommunity.SubscriberCount 752 t.Logf("Initial subscriber count: %d", initialSubscriberCount) 753 754 // Subscribe first (using instance access token for instance user, with contentVisibility=3) 755 subscription, err := communityService.SubscribeToCommunity(ctx, instanceDID, accessToken, community.DID, 3) 756 if err != nil { 757 t.Fatalf("Failed to subscribe: %v", err) 758 } 759 760 // Index the subscription in AppView (simulate firehose event) 761 rkey := utils.ExtractRKeyFromURI(subscription.RecordURI) 762 subEvent := jetstream.JetstreamEvent{ 763 Did: instanceDID, 764 TimeUS: time.Now().UnixMicro(), 765 Kind: "commit", 766 Commit: &jetstream.CommitEvent{ 767 Rev: "test-sub-rev", 768 Operation: "create", 769 Collection: "social.coves.community.subscription", // CORRECT collection 770 RKey: rkey, 771 CID: subscription.RecordCID, 772 Record: map[string]interface{}{ 773 "$type": "social.coves.community.subscription", 774 "subject": community.DID, 775 "contentVisibility": float64(3), 776 "createdAt": time.Now().Format(time.RFC3339), 777 }, 778 }, 779 } 780 if handleErr := consumer.HandleEvent(context.Background(), &subEvent); handleErr != nil { 781 t.Fatalf("Failed to handle subscription event: %v", handleErr) 782 } 783 784 // Verify subscription was indexed 785 _, err = communityRepo.GetSubscription(ctx, instanceDID, community.DID) 786 if err != nil { 787 t.Fatalf("Subscription not indexed: %v", err) 788 } 789 790 // Verify subscriber count incremented 791 midCommunity, err := communityRepo.GetByDID(ctx, community.DID) 792 if err != nil { 793 t.Fatalf("Failed to get community after subscribe: %v", err) 794 } 795 if midCommunity.SubscriberCount != initialSubscriberCount+1 { 796 t.Errorf("Subscriber count not incremented after subscribe: expected %d, got %d", 797 initialSubscriberCount+1, midCommunity.SubscriberCount) 798 } 799 800 t.Logf("📝 Subscription created and indexed: %s", subscription.RecordURI) 801 802 // Now unsubscribe via XRPC endpoint 803 unsubscribeReq := map[string]interface{}{ 804 "community": community.DID, 805 } 806 807 reqBody, marshalErr := json.Marshal(unsubscribeReq) 808 if marshalErr != nil { 809 t.Fatalf("Failed to marshal unsubscribe request: %v", marshalErr) 810 } 811 812 // POST unsubscribe request 813 t.Logf("📡 Client → POST /xrpc/social.coves.community.unsubscribe") 814 t.Logf(" Unsubscribing from community: %s", community.DID) 815 816 req, err := http.NewRequest(http.MethodPost, 817 httpServer.URL+"/xrpc/social.coves.community.unsubscribe", 818 bytes.NewBuffer(reqBody)) 819 if err != nil { 820 t.Fatalf("Failed to create request: %v", err) 821 } 822 req.Header.Set("Content-Type", "application/json") 823 // Use real PDS access token for E2E authentication 824 req.Header.Set("Authorization", "Bearer "+accessToken) 825 826 resp, err := http.DefaultClient.Do(req) 827 if err != nil { 828 t.Fatalf("Failed to POST unsubscribe: %v", err) 829 } 830 defer func() { _ = resp.Body.Close() }() 831 832 if resp.StatusCode != http.StatusOK { 833 body, readErr := io.ReadAll(resp.Body) 834 if readErr != nil { 835 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 836 } 837 t.Logf("❌ XRPC Unsubscribe Failed") 838 t.Logf(" Status: %d", resp.StatusCode) 839 t.Logf(" Response: %s", string(body)) 840 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 841 } 842 843 var unsubscribeResp struct { 844 Success bool `json:"success"` 845 } 846 847 if err := json.NewDecoder(resp.Body).Decode(&unsubscribeResp); err != nil { 848 t.Fatalf("Failed to decode unsubscribe response: %v", err) 849 } 850 851 t.Logf("✅ XRPC unsubscribe response received:") 852 t.Logf(" Success: %v", unsubscribeResp.Success) 853 854 if !unsubscribeResp.Success { 855 t.Errorf("Expected success: true, got: %v", unsubscribeResp.Success) 856 } 857 858 // Verify the subscription record was deleted from PDS 859 t.Logf("🔍 Verifying subscription record deleted from PDS...") 860 pdsURL := os.Getenv("PDS_URL") 861 if pdsURL == "" { 862 pdsURL = "http://localhost:3001" 863 } 864 865 // CRITICAL: Use correct collection name (record type, not XRPC endpoint) 866 collection := "social.coves.community.subscription" 867 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 868 pdsURL, instanceDID, collection, rkey)) 869 if pdsErr != nil { 870 t.Fatalf("Failed to query PDS: %v", pdsErr) 871 } 872 defer func() { 873 if closeErr := pdsResp.Body.Close(); closeErr != nil { 874 t.Logf("Failed to close PDS response: %v", closeErr) 875 } 876 }() 877 878 // Should return 404 since record was deleted 879 if pdsResp.StatusCode == http.StatusOK { 880 t.Errorf("❌ Subscription record still exists on PDS (expected 404, got 200)") 881 } else { 882 t.Logf("✅ Subscription record successfully deleted from PDS (status: %d)", pdsResp.StatusCode) 883 } 884 885 // CRITICAL: Simulate Jetstream consumer indexing the DELETE event 886 t.Logf("🔄 Simulating Jetstream consumer indexing DELETE event...") 887 deleteEvent := jetstream.JetstreamEvent{ 888 Did: instanceDID, 889 TimeUS: time.Now().UnixMicro(), 890 Kind: "commit", 891 Commit: &jetstream.CommitEvent{ 892 Rev: "test-unsub-rev", 893 Operation: "delete", 894 Collection: "social.coves.community.subscription", 895 RKey: rkey, 896 CID: "", // No CID on deletes 897 Record: nil, // No record data on deletes 898 }, 899 } 900 if handleErr := consumer.HandleEvent(context.Background(), &deleteEvent); handleErr != nil { 901 t.Fatalf("Failed to handle delete event: %v", handleErr) 902 } 903 904 // Verify subscription was removed from AppView 905 t.Logf("🔍 Verifying subscription removed from AppView...") 906 _, err = communityRepo.GetSubscription(ctx, instanceDID, community.DID) 907 if err == nil { 908 t.Errorf("❌ Subscription still exists in AppView (should be deleted)") 909 } else if !communities.IsNotFound(err) { 910 t.Fatalf("Unexpected error querying subscription: %v", err) 911 } else { 912 t.Logf("✅ Subscription removed from AppView") 913 } 914 915 // Verify subscriber count was decremented 916 t.Logf("🔍 Verifying subscriber count decremented...") 917 finalCommunity, err := communityRepo.GetByDID(ctx, community.DID) 918 if err != nil { 919 t.Fatalf("Failed to get final community state: %v", err) 920 } 921 922 if finalCommunity.SubscriberCount != initialSubscriberCount { 923 t.Errorf("Subscriber count not decremented: expected %d, got %d", 924 initialSubscriberCount, finalCommunity.SubscriberCount) 925 } else { 926 t.Logf("✅ Subscriber count decremented: %d → %d", 927 initialSubscriberCount+1, finalCommunity.SubscriberCount) 928 } 929 930 t.Logf("✅ TRUE E2E UNSUBSCRIBE FLOW COMPLETE:") 931 t.Logf(" Client → XRPC Unsubscribe → PDS Delete → Firehose → Consumer → AppView ✓") 932 t.Logf(" ✓ Subscription deleted from PDS") 933 t.Logf(" ✓ Subscription removed from AppView") 934 t.Logf(" ✓ Subscriber count decremented") 935 }) 936 937 t.Run("Block via XRPC endpoint", func(t *testing.T) { 938 // Create a community to block 939 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 940 941 t.Logf("🚫 Blocking community via XRPC endpoint...") 942 blockReq := map[string]interface{}{ 943 "community": community.DID, 944 } 945 946 blockJSON, err := json.Marshal(blockReq) 947 if err != nil { 948 t.Fatalf("Failed to marshal block request: %v", err) 949 } 950 951 req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 952 if err != nil { 953 t.Fatalf("Failed to create block request: %v", err) 954 } 955 req.Header.Set("Content-Type", "application/json") 956 req.Header.Set("Authorization", "Bearer "+accessToken) 957 958 resp, err := http.DefaultClient.Do(req) 959 if err != nil { 960 t.Fatalf("Failed to POST block: %v", err) 961 } 962 defer func() { _ = resp.Body.Close() }() 963 964 if resp.StatusCode != http.StatusOK { 965 body, readErr := io.ReadAll(resp.Body) 966 if readErr != nil { 967 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 968 } 969 t.Logf("❌ XRPC Block Failed") 970 t.Logf(" Status: %d", resp.StatusCode) 971 t.Logf(" Response: %s", string(body)) 972 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 973 } 974 975 var blockResp struct { 976 Block struct { 977 RecordURI string `json:"recordUri"` 978 RecordCID string `json:"recordCid"` 979 } `json:"block"` 980 } 981 982 if err := json.NewDecoder(resp.Body).Decode(&blockResp); err != nil { 983 t.Fatalf("Failed to decode block response: %v", err) 984 } 985 986 t.Logf("✅ XRPC block response received:") 987 t.Logf(" RecordURI: %s", blockResp.Block.RecordURI) 988 t.Logf(" RecordCID: %s", blockResp.Block.RecordCID) 989 990 // Extract rkey from URI for verification 991 rkey := "" 992 if uriParts := strings.Split(blockResp.Block.RecordURI, "/"); len(uriParts) >= 4 { 993 rkey = uriParts[len(uriParts)-1] 994 } 995 996 // Verify the block record exists on PDS 997 t.Logf("🔍 Verifying block record exists on PDS...") 998 collection := "social.coves.community.block" 999 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1000 pdsURL, instanceDID, collection, rkey)) 1001 if pdsErr != nil { 1002 t.Fatalf("Failed to query PDS: %v", pdsErr) 1003 } 1004 defer func() { 1005 if closeErr := pdsResp.Body.Close(); closeErr != nil { 1006 t.Logf("Failed to close PDS response: %v", closeErr) 1007 } 1008 }() 1009 1010 if pdsResp.StatusCode != http.StatusOK { 1011 body, readErr := io.ReadAll(pdsResp.Body) 1012 if readErr != nil { 1013 t.Fatalf("Block record not found on PDS (status: %d, failed to read body: %v)", pdsResp.StatusCode, readErr) 1014 } 1015 t.Fatalf("Block record not found on PDS (status: %d): %s", pdsResp.StatusCode, string(body)) 1016 } 1017 t.Logf("✅ Block record exists on PDS") 1018 1019 // CRITICAL: Simulate Jetstream consumer indexing the block 1020 t.Logf("🔄 Simulating Jetstream consumer indexing block event...") 1021 blockEvent := jetstream.JetstreamEvent{ 1022 Did: instanceDID, 1023 TimeUS: time.Now().UnixMicro(), 1024 Kind: "commit", 1025 Commit: &jetstream.CommitEvent{ 1026 Rev: "test-block-rev", 1027 Operation: "create", 1028 Collection: "social.coves.community.block", 1029 RKey: rkey, 1030 CID: blockResp.Block.RecordCID, 1031 Record: map[string]interface{}{ 1032 "subject": community.DID, 1033 "createdAt": time.Now().Format(time.RFC3339), 1034 }, 1035 }, 1036 } 1037 if handleErr := consumer.HandleEvent(context.Background(), &blockEvent); handleErr != nil { 1038 t.Fatalf("Failed to handle block event: %v", handleErr) 1039 } 1040 1041 // Verify block was indexed in AppView 1042 t.Logf("🔍 Verifying block indexed in AppView...") 1043 block, err := communityRepo.GetBlock(ctx, instanceDID, community.DID) 1044 if err != nil { 1045 t.Fatalf("Failed to get block from AppView: %v", err) 1046 } 1047 if block.RecordURI != blockResp.Block.RecordURI { 1048 t.Errorf("RecordURI mismatch: expected %s, got %s", blockResp.Block.RecordURI, block.RecordURI) 1049 } 1050 1051 t.Logf("✅ TRUE E2E BLOCK FLOW COMPLETE:") 1052 t.Logf(" Client → XRPC Block → PDS Create → Firehose → Consumer → AppView ✓") 1053 t.Logf(" ✓ Block record created on PDS") 1054 t.Logf(" ✓ Block indexed in AppView") 1055 }) 1056 1057 t.Run("Unblock via XRPC endpoint", func(t *testing.T) { 1058 // Create a community and block it first 1059 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 1060 1061 // Block the community 1062 t.Logf("🚫 Blocking community first...") 1063 blockReq := map[string]interface{}{ 1064 "community": community.DID, 1065 } 1066 blockJSON, err := json.Marshal(blockReq) 1067 if err != nil { 1068 t.Fatalf("Failed to marshal block request: %v", err) 1069 } 1070 1071 blockHttpReq, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 1072 if err != nil { 1073 t.Fatalf("Failed to create block request: %v", err) 1074 } 1075 blockHttpReq.Header.Set("Content-Type", "application/json") 1076 blockHttpReq.Header.Set("Authorization", "Bearer "+accessToken) 1077 1078 blockResp, err := http.DefaultClient.Do(blockHttpReq) 1079 if err != nil { 1080 t.Fatalf("Failed to POST block: %v", err) 1081 } 1082 1083 var blockRespData struct { 1084 Block struct { 1085 RecordURI string `json:"recordUri"` 1086 } `json:"block"` 1087 } 1088 if err := json.NewDecoder(blockResp.Body).Decode(&blockRespData); err != nil { 1089 func() { _ = blockResp.Body.Close() }() 1090 t.Fatalf("Failed to decode block response: %v", err) 1091 } 1092 func() { _ = blockResp.Body.Close() }() 1093 1094 rkey := "" 1095 if uriParts := strings.Split(blockRespData.Block.RecordURI, "/"); len(uriParts) >= 4 { 1096 rkey = uriParts[len(uriParts)-1] 1097 } 1098 1099 // Index the block via consumer 1100 blockEvent := jetstream.JetstreamEvent{ 1101 Did: instanceDID, 1102 TimeUS: time.Now().UnixMicro(), 1103 Kind: "commit", 1104 Commit: &jetstream.CommitEvent{ 1105 Rev: "test-block-rev", 1106 Operation: "create", 1107 Collection: "social.coves.community.block", 1108 RKey: rkey, 1109 CID: "test-block-cid", 1110 Record: map[string]interface{}{ 1111 "subject": community.DID, 1112 "createdAt": time.Now().Format(time.RFC3339), 1113 }, 1114 }, 1115 } 1116 if handleErr := consumer.HandleEvent(context.Background(), &blockEvent); handleErr != nil { 1117 t.Fatalf("Failed to handle block event: %v", handleErr) 1118 } 1119 1120 // Now unblock the community 1121 t.Logf("✅ Unblocking community via XRPC endpoint...") 1122 unblockReq := map[string]interface{}{ 1123 "community": community.DID, 1124 } 1125 1126 unblockJSON, err := json.Marshal(unblockReq) 1127 if err != nil { 1128 t.Fatalf("Failed to marshal unblock request: %v", err) 1129 } 1130 1131 req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.unblockCommunity", bytes.NewBuffer(unblockJSON)) 1132 if err != nil { 1133 t.Fatalf("Failed to create unblock request: %v", err) 1134 } 1135 req.Header.Set("Content-Type", "application/json") 1136 req.Header.Set("Authorization", "Bearer "+accessToken) 1137 1138 resp, err := http.DefaultClient.Do(req) 1139 if err != nil { 1140 t.Fatalf("Failed to POST unblock: %v", err) 1141 } 1142 defer func() { _ = resp.Body.Close() }() 1143 1144 if resp.StatusCode != http.StatusOK { 1145 body, readErr := io.ReadAll(resp.Body) 1146 if readErr != nil { 1147 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 1148 } 1149 t.Logf("❌ XRPC Unblock Failed") 1150 t.Logf(" Status: %d", resp.StatusCode) 1151 t.Logf(" Response: %s", string(body)) 1152 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 1153 } 1154 1155 var unblockResp struct { 1156 Success bool `json:"success"` 1157 } 1158 1159 if err := json.NewDecoder(resp.Body).Decode(&unblockResp); err != nil { 1160 t.Fatalf("Failed to decode unblock response: %v", err) 1161 } 1162 1163 if !unblockResp.Success { 1164 t.Errorf("Expected success: true, got: %v", unblockResp.Success) 1165 } 1166 1167 // Verify the block record was deleted from PDS 1168 t.Logf("🔍 Verifying block record deleted from PDS...") 1169 collection := "social.coves.community.block" 1170 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1171 pdsURL, instanceDID, collection, rkey)) 1172 if pdsErr != nil { 1173 t.Fatalf("Failed to query PDS: %v", pdsErr) 1174 } 1175 defer func() { 1176 if closeErr := pdsResp.Body.Close(); closeErr != nil { 1177 t.Logf("Failed to close PDS response: %v", closeErr) 1178 } 1179 }() 1180 1181 if pdsResp.StatusCode == http.StatusOK { 1182 t.Errorf("❌ Block record still exists on PDS (expected 404, got 200)") 1183 } else { 1184 t.Logf("✅ Block record successfully deleted from PDS (status: %d)", pdsResp.StatusCode) 1185 } 1186 1187 // CRITICAL: Simulate Jetstream consumer indexing the DELETE event 1188 t.Logf("🔄 Simulating Jetstream consumer indexing DELETE event...") 1189 deleteEvent := jetstream.JetstreamEvent{ 1190 Did: instanceDID, 1191 TimeUS: time.Now().UnixMicro(), 1192 Kind: "commit", 1193 Commit: &jetstream.CommitEvent{ 1194 Rev: "test-unblock-rev", 1195 Operation: "delete", 1196 Collection: "social.coves.community.block", 1197 RKey: rkey, 1198 CID: "", 1199 Record: nil, 1200 }, 1201 } 1202 if handleErr := consumer.HandleEvent(context.Background(), &deleteEvent); handleErr != nil { 1203 t.Fatalf("Failed to handle delete event: %v", handleErr) 1204 } 1205 1206 // Verify block was removed from AppView 1207 t.Logf("🔍 Verifying block removed from AppView...") 1208 _, err = communityRepo.GetBlock(ctx, instanceDID, community.DID) 1209 if err == nil { 1210 t.Errorf("❌ Block still exists in AppView (should be deleted)") 1211 } else if !communities.IsNotFound(err) { 1212 t.Fatalf("Unexpected error querying block: %v", err) 1213 } else { 1214 t.Logf("✅ Block removed from AppView") 1215 } 1216 1217 t.Logf("✅ TRUE E2E UNBLOCK FLOW COMPLETE:") 1218 t.Logf(" Client → XRPC Unblock → PDS Delete → Firehose → Consumer → AppView ✓") 1219 t.Logf(" ✓ Block deleted from PDS") 1220 t.Logf(" ✓ Block removed from AppView") 1221 }) 1222 1223 t.Run("Block fails without authentication", func(t *testing.T) { 1224 // Create a community to attempt blocking 1225 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 1226 1227 t.Logf("🔒 Attempting to block community without auth token...") 1228 blockReq := map[string]interface{}{ 1229 "community": community.DID, 1230 } 1231 1232 blockJSON, err := json.Marshal(blockReq) 1233 if err != nil { 1234 t.Fatalf("Failed to marshal block request: %v", err) 1235 } 1236 1237 req, err := http.NewRequest("POST", httpServer.URL+"/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(blockJSON)) 1238 if err != nil { 1239 t.Fatalf("Failed to create block request: %v", err) 1240 } 1241 req.Header.Set("Content-Type", "application/json") 1242 // NO Authorization header 1243 1244 resp, err := http.DefaultClient.Do(req) 1245 if err != nil { 1246 t.Fatalf("Failed to POST block: %v", err) 1247 } 1248 defer func() { _ = resp.Body.Close() }() 1249 1250 // Should fail with 401 Unauthorized 1251 if resp.StatusCode != http.StatusUnauthorized { 1252 body, _ := io.ReadAll(resp.Body) 1253 t.Errorf("Expected 401 Unauthorized, got %d: %s", resp.StatusCode, string(body)) 1254 } else { 1255 t.Logf("✅ Block correctly rejected without authentication (401)") 1256 } 1257 }) 1258 1259 t.Run("Update via XRPC endpoint", func(t *testing.T) { 1260 // Create a community first (via service, so it's indexed) 1261 community := createAndIndexCommunity(t, communityService, consumer, instanceDID, pdsURL) 1262 1263 // Update the community 1264 newDisplayName := "Updated E2E Test Community" 1265 newDescription := "This community has been updated" 1266 newVisibility := "unlisted" 1267 1268 // NOTE: updatedByDid is derived from JWT token, not provided in request 1269 updateReq := map[string]interface{}{ 1270 "communityDid": community.DID, 1271 "displayName": newDisplayName, 1272 "description": newDescription, 1273 "visibility": newVisibility, 1274 } 1275 1276 reqBody, marshalErr := json.Marshal(updateReq) 1277 if marshalErr != nil { 1278 t.Fatalf("Failed to marshal update request: %v", marshalErr) 1279 } 1280 1281 // POST update request with JWT authentication 1282 t.Logf("📡 Client → POST /xrpc/social.coves.community.update") 1283 t.Logf(" Updating community: %s", community.DID) 1284 1285 req, err := http.NewRequest(http.MethodPost, 1286 httpServer.URL+"/xrpc/social.coves.community.update", 1287 bytes.NewBuffer(reqBody)) 1288 if err != nil { 1289 t.Fatalf("Failed to create request: %v", err) 1290 } 1291 req.Header.Set("Content-Type", "application/json") 1292 // Use real PDS access token for E2E authentication 1293 req.Header.Set("Authorization", "Bearer "+accessToken) 1294 1295 resp, err := http.DefaultClient.Do(req) 1296 if err != nil { 1297 t.Fatalf("Failed to POST update: %v", err) 1298 } 1299 defer func() { _ = resp.Body.Close() }() 1300 1301 if resp.StatusCode != http.StatusOK { 1302 body, readErr := io.ReadAll(resp.Body) 1303 if readErr != nil { 1304 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 1305 } 1306 t.Logf("❌ XRPC Update Failed") 1307 t.Logf(" Status: %d", resp.StatusCode) 1308 t.Logf(" Response: %s", string(body)) 1309 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 1310 } 1311 1312 var updateResp struct { 1313 URI string `json:"uri"` 1314 CID string `json:"cid"` 1315 DID string `json:"did"` 1316 Handle string `json:"handle"` 1317 } 1318 1319 if err := json.NewDecoder(resp.Body).Decode(&updateResp); err != nil { 1320 t.Fatalf("Failed to decode update response: %v", err) 1321 } 1322 1323 t.Logf("✅ XRPC update response received:") 1324 t.Logf(" DID: %s", updateResp.DID) 1325 t.Logf(" URI: %s", updateResp.URI) 1326 t.Logf(" CID: %s (changed after update)", updateResp.CID) 1327 1328 // Verify the CID changed (update creates a new version) 1329 if updateResp.CID == community.RecordCID { 1330 t.Logf("⚠️ Warning: CID did not change after update (expected for a new version)") 1331 } 1332 1333 // Simulate Jetstream consumer picking up the update event 1334 t.Logf("🔄 Simulating Jetstream consumer indexing update...") 1335 rkey := utils.ExtractRKeyFromURI(updateResp.URI) 1336 1337 // Fetch updated record from PDS 1338 pdsURL := os.Getenv("PDS_URL") 1339 if pdsURL == "" { 1340 pdsURL = "http://localhost:3001" 1341 } 1342 1343 collection := "social.coves.community.profile" 1344 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1345 pdsURL, community.DID, collection, rkey)) 1346 if pdsErr != nil { 1347 t.Fatalf("Failed to fetch updated PDS record: %v", pdsErr) 1348 } 1349 defer func() { 1350 if closeErr := pdsResp.Body.Close(); closeErr != nil { 1351 t.Logf("Failed to close PDS response: %v", closeErr) 1352 } 1353 }() 1354 1355 var pdsRecord struct { 1356 Value map[string]interface{} `json:"value"` 1357 CID string `json:"cid"` 1358 } 1359 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 1360 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 1361 } 1362 1363 // Create update event for consumer 1364 updateEvent := jetstream.JetstreamEvent{ 1365 Did: community.DID, 1366 TimeUS: time.Now().UnixMicro(), 1367 Kind: "commit", 1368 Commit: &jetstream.CommitEvent{ 1369 Rev: "test-update-rev", 1370 Operation: "update", 1371 Collection: collection, 1372 RKey: rkey, 1373 CID: pdsRecord.CID, 1374 Record: pdsRecord.Value, 1375 }, 1376 } 1377 1378 if handleErr := consumer.HandleEvent(context.Background(), &updateEvent); handleErr != nil { 1379 t.Fatalf("Failed to handle update event: %v", handleErr) 1380 } 1381 1382 // Verify update was indexed in AppView 1383 t.Logf("🔍 Querying AppView to verify update was indexed...") 1384 updated, err := communityService.GetCommunity(ctx, community.DID) 1385 if err != nil { 1386 t.Fatalf("Failed to get updated community: %v", err) 1387 } 1388 1389 t.Logf("✅ Update indexed in AppView:") 1390 t.Logf(" DisplayName: %s (was: %s)", updated.DisplayName, community.DisplayName) 1391 t.Logf(" Description: %s", updated.Description) 1392 t.Logf(" Visibility: %s (was: %s)", updated.Visibility, community.Visibility) 1393 1394 // Verify the updates were applied 1395 if updated.DisplayName != newDisplayName { 1396 t.Errorf("DisplayName not updated: expected %s, got %s", newDisplayName, updated.DisplayName) 1397 } 1398 if updated.Description != newDescription { 1399 t.Errorf("Description not updated: expected %s, got %s", newDescription, updated.Description) 1400 } 1401 if updated.Visibility != newVisibility { 1402 t.Errorf("Visibility not updated: expected %s, got %s", newVisibility, updated.Visibility) 1403 } 1404 1405 t.Logf("✅ TRUE E2E UPDATE FLOW COMPLETE:") 1406 t.Logf(" Client → XRPC Update → PDS → Firehose → AppView ✓") 1407 }) 1408 1409 t.Logf("\n✅ Part 3 Complete: All XRPC HTTP endpoints working ✓") 1410 }) 1411 1412 divider := strings.Repeat("=", 80) 1413 t.Logf("\n%s", divider) 1414 t.Logf("✅ TRUE END-TO-END TEST COMPLETE - V2 COMMUNITIES ARCHITECTURE") 1415 t.Logf("%s", divider) 1416 t.Logf("\n🎯 Complete Flow Tested:") 1417 t.Logf(" 1. HTTP Request → Service Layer") 1418 t.Logf(" 2. Service → PDS Account Creation (com.atproto.server.createAccount)") 1419 t.Logf(" 3. Service → PDS Record Write (at://community_did/profile/self)") 1420 t.Logf(" 4. PDS → Jetstream Firehose (REAL WebSocket subscription!)") 1421 t.Logf(" 5. Jetstream → Consumer Event Handler") 1422 t.Logf(" 6. Consumer → AppView PostgreSQL Database") 1423 t.Logf(" 7. AppView DB → XRPC HTTP Endpoints") 1424 t.Logf(" 8. XRPC → Client Response") 1425 t.Logf("\n✅ V2 Architecture Verified:") 1426 t.Logf(" ✓ Community owns its own PDS account") 1427 t.Logf(" ✓ Community owns its own repository (at://community_did/...)") 1428 t.Logf(" ✓ PDS manages signing keypair (we only store credentials)") 1429 t.Logf(" ✓ Real Jetstream firehose event consumption") 1430 t.Logf(" ✓ True portability (community can migrate instances)") 1431 t.Logf(" ✓ Full atProto compliance") 1432 t.Logf("\n%s", divider) 1433 t.Logf("🚀 V2 Communities: Production Ready!") 1434 t.Logf("%s\n", divider) 1435} 1436 1437// Helper: create and index a community (simulates consumer indexing for fast test setup) 1438// NOTE: This simulates the firehose event for speed. For TRUE E2E testing with real 1439// Jetstream WebSocket subscription, see "Part 2: Real Jetstream Firehose Consumption" above. 1440func createAndIndexCommunity(t *testing.T, service communities.Service, consumer *jetstream.CommunityEventConsumer, instanceDID, pdsURL string) *communities.Community { 1441 // Use nanoseconds % 1 billion to get unique but short names 1442 // This avoids handle collisions when creating multiple communities quickly 1443 uniqueID := time.Now().UnixNano() % 1000000000 1444 req := communities.CreateCommunityRequest{ 1445 Name: fmt.Sprintf("test-%d", uniqueID), 1446 DisplayName: "Test Community", 1447 Description: "Test", 1448 Visibility: "public", 1449 CreatedByDID: instanceDID, 1450 HostedByDID: instanceDID, 1451 AllowExternalDiscovery: true, 1452 } 1453 1454 community, err := service.CreateCommunity(context.Background(), req) 1455 if err != nil { 1456 t.Fatalf("Failed to create: %v", err) 1457 } 1458 1459 // Fetch from PDS to get full record 1460 // V2: Record lives in community's own repository (at://community.DID/...) 1461 collection := "social.coves.community.profile" 1462 rkey := utils.ExtractRKeyFromURI(community.RecordURI) 1463 1464 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 1465 pdsURL, community.DID, collection, rkey)) 1466 if pdsErr != nil { 1467 t.Fatalf("Failed to fetch PDS record: %v", pdsErr) 1468 } 1469 defer func() { 1470 if closeErr := pdsResp.Body.Close(); closeErr != nil { 1471 t.Logf("Failed to close PDS response: %v", closeErr) 1472 } 1473 }() 1474 1475 var pdsRecord struct { 1476 Value map[string]interface{} `json:"value"` 1477 CID string `json:"cid"` 1478 } 1479 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 1480 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 1481 } 1482 1483 // Simulate firehose event for fast indexing 1484 // V2: Event comes from community's DID (community owns the repo) 1485 // NOTE: This bypasses real Jetstream WebSocket for speed. Real firehose testing 1486 // happens in "Part 2: Real Jetstream Firehose Consumption" above. 1487 event := jetstream.JetstreamEvent{ 1488 Did: community.DID, 1489 TimeUS: time.Now().UnixMicro(), 1490 Kind: "commit", 1491 Commit: &jetstream.CommitEvent{ 1492 Rev: "test", 1493 Operation: "create", 1494 Collection: collection, 1495 RKey: rkey, 1496 CID: pdsRecord.CID, 1497 Record: pdsRecord.Value, 1498 }, 1499 } 1500 1501 if handleErr := consumer.HandleEvent(context.Background(), &event); handleErr != nil { 1502 t.Logf("Warning: failed to handle event: %v", handleErr) 1503 } 1504 1505 return community 1506} 1507 1508// queryPDSAccount queries the PDS to verify an account exists 1509// Returns the account's DID and handle if found 1510func queryPDSAccount(pdsURL, handle string) (string, string, error) { 1511 // Use com.atproto.identity.resolveHandle to verify account exists 1512 resp, err := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.identity.resolveHandle?handle=%s", pdsURL, handle)) 1513 if err != nil { 1514 return "", "", fmt.Errorf("failed to query PDS: %w", err) 1515 } 1516 defer func() { _ = resp.Body.Close() }() 1517 1518 if resp.StatusCode != http.StatusOK { 1519 body, readErr := io.ReadAll(resp.Body) 1520 if readErr != nil { 1521 return "", "", fmt.Errorf("account not found (status %d, failed to read body: %w)", resp.StatusCode, readErr) 1522 } 1523 return "", "", fmt.Errorf("account not found (status %d): %s", resp.StatusCode, string(body)) 1524 } 1525 1526 var result struct { 1527 DID string `json:"did"` 1528 } 1529 1530 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 1531 return "", "", fmt.Errorf("failed to decode response: %w", err) 1532 } 1533 1534 return result.DID, handle, nil 1535} 1536 1537// subscribeToJetstream subscribes to real Jetstream firehose and processes events 1538// This enables TRUE E2E testing: PDS → Jetstream → Consumer → AppView 1539func subscribeToJetstream( 1540 ctx context.Context, 1541 jetstreamURL string, 1542 targetDID string, 1543 consumer *jetstream.CommunityEventConsumer, 1544 eventChan chan<- *jetstream.JetstreamEvent, 1545 errorChan chan<- error, 1546 done <-chan bool, 1547) error { 1548 // Import needed for websocket 1549 // Note: We'll use the gorilla websocket library 1550 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 1551 if err != nil { 1552 return fmt.Errorf("failed to connect to Jetstream: %w", err) 1553 } 1554 defer func() { _ = conn.Close() }() 1555 1556 // Read messages until we find our event or receive done signal 1557 for { 1558 select { 1559 case <-done: 1560 return nil 1561 case <-ctx.Done(): 1562 return ctx.Err() 1563 default: 1564 // Set read deadline to avoid blocking forever 1565 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 1566 return fmt.Errorf("failed to set read deadline: %w", err) 1567 } 1568 1569 var event jetstream.JetstreamEvent 1570 err := conn.ReadJSON(&event) 1571 if err != nil { 1572 // Check if it's a timeout (expected) 1573 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 1574 return nil 1575 } 1576 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 1577 continue // Timeout is expected, keep listening 1578 } 1579 // For other errors, don't retry reading from a broken connection 1580 return fmt.Errorf("failed to read Jetstream message: %w", err) 1581 } 1582 1583 // Check if this is the event we're looking for 1584 if event.Did == targetDID && event.Kind == "commit" { 1585 // Process the event through the consumer 1586 if err := consumer.HandleEvent(ctx, &event); err != nil { 1587 return fmt.Errorf("failed to process event: %w", err) 1588 } 1589 1590 // Send to channel so test can verify 1591 select { 1592 case eventChan <- &event: 1593 return nil 1594 case <-time.After(1 * time.Second): 1595 return fmt.Errorf("timeout sending event to channel") 1596 } 1597 } 1598 } 1599 } 1600}