A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/routes"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/atproto/utils"
7 "Coves/internal/core/votes"
8 "Coves/internal/db/postgres"
9 "bytes"
10 "context"
11 "database/sql"
12 "encoding/json"
13 "fmt"
14 "io"
15 "net"
16 "net/http"
17 "net/http/httptest"
18 "os"
19 "strings"
20 "testing"
21 "time"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25 _ "github.com/lib/pq"
26 "github.com/pressly/goose/v3"
27)
28
29// TestVoteE2E_CreateUpvote tests the full vote creation flow with a real local PDS
30// Flow: Client → XRPC → PDS Write → Jetstream → Consumer → AppView
31func TestVoteE2E_CreateUpvote(t *testing.T) {
32 // Skip in short mode since this requires real PDS
33 if testing.Short() {
34 t.Skip("Skipping E2E test in short mode")
35 }
36
37 // Setup test database
38 dbURL := os.Getenv("TEST_DATABASE_URL")
39 if dbURL == "" {
40 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
41 }
42
43 db, err := sql.Open("postgres", dbURL)
44 if err != nil {
45 t.Fatalf("Failed to connect to test database: %v", err)
46 }
47 defer func() {
48 if closeErr := db.Close(); closeErr != nil {
49 t.Logf("Failed to close database: %v", closeErr)
50 }
51 }()
52
53 // Run migrations
54 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil {
55 t.Fatalf("Failed to set goose dialect: %v", dialectErr)
56 }
57 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil {
58 t.Fatalf("Failed to run migrations: %v", migrateErr)
59 }
60
61 // Check if PDS is running
62 pdsURL := os.Getenv("PDS_URL")
63 if pdsURL == "" {
64 pdsURL = "http://localhost:3001"
65 }
66
67 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
68 if err != nil {
69 t.Skipf("PDS not running at %s: %v", pdsURL, err)
70 }
71 func() {
72 if closeErr := healthResp.Body.Close(); closeErr != nil {
73 t.Logf("Failed to close health response: %v", closeErr)
74 }
75 }()
76
77 ctx := context.Background()
78
79 // Setup repositories
80 voteRepo := postgres.NewVoteRepository(db)
81 postRepo := postgres.NewPostRepository(db)
82
83 // Setup services with password-based PDS client factory for E2E testing
84 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())
85
86 // Create test user on PDS
87 testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix())
88 testUserEmail := fmt.Sprintf("voter-%d@test.local", time.Now().Unix())
89 testUserPassword := "test-password-123"
90
91 t.Logf("Creating test user on PDS: %s", testUserHandle)
92 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
93 if err != nil {
94 t.Fatalf("Failed to create test user on PDS: %v", err)
95 }
96 t.Logf("Test user created: DID=%s", userDID)
97
98 // Index user in AppView
99 testUser := createTestUser(t, db, testUserHandle, userDID)
100
101 // Create test post to vote on
102 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test")
103 if err != nil {
104 t.Fatalf("Failed to create test community: %v", err)
105 }
106
107 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
108 postCID := "bafypost123"
109
110 // Setup OAuth middleware with real PDS access token
111 e2eAuth := NewE2EOAuthMiddleware()
112 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
113
114 // Setup HTTP server with XRPC routes
115 r := chi.NewRouter()
116 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
117 httpServer := httptest.NewServer(r)
118 defer httpServer.Close()
119
120 // Setup Jetstream consumer
121 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
122
123 // ====================================================================================
124 // TEST: Create upvote on post
125 // ====================================================================================
126 t.Logf("\n📝 Creating upvote via XRPC endpoint...")
127
128 voteReq := map[string]interface{}{
129 "subject": map[string]interface{}{
130 "uri": postURI,
131 "cid": postCID,
132 },
133 "direction": "up",
134 }
135
136 reqBody, marshalErr := json.Marshal(voteReq)
137 if marshalErr != nil {
138 t.Fatalf("Failed to marshal request: %v", marshalErr)
139 }
140
141 req, err := http.NewRequest(http.MethodPost,
142 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
143 bytes.NewBuffer(reqBody))
144 if err != nil {
145 t.Fatalf("Failed to create request: %v", err)
146 }
147 req.Header.Set("Content-Type", "application/json")
148 req.Header.Set("Authorization", "Bearer "+token)
149
150 resp, err := http.DefaultClient.Do(req)
151 if err != nil {
152 t.Fatalf("Failed to POST vote: %v", err)
153 }
154 defer func() { _ = resp.Body.Close() }()
155
156 if resp.StatusCode != http.StatusOK {
157 body, readErr := io.ReadAll(resp.Body)
158 if readErr != nil {
159 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr)
160 }
161 t.Logf("XRPC Vote Failed")
162 t.Logf(" Status: %d", resp.StatusCode)
163 t.Logf(" Response: %s", string(body))
164 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
165 }
166
167 var voteResp struct {
168 URI string `json:"uri"`
169 CID string `json:"cid"`
170 }
171
172 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
173 t.Fatalf("Failed to decode vote response: %v", decodeErr)
174 }
175
176 t.Logf("✅ XRPC response received:")
177 t.Logf(" URI: %s", voteResp.URI)
178 t.Logf(" CID: %s", voteResp.CID)
179
180 // Verify vote record was written to PDS
181 t.Logf("\n🔍 Verifying vote record on PDS...")
182 rkey := utils.ExtractRKeyFromURI(voteResp.URI)
183 collection := "social.coves.feed.vote"
184
185 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
186 pdsURL, userDID, collection, rkey))
187 if pdsErr != nil {
188 t.Fatalf("Failed to fetch vote record from PDS: %v", pdsErr)
189 }
190 defer func() {
191 if closeErr := pdsResp.Body.Close(); closeErr != nil {
192 t.Logf("Failed to close PDS response: %v", closeErr)
193 }
194 }()
195
196 if pdsResp.StatusCode != http.StatusOK {
197 body, _ := io.ReadAll(pdsResp.Body)
198 t.Fatalf("Vote record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body))
199 }
200
201 var pdsRecord struct {
202 Value map[string]interface{} `json:"value"`
203 CID string `json:"cid"`
204 }
205 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil {
206 t.Fatalf("Failed to decode PDS record: %v", decodeErr)
207 }
208
209 t.Logf("✅ Vote record found on PDS:")
210 t.Logf(" CID: %s", pdsRecord.CID)
211 t.Logf(" Direction: %v", pdsRecord.Value["direction"])
212
213 // Verify direction
214 if pdsRecord.Value["direction"] != "up" {
215 t.Errorf("Expected direction 'up', got %v", pdsRecord.Value["direction"])
216 }
217
218 // Simulate Jetstream consumer indexing the vote
219 t.Logf("\n🔄 Simulating Jetstream consumer indexing vote...")
220 voteEvent := jetstream.JetstreamEvent{
221 Did: userDID,
222 TimeUS: time.Now().UnixMicro(),
223 Kind: "commit",
224 Commit: &jetstream.CommitEvent{
225 Rev: "test-vote-rev",
226 Operation: "create",
227 Collection: "social.coves.feed.vote",
228 RKey: rkey,
229 CID: pdsRecord.CID,
230 Record: map[string]interface{}{
231 "$type": "social.coves.feed.vote",
232 "subject": map[string]interface{}{
233 "uri": postURI,
234 "cid": postCID,
235 },
236 "direction": "up",
237 "createdAt": time.Now().Format(time.RFC3339),
238 },
239 },
240 }
241
242 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
243 t.Fatalf("Failed to handle vote event: %v", handleErr)
244 }
245
246 // Verify vote was indexed in AppView
247 t.Logf("\n🔍 Verifying vote indexed in AppView...")
248 indexedVote, err := voteRepo.GetByURI(ctx, voteResp.URI)
249 if err != nil {
250 t.Fatalf("Vote not indexed in AppView: %v", err)
251 }
252
253 t.Logf("✅ Vote indexed in AppView:")
254 t.Logf(" VoterDID: %s", indexedVote.VoterDID)
255 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
256 t.Logf(" Direction: %s", indexedVote.Direction)
257 t.Logf(" URI: %s", indexedVote.URI)
258
259 // Verify vote details
260 if indexedVote.VoterDID != userDID {
261 t.Errorf("Expected voter_did %s, got %s", userDID, indexedVote.VoterDID)
262 }
263 if indexedVote.SubjectURI != postURI {
264 t.Errorf("Expected subject_uri %s, got %s", postURI, indexedVote.SubjectURI)
265 }
266 if indexedVote.Direction != "up" {
267 t.Errorf("Expected direction 'up', got %s", indexedVote.Direction)
268 }
269
270 // Verify post counts updated
271 t.Logf("\n🔍 Verifying post vote counts updated...")
272 updatedPost, err := postRepo.GetByURI(ctx, postURI)
273 if err != nil {
274 t.Fatalf("Failed to get updated post: %v", err)
275 }
276
277 if updatedPost.UpvoteCount != 1 {
278 t.Errorf("Expected upvote_count = 1, got %d", updatedPost.UpvoteCount)
279 }
280 if updatedPost.Score != 1 {
281 t.Errorf("Expected score = 1, got %d", updatedPost.Score)
282 }
283
284 t.Logf("✅ TRUE E2E UPVOTE FLOW COMPLETE:")
285 t.Logf(" Client → XRPC → PDS Write → Jetstream → Consumer → AppView ✓")
286 t.Logf(" ✓ Vote written to PDS")
287 t.Logf(" ✓ Vote indexed in AppView")
288 t.Logf(" ✓ Post vote counts updated")
289}
290
291// TestVoteE2E_ToggleSameDirection tests voting twice in same direction (toggle off)
292func TestVoteE2E_ToggleSameDirection(t *testing.T) {
293 if testing.Short() {
294 t.Skip("Skipping E2E test in short mode")
295 }
296
297 db := setupTestDB(t)
298 defer func() { _ = db.Close() }()
299
300 ctx := context.Background()
301 pdsURL := getTestPDSURL()
302
303 // Setup repositories and services
304 voteRepo := postgres.NewVoteRepository(db)
305 postRepo := postgres.NewPostRepository(db)
306
307 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())
308
309 // Create test user
310 testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix())
311 testUserEmail := fmt.Sprintf("toggle-%d@test.local", time.Now().Unix())
312 testUserPassword := "test-password-123"
313
314 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
315 if err != nil {
316 t.Skipf("PDS not available: %v", err)
317 }
318
319 testUser := createTestUser(t, db, testUserHandle, userDID)
320
321 // Create test post
322 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "toggle-community", "owner.test")
323 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
324 postCID := "bafypost456"
325
326 // Setup OAuth and HTTP server with real PDS access token
327 e2eAuth := NewE2EOAuthMiddleware()
328 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
329
330 r := chi.NewRouter()
331 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
332 httpServer := httptest.NewServer(r)
333 defer httpServer.Close()
334
335 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
336
337 // First upvote
338 t.Logf("\n📝 Creating first upvote...")
339 voteReq := map[string]interface{}{
340 "subject": map[string]interface{}{
341 "uri": postURI,
342 "cid": postCID,
343 },
344 "direction": "up",
345 }
346
347 reqBody, _ := json.Marshal(voteReq)
348 req, _ := http.NewRequest(http.MethodPost,
349 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
350 bytes.NewBuffer(reqBody))
351 req.Header.Set("Content-Type", "application/json")
352 req.Header.Set("Authorization", "Bearer "+token)
353
354 resp, err := http.DefaultClient.Do(req)
355 if err != nil {
356 t.Fatalf("Failed to create first vote: %v", err)
357 }
358
359 var firstVoteResp struct {
360 URI string `json:"uri"`
361 CID string `json:"cid"`
362 }
363 if decodeErr := json.NewDecoder(resp.Body).Decode(&firstVoteResp); decodeErr != nil {
364 t.Fatalf("Failed to decode first vote response: %v", decodeErr)
365 }
366 if closeErr := resp.Body.Close(); closeErr != nil {
367 t.Logf("Failed to close response body: %v", closeErr)
368 }
369
370 t.Logf("✅ First vote created: %s", firstVoteResp.URI)
371
372 // Index first vote
373 rkey := utils.ExtractRKeyFromURI(firstVoteResp.URI)
374 voteEvent := jetstream.JetstreamEvent{
375 Did: userDID,
376 TimeUS: time.Now().UnixMicro(),
377 Kind: "commit",
378 Commit: &jetstream.CommitEvent{
379 Rev: "test-vote-rev-1",
380 Operation: "create",
381 Collection: "social.coves.feed.vote",
382 RKey: rkey,
383 CID: firstVoteResp.CID,
384 Record: map[string]interface{}{
385 "$type": "social.coves.feed.vote",
386 "subject": map[string]interface{}{
387 "uri": postURI,
388 "cid": postCID,
389 },
390 "direction": "up",
391 "createdAt": time.Now().Format(time.RFC3339),
392 },
393 },
394 }
395 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
396 t.Fatalf("Failed to handle first vote event: %v", handleErr)
397 }
398
399 // Second upvote (same direction) - should toggle off (delete)
400 t.Logf("\n📝 Creating second upvote (toggle off)...")
401 req2, _ := http.NewRequest(http.MethodPost,
402 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
403 bytes.NewBuffer(reqBody))
404 req2.Header.Set("Content-Type", "application/json")
405 req2.Header.Set("Authorization", "Bearer "+token)
406
407 resp2, err := http.DefaultClient.Do(req2)
408 if err != nil {
409 t.Fatalf("Failed to toggle vote: %v", err)
410 }
411 defer func() {
412 if closeErr := resp2.Body.Close(); closeErr != nil {
413 t.Logf("Failed to close response body: %v", closeErr)
414 }
415 }()
416
417 if resp2.StatusCode != http.StatusOK {
418 body, _ := io.ReadAll(resp2.Body)
419 t.Fatalf("Expected 200, got %d: %s", resp2.StatusCode, string(body))
420 }
421
422 t.Logf("✅ Second vote request completed (toggle)")
423
424 // Simulate Jetstream DELETE event
425 t.Logf("\n🔄 Simulating Jetstream DELETE event...")
426 deleteEvent := jetstream.JetstreamEvent{
427 Did: userDID,
428 TimeUS: time.Now().UnixMicro(),
429 Kind: "commit",
430 Commit: &jetstream.CommitEvent{
431 Rev: "test-vote-rev-2",
432 Operation: "delete",
433 Collection: "social.coves.feed.vote",
434 RKey: rkey,
435 },
436 }
437 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
438 t.Fatalf("Failed to handle delete event: %v", handleErr)
439 }
440
441 // Verify vote was removed from AppView
442 t.Logf("\n🔍 Verifying vote removed from AppView...")
443 _, err = voteRepo.GetByURI(ctx, firstVoteResp.URI)
444 if err == nil {
445 t.Error("Expected vote to be deleted, but it still exists")
446 }
447
448 // Verify post counts reset
449 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
450 if updatedPost.UpvoteCount != 0 {
451 t.Errorf("Expected upvote_count = 0 after toggle, got %d", updatedPost.UpvoteCount)
452 }
453
454 t.Logf("✅ TOGGLE SAME DIRECTION FLOW COMPLETE:")
455 t.Logf(" ✓ First vote created and indexed")
456 t.Logf(" ✓ Second vote toggled off (deleted)")
457 t.Logf(" ✓ Post counts updated correctly")
458}
459
460// TestVoteE2E_ToggleDifferentDirection tests changing vote direction
461func TestVoteE2E_ToggleDifferentDirection(t *testing.T) {
462 if testing.Short() {
463 t.Skip("Skipping E2E test in short mode")
464 }
465
466 db := setupTestDB(t)
467 defer func() { _ = db.Close() }()
468
469 ctx := context.Background()
470 pdsURL := getTestPDSURL()
471
472 // Setup repositories and services
473 voteRepo := postgres.NewVoteRepository(db)
474 postRepo := postgres.NewPostRepository(db)
475
476 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())
477
478 // Create test user
479 testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix())
480 testUserEmail := fmt.Sprintf("flip-%d@test.local", time.Now().Unix())
481 testUserPassword := "test-password-123"
482
483 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
484 if err != nil {
485 t.Skipf("PDS not available: %v", err)
486 }
487
488 testUser := createTestUser(t, db, testUserHandle, userDID)
489
490 // Create test post
491 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "flip-community", "owner.test")
492 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
493 postCID := "bafypost789"
494
495 // Setup OAuth and HTTP server with real PDS access token
496 e2eAuth := NewE2EOAuthMiddleware()
497 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
498
499 r := chi.NewRouter()
500 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
501 httpServer := httptest.NewServer(r)
502 defer httpServer.Close()
503
504 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
505
506 // Create upvote
507 t.Logf("\n📝 Creating upvote...")
508 upvoteReq := map[string]interface{}{
509 "subject": map[string]interface{}{
510 "uri": postURI,
511 "cid": postCID,
512 },
513 "direction": "up",
514 }
515
516 reqBody, _ := json.Marshal(upvoteReq)
517 req, _ := http.NewRequest(http.MethodPost,
518 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
519 bytes.NewBuffer(reqBody))
520 req.Header.Set("Content-Type", "application/json")
521 req.Header.Set("Authorization", "Bearer "+token)
522
523 resp, err := http.DefaultClient.Do(req)
524 if err != nil {
525 t.Fatalf("Failed to create upvote: %v", err)
526 }
527 var upvoteResp struct {
528 URI string `json:"uri"`
529 CID string `json:"cid"`
530 }
531 if decodeErr := json.NewDecoder(resp.Body).Decode(&upvoteResp); decodeErr != nil {
532 t.Fatalf("Failed to decode upvote response: %v", decodeErr)
533 }
534 if closeErr := resp.Body.Close(); closeErr != nil {
535 t.Logf("Failed to close response body: %v", closeErr)
536 }
537
538 // Index upvote
539 rkey := utils.ExtractRKeyFromURI(upvoteResp.URI)
540 upvoteEvent := jetstream.JetstreamEvent{
541 Did: userDID,
542 TimeUS: time.Now().UnixMicro(),
543 Kind: "commit",
544 Commit: &jetstream.CommitEvent{
545 Rev: "test-vote-rev-up",
546 Operation: "create",
547 Collection: "social.coves.feed.vote",
548 RKey: rkey,
549 CID: upvoteResp.CID,
550 Record: map[string]interface{}{
551 "$type": "social.coves.feed.vote",
552 "subject": map[string]interface{}{
553 "uri": postURI,
554 "cid": postCID,
555 },
556 "direction": "up",
557 "createdAt": time.Now().Format(time.RFC3339),
558 },
559 },
560 }
561 if handleErr := voteConsumer.HandleEvent(ctx, &upvoteEvent); handleErr != nil {
562 t.Fatalf("Failed to handle upvote event: %v", handleErr)
563 }
564
565 t.Logf("✅ Upvote created and indexed")
566
567 // Change to downvote
568 t.Logf("\n📝 Changing to downvote...")
569 downvoteReq := map[string]interface{}{
570 "subject": map[string]interface{}{
571 "uri": postURI,
572 "cid": postCID,
573 },
574 "direction": "down",
575 }
576
577 reqBody2, _ := json.Marshal(downvoteReq)
578 req2, _ := http.NewRequest(http.MethodPost,
579 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
580 bytes.NewBuffer(reqBody2))
581 req2.Header.Set("Content-Type", "application/json")
582 req2.Header.Set("Authorization", "Bearer "+token)
583
584 resp2, err := http.DefaultClient.Do(req2)
585 if err != nil {
586 t.Fatalf("Failed to create downvote: %v", err)
587 }
588 var downvoteResp struct {
589 URI string `json:"uri"`
590 CID string `json:"cid"`
591 }
592 if decodeErr := json.NewDecoder(resp2.Body).Decode(&downvoteResp); decodeErr != nil {
593 t.Fatalf("Failed to decode downvote response: %v", decodeErr)
594 }
595 if closeErr := resp2.Body.Close(); closeErr != nil {
596 t.Logf("Failed to close response body: %v", closeErr)
597 }
598
599 // The service flow for direction change is:
600 // 1. DELETE old vote on PDS
601 // 2. CREATE new vote with NEW rkey on PDS
602 // So we simulate DELETE + CREATE events (not UPDATE)
603
604 // Simulate Jetstream DELETE event for old vote
605 t.Logf("\n🔄 Simulating Jetstream DELETE event for old upvote...")
606 deleteEvent := jetstream.JetstreamEvent{
607 Did: userDID,
608 TimeUS: time.Now().UnixMicro(),
609 Kind: "commit",
610 Commit: &jetstream.CommitEvent{
611 Rev: "test-vote-rev-delete",
612 Operation: "delete",
613 Collection: "social.coves.feed.vote",
614 RKey: rkey, // Old upvote rkey
615 },
616 }
617 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
618 t.Fatalf("Failed to handle delete event: %v", handleErr)
619 }
620
621 // Simulate Jetstream CREATE event for new downvote
622 t.Logf("\n🔄 Simulating Jetstream CREATE event for new downvote...")
623 newRkey := utils.ExtractRKeyFromURI(downvoteResp.URI)
624 createEvent := jetstream.JetstreamEvent{
625 Did: userDID,
626 TimeUS: time.Now().UnixMicro(),
627 Kind: "commit",
628 Commit: &jetstream.CommitEvent{
629 Rev: "test-vote-rev-down",
630 Operation: "create",
631 Collection: "social.coves.feed.vote",
632 RKey: newRkey, // NEW rkey from downvote response
633 CID: downvoteResp.CID,
634 Record: map[string]interface{}{
635 "$type": "social.coves.feed.vote",
636 "subject": map[string]interface{}{
637 "uri": postURI,
638 "cid": postCID,
639 },
640 "direction": "down",
641 "createdAt": time.Now().Format(time.RFC3339),
642 },
643 },
644 }
645 if handleErr := voteConsumer.HandleEvent(ctx, &createEvent); handleErr != nil {
646 t.Fatalf("Failed to handle create event: %v", handleErr)
647 }
648
649 // Verify old upvote was deleted
650 t.Logf("\n🔍 Verifying old upvote was deleted...")
651 _, err = voteRepo.GetByURI(ctx, upvoteResp.URI)
652 if err == nil {
653 t.Error("Expected old upvote to be deleted, but it still exists")
654 }
655
656 // Verify new downvote was indexed
657 t.Logf("\n🔍 Verifying new downvote indexed in AppView...")
658 newVote, err := voteRepo.GetByURI(ctx, downvoteResp.URI)
659 if err != nil {
660 t.Fatalf("New downvote not found: %v", err)
661 }
662
663 if newVote.Direction != "down" {
664 t.Errorf("Expected direction 'down', got %s", newVote.Direction)
665 }
666
667 // Verify post counts updated
668 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
669 if updatedPost.UpvoteCount != 0 {
670 t.Errorf("Expected upvote_count = 0, got %d", updatedPost.UpvoteCount)
671 }
672 if updatedPost.DownvoteCount != 1 {
673 t.Errorf("Expected downvote_count = 1, got %d", updatedPost.DownvoteCount)
674 }
675 if updatedPost.Score != -1 {
676 t.Errorf("Expected score = -1, got %d", updatedPost.Score)
677 }
678
679 t.Logf("✅ TOGGLE DIFFERENT DIRECTION FLOW COMPLETE:")
680 t.Logf(" ✓ Upvote created (score: +1)")
681 t.Logf(" ✓ Changed to downvote (score: -1)")
682 t.Logf(" ✓ Post counts updated correctly")
683}
684
685// TestVoteE2E_DeleteVote tests explicit vote deletion
686func TestVoteE2E_DeleteVote(t *testing.T) {
687 if testing.Short() {
688 t.Skip("Skipping E2E test in short mode")
689 }
690
691 db := setupTestDB(t)
692 defer func() { _ = db.Close() }()
693
694 ctx := context.Background()
695 pdsURL := getTestPDSURL()
696
697 // Setup repositories and services
698 voteRepo := postgres.NewVoteRepository(db)
699 postRepo := postgres.NewPostRepository(db)
700
701 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())
702
703 // Create test user
704 testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix())
705 testUserEmail := fmt.Sprintf("delete-%d@test.local", time.Now().Unix())
706 testUserPassword := "test-password-123"
707
708 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
709 if err != nil {
710 t.Skipf("PDS not available: %v", err)
711 }
712
713 testUser := createTestUser(t, db, testUserHandle, userDID)
714
715 // Create test post
716 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "delete-community", "owner.test")
717 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
718 postCID := "bafypost999"
719
720 // Setup OAuth and HTTP server with real PDS access token
721 e2eAuth := NewE2EOAuthMiddleware()
722 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
723
724 r := chi.NewRouter()
725 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
726 httpServer := httptest.NewServer(r)
727 defer httpServer.Close()
728
729 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
730
731 // Create vote first
732 t.Logf("\n📝 Creating vote to delete...")
733 voteReq := map[string]interface{}{
734 "subject": map[string]interface{}{
735 "uri": postURI,
736 "cid": postCID,
737 },
738 "direction": "up",
739 }
740
741 reqBody, _ := json.Marshal(voteReq)
742 req, _ := http.NewRequest(http.MethodPost,
743 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
744 bytes.NewBuffer(reqBody))
745 req.Header.Set("Content-Type", "application/json")
746 req.Header.Set("Authorization", "Bearer "+token)
747
748 resp, err := http.DefaultClient.Do(req)
749 if err != nil {
750 t.Fatalf("Failed to create vote: %v", err)
751 }
752 var voteResp struct {
753 URI string `json:"uri"`
754 CID string `json:"cid"`
755 }
756 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
757 t.Fatalf("Failed to decode vote response: %v", decodeErr)
758 }
759 if closeErr := resp.Body.Close(); closeErr != nil {
760 t.Logf("Failed to close response body: %v", closeErr)
761 }
762
763 // Index vote
764 rkey := utils.ExtractRKeyFromURI(voteResp.URI)
765 voteEvent := jetstream.JetstreamEvent{
766 Did: userDID,
767 TimeUS: time.Now().UnixMicro(),
768 Kind: "commit",
769 Commit: &jetstream.CommitEvent{
770 Rev: "test-vote-create",
771 Operation: "create",
772 Collection: "social.coves.feed.vote",
773 RKey: rkey,
774 CID: voteResp.CID,
775 Record: map[string]interface{}{
776 "$type": "social.coves.feed.vote",
777 "subject": map[string]interface{}{
778 "uri": postURI,
779 "cid": postCID,
780 },
781 "direction": "up",
782 "createdAt": time.Now().Format(time.RFC3339),
783 },
784 },
785 }
786 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
787 t.Fatalf("Failed to handle vote event: %v", handleErr)
788 }
789
790 t.Logf("✅ Vote created and indexed")
791
792 // Delete vote via XRPC
793 t.Logf("\n📝 Deleting vote via XRPC...")
794 deleteReq := map[string]interface{}{
795 "subject": map[string]interface{}{
796 "uri": postURI,
797 "cid": postCID,
798 },
799 }
800
801 deleteBody, _ := json.Marshal(deleteReq)
802 deleteHttpReq, _ := http.NewRequest(http.MethodPost,
803 httpServer.URL+"/xrpc/social.coves.feed.vote.delete",
804 bytes.NewBuffer(deleteBody))
805 deleteHttpReq.Header.Set("Content-Type", "application/json")
806 deleteHttpReq.Header.Set("Authorization", "Bearer "+token)
807
808 deleteResp, err := http.DefaultClient.Do(deleteHttpReq)
809 if err != nil {
810 t.Fatalf("Failed to delete vote: %v", err)
811 }
812 defer func() {
813 if closeErr := deleteResp.Body.Close(); closeErr != nil {
814 t.Logf("Failed to close response body: %v", closeErr)
815 }
816 }()
817
818 if deleteResp.StatusCode != http.StatusOK {
819 body, _ := io.ReadAll(deleteResp.Body)
820 t.Fatalf("Delete failed: status %d, body: %s", deleteResp.StatusCode, string(body))
821 }
822
823 // Per lexicon, delete returns empty object {}
824 var deleteRespBody map[string]interface{}
825 if decodeErr := json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody); decodeErr != nil {
826 t.Fatalf("Failed to decode delete response: %v", decodeErr)
827 }
828
829 if len(deleteRespBody) != 0 {
830 t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody)
831 }
832
833 t.Logf("✅ Delete vote request succeeded")
834
835 // Simulate Jetstream DELETE event
836 t.Logf("\n🔄 Simulating Jetstream DELETE event...")
837 deleteEvent := jetstream.JetstreamEvent{
838 Did: userDID,
839 TimeUS: time.Now().UnixMicro(),
840 Kind: "commit",
841 Commit: &jetstream.CommitEvent{
842 Rev: "test-vote-delete",
843 Operation: "delete",
844 Collection: "social.coves.feed.vote",
845 RKey: rkey,
846 },
847 }
848 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
849 t.Fatalf("Failed to handle delete event: %v", handleErr)
850 }
851
852 // Verify vote removed from AppView
853 t.Logf("\n🔍 Verifying vote removed from AppView...")
854 _, err = voteRepo.GetByURI(ctx, voteResp.URI)
855 if err == nil {
856 t.Error("Expected vote to be deleted, but it still exists")
857 }
858
859 // Verify post counts reset
860 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
861 if updatedPost.UpvoteCount != 0 {
862 t.Errorf("Expected upvote_count = 0 after delete, got %d", updatedPost.UpvoteCount)
863 }
864 if updatedPost.Score != 0 {
865 t.Errorf("Expected score = 0 after delete, got %d", updatedPost.Score)
866 }
867
868 t.Logf("✅ EXPLICIT DELETE FLOW COMPLETE:")
869 t.Logf(" ✓ Vote created and indexed")
870 t.Logf(" ✓ Vote deleted via XRPC")
871 t.Logf(" ✓ Vote removed from AppView")
872 t.Logf(" ✓ Post counts updated correctly")
873}
874
875// TestVoteE2E_JetstreamIndexing tests real Jetstream firehose consumption
876func TestVoteE2E_JetstreamIndexing(t *testing.T) {
877 if testing.Short() {
878 t.Skip("Skipping E2E test in short mode")
879 }
880
881 db := setupTestDB(t)
882 defer func() { _ = db.Close() }()
883
884 ctx := context.Background()
885 pdsURL := getTestPDSURL()
886
887 // Setup repositories
888 voteRepo := postgres.NewVoteRepository(db)
889
890 // Create test user on PDS
891 testUserHandle := fmt.Sprintf("jetstream-%d.local.coves.dev", time.Now().Unix())
892 testUserEmail := fmt.Sprintf("jetstream-%d@test.local", time.Now().Unix())
893 testUserPassword := "test-password-123"
894
895 accessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
896 if err != nil {
897 t.Skipf("PDS not available: %v", err)
898 }
899
900 testUser := createTestUser(t, db, testUserHandle, userDID)
901
902 // Create test post
903 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "jetstream-community", "owner.test")
904 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
905 postCID := "bafypostjetstream"
906
907 // Write vote directly to PDS
908 t.Logf("\n📝 Writing vote to PDS...")
909 voteRecord := map[string]interface{}{
910 "$type": "social.coves.feed.vote",
911 "subject": map[string]interface{}{
912 "uri": postURI,
913 "cid": postCID,
914 },
915 "direction": "up",
916 "createdAt": time.Now().Format(time.RFC3339),
917 }
918
919 voteURI, voteCID, err := writePDSRecord(pdsURL, accessToken, userDID, "social.coves.feed.vote", "", voteRecord)
920 if err != nil {
921 t.Fatalf("Failed to write vote to PDS: %v", err)
922 }
923
924 t.Logf("✅ Vote written to PDS:")
925 t.Logf(" URI: %s", voteURI)
926 t.Logf(" CID: %s", voteCID)
927
928 // Setup Jetstream consumer
929 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
930
931 // Subscribe to Jetstream
932 t.Logf("\n🔄 Subscribing to real Jetstream firehose...")
933 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
934 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
935 pdsHostname = strings.Split(pdsHostname, ":")[0]
936
937 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.feed.vote", pdsHostname)
938 t.Logf(" Jetstream URL: %s", jetstreamURL)
939 t.Logf(" Looking for vote DID: %s", userDID)
940
941 // Channels for event communication
942 eventChan := make(chan *jetstream.JetstreamEvent, 10)
943 errorChan := make(chan error, 1)
944 done := make(chan bool)
945
946 // Start Jetstream consumer in background
947 go func() {
948 err := subscribeToJetstreamForVote(ctx, jetstreamURL, userDID, voteConsumer, eventChan, errorChan, done)
949 if err != nil {
950 errorChan <- err
951 }
952 }()
953
954 // Wait for event or timeout
955 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
956
957 select {
958 case event := <-eventChan:
959 t.Logf("✅ Received real Jetstream event!")
960 t.Logf(" Event DID: %s", event.Did)
961 t.Logf(" Collection: %s", event.Commit.Collection)
962 t.Logf(" Operation: %s", event.Commit.Operation)
963 t.Logf(" RKey: %s", event.Commit.RKey)
964
965 // Verify it's our vote
966 if event.Did != userDID {
967 t.Errorf("Expected DID %s, got %s", userDID, event.Did)
968 }
969
970 // Verify indexed in AppView database
971 t.Logf("\n🔍 Querying AppView database...")
972 indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
973 if err != nil {
974 t.Fatalf("Vote not indexed in AppView: %v", err)
975 }
976
977 t.Logf("✅ Vote indexed in AppView:")
978 t.Logf(" VoterDID: %s", indexedVote.VoterDID)
979 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
980 t.Logf(" Direction: %s", indexedVote.Direction)
981 t.Logf(" URI: %s", indexedVote.URI)
982
983 // Signal to stop Jetstream consumer
984 close(done)
985
986 case err := <-errorChan:
987 t.Fatalf("Jetstream error: %v", err)
988
989 case <-time.After(30 * time.Second):
990 t.Fatalf("Timeout: No Jetstream event received within 30 seconds")
991 }
992
993 t.Logf("\n✅ TRUE E2E JETSTREAM FLOW COMPLETE:")
994 t.Logf(" PDS → Jetstream → Consumer → AppView ✓")
995}
996
997// subscribeToJetstreamForVote subscribes to real Jetstream firehose for vote events
998func subscribeToJetstreamForVote(
999 ctx context.Context,
1000 jetstreamURL string,
1001 targetDID string,
1002 consumer *jetstream.VoteEventConsumer,
1003 eventChan chan<- *jetstream.JetstreamEvent,
1004 errorChan chan<- error,
1005 done <-chan bool,
1006) error {
1007 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
1008 if err != nil {
1009 return fmt.Errorf("failed to connect to Jetstream: %w", err)
1010 }
1011 defer func() { _ = conn.Close() }()
1012
1013 // Read messages until we find our event or receive done signal
1014 for {
1015 select {
1016 case <-done:
1017 return nil
1018 case <-ctx.Done():
1019 return ctx.Err()
1020 default:
1021 // Set read deadline to avoid blocking forever
1022 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
1023 return fmt.Errorf("failed to set read deadline: %w", err)
1024 }
1025
1026 var event jetstream.JetstreamEvent
1027 err := conn.ReadJSON(&event)
1028 if err != nil {
1029 // Check if it's a timeout (expected)
1030 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
1031 return nil
1032 }
1033 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
1034 continue // Timeout is expected, keep listening
1035 }
1036 return fmt.Errorf("failed to read Jetstream message: %w", err)
1037 }
1038
1039 // Check if this is the event we're looking for
1040 if event.Did == targetDID && event.Kind == "commit" && event.Commit.Collection == "social.coves.feed.vote" {
1041 // Process the event through the consumer
1042 if err := consumer.HandleEvent(ctx, &event); err != nil {
1043 return fmt.Errorf("failed to process event: %w", err)
1044 }
1045
1046 // Send to channel so test can verify
1047 select {
1048 case eventChan <- &event:
1049 return nil
1050 case <-time.After(1 * time.Second):
1051 return fmt.Errorf("timeout sending event to channel")
1052 }
1053 }
1054 }
1055 }
1056}