A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/handlers/post"
5 "Coves/internal/atproto/identity"
6 "Coves/internal/atproto/jetstream"
7 "Coves/internal/core/communities"
8 "Coves/internal/core/posts"
9 "Coves/internal/core/users"
10 "Coves/internal/db/postgres"
11 "bytes"
12 "context"
13 "database/sql"
14 "encoding/json"
15 "fmt"
16 "net"
17 "net/http"
18 "net/http/httptest"
19 "os"
20 "strings"
21 "testing"
22 "time"
23
24 "github.com/gorilla/websocket"
25 _ "github.com/lib/pq"
26 "github.com/pressly/goose/v3"
27 "github.com/stretchr/testify/assert"
28 "github.com/stretchr/testify/require"
29)
30
31// TestPostCreation_E2E_WithJetstream tests the full post creation flow:
32// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing
33//
34// This is a TRUE E2E test that simulates what happens in production:
35// 1. Client calls POST /xrpc/social.coves.community.post.create with auth token
36// 2. Handler validates and calls PostService.CreatePost()
37// 3. Service writes post to community's PDS repository
38// 4. PDS broadcasts event to firehose/Jetstream
39// 5. Jetstream consumer receives event and indexes post in AppView DB
40// 6. Post is now queryable from AppView
41//
42// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have
43// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS.
44func TestPostCreation_E2E_WithJetstream(t *testing.T) {
45 db := setupTestDB(t)
46 defer func() {
47 if err := db.Close(); err != nil {
48 t.Logf("Failed to close database: %v", err)
49 }
50 }()
51
52 // Cleanup old test data first
53 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'")
54 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'")
55 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'")
56
57 // Setup repositories
58 userRepo := postgres.NewUserRepository(db)
59 communityRepo := postgres.NewCommunityRepository(db)
60 postRepo := postgres.NewPostRepository(db)
61
62 // Setup user service for post consumer
63 identityConfig := identity.DefaultConfig()
64 identityResolver := identity.NewResolver(db, identityConfig)
65 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
66
67 // Create test user (author)
68 author := createTestUser(t, db, "alice.test", "did:plc:alice123")
69
70 // Create test community with fake PDS credentials
71 // In real E2E, this would be a real community provisioned on PDS
72 community := &communities.Community{
73 DID: "did:plc:gaming123",
74 Handle: "gaming.community.test.coves.social",
75 Name: "gaming",
76 DisplayName: "Gaming Community",
77 OwnerDID: "did:plc:gaming123",
78 CreatedByDID: author.DID,
79 HostedByDID: "did:web:coves.test",
80 Visibility: "public",
81 ModerationType: "moderator",
82 RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self",
83 RecordCID: "fakecid123",
84 PDSAccessToken: "fake_token_for_testing",
85 PDSRefreshToken: "fake_refresh_token",
86 }
87 _, err := communityRepo.Create(context.Background(), community)
88 if err != nil {
89 t.Fatalf("Failed to create test community: %v", err)
90 }
91
92 t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) {
93 ctx := context.Background()
94
95 // STEP 1: Simulate what the XRPC handler would receive
96 // In real flow, this comes from client with OAuth bearer token
97 title := "My First Post"
98 content := "This is a test post!"
99 postReq := posts.CreatePostRequest{
100 Title: &title,
101 Content: &content,
102 // Community and AuthorDID set by handler from request context
103 }
104
105 // STEP 2: Simulate Jetstream consumer receiving the post CREATE event
106 // In real production, this event comes from PDS via Jetstream WebSocket
107 // For this test, we simulate the event that would be broadcast after PDS write
108
109 // Generate a realistic rkey (TID - timestamp identifier)
110 rkey := generateTID()
111
112 // Build the post record as it would appear in Jetstream
113 jetstreamEvent := jetstream.JetstreamEvent{
114 Did: community.DID, // Repo owner (community)
115 Kind: "commit",
116 Commit: &jetstream.CommitEvent{
117 Operation: "create",
118 Collection: "social.coves.community.post",
119 RKey: rkey,
120 CID: "bafy2bzaceabc123def456", // Fake CID
121 Record: map[string]interface{}{
122 "$type": "social.coves.community.post",
123 "community": community.DID,
124 "author": author.DID,
125 "title": *postReq.Title,
126 "content": *postReq.Content,
127 "createdAt": time.Now().Format(time.RFC3339),
128 },
129 },
130 }
131
132 // STEP 3: Process event through Jetstream consumer
133 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
134 err := consumer.HandleEvent(ctx, &jetstreamEvent)
135 if err != nil {
136 t.Fatalf("Jetstream consumer failed to process event: %v", err)
137 }
138
139 // STEP 4: Verify post was indexed in AppView database
140 expectedURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey)
141 indexedPost, err := postRepo.GetByURI(ctx, expectedURI)
142 if err != nil {
143 t.Fatalf("Post not indexed in AppView: %v", err)
144 }
145
146 // STEP 5: Verify all fields are correct
147 if indexedPost.URI != expectedURI {
148 t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI)
149 }
150 if indexedPost.AuthorDID != author.DID {
151 t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID)
152 }
153 if indexedPost.CommunityDID != community.DID {
154 t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID)
155 }
156 if indexedPost.Title == nil || *indexedPost.Title != title {
157 t.Errorf("Expected title '%s', got %v", title, indexedPost.Title)
158 }
159 if indexedPost.Content == nil || *indexedPost.Content != content {
160 t.Errorf("Expected content '%s', got %v", content, indexedPost.Content)
161 }
162
163 // Verify stats initialized correctly
164 if indexedPost.UpvoteCount != 0 {
165 t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount)
166 }
167 if indexedPost.DownvoteCount != 0 {
168 t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount)
169 }
170 if indexedPost.Score != 0 {
171 t.Errorf("Expected score 0, got %d", indexedPost.Score)
172 }
173
174 t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI)
175 })
176
177 t.Run("Consumer validates repository ownership (security)", func(t *testing.T) {
178 ctx := context.Background()
179
180 // SECURITY TEST: Try to create a post that claims to be from the community
181 // but actually comes from a user's repository
182 // This should be REJECTED by the consumer
183
184 maliciousEvent := jetstream.JetstreamEvent{
185 Did: author.DID, // Event from user's repo (NOT community repo)
186 Kind: "commit",
187 Commit: &jetstream.CommitEvent{
188 Operation: "create",
189 Collection: "social.coves.community.post",
190 RKey: generateTID(),
191 CID: "bafy2bzacefake",
192 Record: map[string]interface{}{
193 "$type": "social.coves.community.post",
194 "community": community.DID, // Claims to be for this community
195 "author": author.DID,
196 "title": "Fake Post",
197 "content": "This is a malicious post attempt",
198 "createdAt": time.Now().Format(time.RFC3339),
199 },
200 },
201 }
202
203 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
204 err := consumer.HandleEvent(ctx, &maliciousEvent)
205
206 // Should get security error
207 if err == nil {
208 t.Fatal("Expected security error for post from wrong repository, got nil")
209 }
210
211 if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") {
212 t.Errorf("Expected repository mismatch error, got: %v", err)
213 }
214
215 t.Logf("✓ Security validation passed: %v", err)
216 })
217
218 t.Run("Idempotent indexing - duplicate events", func(t *testing.T) {
219 ctx := context.Background()
220
221 // Simulate the same Jetstream event arriving twice
222 // This can happen during Jetstream replays or network retries
223 rkey := generateTID()
224 event := jetstream.JetstreamEvent{
225 Did: community.DID,
226 Kind: "commit",
227 Commit: &jetstream.CommitEvent{
228 Operation: "create",
229 Collection: "social.coves.community.post",
230 RKey: rkey,
231 CID: "bafy2bzaceidempotent",
232 Record: map[string]interface{}{
233 "$type": "social.coves.community.post",
234 "community": community.DID,
235 "author": author.DID,
236 "title": "Duplicate Test",
237 "content": "Testing idempotency",
238 "createdAt": time.Now().Format(time.RFC3339),
239 },
240 },
241 }
242
243 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
244
245 // First event - should succeed
246 err := consumer.HandleEvent(ctx, &event)
247 if err != nil {
248 t.Fatalf("First event failed: %v", err)
249 }
250
251 // Second event (duplicate) - should be handled gracefully
252 err = consumer.HandleEvent(ctx, &event)
253 if err != nil {
254 t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err)
255 }
256
257 // Verify only one post in database
258 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey)
259 post, err := postRepo.GetByURI(ctx, uri)
260 if err != nil {
261 t.Fatalf("Post not found: %v", err)
262 }
263
264 if post.URI != uri {
265 t.Error("Post URI mismatch - possible duplicate indexing")
266 }
267
268 t.Logf("✓ Idempotency test passed")
269 })
270
271 t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) {
272 ctx := context.Background()
273
274 // Post references a community that doesn't exist in AppView yet
275 // This can happen if Jetstream delivers post event before community profile event
276 unknownCommunityDID := "did:plc:unknown999"
277
278 event := jetstream.JetstreamEvent{
279 Did: unknownCommunityDID,
280 Kind: "commit",
281 Commit: &jetstream.CommitEvent{
282 Operation: "create",
283 Collection: "social.coves.community.post",
284 RKey: generateTID(),
285 CID: "bafy2bzaceorphaned",
286 Record: map[string]interface{}{
287 "$type": "social.coves.community.post",
288 "community": unknownCommunityDID,
289 "author": author.DID,
290 "title": "Orphaned Post",
291 "content": "Community not indexed yet",
292 "createdAt": time.Now().Format(time.RFC3339),
293 },
294 },
295 }
296
297 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
298
299 // Should log warning but NOT fail (eventual consistency)
300 // Note: This will fail due to foreign key constraint in current schema
301 // In production, you might want to handle this differently (defer indexing, etc.)
302 err := consumer.HandleEvent(ctx, &event)
303
304 // For now, we expect this to fail due to FK constraint
305 // In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently
306 if err == nil {
307 t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)")
308 } else {
309 t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err)
310 }
311 })
312}
313
314// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS:
315// 1. HTTP POST to /xrpc/social.coves.community.post.create (with auth)
316// 2. Handler → Service → Write to community's PDS repository
317// 3. PDS → Jetstream firehose event
318// 4. Jetstream consumer → Index in AppView database
319// 5. Verify post appears in database with correct data
320//
321// This is a TRUE E2E test that requires:
322// - Live PDS running at PDS_URL (default: http://localhost:3001)
323// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe)
324// - Test database running
325func TestPostCreation_E2E_LivePDS(t *testing.T) {
326 if testing.Short() {
327 t.Skip("Skipping live PDS E2E test in short mode")
328 }
329
330 // Setup test database
331 dbURL := os.Getenv("TEST_DATABASE_URL")
332 if dbURL == "" {
333 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
334 }
335
336 db, err := sql.Open("postgres", dbURL)
337 require.NoError(t, err, "Failed to connect to test database")
338 defer func() {
339 if closeErr := db.Close(); closeErr != nil {
340 t.Logf("Failed to close database: %v", closeErr)
341 }
342 }()
343
344 // Run migrations
345 require.NoError(t, goose.SetDialect("postgres"))
346 require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
347
348 // Check if PDS is running
349 pdsURL := os.Getenv("PDS_URL")
350 if pdsURL == "" {
351 pdsURL = "http://localhost:3001"
352 }
353
354 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
355 if err != nil {
356 t.Skipf("PDS not running at %s: %v", pdsURL, err)
357 }
358 _ = healthResp.Body.Close()
359
360 // Get instance credentials for authentication
361 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
362 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
363 if instanceHandle == "" {
364 instanceHandle = "testuser123.local.coves.dev"
365 }
366 if instancePassword == "" {
367 instancePassword = "test-password-123"
368 }
369
370 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle)
371
372 // Authenticate to get instance DID (needed for provisioner domain)
373 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
374 if err != nil {
375 t.Skipf("Failed to authenticate with PDS (may not be configured): %v", err)
376 }
377
378 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID)
379
380 // Extract instance domain from DID for community provisioning
381 var instanceDomain string
382 if strings.HasPrefix(instanceDID, "did:web:") {
383 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
384 } else {
385 // Fallback for did:plc
386 instanceDomain = "coves.social"
387 }
388
389 // Setup repositories and services
390 communityRepo := postgres.NewCommunityRepository(db)
391 postRepo := postgres.NewPostRepository(db)
392
393 // Setup PDS account provisioner for community creation
394 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL)
395
396 // Setup community service with real PDS provisioner
397 communityService := communities.NewCommunityService(
398 communityRepo,
399 pdsURL,
400 instanceDID,
401 instanceDomain,
402 provisioner, // ✅ Real provisioner for creating communities on PDS
403 )
404
405 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, pdsURL) // nil aggregatorService, blobService, unfurlService for user-only tests
406
407 // Setup OAuth auth middleware for E2E testing
408 e2eAuth := NewE2EOAuthMiddleware()
409
410 // Setup HTTP handler
411 createHandler := post.NewCreateHandler(postService)
412
413 ctx := context.Background()
414
415 // Cleanup old test data
416 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'")
417 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'")
418 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'")
419
420 // Create test user (author)
421 author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123")
422
423 // ====================================================================================
424 // Part 1: Write-Forward to PDS
425 // ====================================================================================
426 t.Run("1. Write-Forward to PDS", func(t *testing.T) {
427 // TRUE E2E: Actually provision a real community on PDS
428 // This tests the full flow:
429 // 1. Call com.atproto.server.createAccount on PDS
430 // 2. PDS generates DID, keys, tokens
431 // 3. Write community profile to PDS repository
432 // 4. Store credentials in AppView DB
433 // 5. Use those credentials to create a post
434
435 // Use timestamp to ensure unique community name for each test run
436 communityName := fmt.Sprintf("e2epost%d", time.Now().Unix())
437
438 t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName)
439 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{
440 Name: communityName,
441 DisplayName: "E2E Test Community",
442 Description: "Test community for E2E post creation testing",
443 CreatedByDID: author.DID,
444 Visibility: "public",
445 AllowExternalDiscovery: true,
446 })
447 require.NoError(t, err, "Failed to provision community on PDS")
448 require.NotEmpty(t, community.DID, "Community should have DID from PDS")
449 require.NotEmpty(t, community.PDSAccessToken, "Community should have access token")
450 require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token")
451
452 t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle)
453
454 // NOTE: Cleanup disabled to allow post-test inspection of indexed data
455 // Uncomment to enable cleanup after test
456 // defer func() {
457 // if err := communityRepo.Delete(ctx, community.DID); err != nil {
458 // t.Logf("Warning: Failed to cleanup test community: %v", err)
459 // }
460 // }()
461
462 // Build HTTP request for post creation
463 title := "E2E Test Post"
464 content := "This post was created via full E2E test with live PDS!"
465 reqBody := map[string]interface{}{
466 "community": community.DID,
467 "title": title,
468 "content": content,
469 }
470 reqJSON, err := json.Marshal(reqBody)
471 require.NoError(t, err)
472
473 // Create HTTP request
474 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
475 req.Header.Set("Content-Type", "application/json")
476
477 // Register the author user with OAuth middleware and get test token
478 // For Coves API handlers, use Bearer scheme with OAuth middleware
479 token := e2eAuth.AddUser(author.DID)
480 req.Header.Set("Authorization", "Bearer "+token)
481
482 // Execute request through auth middleware + handler
483 rr := httptest.NewRecorder()
484 handler := e2eAuth.RequireAuth(http.HandlerFunc(createHandler.HandleCreate))
485 handler.ServeHTTP(rr, req)
486
487 // Check response
488 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String())
489
490 // Parse response
491 var response posts.CreatePostResponse
492 err = json.NewDecoder(rr.Body).Decode(&response)
493 require.NoError(t, err, "Failed to parse response")
494
495 t.Logf("✅ Post created on PDS:")
496 t.Logf(" URI: %s", response.URI)
497 t.Logf(" CID: %s", response.CID)
498
499 // ====================================================================================
500 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer
501 // ====================================================================================
502 // This part tests the ACTUAL production code path in main.go
503 // including the WebSocket connection and consumer logic
504 t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) {
505 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...")
506
507 // Get PDS hostname for Jetstream filtering
508 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
509 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
510 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port
511
512 // Build Jetstream URL with filters for post records
513 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.post",
514 pdsHostname)
515
516 t.Logf(" Jetstream URL: %s", jetstreamURL)
517 t.Logf(" Looking for post URI: %s", response.URI)
518 t.Logf(" Community DID: %s", community.DID)
519
520 // Setup user service (required by post consumer)
521 userRepo := postgres.NewUserRepository(db)
522 identityConfig := identity.DefaultConfig()
523 plcURL := os.Getenv("PLC_DIRECTORY_URL")
524 if plcURL == "" {
525 plcURL = "http://localhost:3002"
526 }
527 identityConfig.PLCURL = plcURL
528 identityResolver := identity.NewResolver(db, identityConfig)
529 userService := users.NewUserService(userRepo, identityResolver, pdsURL)
530
531 // Create post consumer (same as main.go)
532 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
533
534 // Channels to receive the event
535 eventChan := make(chan *jetstream.JetstreamEvent, 10)
536 errorChan := make(chan error, 1)
537 done := make(chan bool)
538
539 // Start Jetstream WebSocket subscriber in background
540 // This creates its own WebSocket connection to Jetstream
541 go func() {
542 err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done)
543 if err != nil {
544 errorChan <- err
545 }
546 }()
547
548 // Wait for event or timeout
549 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
550
551 select {
552 case event := <-eventChan:
553 t.Logf("✅ Received real Jetstream event!")
554 t.Logf(" Event DID: %s", event.Did)
555 t.Logf(" Collection: %s", event.Commit.Collection)
556 t.Logf(" Operation: %s", event.Commit.Operation)
557 t.Logf(" RKey: %s", event.Commit.RKey)
558
559 // Verify it's for our community
560 assert.Equal(t, community.DID, event.Did, "Event should be from community repo")
561
562 // Verify post was indexed in AppView database
563 t.Logf("\n🔍 Querying AppView database for indexed post...")
564
565 indexedPost, err := postRepo.GetByURI(ctx, response.URI)
566 require.NoError(t, err, "Post should be indexed in AppView")
567
568 t.Logf("✅ Post indexed in AppView:")
569 t.Logf(" URI: %s", indexedPost.URI)
570 t.Logf(" CID: %s", indexedPost.CID)
571 t.Logf(" Author DID: %s", indexedPost.AuthorDID)
572 t.Logf(" Community: %s", indexedPost.CommunityDID)
573 t.Logf(" Title: %v", indexedPost.Title)
574 t.Logf(" Content: %v", indexedPost.Content)
575
576 // Verify all fields match what we sent
577 assert.Equal(t, response.URI, indexedPost.URI, "URI should match")
578 assert.Equal(t, response.CID, indexedPost.CID, "CID should match")
579 assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match")
580 assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match")
581 assert.Equal(t, title, *indexedPost.Title, "Title should match")
582 assert.Equal(t, content, *indexedPost.Content, "Content should match")
583
584 // Verify stats initialized correctly
585 assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0")
586 assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0")
587 assert.Equal(t, 0, indexedPost.Score, "Score should be 0")
588 assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0")
589
590 // Verify timestamps
591 assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set")
592 assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set")
593
594 // Signal to stop Jetstream consumer
595 close(done)
596
597 t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓")
598
599 case err := <-errorChan:
600 t.Fatalf("❌ Jetstream error: %v", err)
601
602 case <-time.After(30 * time.Second):
603 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds")
604 }
605 })
606 })
607}
608
609// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events
610// This helper creates a WebSocket connection to Jetstream and waits for post events
611func subscribeToJetstreamForPost(
612 ctx context.Context,
613 jetstreamURL string,
614 targetDID string,
615 consumer *jetstream.PostEventConsumer,
616 eventChan chan<- *jetstream.JetstreamEvent,
617 errorChan chan<- error,
618 done <-chan bool,
619) error {
620 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
621 if err != nil {
622 return fmt.Errorf("failed to connect to Jetstream: %w", err)
623 }
624 defer func() { _ = conn.Close() }()
625
626 // Read messages until we find our event or receive done signal
627 for {
628 select {
629 case <-done:
630 return nil
631 case <-ctx.Done():
632 return ctx.Err()
633 default:
634 // Set read deadline to avoid blocking forever
635 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
636 return fmt.Errorf("failed to set read deadline: %w", err)
637 }
638
639 var event jetstream.JetstreamEvent
640 err := conn.ReadJSON(&event)
641 if err != nil {
642 // Check if it's a timeout (expected)
643 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
644 return nil
645 }
646 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
647 continue // Timeout is expected, keep listening
648 }
649 // For other errors, don't retry reading from a broken connection
650 return fmt.Errorf("failed to read Jetstream message: %w", err)
651 }
652
653 // Check if this is a post event for the target DID
654 if event.Did == targetDID && event.Kind == "commit" &&
655 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" {
656 // Process the event through the consumer
657 if err := consumer.HandleEvent(ctx, &event); err != nil {
658 return fmt.Errorf("failed to process event: %w", err)
659 }
660
661 // Send to channel so test can verify
662 select {
663 case eventChan <- &event:
664 return nil
665 case <-time.After(1 * time.Second):
666 return fmt.Errorf("timeout sending event to channel")
667 }
668 }
669 }
670 }
671}