A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/routes"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/atproto/utils"
7 "Coves/internal/core/votes"
8 "Coves/internal/db/postgres"
9 "bytes"
10 "context"
11 "database/sql"
12 "encoding/json"
13 "fmt"
14 "io"
15 "net"
16 "net/http"
17 "net/http/httptest"
18 "os"
19 "strings"
20 "testing"
21 "time"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25 _ "github.com/lib/pq"
26 "github.com/pressly/goose/v3"
27)
28
29// TestVoteE2E_CreateUpvote tests the full vote creation flow with a real local PDS
30// Flow: Client → XRPC → PDS Write → Jetstream → Consumer → AppView
31func TestVoteE2E_CreateUpvote(t *testing.T) {
32 // Skip in short mode since this requires real PDS
33 if testing.Short() {
34 t.Skip("Skipping E2E test in short mode")
35 }
36
37 // Setup test database
38 dbURL := os.Getenv("TEST_DATABASE_URL")
39 if dbURL == "" {
40 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
41 }
42
43 db, err := sql.Open("postgres", dbURL)
44 if err != nil {
45 t.Fatalf("Failed to connect to test database: %v", err)
46 }
47 defer func() {
48 if closeErr := db.Close(); closeErr != nil {
49 t.Logf("Failed to close database: %v", closeErr)
50 }
51 }()
52
53 // Run migrations
54 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil {
55 t.Fatalf("Failed to set goose dialect: %v", dialectErr)
56 }
57 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil {
58 t.Fatalf("Failed to run migrations: %v", migrateErr)
59 }
60
61 // Check if PDS is running
62 pdsURL := os.Getenv("PDS_URL")
63 if pdsURL == "" {
64 pdsURL = "http://localhost:3001"
65 }
66
67 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
68 if err != nil {
69 t.Skipf("PDS not running at %s: %v", pdsURL, err)
70 }
71 func() {
72 if closeErr := healthResp.Body.Close(); closeErr != nil {
73 t.Logf("Failed to close health response: %v", closeErr)
74 }
75 }()
76
77 ctx := context.Background()
78
79 // Setup repositories
80 voteRepo := postgres.NewVoteRepository(db)
81 postRepo := postgres.NewPostRepository(db)
82
83 // Setup OAuth client and store for vote service
84 oauthStore := SetupOAuthTestStore(t, db)
85 oauthClient := SetupOAuthTestClient(t, oauthStore)
86
87 // Setup services
88 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
89
90 // Create test user on PDS
91 testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix())
92 testUserEmail := fmt.Sprintf("voter-%d@test.local", time.Now().Unix())
93 testUserPassword := "test-password-123"
94
95 t.Logf("Creating test user on PDS: %s", testUserHandle)
96 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
97 if err != nil {
98 t.Fatalf("Failed to create test user on PDS: %v", err)
99 }
100 t.Logf("Test user created: DID=%s", userDID)
101
102 // Index user in AppView
103 testUser := createTestUser(t, db, testUserHandle, userDID)
104
105 // Create test post to vote on
106 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test")
107 if err != nil {
108 t.Fatalf("Failed to create test community: %v", err)
109 }
110
111 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
112 postCID := "bafypost123"
113
114 // Setup OAuth middleware with real PDS access token
115 e2eAuth := NewE2EOAuthMiddleware()
116 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
117
118 // Setup HTTP server with XRPC routes
119 r := chi.NewRouter()
120 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
121 httpServer := httptest.NewServer(r)
122 defer httpServer.Close()
123
124 // Setup Jetstream consumer
125 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
126
127 // ====================================================================================
128 // TEST: Create upvote on post
129 // ====================================================================================
130 t.Logf("\n📝 Creating upvote via XRPC endpoint...")
131
132 voteReq := map[string]interface{}{
133 "subject": map[string]interface{}{
134 "uri": postURI,
135 "cid": postCID,
136 },
137 "direction": "up",
138 }
139
140 reqBody, marshalErr := json.Marshal(voteReq)
141 if marshalErr != nil {
142 t.Fatalf("Failed to marshal request: %v", marshalErr)
143 }
144
145 req, err := http.NewRequest(http.MethodPost,
146 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
147 bytes.NewBuffer(reqBody))
148 if err != nil {
149 t.Fatalf("Failed to create request: %v", err)
150 }
151 req.Header.Set("Content-Type", "application/json")
152 req.Header.Set("Authorization", "Bearer "+token)
153
154 resp, err := http.DefaultClient.Do(req)
155 if err != nil {
156 t.Fatalf("Failed to POST vote: %v", err)
157 }
158 defer func() { _ = resp.Body.Close() }()
159
160 if resp.StatusCode != http.StatusOK {
161 body, readErr := io.ReadAll(resp.Body)
162 if readErr != nil {
163 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr)
164 }
165 t.Logf("XRPC Vote Failed")
166 t.Logf(" Status: %d", resp.StatusCode)
167 t.Logf(" Response: %s", string(body))
168 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
169 }
170
171 var voteResp struct {
172 URI string `json:"uri"`
173 CID string `json:"cid"`
174 }
175
176 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
177 t.Fatalf("Failed to decode vote response: %v", decodeErr)
178 }
179
180 t.Logf("✅ XRPC response received:")
181 t.Logf(" URI: %s", voteResp.URI)
182 t.Logf(" CID: %s", voteResp.CID)
183
184 // Verify vote record was written to PDS
185 t.Logf("\n🔍 Verifying vote record on PDS...")
186 rkey := utils.ExtractRKeyFromURI(voteResp.URI)
187 collection := "social.coves.feed.vote"
188
189 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
190 pdsURL, userDID, collection, rkey))
191 if pdsErr != nil {
192 t.Fatalf("Failed to fetch vote record from PDS: %v", pdsErr)
193 }
194 defer func() {
195 if closeErr := pdsResp.Body.Close(); closeErr != nil {
196 t.Logf("Failed to close PDS response: %v", closeErr)
197 }
198 }()
199
200 if pdsResp.StatusCode != http.StatusOK {
201 body, _ := io.ReadAll(pdsResp.Body)
202 t.Fatalf("Vote record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body))
203 }
204
205 var pdsRecord struct {
206 Value map[string]interface{} `json:"value"`
207 CID string `json:"cid"`
208 }
209 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil {
210 t.Fatalf("Failed to decode PDS record: %v", decodeErr)
211 }
212
213 t.Logf("✅ Vote record found on PDS:")
214 t.Logf(" CID: %s", pdsRecord.CID)
215 t.Logf(" Direction: %v", pdsRecord.Value["direction"])
216
217 // Verify direction
218 if pdsRecord.Value["direction"] != "up" {
219 t.Errorf("Expected direction 'up', got %v", pdsRecord.Value["direction"])
220 }
221
222 // Simulate Jetstream consumer indexing the vote
223 t.Logf("\n🔄 Simulating Jetstream consumer indexing vote...")
224 voteEvent := jetstream.JetstreamEvent{
225 Did: userDID,
226 TimeUS: time.Now().UnixMicro(),
227 Kind: "commit",
228 Commit: &jetstream.CommitEvent{
229 Rev: "test-vote-rev",
230 Operation: "create",
231 Collection: "social.coves.feed.vote",
232 RKey: rkey,
233 CID: pdsRecord.CID,
234 Record: map[string]interface{}{
235 "$type": "social.coves.feed.vote",
236 "subject": map[string]interface{}{
237 "uri": postURI,
238 "cid": postCID,
239 },
240 "direction": "up",
241 "createdAt": time.Now().Format(time.RFC3339),
242 },
243 },
244 }
245
246 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
247 t.Fatalf("Failed to handle vote event: %v", handleErr)
248 }
249
250 // Verify vote was indexed in AppView
251 t.Logf("\n🔍 Verifying vote indexed in AppView...")
252 indexedVote, err := voteRepo.GetByURI(ctx, voteResp.URI)
253 if err != nil {
254 t.Fatalf("Vote not indexed in AppView: %v", err)
255 }
256
257 t.Logf("✅ Vote indexed in AppView:")
258 t.Logf(" VoterDID: %s", indexedVote.VoterDID)
259 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
260 t.Logf(" Direction: %s", indexedVote.Direction)
261 t.Logf(" URI: %s", indexedVote.URI)
262
263 // Verify vote details
264 if indexedVote.VoterDID != userDID {
265 t.Errorf("Expected voter_did %s, got %s", userDID, indexedVote.VoterDID)
266 }
267 if indexedVote.SubjectURI != postURI {
268 t.Errorf("Expected subject_uri %s, got %s", postURI, indexedVote.SubjectURI)
269 }
270 if indexedVote.Direction != "up" {
271 t.Errorf("Expected direction 'up', got %s", indexedVote.Direction)
272 }
273
274 // Verify post counts updated
275 t.Logf("\n🔍 Verifying post vote counts updated...")
276 updatedPost, err := postRepo.GetByURI(ctx, postURI)
277 if err != nil {
278 t.Fatalf("Failed to get updated post: %v", err)
279 }
280
281 if updatedPost.UpvoteCount != 1 {
282 t.Errorf("Expected upvote_count = 1, got %d", updatedPost.UpvoteCount)
283 }
284 if updatedPost.Score != 1 {
285 t.Errorf("Expected score = 1, got %d", updatedPost.Score)
286 }
287
288 t.Logf("✅ TRUE E2E UPVOTE FLOW COMPLETE:")
289 t.Logf(" Client → XRPC → PDS Write → Jetstream → Consumer → AppView ✓")
290 t.Logf(" ✓ Vote written to PDS")
291 t.Logf(" ✓ Vote indexed in AppView")
292 t.Logf(" ✓ Post vote counts updated")
293}
294
295// TestVoteE2E_ToggleSameDirection tests voting twice in same direction (toggle off)
296func TestVoteE2E_ToggleSameDirection(t *testing.T) {
297 if testing.Short() {
298 t.Skip("Skipping E2E test in short mode")
299 }
300
301 db := setupTestDB(t)
302 defer func() { _ = db.Close() }()
303
304 ctx := context.Background()
305 pdsURL := getTestPDSURL()
306
307 // Setup repositories and services
308 voteRepo := postgres.NewVoteRepository(db)
309 postRepo := postgres.NewPostRepository(db)
310
311 oauthStore := SetupOAuthTestStore(t, db)
312 oauthClient := SetupOAuthTestClient(t, oauthStore)
313 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
314
315 // Create test user
316 testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix())
317 testUserEmail := fmt.Sprintf("toggle-%d@test.local", time.Now().Unix())
318 testUserPassword := "test-password-123"
319
320 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
321 if err != nil {
322 t.Skipf("PDS not available: %v", err)
323 }
324
325 testUser := createTestUser(t, db, testUserHandle, userDID)
326
327 // Create test post
328 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "toggle-community", "owner.test")
329 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
330 postCID := "bafypost456"
331
332 // Setup OAuth and HTTP server with real PDS access token
333 e2eAuth := NewE2EOAuthMiddleware()
334 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
335
336 r := chi.NewRouter()
337 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
338 httpServer := httptest.NewServer(r)
339 defer httpServer.Close()
340
341 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
342
343 // First upvote
344 t.Logf("\n📝 Creating first upvote...")
345 voteReq := map[string]interface{}{
346 "subject": map[string]interface{}{
347 "uri": postURI,
348 "cid": postCID,
349 },
350 "direction": "up",
351 }
352
353 reqBody, _ := json.Marshal(voteReq)
354 req, _ := http.NewRequest(http.MethodPost,
355 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
356 bytes.NewBuffer(reqBody))
357 req.Header.Set("Content-Type", "application/json")
358 req.Header.Set("Authorization", "Bearer "+token)
359
360 resp, err := http.DefaultClient.Do(req)
361 if err != nil {
362 t.Fatalf("Failed to create first vote: %v", err)
363 }
364
365 var firstVoteResp struct {
366 URI string `json:"uri"`
367 CID string `json:"cid"`
368 }
369 if decodeErr := json.NewDecoder(resp.Body).Decode(&firstVoteResp); decodeErr != nil {
370 t.Fatalf("Failed to decode first vote response: %v", decodeErr)
371 }
372 if closeErr := resp.Body.Close(); closeErr != nil {
373 t.Logf("Failed to close response body: %v", closeErr)
374 }
375
376 t.Logf("✅ First vote created: %s", firstVoteResp.URI)
377
378 // Index first vote
379 rkey := utils.ExtractRKeyFromURI(firstVoteResp.URI)
380 voteEvent := jetstream.JetstreamEvent{
381 Did: userDID,
382 TimeUS: time.Now().UnixMicro(),
383 Kind: "commit",
384 Commit: &jetstream.CommitEvent{
385 Rev: "test-vote-rev-1",
386 Operation: "create",
387 Collection: "social.coves.feed.vote",
388 RKey: rkey,
389 CID: firstVoteResp.CID,
390 Record: map[string]interface{}{
391 "$type": "social.coves.feed.vote",
392 "subject": map[string]interface{}{
393 "uri": postURI,
394 "cid": postCID,
395 },
396 "direction": "up",
397 "createdAt": time.Now().Format(time.RFC3339),
398 },
399 },
400 }
401 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
402 t.Fatalf("Failed to handle first vote event: %v", handleErr)
403 }
404
405 // Second upvote (same direction) - should toggle off (delete)
406 t.Logf("\n📝 Creating second upvote (toggle off)...")
407 req2, _ := http.NewRequest(http.MethodPost,
408 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
409 bytes.NewBuffer(reqBody))
410 req2.Header.Set("Content-Type", "application/json")
411 req2.Header.Set("Authorization", "Bearer "+token)
412
413 resp2, err := http.DefaultClient.Do(req2)
414 if err != nil {
415 t.Fatalf("Failed to toggle vote: %v", err)
416 }
417 defer func() {
418 if closeErr := resp2.Body.Close(); closeErr != nil {
419 t.Logf("Failed to close response body: %v", closeErr)
420 }
421 }()
422
423 if resp2.StatusCode != http.StatusOK {
424 body, _ := io.ReadAll(resp2.Body)
425 t.Fatalf("Expected 200, got %d: %s", resp2.StatusCode, string(body))
426 }
427
428 t.Logf("✅ Second vote request completed (toggle)")
429
430 // Simulate Jetstream DELETE event
431 t.Logf("\n🔄 Simulating Jetstream DELETE event...")
432 deleteEvent := jetstream.JetstreamEvent{
433 Did: userDID,
434 TimeUS: time.Now().UnixMicro(),
435 Kind: "commit",
436 Commit: &jetstream.CommitEvent{
437 Rev: "test-vote-rev-2",
438 Operation: "delete",
439 Collection: "social.coves.feed.vote",
440 RKey: rkey,
441 },
442 }
443 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
444 t.Fatalf("Failed to handle delete event: %v", handleErr)
445 }
446
447 // Verify vote was removed from AppView
448 t.Logf("\n🔍 Verifying vote removed from AppView...")
449 _, err = voteRepo.GetByURI(ctx, firstVoteResp.URI)
450 if err == nil {
451 t.Error("Expected vote to be deleted, but it still exists")
452 }
453
454 // Verify post counts reset
455 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
456 if updatedPost.UpvoteCount != 0 {
457 t.Errorf("Expected upvote_count = 0 after toggle, got %d", updatedPost.UpvoteCount)
458 }
459
460 t.Logf("✅ TOGGLE SAME DIRECTION FLOW COMPLETE:")
461 t.Logf(" ✓ First vote created and indexed")
462 t.Logf(" ✓ Second vote toggled off (deleted)")
463 t.Logf(" ✓ Post counts updated correctly")
464}
465
466// TestVoteE2E_ToggleDifferentDirection tests changing vote direction
467func TestVoteE2E_ToggleDifferentDirection(t *testing.T) {
468 if testing.Short() {
469 t.Skip("Skipping E2E test in short mode")
470 }
471
472 db := setupTestDB(t)
473 defer func() { _ = db.Close() }()
474
475 ctx := context.Background()
476 pdsURL := getTestPDSURL()
477
478 // Setup repositories and services
479 voteRepo := postgres.NewVoteRepository(db)
480 postRepo := postgres.NewPostRepository(db)
481
482 oauthStore := SetupOAuthTestStore(t, db)
483 oauthClient := SetupOAuthTestClient(t, oauthStore)
484 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
485
486 // Create test user
487 testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix())
488 testUserEmail := fmt.Sprintf("flip-%d@test.local", time.Now().Unix())
489 testUserPassword := "test-password-123"
490
491 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
492 if err != nil {
493 t.Skipf("PDS not available: %v", err)
494 }
495
496 testUser := createTestUser(t, db, testUserHandle, userDID)
497
498 // Create test post
499 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "flip-community", "owner.test")
500 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
501 postCID := "bafypost789"
502
503 // Setup OAuth and HTTP server with real PDS access token
504 e2eAuth := NewE2EOAuthMiddleware()
505 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
506
507 r := chi.NewRouter()
508 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
509 httpServer := httptest.NewServer(r)
510 defer httpServer.Close()
511
512 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
513
514 // Create upvote
515 t.Logf("\n📝 Creating upvote...")
516 upvoteReq := map[string]interface{}{
517 "subject": map[string]interface{}{
518 "uri": postURI,
519 "cid": postCID,
520 },
521 "direction": "up",
522 }
523
524 reqBody, _ := json.Marshal(upvoteReq)
525 req, _ := http.NewRequest(http.MethodPost,
526 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
527 bytes.NewBuffer(reqBody))
528 req.Header.Set("Content-Type", "application/json")
529 req.Header.Set("Authorization", "Bearer "+token)
530
531 resp, err := http.DefaultClient.Do(req)
532 if err != nil {
533 t.Fatalf("Failed to create upvote: %v", err)
534 }
535 var upvoteResp struct {
536 URI string `json:"uri"`
537 CID string `json:"cid"`
538 }
539 if decodeErr := json.NewDecoder(resp.Body).Decode(&upvoteResp); decodeErr != nil {
540 t.Fatalf("Failed to decode upvote response: %v", decodeErr)
541 }
542 if closeErr := resp.Body.Close(); closeErr != nil {
543 t.Logf("Failed to close response body: %v", closeErr)
544 }
545
546 // Index upvote
547 rkey := utils.ExtractRKeyFromURI(upvoteResp.URI)
548 upvoteEvent := jetstream.JetstreamEvent{
549 Did: userDID,
550 TimeUS: time.Now().UnixMicro(),
551 Kind: "commit",
552 Commit: &jetstream.CommitEvent{
553 Rev: "test-vote-rev-up",
554 Operation: "create",
555 Collection: "social.coves.feed.vote",
556 RKey: rkey,
557 CID: upvoteResp.CID,
558 Record: map[string]interface{}{
559 "$type": "social.coves.feed.vote",
560 "subject": map[string]interface{}{
561 "uri": postURI,
562 "cid": postCID,
563 },
564 "direction": "up",
565 "createdAt": time.Now().Format(time.RFC3339),
566 },
567 },
568 }
569 if handleErr := voteConsumer.HandleEvent(ctx, &upvoteEvent); handleErr != nil {
570 t.Fatalf("Failed to handle upvote event: %v", handleErr)
571 }
572
573 t.Logf("✅ Upvote created and indexed")
574
575 // Change to downvote
576 t.Logf("\n📝 Changing to downvote...")
577 downvoteReq := map[string]interface{}{
578 "subject": map[string]interface{}{
579 "uri": postURI,
580 "cid": postCID,
581 },
582 "direction": "down",
583 }
584
585 reqBody2, _ := json.Marshal(downvoteReq)
586 req2, _ := http.NewRequest(http.MethodPost,
587 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
588 bytes.NewBuffer(reqBody2))
589 req2.Header.Set("Content-Type", "application/json")
590 req2.Header.Set("Authorization", "Bearer "+token)
591
592 resp2, err := http.DefaultClient.Do(req2)
593 if err != nil {
594 t.Fatalf("Failed to create downvote: %v", err)
595 }
596 var downvoteResp struct {
597 URI string `json:"uri"`
598 CID string `json:"cid"`
599 }
600 if decodeErr := json.NewDecoder(resp2.Body).Decode(&downvoteResp); decodeErr != nil {
601 t.Fatalf("Failed to decode downvote response: %v", decodeErr)
602 }
603 if closeErr := resp2.Body.Close(); closeErr != nil {
604 t.Logf("Failed to close response body: %v", closeErr)
605 }
606
607 // The service flow for direction change is:
608 // 1. DELETE old vote on PDS
609 // 2. CREATE new vote with NEW rkey on PDS
610 // So we simulate DELETE + CREATE events (not UPDATE)
611
612 // Simulate Jetstream DELETE event for old vote
613 t.Logf("\n🔄 Simulating Jetstream DELETE event for old upvote...")
614 deleteEvent := jetstream.JetstreamEvent{
615 Did: userDID,
616 TimeUS: time.Now().UnixMicro(),
617 Kind: "commit",
618 Commit: &jetstream.CommitEvent{
619 Rev: "test-vote-rev-delete",
620 Operation: "delete",
621 Collection: "social.coves.feed.vote",
622 RKey: rkey, // Old upvote rkey
623 },
624 }
625 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
626 t.Fatalf("Failed to handle delete event: %v", handleErr)
627 }
628
629 // Simulate Jetstream CREATE event for new downvote
630 t.Logf("\n🔄 Simulating Jetstream CREATE event for new downvote...")
631 newRkey := utils.ExtractRKeyFromURI(downvoteResp.URI)
632 createEvent := jetstream.JetstreamEvent{
633 Did: userDID,
634 TimeUS: time.Now().UnixMicro(),
635 Kind: "commit",
636 Commit: &jetstream.CommitEvent{
637 Rev: "test-vote-rev-down",
638 Operation: "create",
639 Collection: "social.coves.feed.vote",
640 RKey: newRkey, // NEW rkey from downvote response
641 CID: downvoteResp.CID,
642 Record: map[string]interface{}{
643 "$type": "social.coves.feed.vote",
644 "subject": map[string]interface{}{
645 "uri": postURI,
646 "cid": postCID,
647 },
648 "direction": "down",
649 "createdAt": time.Now().Format(time.RFC3339),
650 },
651 },
652 }
653 if handleErr := voteConsumer.HandleEvent(ctx, &createEvent); handleErr != nil {
654 t.Fatalf("Failed to handle create event: %v", handleErr)
655 }
656
657 // Verify old upvote was deleted
658 t.Logf("\n🔍 Verifying old upvote was deleted...")
659 _, err = voteRepo.GetByURI(ctx, upvoteResp.URI)
660 if err == nil {
661 t.Error("Expected old upvote to be deleted, but it still exists")
662 }
663
664 // Verify new downvote was indexed
665 t.Logf("\n🔍 Verifying new downvote indexed in AppView...")
666 newVote, err := voteRepo.GetByURI(ctx, downvoteResp.URI)
667 if err != nil {
668 t.Fatalf("New downvote not found: %v", err)
669 }
670
671 if newVote.Direction != "down" {
672 t.Errorf("Expected direction 'down', got %s", newVote.Direction)
673 }
674
675 // Verify post counts updated
676 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
677 if updatedPost.UpvoteCount != 0 {
678 t.Errorf("Expected upvote_count = 0, got %d", updatedPost.UpvoteCount)
679 }
680 if updatedPost.DownvoteCount != 1 {
681 t.Errorf("Expected downvote_count = 1, got %d", updatedPost.DownvoteCount)
682 }
683 if updatedPost.Score != -1 {
684 t.Errorf("Expected score = -1, got %d", updatedPost.Score)
685 }
686
687 t.Logf("✅ TOGGLE DIFFERENT DIRECTION FLOW COMPLETE:")
688 t.Logf(" ✓ Upvote created (score: +1)")
689 t.Logf(" ✓ Changed to downvote (score: -1)")
690 t.Logf(" ✓ Post counts updated correctly")
691}
692
693// TestVoteE2E_DeleteVote tests explicit vote deletion
694func TestVoteE2E_DeleteVote(t *testing.T) {
695 if testing.Short() {
696 t.Skip("Skipping E2E test in short mode")
697 }
698
699 db := setupTestDB(t)
700 defer func() { _ = db.Close() }()
701
702 ctx := context.Background()
703 pdsURL := getTestPDSURL()
704
705 // Setup repositories and services
706 voteRepo := postgres.NewVoteRepository(db)
707 postRepo := postgres.NewPostRepository(db)
708
709 oauthStore := SetupOAuthTestStore(t, db)
710 oauthClient := SetupOAuthTestClient(t, oauthStore)
711 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
712
713 // Create test user
714 testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix())
715 testUserEmail := fmt.Sprintf("delete-%d@test.local", time.Now().Unix())
716 testUserPassword := "test-password-123"
717
718 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
719 if err != nil {
720 t.Skipf("PDS not available: %v", err)
721 }
722
723 testUser := createTestUser(t, db, testUserHandle, userDID)
724
725 // Create test post
726 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "delete-community", "owner.test")
727 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
728 postCID := "bafypost999"
729
730 // Setup OAuth and HTTP server with real PDS access token
731 e2eAuth := NewE2EOAuthMiddleware()
732 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
733
734 r := chi.NewRouter()
735 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
736 httpServer := httptest.NewServer(r)
737 defer httpServer.Close()
738
739 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
740
741 // Create vote first
742 t.Logf("\n📝 Creating vote to delete...")
743 voteReq := map[string]interface{}{
744 "subject": map[string]interface{}{
745 "uri": postURI,
746 "cid": postCID,
747 },
748 "direction": "up",
749 }
750
751 reqBody, _ := json.Marshal(voteReq)
752 req, _ := http.NewRequest(http.MethodPost,
753 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
754 bytes.NewBuffer(reqBody))
755 req.Header.Set("Content-Type", "application/json")
756 req.Header.Set("Authorization", "Bearer "+token)
757
758 resp, err := http.DefaultClient.Do(req)
759 if err != nil {
760 t.Fatalf("Failed to create vote: %v", err)
761 }
762 var voteResp struct {
763 URI string `json:"uri"`
764 CID string `json:"cid"`
765 }
766 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
767 t.Fatalf("Failed to decode vote response: %v", decodeErr)
768 }
769 if closeErr := resp.Body.Close(); closeErr != nil {
770 t.Logf("Failed to close response body: %v", closeErr)
771 }
772
773 // Index vote
774 rkey := utils.ExtractRKeyFromURI(voteResp.URI)
775 voteEvent := jetstream.JetstreamEvent{
776 Did: userDID,
777 TimeUS: time.Now().UnixMicro(),
778 Kind: "commit",
779 Commit: &jetstream.CommitEvent{
780 Rev: "test-vote-create",
781 Operation: "create",
782 Collection: "social.coves.feed.vote",
783 RKey: rkey,
784 CID: voteResp.CID,
785 Record: map[string]interface{}{
786 "$type": "social.coves.feed.vote",
787 "subject": map[string]interface{}{
788 "uri": postURI,
789 "cid": postCID,
790 },
791 "direction": "up",
792 "createdAt": time.Now().Format(time.RFC3339),
793 },
794 },
795 }
796 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
797 t.Fatalf("Failed to handle vote event: %v", handleErr)
798 }
799
800 t.Logf("✅ Vote created and indexed")
801
802 // Delete vote via XRPC
803 t.Logf("\n📝 Deleting vote via XRPC...")
804 deleteReq := map[string]interface{}{
805 "subject": map[string]interface{}{
806 "uri": postURI,
807 "cid": postCID,
808 },
809 }
810
811 deleteBody, _ := json.Marshal(deleteReq)
812 deleteHttpReq, _ := http.NewRequest(http.MethodPost,
813 httpServer.URL+"/xrpc/social.coves.feed.vote.delete",
814 bytes.NewBuffer(deleteBody))
815 deleteHttpReq.Header.Set("Content-Type", "application/json")
816 deleteHttpReq.Header.Set("Authorization", "Bearer "+token)
817
818 deleteResp, err := http.DefaultClient.Do(deleteHttpReq)
819 if err != nil {
820 t.Fatalf("Failed to delete vote: %v", err)
821 }
822 defer func() {
823 if closeErr := deleteResp.Body.Close(); closeErr != nil {
824 t.Logf("Failed to close response body: %v", closeErr)
825 }
826 }()
827
828 if deleteResp.StatusCode != http.StatusOK {
829 body, _ := io.ReadAll(deleteResp.Body)
830 t.Fatalf("Delete failed: status %d, body: %s", deleteResp.StatusCode, string(body))
831 }
832
833 // Per lexicon, delete returns empty object {}
834 var deleteRespBody map[string]interface{}
835 if decodeErr := json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody); decodeErr != nil {
836 t.Fatalf("Failed to decode delete response: %v", decodeErr)
837 }
838
839 if len(deleteRespBody) != 0 {
840 t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody)
841 }
842
843 t.Logf("✅ Delete vote request succeeded")
844
845 // Simulate Jetstream DELETE event
846 t.Logf("\n🔄 Simulating Jetstream DELETE event...")
847 deleteEvent := jetstream.JetstreamEvent{
848 Did: userDID,
849 TimeUS: time.Now().UnixMicro(),
850 Kind: "commit",
851 Commit: &jetstream.CommitEvent{
852 Rev: "test-vote-delete",
853 Operation: "delete",
854 Collection: "social.coves.feed.vote",
855 RKey: rkey,
856 },
857 }
858 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
859 t.Fatalf("Failed to handle delete event: %v", handleErr)
860 }
861
862 // Verify vote removed from AppView
863 t.Logf("\n🔍 Verifying vote removed from AppView...")
864 _, err = voteRepo.GetByURI(ctx, voteResp.URI)
865 if err == nil {
866 t.Error("Expected vote to be deleted, but it still exists")
867 }
868
869 // Verify post counts reset
870 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
871 if updatedPost.UpvoteCount != 0 {
872 t.Errorf("Expected upvote_count = 0 after delete, got %d", updatedPost.UpvoteCount)
873 }
874 if updatedPost.Score != 0 {
875 t.Errorf("Expected score = 0 after delete, got %d", updatedPost.Score)
876 }
877
878 t.Logf("✅ EXPLICIT DELETE FLOW COMPLETE:")
879 t.Logf(" ✓ Vote created and indexed")
880 t.Logf(" ✓ Vote deleted via XRPC")
881 t.Logf(" ✓ Vote removed from AppView")
882 t.Logf(" ✓ Post counts updated correctly")
883}
884
885// TestVoteE2E_JetstreamIndexing tests real Jetstream firehose consumption
886func TestVoteE2E_JetstreamIndexing(t *testing.T) {
887 if testing.Short() {
888 t.Skip("Skipping E2E test in short mode")
889 }
890
891 db := setupTestDB(t)
892 defer func() { _ = db.Close() }()
893
894 ctx := context.Background()
895 pdsURL := getTestPDSURL()
896
897 // Setup repositories
898 voteRepo := postgres.NewVoteRepository(db)
899
900 // Create test user on PDS
901 testUserHandle := fmt.Sprintf("jetstream-%d.local.coves.dev", time.Now().Unix())
902 testUserEmail := fmt.Sprintf("jetstream-%d@test.local", time.Now().Unix())
903 testUserPassword := "test-password-123"
904
905 accessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
906 if err != nil {
907 t.Skipf("PDS not available: %v", err)
908 }
909
910 testUser := createTestUser(t, db, testUserHandle, userDID)
911
912 // Create test post
913 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "jetstream-community", "owner.test")
914 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
915 postCID := "bafypostjetstream"
916
917 // Write vote directly to PDS
918 t.Logf("\n📝 Writing vote to PDS...")
919 voteRecord := map[string]interface{}{
920 "$type": "social.coves.feed.vote",
921 "subject": map[string]interface{}{
922 "uri": postURI,
923 "cid": postCID,
924 },
925 "direction": "up",
926 "createdAt": time.Now().Format(time.RFC3339),
927 }
928
929 voteURI, voteCID, err := writePDSRecord(pdsURL, accessToken, userDID, "social.coves.feed.vote", "", voteRecord)
930 if err != nil {
931 t.Fatalf("Failed to write vote to PDS: %v", err)
932 }
933
934 t.Logf("✅ Vote written to PDS:")
935 t.Logf(" URI: %s", voteURI)
936 t.Logf(" CID: %s", voteCID)
937
938 // Setup Jetstream consumer
939 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
940
941 // Subscribe to Jetstream
942 t.Logf("\n🔄 Subscribing to real Jetstream firehose...")
943 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
944 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
945 pdsHostname = strings.Split(pdsHostname, ":")[0]
946
947 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.feed.vote", pdsHostname)
948 t.Logf(" Jetstream URL: %s", jetstreamURL)
949 t.Logf(" Looking for vote DID: %s", userDID)
950
951 // Channels for event communication
952 eventChan := make(chan *jetstream.JetstreamEvent, 10)
953 errorChan := make(chan error, 1)
954 done := make(chan bool)
955
956 // Start Jetstream consumer in background
957 go func() {
958 err := subscribeToJetstreamForVote(ctx, jetstreamURL, userDID, voteConsumer, eventChan, errorChan, done)
959 if err != nil {
960 errorChan <- err
961 }
962 }()
963
964 // Wait for event or timeout
965 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
966
967 select {
968 case event := <-eventChan:
969 t.Logf("✅ Received real Jetstream event!")
970 t.Logf(" Event DID: %s", event.Did)
971 t.Logf(" Collection: %s", event.Commit.Collection)
972 t.Logf(" Operation: %s", event.Commit.Operation)
973 t.Logf(" RKey: %s", event.Commit.RKey)
974
975 // Verify it's our vote
976 if event.Did != userDID {
977 t.Errorf("Expected DID %s, got %s", userDID, event.Did)
978 }
979
980 // Verify indexed in AppView database
981 t.Logf("\n🔍 Querying AppView database...")
982 indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
983 if err != nil {
984 t.Fatalf("Vote not indexed in AppView: %v", err)
985 }
986
987 t.Logf("✅ Vote indexed in AppView:")
988 t.Logf(" VoterDID: %s", indexedVote.VoterDID)
989 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
990 t.Logf(" Direction: %s", indexedVote.Direction)
991 t.Logf(" URI: %s", indexedVote.URI)
992
993 // Signal to stop Jetstream consumer
994 close(done)
995
996 case err := <-errorChan:
997 t.Fatalf("Jetstream error: %v", err)
998
999 case <-time.After(30 * time.Second):
1000 t.Fatalf("Timeout: No Jetstream event received within 30 seconds")
1001 }
1002
1003 t.Logf("\n✅ TRUE E2E JETSTREAM FLOW COMPLETE:")
1004 t.Logf(" PDS → Jetstream → Consumer → AppView ✓")
1005}
1006
1007// subscribeToJetstreamForVote subscribes to real Jetstream firehose for vote events
1008func subscribeToJetstreamForVote(
1009 ctx context.Context,
1010 jetstreamURL string,
1011 targetDID string,
1012 consumer *jetstream.VoteEventConsumer,
1013 eventChan chan<- *jetstream.JetstreamEvent,
1014 errorChan chan<- error,
1015 done <-chan bool,
1016) error {
1017 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
1018 if err != nil {
1019 return fmt.Errorf("failed to connect to Jetstream: %w", err)
1020 }
1021 defer func() { _ = conn.Close() }()
1022
1023 // Read messages until we find our event or receive done signal
1024 for {
1025 select {
1026 case <-done:
1027 return nil
1028 case <-ctx.Done():
1029 return ctx.Err()
1030 default:
1031 // Set read deadline to avoid blocking forever
1032 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
1033 return fmt.Errorf("failed to set read deadline: %w", err)
1034 }
1035
1036 var event jetstream.JetstreamEvent
1037 err := conn.ReadJSON(&event)
1038 if err != nil {
1039 // Check if it's a timeout (expected)
1040 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
1041 return nil
1042 }
1043 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
1044 continue // Timeout is expected, keep listening
1045 }
1046 return fmt.Errorf("failed to read Jetstream message: %w", err)
1047 }
1048
1049 // Check if this is the event we're looking for
1050 if event.Did == targetDID && event.Kind == "commit" && event.Commit.Collection == "social.coves.feed.vote" {
1051 // Process the event through the consumer
1052 if err := consumer.HandleEvent(ctx, &event); err != nil {
1053 return fmt.Errorf("failed to process event: %w", err)
1054 }
1055
1056 // Send to channel so test can verify
1057 select {
1058 case eventChan <- &event:
1059 return nil
1060 case <-time.After(1 * time.Second):
1061 return fmt.Errorf("timeout sending event to channel")
1062 }
1063 }
1064 }
1065 }
1066}