···
4
-
"Coves/internal/api/handlers/vote"
5
-
"Coves/internal/api/middleware"
6
-
"Coves/internal/atproto/identity"
7
-
"Coves/internal/atproto/jetstream"
8
-
"Coves/internal/core/communities"
9
-
"Coves/internal/core/posts"
10
-
"Coves/internal/core/users"
11
-
"Coves/internal/core/votes"
12
-
"Coves/internal/db/postgres"
26
-
"github.com/gorilla/websocket"
27
-
_ "github.com/lib/pq"
28
-
"github.com/pressly/goose/v3"
29
-
"github.com/stretchr/testify/assert"
30
-
"github.com/stretchr/testify/require"
33
-
// TestVote_E2E_WithJetstream tests the full vote flow with simulated Jetstream:
34
-
// XRPC endpoint → AppView Service → PDS write → (Simulated) Jetstream consumer → DB indexing
36
-
// This is a fast integration test that simulates what happens in production:
37
-
// 1. Client calls POST /xrpc/social.coves.interaction.createVote with auth token
38
-
// 2. Handler validates and calls VoteService.CreateVote()
39
-
// 3. Service writes vote to user's PDS repository
40
-
// 4. (Simulated) PDS broadcasts event to Jetstream
41
-
// 5. Jetstream consumer receives event and indexes vote in AppView DB
42
-
// 6. Vote is now queryable from AppView + post counts updated
44
-
// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have
45
-
// a live PDS/Jetstream in test environment. For true live testing, use TestVote_E2E_LivePDS.
46
-
func TestVote_E2E_WithJetstream(t *testing.T) {
47
-
db := setupTestDB(t)
49
-
if err := db.Close(); err != nil {
50
-
t.Logf("Failed to close database: %v", err)
54
-
// Cleanup old test data first
55
-
_, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE 'did:plc:votee2e%'")
56
-
_, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:votecommunity123'")
57
-
_, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:votecommunity123'")
58
-
_, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:votee2e%'")
60
-
// Setup repositories
61
-
userRepo := postgres.NewUserRepository(db)
62
-
communityRepo := postgres.NewCommunityRepository(db)
63
-
postRepo := postgres.NewPostRepository(db)
64
-
voteRepo := postgres.NewVoteRepository(db)
66
-
// Setup user service for consumers
67
-
identityConfig := identity.DefaultConfig()
68
-
identityResolver := identity.NewResolver(db, identityConfig)
69
-
userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
71
-
// Create test users (voter and author)
72
-
voter := createTestUser(t, db, "voter.test", "did:plc:votee2evoter123")
73
-
author := createTestUser(t, db, "author.test", "did:plc:votee2eauthor123")
75
-
// Create test community
76
-
community := &communities.Community{
77
-
DID: "did:plc:votecommunity123",
78
-
Handle: "votecommunity.test.coves.social",
79
-
Name: "votecommunity",
80
-
DisplayName: "Vote Test Community",
81
-
OwnerDID: "did:plc:votecommunity123",
82
-
CreatedByDID: author.DID,
83
-
HostedByDID: "did:web:coves.test",
84
-
Visibility: "public",
85
-
ModerationType: "moderator",
86
-
RecordURI: "at://did:plc:votecommunity123/social.coves.community.profile/self",
87
-
RecordCID: "fakecid123",
88
-
PDSAccessToken: "fake_token_for_testing",
89
-
PDSRefreshToken: "fake_refresh_token",
91
-
_, err := communityRepo.Create(context.Background(), community)
93
-
t.Fatalf("Failed to create test community: %v", err)
96
-
// Create test post (subject of votes)
97
-
postRkey := generateTID()
98
-
postURI := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, postRkey)
99
-
postCID := "bafy2bzacepostcid123"
100
-
post := &posts.Post{
104
-
AuthorDID: author.DID,
105
-
CommunityDID: community.DID,
106
-
Title: stringPtr("Test Post for Voting"),
107
-
Content: stringPtr("This post will receive votes"),
108
-
CreatedAt: time.Now(),
113
-
err = postRepo.Create(context.Background(), post)
115
-
t.Fatalf("Failed to create test post: %v", err)
118
-
t.Run("Full E2E flow - Create upvote via Jetstream", func(t *testing.T) {
119
-
ctx := context.Background()
121
-
// STEP 1: Simulate Jetstream consumer receiving a vote CREATE event
122
-
// In real production, this event comes from PDS via Jetstream WebSocket
123
-
voteRkey := generateTID()
124
-
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", voter.DID, voteRkey)
126
-
jetstreamEvent := jetstream.JetstreamEvent{
127
-
Did: voter.DID, // Vote comes from voter's repo
129
-
Commit: &jetstream.CommitEvent{
130
-
Operation: "create",
131
-
Collection: "social.coves.interaction.vote",
133
-
CID: "bafy2bzacevotecid123",
134
-
Record: map[string]interface{}{
135
-
"$type": "social.coves.interaction.vote",
136
-
"subject": map[string]interface{}{
141
-
"createdAt": time.Now().Format(time.RFC3339),
146
-
// STEP 2: Process event through Jetstream consumer
147
-
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
148
-
err := consumer.HandleEvent(ctx, &jetstreamEvent)
150
-
t.Fatalf("Jetstream consumer failed to process event: %v", err)
153
-
// STEP 3: Verify vote was indexed in AppView database
154
-
indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
156
-
t.Fatalf("Vote not indexed in AppView: %v", err)
159
-
// STEP 4: Verify vote fields are correct
160
-
assert.Equal(t, voteURI, indexedVote.URI, "Vote URI should match")
161
-
assert.Equal(t, voter.DID, indexedVote.VoterDID, "Voter DID should match")
162
-
assert.Equal(t, postURI, indexedVote.SubjectURI, "Subject URI should match")
163
-
assert.Equal(t, postCID, indexedVote.SubjectCID, "Subject CID should match (strong reference)")
164
-
assert.Equal(t, "up", indexedVote.Direction, "Direction should be 'up'")
166
-
// STEP 5: Verify post vote counts were updated atomically
167
-
updatedPost, err := postRepo.GetByURI(ctx, postURI)
168
-
require.NoError(t, err, "Post should still exist")
169
-
assert.Equal(t, 1, updatedPost.UpvoteCount, "Post upvote_count should be 1")
170
-
assert.Equal(t, 0, updatedPost.DownvoteCount, "Post downvote_count should be 0")
171
-
assert.Equal(t, 1, updatedPost.Score, "Post score should be 1 (upvotes - downvotes)")
173
-
t.Logf("✓ E2E test passed! Vote indexed with URI: %s, post upvotes: %d", indexedVote.URI, updatedPost.UpvoteCount)
176
-
t.Run("Create downvote and verify counts", func(t *testing.T) {
177
-
ctx := context.Background()
179
-
// Create a different voter for this test to avoid unique constraint violation
180
-
downvoter := createTestUser(t, db, "downvoter.test", "did:plc:votee2edownvoter")
183
-
voteRkey := generateTID()
184
-
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", downvoter.DID, voteRkey)
186
-
jetstreamEvent := jetstream.JetstreamEvent{
187
-
Did: downvoter.DID,
189
-
Commit: &jetstream.CommitEvent{
190
-
Operation: "create",
191
-
Collection: "social.coves.interaction.vote",
193
-
CID: "bafy2bzacedownvotecid",
194
-
Record: map[string]interface{}{
195
-
"$type": "social.coves.interaction.vote",
196
-
"subject": map[string]interface{}{
200
-
"direction": "down",
201
-
"createdAt": time.Now().Format(time.RFC3339),
206
-
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
207
-
err := consumer.HandleEvent(ctx, &jetstreamEvent)
208
-
require.NoError(t, err, "Consumer should process downvote")
210
-
// Verify vote indexed
211
-
indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
212
-
require.NoError(t, err, "Downvote should be indexed")
213
-
assert.Equal(t, "down", indexedVote.Direction, "Direction should be 'down'")
215
-
// Verify post counts (now has 1 upvote + 1 downvote from previous test)
216
-
updatedPost, err := postRepo.GetByURI(ctx, postURI)
217
-
require.NoError(t, err)
218
-
assert.Equal(t, 1, updatedPost.UpvoteCount, "Upvote count should still be 1")
219
-
assert.Equal(t, 1, updatedPost.DownvoteCount, "Downvote count should be 1")
220
-
assert.Equal(t, 0, updatedPost.Score, "Score should be 0 (1 up - 1 down)")
222
-
t.Logf("✓ Downvote indexed, post counts: up=%d down=%d score=%d",
223
-
updatedPost.UpvoteCount, updatedPost.DownvoteCount, updatedPost.Score)
226
-
t.Run("Delete vote and verify counts decremented", func(t *testing.T) {
227
-
ctx := context.Background()
229
-
// Create a different voter for this test
230
-
deletevoter := createTestUser(t, db, "deletevoter.test", "did:plc:votee2edeletevoter")
232
-
// Get current counts
233
-
beforePost, _ := postRepo.GetByURI(ctx, postURI)
235
-
// Create a vote first
236
-
voteRkey := generateTID()
237
-
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", deletevoter.DID, voteRkey)
239
-
createEvent := jetstream.JetstreamEvent{
240
-
Did: deletevoter.DID,
242
-
Commit: &jetstream.CommitEvent{
243
-
Operation: "create",
244
-
Collection: "social.coves.interaction.vote",
246
-
CID: "bafy2bzacedeleteme",
247
-
Record: map[string]interface{}{
248
-
"$type": "social.coves.interaction.vote",
249
-
"subject": map[string]interface{}{
254
-
"createdAt": time.Now().Format(time.RFC3339),
259
-
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
260
-
err := consumer.HandleEvent(ctx, &createEvent)
261
-
require.NoError(t, err)
264
-
deleteEvent := jetstream.JetstreamEvent{
265
-
Did: deletevoter.DID,
267
-
Commit: &jetstream.CommitEvent{
268
-
Operation: "delete",
269
-
Collection: "social.coves.interaction.vote",
274
-
err = consumer.HandleEvent(ctx, &deleteEvent)
275
-
require.NoError(t, err, "Consumer should process delete")
277
-
// Verify vote is soft-deleted
278
-
deletedVote, err := voteRepo.GetByURI(ctx, voteURI)
279
-
require.NoError(t, err, "Vote should still exist (soft delete)")
280
-
assert.NotNil(t, deletedVote.DeletedAt, "Vote should have deleted_at timestamp")
282
-
// Verify post counts decremented
283
-
afterPost, err := postRepo.GetByURI(ctx, postURI)
284
-
require.NoError(t, err)
285
-
assert.Equal(t, beforePost.UpvoteCount, afterPost.UpvoteCount,
286
-
"Upvote count should be back to original (delete decremented)")
288
-
t.Logf("✓ Vote deleted, counts decremented correctly")
291
-
t.Run("Idempotent indexing - duplicate events", func(t *testing.T) {
292
-
ctx := context.Background()
294
-
// Create a different voter for this test
295
-
idempotentvoter := createTestUser(t, db, "idempotentvoter.test", "did:plc:votee2eidempotent")
298
-
voteRkey := generateTID()
299
-
voteURI := fmt.Sprintf("at://%s/social.coves.interaction.vote/%s", idempotentvoter.DID, voteRkey)
301
-
event := jetstream.JetstreamEvent{
302
-
Did: idempotentvoter.DID,
304
-
Commit: &jetstream.CommitEvent{
305
-
Operation: "create",
306
-
Collection: "social.coves.interaction.vote",
308
-
CID: "bafy2bzaceidempotent",
309
-
Record: map[string]interface{}{
310
-
"$type": "social.coves.interaction.vote",
311
-
"subject": map[string]interface{}{
316
-
"createdAt": time.Now().Format(time.RFC3339),
321
-
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
323
-
// First event - should succeed
324
-
err := consumer.HandleEvent(ctx, &event)
325
-
require.NoError(t, err, "First event should succeed")
327
-
// Get counts after first event
328
-
firstPost, _ := postRepo.GetByURI(ctx, postURI)
330
-
// Second event (duplicate) - should be handled gracefully
331
-
err = consumer.HandleEvent(ctx, &event)
332
-
require.NoError(t, err, "Duplicate event should be handled gracefully")
334
-
// Verify counts NOT incremented again (idempotent)
335
-
secondPost, err := postRepo.GetByURI(ctx, postURI)
336
-
require.NoError(t, err)
337
-
assert.Equal(t, firstPost.UpvoteCount, secondPost.UpvoteCount,
338
-
"Duplicate event should not increment count again")
340
-
// Verify only one vote in database
341
-
vote, err := voteRepo.GetByURI(ctx, voteURI)
342
-
require.NoError(t, err)
343
-
assert.Equal(t, voteURI, vote.URI, "Should still be the same vote")
345
-
t.Logf("✓ Idempotency test passed - duplicate event handled correctly")
348
-
t.Run("Security: Vote from wrong repository rejected", func(t *testing.T) {
349
-
ctx := context.Background()
351
-
// SECURITY TEST: Try to create a vote that claims to be from the voter
352
-
// but actually comes from a different user's repository
353
-
// This should be REJECTED by the consumer
355
-
maliciousUser := createTestUser(t, db, "hacker.test", "did:plc:hacker123")
357
-
maliciousEvent := jetstream.JetstreamEvent{
358
-
Did: maliciousUser.DID, // Event from hacker's repo
360
-
Commit: &jetstream.CommitEvent{
361
-
Operation: "create",
362
-
Collection: "social.coves.interaction.vote",
363
-
RKey: generateTID(),
364
-
CID: "bafy2bzacefake",
365
-
Record: map[string]interface{}{
366
-
"$type": "social.coves.interaction.vote",
367
-
"subject": map[string]interface{}{
372
-
"createdAt": time.Now().Format(time.RFC3339),
377
-
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
378
-
err := consumer.HandleEvent(ctx, &maliciousEvent)
380
-
// Should succeed (vote is created in hacker's repo, which is valid)
381
-
// The vote record itself is FROM their repo, so it's legitimate
382
-
// This is different from posts which must come from community repo
383
-
assert.NoError(t, err, "Votes in user repos are valid")
385
-
t.Logf("✓ Security validation passed - user repo votes are allowed")
389
-
// TestVote_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS:
390
-
// 1. HTTP POST to /xrpc/social.coves.interaction.createVote (with auth)
391
-
// 2. Handler → Service → Write to user's PDS repository
392
-
// 3. PDS → Jetstream firehose event
393
-
// 4. Jetstream consumer → Index in AppView database
394
-
// 5. Verify vote appears in database + post counts updated
396
-
// This is a TRUE E2E test that requires:
397
-
// - Live PDS running at PDS_URL (default: http://localhost:3001)
398
-
// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe)
399
-
// - Test database running
400
-
func TestVote_E2E_LivePDS(t *testing.T) {
401
-
if testing.Short() {
402
-
t.Skip("Skipping live PDS E2E test in short mode")
405
-
// Setup test database
406
-
dbURL := os.Getenv("TEST_DATABASE_URL")
408
-
dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
411
-
db, err := sql.Open("postgres", dbURL)
412
-
require.NoError(t, err, "Failed to connect to test database")
414
-
if closeErr := db.Close(); closeErr != nil {
415
-
t.Logf("Failed to close database: %v", closeErr)
420
-
require.NoError(t, goose.SetDialect("postgres"))
421
-
require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
423
-
// Check if PDS is running
424
-
pdsURL := os.Getenv("PDS_URL")
426
-
pdsURL = "http://localhost:3001"
429
-
healthResp, err := http.Get(pdsURL + "/xrpc/_health")
431
-
t.Skipf("PDS not running at %s: %v", pdsURL, err)
433
-
_ = healthResp.Body.Close()
435
-
// Check if Jetstream is running
436
-
jetstreamHealthURL := "http://127.0.0.1:6009/metrics" // Use 127.0.0.1 for IPv4
437
-
jetstreamResp, err := http.Get(jetstreamHealthURL)
439
-
t.Skipf("Jetstream not running: %v", err)
441
-
_ = jetstreamResp.Body.Close()
443
-
ctx := context.Background()
445
-
// Cleanup old test data
446
-
_, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE 'did:plc:votee2elive%' OR voter_did IN (SELECT did FROM users WHERE handle LIKE '%votee2elive%')")
447
-
_, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:votee2elive%'")
448
-
_, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:votee2elive%'")
449
-
_, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:votee2elive%' OR handle LIKE '%votee2elive%' OR handle LIKE '%authore2e%'")
451
-
// Setup repositories and services
452
-
userRepo := postgres.NewUserRepository(db)
453
-
communityRepo := postgres.NewCommunityRepository(db)
454
-
postRepo := postgres.NewPostRepository(db)
455
-
voteRepo := postgres.NewVoteRepository(db)
457
-
identityConfig := identity.DefaultConfig()
458
-
identityResolver := identity.NewResolver(db, identityConfig)
459
-
userService := users.NewUserService(userRepo, identityResolver, pdsURL)
461
-
// Create test voter
462
-
voter := createTestUser(t, db, "votee2elive.bsky.social", "did:plc:votee2elive123")
464
-
// Create test community and post (simplified - using fake credentials)
465
-
author := createTestUser(t, db, "authore2e.bsky.social", "did:plc:votee2eliveauthor")
466
-
community := &communities.Community{
467
-
DID: "did:plc:votee2elivecommunity",
468
-
Handle: "votee2elivecommunity.test.coves.social",
469
-
Name: "votee2elivecommunity",
470
-
DisplayName: "Vote E2E Live Community",
471
-
OwnerDID: author.DID,
472
-
CreatedByDID: author.DID,
473
-
HostedByDID: "did:web:coves.test",
474
-
Visibility: "public",
475
-
ModerationType: "moderator",
476
-
RecordURI: "at://did:plc:votee2elivecommunity/social.coves.community.profile/self",
477
-
RecordCID: "fakecid",
478
-
PDSAccessToken: "fake_token",
479
-
PDSRefreshToken: "fake_refresh",
481
-
_, err = communityRepo.Create(ctx, community)
482
-
require.NoError(t, err)
484
-
postRkey := generateTID()
485
-
postURI := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, postRkey)
486
-
postCID := "bafy2bzaceposte2e"
487
-
post := &posts.Post{
491
-
AuthorDID: author.DID,
492
-
CommunityDID: community.DID,
493
-
Title: stringPtr("E2E Vote Test Post"),
494
-
Content: stringPtr("This post will receive live votes"),
495
-
CreatedAt: time.Now(),
500
-
err = postRepo.Create(ctx, post)
501
-
require.NoError(t, err)
503
-
// Setup vote service and handler
504
-
voteService := votes.NewVoteService(voteRepo, postRepo, pdsURL)
505
-
voteHandler := vote.NewCreateVoteHandler(voteService)
506
-
authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) // Skip JWT verification for testing
508
-
t.Run("Live E2E: Create vote and verify via Jetstream", func(t *testing.T) {
509
-
t.Logf("\n🔄 TRUE E2E: Creating vote via XRPC endpoint...")
511
-
// Authenticate voter with PDS to get real access token
512
-
// Note: This assumes the voter account already exists on PDS
513
-
// For a complete test, you'd create the account first via com.atproto.server.createAccount
514
-
instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
515
-
instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
516
-
if instanceHandle == "" {
517
-
instanceHandle = "testuser123.local.coves.dev"
519
-
if instancePassword == "" {
520
-
instancePassword = "test-password-123"
523
-
t.Logf("🔐 Authenticating voter with PDS as: %s", instanceHandle)
524
-
voterAccessToken, voterDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
526
-
t.Skipf("Failed to authenticate voter with PDS (account may not exist): %v", err)
528
-
t.Logf("✅ Authenticated - Voter DID: %s", voterDID)
530
-
// Update voter record to match authenticated DID
531
-
_, err = db.Exec("UPDATE users SET did = $1 WHERE did = $2", voterDID, voter.DID)
532
-
require.NoError(t, err)
533
-
voter.DID = voterDID
535
-
// Build HTTP request for vote creation
536
-
reqBody := map[string]interface{}{
537
-
"subject": postURI,
540
-
reqJSON, err := json.Marshal(reqBody)
541
-
require.NoError(t, err)
543
-
// Create HTTP request
544
-
req := httptest.NewRequest("POST", "/xrpc/social.coves.interaction.createVote", bytes.NewReader(reqJSON))
545
-
req.Header.Set("Content-Type", "application/json")
547
-
// Use REAL PDS access token (not mock JWT)
548
-
req.Header.Set("Authorization", "Bearer "+voterAccessToken)
550
-
// Execute request through auth middleware + handler
551
-
rr := httptest.NewRecorder()
552
-
handler := authMiddleware.RequireAuth(http.HandlerFunc(voteHandler.HandleCreateVote))
553
-
handler.ServeHTTP(rr, req)
556
-
require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String())
559
-
var response map[string]interface{}
560
-
err = json.NewDecoder(rr.Body).Decode(&response)
561
-
require.NoError(t, err, "Failed to parse response")
563
-
voteURI := response["uri"].(string)
564
-
voteCID := response["cid"].(string)
566
-
t.Logf("✅ Vote created on PDS:")
567
-
t.Logf(" URI: %s", voteURI)
568
-
t.Logf(" CID: %s", voteCID)
570
-
// ====================================================================================
571
-
// Part 2: Query the PDS to verify the vote record exists
572
-
// ====================================================================================
573
-
t.Run("2a. Verify vote record on PDS", func(t *testing.T) {
574
-
t.Logf("\n📡 Querying PDS for vote record...")
576
-
// Extract rkey from vote URI (at://did/collection/rkey)
577
-
parts := strings.Split(voteURI, "/")
578
-
rkey := parts[len(parts)-1]
580
-
// Query PDS for the vote record
581
-
getRecordURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
582
-
pdsURL, voterDID, "social.coves.interaction.vote", rkey)
584
-
t.Logf(" GET %s", getRecordURL)
586
-
pdsResp, err := http.Get(getRecordURL)
587
-
require.NoError(t, err, "Failed to query PDS")
588
-
defer pdsResp.Body.Close()
590
-
require.Equal(t, http.StatusOK, pdsResp.StatusCode, "Vote record should exist on PDS")
592
-
var pdsRecord struct {
593
-
Value map[string]interface{} `json:"value"`
594
-
URI string `json:"uri"`
595
-
CID string `json:"cid"`
598
-
err = json.NewDecoder(pdsResp.Body).Decode(&pdsRecord)
599
-
require.NoError(t, err, "Failed to decode PDS response")
601
-
t.Logf("✅ Vote record found on PDS!")
602
-
t.Logf(" URI: %s", pdsRecord.URI)
603
-
t.Logf(" CID: %s", pdsRecord.CID)
604
-
t.Logf(" Direction: %v", pdsRecord.Value["direction"])
605
-
t.Logf(" Subject: %v", pdsRecord.Value["subject"])
607
-
// Verify the record matches what we created
608
-
assert.Equal(t, voteURI, pdsRecord.URI, "PDS URI should match")
609
-
assert.Equal(t, voteCID, pdsRecord.CID, "PDS CID should match")
610
-
assert.Equal(t, "up", pdsRecord.Value["direction"], "Direction should be 'up'")
612
-
// Print full record for inspection
613
-
recordJSON, _ := json.MarshalIndent(pdsRecord.Value, " ", " ")
614
-
t.Logf(" Full record:\n %s", string(recordJSON))
617
-
// ====================================================================================
618
-
// Part 2b: TRUE E2E - Real Jetstream Firehose Consumer
619
-
// ====================================================================================
620
-
t.Run("2b. Real Jetstream Firehose Consumption", func(t *testing.T) {
621
-
t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...")
623
-
// Get PDS hostname for Jetstream filtering
624
-
pdsHostname := strings.TrimPrefix(pdsURL, "http://")
625
-
pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
626
-
pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port
628
-
// Build Jetstream URL with filters for vote records
629
-
jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.interaction.vote",
632
-
t.Logf(" Jetstream URL: %s", jetstreamURL)
633
-
t.Logf(" Looking for vote URI: %s", voteURI)
634
-
t.Logf(" Voter DID: %s", voterDID)
636
-
// Create vote consumer (same as main.go)
637
-
consumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
639
-
// Channels to receive the event
640
-
eventChan := make(chan *jetstream.JetstreamEvent, 10)
641
-
errorChan := make(chan error, 1)
642
-
done := make(chan bool)
644
-
// Start Jetstream WebSocket subscriber in background
646
-
err := subscribeToJetstreamForVote(ctx, jetstreamURL, voterDID, postURI, consumer, eventChan, errorChan, done)
652
-
// Wait for event or timeout
653
-
t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
656
-
case event := <-eventChan:
657
-
t.Logf("✅ Received real Jetstream event!")
658
-
t.Logf(" Event DID: %s", event.Did)
659
-
t.Logf(" Collection: %s", event.Commit.Collection)
660
-
t.Logf(" Operation: %s", event.Commit.Operation)
661
-
t.Logf(" RKey: %s", event.Commit.RKey)
663
-
// Verify it's for our voter
664
-
assert.Equal(t, voterDID, event.Did, "Event should be from voter's repo")
666
-
// Verify vote was indexed in AppView database
667
-
t.Logf("\n🔍 Querying AppView database for indexed vote...")
669
-
indexedVote, err := voteRepo.GetByVoterAndSubject(ctx, voterDID, postURI)
670
-
require.NoError(t, err, "Vote should be indexed in AppView")
672
-
t.Logf("✅ Vote indexed in AppView:")
673
-
t.Logf(" URI: %s", indexedVote.URI)
674
-
t.Logf(" CID: %s", indexedVote.CID)
675
-
t.Logf(" Voter DID: %s", indexedVote.VoterDID)
676
-
t.Logf(" Subject: %s", indexedVote.SubjectURI)
677
-
t.Logf(" Direction: %s", indexedVote.Direction)
679
-
// Verify all fields match
680
-
assert.Equal(t, voteURI, indexedVote.URI, "URI should match")
681
-
assert.Equal(t, voteCID, indexedVote.CID, "CID should match")
682
-
assert.Equal(t, voterDID, indexedVote.VoterDID, "Voter DID should match")
683
-
assert.Equal(t, postURI, indexedVote.SubjectURI, "Subject URI should match")
684
-
assert.Equal(t, "up", indexedVote.Direction, "Direction should be 'up'")
686
-
// Verify post counts were updated
687
-
t.Logf("\n🔍 Verifying post vote counts updated...")
688
-
updatedPost, err := postRepo.GetByURI(ctx, postURI)
689
-
require.NoError(t, err, "Post should exist")
691
-
t.Logf("✅ Post vote counts updated:")
692
-
t.Logf(" Upvotes: %d", updatedPost.UpvoteCount)
693
-
t.Logf(" Downvotes: %d", updatedPost.DownvoteCount)
694
-
t.Logf(" Score: %d", updatedPost.Score)
696
-
assert.Equal(t, 1, updatedPost.UpvoteCount, "Upvote count should be 1")
697
-
assert.Equal(t, 0, updatedPost.DownvoteCount, "Downvote count should be 0")
698
-
assert.Equal(t, 1, updatedPost.Score, "Score should be 1")
700
-
// Signal to stop Jetstream consumer
703
-
t.Log("\n✅ TRUE E2E COMPLETE: PDS → Jetstream → Consumer → AppView ✓")
705
-
case err := <-errorChan:
706
-
t.Fatalf("❌ Jetstream error: %v", err)
708
-
case <-time.After(30 * time.Second):
709
-
t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds")
715
-
// subscribeToJetstreamForVote subscribes to real Jetstream firehose and processes vote events
716
-
// This helper creates a WebSocket connection to Jetstream and waits for vote events
717
-
func subscribeToJetstreamForVote(
718
-
ctx context.Context,
719
-
jetstreamURL string,
720
-
targetVoterDID string,
721
-
targetSubjectURI string,
722
-
consumer *jetstream.VoteEventConsumer,
723
-
eventChan chan<- *jetstream.JetstreamEvent,
724
-
errorChan chan<- error,
727
-
conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
729
-
return fmt.Errorf("failed to connect to Jetstream: %w", err)
731
-
defer func() { _ = conn.Close() }()
733
-
// Read messages until we find our event or receive done signal
741
-
// Set read deadline to avoid blocking forever
742
-
if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
743
-
return fmt.Errorf("failed to set read deadline: %w", err)
746
-
var event jetstream.JetstreamEvent
747
-
err := conn.ReadJSON(&event)
749
-
// Check if it's a timeout (expected)
750
-
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
753
-
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
754
-
continue // Timeout is expected, keep listening
756
-
return fmt.Errorf("failed to read Jetstream message: %w", err)
759
-
// Check if this is a vote event for the target voter + subject
760
-
if event.Did == targetVoterDID && event.Kind == "commit" &&
761
-
event.Commit != nil && event.Commit.Collection == "social.coves.interaction.vote" {
763
-
// Verify it's for the target subject
764
-
record := event.Commit.Record
765
-
if subject, ok := record["subject"].(map[string]interface{}); ok {
766
-
if subjectURI, ok := subject["uri"].(string); ok && subjectURI == targetSubjectURI {
767
-
// This is our vote! Process it
768
-
if err := consumer.HandleEvent(ctx, &event); err != nil {
769
-
return fmt.Errorf("failed to process event: %w", err)
772
-
// Send to channel so test can verify
774
-
case eventChan <- &event:
776
-
case <-time.After(1 * time.Second):
777
-
return fmt.Errorf("timeout sending event to channel")
787
-
func stringPtr(s string) *string {