···
4
+
"Coves/internal/api/handlers/post"
5
+
"Coves/internal/api/middleware"
6
+
"Coves/internal/atproto/auth"
7
+
"Coves/internal/atproto/identity"
8
+
"Coves/internal/atproto/jetstream"
9
+
"Coves/internal/core/communities"
10
+
"Coves/internal/core/posts"
11
+
"Coves/internal/core/users"
12
+
"Coves/internal/db/postgres"
27
+
"github.com/golang-jwt/jwt/v5"
28
+
"github.com/gorilla/websocket"
29
+
_ "github.com/lib/pq"
30
+
"github.com/pressly/goose/v3"
31
+
"github.com/stretchr/testify/assert"
32
+
"github.com/stretchr/testify/require"
35
+
// TestPostCreation_E2E_WithJetstream tests the full post creation flow:
36
+
// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing
38
+
// This is a TRUE E2E test that simulates what happens in production:
39
+
// 1. Client calls POST /xrpc/social.coves.post.create with auth token
40
+
// 2. Handler validates and calls PostService.CreatePost()
41
+
// 3. Service writes post to community's PDS repository
42
+
// 4. PDS broadcasts event to firehose/Jetstream
43
+
// 5. Jetstream consumer receives event and indexes post in AppView DB
44
+
// 6. Post is now queryable from AppView
46
+
// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have
47
+
// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS.
48
+
func TestPostCreation_E2E_WithJetstream(t *testing.T) {
49
+
db := setupTestDB(t)
51
+
if err := db.Close(); err != nil {
52
+
t.Logf("Failed to close database: %v", err)
56
+
// Cleanup old test data first
57
+
_, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'")
58
+
_, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'")
59
+
_, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'")
61
+
// Setup repositories
62
+
userRepo := postgres.NewUserRepository(db)
63
+
communityRepo := postgres.NewCommunityRepository(db)
64
+
postRepo := postgres.NewPostRepository(db)
66
+
// Setup user service for post consumer
67
+
identityConfig := identity.DefaultConfig()
68
+
identityResolver := identity.NewResolver(db, identityConfig)
69
+
userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
71
+
// Create test user (author)
72
+
author := createTestUser(t, db, "alice.test", "did:plc:alice123")
74
+
// Create test community with fake PDS credentials
75
+
// In real E2E, this would be a real community provisioned on PDS
76
+
community := &communities.Community{
77
+
DID: "did:plc:gaming123",
78
+
Handle: "gaming.community.test.coves.social",
80
+
DisplayName: "Gaming Community",
81
+
OwnerDID: "did:plc:gaming123",
82
+
CreatedByDID: author.DID,
83
+
HostedByDID: "did:web:coves.test",
84
+
Visibility: "public",
85
+
ModerationType: "moderator",
86
+
RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self",
87
+
RecordCID: "fakecid123",
88
+
PDSAccessToken: "fake_token_for_testing",
89
+
PDSRefreshToken: "fake_refresh_token",
91
+
_, err := communityRepo.Create(context.Background(), community)
93
+
t.Fatalf("Failed to create test community: %v", err)
96
+
t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) {
97
+
ctx := context.Background()
99
+
// STEP 1: Simulate what the XRPC handler would receive
100
+
// In real flow, this comes from client with OAuth bearer token
101
+
title := "My First Post"
102
+
content := "This is a test post!"
103
+
postReq := posts.CreatePostRequest{
106
+
// Community and AuthorDID set by handler from request context
109
+
// STEP 2: Simulate Jetstream consumer receiving the post CREATE event
110
+
// In real production, this event comes from PDS via Jetstream WebSocket
111
+
// For this test, we simulate the event that would be broadcast after PDS write
113
+
// Generate a realistic rkey (TID - timestamp identifier)
114
+
rkey := generateTID()
116
+
// Build the post record as it would appear in Jetstream
117
+
jetstreamEvent := jetstream.JetstreamEvent{
118
+
Did: community.DID, // Repo owner (community)
120
+
Commit: &jetstream.CommitEvent{
121
+
Operation: "create",
122
+
Collection: "social.coves.post.record",
124
+
CID: "bafy2bzaceabc123def456", // Fake CID
125
+
Record: map[string]interface{}{
126
+
"$type": "social.coves.post.record",
127
+
"community": community.DID,
128
+
"author": author.DID,
129
+
"title": *postReq.Title,
130
+
"content": *postReq.Content,
131
+
"createdAt": time.Now().Format(time.RFC3339),
136
+
// STEP 3: Process event through Jetstream consumer
137
+
consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
138
+
err := consumer.HandleEvent(ctx, &jetstreamEvent)
140
+
t.Fatalf("Jetstream consumer failed to process event: %v", err)
143
+
// STEP 4: Verify post was indexed in AppView database
144
+
expectedURI := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, rkey)
145
+
indexedPost, err := postRepo.GetByURI(ctx, expectedURI)
147
+
t.Fatalf("Post not indexed in AppView: %v", err)
150
+
// STEP 5: Verify all fields are correct
151
+
if indexedPost.URI != expectedURI {
152
+
t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI)
154
+
if indexedPost.AuthorDID != author.DID {
155
+
t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID)
157
+
if indexedPost.CommunityDID != community.DID {
158
+
t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID)
160
+
if indexedPost.Title == nil || *indexedPost.Title != title {
161
+
t.Errorf("Expected title '%s', got %v", title, indexedPost.Title)
163
+
if indexedPost.Content == nil || *indexedPost.Content != content {
164
+
t.Errorf("Expected content '%s', got %v", content, indexedPost.Content)
167
+
// Verify stats initialized correctly
168
+
if indexedPost.UpvoteCount != 0 {
169
+
t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount)
171
+
if indexedPost.DownvoteCount != 0 {
172
+
t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount)
174
+
if indexedPost.Score != 0 {
175
+
t.Errorf("Expected score 0, got %d", indexedPost.Score)
178
+
t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI)
181
+
t.Run("Consumer validates repository ownership (security)", func(t *testing.T) {
182
+
ctx := context.Background()
184
+
// SECURITY TEST: Try to create a post that claims to be from the community
185
+
// but actually comes from a user's repository
186
+
// This should be REJECTED by the consumer
188
+
maliciousEvent := jetstream.JetstreamEvent{
189
+
Did: author.DID, // Event from user's repo (NOT community repo)
191
+
Commit: &jetstream.CommitEvent{
192
+
Operation: "create",
193
+
Collection: "social.coves.post.record",
194
+
RKey: generateTID(),
195
+
CID: "bafy2bzacefake",
196
+
Record: map[string]interface{}{
197
+
"$type": "social.coves.post.record",
198
+
"community": community.DID, // Claims to be for this community
199
+
"author": author.DID,
200
+
"title": "Fake Post",
201
+
"content": "This is a malicious post attempt",
202
+
"createdAt": time.Now().Format(time.RFC3339),
207
+
consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
208
+
err := consumer.HandleEvent(ctx, &maliciousEvent)
210
+
// Should get security error
212
+
t.Fatal("Expected security error for post from wrong repository, got nil")
215
+
if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") {
216
+
t.Errorf("Expected repository mismatch error, got: %v", err)
219
+
t.Logf("✓ Security validation passed: %v", err)
222
+
t.Run("Idempotent indexing - duplicate events", func(t *testing.T) {
223
+
ctx := context.Background()
225
+
// Simulate the same Jetstream event arriving twice
226
+
// This can happen during Jetstream replays or network retries
227
+
rkey := generateTID()
228
+
event := jetstream.JetstreamEvent{
229
+
Did: community.DID,
231
+
Commit: &jetstream.CommitEvent{
232
+
Operation: "create",
233
+
Collection: "social.coves.post.record",
235
+
CID: "bafy2bzaceidempotent",
236
+
Record: map[string]interface{}{
237
+
"$type": "social.coves.post.record",
238
+
"community": community.DID,
239
+
"author": author.DID,
240
+
"title": "Duplicate Test",
241
+
"content": "Testing idempotency",
242
+
"createdAt": time.Now().Format(time.RFC3339),
247
+
consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
249
+
// First event - should succeed
250
+
err := consumer.HandleEvent(ctx, &event)
252
+
t.Fatalf("First event failed: %v", err)
255
+
// Second event (duplicate) - should be handled gracefully
256
+
err = consumer.HandleEvent(ctx, &event)
258
+
t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err)
261
+
// Verify only one post in database
262
+
uri := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, rkey)
263
+
post, err := postRepo.GetByURI(ctx, uri)
265
+
t.Fatalf("Post not found: %v", err)
268
+
if post.URI != uri {
269
+
t.Error("Post URI mismatch - possible duplicate indexing")
272
+
t.Logf("✓ Idempotency test passed")
275
+
t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) {
276
+
ctx := context.Background()
278
+
// Post references a community that doesn't exist in AppView yet
279
+
// This can happen if Jetstream delivers post event before community profile event
280
+
unknownCommunityDID := "did:plc:unknown999"
282
+
event := jetstream.JetstreamEvent{
283
+
Did: unknownCommunityDID,
285
+
Commit: &jetstream.CommitEvent{
286
+
Operation: "create",
287
+
Collection: "social.coves.post.record",
288
+
RKey: generateTID(),
289
+
CID: "bafy2bzaceorphaned",
290
+
Record: map[string]interface{}{
291
+
"$type": "social.coves.post.record",
292
+
"community": unknownCommunityDID,
293
+
"author": author.DID,
294
+
"title": "Orphaned Post",
295
+
"content": "Community not indexed yet",
296
+
"createdAt": time.Now().Format(time.RFC3339),
301
+
consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
303
+
// Should log warning but NOT fail (eventual consistency)
304
+
// Note: This will fail due to foreign key constraint in current schema
305
+
// In production, you might want to handle this differently (defer indexing, etc.)
306
+
err := consumer.HandleEvent(ctx, &event)
308
+
// For now, we expect this to fail due to FK constraint
309
+
// In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently
311
+
t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)")
313
+
t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err)
318
+
// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS:
319
+
// 1. HTTP POST to /xrpc/social.coves.post.create (with auth)
320
+
// 2. Handler → Service → Write to community's PDS repository
321
+
// 3. PDS → Jetstream firehose event
322
+
// 4. Jetstream consumer → Index in AppView database
323
+
// 5. Verify post appears in database with correct data
325
+
// This is a TRUE E2E test that requires:
326
+
// - Live PDS running at PDS_URL (default: http://localhost:3001)
327
+
// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe)
328
+
// - Test database running
329
+
func TestPostCreation_E2E_LivePDS(t *testing.T) {
330
+
if testing.Short() {
331
+
t.Skip("Skipping live PDS E2E test in short mode")
334
+
// Setup test database
335
+
dbURL := os.Getenv("TEST_DATABASE_URL")
337
+
dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
340
+
db, err := sql.Open("postgres", dbURL)
341
+
require.NoError(t, err, "Failed to connect to test database")
343
+
if closeErr := db.Close(); closeErr != nil {
344
+
t.Logf("Failed to close database: %v", closeErr)
349
+
require.NoError(t, goose.SetDialect("postgres"))
350
+
require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
352
+
// Check if PDS is running
353
+
pdsURL := os.Getenv("PDS_URL")
355
+
pdsURL = "http://localhost:3001"
358
+
healthResp, err := http.Get(pdsURL + "/xrpc/_health")
360
+
t.Skipf("PDS not running at %s: %v", pdsURL, err)
362
+
_ = healthResp.Body.Close()
364
+
// Get instance credentials for authentication
365
+
instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
366
+
instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
367
+
if instanceHandle == "" {
368
+
instanceHandle = "testuser123.local.coves.dev"
370
+
if instancePassword == "" {
371
+
instancePassword = "test-password-123"
374
+
t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle)
376
+
// Authenticate to get instance DID (needed for provisioner domain)
377
+
_, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
379
+
t.Skipf("Failed to authenticate with PDS (may not be configured): %v", err)
382
+
t.Logf("✅ Authenticated - Instance DID: %s", instanceDID)
384
+
// Extract instance domain from DID for community provisioning
385
+
var instanceDomain string
386
+
if strings.HasPrefix(instanceDID, "did:web:") {
387
+
instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
389
+
// Fallback for did:plc
390
+
instanceDomain = "coves.social"
393
+
// Setup repositories and services
394
+
communityRepo := postgres.NewCommunityRepository(db)
395
+
postRepo := postgres.NewPostRepository(db)
397
+
// Setup PDS account provisioner for community creation
398
+
provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL)
400
+
// Setup community service with real PDS provisioner
401
+
communityService := communities.NewCommunityService(
406
+
provisioner, // ✅ Real provisioner for creating communities on PDS
409
+
postService := posts.NewPostService(postRepo, communityService, pdsURL)
411
+
// Setup auth middleware (skip JWT verification for testing)
412
+
authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true)
414
+
// Setup HTTP handler
415
+
createHandler := post.NewCreateHandler(postService)
417
+
ctx := context.Background()
419
+
// Cleanup old test data
420
+
_, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'")
421
+
_, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'")
422
+
_, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'")
424
+
// Create test user (author)
425
+
author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123")
427
+
// ====================================================================================
428
+
// Part 1: Write-Forward to PDS
429
+
// ====================================================================================
430
+
t.Run("1. Write-Forward to PDS", func(t *testing.T) {
431
+
// TRUE E2E: Actually provision a real community on PDS
432
+
// This tests the full flow:
433
+
// 1. Call com.atproto.server.createAccount on PDS
434
+
// 2. PDS generates DID, keys, tokens
435
+
// 3. Write community profile to PDS repository
436
+
// 4. Store credentials in AppView DB
437
+
// 5. Use those credentials to create a post
439
+
// Use timestamp to ensure unique community name for each test run
440
+
communityName := fmt.Sprintf("e2epost%d", time.Now().Unix())
442
+
t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName)
443
+
community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{
444
+
Name: communityName,
445
+
DisplayName: "E2E Test Community",
446
+
Description: "Test community for E2E post creation testing",
447
+
CreatedByDID: author.DID,
448
+
Visibility: "public",
449
+
AllowExternalDiscovery: true,
451
+
require.NoError(t, err, "Failed to provision community on PDS")
452
+
require.NotEmpty(t, community.DID, "Community should have DID from PDS")
453
+
require.NotEmpty(t, community.PDSAccessToken, "Community should have access token")
454
+
require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token")
456
+
t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle)
458
+
// NOTE: Cleanup disabled to allow post-test inspection of indexed data
459
+
// Uncomment to enable cleanup after test
461
+
// if err := communityRepo.Delete(ctx, community.DID); err != nil {
462
+
// t.Logf("Warning: Failed to cleanup test community: %v", err)
466
+
// Build HTTP request for post creation
467
+
title := "E2E Test Post"
468
+
content := "This post was created via full E2E test with live PDS!"
469
+
reqBody := map[string]interface{}{
470
+
"community": community.DID,
472
+
"content": content,
474
+
reqJSON, err := json.Marshal(reqBody)
475
+
require.NoError(t, err)
477
+
// Create HTTP request
478
+
req := httptest.NewRequest("POST", "/xrpc/social.coves.post.create", bytes.NewReader(reqJSON))
479
+
req.Header.Set("Content-Type", "application/json")
481
+
// Create a simple JWT for testing (Phase 1: no signature verification)
482
+
// In production, this would be a real OAuth token from PDS
483
+
testJWT := createSimpleTestJWT(author.DID)
484
+
req.Header.Set("Authorization", "Bearer "+testJWT)
486
+
// Execute request through auth middleware + handler
487
+
rr := httptest.NewRecorder()
488
+
handler := authMiddleware.RequireAuth(http.HandlerFunc(createHandler.HandleCreate))
489
+
handler.ServeHTTP(rr, req)
492
+
require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String())
495
+
var response posts.CreatePostResponse
496
+
err = json.NewDecoder(rr.Body).Decode(&response)
497
+
require.NoError(t, err, "Failed to parse response")
499
+
t.Logf("✅ Post created on PDS:")
500
+
t.Logf(" URI: %s", response.URI)
501
+
t.Logf(" CID: %s", response.CID)
503
+
// ====================================================================================
504
+
// Part 2: TRUE E2E - Real Jetstream Firehose Consumer
505
+
// ====================================================================================
506
+
// This part tests the ACTUAL production code path in main.go
507
+
// including the WebSocket connection and consumer logic
508
+
t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) {
509
+
t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...")
511
+
// Get PDS hostname for Jetstream filtering
512
+
pdsHostname := strings.TrimPrefix(pdsURL, "http://")
513
+
pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
514
+
pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port
516
+
// Build Jetstream URL with filters for post records
517
+
jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.post.record",
520
+
t.Logf(" Jetstream URL: %s", jetstreamURL)
521
+
t.Logf(" Looking for post URI: %s", response.URI)
522
+
t.Logf(" Community DID: %s", community.DID)
524
+
// Setup user service (required by post consumer)
525
+
userRepo := postgres.NewUserRepository(db)
526
+
identityConfig := identity.DefaultConfig()
527
+
plcURL := os.Getenv("PLC_DIRECTORY_URL")
529
+
plcURL = "http://localhost:3002"
531
+
identityConfig.PLCURL = plcURL
532
+
identityResolver := identity.NewResolver(db, identityConfig)
533
+
userService := users.NewUserService(userRepo, identityResolver, pdsURL)
535
+
// Create post consumer (same as main.go)
536
+
postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
538
+
// Channels to receive the event
539
+
eventChan := make(chan *jetstream.JetstreamEvent, 10)
540
+
errorChan := make(chan error, 1)
541
+
done := make(chan bool)
543
+
// Start Jetstream WebSocket subscriber in background
544
+
// This creates its own WebSocket connection to Jetstream
546
+
err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done)
552
+
// Wait for event or timeout
553
+
t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
556
+
case event := <-eventChan:
557
+
t.Logf("✅ Received real Jetstream event!")
558
+
t.Logf(" Event DID: %s", event.Did)
559
+
t.Logf(" Collection: %s", event.Commit.Collection)
560
+
t.Logf(" Operation: %s", event.Commit.Operation)
561
+
t.Logf(" RKey: %s", event.Commit.RKey)
563
+
// Verify it's for our community
564
+
assert.Equal(t, community.DID, event.Did, "Event should be from community repo")
566
+
// Verify post was indexed in AppView database
567
+
t.Logf("\n🔍 Querying AppView database for indexed post...")
569
+
indexedPost, err := postRepo.GetByURI(ctx, response.URI)
570
+
require.NoError(t, err, "Post should be indexed in AppView")
572
+
t.Logf("✅ Post indexed in AppView:")
573
+
t.Logf(" URI: %s", indexedPost.URI)
574
+
t.Logf(" CID: %s", indexedPost.CID)
575
+
t.Logf(" Author DID: %s", indexedPost.AuthorDID)
576
+
t.Logf(" Community: %s", indexedPost.CommunityDID)
577
+
t.Logf(" Title: %v", indexedPost.Title)
578
+
t.Logf(" Content: %v", indexedPost.Content)
580
+
// Verify all fields match what we sent
581
+
assert.Equal(t, response.URI, indexedPost.URI, "URI should match")
582
+
assert.Equal(t, response.CID, indexedPost.CID, "CID should match")
583
+
assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match")
584
+
assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match")
585
+
assert.Equal(t, title, *indexedPost.Title, "Title should match")
586
+
assert.Equal(t, content, *indexedPost.Content, "Content should match")
588
+
// Verify stats initialized correctly
589
+
assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0")
590
+
assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0")
591
+
assert.Equal(t, 0, indexedPost.Score, "Score should be 0")
592
+
assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0")
594
+
// Verify timestamps
595
+
assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set")
596
+
assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set")
598
+
// Signal to stop Jetstream consumer
601
+
t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓")
603
+
case err := <-errorChan:
604
+
t.Fatalf("❌ Jetstream error: %v", err)
606
+
case <-time.After(30 * time.Second):
607
+
t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds")
613
+
// createSimpleTestJWT creates a minimal JWT for testing (Phase 1 - no signature)
614
+
// In production, this would be a real OAuth token from PDS with proper signatures
615
+
func createSimpleTestJWT(userDID string) string {
616
+
// Create minimal JWT claims using RegisteredClaims
617
+
// Use userDID as issuer since we don't have a proper PDS DID for testing
618
+
claims := auth.Claims{
619
+
RegisteredClaims: jwt.RegisteredClaims{
621
+
Issuer: userDID, // Use DID as issuer for testing (valid per atProto)
622
+
Audience: jwt.ClaimStrings{"did:web:test.coves.social"},
623
+
IssuedAt: jwt.NewNumericDate(time.Now()),
624
+
ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)),
626
+
Scope: "com.atproto.access",
629
+
// For Phase 1 testing, we create an unsigned JWT
630
+
// The middleware is configured with skipVerify=true for testing
631
+
header := map[string]interface{}{
636
+
headerJSON, _ := json.Marshal(header)
637
+
claimsJSON, _ := json.Marshal(claims)
639
+
// Base64url encode (without padding)
640
+
headerB64 := base64.RawURLEncoding.EncodeToString(headerJSON)
641
+
claimsB64 := base64.RawURLEncoding.EncodeToString(claimsJSON)
643
+
// For "alg: none", signature is empty
644
+
return headerB64 + "." + claimsB64 + "."
647
+
// generateTID generates a simple timestamp-based identifier for testing
648
+
// In production, PDS generates proper TIDs
649
+
func generateTID() string {
650
+
return fmt.Sprintf("3k%d", time.Now().UnixNano()/1000)
653
+
// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events
654
+
// This helper creates a WebSocket connection to Jetstream and waits for post events
655
+
func subscribeToJetstreamForPost(
656
+
ctx context.Context,
657
+
jetstreamURL string,
659
+
consumer *jetstream.PostEventConsumer,
660
+
eventChan chan<- *jetstream.JetstreamEvent,
661
+
errorChan chan<- error,
664
+
conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
666
+
return fmt.Errorf("failed to connect to Jetstream: %w", err)
668
+
defer func() { _ = conn.Close() }()
670
+
// Read messages until we find our event or receive done signal
678
+
// Set read deadline to avoid blocking forever
679
+
if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
680
+
return fmt.Errorf("failed to set read deadline: %w", err)
683
+
var event jetstream.JetstreamEvent
684
+
err := conn.ReadJSON(&event)
686
+
// Check if it's a timeout (expected)
687
+
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
690
+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
691
+
continue // Timeout is expected, keep listening
693
+
// For other errors, don't retry reading from a broken connection
694
+
return fmt.Errorf("failed to read Jetstream message: %w", err)
697
+
// Check if this is a post event for the target DID
698
+
if event.Did == targetDID && event.Kind == "commit" &&
699
+
event.Commit != nil && event.Commit.Collection == "social.coves.post.record" {
700
+
// Process the event through the consumer
701
+
if err := consumer.HandleEvent(ctx, &event); err != nil {
702
+
return fmt.Errorf("failed to process event: %w", err)
705
+
// Send to channel so test can verify
707
+
case eventChan <- &event:
709
+
case <-time.After(1 * time.Second):
710
+
return fmt.Errorf("timeout sending event to channel")