A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/handlers/post" 5 "Coves/internal/atproto/identity" 6 "Coves/internal/atproto/jetstream" 7 "Coves/internal/core/communities" 8 "Coves/internal/core/posts" 9 "Coves/internal/core/users" 10 "Coves/internal/db/postgres" 11 "bytes" 12 "context" 13 "database/sql" 14 "encoding/json" 15 "fmt" 16 "net" 17 "net/http" 18 "net/http/httptest" 19 "os" 20 "strings" 21 "testing" 22 "time" 23 24 "github.com/gorilla/websocket" 25 _ "github.com/lib/pq" 26 "github.com/pressly/goose/v3" 27 "github.com/stretchr/testify/assert" 28 "github.com/stretchr/testify/require" 29) 30 31// TestPostCreation_E2E_WithJetstream tests the full post creation flow: 32// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing 33// 34// This is a TRUE E2E test that simulates what happens in production: 35// 1. Client calls POST /xrpc/social.coves.community.post.create with auth token 36// 2. Handler validates and calls PostService.CreatePost() 37// 3. Service writes post to community's PDS repository 38// 4. PDS broadcasts event to firehose/Jetstream 39// 5. Jetstream consumer receives event and indexes post in AppView DB 40// 6. Post is now queryable from AppView 41// 42// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have 43// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS. 44func TestPostCreation_E2E_WithJetstream(t *testing.T) { 45 db := setupTestDB(t) 46 defer func() { 47 if err := db.Close(); err != nil { 48 t.Logf("Failed to close database: %v", err) 49 } 50 }() 51 52 // Cleanup old test data first 53 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'") 54 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'") 55 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'") 56 57 // Setup repositories 58 userRepo := postgres.NewUserRepository(db) 59 communityRepo := postgres.NewCommunityRepository(db) 60 postRepo := postgres.NewPostRepository(db) 61 62 // Setup user service for post consumer 63 identityConfig := identity.DefaultConfig() 64 identityResolver := identity.NewResolver(db, identityConfig) 65 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 66 67 // Create test user (author) 68 author := createTestUser(t, db, "alice.test", "did:plc:alice123") 69 70 // Create test community with fake PDS credentials 71 // In real E2E, this would be a real community provisioned on PDS 72 community := &communities.Community{ 73 DID: "did:plc:gaming123", 74 Handle: "gaming.community.test.coves.social", 75 Name: "gaming", 76 DisplayName: "Gaming Community", 77 OwnerDID: "did:plc:gaming123", 78 CreatedByDID: author.DID, 79 HostedByDID: "did:web:coves.test", 80 Visibility: "public", 81 ModerationType: "moderator", 82 RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self", 83 RecordCID: "fakecid123", 84 PDSAccessToken: "fake_token_for_testing", 85 PDSRefreshToken: "fake_refresh_token", 86 } 87 _, err := communityRepo.Create(context.Background(), community) 88 if err != nil { 89 t.Fatalf("Failed to create test community: %v", err) 90 } 91 92 t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) { 93 ctx := context.Background() 94 95 // STEP 1: Simulate what the XRPC handler would receive 96 // In real flow, this comes from client with OAuth bearer token 97 title := "My First Post" 98 content := "This is a test post!" 99 postReq := posts.CreatePostRequest{ 100 Title: &title, 101 Content: &content, 102 // Community and AuthorDID set by handler from request context 103 } 104 105 // STEP 2: Simulate Jetstream consumer receiving the post CREATE event 106 // In real production, this event comes from PDS via Jetstream WebSocket 107 // For this test, we simulate the event that would be broadcast after PDS write 108 109 // Generate a realistic rkey (TID - timestamp identifier) 110 rkey := generateTID() 111 112 // Build the post record as it would appear in Jetstream 113 jetstreamEvent := jetstream.JetstreamEvent{ 114 Did: community.DID, // Repo owner (community) 115 Kind: "commit", 116 Commit: &jetstream.CommitEvent{ 117 Operation: "create", 118 Collection: "social.coves.community.post", 119 RKey: rkey, 120 CID: "bafy2bzaceabc123def456", // Fake CID 121 Record: map[string]interface{}{ 122 "$type": "social.coves.community.post", 123 "community": community.DID, 124 "author": author.DID, 125 "title": *postReq.Title, 126 "content": *postReq.Content, 127 "createdAt": time.Now().Format(time.RFC3339), 128 }, 129 }, 130 } 131 132 // STEP 3: Process event through Jetstream consumer 133 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 134 err := consumer.HandleEvent(ctx, &jetstreamEvent) 135 if err != nil { 136 t.Fatalf("Jetstream consumer failed to process event: %v", err) 137 } 138 139 // STEP 4: Verify post was indexed in AppView database 140 expectedURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey) 141 indexedPost, err := postRepo.GetByURI(ctx, expectedURI) 142 if err != nil { 143 t.Fatalf("Post not indexed in AppView: %v", err) 144 } 145 146 // STEP 5: Verify all fields are correct 147 if indexedPost.URI != expectedURI { 148 t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI) 149 } 150 if indexedPost.AuthorDID != author.DID { 151 t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID) 152 } 153 if indexedPost.CommunityDID != community.DID { 154 t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID) 155 } 156 if indexedPost.Title == nil || *indexedPost.Title != title { 157 t.Errorf("Expected title '%s', got %v", title, indexedPost.Title) 158 } 159 if indexedPost.Content == nil || *indexedPost.Content != content { 160 t.Errorf("Expected content '%s', got %v", content, indexedPost.Content) 161 } 162 163 // Verify stats initialized correctly 164 if indexedPost.UpvoteCount != 0 { 165 t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount) 166 } 167 if indexedPost.DownvoteCount != 0 { 168 t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount) 169 } 170 if indexedPost.Score != 0 { 171 t.Errorf("Expected score 0, got %d", indexedPost.Score) 172 } 173 174 t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI) 175 }) 176 177 t.Run("Consumer validates repository ownership (security)", func(t *testing.T) { 178 ctx := context.Background() 179 180 // SECURITY TEST: Try to create a post that claims to be from the community 181 // but actually comes from a user's repository 182 // This should be REJECTED by the consumer 183 184 maliciousEvent := jetstream.JetstreamEvent{ 185 Did: author.DID, // Event from user's repo (NOT community repo) 186 Kind: "commit", 187 Commit: &jetstream.CommitEvent{ 188 Operation: "create", 189 Collection: "social.coves.community.post", 190 RKey: generateTID(), 191 CID: "bafy2bzacefake", 192 Record: map[string]interface{}{ 193 "$type": "social.coves.community.post", 194 "community": community.DID, // Claims to be for this community 195 "author": author.DID, 196 "title": "Fake Post", 197 "content": "This is a malicious post attempt", 198 "createdAt": time.Now().Format(time.RFC3339), 199 }, 200 }, 201 } 202 203 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 204 err := consumer.HandleEvent(ctx, &maliciousEvent) 205 206 // Should get security error 207 if err == nil { 208 t.Fatal("Expected security error for post from wrong repository, got nil") 209 } 210 211 if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") { 212 t.Errorf("Expected repository mismatch error, got: %v", err) 213 } 214 215 t.Logf("✓ Security validation passed: %v", err) 216 }) 217 218 t.Run("Idempotent indexing - duplicate events", func(t *testing.T) { 219 ctx := context.Background() 220 221 // Simulate the same Jetstream event arriving twice 222 // This can happen during Jetstream replays or network retries 223 rkey := generateTID() 224 event := jetstream.JetstreamEvent{ 225 Did: community.DID, 226 Kind: "commit", 227 Commit: &jetstream.CommitEvent{ 228 Operation: "create", 229 Collection: "social.coves.community.post", 230 RKey: rkey, 231 CID: "bafy2bzaceidempotent", 232 Record: map[string]interface{}{ 233 "$type": "social.coves.community.post", 234 "community": community.DID, 235 "author": author.DID, 236 "title": "Duplicate Test", 237 "content": "Testing idempotency", 238 "createdAt": time.Now().Format(time.RFC3339), 239 }, 240 }, 241 } 242 243 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 244 245 // First event - should succeed 246 err := consumer.HandleEvent(ctx, &event) 247 if err != nil { 248 t.Fatalf("First event failed: %v", err) 249 } 250 251 // Second event (duplicate) - should be handled gracefully 252 err = consumer.HandleEvent(ctx, &event) 253 if err != nil { 254 t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err) 255 } 256 257 // Verify only one post in database 258 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey) 259 post, err := postRepo.GetByURI(ctx, uri) 260 if err != nil { 261 t.Fatalf("Post not found: %v", err) 262 } 263 264 if post.URI != uri { 265 t.Error("Post URI mismatch - possible duplicate indexing") 266 } 267 268 t.Logf("✓ Idempotency test passed") 269 }) 270 271 t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) { 272 ctx := context.Background() 273 274 // Post references a community that doesn't exist in AppView yet 275 // This can happen if Jetstream delivers post event before community profile event 276 unknownCommunityDID := "did:plc:unknown999" 277 278 event := jetstream.JetstreamEvent{ 279 Did: unknownCommunityDID, 280 Kind: "commit", 281 Commit: &jetstream.CommitEvent{ 282 Operation: "create", 283 Collection: "social.coves.community.post", 284 RKey: generateTID(), 285 CID: "bafy2bzaceorphaned", 286 Record: map[string]interface{}{ 287 "$type": "social.coves.community.post", 288 "community": unknownCommunityDID, 289 "author": author.DID, 290 "title": "Orphaned Post", 291 "content": "Community not indexed yet", 292 "createdAt": time.Now().Format(time.RFC3339), 293 }, 294 }, 295 } 296 297 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 298 299 // Should log warning but NOT fail (eventual consistency) 300 // Note: This will fail due to foreign key constraint in current schema 301 // In production, you might want to handle this differently (defer indexing, etc.) 302 err := consumer.HandleEvent(ctx, &event) 303 304 // For now, we expect this to fail due to FK constraint 305 // In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently 306 if err == nil { 307 t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)") 308 } else { 309 t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err) 310 } 311 }) 312} 313 314// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS: 315// 1. HTTP POST to /xrpc/social.coves.community.post.create (with auth) 316// 2. Handler → Service → Write to community's PDS repository 317// 3. PDS → Jetstream firehose event 318// 4. Jetstream consumer → Index in AppView database 319// 5. Verify post appears in database with correct data 320// 321// This is a TRUE E2E test that requires: 322// - Live PDS running at PDS_URL (default: http://localhost:3001) 323// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe) 324// - Test database running 325func TestPostCreation_E2E_LivePDS(t *testing.T) { 326 if testing.Short() { 327 t.Skip("Skipping live PDS E2E test in short mode") 328 } 329 330 // Setup test database 331 dbURL := os.Getenv("TEST_DATABASE_URL") 332 if dbURL == "" { 333 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 334 } 335 336 db, err := sql.Open("postgres", dbURL) 337 require.NoError(t, err, "Failed to connect to test database") 338 defer func() { 339 if closeErr := db.Close(); closeErr != nil { 340 t.Logf("Failed to close database: %v", closeErr) 341 } 342 }() 343 344 // Run migrations 345 require.NoError(t, goose.SetDialect("postgres")) 346 require.NoError(t, goose.Up(db, "../../internal/db/migrations")) 347 348 // Check if PDS is running 349 pdsURL := os.Getenv("PDS_URL") 350 if pdsURL == "" { 351 pdsURL = "http://localhost:3001" 352 } 353 354 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 355 if err != nil { 356 t.Skipf("PDS not running at %s: %v", pdsURL, err) 357 } 358 _ = healthResp.Body.Close() 359 360 // Get instance credentials for authentication 361 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 362 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 363 if instanceHandle == "" { 364 instanceHandle = "testuser123.local.coves.dev" 365 } 366 if instancePassword == "" { 367 instancePassword = "test-password-123" 368 } 369 370 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle) 371 372 // Authenticate to get instance DID (needed for provisioner domain) 373 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 374 if err != nil { 375 t.Skipf("Failed to authenticate with PDS (may not be configured): %v", err) 376 } 377 378 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID) 379 380 // Extract instance domain from DID for community provisioning 381 var instanceDomain string 382 if strings.HasPrefix(instanceDID, "did:web:") { 383 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 384 } else { 385 // Fallback for did:plc 386 instanceDomain = "coves.social" 387 } 388 389 // Setup repositories and services 390 communityRepo := postgres.NewCommunityRepository(db) 391 postRepo := postgres.NewPostRepository(db) 392 393 // Setup PDS account provisioner for community creation 394 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 395 396 // Setup community service with real PDS provisioner 397 communityService := communities.NewCommunityService( 398 communityRepo, 399 pdsURL, 400 instanceDID, 401 instanceDomain, 402 provisioner, // ✅ Real provisioner for creating communities on PDS 403 ) 404 405 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, pdsURL) // nil aggregatorService, blobService, unfurlService for user-only tests 406 407 // Setup OAuth auth middleware for E2E testing 408 e2eAuth := NewE2EOAuthMiddleware() 409 410 // Setup HTTP handler 411 createHandler := post.NewCreateHandler(postService) 412 413 ctx := context.Background() 414 415 // Cleanup old test data 416 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'") 417 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'") 418 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'") 419 420 // Create test user (author) 421 author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123") 422 423 // ==================================================================================== 424 // Part 1: Write-Forward to PDS 425 // ==================================================================================== 426 t.Run("1. Write-Forward to PDS", func(t *testing.T) { 427 // TRUE E2E: Actually provision a real community on PDS 428 // This tests the full flow: 429 // 1. Call com.atproto.server.createAccount on PDS 430 // 2. PDS generates DID, keys, tokens 431 // 3. Write community profile to PDS repository 432 // 4. Store credentials in AppView DB 433 // 5. Use those credentials to create a post 434 435 // Use timestamp to ensure unique community name for each test run 436 communityName := fmt.Sprintf("e2epost%d", time.Now().Unix()) 437 438 t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName) 439 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 440 Name: communityName, 441 DisplayName: "E2E Test Community", 442 Description: "Test community for E2E post creation testing", 443 CreatedByDID: author.DID, 444 Visibility: "public", 445 AllowExternalDiscovery: true, 446 }) 447 require.NoError(t, err, "Failed to provision community on PDS") 448 require.NotEmpty(t, community.DID, "Community should have DID from PDS") 449 require.NotEmpty(t, community.PDSAccessToken, "Community should have access token") 450 require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token") 451 452 t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle) 453 454 // NOTE: Cleanup disabled to allow post-test inspection of indexed data 455 // Uncomment to enable cleanup after test 456 // defer func() { 457 // if err := communityRepo.Delete(ctx, community.DID); err != nil { 458 // t.Logf("Warning: Failed to cleanup test community: %v", err) 459 // } 460 // }() 461 462 // Build HTTP request for post creation 463 title := "E2E Test Post" 464 content := "This post was created via full E2E test with live PDS!" 465 reqBody := map[string]interface{}{ 466 "community": community.DID, 467 "title": title, 468 "content": content, 469 } 470 reqJSON, err := json.Marshal(reqBody) 471 require.NoError(t, err) 472 473 // Create HTTP request 474 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 475 req.Header.Set("Content-Type", "application/json") 476 477 // Register the author user with OAuth middleware and get test token 478 // For Coves API handlers, use Bearer scheme with OAuth middleware 479 token := e2eAuth.AddUser(author.DID) 480 req.Header.Set("Authorization", "Bearer "+token) 481 482 // Execute request through auth middleware + handler 483 rr := httptest.NewRecorder() 484 handler := e2eAuth.RequireAuth(http.HandlerFunc(createHandler.HandleCreate)) 485 handler.ServeHTTP(rr, req) 486 487 // Check response 488 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String()) 489 490 // Parse response 491 var response posts.CreatePostResponse 492 err = json.NewDecoder(rr.Body).Decode(&response) 493 require.NoError(t, err, "Failed to parse response") 494 495 t.Logf("✅ Post created on PDS:") 496 t.Logf(" URI: %s", response.URI) 497 t.Logf(" CID: %s", response.CID) 498 499 // ==================================================================================== 500 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer 501 // ==================================================================================== 502 // This part tests the ACTUAL production code path in main.go 503 // including the WebSocket connection and consumer logic 504 t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) { 505 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...") 506 507 // Get PDS hostname for Jetstream filtering 508 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 509 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 510 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port 511 512 // Build Jetstream URL with filters for post records 513 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.post", 514 pdsHostname) 515 516 t.Logf(" Jetstream URL: %s", jetstreamURL) 517 t.Logf(" Looking for post URI: %s", response.URI) 518 t.Logf(" Community DID: %s", community.DID) 519 520 // Setup user service (required by post consumer) 521 userRepo := postgres.NewUserRepository(db) 522 identityConfig := identity.DefaultConfig() 523 plcURL := os.Getenv("PLC_DIRECTORY_URL") 524 if plcURL == "" { 525 plcURL = "http://localhost:3002" 526 } 527 identityConfig.PLCURL = plcURL 528 identityResolver := identity.NewResolver(db, identityConfig) 529 userService := users.NewUserService(userRepo, identityResolver, pdsURL) 530 531 // Create post consumer (same as main.go) 532 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 533 534 // Channels to receive the event 535 eventChan := make(chan *jetstream.JetstreamEvent, 10) 536 errorChan := make(chan error, 1) 537 done := make(chan bool) 538 539 // Start Jetstream WebSocket subscriber in background 540 // This creates its own WebSocket connection to Jetstream 541 go func() { 542 err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done) 543 if err != nil { 544 errorChan <- err 545 } 546 }() 547 548 // Wait for event or timeout 549 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 550 551 select { 552 case event := <-eventChan: 553 t.Logf("✅ Received real Jetstream event!") 554 t.Logf(" Event DID: %s", event.Did) 555 t.Logf(" Collection: %s", event.Commit.Collection) 556 t.Logf(" Operation: %s", event.Commit.Operation) 557 t.Logf(" RKey: %s", event.Commit.RKey) 558 559 // Verify it's for our community 560 assert.Equal(t, community.DID, event.Did, "Event should be from community repo") 561 562 // Verify post was indexed in AppView database 563 t.Logf("\n🔍 Querying AppView database for indexed post...") 564 565 indexedPost, err := postRepo.GetByURI(ctx, response.URI) 566 require.NoError(t, err, "Post should be indexed in AppView") 567 568 t.Logf("✅ Post indexed in AppView:") 569 t.Logf(" URI: %s", indexedPost.URI) 570 t.Logf(" CID: %s", indexedPost.CID) 571 t.Logf(" Author DID: %s", indexedPost.AuthorDID) 572 t.Logf(" Community: %s", indexedPost.CommunityDID) 573 t.Logf(" Title: %v", indexedPost.Title) 574 t.Logf(" Content: %v", indexedPost.Content) 575 576 // Verify all fields match what we sent 577 assert.Equal(t, response.URI, indexedPost.URI, "URI should match") 578 assert.Equal(t, response.CID, indexedPost.CID, "CID should match") 579 assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match") 580 assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match") 581 assert.Equal(t, title, *indexedPost.Title, "Title should match") 582 assert.Equal(t, content, *indexedPost.Content, "Content should match") 583 584 // Verify stats initialized correctly 585 assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0") 586 assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0") 587 assert.Equal(t, 0, indexedPost.Score, "Score should be 0") 588 assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0") 589 590 // Verify timestamps 591 assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set") 592 assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set") 593 594 // Signal to stop Jetstream consumer 595 close(done) 596 597 t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓") 598 599 case err := <-errorChan: 600 t.Fatalf("❌ Jetstream error: %v", err) 601 602 case <-time.After(30 * time.Second): 603 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds") 604 } 605 }) 606 }) 607} 608 609// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events 610// This helper creates a WebSocket connection to Jetstream and waits for post events 611func subscribeToJetstreamForPost( 612 ctx context.Context, 613 jetstreamURL string, 614 targetDID string, 615 consumer *jetstream.PostEventConsumer, 616 eventChan chan<- *jetstream.JetstreamEvent, 617 errorChan chan<- error, 618 done <-chan bool, 619) error { 620 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 621 if err != nil { 622 return fmt.Errorf("failed to connect to Jetstream: %w", err) 623 } 624 defer func() { _ = conn.Close() }() 625 626 // Read messages until we find our event or receive done signal 627 for { 628 select { 629 case <-done: 630 return nil 631 case <-ctx.Done(): 632 return ctx.Err() 633 default: 634 // Set read deadline to avoid blocking forever 635 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 636 return fmt.Errorf("failed to set read deadline: %w", err) 637 } 638 639 var event jetstream.JetstreamEvent 640 err := conn.ReadJSON(&event) 641 if err != nil { 642 // Check if it's a timeout (expected) 643 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 644 return nil 645 } 646 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 647 continue // Timeout is expected, keep listening 648 } 649 // For other errors, don't retry reading from a broken connection 650 return fmt.Errorf("failed to read Jetstream message: %w", err) 651 } 652 653 // Check if this is a post event for the target DID 654 if event.Did == targetDID && event.Kind == "commit" && 655 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" { 656 // Process the event through the consumer 657 if err := consumer.HandleEvent(ctx, &event); err != nil { 658 return fmt.Errorf("failed to process event: %w", err) 659 } 660 661 // Send to channel so test can verify 662 select { 663 case eventChan <- &event: 664 return nil 665 case <-time.After(1 * time.Second): 666 return fmt.Errorf("timeout sending event to channel") 667 } 668 } 669 } 670 } 671}