A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/handlers/post"
5 "Coves/internal/api/middleware"
6 "Coves/internal/atproto/identity"
7 "Coves/internal/atproto/jetstream"
8 "Coves/internal/core/communities"
9 "Coves/internal/core/posts"
10 "Coves/internal/core/users"
11 "Coves/internal/db/postgres"
12 "bytes"
13 "context"
14 "database/sql"
15 "encoding/json"
16 "fmt"
17 "net"
18 "net/http"
19 "net/http/httptest"
20 "os"
21 "strings"
22 "testing"
23 "time"
24
25 "github.com/gorilla/websocket"
26 _ "github.com/lib/pq"
27 "github.com/pressly/goose/v3"
28 "github.com/stretchr/testify/assert"
29 "github.com/stretchr/testify/require"
30)
31
32// TestPostCreation_E2E_WithJetstream tests the full post creation flow:
33// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing
34//
35// This is a TRUE E2E test that simulates what happens in production:
36// 1. Client calls POST /xrpc/social.coves.community.post.create with auth token
37// 2. Handler validates and calls PostService.CreatePost()
38// 3. Service writes post to community's PDS repository
39// 4. PDS broadcasts event to firehose/Jetstream
40// 5. Jetstream consumer receives event and indexes post in AppView DB
41// 6. Post is now queryable from AppView
42//
43// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have
44// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS.
45func TestPostCreation_E2E_WithJetstream(t *testing.T) {
46 db := setupTestDB(t)
47 defer func() {
48 if err := db.Close(); err != nil {
49 t.Logf("Failed to close database: %v", err)
50 }
51 }()
52
53 // Cleanup old test data first
54 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'")
55 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'")
56 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'")
57
58 // Setup repositories
59 userRepo := postgres.NewUserRepository(db)
60 communityRepo := postgres.NewCommunityRepository(db)
61 postRepo := postgres.NewPostRepository(db)
62
63 // Setup user service for post consumer
64 identityConfig := identity.DefaultConfig()
65 identityResolver := identity.NewResolver(db, identityConfig)
66 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
67
68 // Create test user (author)
69 author := createTestUser(t, db, "alice.test", "did:plc:alice123")
70
71 // Create test community with fake PDS credentials
72 // In real E2E, this would be a real community provisioned on PDS
73 community := &communities.Community{
74 DID: "did:plc:gaming123",
75 Handle: "gaming.community.test.coves.social",
76 Name: "gaming",
77 DisplayName: "Gaming Community",
78 OwnerDID: "did:plc:gaming123",
79 CreatedByDID: author.DID,
80 HostedByDID: "did:web:coves.test",
81 Visibility: "public",
82 ModerationType: "moderator",
83 RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self",
84 RecordCID: "fakecid123",
85 PDSAccessToken: "fake_token_for_testing",
86 PDSRefreshToken: "fake_refresh_token",
87 }
88 _, err := communityRepo.Create(context.Background(), community)
89 if err != nil {
90 t.Fatalf("Failed to create test community: %v", err)
91 }
92
93 t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) {
94 ctx := context.Background()
95
96 // STEP 1: Simulate what the XRPC handler would receive
97 // In real flow, this comes from client with OAuth bearer token
98 title := "My First Post"
99 content := "This is a test post!"
100 postReq := posts.CreatePostRequest{
101 Title: &title,
102 Content: &content,
103 // Community and AuthorDID set by handler from request context
104 }
105
106 // STEP 2: Simulate Jetstream consumer receiving the post CREATE event
107 // In real production, this event comes from PDS via Jetstream WebSocket
108 // For this test, we simulate the event that would be broadcast after PDS write
109
110 // Generate a realistic rkey (TID - timestamp identifier)
111 rkey := generateTID()
112
113 // Build the post record as it would appear in Jetstream
114 jetstreamEvent := jetstream.JetstreamEvent{
115 Did: community.DID, // Repo owner (community)
116 Kind: "commit",
117 Commit: &jetstream.CommitEvent{
118 Operation: "create",
119 Collection: "social.coves.community.post",
120 RKey: rkey,
121 CID: "bafy2bzaceabc123def456", // Fake CID
122 Record: map[string]interface{}{
123 "$type": "social.coves.community.post",
124 "community": community.DID,
125 "author": author.DID,
126 "title": *postReq.Title,
127 "content": *postReq.Content,
128 "createdAt": time.Now().Format(time.RFC3339),
129 },
130 },
131 }
132
133 // STEP 3: Process event through Jetstream consumer
134 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
135 err := consumer.HandleEvent(ctx, &jetstreamEvent)
136 if err != nil {
137 t.Fatalf("Jetstream consumer failed to process event: %v", err)
138 }
139
140 // STEP 4: Verify post was indexed in AppView database
141 expectedURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey)
142 indexedPost, err := postRepo.GetByURI(ctx, expectedURI)
143 if err != nil {
144 t.Fatalf("Post not indexed in AppView: %v", err)
145 }
146
147 // STEP 5: Verify all fields are correct
148 if indexedPost.URI != expectedURI {
149 t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI)
150 }
151 if indexedPost.AuthorDID != author.DID {
152 t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID)
153 }
154 if indexedPost.CommunityDID != community.DID {
155 t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID)
156 }
157 if indexedPost.Title == nil || *indexedPost.Title != title {
158 t.Errorf("Expected title '%s', got %v", title, indexedPost.Title)
159 }
160 if indexedPost.Content == nil || *indexedPost.Content != content {
161 t.Errorf("Expected content '%s', got %v", content, indexedPost.Content)
162 }
163
164 // Verify stats initialized correctly
165 if indexedPost.UpvoteCount != 0 {
166 t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount)
167 }
168 if indexedPost.DownvoteCount != 0 {
169 t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount)
170 }
171 if indexedPost.Score != 0 {
172 t.Errorf("Expected score 0, got %d", indexedPost.Score)
173 }
174
175 t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI)
176 })
177
178 t.Run("Consumer validates repository ownership (security)", func(t *testing.T) {
179 ctx := context.Background()
180
181 // SECURITY TEST: Try to create a post that claims to be from the community
182 // but actually comes from a user's repository
183 // This should be REJECTED by the consumer
184
185 maliciousEvent := jetstream.JetstreamEvent{
186 Did: author.DID, // Event from user's repo (NOT community repo)
187 Kind: "commit",
188 Commit: &jetstream.CommitEvent{
189 Operation: "create",
190 Collection: "social.coves.community.post",
191 RKey: generateTID(),
192 CID: "bafy2bzacefake",
193 Record: map[string]interface{}{
194 "$type": "social.coves.community.post",
195 "community": community.DID, // Claims to be for this community
196 "author": author.DID,
197 "title": "Fake Post",
198 "content": "This is a malicious post attempt",
199 "createdAt": time.Now().Format(time.RFC3339),
200 },
201 },
202 }
203
204 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
205 err := consumer.HandleEvent(ctx, &maliciousEvent)
206
207 // Should get security error
208 if err == nil {
209 t.Fatal("Expected security error for post from wrong repository, got nil")
210 }
211
212 if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") {
213 t.Errorf("Expected repository mismatch error, got: %v", err)
214 }
215
216 t.Logf("✓ Security validation passed: %v", err)
217 })
218
219 t.Run("Idempotent indexing - duplicate events", func(t *testing.T) {
220 ctx := context.Background()
221
222 // Simulate the same Jetstream event arriving twice
223 // This can happen during Jetstream replays or network retries
224 rkey := generateTID()
225 event := jetstream.JetstreamEvent{
226 Did: community.DID,
227 Kind: "commit",
228 Commit: &jetstream.CommitEvent{
229 Operation: "create",
230 Collection: "social.coves.community.post",
231 RKey: rkey,
232 CID: "bafy2bzaceidempotent",
233 Record: map[string]interface{}{
234 "$type": "social.coves.community.post",
235 "community": community.DID,
236 "author": author.DID,
237 "title": "Duplicate Test",
238 "content": "Testing idempotency",
239 "createdAt": time.Now().Format(time.RFC3339),
240 },
241 },
242 }
243
244 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
245
246 // First event - should succeed
247 err := consumer.HandleEvent(ctx, &event)
248 if err != nil {
249 t.Fatalf("First event failed: %v", err)
250 }
251
252 // Second event (duplicate) - should be handled gracefully
253 err = consumer.HandleEvent(ctx, &event)
254 if err != nil {
255 t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err)
256 }
257
258 // Verify only one post in database
259 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey)
260 post, err := postRepo.GetByURI(ctx, uri)
261 if err != nil {
262 t.Fatalf("Post not found: %v", err)
263 }
264
265 if post.URI != uri {
266 t.Error("Post URI mismatch - possible duplicate indexing")
267 }
268
269 t.Logf("✓ Idempotency test passed")
270 })
271
272 t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) {
273 ctx := context.Background()
274
275 // Post references a community that doesn't exist in AppView yet
276 // This can happen if Jetstream delivers post event before community profile event
277 unknownCommunityDID := "did:plc:unknown999"
278
279 event := jetstream.JetstreamEvent{
280 Did: unknownCommunityDID,
281 Kind: "commit",
282 Commit: &jetstream.CommitEvent{
283 Operation: "create",
284 Collection: "social.coves.community.post",
285 RKey: generateTID(),
286 CID: "bafy2bzaceorphaned",
287 Record: map[string]interface{}{
288 "$type": "social.coves.community.post",
289 "community": unknownCommunityDID,
290 "author": author.DID,
291 "title": "Orphaned Post",
292 "content": "Community not indexed yet",
293 "createdAt": time.Now().Format(time.RFC3339),
294 },
295 },
296 }
297
298 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
299
300 // Should log warning but NOT fail (eventual consistency)
301 // Note: This will fail due to foreign key constraint in current schema
302 // In production, you might want to handle this differently (defer indexing, etc.)
303 err := consumer.HandleEvent(ctx, &event)
304
305 // For now, we expect this to fail due to FK constraint
306 // In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently
307 if err == nil {
308 t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)")
309 } else {
310 t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err)
311 }
312 })
313}
314
315// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS:
316// 1. HTTP POST to /xrpc/social.coves.community.post.create (with auth)
317// 2. Handler → Service → Write to community's PDS repository
318// 3. PDS → Jetstream firehose event
319// 4. Jetstream consumer → Index in AppView database
320// 5. Verify post appears in database with correct data
321//
322// This is a TRUE E2E test that requires:
323// - Live PDS running at PDS_URL (default: http://localhost:3001)
324// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe)
325// - Test database running
326func TestPostCreation_E2E_LivePDS(t *testing.T) {
327 if testing.Short() {
328 t.Skip("Skipping live PDS E2E test in short mode")
329 }
330
331 // Setup test database
332 dbURL := os.Getenv("TEST_DATABASE_URL")
333 if dbURL == "" {
334 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
335 }
336
337 db, err := sql.Open("postgres", dbURL)
338 require.NoError(t, err, "Failed to connect to test database")
339 defer func() {
340 if closeErr := db.Close(); closeErr != nil {
341 t.Logf("Failed to close database: %v", closeErr)
342 }
343 }()
344
345 // Run migrations
346 require.NoError(t, goose.SetDialect("postgres"))
347 require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
348
349 // Check if PDS is running
350 pdsURL := os.Getenv("PDS_URL")
351 if pdsURL == "" {
352 pdsURL = "http://localhost:3001"
353 }
354
355 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
356 if err != nil {
357 t.Skipf("PDS not running at %s: %v", pdsURL, err)
358 }
359 _ = healthResp.Body.Close()
360
361 // Get instance credentials for authentication
362 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
363 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
364 if instanceHandle == "" {
365 instanceHandle = "testuser123.local.coves.dev"
366 }
367 if instancePassword == "" {
368 instancePassword = "test-password-123"
369 }
370
371 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle)
372
373 // Authenticate to get instance DID (needed for provisioner domain)
374 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
375 if err != nil {
376 t.Skipf("Failed to authenticate with PDS (may not be configured): %v", err)
377 }
378
379 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID)
380
381 // Extract instance domain from DID for community provisioning
382 var instanceDomain string
383 if strings.HasPrefix(instanceDID, "did:web:") {
384 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
385 } else {
386 // Fallback for did:plc
387 instanceDomain = "coves.social"
388 }
389
390 // Setup repositories and services
391 communityRepo := postgres.NewCommunityRepository(db)
392 postRepo := postgres.NewPostRepository(db)
393
394 // Setup PDS account provisioner for community creation
395 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL)
396
397 // Setup community service with real PDS provisioner
398 communityService := communities.NewCommunityService(
399 communityRepo,
400 pdsURL,
401 instanceDID,
402 instanceDomain,
403 provisioner, // ✅ Real provisioner for creating communities on PDS
404 )
405
406 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, pdsURL) // nil aggregatorService, blobService, unfurlService for user-only tests
407
408 // Setup auth middleware (skip JWT verification for testing)
409 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true)
410 defer authMiddleware.Stop() // Clean up DPoP replay cache goroutine
411
412 // Setup HTTP handler
413 createHandler := post.NewCreateHandler(postService)
414
415 ctx := context.Background()
416
417 // Cleanup old test data
418 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'")
419 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'")
420 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'")
421
422 // Create test user (author)
423 author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123")
424
425 // ====================================================================================
426 // Part 1: Write-Forward to PDS
427 // ====================================================================================
428 t.Run("1. Write-Forward to PDS", func(t *testing.T) {
429 // TRUE E2E: Actually provision a real community on PDS
430 // This tests the full flow:
431 // 1. Call com.atproto.server.createAccount on PDS
432 // 2. PDS generates DID, keys, tokens
433 // 3. Write community profile to PDS repository
434 // 4. Store credentials in AppView DB
435 // 5. Use those credentials to create a post
436
437 // Use timestamp to ensure unique community name for each test run
438 communityName := fmt.Sprintf("e2epost%d", time.Now().Unix())
439
440 t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName)
441 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{
442 Name: communityName,
443 DisplayName: "E2E Test Community",
444 Description: "Test community for E2E post creation testing",
445 CreatedByDID: author.DID,
446 Visibility: "public",
447 AllowExternalDiscovery: true,
448 })
449 require.NoError(t, err, "Failed to provision community on PDS")
450 require.NotEmpty(t, community.DID, "Community should have DID from PDS")
451 require.NotEmpty(t, community.PDSAccessToken, "Community should have access token")
452 require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token")
453
454 t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle)
455
456 // NOTE: Cleanup disabled to allow post-test inspection of indexed data
457 // Uncomment to enable cleanup after test
458 // defer func() {
459 // if err := communityRepo.Delete(ctx, community.DID); err != nil {
460 // t.Logf("Warning: Failed to cleanup test community: %v", err)
461 // }
462 // }()
463
464 // Build HTTP request for post creation
465 title := "E2E Test Post"
466 content := "This post was created via full E2E test with live PDS!"
467 reqBody := map[string]interface{}{
468 "community": community.DID,
469 "title": title,
470 "content": content,
471 }
472 reqJSON, err := json.Marshal(reqBody)
473 require.NoError(t, err)
474
475 // Create HTTP request
476 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
477 req.Header.Set("Content-Type", "application/json")
478
479 // Create a simple JWT for testing (Phase 1: no signature verification)
480 // In production, this would be a real OAuth token from PDS
481 testJWT := createSimpleTestJWT(author.DID)
482 req.Header.Set("Authorization", "DPoP "+testJWT)
483
484 // Execute request through auth middleware + handler
485 rr := httptest.NewRecorder()
486 handler := authMiddleware.RequireAuth(http.HandlerFunc(createHandler.HandleCreate))
487 handler.ServeHTTP(rr, req)
488
489 // Check response
490 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String())
491
492 // Parse response
493 var response posts.CreatePostResponse
494 err = json.NewDecoder(rr.Body).Decode(&response)
495 require.NoError(t, err, "Failed to parse response")
496
497 t.Logf("✅ Post created on PDS:")
498 t.Logf(" URI: %s", response.URI)
499 t.Logf(" CID: %s", response.CID)
500
501 // ====================================================================================
502 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer
503 // ====================================================================================
504 // This part tests the ACTUAL production code path in main.go
505 // including the WebSocket connection and consumer logic
506 t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) {
507 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...")
508
509 // Get PDS hostname for Jetstream filtering
510 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
511 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
512 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port
513
514 // Build Jetstream URL with filters for post records
515 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.post",
516 pdsHostname)
517
518 t.Logf(" Jetstream URL: %s", jetstreamURL)
519 t.Logf(" Looking for post URI: %s", response.URI)
520 t.Logf(" Community DID: %s", community.DID)
521
522 // Setup user service (required by post consumer)
523 userRepo := postgres.NewUserRepository(db)
524 identityConfig := identity.DefaultConfig()
525 plcURL := os.Getenv("PLC_DIRECTORY_URL")
526 if plcURL == "" {
527 plcURL = "http://localhost:3002"
528 }
529 identityConfig.PLCURL = plcURL
530 identityResolver := identity.NewResolver(db, identityConfig)
531 userService := users.NewUserService(userRepo, identityResolver, pdsURL)
532
533 // Create post consumer (same as main.go)
534 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
535
536 // Channels to receive the event
537 eventChan := make(chan *jetstream.JetstreamEvent, 10)
538 errorChan := make(chan error, 1)
539 done := make(chan bool)
540
541 // Start Jetstream WebSocket subscriber in background
542 // This creates its own WebSocket connection to Jetstream
543 go func() {
544 err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done)
545 if err != nil {
546 errorChan <- err
547 }
548 }()
549
550 // Wait for event or timeout
551 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
552
553 select {
554 case event := <-eventChan:
555 t.Logf("✅ Received real Jetstream event!")
556 t.Logf(" Event DID: %s", event.Did)
557 t.Logf(" Collection: %s", event.Commit.Collection)
558 t.Logf(" Operation: %s", event.Commit.Operation)
559 t.Logf(" RKey: %s", event.Commit.RKey)
560
561 // Verify it's for our community
562 assert.Equal(t, community.DID, event.Did, "Event should be from community repo")
563
564 // Verify post was indexed in AppView database
565 t.Logf("\n🔍 Querying AppView database for indexed post...")
566
567 indexedPost, err := postRepo.GetByURI(ctx, response.URI)
568 require.NoError(t, err, "Post should be indexed in AppView")
569
570 t.Logf("✅ Post indexed in AppView:")
571 t.Logf(" URI: %s", indexedPost.URI)
572 t.Logf(" CID: %s", indexedPost.CID)
573 t.Logf(" Author DID: %s", indexedPost.AuthorDID)
574 t.Logf(" Community: %s", indexedPost.CommunityDID)
575 t.Logf(" Title: %v", indexedPost.Title)
576 t.Logf(" Content: %v", indexedPost.Content)
577
578 // Verify all fields match what we sent
579 assert.Equal(t, response.URI, indexedPost.URI, "URI should match")
580 assert.Equal(t, response.CID, indexedPost.CID, "CID should match")
581 assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match")
582 assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match")
583 assert.Equal(t, title, *indexedPost.Title, "Title should match")
584 assert.Equal(t, content, *indexedPost.Content, "Content should match")
585
586 // Verify stats initialized correctly
587 assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0")
588 assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0")
589 assert.Equal(t, 0, indexedPost.Score, "Score should be 0")
590 assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0")
591
592 // Verify timestamps
593 assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set")
594 assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set")
595
596 // Signal to stop Jetstream consumer
597 close(done)
598
599 t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓")
600
601 case err := <-errorChan:
602 t.Fatalf("❌ Jetstream error: %v", err)
603
604 case <-time.After(30 * time.Second):
605 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds")
606 }
607 })
608 })
609}
610
611// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events
612// This helper creates a WebSocket connection to Jetstream and waits for post events
613func subscribeToJetstreamForPost(
614 ctx context.Context,
615 jetstreamURL string,
616 targetDID string,
617 consumer *jetstream.PostEventConsumer,
618 eventChan chan<- *jetstream.JetstreamEvent,
619 errorChan chan<- error,
620 done <-chan bool,
621) error {
622 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
623 if err != nil {
624 return fmt.Errorf("failed to connect to Jetstream: %w", err)
625 }
626 defer func() { _ = conn.Close() }()
627
628 // Read messages until we find our event or receive done signal
629 for {
630 select {
631 case <-done:
632 return nil
633 case <-ctx.Done():
634 return ctx.Err()
635 default:
636 // Set read deadline to avoid blocking forever
637 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
638 return fmt.Errorf("failed to set read deadline: %w", err)
639 }
640
641 var event jetstream.JetstreamEvent
642 err := conn.ReadJSON(&event)
643 if err != nil {
644 // Check if it's a timeout (expected)
645 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
646 return nil
647 }
648 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
649 continue // Timeout is expected, keep listening
650 }
651 // For other errors, don't retry reading from a broken connection
652 return fmt.Errorf("failed to read Jetstream message: %w", err)
653 }
654
655 // Check if this is a post event for the target DID
656 if event.Did == targetDID && event.Kind == "commit" &&
657 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" {
658 // Process the event through the consumer
659 if err := consumer.HandleEvent(ctx, &event); err != nil {
660 return fmt.Errorf("failed to process event: %w", err)
661 }
662
663 // Send to channel so test can verify
664 select {
665 case eventChan <- &event:
666 return nil
667 case <-time.After(1 * time.Second):
668 return fmt.Errorf("timeout sending event to channel")
669 }
670 }
671 }
672 }
673}