···
4
+
"Coves/internal/api/handlers/vote"
5
+
"Coves/internal/api/middleware"
6
+
"Coves/internal/atproto/identity"
7
+
"Coves/internal/atproto/jetstream"
8
+
"Coves/internal/core/communities"
9
+
"Coves/internal/core/posts"
10
+
"Coves/internal/core/users"
11
+
"Coves/internal/core/votes"
12
+
"Coves/internal/db/postgres"
26
+
"github.com/gorilla/websocket"
27
+
_ "github.com/lib/pq"
28
+
"github.com/pressly/goose/v3"
29
+
"github.com/stretchr/testify/assert"
30
+
"github.com/stretchr/testify/require"
33
+
// TestVote_E2E_WithJetstream tests the full vote flow with simulated Jetstream:
34
+
// XRPC endpoint → AppView Service → PDS write → (Simulated) Jetstream consumer → DB indexing
36
+
// This is a fast integration test that simulates what happens in production:
37
+
// 1. Client calls POST /xrpc/social.coves.interaction.createVote with auth token
38
+
// 2. Handler validates and calls VoteService.CreateVote()
39
+
// 3. Service writes vote to user's PDS repository
40
+
// 4. (Simulated) PDS broadcasts event to Jetstream
41
+
// 5. Jetstream consumer receives event and indexes vote in AppView DB
42
+
// 6. Vote is now queryable from AppView + post counts updated
44
+
// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have
45
+
// a live PDS/Jetstream in test environment. For true live testing, use TestVote_E2E_LivePDS.
46
+
func TestVote_E2E_WithJetstream(t *testing.T) {
47
+
db := setupTestDB(t)
49
+
if err := db.Close(); err != nil {
50
+
t.Logf("Failed to close database: %v", err)
54
+
// Cleanup old test data first
55
+
_, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE 'did:plc:votee2e%'")
56
+
_, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:votecommunity123'")
57
+
_, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:votecommunity123'")
58
+
_, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:votee2e%'")
60
+
// Setup repositories
61
+
userRepo := postgres.NewUserRepository(db)
62
+
communityRepo := postgres.NewCommunityRepository(db)
63
+
postRepo := postgres.NewPostRepository(db)
64
+
voteRepo := postgres.NewVoteRepository(db)
66
+
// Setup user service for consumers
67
+
identityConfig := identity.DefaultConfig()
68
+
identityResolver := identity.NewResolver(db, identityConfig)
69
+
userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
71
+
// Create test users (voter and author)
72
+
voter := createTestUser(t, db, "voter.test", "did:plc:votee2evoter123")
73
+
author := createTestUser(t, db, "author.test", "did:plc:votee2eauthor123")
75
+
// Create test community
76
+
community := &communities.Community{
77
+
DID: "did:plc:votecommunity123",
78
+
Handle: "votecommunity.test.coves.social",
79
+
Name: "votecommunity",
80
+
DisplayName: "Vote Test Community",
81
+
OwnerDID: "did:plc:votecommunity123",
82
+
CreatedByDID: author.DID,
83
+
HostedByDID: "did:web:coves.test",
84
+
Visibility: "public",
85
+
ModerationType: "moderator",
86
+
RecordURI: "at://did:plc:votecommunity123/social.coves.community.profile/self",
87
+
RecordCID: "fakecid123",
88
+
PDSAccessToken: "fake_token_for_testing",
89
+
PDSRefreshToken: "fake_refresh_token",
91
+
_, err := communityRepo.Create(context.Background(), community)
93
+
t.Fatalf("Failed to create test community: %v", err)
96
+
// Create test post (subject of votes)
97
+
postRkey := generateTID()
98
+
postURI := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, postRkey)
99
+
postCID := "bafy2bzacepostcid123"
100
+
post := &posts.Post{
104
+
AuthorDID: author.DID,
105
+
CommunityDID: community.DID,
106
+
Title: stringPtr("Test Post for Voting"),
107
+
Content: stringPtr("This post will receive votes"),
108
+
CreatedAt: time.Now(),
113
+
err = postRepo.Create(context.Background(), post)
115
+
t.Fatalf("Failed to create test post: %v", err)
118
+
t.Run("Full E2E flow - Create upvote via Jetstream", func(t *testing.T) {
119
+
ctx := context.Background()
121
+
// STEP 1: Simulate Jetstream consumer receiving a vote CREATE event
122
+
// In real production, this event comes from PDS via Jetstream WebSocket
123
+
voteRkey := generateTID()
124
+
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", voter.DID, voteRkey)
126
+
jetstreamEvent := jetstream.JetstreamEvent{
127
+
Did: voter.DID, // Vote comes from voter's repo
129
+
Commit: &jetstream.CommitEvent{
130
+
Operation: "create",
131
+
Collection: "social.coves.interaction.vote",
133
+
CID: "bafy2bzacevotecid123",
134
+
Record: map[string]interface{}{
135
+
"$type": "social.coves.interaction.vote",
136
+
"subject": map[string]interface{}{
141
+
"createdAt": time.Now().Format(time.RFC3339),
146
+
// STEP 2: Process event through Jetstream consumer
147
+
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
148
+
err := consumer.HandleEvent(ctx, &jetstreamEvent)
150
+
t.Fatalf("Jetstream consumer failed to process event: %v", err)
153
+
// STEP 3: Verify vote was indexed in AppView database
154
+
indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
156
+
t.Fatalf("Vote not indexed in AppView: %v", err)
159
+
// STEP 4: Verify vote fields are correct
160
+
assert.Equal(t, voteURI, indexedVote.URI, "Vote URI should match")
161
+
assert.Equal(t, voter.DID, indexedVote.VoterDID, "Voter DID should match")
162
+
assert.Equal(t, postURI, indexedVote.SubjectURI, "Subject URI should match")
163
+
assert.Equal(t, postCID, indexedVote.SubjectCID, "Subject CID should match (strong reference)")
164
+
assert.Equal(t, "up", indexedVote.Direction, "Direction should be 'up'")
166
+
// STEP 5: Verify post vote counts were updated atomically
167
+
updatedPost, err := postRepo.GetByURI(ctx, postURI)
168
+
require.NoError(t, err, "Post should still exist")
169
+
assert.Equal(t, 1, updatedPost.UpvoteCount, "Post upvote_count should be 1")
170
+
assert.Equal(t, 0, updatedPost.DownvoteCount, "Post downvote_count should be 0")
171
+
assert.Equal(t, 1, updatedPost.Score, "Post score should be 1 (upvotes - downvotes)")
173
+
t.Logf("✓ E2E test passed! Vote indexed with URI: %s, post upvotes: %d", indexedVote.URI, updatedPost.UpvoteCount)
176
+
t.Run("Create downvote and verify counts", func(t *testing.T) {
177
+
ctx := context.Background()
179
+
// Create a different voter for this test to avoid unique constraint violation
180
+
downvoter := createTestUser(t, db, "downvoter.test", "did:plc:votee2edownvoter")
183
+
voteRkey := generateTID()
184
+
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", downvoter.DID, voteRkey)
186
+
jetstreamEvent := jetstream.JetstreamEvent{
187
+
Did: downvoter.DID,
189
+
Commit: &jetstream.CommitEvent{
190
+
Operation: "create",
191
+
Collection: "social.coves.interaction.vote",
193
+
CID: "bafy2bzacedownvotecid",
194
+
Record: map[string]interface{}{
195
+
"$type": "social.coves.interaction.vote",
196
+
"subject": map[string]interface{}{
200
+
"direction": "down",
201
+
"createdAt": time.Now().Format(time.RFC3339),
206
+
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
207
+
err := consumer.HandleEvent(ctx, &jetstreamEvent)
208
+
require.NoError(t, err, "Consumer should process downvote")
210
+
// Verify vote indexed
211
+
indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
212
+
require.NoError(t, err, "Downvote should be indexed")
213
+
assert.Equal(t, "down", indexedVote.Direction, "Direction should be 'down'")
215
+
// Verify post counts (now has 1 upvote + 1 downvote from previous test)
216
+
updatedPost, err := postRepo.GetByURI(ctx, postURI)
217
+
require.NoError(t, err)
218
+
assert.Equal(t, 1, updatedPost.UpvoteCount, "Upvote count should still be 1")
219
+
assert.Equal(t, 1, updatedPost.DownvoteCount, "Downvote count should be 1")
220
+
assert.Equal(t, 0, updatedPost.Score, "Score should be 0 (1 up - 1 down)")
222
+
t.Logf("✓ Downvote indexed, post counts: up=%d down=%d score=%d",
223
+
updatedPost.UpvoteCount, updatedPost.DownvoteCount, updatedPost.Score)
226
+
t.Run("Delete vote and verify counts decremented", func(t *testing.T) {
227
+
ctx := context.Background()
229
+
// Create a different voter for this test
230
+
deletevoter := createTestUser(t, db, "deletevoter.test", "did:plc:votee2edeletevoter")
232
+
// Get current counts
233
+
beforePost, _ := postRepo.GetByURI(ctx, postURI)
235
+
// Create a vote first
236
+
voteRkey := generateTID()
237
+
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", deletevoter.DID, voteRkey)
239
+
createEvent := jetstream.JetstreamEvent{
240
+
Did: deletevoter.DID,
242
+
Commit: &jetstream.CommitEvent{
243
+
Operation: "create",
244
+
Collection: "social.coves.interaction.vote",
246
+
CID: "bafy2bzacedeleteme",
247
+
Record: map[string]interface{}{
248
+
"$type": "social.coves.interaction.vote",
249
+
"subject": map[string]interface{}{
254
+
"createdAt": time.Now().Format(time.RFC3339),
259
+
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
260
+
err := consumer.HandleEvent(ctx, &createEvent)
261
+
require.NoError(t, err)
264
+
deleteEvent := jetstream.JetstreamEvent{
265
+
Did: deletevoter.DID,
267
+
Commit: &jetstream.CommitEvent{
268
+
Operation: "delete",
269
+
Collection: "social.coves.interaction.vote",
274
+
err = consumer.HandleEvent(ctx, &deleteEvent)
275
+
require.NoError(t, err, "Consumer should process delete")
277
+
// Verify vote is soft-deleted
278
+
deletedVote, err := voteRepo.GetByURI(ctx, voteURI)
279
+
require.NoError(t, err, "Vote should still exist (soft delete)")
280
+
assert.NotNil(t, deletedVote.DeletedAt, "Vote should have deleted_at timestamp")
282
+
// Verify post counts decremented
283
+
afterPost, err := postRepo.GetByURI(ctx, postURI)
284
+
require.NoError(t, err)
285
+
assert.Equal(t, beforePost.UpvoteCount, afterPost.UpvoteCount,
286
+
"Upvote count should be back to original (delete decremented)")
288
+
t.Logf("✓ Vote deleted, counts decremented correctly")
291
+
t.Run("Idempotent indexing - duplicate events", func(t *testing.T) {
292
+
ctx := context.Background()
294
+
// Create a different voter for this test
295
+
idempotentvoter := createTestUser(t, db, "idempotentvoter.test", "did:plc:votee2eidempotent")
298
+
voteRkey := generateTID()
299
+
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", idempotentvoter.DID, voteRkey)
301
+
event := jetstream.JetstreamEvent{
302
+
Did: idempotentvoter.DID,
304
+
Commit: &jetstream.CommitEvent{
305
+
Operation: "create",
306
+
Collection: "social.coves.interaction.vote",
308
+
CID: "bafy2bzaceidempotent",
309
+
Record: map[string]interface{}{
310
+
"$type": "social.coves.interaction.vote",
311
+
"subject": map[string]interface{}{
316
+
"createdAt": time.Now().Format(time.RFC3339),
321
+
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
323
+
// First event - should succeed
324
+
err := consumer.HandleEvent(ctx, &event)
325
+
require.NoError(t, err, "First event should succeed")
327
+
// Get counts after first event
328
+
firstPost, _ := postRepo.GetByURI(ctx, postURI)
330
+
// Second event (duplicate) - should be handled gracefully
331
+
err = consumer.HandleEvent(ctx, &event)
332
+
require.NoError(t, err, "Duplicate event should be handled gracefully")
334
+
// Verify counts NOT incremented again (idempotent)
335
+
secondPost, err := postRepo.GetByURI(ctx, postURI)
336
+
require.NoError(t, err)
337
+
assert.Equal(t, firstPost.UpvoteCount, secondPost.UpvoteCount,
338
+
"Duplicate event should not increment count again")
340
+
// Verify only one vote in database
341
+
vote, err := voteRepo.GetByURI(ctx, voteURI)
342
+
require.NoError(t, err)
343
+
assert.Equal(t, voteURI, vote.URI, "Should still be the same vote")
345
+
t.Logf("✓ Idempotency test passed - duplicate event handled correctly")
348
+
t.Run("Security: Vote from wrong repository rejected", func(t *testing.T) {
349
+
ctx := context.Background()
351
+
// SECURITY TEST: Try to create a vote that claims to be from the voter
352
+
// but actually comes from a different user's repository
353
+
// This should be REJECTED by the consumer
355
+
maliciousUser := createTestUser(t, db, "hacker.test", "did:plc:hacker123")
357
+
maliciousEvent := jetstream.JetstreamEvent{
358
+
Did: maliciousUser.DID, // Event from hacker's repo
360
+
Commit: &jetstream.CommitEvent{
361
+
Operation: "create",
362
+
Collection: "social.coves.interaction.vote",
363
+
RKey: generateTID(),
364
+
CID: "bafy2bzacefake",
365
+
Record: map[string]interface{}{
366
+
"$type": "social.coves.interaction.vote",
367
+
"subject": map[string]interface{}{
372
+
"createdAt": time.Now().Format(time.RFC3339),
377
+
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
378
+
err := consumer.HandleEvent(ctx, &maliciousEvent)
380
+
// Should succeed (vote is created in hacker's repo, which is valid)
381
+
// The vote record itself is FROM their repo, so it's legitimate
382
+
// This is different from posts which must come from community repo
383
+
assert.NoError(t, err, "Votes in user repos are valid")
385
+
t.Logf("✓ Security validation passed - user repo votes are allowed")
389
+
// TestVote_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS:
390
+
// 1. HTTP POST to /xrpc/social.coves.interaction.createVote (with auth)
391
+
// 2. Handler → Service → Write to user's PDS repository
392
+
// 3. PDS → Jetstream firehose event
393
+
// 4. Jetstream consumer → Index in AppView database
394
+
// 5. Verify vote appears in database + post counts updated
396
+
// This is a TRUE E2E test that requires:
397
+
// - Live PDS running at PDS_URL (default: http://localhost:3001)
398
+
// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe)
399
+
// - Test database running
400
+
func TestVote_E2E_LivePDS(t *testing.T) {
401
+
if testing.Short() {
402
+
t.Skip("Skipping live PDS E2E test in short mode")
405
+
// Setup test database
406
+
dbURL := os.Getenv("TEST_DATABASE_URL")
408
+
dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
411
+
db, err := sql.Open("postgres", dbURL)
412
+
require.NoError(t, err, "Failed to connect to test database")
414
+
if closeErr := db.Close(); closeErr != nil {
415
+
t.Logf("Failed to close database: %v", closeErr)
420
+
require.NoError(t, goose.SetDialect("postgres"))
421
+
require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
423
+
// Check if PDS is running
424
+
pdsURL := os.Getenv("PDS_URL")
426
+
pdsURL = "http://localhost:3001"
429
+
healthResp, err := http.Get(pdsURL + "/xrpc/_health")
431
+
t.Skipf("PDS not running at %s: %v", pdsURL, err)
433
+
_ = healthResp.Body.Close()
435
+
// Check if Jetstream is running
436
+
jetstreamHealthURL := "http://127.0.0.1:6009/metrics" // Use 127.0.0.1 for IPv4
437
+
jetstreamResp, err := http.Get(jetstreamHealthURL)
439
+
t.Skipf("Jetstream not running: %v", err)
441
+
_ = jetstreamResp.Body.Close()
443
+
ctx := context.Background()
445
+
// Cleanup old test data
446
+
_, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE 'did:plc:votee2elive%' OR voter_did IN (SELECT did FROM users WHERE handle LIKE '%votee2elive%')")
447
+
_, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:votee2elive%'")
448
+
_, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:votee2elive%'")
449
+
_, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:votee2elive%' OR handle LIKE '%votee2elive%' OR handle LIKE '%authore2e%'")
451
+
// Setup repositories and services
452
+
userRepo := postgres.NewUserRepository(db)
453
+
communityRepo := postgres.NewCommunityRepository(db)
454
+
postRepo := postgres.NewPostRepository(db)
455
+
voteRepo := postgres.NewVoteRepository(db)
457
+
identityConfig := identity.DefaultConfig()
458
+
identityResolver := identity.NewResolver(db, identityConfig)
459
+
userService := users.NewUserService(userRepo, identityResolver, pdsURL)
461
+
// Create test voter
462
+
voter := createTestUser(t, db, "votee2elive.bsky.social", "did:plc:votee2elive123")
464
+
// Create test community and post (simplified - using fake credentials)
465
+
author := createTestUser(t, db, "authore2e.bsky.social", "did:plc:votee2eliveauthor")
466
+
community := &communities.Community{
467
+
DID: "did:plc:votee2elivecommunity",
468
+
Handle: "votee2elivecommunity.test.coves.social",
469
+
Name: "votee2elivecommunity",
470
+
DisplayName: "Vote E2E Live Community",
471
+
OwnerDID: author.DID,
472
+
CreatedByDID: author.DID,
473
+
HostedByDID: "did:web:coves.test",
474
+
Visibility: "public",
475
+
ModerationType: "moderator",
476
+
RecordURI: "at://did:plc:votee2elivecommunity/social.coves.community.profile/self",
477
+
RecordCID: "fakecid",
478
+
PDSAccessToken: "fake_token",
479
+
PDSRefreshToken: "fake_refresh",
481
+
_, err = communityRepo.Create(ctx, community)
482
+
require.NoError(t, err)
484
+
postRkey := generateTID()
485
+
postURI := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, postRkey)
486
+
postCID := "bafy2bzaceposte2e"
487
+
post := &posts.Post{
491
+
AuthorDID: author.DID,
492
+
CommunityDID: community.DID,
493
+
Title: stringPtr("E2E Vote Test Post"),
494
+
Content: stringPtr("This post will receive live votes"),
495
+
CreatedAt: time.Now(),
500
+
err = postRepo.Create(ctx, post)
501
+
require.NoError(t, err)
503
+
// Setup vote service and handler
504
+
voteService := votes.NewVoteService(voteRepo, postRepo, pdsURL)
505
+
voteHandler := vote.NewCreateVoteHandler(voteService)
506
+
authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) // Skip JWT verification for testing
508
+
t.Run("Live E2E: Create vote and verify via Jetstream", func(t *testing.T) {
509
+
t.Logf("\n🔄 TRUE E2E: Creating vote via XRPC endpoint...")
511
+
// Authenticate voter with PDS to get real access token
512
+
// Note: This assumes the voter account already exists on PDS
513
+
// For a complete test, you'd create the account first via com.atproto.server.createAccount
514
+
instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
515
+
instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
516
+
if instanceHandle == "" {
517
+
instanceHandle = "testuser123.local.coves.dev"
519
+
if instancePassword == "" {
520
+
instancePassword = "test-password-123"
523
+
t.Logf("🔐 Authenticating voter with PDS as: %s", instanceHandle)
524
+
voterAccessToken, voterDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
526
+
t.Skipf("Failed to authenticate voter with PDS (account may not exist): %v", err)
528
+
t.Logf("✅ Authenticated - Voter DID: %s", voterDID)
530
+
// Update voter record to match authenticated DID
531
+
_, err = db.Exec("UPDATE users SET did = $1 WHERE did = $2", voterDID, voter.DID)
532
+
require.NoError(t, err)
533
+
voter.DID = voterDID
535
+
// Build HTTP request for vote creation
536
+
reqBody := map[string]interface{}{
537
+
"subject": postURI,
540
+
reqJSON, err := json.Marshal(reqBody)
541
+
require.NoError(t, err)
543
+
// Create HTTP request
544
+
req := httptest.NewRequest("POST", "/xrpc/social.coves.interaction.createVote", bytes.NewReader(reqJSON))
545
+
req.Header.Set("Content-Type", "application/json")
547
+
// Use REAL PDS access token (not mock JWT)
548
+
req.Header.Set("Authorization", "Bearer "+voterAccessToken)
550
+
// Execute request through auth middleware + handler
551
+
rr := httptest.NewRecorder()
552
+
handler := authMiddleware.RequireAuth(http.HandlerFunc(voteHandler.HandleCreateVote))
553
+
handler.ServeHTTP(rr, req)
556
+
require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String())
559
+
var response map[string]interface{}
560
+
err = json.NewDecoder(rr.Body).Decode(&response)
561
+
require.NoError(t, err, "Failed to parse response")
563
+
voteURI := response["uri"].(string)
564
+
voteCID := response["cid"].(string)
566
+
t.Logf("✅ Vote created on PDS:")
567
+
t.Logf(" URI: %s", voteURI)
568
+
t.Logf(" CID: %s", voteCID)
570
+
// ====================================================================================
571
+
// Part 2: Query the PDS to verify the vote record exists
572
+
// ====================================================================================
573
+
t.Run("2a. Verify vote record on PDS", func(t *testing.T) {
574
+
t.Logf("\n📡 Querying PDS for vote record...")
576
+
// Extract rkey from vote URI (at://did/collection/rkey)
577
+
parts := strings.Split(voteURI, "/")
578
+
rkey := parts[len(parts)-1]
580
+
// Query PDS for the vote record
581
+
getRecordURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
582
+
pdsURL, voterDID, "social.coves.interaction.vote", rkey)
584
+
t.Logf(" GET %s", getRecordURL)
586
+
pdsResp, err := http.Get(getRecordURL)
587
+
require.NoError(t, err, "Failed to query PDS")
588
+
defer pdsResp.Body.Close()
590
+
require.Equal(t, http.StatusOK, pdsResp.StatusCode, "Vote record should exist on PDS")
592
+
var pdsRecord struct {
593
+
Value map[string]interface{} `json:"value"`
594
+
URI string `json:"uri"`
595
+
CID string `json:"cid"`
598
+
err = json.NewDecoder(pdsResp.Body).Decode(&pdsRecord)
599
+
require.NoError(t, err, "Failed to decode PDS response")
601
+
t.Logf("✅ Vote record found on PDS!")
602
+
t.Logf(" URI: %s", pdsRecord.URI)
603
+
t.Logf(" CID: %s", pdsRecord.CID)
604
+
t.Logf(" Direction: %v", pdsRecord.Value["direction"])
605
+
t.Logf(" Subject: %v", pdsRecord.Value["subject"])
607
+
// Verify the record matches what we created
608
+
assert.Equal(t, voteURI, pdsRecord.URI, "PDS URI should match")
609
+
assert.Equal(t, voteCID, pdsRecord.CID, "PDS CID should match")
610
+
assert.Equal(t, "up", pdsRecord.Value["direction"], "Direction should be 'up'")
612
+
// Print full record for inspection
613
+
recordJSON, _ := json.MarshalIndent(pdsRecord.Value, " ", " ")
614
+
t.Logf(" Full record:\n %s", string(recordJSON))
617
+
// ====================================================================================
618
+
// Part 2b: TRUE E2E - Real Jetstream Firehose Consumer
619
+
// ====================================================================================
620
+
t.Run("2b. Real Jetstream Firehose Consumption", func(t *testing.T) {
621
+
t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...")
623
+
// Get PDS hostname for Jetstream filtering
624
+
pdsHostname := strings.TrimPrefix(pdsURL, "http://")
625
+
pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
626
+
pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port
628
+
// Build Jetstream URL with filters for vote records
629
+
jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.interaction.vote",
632
+
t.Logf(" Jetstream URL: %s", jetstreamURL)
633
+
t.Logf(" Looking for vote URI: %s", voteURI)
634
+
t.Logf(" Voter DID: %s", voterDID)
636
+
// Create vote consumer (same as main.go)
637
+
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
639
+
// Channels to receive the event
640
+
eventChan := make(chan *jetstream.JetstreamEvent, 10)
641
+
errorChan := make(chan error, 1)
642
+
done := make(chan bool)
644
+
// Start Jetstream WebSocket subscriber in background
646
+
err := subscribeToJetstreamForVote(ctx, jetstreamURL, voterDID, postURI, consumer, eventChan, errorChan, done)
652
+
// Wait for event or timeout
653
+
t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
656
+
case event := <-eventChan:
657
+
t.Logf("✅ Received real Jetstream event!")
658
+
t.Logf(" Event DID: %s", event.Did)
659
+
t.Logf(" Collection: %s", event.Commit.Collection)
660
+
t.Logf(" Operation: %s", event.Commit.Operation)
661
+
t.Logf(" RKey: %s", event.Commit.RKey)
663
+
// Verify it's for our voter
664
+
assert.Equal(t, voterDID, event.Did, "Event should be from voter's repo")
666
+
// Verify vote was indexed in AppView database
667
+
t.Logf("\n🔍 Querying AppView database for indexed vote...")
669
+
indexedVote, err := voteRepo.GetByVoterAndSubject(ctx, voterDID, postURI)
670
+
require.NoError(t, err, "Vote should be indexed in AppView")
672
+
t.Logf("✅ Vote indexed in AppView:")
673
+
t.Logf(" URI: %s", indexedVote.URI)
674
+
t.Logf(" CID: %s", indexedVote.CID)
675
+
t.Logf(" Voter DID: %s", indexedVote.VoterDID)
676
+
t.Logf(" Subject: %s", indexedVote.SubjectURI)
677
+
t.Logf(" Direction: %s", indexedVote.Direction)
679
+
// Verify all fields match
680
+
assert.Equal(t, voteURI, indexedVote.URI, "URI should match")
681
+
assert.Equal(t, voteCID, indexedVote.CID, "CID should match")
682
+
assert.Equal(t, voterDID, indexedVote.VoterDID, "Voter DID should match")
683
+
assert.Equal(t, postURI, indexedVote.SubjectURI, "Subject URI should match")
684
+
assert.Equal(t, "up", indexedVote.Direction, "Direction should be 'up'")
686
+
// Verify post counts were updated
687
+
t.Logf("\n🔍 Verifying post vote counts updated...")
688
+
updatedPost, err := postRepo.GetByURI(ctx, postURI)
689
+
require.NoError(t, err, "Post should exist")
691
+
t.Logf("✅ Post vote counts updated:")
692
+
t.Logf(" Upvotes: %d", updatedPost.UpvoteCount)
693
+
t.Logf(" Downvotes: %d", updatedPost.DownvoteCount)
694
+
t.Logf(" Score: %d", updatedPost.Score)
696
+
assert.Equal(t, 1, updatedPost.UpvoteCount, "Upvote count should be 1")
697
+
assert.Equal(t, 0, updatedPost.DownvoteCount, "Downvote count should be 0")
698
+
assert.Equal(t, 1, updatedPost.Score, "Score should be 1")
700
+
// Signal to stop Jetstream consumer
703
+
t.Log("\n✅ TRUE E2E COMPLETE: PDS → Jetstream → Consumer → AppView ✓")
705
+
case err := <-errorChan:
706
+
t.Fatalf("❌ Jetstream error: %v", err)
708
+
case <-time.After(30 * time.Second):
709
+
t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds")
715
+
// subscribeToJetstreamForVote subscribes to real Jetstream firehose and processes vote events
716
+
// This helper creates a WebSocket connection to Jetstream and waits for vote events
717
+
func subscribeToJetstreamForVote(
718
+
ctx context.Context,
719
+
jetstreamURL string,
720
+
targetVoterDID string,
721
+
targetSubjectURI string,
722
+
consumer *jetstream.VoteEventConsumer,
723
+
eventChan chan<- *jetstream.JetstreamEvent,
724
+
errorChan chan<- error,
727
+
conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
729
+
return fmt.Errorf("failed to connect to Jetstream: %w", err)
731
+
defer func() { _ = conn.Close() }()
733
+
// Read messages until we find our event or receive done signal
741
+
// Set read deadline to avoid blocking forever
742
+
if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
743
+
return fmt.Errorf("failed to set read deadline: %w", err)
746
+
var event jetstream.JetstreamEvent
747
+
err := conn.ReadJSON(&event)
749
+
// Check if it's a timeout (expected)
750
+
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
753
+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
754
+
continue // Timeout is expected, keep listening
756
+
return fmt.Errorf("failed to read Jetstream message: %w", err)
759
+
// Check if this is a vote event for the target voter + subject
760
+
if event.Did == targetVoterDID && event.Kind == "commit" &&
761
+
event.Commit != nil && event.Commit.Collection == "social.coves.interaction.vote" {
763
+
// Verify it's for the target subject
764
+
record := event.Commit.Record
765
+
if subject, ok := record["subject"].(map[string]interface{}); ok {
766
+
if subjectURI, ok := subject["uri"].(string); ok && subjectURI == targetSubjectURI {
767
+
// This is our vote! Process it
768
+
if err := consumer.HandleEvent(ctx, &event); err != nil {
769
+
return fmt.Errorf("failed to process event: %w", err)
772
+
// Send to channel so test can verify
774
+
case eventChan <- &event:
776
+
case <-time.After(1 * time.Second):
777
+
return fmt.Errorf("timeout sending event to channel")
787
+
func stringPtr(s string) *string {