A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "net" 10 "net/http" 11 "net/http/httptest" 12 "os" 13 "strings" 14 "testing" 15 "time" 16 17 "Coves/internal/api/handlers/post" 18 "Coves/internal/api/middleware" 19 "Coves/internal/atproto/identity" 20 "Coves/internal/atproto/jetstream" 21 "Coves/internal/core/communities" 22 "Coves/internal/core/posts" 23 "Coves/internal/core/users" 24 "Coves/internal/db/postgres" 25 26 "github.com/gorilla/websocket" 27 _ "github.com/lib/pq" 28 "github.com/pressly/goose/v3" 29 "github.com/stretchr/testify/assert" 30 "github.com/stretchr/testify/require" 31) 32 33// TestPostCreation_E2E_WithJetstream tests the full post creation flow: 34// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing 35// 36// This is a TRUE E2E test that simulates what happens in production: 37// 1. Client calls POST /xrpc/social.coves.community.post.create with auth token 38// 2. Handler validates and calls PostService.CreatePost() 39// 3. Service writes post to community's PDS repository 40// 4. PDS broadcasts event to firehose/Jetstream 41// 5. Jetstream consumer receives event and indexes post in AppView DB 42// 6. Post is now queryable from AppView 43// 44// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have 45// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS. 
46func TestPostCreation_E2E_WithJetstream(t *testing.T) { 47 db := setupTestDB(t) 48 defer func() { 49 if err := db.Close(); err != nil { 50 t.Logf("Failed to close database: %v", err) 51 } 52 }() 53 54 // Cleanup old test data first 55 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'") 56 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'") 57 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'") 58 59 // Setup repositories 60 userRepo := postgres.NewUserRepository(db) 61 communityRepo := postgres.NewCommunityRepository(db) 62 postRepo := postgres.NewPostRepository(db) 63 64 // Setup user service for post consumer 65 identityConfig := identity.DefaultConfig() 66 identityResolver := identity.NewResolver(db, identityConfig) 67 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 68 69 // Create test user (author) 70 author := createTestUser(t, db, "alice.test", "did:plc:alice123") 71 72 // Create test community with fake PDS credentials 73 // In real E2E, this would be a real community provisioned on PDS 74 community := &communities.Community{ 75 DID: "did:plc:gaming123", 76 Handle: "gaming.community.test.coves.social", 77 Name: "gaming", 78 DisplayName: "Gaming Community", 79 OwnerDID: "did:plc:gaming123", 80 CreatedByDID: author.DID, 81 HostedByDID: "did:web:coves.test", 82 Visibility: "public", 83 ModerationType: "moderator", 84 RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self", 85 RecordCID: "fakecid123", 86 PDSAccessToken: "fake_token_for_testing", 87 PDSRefreshToken: "fake_refresh_token", 88 } 89 _, err := communityRepo.Create(context.Background(), community) 90 if err != nil { 91 t.Fatalf("Failed to create test community: %v", err) 92 } 93 94 t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) { 95 ctx := context.Background() 96 97 // STEP 1: Simulate what the XRPC handler would receive 98 // In real flow, this comes from 
client with OAuth bearer token 99 title := "My First Post" 100 content := "This is a test post!" 101 postReq := posts.CreatePostRequest{ 102 Title: &title, 103 Content: &content, 104 // Community and AuthorDID set by handler from request context 105 } 106 107 // STEP 2: Simulate Jetstream consumer receiving the post CREATE event 108 // In real production, this event comes from PDS via Jetstream WebSocket 109 // For this test, we simulate the event that would be broadcast after PDS write 110 111 // Generate a realistic rkey (TID - timestamp identifier) 112 rkey := generateTID() 113 114 // Build the post record as it would appear in Jetstream 115 jetstreamEvent := jetstream.JetstreamEvent{ 116 Did: community.DID, // Repo owner (community) 117 Kind: "commit", 118 Commit: &jetstream.CommitEvent{ 119 Operation: "create", 120 Collection: "social.coves.community.post", 121 RKey: rkey, 122 CID: "bafy2bzaceabc123def456", // Fake CID 123 Record: map[string]interface{}{ 124 "$type": "social.coves.community.post", 125 "community": community.DID, 126 "author": author.DID, 127 "title": *postReq.Title, 128 "content": *postReq.Content, 129 "createdAt": time.Now().Format(time.RFC3339), 130 }, 131 }, 132 } 133 134 // STEP 3: Process event through Jetstream consumer 135 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 136 err := consumer.HandleEvent(ctx, &jetstreamEvent) 137 if err != nil { 138 t.Fatalf("Jetstream consumer failed to process event: %v", err) 139 } 140 141 // STEP 4: Verify post was indexed in AppView database 142 expectedURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey) 143 indexedPost, err := postRepo.GetByURI(ctx, expectedURI) 144 if err != nil { 145 t.Fatalf("Post not indexed in AppView: %v", err) 146 } 147 148 // STEP 5: Verify all fields are correct 149 if indexedPost.URI != expectedURI { 150 t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI) 151 } 152 if indexedPost.AuthorDID != 
author.DID { 153 t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID) 154 } 155 if indexedPost.CommunityDID != community.DID { 156 t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID) 157 } 158 if indexedPost.Title == nil || *indexedPost.Title != title { 159 t.Errorf("Expected title '%s', got %v", title, indexedPost.Title) 160 } 161 if indexedPost.Content == nil || *indexedPost.Content != content { 162 t.Errorf("Expected content '%s', got %v", content, indexedPost.Content) 163 } 164 165 // Verify stats initialized correctly 166 if indexedPost.UpvoteCount != 0 { 167 t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount) 168 } 169 if indexedPost.DownvoteCount != 0 { 170 t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount) 171 } 172 if indexedPost.Score != 0 { 173 t.Errorf("Expected score 0, got %d", indexedPost.Score) 174 } 175 176 t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI) 177 }) 178 179 t.Run("Consumer validates repository ownership (security)", func(t *testing.T) { 180 ctx := context.Background() 181 182 // SECURITY TEST: Try to create a post that claims to be from the community 183 // but actually comes from a user's repository 184 // This should be REJECTED by the consumer 185 186 maliciousEvent := jetstream.JetstreamEvent{ 187 Did: author.DID, // Event from user's repo (NOT community repo) 188 Kind: "commit", 189 Commit: &jetstream.CommitEvent{ 190 Operation: "create", 191 Collection: "social.coves.community.post", 192 RKey: generateTID(), 193 CID: "bafy2bzacefake", 194 Record: map[string]interface{}{ 195 "$type": "social.coves.community.post", 196 "community": community.DID, // Claims to be for this community 197 "author": author.DID, 198 "title": "Fake Post", 199 "content": "This is a malicious post attempt", 200 "createdAt": time.Now().Format(time.RFC3339), 201 }, 202 }, 203 } 204 205 consumer := jetstream.NewPostEventConsumer(postRepo, 
communityRepo, userService, db) 206 err := consumer.HandleEvent(ctx, &maliciousEvent) 207 208 // Should get security error 209 if err == nil { 210 t.Fatal("Expected security error for post from wrong repository, got nil") 211 } 212 213 if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") { 214 t.Errorf("Expected repository mismatch error, got: %v", err) 215 } 216 217 t.Logf("✓ Security validation passed: %v", err) 218 }) 219 220 t.Run("Idempotent indexing - duplicate events", func(t *testing.T) { 221 ctx := context.Background() 222 223 // Simulate the same Jetstream event arriving twice 224 // This can happen during Jetstream replays or network retries 225 rkey := generateTID() 226 event := jetstream.JetstreamEvent{ 227 Did: community.DID, 228 Kind: "commit", 229 Commit: &jetstream.CommitEvent{ 230 Operation: "create", 231 Collection: "social.coves.community.post", 232 RKey: rkey, 233 CID: "bafy2bzaceidempotent", 234 Record: map[string]interface{}{ 235 "$type": "social.coves.community.post", 236 "community": community.DID, 237 "author": author.DID, 238 "title": "Duplicate Test", 239 "content": "Testing idempotency", 240 "createdAt": time.Now().Format(time.RFC3339), 241 }, 242 }, 243 } 244 245 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 246 247 // First event - should succeed 248 err := consumer.HandleEvent(ctx, &event) 249 if err != nil { 250 t.Fatalf("First event failed: %v", err) 251 } 252 253 // Second event (duplicate) - should be handled gracefully 254 err = consumer.HandleEvent(ctx, &event) 255 if err != nil { 256 t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err) 257 } 258 259 // Verify only one post in database 260 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey) 261 post, err := postRepo.GetByURI(ctx, uri) 262 if err != nil { 263 t.Fatalf("Post not found: %v", err) 264 } 265 266 if post.URI != uri { 267 t.Error("Post URI 
mismatch - possible duplicate indexing") 268 } 269 270 t.Logf("✓ Idempotency test passed") 271 }) 272 273 t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) { 274 ctx := context.Background() 275 276 // Post references a community that doesn't exist in AppView yet 277 // This can happen if Jetstream delivers post event before community profile event 278 unknownCommunityDID := "did:plc:unknown999" 279 280 event := jetstream.JetstreamEvent{ 281 Did: unknownCommunityDID, 282 Kind: "commit", 283 Commit: &jetstream.CommitEvent{ 284 Operation: "create", 285 Collection: "social.coves.community.post", 286 RKey: generateTID(), 287 CID: "bafy2bzaceorphaned", 288 Record: map[string]interface{}{ 289 "$type": "social.coves.community.post", 290 "community": unknownCommunityDID, 291 "author": author.DID, 292 "title": "Orphaned Post", 293 "content": "Community not indexed yet", 294 "createdAt": time.Now().Format(time.RFC3339), 295 }, 296 }, 297 } 298 299 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 300 301 // Should log warning but NOT fail (eventual consistency) 302 // Note: This will fail due to foreign key constraint in current schema 303 // In production, you might want to handle this differently (defer indexing, etc.) 304 err := consumer.HandleEvent(ctx, &event) 305 306 // For now, we expect this to fail due to FK constraint 307 // In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently 308 if err == nil { 309 t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)") 310 } else { 311 t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err) 312 } 313 }) 314} 315 316// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS: 317// 1. HTTP POST to /xrpc/social.coves.community.post.create (with auth) 318// 2. Handler → Service → Write to community's PDS repository 319// 3. PDS → Jetstream firehose event 320// 4. 
Jetstream consumer → Index in AppView database 321// 5. Verify post appears in database with correct data 322// 323// This is a TRUE E2E test that requires: 324// - Live PDS running at PDS_URL (default: http://localhost:3001) 325// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe) 326// - Test database running 327func TestPostCreation_E2E_LivePDS(t *testing.T) { 328 if testing.Short() { 329 t.Skip("Skipping live PDS E2E test in short mode") 330 } 331 332 // Setup test database 333 dbURL := os.Getenv("TEST_DATABASE_URL") 334 if dbURL == "" { 335 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 336 } 337 338 db, err := sql.Open("postgres", dbURL) 339 require.NoError(t, err, "Failed to connect to test database") 340 defer func() { 341 if closeErr := db.Close(); closeErr != nil { 342 t.Logf("Failed to close database: %v", closeErr) 343 } 344 }() 345 346 // Run migrations 347 require.NoError(t, goose.SetDialect("postgres")) 348 require.NoError(t, goose.Up(db, "../../internal/db/migrations")) 349 350 // Check if PDS is running 351 pdsURL := os.Getenv("PDS_URL") 352 if pdsURL == "" { 353 pdsURL = "http://localhost:3001" 354 } 355 356 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 357 if err != nil { 358 t.Skipf("PDS not running at %s: %v", pdsURL, err) 359 } 360 _ = healthResp.Body.Close() 361 362 // Get instance credentials for authentication 363 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 364 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 365 if instanceHandle == "" { 366 instanceHandle = "testuser123.local.coves.dev" 367 } 368 if instancePassword == "" { 369 instancePassword = "test-password-123" 370 } 371 372 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle) 373 374 // Authenticate to get instance DID (needed for provisioner domain) 375 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 376 if err != nil { 377 t.Skipf("Failed to 
authenticate with PDS (may not be configured): %v", err) 378 } 379 380 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID) 381 382 // Extract instance domain from DID for community provisioning 383 var instanceDomain string 384 if strings.HasPrefix(instanceDID, "did:web:") { 385 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 386 } else { 387 // Fallback for did:plc 388 instanceDomain = "coves.social" 389 } 390 391 // Setup repositories and services 392 communityRepo := postgres.NewCommunityRepository(db) 393 postRepo := postgres.NewPostRepository(db) 394 395 // Setup PDS account provisioner for community creation 396 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 397 398 // Setup community service with real PDS provisioner 399 communityService := communities.NewCommunityService( 400 communityRepo, 401 pdsURL, 402 instanceDID, 403 instanceDomain, 404 provisioner, // ✅ Real provisioner for creating communities on PDS 405 ) 406 407 postService := posts.NewPostService(postRepo, communityService, nil, pdsURL) // nil aggregatorService for user-only tests 408 409 // Setup auth middleware (skip JWT verification for testing) 410 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) 411 412 // Setup HTTP handler 413 createHandler := post.NewCreateHandler(postService) 414 415 ctx := context.Background() 416 417 // Cleanup old test data 418 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'") 419 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'") 420 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'") 421 422 // Create test user (author) 423 author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123") 424 425 // ==================================================================================== 426 // Part 1: Write-Forward to PDS 427 // ==================================================================================== 
428 t.Run("1. Write-Forward to PDS", func(t *testing.T) { 429 // TRUE E2E: Actually provision a real community on PDS 430 // This tests the full flow: 431 // 1. Call com.atproto.server.createAccount on PDS 432 // 2. PDS generates DID, keys, tokens 433 // 3. Write community profile to PDS repository 434 // 4. Store credentials in AppView DB 435 // 5. Use those credentials to create a post 436 437 // Use timestamp to ensure unique community name for each test run 438 communityName := fmt.Sprintf("e2epost%d", time.Now().Unix()) 439 440 t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName) 441 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 442 Name: communityName, 443 DisplayName: "E2E Test Community", 444 Description: "Test community for E2E post creation testing", 445 CreatedByDID: author.DID, 446 Visibility: "public", 447 AllowExternalDiscovery: true, 448 }) 449 require.NoError(t, err, "Failed to provision community on PDS") 450 require.NotEmpty(t, community.DID, "Community should have DID from PDS") 451 require.NotEmpty(t, community.PDSAccessToken, "Community should have access token") 452 require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token") 453 454 t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle) 455 456 // NOTE: Cleanup disabled to allow post-test inspection of indexed data 457 // Uncomment to enable cleanup after test 458 // defer func() { 459 // if err := communityRepo.Delete(ctx, community.DID); err != nil { 460 // t.Logf("Warning: Failed to cleanup test community: %v", err) 461 // } 462 // }() 463 464 // Build HTTP request for post creation 465 title := "E2E Test Post" 466 content := "This post was created via full E2E test with live PDS!" 
467 reqBody := map[string]interface{}{ 468 "community": community.DID, 469 "title": title, 470 "content": content, 471 } 472 reqJSON, err := json.Marshal(reqBody) 473 require.NoError(t, err) 474 475 // Create HTTP request 476 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 477 req.Header.Set("Content-Type", "application/json") 478 479 // Create a simple JWT for testing (Phase 1: no signature verification) 480 // In production, this would be a real OAuth token from PDS 481 testJWT := createSimpleTestJWT(author.DID) 482 req.Header.Set("Authorization", "Bearer "+testJWT) 483 484 // Execute request through auth middleware + handler 485 rr := httptest.NewRecorder() 486 handler := authMiddleware.RequireAuth(http.HandlerFunc(createHandler.HandleCreate)) 487 handler.ServeHTTP(rr, req) 488 489 // Check response 490 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String()) 491 492 // Parse response 493 var response posts.CreatePostResponse 494 err = json.NewDecoder(rr.Body).Decode(&response) 495 require.NoError(t, err, "Failed to parse response") 496 497 t.Logf("✅ Post created on PDS:") 498 t.Logf(" URI: %s", response.URI) 499 t.Logf(" CID: %s", response.CID) 500 501 // ==================================================================================== 502 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer 503 // ==================================================================================== 504 // This part tests the ACTUAL production code path in main.go 505 // including the WebSocket connection and consumer logic 506 t.Run("2. 
Real Jetstream Firehose Consumption", func(t *testing.T) { 507 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...") 508 509 // Get PDS hostname for Jetstream filtering 510 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 511 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 512 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port 513 514 // Build Jetstream URL with filters for post records 515 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.post", 516 pdsHostname) 517 518 t.Logf(" Jetstream URL: %s", jetstreamURL) 519 t.Logf(" Looking for post URI: %s", response.URI) 520 t.Logf(" Community DID: %s", community.DID) 521 522 // Setup user service (required by post consumer) 523 userRepo := postgres.NewUserRepository(db) 524 identityConfig := identity.DefaultConfig() 525 plcURL := os.Getenv("PLC_DIRECTORY_URL") 526 if plcURL == "" { 527 plcURL = "http://localhost:3002" 528 } 529 identityConfig.PLCURL = plcURL 530 identityResolver := identity.NewResolver(db, identityConfig) 531 userService := users.NewUserService(userRepo, identityResolver, pdsURL) 532 533 // Create post consumer (same as main.go) 534 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 535 536 // Channels to receive the event 537 eventChan := make(chan *jetstream.JetstreamEvent, 10) 538 errorChan := make(chan error, 1) 539 done := make(chan bool) 540 541 // Start Jetstream WebSocket subscriber in background 542 // This creates its own WebSocket connection to Jetstream 543 go func() { 544 err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done) 545 if err != nil { 546 errorChan <- err 547 } 548 }() 549 550 // Wait for event or timeout 551 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 552 553 select { 554 case event := <-eventChan: 555 t.Logf("✅ Received real Jetstream event!") 556 t.Logf(" Event DID: %s", event.Did) 557 
t.Logf(" Collection: %s", event.Commit.Collection) 558 t.Logf(" Operation: %s", event.Commit.Operation) 559 t.Logf(" RKey: %s", event.Commit.RKey) 560 561 // Verify it's for our community 562 assert.Equal(t, community.DID, event.Did, "Event should be from community repo") 563 564 // Verify post was indexed in AppView database 565 t.Logf("\n🔍 Querying AppView database for indexed post...") 566 567 indexedPost, err := postRepo.GetByURI(ctx, response.URI) 568 require.NoError(t, err, "Post should be indexed in AppView") 569 570 t.Logf("✅ Post indexed in AppView:") 571 t.Logf(" URI: %s", indexedPost.URI) 572 t.Logf(" CID: %s", indexedPost.CID) 573 t.Logf(" Author DID: %s", indexedPost.AuthorDID) 574 t.Logf(" Community: %s", indexedPost.CommunityDID) 575 t.Logf(" Title: %v", indexedPost.Title) 576 t.Logf(" Content: %v", indexedPost.Content) 577 578 // Verify all fields match what we sent 579 assert.Equal(t, response.URI, indexedPost.URI, "URI should match") 580 assert.Equal(t, response.CID, indexedPost.CID, "CID should match") 581 assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match") 582 assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match") 583 assert.Equal(t, title, *indexedPost.Title, "Title should match") 584 assert.Equal(t, content, *indexedPost.Content, "Content should match") 585 586 // Verify stats initialized correctly 587 assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0") 588 assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0") 589 assert.Equal(t, 0, indexedPost.Score, "Score should be 0") 590 assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0") 591 592 // Verify timestamps 593 assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set") 594 assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set") 595 596 // Signal to stop Jetstream consumer 597 close(done) 598 599 t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → 
Jetstream → Consumer → AppView ✓") 600 601 case err := <-errorChan: 602 t.Fatalf("❌ Jetstream error: %v", err) 603 604 case <-time.After(30 * time.Second): 605 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds") 606 } 607 }) 608 }) 609} 610 611// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events 612// This helper creates a WebSocket connection to Jetstream and waits for post events 613func subscribeToJetstreamForPost( 614 ctx context.Context, 615 jetstreamURL string, 616 targetDID string, 617 consumer *jetstream.PostEventConsumer, 618 eventChan chan<- *jetstream.JetstreamEvent, 619 errorChan chan<- error, 620 done <-chan bool, 621) error { 622 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 623 if err != nil { 624 return fmt.Errorf("failed to connect to Jetstream: %w", err) 625 } 626 defer func() { _ = conn.Close() }() 627 628 // Read messages until we find our event or receive done signal 629 for { 630 select { 631 case <-done: 632 return nil 633 case <-ctx.Done(): 634 return ctx.Err() 635 default: 636 // Set read deadline to avoid blocking forever 637 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 638 return fmt.Errorf("failed to set read deadline: %w", err) 639 } 640 641 var event jetstream.JetstreamEvent 642 err := conn.ReadJSON(&event) 643 if err != nil { 644 // Check if it's a timeout (expected) 645 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 646 return nil 647 } 648 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 649 continue // Timeout is expected, keep listening 650 } 651 // For other errors, don't retry reading from a broken connection 652 return fmt.Errorf("failed to read Jetstream message: %w", err) 653 } 654 655 // Check if this is a post event for the target DID 656 if event.Did == targetDID && event.Kind == "commit" && 657 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" { 658 // 
Process the event through the consumer 659 if err := consumer.HandleEvent(ctx, &event); err != nil { 660 return fmt.Errorf("failed to process event: %w", err) 661 } 662 663 // Send to channel so test can verify 664 select { 665 case eventChan <- &event: 666 return nil 667 case <-time.After(1 * time.Second): 668 return fmt.Errorf("timeout sending event to channel") 669 } 670 } 671 } 672 } 673}