···
4
+
"Coves/internal/api/routes"
5
+
"Coves/internal/atproto/jetstream"
6
+
"Coves/internal/atproto/utils"
7
+
"Coves/internal/core/votes"
8
+
"Coves/internal/db/postgres"
23
+
"github.com/go-chi/chi/v5"
24
+
"github.com/gorilla/websocket"
25
+
_ "github.com/lib/pq"
26
+
"github.com/pressly/goose/v3"
29
+
// TestVoteE2E_CreateUpvote tests the full vote creation flow with a real local PDS
30
+
// Flow: Client → XRPC → PDS Write → Jetstream → Consumer → AppView
31
+
func TestVoteE2E_CreateUpvote(t *testing.T) {
32
+
// Skip in short mode since this requires real PDS
33
+
if testing.Short() {
34
+
t.Skip("Skipping E2E test in short mode")
37
+
// Setup test database
38
+
dbURL := os.Getenv("TEST_DATABASE_URL")
40
+
dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
43
+
db, err := sql.Open("postgres", dbURL)
45
+
t.Fatalf("Failed to connect to test database: %v", err)
48
+
if closeErr := db.Close(); closeErr != nil {
49
+
t.Logf("Failed to close database: %v", closeErr)
54
+
if dialectErr := goose.SetDialect("postgres"); dialectErr != nil {
55
+
t.Fatalf("Failed to set goose dialect: %v", dialectErr)
57
+
if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil {
58
+
t.Fatalf("Failed to run migrations: %v", migrateErr)
61
+
// Check if PDS is running
62
+
pdsURL := os.Getenv("PDS_URL")
64
+
pdsURL = "http://localhost:3001"
67
+
healthResp, err := http.Get(pdsURL + "/xrpc/_health")
69
+
t.Skipf("PDS not running at %s: %v", pdsURL, err)
72
+
if closeErr := healthResp.Body.Close(); closeErr != nil {
73
+
t.Logf("Failed to close health response: %v", closeErr)
77
+
ctx := context.Background()
79
+
// Setup repositories
80
+
voteRepo := postgres.NewVoteRepository(db)
81
+
postRepo := postgres.NewPostRepository(db)
83
+
// Setup OAuth client and store for vote service
84
+
oauthStore := SetupOAuthTestStore(t, db)
85
+
oauthClient := SetupOAuthTestClient(t, oauthStore)
88
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
90
+
// Create test user on PDS
91
+
testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix())
92
+
testUserEmail := fmt.Sprintf("voter-%d@test.local", time.Now().Unix())
93
+
testUserPassword := "test-password-123"
95
+
t.Logf("Creating test user on PDS: %s", testUserHandle)
96
+
pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
98
+
t.Fatalf("Failed to create test user on PDS: %v", err)
100
+
t.Logf("Test user created: DID=%s", userDID)
102
+
// Index user in AppView
103
+
testUser := createTestUser(t, db, testUserHandle, userDID)
105
+
// Create test post to vote on
106
+
testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test")
108
+
t.Fatalf("Failed to create test community: %v", err)
111
+
postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
112
+
postCID := "bafypost123"
114
+
// Setup OAuth middleware with real PDS access token
115
+
e2eAuth := NewE2EOAuthMiddleware()
116
+
token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
118
+
// Setup HTTP server with XRPC routes
119
+
r := chi.NewRouter()
120
+
routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
121
+
httpServer := httptest.NewServer(r)
122
+
defer httpServer.Close()
124
+
// Setup Jetstream consumer
125
+
voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
127
+
// ====================================================================================
128
+
// TEST: Create upvote on post
129
+
// ====================================================================================
130
+
t.Logf("\n📝 Creating upvote via XRPC endpoint...")
132
+
voteReq := map[string]interface{}{
133
+
"subject": map[string]interface{}{
140
+
reqBody, marshalErr := json.Marshal(voteReq)
141
+
if marshalErr != nil {
142
+
t.Fatalf("Failed to marshal request: %v", marshalErr)
145
+
req, err := http.NewRequest(http.MethodPost,
146
+
httpServer.URL+"/xrpc/social.coves.feed.vote.create",
147
+
bytes.NewBuffer(reqBody))
149
+
t.Fatalf("Failed to create request: %v", err)
151
+
req.Header.Set("Content-Type", "application/json")
152
+
req.Header.Set("Authorization", "Bearer "+token)
154
+
resp, err := http.DefaultClient.Do(req)
156
+
t.Fatalf("Failed to POST vote: %v", err)
158
+
defer func() { _ = resp.Body.Close() }()
160
+
if resp.StatusCode != http.StatusOK {
161
+
body, readErr := io.ReadAll(resp.Body)
162
+
if readErr != nil {
163
+
t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr)
165
+
t.Logf("XRPC Vote Failed")
166
+
t.Logf(" Status: %d", resp.StatusCode)
167
+
t.Logf(" Response: %s", string(body))
168
+
t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
171
+
var voteResp struct {
172
+
URI string `json:"uri"`
173
+
CID string `json:"cid"`
176
+
if err := json.NewDecoder(resp.Body).Decode(&voteResp); err != nil {
177
+
t.Fatalf("Failed to decode vote response: %v", err)
180
+
t.Logf("✅ XRPC response received:")
181
+
t.Logf(" URI: %s", voteResp.URI)
182
+
t.Logf(" CID: %s", voteResp.CID)
184
+
// Verify vote record was written to PDS
185
+
t.Logf("\n🔍 Verifying vote record on PDS...")
186
+
rkey := utils.ExtractRKeyFromURI(voteResp.URI)
187
+
collection := "social.coves.feed.vote"
189
+
pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
190
+
pdsURL, userDID, collection, rkey))
192
+
t.Fatalf("Failed to fetch vote record from PDS: %v", pdsErr)
195
+
if closeErr := pdsResp.Body.Close(); closeErr != nil {
196
+
t.Logf("Failed to close PDS response: %v", closeErr)
200
+
if pdsResp.StatusCode != http.StatusOK {
201
+
body, _ := io.ReadAll(pdsResp.Body)
202
+
t.Fatalf("Vote record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body))
205
+
var pdsRecord struct {
206
+
Value map[string]interface{} `json:"value"`
207
+
CID string `json:"cid"`
209
+
if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil {
210
+
t.Fatalf("Failed to decode PDS record: %v", decodeErr)
213
+
t.Logf("✅ Vote record found on PDS:")
214
+
t.Logf(" CID: %s", pdsRecord.CID)
215
+
t.Logf(" Direction: %v", pdsRecord.Value["direction"])
217
+
// Verify direction
218
+
if pdsRecord.Value["direction"] != "up" {
219
+
t.Errorf("Expected direction 'up', got %v", pdsRecord.Value["direction"])
222
+
// Simulate Jetstream consumer indexing the vote
223
+
t.Logf("\n🔄 Simulating Jetstream consumer indexing vote...")
224
+
voteEvent := jetstream.JetstreamEvent{
226
+
TimeUS: time.Now().UnixMicro(),
228
+
Commit: &jetstream.CommitEvent{
229
+
Rev: "test-vote-rev",
230
+
Operation: "create",
231
+
Collection: "social.coves.feed.vote",
233
+
CID: pdsRecord.CID,
234
+
Record: map[string]interface{}{
235
+
"$type": "social.coves.feed.vote",
236
+
"subject": map[string]interface{}{
241
+
"createdAt": time.Now().Format(time.RFC3339),
246
+
if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
247
+
t.Fatalf("Failed to handle vote event: %v", handleErr)
250
+
// Verify vote was indexed in AppView
251
+
t.Logf("\n🔍 Verifying vote indexed in AppView...")
252
+
indexedVote, err := voteRepo.GetByURI(ctx, voteResp.URI)
254
+
t.Fatalf("Vote not indexed in AppView: %v", err)
257
+
t.Logf("✅ Vote indexed in AppView:")
258
+
t.Logf(" VoterDID: %s", indexedVote.VoterDID)
259
+
t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
260
+
t.Logf(" Direction: %s", indexedVote.Direction)
261
+
t.Logf(" URI: %s", indexedVote.URI)
263
+
// Verify vote details
264
+
if indexedVote.VoterDID != userDID {
265
+
t.Errorf("Expected voter_did %s, got %s", userDID, indexedVote.VoterDID)
267
+
if indexedVote.SubjectURI != postURI {
268
+
t.Errorf("Expected subject_uri %s, got %s", postURI, indexedVote.SubjectURI)
270
+
if indexedVote.Direction != "up" {
271
+
t.Errorf("Expected direction 'up', got %s", indexedVote.Direction)
274
+
// Verify post counts updated
275
+
t.Logf("\n🔍 Verifying post vote counts updated...")
276
+
updatedPost, err := postRepo.GetByURI(ctx, postURI)
278
+
t.Fatalf("Failed to get updated post: %v", err)
281
+
if updatedPost.UpvoteCount != 1 {
282
+
t.Errorf("Expected upvote_count = 1, got %d", updatedPost.UpvoteCount)
284
+
if updatedPost.Score != 1 {
285
+
t.Errorf("Expected score = 1, got %d", updatedPost.Score)
288
+
t.Logf("✅ TRUE E2E UPVOTE FLOW COMPLETE:")
289
+
t.Logf(" Client → XRPC → PDS Write → Jetstream → Consumer → AppView ✓")
290
+
t.Logf(" ✓ Vote written to PDS")
291
+
t.Logf(" ✓ Vote indexed in AppView")
292
+
t.Logf(" ✓ Post vote counts updated")
295
+
// TestVoteE2E_ToggleSameDirection tests voting twice in same direction (toggle off)
296
+
func TestVoteE2E_ToggleSameDirection(t *testing.T) {
297
+
if testing.Short() {
298
+
t.Skip("Skipping E2E test in short mode")
301
+
db := setupTestDB(t)
302
+
defer func() { _ = db.Close() }()
304
+
ctx := context.Background()
305
+
pdsURL := getTestPDSURL()
307
+
// Setup repositories and services
308
+
voteRepo := postgres.NewVoteRepository(db)
309
+
postRepo := postgres.NewPostRepository(db)
311
+
oauthStore := SetupOAuthTestStore(t, db)
312
+
oauthClient := SetupOAuthTestClient(t, oauthStore)
313
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
315
+
// Create test user
316
+
testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix())
317
+
testUserEmail := fmt.Sprintf("toggle-%d@test.local", time.Now().Unix())
318
+
testUserPassword := "test-password-123"
320
+
pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
322
+
t.Skipf("PDS not available: %v", err)
325
+
testUser := createTestUser(t, db, testUserHandle, userDID)
327
+
// Create test post
328
+
testCommunityDID, _ := createFeedTestCommunity(db, ctx, "toggle-community", "owner.test")
329
+
postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
330
+
postCID := "bafypost456"
332
+
// Setup OAuth and HTTP server with real PDS access token
333
+
e2eAuth := NewE2EOAuthMiddleware()
334
+
token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
336
+
r := chi.NewRouter()
337
+
routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
338
+
httpServer := httptest.NewServer(r)
339
+
defer httpServer.Close()
341
+
voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
344
+
t.Logf("\n📝 Creating first upvote...")
345
+
voteReq := map[string]interface{}{
346
+
"subject": map[string]interface{}{
353
+
reqBody, _ := json.Marshal(voteReq)
354
+
req, _ := http.NewRequest(http.MethodPost,
355
+
httpServer.URL+"/xrpc/social.coves.feed.vote.create",
356
+
bytes.NewBuffer(reqBody))
357
+
req.Header.Set("Content-Type", "application/json")
358
+
req.Header.Set("Authorization", "Bearer "+token)
360
+
resp, err := http.DefaultClient.Do(req)
362
+
t.Fatalf("Failed to create first vote: %v", err)
365
+
var firstVoteResp struct {
366
+
URI string `json:"uri"`
367
+
CID string `json:"cid"`
369
+
json.NewDecoder(resp.Body).Decode(&firstVoteResp)
372
+
t.Logf("✅ First vote created: %s", firstVoteResp.URI)
374
+
// Index first vote
375
+
rkey := utils.ExtractRKeyFromURI(firstVoteResp.URI)
376
+
voteEvent := jetstream.JetstreamEvent{
378
+
TimeUS: time.Now().UnixMicro(),
380
+
Commit: &jetstream.CommitEvent{
381
+
Rev: "test-vote-rev-1",
382
+
Operation: "create",
383
+
Collection: "social.coves.feed.vote",
385
+
CID: firstVoteResp.CID,
386
+
Record: map[string]interface{}{
387
+
"$type": "social.coves.feed.vote",
388
+
"subject": map[string]interface{}{
393
+
"createdAt": time.Now().Format(time.RFC3339),
397
+
voteConsumer.HandleEvent(ctx, &voteEvent)
399
+
// Second upvote (same direction) - should toggle off (delete)
400
+
t.Logf("\n📝 Creating second upvote (toggle off)...")
401
+
req2, _ := http.NewRequest(http.MethodPost,
402
+
httpServer.URL+"/xrpc/social.coves.feed.vote.create",
403
+
bytes.NewBuffer(reqBody))
404
+
req2.Header.Set("Content-Type", "application/json")
405
+
req2.Header.Set("Authorization", "Bearer "+token)
407
+
resp2, err := http.DefaultClient.Do(req2)
409
+
t.Fatalf("Failed to toggle vote: %v", err)
411
+
defer resp2.Body.Close()
413
+
if resp2.StatusCode != http.StatusOK {
414
+
body, _ := io.ReadAll(resp2.Body)
415
+
t.Fatalf("Expected 200, got %d: %s", resp2.StatusCode, string(body))
418
+
t.Logf("✅ Second vote request completed (toggle)")
420
+
// Simulate Jetstream DELETE event
421
+
t.Logf("\n🔄 Simulating Jetstream DELETE event...")
422
+
deleteEvent := jetstream.JetstreamEvent{
424
+
TimeUS: time.Now().UnixMicro(),
426
+
Commit: &jetstream.CommitEvent{
427
+
Rev: "test-vote-rev-2",
428
+
Operation: "delete",
429
+
Collection: "social.coves.feed.vote",
433
+
voteConsumer.HandleEvent(ctx, &deleteEvent)
435
+
// Verify vote was removed from AppView
436
+
t.Logf("\n🔍 Verifying vote removed from AppView...")
437
+
_, err = voteRepo.GetByURI(ctx, firstVoteResp.URI)
439
+
t.Error("Expected vote to be deleted, but it still exists")
442
+
// Verify post counts reset
443
+
updatedPost, _ := postRepo.GetByURI(ctx, postURI)
444
+
if updatedPost.UpvoteCount != 0 {
445
+
t.Errorf("Expected upvote_count = 0 after toggle, got %d", updatedPost.UpvoteCount)
448
+
t.Logf("✅ TOGGLE SAME DIRECTION FLOW COMPLETE:")
449
+
t.Logf(" ✓ First vote created and indexed")
450
+
t.Logf(" ✓ Second vote toggled off (deleted)")
451
+
t.Logf(" ✓ Post counts updated correctly")
454
+
// TestVoteE2E_ToggleDifferentDirection tests changing vote direction
455
+
func TestVoteE2E_ToggleDifferentDirection(t *testing.T) {
456
+
if testing.Short() {
457
+
t.Skip("Skipping E2E test in short mode")
460
+
db := setupTestDB(t)
461
+
defer func() { _ = db.Close() }()
463
+
ctx := context.Background()
464
+
pdsURL := getTestPDSURL()
466
+
// Setup repositories and services
467
+
voteRepo := postgres.NewVoteRepository(db)
468
+
postRepo := postgres.NewPostRepository(db)
470
+
oauthStore := SetupOAuthTestStore(t, db)
471
+
oauthClient := SetupOAuthTestClient(t, oauthStore)
472
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
474
+
// Create test user
475
+
testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix())
476
+
testUserEmail := fmt.Sprintf("flip-%d@test.local", time.Now().Unix())
477
+
testUserPassword := "test-password-123"
479
+
pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
481
+
t.Skipf("PDS not available: %v", err)
484
+
testUser := createTestUser(t, db, testUserHandle, userDID)
486
+
// Create test post
487
+
testCommunityDID, _ := createFeedTestCommunity(db, ctx, "flip-community", "owner.test")
488
+
postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
489
+
postCID := "bafypost789"
491
+
// Setup OAuth and HTTP server with real PDS access token
492
+
e2eAuth := NewE2EOAuthMiddleware()
493
+
token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
495
+
r := chi.NewRouter()
496
+
routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
497
+
httpServer := httptest.NewServer(r)
498
+
defer httpServer.Close()
500
+
voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
503
+
t.Logf("\n📝 Creating upvote...")
504
+
upvoteReq := map[string]interface{}{
505
+
"subject": map[string]interface{}{
512
+
reqBody, _ := json.Marshal(upvoteReq)
513
+
req, _ := http.NewRequest(http.MethodPost,
514
+
httpServer.URL+"/xrpc/social.coves.feed.vote.create",
515
+
bytes.NewBuffer(reqBody))
516
+
req.Header.Set("Content-Type", "application/json")
517
+
req.Header.Set("Authorization", "Bearer "+token)
519
+
resp, _ := http.DefaultClient.Do(req)
520
+
var upvoteResp struct {
521
+
URI string `json:"uri"`
522
+
CID string `json:"cid"`
524
+
json.NewDecoder(resp.Body).Decode(&upvoteResp)
528
+
rkey := utils.ExtractRKeyFromURI(upvoteResp.URI)
529
+
upvoteEvent := jetstream.JetstreamEvent{
531
+
TimeUS: time.Now().UnixMicro(),
533
+
Commit: &jetstream.CommitEvent{
534
+
Rev: "test-vote-rev-up",
535
+
Operation: "create",
536
+
Collection: "social.coves.feed.vote",
538
+
CID: upvoteResp.CID,
539
+
Record: map[string]interface{}{
540
+
"$type": "social.coves.feed.vote",
541
+
"subject": map[string]interface{}{
546
+
"createdAt": time.Now().Format(time.RFC3339),
550
+
voteConsumer.HandleEvent(ctx, &upvoteEvent)
552
+
t.Logf("✅ Upvote created and indexed")
554
+
// Change to downvote
555
+
t.Logf("\n📝 Changing to downvote...")
556
+
downvoteReq := map[string]interface{}{
557
+
"subject": map[string]interface{}{
561
+
"direction": "down",
564
+
reqBody2, _ := json.Marshal(downvoteReq)
565
+
req2, _ := http.NewRequest(http.MethodPost,
566
+
httpServer.URL+"/xrpc/social.coves.feed.vote.create",
567
+
bytes.NewBuffer(reqBody2))
568
+
req2.Header.Set("Content-Type", "application/json")
569
+
req2.Header.Set("Authorization", "Bearer "+token)
571
+
resp2, _ := http.DefaultClient.Do(req2)
572
+
var downvoteResp struct {
573
+
URI string `json:"uri"`
574
+
CID string `json:"cid"`
576
+
json.NewDecoder(resp2.Body).Decode(&downvoteResp)
579
+
// Simulate Jetstream UPDATE event (PDS updates the existing record)
580
+
t.Logf("\n🔄 Simulating Jetstream UPDATE event...")
581
+
updateEvent := jetstream.JetstreamEvent{
583
+
TimeUS: time.Now().UnixMicro(),
585
+
Commit: &jetstream.CommitEvent{
586
+
Rev: "test-vote-rev-down",
587
+
Operation: "update",
588
+
Collection: "social.coves.feed.vote",
589
+
RKey: rkey, // Same rkey as before
590
+
CID: downvoteResp.CID,
591
+
Record: map[string]interface{}{
592
+
"$type": "social.coves.feed.vote",
593
+
"subject": map[string]interface{}{
597
+
"direction": "down", // Changed direction
598
+
"createdAt": time.Now().Format(time.RFC3339),
602
+
voteConsumer.HandleEvent(ctx, &updateEvent)
604
+
// Verify vote direction changed in AppView
605
+
t.Logf("\n🔍 Verifying vote direction changed in AppView...")
606
+
updatedVote, err := voteRepo.GetByURI(ctx, upvoteResp.URI)
608
+
t.Fatalf("Vote not found after update: %v", err)
611
+
if updatedVote.Direction != "down" {
612
+
t.Errorf("Expected direction 'down', got %s", updatedVote.Direction)
615
+
// Verify post counts updated
616
+
updatedPost, _ := postRepo.GetByURI(ctx, postURI)
617
+
if updatedPost.UpvoteCount != 0 {
618
+
t.Errorf("Expected upvote_count = 0, got %d", updatedPost.UpvoteCount)
620
+
if updatedPost.DownvoteCount != 1 {
621
+
t.Errorf("Expected downvote_count = 1, got %d", updatedPost.DownvoteCount)
623
+
if updatedPost.Score != -1 {
624
+
t.Errorf("Expected score = -1, got %d", updatedPost.Score)
627
+
t.Logf("✅ TOGGLE DIFFERENT DIRECTION FLOW COMPLETE:")
628
+
t.Logf(" ✓ Upvote created (score: +1)")
629
+
t.Logf(" ✓ Changed to downvote (score: -1)")
630
+
t.Logf(" ✓ Post counts updated correctly")
633
+
// TestVoteE2E_DeleteVote tests explicit vote deletion
634
+
func TestVoteE2E_DeleteVote(t *testing.T) {
635
+
if testing.Short() {
636
+
t.Skip("Skipping E2E test in short mode")
639
+
db := setupTestDB(t)
640
+
defer func() { _ = db.Close() }()
642
+
ctx := context.Background()
643
+
pdsURL := getTestPDSURL()
645
+
// Setup repositories and services
646
+
voteRepo := postgres.NewVoteRepository(db)
647
+
postRepo := postgres.NewPostRepository(db)
649
+
oauthStore := SetupOAuthTestStore(t, db)
650
+
oauthClient := SetupOAuthTestClient(t, oauthStore)
651
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
653
+
// Create test user
654
+
testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix())
655
+
testUserEmail := fmt.Sprintf("delete-%d@test.local", time.Now().Unix())
656
+
testUserPassword := "test-password-123"
658
+
pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
660
+
t.Skipf("PDS not available: %v", err)
663
+
testUser := createTestUser(t, db, testUserHandle, userDID)
665
+
// Create test post
666
+
testCommunityDID, _ := createFeedTestCommunity(db, ctx, "delete-community", "owner.test")
667
+
postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
668
+
postCID := "bafypost999"
670
+
// Setup OAuth and HTTP server with real PDS access token
671
+
e2eAuth := NewE2EOAuthMiddleware()
672
+
token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
674
+
r := chi.NewRouter()
675
+
routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
676
+
httpServer := httptest.NewServer(r)
677
+
defer httpServer.Close()
679
+
voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
681
+
// Create vote first
682
+
t.Logf("\n📝 Creating vote to delete...")
683
+
voteReq := map[string]interface{}{
684
+
"subject": map[string]interface{}{
691
+
reqBody, _ := json.Marshal(voteReq)
692
+
req, _ := http.NewRequest(http.MethodPost,
693
+
httpServer.URL+"/xrpc/social.coves.feed.vote.create",
694
+
bytes.NewBuffer(reqBody))
695
+
req.Header.Set("Content-Type", "application/json")
696
+
req.Header.Set("Authorization", "Bearer "+token)
698
+
resp, _ := http.DefaultClient.Do(req)
699
+
var voteResp struct {
700
+
URI string `json:"uri"`
701
+
CID string `json:"cid"`
703
+
json.NewDecoder(resp.Body).Decode(&voteResp)
707
+
rkey := utils.ExtractRKeyFromURI(voteResp.URI)
708
+
voteEvent := jetstream.JetstreamEvent{
710
+
TimeUS: time.Now().UnixMicro(),
712
+
Commit: &jetstream.CommitEvent{
713
+
Rev: "test-vote-create",
714
+
Operation: "create",
715
+
Collection: "social.coves.feed.vote",
718
+
Record: map[string]interface{}{
719
+
"$type": "social.coves.feed.vote",
720
+
"subject": map[string]interface{}{
725
+
"createdAt": time.Now().Format(time.RFC3339),
729
+
voteConsumer.HandleEvent(ctx, &voteEvent)
731
+
t.Logf("✅ Vote created and indexed")
733
+
// Delete vote via XRPC
734
+
t.Logf("\n📝 Deleting vote via XRPC...")
735
+
deleteReq := map[string]interface{}{
736
+
"subject": map[string]interface{}{
742
+
deleteBody, _ := json.Marshal(deleteReq)
743
+
deleteHttpReq, _ := http.NewRequest(http.MethodPost,
744
+
httpServer.URL+"/xrpc/social.coves.feed.vote.delete",
745
+
bytes.NewBuffer(deleteBody))
746
+
deleteHttpReq.Header.Set("Content-Type", "application/json")
747
+
deleteHttpReq.Header.Set("Authorization", "Bearer "+token)
749
+
deleteResp, _ := http.DefaultClient.Do(deleteHttpReq)
750
+
defer deleteResp.Body.Close()
752
+
if deleteResp.StatusCode != http.StatusOK {
753
+
body, _ := io.ReadAll(deleteResp.Body)
754
+
t.Fatalf("Delete failed: status %d, body: %s", deleteResp.StatusCode, string(body))
757
+
// Per lexicon, delete returns empty object {}
758
+
var deleteRespBody map[string]interface{}
759
+
json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody)
761
+
if len(deleteRespBody) != 0 {
762
+
t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody)
765
+
t.Logf("✅ Delete vote request succeeded")
767
+
// Simulate Jetstream DELETE event
768
+
t.Logf("\n🔄 Simulating Jetstream DELETE event...")
769
+
deleteEvent := jetstream.JetstreamEvent{
771
+
TimeUS: time.Now().UnixMicro(),
773
+
Commit: &jetstream.CommitEvent{
774
+
Rev: "test-vote-delete",
775
+
Operation: "delete",
776
+
Collection: "social.coves.feed.vote",
780
+
voteConsumer.HandleEvent(ctx, &deleteEvent)
782
+
// Verify vote removed from AppView
783
+
t.Logf("\n🔍 Verifying vote removed from AppView...")
784
+
_, err = voteRepo.GetByURI(ctx, voteResp.URI)
786
+
t.Error("Expected vote to be deleted, but it still exists")
789
+
// Verify post counts reset
790
+
updatedPost, _ := postRepo.GetByURI(ctx, postURI)
791
+
if updatedPost.UpvoteCount != 0 {
792
+
t.Errorf("Expected upvote_count = 0 after delete, got %d", updatedPost.UpvoteCount)
794
+
if updatedPost.Score != 0 {
795
+
t.Errorf("Expected score = 0 after delete, got %d", updatedPost.Score)
798
+
t.Logf("✅ EXPLICIT DELETE FLOW COMPLETE:")
799
+
t.Logf(" ✓ Vote created and indexed")
800
+
t.Logf(" ✓ Vote deleted via XRPC")
801
+
t.Logf(" ✓ Vote removed from AppView")
802
+
t.Logf(" ✓ Post counts updated correctly")
805
+
// TestVoteE2E_JetstreamIndexing tests real Jetstream firehose consumption
806
+
func TestVoteE2E_JetstreamIndexing(t *testing.T) {
807
+
if testing.Short() {
808
+
t.Skip("Skipping E2E test in short mode")
811
+
db := setupTestDB(t)
812
+
defer func() { _ = db.Close() }()
814
+
ctx := context.Background()
815
+
pdsURL := getTestPDSURL()
817
+
// Setup repositories
818
+
voteRepo := postgres.NewVoteRepository(db)
820
+
// Create test user on PDS
821
+
testUserHandle := fmt.Sprintf("jetstream-%d.local.coves.dev", time.Now().Unix())
822
+
testUserEmail := fmt.Sprintf("jetstream-%d@test.local", time.Now().Unix())
823
+
testUserPassword := "test-password-123"
825
+
accessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
827
+
t.Skipf("PDS not available: %v", err)
830
+
testUser := createTestUser(t, db, testUserHandle, userDID)
832
+
// Create test post
833
+
testCommunityDID, _ := createFeedTestCommunity(db, ctx, "jetstream-community", "owner.test")
834
+
postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
835
+
postCID := "bafypostjetstream"
837
+
// Write vote directly to PDS
838
+
t.Logf("\n📝 Writing vote to PDS...")
839
+
voteRecord := map[string]interface{}{
840
+
"$type": "social.coves.feed.vote",
841
+
"subject": map[string]interface{}{
846
+
"createdAt": time.Now().Format(time.RFC3339),
849
+
voteURI, voteCID, err := writePDSRecord(pdsURL, accessToken, userDID, "social.coves.feed.vote", "", voteRecord)
851
+
t.Fatalf("Failed to write vote to PDS: %v", err)
854
+
t.Logf("✅ Vote written to PDS:")
855
+
t.Logf(" URI: %s", voteURI)
856
+
t.Logf(" CID: %s", voteCID)
858
+
// Setup Jetstream consumer
859
+
voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
861
+
// Subscribe to Jetstream
862
+
t.Logf("\n🔄 Subscribing to real Jetstream firehose...")
863
+
pdsHostname := strings.TrimPrefix(pdsURL, "http://")
864
+
pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
865
+
pdsHostname = strings.Split(pdsHostname, ":")[0]
867
+
jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.feed.vote", pdsHostname)
868
+
t.Logf(" Jetstream URL: %s", jetstreamURL)
869
+
t.Logf(" Looking for vote DID: %s", userDID)
871
+
// Channels for event communication
872
+
eventChan := make(chan *jetstream.JetstreamEvent, 10)
873
+
errorChan := make(chan error, 1)
874
+
done := make(chan bool)
876
+
// Start Jetstream consumer in background
878
+
err := subscribeToJetstreamForVote(ctx, jetstreamURL, userDID, voteConsumer, eventChan, errorChan, done)
884
+
// Wait for event or timeout
885
+
t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
888
+
case event := <-eventChan:
889
+
t.Logf("✅ Received real Jetstream event!")
890
+
t.Logf(" Event DID: %s", event.Did)
891
+
t.Logf(" Collection: %s", event.Commit.Collection)
892
+
t.Logf(" Operation: %s", event.Commit.Operation)
893
+
t.Logf(" RKey: %s", event.Commit.RKey)
895
+
// Verify it's our vote
896
+
if event.Did != userDID {
897
+
t.Errorf("Expected DID %s, got %s", userDID, event.Did)
900
+
// Verify indexed in AppView database
901
+
t.Logf("\n🔍 Querying AppView database...")
902
+
indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
904
+
t.Fatalf("Vote not indexed in AppView: %v", err)
907
+
t.Logf("✅ Vote indexed in AppView:")
908
+
t.Logf(" VoterDID: %s", indexedVote.VoterDID)
909
+
t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
910
+
t.Logf(" Direction: %s", indexedVote.Direction)
911
+
t.Logf(" URI: %s", indexedVote.URI)
913
+
// Signal to stop Jetstream consumer
916
+
case err := <-errorChan:
917
+
t.Fatalf("Jetstream error: %v", err)
919
+
case <-time.After(30 * time.Second):
920
+
t.Fatalf("Timeout: No Jetstream event received within 30 seconds")
923
+
t.Logf("\n✅ TRUE E2E JETSTREAM FLOW COMPLETE:")
924
+
t.Logf(" PDS → Jetstream → Consumer → AppView ✓")
927
+
// subscribeToJetstreamForVote subscribes to real Jetstream firehose for vote events
928
+
func subscribeToJetstreamForVote(
929
+
ctx context.Context,
930
+
jetstreamURL string,
932
+
consumer *jetstream.VoteEventConsumer,
933
+
eventChan chan<- *jetstream.JetstreamEvent,
934
+
errorChan chan<- error,
937
+
conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
939
+
return fmt.Errorf("failed to connect to Jetstream: %w", err)
941
+
defer func() { _ = conn.Close() }()
943
+
// Read messages until we find our event or receive done signal
951
+
// Set read deadline to avoid blocking forever
952
+
if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
953
+
return fmt.Errorf("failed to set read deadline: %w", err)
956
+
var event jetstream.JetstreamEvent
957
+
err := conn.ReadJSON(&event)
959
+
// Check if it's a timeout (expected)
960
+
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
963
+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
964
+
continue // Timeout is expected, keep listening
966
+
return fmt.Errorf("failed to read Jetstream message: %w", err)
969
+
// Check if this is the event we're looking for
970
+
if event.Did == targetDID && event.Kind == "commit" && event.Commit.Collection == "social.coves.feed.vote" {
971
+
// Process the event through the consumer
972
+
if err := consumer.HandleEvent(ctx, &event); err != nil {
973
+
return fmt.Errorf("failed to process event: %w", err)
976
+
// Send to channel so test can verify
978
+
case eventChan <- &event:
980
+
case <-time.After(1 * time.Second):
981
+
return fmt.Errorf("timeout sending event to channel")