A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/handlers/post"
5 "Coves/internal/api/middleware"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/communities"
10 "Coves/internal/core/posts"
11 "Coves/internal/core/users"
12 "Coves/internal/db/postgres"
13 "bytes"
14 "context"
15 "database/sql"
16 "encoding/base64"
17 "encoding/json"
18 "fmt"
19 "net"
20 "net/http"
21 "net/http/httptest"
22 "os"
23 "strings"
24 "testing"
25 "time"
26
27 "github.com/golang-jwt/jwt/v5"
28 "github.com/gorilla/websocket"
29 _ "github.com/lib/pq"
30 "github.com/pressly/goose/v3"
31 "github.com/stretchr/testify/assert"
32 "github.com/stretchr/testify/require"
33)
34
35// TestPostCreation_E2E_WithJetstream tests the full post creation flow:
36// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing
37//
38// This is a TRUE E2E test that simulates what happens in production:
39// 1. Client calls POST /xrpc/social.coves.post.create with auth token
40// 2. Handler validates and calls PostService.CreatePost()
41// 3. Service writes post to community's PDS repository
42// 4. PDS broadcasts event to firehose/Jetstream
43// 5. Jetstream consumer receives event and indexes post in AppView DB
44// 6. Post is now queryable from AppView
45//
46// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have
47// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS.
48func TestPostCreation_E2E_WithJetstream(t *testing.T) {
49 db := setupTestDB(t)
50 defer func() {
51 if err := db.Close(); err != nil {
52 t.Logf("Failed to close database: %v", err)
53 }
54 }()
55
56 // Cleanup old test data first
57 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'")
58 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'")
59 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'")
60
61 // Setup repositories
62 userRepo := postgres.NewUserRepository(db)
63 communityRepo := postgres.NewCommunityRepository(db)
64 postRepo := postgres.NewPostRepository(db)
65
66 // Setup user service for post consumer
67 identityConfig := identity.DefaultConfig()
68 identityResolver := identity.NewResolver(db, identityConfig)
69 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
70
71 // Create test user (author)
72 author := createTestUser(t, db, "alice.test", "did:plc:alice123")
73
74 // Create test community with fake PDS credentials
75 // In real E2E, this would be a real community provisioned on PDS
76 community := &communities.Community{
77 DID: "did:plc:gaming123",
78 Handle: "gaming.community.test.coves.social",
79 Name: "gaming",
80 DisplayName: "Gaming Community",
81 OwnerDID: "did:plc:gaming123",
82 CreatedByDID: author.DID,
83 HostedByDID: "did:web:coves.test",
84 Visibility: "public",
85 ModerationType: "moderator",
86 RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self",
87 RecordCID: "fakecid123",
88 PDSAccessToken: "fake_token_for_testing",
89 PDSRefreshToken: "fake_refresh_token",
90 }
91 _, err := communityRepo.Create(context.Background(), community)
92 if err != nil {
93 t.Fatalf("Failed to create test community: %v", err)
94 }
95
96 t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) {
97 ctx := context.Background()
98
99 // STEP 1: Simulate what the XRPC handler would receive
100 // In real flow, this comes from client with OAuth bearer token
101 title := "My First Post"
102 content := "This is a test post!"
103 postReq := posts.CreatePostRequest{
104 Title: &title,
105 Content: &content,
106 // Community and AuthorDID set by handler from request context
107 }
108
109 // STEP 2: Simulate Jetstream consumer receiving the post CREATE event
110 // In real production, this event comes from PDS via Jetstream WebSocket
111 // For this test, we simulate the event that would be broadcast after PDS write
112
113 // Generate a realistic rkey (TID - timestamp identifier)
114 rkey := generateTID()
115
116 // Build the post record as it would appear in Jetstream
117 jetstreamEvent := jetstream.JetstreamEvent{
118 Did: community.DID, // Repo owner (community)
119 Kind: "commit",
120 Commit: &jetstream.CommitEvent{
121 Operation: "create",
122 Collection: "social.coves.post.record",
123 RKey: rkey,
124 CID: "bafy2bzaceabc123def456", // Fake CID
125 Record: map[string]interface{}{
126 "$type": "social.coves.post.record",
127 "community": community.DID,
128 "author": author.DID,
129 "title": *postReq.Title,
130 "content": *postReq.Content,
131 "createdAt": time.Now().Format(time.RFC3339),
132 },
133 },
134 }
135
136 // STEP 3: Process event through Jetstream consumer
137 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
138 err := consumer.HandleEvent(ctx, &jetstreamEvent)
139 if err != nil {
140 t.Fatalf("Jetstream consumer failed to process event: %v", err)
141 }
142
143 // STEP 4: Verify post was indexed in AppView database
144 expectedURI := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, rkey)
145 indexedPost, err := postRepo.GetByURI(ctx, expectedURI)
146 if err != nil {
147 t.Fatalf("Post not indexed in AppView: %v", err)
148 }
149
150 // STEP 5: Verify all fields are correct
151 if indexedPost.URI != expectedURI {
152 t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI)
153 }
154 if indexedPost.AuthorDID != author.DID {
155 t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID)
156 }
157 if indexedPost.CommunityDID != community.DID {
158 t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID)
159 }
160 if indexedPost.Title == nil || *indexedPost.Title != title {
161 t.Errorf("Expected title '%s', got %v", title, indexedPost.Title)
162 }
163 if indexedPost.Content == nil || *indexedPost.Content != content {
164 t.Errorf("Expected content '%s', got %v", content, indexedPost.Content)
165 }
166
167 // Verify stats initialized correctly
168 if indexedPost.UpvoteCount != 0 {
169 t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount)
170 }
171 if indexedPost.DownvoteCount != 0 {
172 t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount)
173 }
174 if indexedPost.Score != 0 {
175 t.Errorf("Expected score 0, got %d", indexedPost.Score)
176 }
177
178 t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI)
179 })
180
181 t.Run("Consumer validates repository ownership (security)", func(t *testing.T) {
182 ctx := context.Background()
183
184 // SECURITY TEST: Try to create a post that claims to be from the community
185 // but actually comes from a user's repository
186 // This should be REJECTED by the consumer
187
188 maliciousEvent := jetstream.JetstreamEvent{
189 Did: author.DID, // Event from user's repo (NOT community repo)
190 Kind: "commit",
191 Commit: &jetstream.CommitEvent{
192 Operation: "create",
193 Collection: "social.coves.post.record",
194 RKey: generateTID(),
195 CID: "bafy2bzacefake",
196 Record: map[string]interface{}{
197 "$type": "social.coves.post.record",
198 "community": community.DID, // Claims to be for this community
199 "author": author.DID,
200 "title": "Fake Post",
201 "content": "This is a malicious post attempt",
202 "createdAt": time.Now().Format(time.RFC3339),
203 },
204 },
205 }
206
207 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
208 err := consumer.HandleEvent(ctx, &maliciousEvent)
209
210 // Should get security error
211 if err == nil {
212 t.Fatal("Expected security error for post from wrong repository, got nil")
213 }
214
215 if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") {
216 t.Errorf("Expected repository mismatch error, got: %v", err)
217 }
218
219 t.Logf("✓ Security validation passed: %v", err)
220 })
221
222 t.Run("Idempotent indexing - duplicate events", func(t *testing.T) {
223 ctx := context.Background()
224
225 // Simulate the same Jetstream event arriving twice
226 // This can happen during Jetstream replays or network retries
227 rkey := generateTID()
228 event := jetstream.JetstreamEvent{
229 Did: community.DID,
230 Kind: "commit",
231 Commit: &jetstream.CommitEvent{
232 Operation: "create",
233 Collection: "social.coves.post.record",
234 RKey: rkey,
235 CID: "bafy2bzaceidempotent",
236 Record: map[string]interface{}{
237 "$type": "social.coves.post.record",
238 "community": community.DID,
239 "author": author.DID,
240 "title": "Duplicate Test",
241 "content": "Testing idempotency",
242 "createdAt": time.Now().Format(time.RFC3339),
243 },
244 },
245 }
246
247 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
248
249 // First event - should succeed
250 err := consumer.HandleEvent(ctx, &event)
251 if err != nil {
252 t.Fatalf("First event failed: %v", err)
253 }
254
255 // Second event (duplicate) - should be handled gracefully
256 err = consumer.HandleEvent(ctx, &event)
257 if err != nil {
258 t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err)
259 }
260
261 // Verify only one post in database
262 uri := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, rkey)
263 post, err := postRepo.GetByURI(ctx, uri)
264 if err != nil {
265 t.Fatalf("Post not found: %v", err)
266 }
267
268 if post.URI != uri {
269 t.Error("Post URI mismatch - possible duplicate indexing")
270 }
271
272 t.Logf("✓ Idempotency test passed")
273 })
274
275 t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) {
276 ctx := context.Background()
277
278 // Post references a community that doesn't exist in AppView yet
279 // This can happen if Jetstream delivers post event before community profile event
280 unknownCommunityDID := "did:plc:unknown999"
281
282 event := jetstream.JetstreamEvent{
283 Did: unknownCommunityDID,
284 Kind: "commit",
285 Commit: &jetstream.CommitEvent{
286 Operation: "create",
287 Collection: "social.coves.post.record",
288 RKey: generateTID(),
289 CID: "bafy2bzaceorphaned",
290 Record: map[string]interface{}{
291 "$type": "social.coves.post.record",
292 "community": unknownCommunityDID,
293 "author": author.DID,
294 "title": "Orphaned Post",
295 "content": "Community not indexed yet",
296 "createdAt": time.Now().Format(time.RFC3339),
297 },
298 },
299 }
300
301 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
302
303 // Should log warning but NOT fail (eventual consistency)
304 // Note: This will fail due to foreign key constraint in current schema
305 // In production, you might want to handle this differently (defer indexing, etc.)
306 err := consumer.HandleEvent(ctx, &event)
307
308 // For now, we expect this to fail due to FK constraint
309 // In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently
310 if err == nil {
311 t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)")
312 } else {
313 t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err)
314 }
315 })
316}
317
318// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS:
319// 1. HTTP POST to /xrpc/social.coves.post.create (with auth)
320// 2. Handler → Service → Write to community's PDS repository
321// 3. PDS → Jetstream firehose event
322// 4. Jetstream consumer → Index in AppView database
323// 5. Verify post appears in database with correct data
324//
325// This is a TRUE E2E test that requires:
326// - Live PDS running at PDS_URL (default: http://localhost:3001)
327// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe)
328// - Test database running
329func TestPostCreation_E2E_LivePDS(t *testing.T) {
330 if testing.Short() {
331 t.Skip("Skipping live PDS E2E test in short mode")
332 }
333
334 // Setup test database
335 dbURL := os.Getenv("TEST_DATABASE_URL")
336 if dbURL == "" {
337 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
338 }
339
340 db, err := sql.Open("postgres", dbURL)
341 require.NoError(t, err, "Failed to connect to test database")
342 defer func() {
343 if closeErr := db.Close(); closeErr != nil {
344 t.Logf("Failed to close database: %v", closeErr)
345 }
346 }()
347
348 // Run migrations
349 require.NoError(t, goose.SetDialect("postgres"))
350 require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
351
352 // Check if PDS is running
353 pdsURL := os.Getenv("PDS_URL")
354 if pdsURL == "" {
355 pdsURL = "http://localhost:3001"
356 }
357
358 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
359 if err != nil {
360 t.Skipf("PDS not running at %s: %v", pdsURL, err)
361 }
362 _ = healthResp.Body.Close()
363
364 // Get instance credentials for authentication
365 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
366 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
367 if instanceHandle == "" {
368 instanceHandle = "testuser123.local.coves.dev"
369 }
370 if instancePassword == "" {
371 instancePassword = "test-password-123"
372 }
373
374 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle)
375
376 // Authenticate to get instance DID (needed for provisioner domain)
377 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
378 if err != nil {
379 t.Skipf("Failed to authenticate with PDS (may not be configured): %v", err)
380 }
381
382 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID)
383
384 // Extract instance domain from DID for community provisioning
385 var instanceDomain string
386 if strings.HasPrefix(instanceDID, "did:web:") {
387 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
388 } else {
389 // Fallback for did:plc
390 instanceDomain = "coves.social"
391 }
392
393 // Setup repositories and services
394 communityRepo := postgres.NewCommunityRepository(db)
395 postRepo := postgres.NewPostRepository(db)
396
397 // Setup PDS account provisioner for community creation
398 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL)
399
400 // Setup community service with real PDS provisioner
401 communityService := communities.NewCommunityService(
402 communityRepo,
403 pdsURL,
404 instanceDID,
405 instanceDomain,
406 provisioner, // ✅ Real provisioner for creating communities on PDS
407 )
408
409 postService := posts.NewPostService(postRepo, communityService, pdsURL)
410
411 // Setup auth middleware (skip JWT verification for testing)
412 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true)
413
414 // Setup HTTP handler
415 createHandler := post.NewCreateHandler(postService)
416
417 ctx := context.Background()
418
419 // Cleanup old test data
420 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'")
421 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'")
422 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'")
423
424 // Create test user (author)
425 author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123")
426
427 // ====================================================================================
428 // Part 1: Write-Forward to PDS
429 // ====================================================================================
430 t.Run("1. Write-Forward to PDS", func(t *testing.T) {
431 // TRUE E2E: Actually provision a real community on PDS
432 // This tests the full flow:
433 // 1. Call com.atproto.server.createAccount on PDS
434 // 2. PDS generates DID, keys, tokens
435 // 3. Write community profile to PDS repository
436 // 4. Store credentials in AppView DB
437 // 5. Use those credentials to create a post
438
439 // Use timestamp to ensure unique community name for each test run
440 communityName := fmt.Sprintf("e2epost%d", time.Now().Unix())
441
442 t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName)
443 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{
444 Name: communityName,
445 DisplayName: "E2E Test Community",
446 Description: "Test community for E2E post creation testing",
447 CreatedByDID: author.DID,
448 Visibility: "public",
449 AllowExternalDiscovery: true,
450 })
451 require.NoError(t, err, "Failed to provision community on PDS")
452 require.NotEmpty(t, community.DID, "Community should have DID from PDS")
453 require.NotEmpty(t, community.PDSAccessToken, "Community should have access token")
454 require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token")
455
456 t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle)
457
458 // NOTE: Cleanup disabled to allow post-test inspection of indexed data
459 // Uncomment to enable cleanup after test
460 // defer func() {
461 // if err := communityRepo.Delete(ctx, community.DID); err != nil {
462 // t.Logf("Warning: Failed to cleanup test community: %v", err)
463 // }
464 // }()
465
466 // Build HTTP request for post creation
467 title := "E2E Test Post"
468 content := "This post was created via full E2E test with live PDS!"
469 reqBody := map[string]interface{}{
470 "community": community.DID,
471 "title": title,
472 "content": content,
473 }
474 reqJSON, err := json.Marshal(reqBody)
475 require.NoError(t, err)
476
477 // Create HTTP request
478 req := httptest.NewRequest("POST", "/xrpc/social.coves.post.create", bytes.NewReader(reqJSON))
479 req.Header.Set("Content-Type", "application/json")
480
481 // Create a simple JWT for testing (Phase 1: no signature verification)
482 // In production, this would be a real OAuth token from PDS
483 testJWT := createSimpleTestJWT(author.DID)
484 req.Header.Set("Authorization", "Bearer "+testJWT)
485
486 // Execute request through auth middleware + handler
487 rr := httptest.NewRecorder()
488 handler := authMiddleware.RequireAuth(http.HandlerFunc(createHandler.HandleCreate))
489 handler.ServeHTTP(rr, req)
490
491 // Check response
492 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String())
493
494 // Parse response
495 var response posts.CreatePostResponse
496 err = json.NewDecoder(rr.Body).Decode(&response)
497 require.NoError(t, err, "Failed to parse response")
498
499 t.Logf("✅ Post created on PDS:")
500 t.Logf(" URI: %s", response.URI)
501 t.Logf(" CID: %s", response.CID)
502
503 // ====================================================================================
504 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer
505 // ====================================================================================
506 // This part tests the ACTUAL production code path in main.go
507 // including the WebSocket connection and consumer logic
508 t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) {
509 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...")
510
511 // Get PDS hostname for Jetstream filtering
512 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
513 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
514 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port
515
516 // Build Jetstream URL with filters for post records
517 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.post.record",
518 pdsHostname)
519
520 t.Logf(" Jetstream URL: %s", jetstreamURL)
521 t.Logf(" Looking for post URI: %s", response.URI)
522 t.Logf(" Community DID: %s", community.DID)
523
524 // Setup user service (required by post consumer)
525 userRepo := postgres.NewUserRepository(db)
526 identityConfig := identity.DefaultConfig()
527 plcURL := os.Getenv("PLC_DIRECTORY_URL")
528 if plcURL == "" {
529 plcURL = "http://localhost:3002"
530 }
531 identityConfig.PLCURL = plcURL
532 identityResolver := identity.NewResolver(db, identityConfig)
533 userService := users.NewUserService(userRepo, identityResolver, pdsURL)
534
535 // Create post consumer (same as main.go)
536 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
537
538 // Channels to receive the event
539 eventChan := make(chan *jetstream.JetstreamEvent, 10)
540 errorChan := make(chan error, 1)
541 done := make(chan bool)
542
543 // Start Jetstream WebSocket subscriber in background
544 // This creates its own WebSocket connection to Jetstream
545 go func() {
546 err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done)
547 if err != nil {
548 errorChan <- err
549 }
550 }()
551
552 // Wait for event or timeout
553 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
554
555 select {
556 case event := <-eventChan:
557 t.Logf("✅ Received real Jetstream event!")
558 t.Logf(" Event DID: %s", event.Did)
559 t.Logf(" Collection: %s", event.Commit.Collection)
560 t.Logf(" Operation: %s", event.Commit.Operation)
561 t.Logf(" RKey: %s", event.Commit.RKey)
562
563 // Verify it's for our community
564 assert.Equal(t, community.DID, event.Did, "Event should be from community repo")
565
566 // Verify post was indexed in AppView database
567 t.Logf("\n🔍 Querying AppView database for indexed post...")
568
569 indexedPost, err := postRepo.GetByURI(ctx, response.URI)
570 require.NoError(t, err, "Post should be indexed in AppView")
571
572 t.Logf("✅ Post indexed in AppView:")
573 t.Logf(" URI: %s", indexedPost.URI)
574 t.Logf(" CID: %s", indexedPost.CID)
575 t.Logf(" Author DID: %s", indexedPost.AuthorDID)
576 t.Logf(" Community: %s", indexedPost.CommunityDID)
577 t.Logf(" Title: %v", indexedPost.Title)
578 t.Logf(" Content: %v", indexedPost.Content)
579
580 // Verify all fields match what we sent
581 assert.Equal(t, response.URI, indexedPost.URI, "URI should match")
582 assert.Equal(t, response.CID, indexedPost.CID, "CID should match")
583 assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match")
584 assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match")
585 assert.Equal(t, title, *indexedPost.Title, "Title should match")
586 assert.Equal(t, content, *indexedPost.Content, "Content should match")
587
588 // Verify stats initialized correctly
589 assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0")
590 assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0")
591 assert.Equal(t, 0, indexedPost.Score, "Score should be 0")
592 assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0")
593
594 // Verify timestamps
595 assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set")
596 assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set")
597
598 // Signal to stop Jetstream consumer
599 close(done)
600
601 t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓")
602
603 case err := <-errorChan:
604 t.Fatalf("❌ Jetstream error: %v", err)
605
606 case <-time.After(30 * time.Second):
607 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds")
608 }
609 })
610 })
611}
612
613// createSimpleTestJWT creates a minimal JWT for testing (Phase 1 - no signature)
614// In production, this would be a real OAuth token from PDS with proper signatures
615func createSimpleTestJWT(userDID string) string {
616 // Create minimal JWT claims using RegisteredClaims
617 // Use userDID as issuer since we don't have a proper PDS DID for testing
618 claims := auth.Claims{
619 RegisteredClaims: jwt.RegisteredClaims{
620 Subject: userDID,
621 Issuer: userDID, // Use DID as issuer for testing (valid per atProto)
622 Audience: jwt.ClaimStrings{"did:web:test.coves.social"},
623 IssuedAt: jwt.NewNumericDate(time.Now()),
624 ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)),
625 },
626 Scope: "com.atproto.access",
627 }
628
629 // For Phase 1 testing, we create an unsigned JWT
630 // The middleware is configured with skipVerify=true for testing
631 header := map[string]interface{}{
632 "alg": "none",
633 "typ": "JWT",
634 }
635
636 headerJSON, _ := json.Marshal(header)
637 claimsJSON, _ := json.Marshal(claims)
638
639 // Base64url encode (without padding)
640 headerB64 := base64.RawURLEncoding.EncodeToString(headerJSON)
641 claimsB64 := base64.RawURLEncoding.EncodeToString(claimsJSON)
642
643 // For "alg: none", signature is empty
644 return headerB64 + "." + claimsB64 + "."
645}
646
647// generateTID generates a simple timestamp-based identifier for testing
648// In production, PDS generates proper TIDs
649func generateTID() string {
650 return fmt.Sprintf("3k%d", time.Now().UnixNano()/1000)
651}
652
653// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events
654// This helper creates a WebSocket connection to Jetstream and waits for post events
655func subscribeToJetstreamForPost(
656 ctx context.Context,
657 jetstreamURL string,
658 targetDID string,
659 consumer *jetstream.PostEventConsumer,
660 eventChan chan<- *jetstream.JetstreamEvent,
661 errorChan chan<- error,
662 done <-chan bool,
663) error {
664 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
665 if err != nil {
666 return fmt.Errorf("failed to connect to Jetstream: %w", err)
667 }
668 defer func() { _ = conn.Close() }()
669
670 // Read messages until we find our event or receive done signal
671 for {
672 select {
673 case <-done:
674 return nil
675 case <-ctx.Done():
676 return ctx.Err()
677 default:
678 // Set read deadline to avoid blocking forever
679 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
680 return fmt.Errorf("failed to set read deadline: %w", err)
681 }
682
683 var event jetstream.JetstreamEvent
684 err := conn.ReadJSON(&event)
685 if err != nil {
686 // Check if it's a timeout (expected)
687 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
688 return nil
689 }
690 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
691 continue // Timeout is expected, keep listening
692 }
693 // For other errors, don't retry reading from a broken connection
694 return fmt.Errorf("failed to read Jetstream message: %w", err)
695 }
696
697 // Check if this is a post event for the target DID
698 if event.Did == targetDID && event.Kind == "commit" &&
699 event.Commit != nil && event.Commit.Collection == "social.coves.post.record" {
700 // Process the event through the consumer
701 if err := consumer.HandleEvent(ctx, &event); err != nil {
702 return fmt.Errorf("failed to process event: %w", err)
703 }
704
705 // Send to channel so test can verify
706 select {
707 case eventChan <- &event:
708 return nil
709 case <-time.After(1 * time.Second):
710 return fmt.Errorf("timeout sending event to channel")
711 }
712 }
713 }
714 }
715}