A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/handlers/post" 5 "Coves/internal/api/middleware" 6 "Coves/internal/atproto/identity" 7 "Coves/internal/atproto/jetstream" 8 "Coves/internal/core/communities" 9 "Coves/internal/core/posts" 10 "Coves/internal/core/users" 11 "Coves/internal/db/postgres" 12 "bytes" 13 "context" 14 "database/sql" 15 "encoding/json" 16 "fmt" 17 "net" 18 "net/http" 19 "net/http/httptest" 20 "os" 21 "strings" 22 "testing" 23 "time" 24 25 "github.com/gorilla/websocket" 26 _ "github.com/lib/pq" 27 "github.com/pressly/goose/v3" 28 "github.com/stretchr/testify/assert" 29 "github.com/stretchr/testify/require" 30) 31 32// TestPostCreation_E2E_WithJetstream tests the full post creation flow: 33// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing 34// 35// This is a TRUE E2E test that simulates what happens in production: 36// 1. Client calls POST /xrpc/social.coves.community.post.create with auth token 37// 2. Handler validates and calls PostService.CreatePost() 38// 3. Service writes post to community's PDS repository 39// 4. PDS broadcasts event to firehose/Jetstream 40// 5. Jetstream consumer receives event and indexes post in AppView DB 41// 6. Post is now queryable from AppView 42// 43// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have 44// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS. 45func TestPostCreation_E2E_WithJetstream(t *testing.T) { 46 db := setupTestDB(t) 47 defer func() { 48 if err := db.Close(); err != nil { 49 t.Logf("Failed to close database: %v", err) 50 } 51 }() 52 53 // Cleanup old test data first 54 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'") 55 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'") 56 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'") 57 58 // Setup repositories 59 userRepo := postgres.NewUserRepository(db) 60 communityRepo := postgres.NewCommunityRepository(db) 61 postRepo := postgres.NewPostRepository(db) 62 63 // Setup user service for post consumer 64 identityConfig := identity.DefaultConfig() 65 identityResolver := identity.NewResolver(db, identityConfig) 66 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 67 68 // Create test user (author) 69 author := createTestUser(t, db, "alice.test", "did:plc:alice123") 70 71 // Create test community with fake PDS credentials 72 // In real E2E, this would be a real community provisioned on PDS 73 community := &communities.Community{ 74 DID: "did:plc:gaming123", 75 Handle: "gaming.community.test.coves.social", 76 Name: "gaming", 77 DisplayName: "Gaming Community", 78 OwnerDID: "did:plc:gaming123", 79 CreatedByDID: author.DID, 80 HostedByDID: "did:web:coves.test", 81 Visibility: "public", 82 ModerationType: "moderator", 83 RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self", 84 RecordCID: "fakecid123", 85 PDSAccessToken: "fake_token_for_testing", 86 PDSRefreshToken: "fake_refresh_token", 87 } 88 _, err := communityRepo.Create(context.Background(), community) 89 if err != nil { 90 t.Fatalf("Failed to create test community: %v", err) 91 } 92 93 t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) { 94 ctx := context.Background() 95 96 // STEP 1: Simulate what the XRPC handler would receive 97 // In real flow, this comes from client with OAuth bearer token 98 title := "My First Post" 99 content := "This is a test post!" 100 postReq := posts.CreatePostRequest{ 101 Title: &title, 102 Content: &content, 103 // Community and AuthorDID set by handler from request context 104 } 105 106 // STEP 2: Simulate Jetstream consumer receiving the post CREATE event 107 // In real production, this event comes from PDS via Jetstream WebSocket 108 // For this test, we simulate the event that would be broadcast after PDS write 109 110 // Generate a realistic rkey (TID - timestamp identifier) 111 rkey := generateTID() 112 113 // Build the post record as it would appear in Jetstream 114 jetstreamEvent := jetstream.JetstreamEvent{ 115 Did: community.DID, // Repo owner (community) 116 Kind: "commit", 117 Commit: &jetstream.CommitEvent{ 118 Operation: "create", 119 Collection: "social.coves.community.post", 120 RKey: rkey, 121 CID: "bafy2bzaceabc123def456", // Fake CID 122 Record: map[string]interface{}{ 123 "$type": "social.coves.community.post", 124 "community": community.DID, 125 "author": author.DID, 126 "title": *postReq.Title, 127 "content": *postReq.Content, 128 "createdAt": time.Now().Format(time.RFC3339), 129 }, 130 }, 131 } 132 133 // STEP 3: Process event through Jetstream consumer 134 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 135 err := consumer.HandleEvent(ctx, &jetstreamEvent) 136 if err != nil { 137 t.Fatalf("Jetstream consumer failed to process event: %v", err) 138 } 139 140 // STEP 4: Verify post was indexed in AppView database 141 expectedURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey) 142 indexedPost, err := postRepo.GetByURI(ctx, expectedURI) 143 if err != nil { 144 t.Fatalf("Post not indexed in AppView: %v", err) 145 } 146 147 // STEP 5: Verify all fields are correct 148 if indexedPost.URI != expectedURI { 149 t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI) 150 } 151 if indexedPost.AuthorDID != author.DID { 152 t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID) 153 } 154 if indexedPost.CommunityDID != community.DID { 155 t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID) 156 } 157 if indexedPost.Title == nil || *indexedPost.Title != title { 158 t.Errorf("Expected title '%s', got %v", title, indexedPost.Title) 159 } 160 if indexedPost.Content == nil || *indexedPost.Content != content { 161 t.Errorf("Expected content '%s', got %v", content, indexedPost.Content) 162 } 163 164 // Verify stats initialized correctly 165 if indexedPost.UpvoteCount != 0 { 166 t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount) 167 } 168 if indexedPost.DownvoteCount != 0 { 169 t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount) 170 } 171 if indexedPost.Score != 0 { 172 t.Errorf("Expected score 0, got %d", indexedPost.Score) 173 } 174 175 t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI) 176 }) 177 178 t.Run("Consumer validates repository ownership (security)", func(t *testing.T) { 179 ctx := context.Background() 180 181 // SECURITY TEST: Try to create a post that claims to be from the community 182 // but actually comes from a user's repository 183 // This should be REJECTED by the consumer 184 185 maliciousEvent := jetstream.JetstreamEvent{ 186 Did: author.DID, // Event from user's repo (NOT community repo) 187 Kind: "commit", 188 Commit: &jetstream.CommitEvent{ 189 Operation: "create", 190 Collection: "social.coves.community.post", 191 RKey: generateTID(), 192 CID: "bafy2bzacefake", 193 Record: map[string]interface{}{ 194 "$type": "social.coves.community.post", 195 "community": community.DID, // Claims to be for this community 196 "author": author.DID, 197 "title": "Fake Post", 198 "content": "This is a malicious post attempt", 199 "createdAt": time.Now().Format(time.RFC3339), 200 }, 201 }, 202 } 203 204 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 205 err := consumer.HandleEvent(ctx, &maliciousEvent) 206 207 // Should get security error 208 if err == nil { 209 t.Fatal("Expected security error for post from wrong repository, got nil") 210 } 211 212 if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") { 213 t.Errorf("Expected repository mismatch error, got: %v", err) 214 } 215 216 t.Logf("✓ Security validation passed: %v", err) 217 }) 218 219 t.Run("Idempotent indexing - duplicate events", func(t *testing.T) { 220 ctx := context.Background() 221 222 // Simulate the same Jetstream event arriving twice 223 // This can happen during Jetstream replays or network retries 224 rkey := generateTID() 225 event := jetstream.JetstreamEvent{ 226 Did: community.DID, 227 Kind: "commit", 228 Commit: &jetstream.CommitEvent{ 229 Operation: "create", 230 Collection: "social.coves.community.post", 231 RKey: rkey, 232 CID: "bafy2bzaceidempotent", 233 Record: map[string]interface{}{ 234 "$type": "social.coves.community.post", 235 "community": community.DID, 236 "author": author.DID, 237 "title": "Duplicate Test", 238 "content": "Testing idempotency", 239 "createdAt": time.Now().Format(time.RFC3339), 240 }, 241 }, 242 } 243 244 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 245 246 // First event - should succeed 247 err := consumer.HandleEvent(ctx, &event) 248 if err != nil { 249 t.Fatalf("First event failed: %v", err) 250 } 251 252 // Second event (duplicate) - should be handled gracefully 253 err = consumer.HandleEvent(ctx, &event) 254 if err != nil { 255 t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err) 256 } 257 258 // Verify only one post in database 259 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey) 260 post, err := postRepo.GetByURI(ctx, uri) 261 if err != nil { 262 t.Fatalf("Post not found: %v", err) 263 } 264 265 if post.URI != uri { 266 t.Error("Post URI mismatch - possible duplicate indexing") 267 } 268 269 t.Logf("✓ Idempotency test passed") 270 }) 271 272 t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) { 273 ctx := context.Background() 274 275 // Post references a community that doesn't exist in AppView yet 276 // This can happen if Jetstream delivers post event before community profile event 277 unknownCommunityDID := "did:plc:unknown999" 278 279 event := jetstream.JetstreamEvent{ 280 Did: unknownCommunityDID, 281 Kind: "commit", 282 Commit: &jetstream.CommitEvent{ 283 Operation: "create", 284 Collection: "social.coves.community.post", 285 RKey: generateTID(), 286 CID: "bafy2bzaceorphaned", 287 Record: map[string]interface{}{ 288 "$type": "social.coves.community.post", 289 "community": unknownCommunityDID, 290 "author": author.DID, 291 "title": "Orphaned Post", 292 "content": "Community not indexed yet", 293 "createdAt": time.Now().Format(time.RFC3339), 294 }, 295 }, 296 } 297 298 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 299 300 // Should log warning but NOT fail (eventual consistency) 301 // Note: This will fail due to foreign key constraint in current schema 302 // In production, you might want to handle this differently (defer indexing, etc.) 303 err := consumer.HandleEvent(ctx, &event) 304 305 // For now, we expect this to fail due to FK constraint 306 // In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently 307 if err == nil { 308 t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)") 309 } else { 310 t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err) 311 } 312 }) 313} 314 315// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS: 316// 1. HTTP POST to /xrpc/social.coves.community.post.create (with auth) 317// 2. Handler → Service → Write to community's PDS repository 318// 3. PDS → Jetstream firehose event 319// 4. Jetstream consumer → Index in AppView database 320// 5. Verify post appears in database with correct data 321// 322// This is a TRUE E2E test that requires: 323// - Live PDS running at PDS_URL (default: http://localhost:3001) 324// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe) 325// - Test database running 326func TestPostCreation_E2E_LivePDS(t *testing.T) { 327 if testing.Short() { 328 t.Skip("Skipping live PDS E2E test in short mode") 329 } 330 331 // Setup test database 332 dbURL := os.Getenv("TEST_DATABASE_URL") 333 if dbURL == "" { 334 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 335 } 336 337 db, err := sql.Open("postgres", dbURL) 338 require.NoError(t, err, "Failed to connect to test database") 339 defer func() { 340 if closeErr := db.Close(); closeErr != nil { 341 t.Logf("Failed to close database: %v", closeErr) 342 } 343 }() 344 345 // Run migrations 346 require.NoError(t, goose.SetDialect("postgres")) 347 require.NoError(t, goose.Up(db, "../../internal/db/migrations")) 348 349 // Check if PDS is running 350 pdsURL := os.Getenv("PDS_URL") 351 if pdsURL == "" { 352 pdsURL = "http://localhost:3001" 353 } 354 355 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 356 if err != nil { 357 t.Skipf("PDS not running at %s: %v", pdsURL, err) 358 } 359 _ = healthResp.Body.Close() 360 361 // Get instance credentials for authentication 362 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 363 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 364 if instanceHandle == "" { 365 instanceHandle = "testuser123.local.coves.dev" 366 } 367 if instancePassword == "" { 368 instancePassword = "test-password-123" 369 } 370 371 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle) 372 373 // Authenticate to get instance DID (needed for provisioner domain) 374 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 375 if err != nil { 376 t.Skipf("Failed to authenticate with PDS (may not be configured): %v", err) 377 } 378 379 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID) 380 381 // Extract instance domain from DID for community provisioning 382 var instanceDomain string 383 if strings.HasPrefix(instanceDID, "did:web:") { 384 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 385 } else { 386 // Fallback for did:plc 387 instanceDomain = "coves.social" 388 } 389 390 // Setup repositories and services 391 communityRepo := postgres.NewCommunityRepository(db) 392 postRepo := postgres.NewPostRepository(db) 393 394 // Setup PDS account provisioner for community creation 395 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 396 397 // Setup community service with real PDS provisioner 398 communityService := communities.NewCommunityService( 399 communityRepo, 400 pdsURL, 401 instanceDID, 402 instanceDomain, 403 provisioner, // ✅ Real provisioner for creating communities on PDS 404 ) 405 406 postService := posts.NewPostService(postRepo, communityService, nil, pdsURL) // nil aggregatorService for user-only tests 407 408 // Setup auth middleware (skip JWT verification for testing) 409 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) 410 411 // Setup HTTP handler 412 createHandler := post.NewCreateHandler(postService) 413 414 ctx := context.Background() 415 416 // Cleanup old test data 417 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'") 418 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'") 419 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'") 420 421 // Create test user (author) 422 author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123") 423 424 // ==================================================================================== 425 // Part 1: Write-Forward to PDS 426 // ==================================================================================== 427 t.Run("1. Write-Forward to PDS", func(t *testing.T) { 428 // TRUE E2E: Actually provision a real community on PDS 429 // This tests the full flow: 430 // 1. Call com.atproto.server.createAccount on PDS 431 // 2. PDS generates DID, keys, tokens 432 // 3. Write community profile to PDS repository 433 // 4. Store credentials in AppView DB 434 // 5. Use those credentials to create a post 435 436 // Use timestamp to ensure unique community name for each test run 437 communityName := fmt.Sprintf("e2epost%d", time.Now().Unix()) 438 439 t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName) 440 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 441 Name: communityName, 442 DisplayName: "E2E Test Community", 443 Description: "Test community for E2E post creation testing", 444 CreatedByDID: author.DID, 445 Visibility: "public", 446 AllowExternalDiscovery: true, 447 }) 448 require.NoError(t, err, "Failed to provision community on PDS") 449 require.NotEmpty(t, community.DID, "Community should have DID from PDS") 450 require.NotEmpty(t, community.PDSAccessToken, "Community should have access token") 451 require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token") 452 453 t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle) 454 455 // NOTE: Cleanup disabled to allow post-test inspection of indexed data 456 // Uncomment to enable cleanup after test 457 // defer func() { 458 // if err := communityRepo.Delete(ctx, community.DID); err != nil { 459 // t.Logf("Warning: Failed to cleanup test community: %v", err) 460 // } 461 // }() 462 463 // Build HTTP request for post creation 464 title := "E2E Test Post" 465 content := "This post was created via full E2E test with live PDS!" 466 reqBody := map[string]interface{}{ 467 "community": community.DID, 468 "title": title, 469 "content": content, 470 } 471 reqJSON, err := json.Marshal(reqBody) 472 require.NoError(t, err) 473 474 // Create HTTP request 475 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 476 req.Header.Set("Content-Type", "application/json") 477 478 // Create a simple JWT for testing (Phase 1: no signature verification) 479 // In production, this would be a real OAuth token from PDS 480 testJWT := createSimpleTestJWT(author.DID) 481 req.Header.Set("Authorization", "Bearer "+testJWT) 482 483 // Execute request through auth middleware + handler 484 rr := httptest.NewRecorder() 485 handler := authMiddleware.RequireAuth(http.HandlerFunc(createHandler.HandleCreate)) 486 handler.ServeHTTP(rr, req) 487 488 // Check response 489 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String()) 490 491 // Parse response 492 var response posts.CreatePostResponse 493 err = json.NewDecoder(rr.Body).Decode(&response) 494 require.NoError(t, err, "Failed to parse response") 495 496 t.Logf("✅ Post created on PDS:") 497 t.Logf(" URI: %s", response.URI) 498 t.Logf(" CID: %s", response.CID) 499 500 // ==================================================================================== 501 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer 502 // ==================================================================================== 503 // This part tests the ACTUAL production code path in main.go 504 // including the WebSocket connection and consumer logic 505 t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) { 506 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...") 507 508 // Get PDS hostname for Jetstream filtering 509 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 510 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 511 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port 512 513 // Build Jetstream URL with filters for post records 514 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.post", 515 pdsHostname) 516 517 t.Logf(" Jetstream URL: %s", jetstreamURL) 518 t.Logf(" Looking for post URI: %s", response.URI) 519 t.Logf(" Community DID: %s", community.DID) 520 521 // Setup user service (required by post consumer) 522 userRepo := postgres.NewUserRepository(db) 523 identityConfig := identity.DefaultConfig() 524 plcURL := os.Getenv("PLC_DIRECTORY_URL") 525 if plcURL == "" { 526 plcURL = "http://localhost:3002" 527 } 528 identityConfig.PLCURL = plcURL 529 identityResolver := identity.NewResolver(db, identityConfig) 530 userService := users.NewUserService(userRepo, identityResolver, pdsURL) 531 532 // Create post consumer (same as main.go) 533 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 534 535 // Channels to receive the event 536 eventChan := make(chan *jetstream.JetstreamEvent, 10) 537 errorChan := make(chan error, 1) 538 done := make(chan bool) 539 540 // Start Jetstream WebSocket subscriber in background 541 // This creates its own WebSocket connection to Jetstream 542 go func() { 543 err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done) 544 if err != nil { 545 errorChan <- err 546 } 547 }() 548 549 // Wait for event or timeout 550 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 551 552 select { 553 case event := <-eventChan: 554 t.Logf("✅ Received real Jetstream event!") 555 t.Logf(" Event DID: %s", event.Did) 556 t.Logf(" Collection: %s", event.Commit.Collection) 557 t.Logf(" Operation: %s", event.Commit.Operation) 558 t.Logf(" RKey: %s", event.Commit.RKey) 559 560 // Verify it's for our community 561 assert.Equal(t, community.DID, event.Did, "Event should be from community repo") 562 563 // Verify post was indexed in AppView database 564 t.Logf("\n🔍 Querying AppView database for indexed post...") 565 566 indexedPost, err := postRepo.GetByURI(ctx, response.URI) 567 require.NoError(t, err, "Post should be indexed in AppView") 568 569 t.Logf("✅ Post indexed in AppView:") 570 t.Logf(" URI: %s", indexedPost.URI) 571 t.Logf(" CID: %s", indexedPost.CID) 572 t.Logf(" Author DID: %s", indexedPost.AuthorDID) 573 t.Logf(" Community: %s", indexedPost.CommunityDID) 574 t.Logf(" Title: %v", indexedPost.Title) 575 t.Logf(" Content: %v", indexedPost.Content) 576 577 // Verify all fields match what we sent 578 assert.Equal(t, response.URI, indexedPost.URI, "URI should match") 579 assert.Equal(t, response.CID, indexedPost.CID, "CID should match") 580 assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match") 581 assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match") 582 assert.Equal(t, title, *indexedPost.Title, "Title should match") 583 assert.Equal(t, content, *indexedPost.Content, "Content should match") 584 585 // Verify stats initialized correctly 586 assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0") 587 assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0") 588 assert.Equal(t, 0, indexedPost.Score, "Score should be 0") 589 assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0") 590 591 // Verify timestamps 592 assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set") 593 assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set") 594 595 // Signal to stop Jetstream consumer 596 close(done) 597 598 t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓") 599 600 case err := <-errorChan: 601 t.Fatalf("❌ Jetstream error: %v", err) 602 603 case <-time.After(30 * time.Second): 604 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds") 605 } 606 }) 607 }) 608} 609 610// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events 611// This helper creates a WebSocket connection to Jetstream and waits for post events 612func subscribeToJetstreamForPost( 613 ctx context.Context, 614 jetstreamURL string, 615 targetDID string, 616 consumer *jetstream.PostEventConsumer, 617 eventChan chan<- *jetstream.JetstreamEvent, 618 errorChan chan<- error, 619 done <-chan bool, 620) error { 621 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 622 if err != nil { 623 return fmt.Errorf("failed to connect to Jetstream: %w", err) 624 } 625 defer func() { _ = conn.Close() }() 626 627 // Read messages until we find our event or receive done signal 628 for { 629 select { 630 case <-done: 631 return nil 632 case <-ctx.Done(): 633 return ctx.Err() 634 default: 635 // Set read deadline to avoid blocking forever 636 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 637 return fmt.Errorf("failed to set read deadline: %w", err) 638 } 639 640 var event jetstream.JetstreamEvent 641 err := conn.ReadJSON(&event) 642 if err != nil { 643 // Check if it's a timeout (expected) 644 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 645 return nil 646 } 647 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 648 continue // Timeout is expected, keep listening 649 } 650 // For other errors, don't retry reading from a broken connection 651 return fmt.Errorf("failed to read Jetstream message: %w", err) 652 } 653 654 // Check if this is a post event for the target DID 655 if event.Did == targetDID && event.Kind == "commit" && 656 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" { 657 // Process the event through the consumer 658 if err := consumer.HandleEvent(ctx, &event); err != nil { 659 return fmt.Errorf("failed to process event: %w", err) 660 } 661 662 // Send to channel so test can verify 663 select { 664 case eventChan <- &event: 665 return nil 666 case <-time.After(1 * time.Second): 667 return fmt.Errorf("timeout sending event to channel") 668 } 669 } 670 } 671 } 672}