A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/routes"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/atproto/utils"
7 "Coves/internal/core/votes"
8 "Coves/internal/db/postgres"
9 "bytes"
10 "context"
11 "database/sql"
12 "encoding/json"
13 "fmt"
14 "io"
15 "net"
16 "net/http"
17 "net/http/httptest"
18 "os"
19 "strings"
20 "testing"
21 "time"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/gorilla/websocket"
25 _ "github.com/lib/pq"
26 "github.com/pressly/goose/v3"
27)
28
29// TestVoteE2E_CreateUpvote tests the full vote creation flow with a real local PDS
30// Flow: Client → XRPC → PDS Write → Jetstream → Consumer → AppView
31func TestVoteE2E_CreateUpvote(t *testing.T) {
32 // Skip in short mode since this requires real PDS
33 if testing.Short() {
34 t.Skip("Skipping E2E test in short mode")
35 }
36
37 // Setup test database
38 dbURL := os.Getenv("TEST_DATABASE_URL")
39 if dbURL == "" {
40 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
41 }
42
43 db, err := sql.Open("postgres", dbURL)
44 if err != nil {
45 t.Fatalf("Failed to connect to test database: %v", err)
46 }
47 defer func() {
48 if closeErr := db.Close(); closeErr != nil {
49 t.Logf("Failed to close database: %v", closeErr)
50 }
51 }()
52
53 // Run migrations
54 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil {
55 t.Fatalf("Failed to set goose dialect: %v", dialectErr)
56 }
57 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil {
58 t.Fatalf("Failed to run migrations: %v", migrateErr)
59 }
60
61 // Check if PDS is running
62 pdsURL := os.Getenv("PDS_URL")
63 if pdsURL == "" {
64 pdsURL = "http://localhost:3001"
65 }
66
67 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
68 if err != nil {
69 t.Skipf("PDS not running at %s: %v", pdsURL, err)
70 }
71 func() {
72 if closeErr := healthResp.Body.Close(); closeErr != nil {
73 t.Logf("Failed to close health response: %v", closeErr)
74 }
75 }()
76
77 ctx := context.Background()
78
79 // Setup repositories
80 voteRepo := postgres.NewVoteRepository(db)
81 postRepo := postgres.NewPostRepository(db)
82
83 // Setup OAuth client and store for vote service
84 oauthStore := SetupOAuthTestStore(t, db)
85 oauthClient := SetupOAuthTestClient(t, oauthStore)
86
87 // Setup services
88 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
89
90 // Create test user on PDS
91 testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix())
92 testUserEmail := fmt.Sprintf("voter-%d@test.local", time.Now().Unix())
93 testUserPassword := "test-password-123"
94
95 t.Logf("Creating test user on PDS: %s", testUserHandle)
96 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
97 if err != nil {
98 t.Fatalf("Failed to create test user on PDS: %v", err)
99 }
100 t.Logf("Test user created: DID=%s", userDID)
101
102 // Index user in AppView
103 testUser := createTestUser(t, db, testUserHandle, userDID)
104
105 // Create test post to vote on
106 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test")
107 if err != nil {
108 t.Fatalf("Failed to create test community: %v", err)
109 }
110
111 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
112 postCID := "bafypost123"
113
114 // Setup OAuth middleware with real PDS access token
115 e2eAuth := NewE2EOAuthMiddleware()
116 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
117
118 // Setup HTTP server with XRPC routes
119 r := chi.NewRouter()
120 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
121 httpServer := httptest.NewServer(r)
122 defer httpServer.Close()
123
124 // Setup Jetstream consumer
125 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
126
127 // ====================================================================================
128 // TEST: Create upvote on post
129 // ====================================================================================
130 t.Logf("\n📝 Creating upvote via XRPC endpoint...")
131
132 voteReq := map[string]interface{}{
133 "subject": map[string]interface{}{
134 "uri": postURI,
135 "cid": postCID,
136 },
137 "direction": "up",
138 }
139
140 reqBody, marshalErr := json.Marshal(voteReq)
141 if marshalErr != nil {
142 t.Fatalf("Failed to marshal request: %v", marshalErr)
143 }
144
145 req, err := http.NewRequest(http.MethodPost,
146 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
147 bytes.NewBuffer(reqBody))
148 if err != nil {
149 t.Fatalf("Failed to create request: %v", err)
150 }
151 req.Header.Set("Content-Type", "application/json")
152 req.Header.Set("Authorization", "Bearer "+token)
153
154 resp, err := http.DefaultClient.Do(req)
155 if err != nil {
156 t.Fatalf("Failed to POST vote: %v", err)
157 }
158 defer func() { _ = resp.Body.Close() }()
159
160 if resp.StatusCode != http.StatusOK {
161 body, readErr := io.ReadAll(resp.Body)
162 if readErr != nil {
163 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr)
164 }
165 t.Logf("XRPC Vote Failed")
166 t.Logf(" Status: %d", resp.StatusCode)
167 t.Logf(" Response: %s", string(body))
168 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
169 }
170
171 var voteResp struct {
172 URI string `json:"uri"`
173 CID string `json:"cid"`
174 }
175
176 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
177 t.Fatalf("Failed to decode vote response: %v", decodeErr)
178 }
179
180 t.Logf("✅ XRPC response received:")
181 t.Logf(" URI: %s", voteResp.URI)
182 t.Logf(" CID: %s", voteResp.CID)
183
184 // Verify vote record was written to PDS
185 t.Logf("\n🔍 Verifying vote record on PDS...")
186 rkey := utils.ExtractRKeyFromURI(voteResp.URI)
187 collection := "social.coves.feed.vote"
188
189 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
190 pdsURL, userDID, collection, rkey))
191 if pdsErr != nil {
192 t.Fatalf("Failed to fetch vote record from PDS: %v", pdsErr)
193 }
194 defer func() {
195 if closeErr := pdsResp.Body.Close(); closeErr != nil {
196 t.Logf("Failed to close PDS response: %v", closeErr)
197 }
198 }()
199
200 if pdsResp.StatusCode != http.StatusOK {
201 body, _ := io.ReadAll(pdsResp.Body)
202 t.Fatalf("Vote record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body))
203 }
204
205 var pdsRecord struct {
206 Value map[string]interface{} `json:"value"`
207 CID string `json:"cid"`
208 }
209 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil {
210 t.Fatalf("Failed to decode PDS record: %v", decodeErr)
211 }
212
213 t.Logf("✅ Vote record found on PDS:")
214 t.Logf(" CID: %s", pdsRecord.CID)
215 t.Logf(" Direction: %v", pdsRecord.Value["direction"])
216
217 // Verify direction
218 if pdsRecord.Value["direction"] != "up" {
219 t.Errorf("Expected direction 'up', got %v", pdsRecord.Value["direction"])
220 }
221
222 // Simulate Jetstream consumer indexing the vote
223 t.Logf("\n🔄 Simulating Jetstream consumer indexing vote...")
224 voteEvent := jetstream.JetstreamEvent{
225 Did: userDID,
226 TimeUS: time.Now().UnixMicro(),
227 Kind: "commit",
228 Commit: &jetstream.CommitEvent{
229 Rev: "test-vote-rev",
230 Operation: "create",
231 Collection: "social.coves.feed.vote",
232 RKey: rkey,
233 CID: pdsRecord.CID,
234 Record: map[string]interface{}{
235 "$type": "social.coves.feed.vote",
236 "subject": map[string]interface{}{
237 "uri": postURI,
238 "cid": postCID,
239 },
240 "direction": "up",
241 "createdAt": time.Now().Format(time.RFC3339),
242 },
243 },
244 }
245
246 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
247 t.Fatalf("Failed to handle vote event: %v", handleErr)
248 }
249
250 // Verify vote was indexed in AppView
251 t.Logf("\n🔍 Verifying vote indexed in AppView...")
252 indexedVote, err := voteRepo.GetByURI(ctx, voteResp.URI)
253 if err != nil {
254 t.Fatalf("Vote not indexed in AppView: %v", err)
255 }
256
257 t.Logf("✅ Vote indexed in AppView:")
258 t.Logf(" VoterDID: %s", indexedVote.VoterDID)
259 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
260 t.Logf(" Direction: %s", indexedVote.Direction)
261 t.Logf(" URI: %s", indexedVote.URI)
262
263 // Verify vote details
264 if indexedVote.VoterDID != userDID {
265 t.Errorf("Expected voter_did %s, got %s", userDID, indexedVote.VoterDID)
266 }
267 if indexedVote.SubjectURI != postURI {
268 t.Errorf("Expected subject_uri %s, got %s", postURI, indexedVote.SubjectURI)
269 }
270 if indexedVote.Direction != "up" {
271 t.Errorf("Expected direction 'up', got %s", indexedVote.Direction)
272 }
273
274 // Verify post counts updated
275 t.Logf("\n🔍 Verifying post vote counts updated...")
276 updatedPost, err := postRepo.GetByURI(ctx, postURI)
277 if err != nil {
278 t.Fatalf("Failed to get updated post: %v", err)
279 }
280
281 if updatedPost.UpvoteCount != 1 {
282 t.Errorf("Expected upvote_count = 1, got %d", updatedPost.UpvoteCount)
283 }
284 if updatedPost.Score != 1 {
285 t.Errorf("Expected score = 1, got %d", updatedPost.Score)
286 }
287
288 t.Logf("✅ TRUE E2E UPVOTE FLOW COMPLETE:")
289 t.Logf(" Client → XRPC → PDS Write → Jetstream → Consumer → AppView ✓")
290 t.Logf(" ✓ Vote written to PDS")
291 t.Logf(" ✓ Vote indexed in AppView")
292 t.Logf(" ✓ Post vote counts updated")
293}
294
295// TestVoteE2E_ToggleSameDirection tests voting twice in same direction (toggle off)
296func TestVoteE2E_ToggleSameDirection(t *testing.T) {
297 if testing.Short() {
298 t.Skip("Skipping E2E test in short mode")
299 }
300
301 db := setupTestDB(t)
302 defer func() { _ = db.Close() }()
303
304 ctx := context.Background()
305 pdsURL := getTestPDSURL()
306
307 // Setup repositories and services
308 voteRepo := postgres.NewVoteRepository(db)
309 postRepo := postgres.NewPostRepository(db)
310
311 oauthStore := SetupOAuthTestStore(t, db)
312 oauthClient := SetupOAuthTestClient(t, oauthStore)
313 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
314
315 // Create test user
316 testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix())
317 testUserEmail := fmt.Sprintf("toggle-%d@test.local", time.Now().Unix())
318 testUserPassword := "test-password-123"
319
320 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
321 if err != nil {
322 t.Skipf("PDS not available: %v", err)
323 }
324
325 testUser := createTestUser(t, db, testUserHandle, userDID)
326
327 // Create test post
328 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "toggle-community", "owner.test")
329 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
330 postCID := "bafypost456"
331
332 // Setup OAuth and HTTP server with real PDS access token
333 e2eAuth := NewE2EOAuthMiddleware()
334 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
335
336 r := chi.NewRouter()
337 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
338 httpServer := httptest.NewServer(r)
339 defer httpServer.Close()
340
341 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
342
343 // First upvote
344 t.Logf("\n📝 Creating first upvote...")
345 voteReq := map[string]interface{}{
346 "subject": map[string]interface{}{
347 "uri": postURI,
348 "cid": postCID,
349 },
350 "direction": "up",
351 }
352
353 reqBody, _ := json.Marshal(voteReq)
354 req, _ := http.NewRequest(http.MethodPost,
355 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
356 bytes.NewBuffer(reqBody))
357 req.Header.Set("Content-Type", "application/json")
358 req.Header.Set("Authorization", "Bearer "+token)
359
360 resp, err := http.DefaultClient.Do(req)
361 if err != nil {
362 t.Fatalf("Failed to create first vote: %v", err)
363 }
364
365 var firstVoteResp struct {
366 URI string `json:"uri"`
367 CID string `json:"cid"`
368 }
369 if decodeErr := json.NewDecoder(resp.Body).Decode(&firstVoteResp); decodeErr != nil {
370 t.Fatalf("Failed to decode first vote response: %v", decodeErr)
371 }
372 if closeErr := resp.Body.Close(); closeErr != nil {
373 t.Logf("Failed to close response body: %v", closeErr)
374 }
375
376 t.Logf("✅ First vote created: %s", firstVoteResp.URI)
377
378 // Index first vote
379 rkey := utils.ExtractRKeyFromURI(firstVoteResp.URI)
380 voteEvent := jetstream.JetstreamEvent{
381 Did: userDID,
382 TimeUS: time.Now().UnixMicro(),
383 Kind: "commit",
384 Commit: &jetstream.CommitEvent{
385 Rev: "test-vote-rev-1",
386 Operation: "create",
387 Collection: "social.coves.feed.vote",
388 RKey: rkey,
389 CID: firstVoteResp.CID,
390 Record: map[string]interface{}{
391 "$type": "social.coves.feed.vote",
392 "subject": map[string]interface{}{
393 "uri": postURI,
394 "cid": postCID,
395 },
396 "direction": "up",
397 "createdAt": time.Now().Format(time.RFC3339),
398 },
399 },
400 }
401 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
402 t.Fatalf("Failed to handle first vote event: %v", handleErr)
403 }
404
405 // Second upvote (same direction) - should toggle off (delete)
406 t.Logf("\n📝 Creating second upvote (toggle off)...")
407 req2, _ := http.NewRequest(http.MethodPost,
408 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
409 bytes.NewBuffer(reqBody))
410 req2.Header.Set("Content-Type", "application/json")
411 req2.Header.Set("Authorization", "Bearer "+token)
412
413 resp2, err := http.DefaultClient.Do(req2)
414 if err != nil {
415 t.Fatalf("Failed to toggle vote: %v", err)
416 }
417 defer func() {
418 if closeErr := resp2.Body.Close(); closeErr != nil {
419 t.Logf("Failed to close response body: %v", closeErr)
420 }
421 }()
422
423 if resp2.StatusCode != http.StatusOK {
424 body, _ := io.ReadAll(resp2.Body)
425 t.Fatalf("Expected 200, got %d: %s", resp2.StatusCode, string(body))
426 }
427
428 t.Logf("✅ Second vote request completed (toggle)")
429
430 // Simulate Jetstream DELETE event
431 t.Logf("\n🔄 Simulating Jetstream DELETE event...")
432 deleteEvent := jetstream.JetstreamEvent{
433 Did: userDID,
434 TimeUS: time.Now().UnixMicro(),
435 Kind: "commit",
436 Commit: &jetstream.CommitEvent{
437 Rev: "test-vote-rev-2",
438 Operation: "delete",
439 Collection: "social.coves.feed.vote",
440 RKey: rkey,
441 },
442 }
443 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
444 t.Fatalf("Failed to handle delete event: %v", handleErr)
445 }
446
447 // Verify vote was removed from AppView
448 t.Logf("\n🔍 Verifying vote removed from AppView...")
449 _, err = voteRepo.GetByURI(ctx, firstVoteResp.URI)
450 if err == nil {
451 t.Error("Expected vote to be deleted, but it still exists")
452 }
453
454 // Verify post counts reset
455 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
456 if updatedPost.UpvoteCount != 0 {
457 t.Errorf("Expected upvote_count = 0 after toggle, got %d", updatedPost.UpvoteCount)
458 }
459
460 t.Logf("✅ TOGGLE SAME DIRECTION FLOW COMPLETE:")
461 t.Logf(" ✓ First vote created and indexed")
462 t.Logf(" ✓ Second vote toggled off (deleted)")
463 t.Logf(" ✓ Post counts updated correctly")
464}
465
466// TestVoteE2E_ToggleDifferentDirection tests changing vote direction
467func TestVoteE2E_ToggleDifferentDirection(t *testing.T) {
468 if testing.Short() {
469 t.Skip("Skipping E2E test in short mode")
470 }
471
472 db := setupTestDB(t)
473 defer func() { _ = db.Close() }()
474
475 ctx := context.Background()
476 pdsURL := getTestPDSURL()
477
478 // Setup repositories and services
479 voteRepo := postgres.NewVoteRepository(db)
480 postRepo := postgres.NewPostRepository(db)
481
482 oauthStore := SetupOAuthTestStore(t, db)
483 oauthClient := SetupOAuthTestClient(t, oauthStore)
484 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
485
486 // Create test user
487 testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix())
488 testUserEmail := fmt.Sprintf("flip-%d@test.local", time.Now().Unix())
489 testUserPassword := "test-password-123"
490
491 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
492 if err != nil {
493 t.Skipf("PDS not available: %v", err)
494 }
495
496 testUser := createTestUser(t, db, testUserHandle, userDID)
497
498 // Create test post
499 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "flip-community", "owner.test")
500 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
501 postCID := "bafypost789"
502
503 // Setup OAuth and HTTP server with real PDS access token
504 e2eAuth := NewE2EOAuthMiddleware()
505 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
506
507 r := chi.NewRouter()
508 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
509 httpServer := httptest.NewServer(r)
510 defer httpServer.Close()
511
512 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
513
514 // Create upvote
515 t.Logf("\n📝 Creating upvote...")
516 upvoteReq := map[string]interface{}{
517 "subject": map[string]interface{}{
518 "uri": postURI,
519 "cid": postCID,
520 },
521 "direction": "up",
522 }
523
524 reqBody, _ := json.Marshal(upvoteReq)
525 req, _ := http.NewRequest(http.MethodPost,
526 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
527 bytes.NewBuffer(reqBody))
528 req.Header.Set("Content-Type", "application/json")
529 req.Header.Set("Authorization", "Bearer "+token)
530
531 resp, err := http.DefaultClient.Do(req)
532 if err != nil {
533 t.Fatalf("Failed to create upvote: %v", err)
534 }
535 var upvoteResp struct {
536 URI string `json:"uri"`
537 CID string `json:"cid"`
538 }
539 if decodeErr := json.NewDecoder(resp.Body).Decode(&upvoteResp); decodeErr != nil {
540 t.Fatalf("Failed to decode upvote response: %v", decodeErr)
541 }
542 if closeErr := resp.Body.Close(); closeErr != nil {
543 t.Logf("Failed to close response body: %v", closeErr)
544 }
545
546 // Index upvote
547 rkey := utils.ExtractRKeyFromURI(upvoteResp.URI)
548 upvoteEvent := jetstream.JetstreamEvent{
549 Did: userDID,
550 TimeUS: time.Now().UnixMicro(),
551 Kind: "commit",
552 Commit: &jetstream.CommitEvent{
553 Rev: "test-vote-rev-up",
554 Operation: "create",
555 Collection: "social.coves.feed.vote",
556 RKey: rkey,
557 CID: upvoteResp.CID,
558 Record: map[string]interface{}{
559 "$type": "social.coves.feed.vote",
560 "subject": map[string]interface{}{
561 "uri": postURI,
562 "cid": postCID,
563 },
564 "direction": "up",
565 "createdAt": time.Now().Format(time.RFC3339),
566 },
567 },
568 }
569 if handleErr := voteConsumer.HandleEvent(ctx, &upvoteEvent); handleErr != nil {
570 t.Fatalf("Failed to handle upvote event: %v", handleErr)
571 }
572
573 t.Logf("✅ Upvote created and indexed")
574
575 // Change to downvote
576 t.Logf("\n📝 Changing to downvote...")
577 downvoteReq := map[string]interface{}{
578 "subject": map[string]interface{}{
579 "uri": postURI,
580 "cid": postCID,
581 },
582 "direction": "down",
583 }
584
585 reqBody2, _ := json.Marshal(downvoteReq)
586 req2, _ := http.NewRequest(http.MethodPost,
587 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
588 bytes.NewBuffer(reqBody2))
589 req2.Header.Set("Content-Type", "application/json")
590 req2.Header.Set("Authorization", "Bearer "+token)
591
592 resp2, err := http.DefaultClient.Do(req2)
593 if err != nil {
594 t.Fatalf("Failed to create downvote: %v", err)
595 }
596 var downvoteResp struct {
597 URI string `json:"uri"`
598 CID string `json:"cid"`
599 }
600 if decodeErr := json.NewDecoder(resp2.Body).Decode(&downvoteResp); decodeErr != nil {
601 t.Fatalf("Failed to decode downvote response: %v", decodeErr)
602 }
603 if closeErr := resp2.Body.Close(); closeErr != nil {
604 t.Logf("Failed to close response body: %v", closeErr)
605 }
606
607 // Simulate Jetstream UPDATE event (PDS updates the existing record)
608 t.Logf("\n🔄 Simulating Jetstream UPDATE event...")
609 updateEvent := jetstream.JetstreamEvent{
610 Did: userDID,
611 TimeUS: time.Now().UnixMicro(),
612 Kind: "commit",
613 Commit: &jetstream.CommitEvent{
614 Rev: "test-vote-rev-down",
615 Operation: "update",
616 Collection: "social.coves.feed.vote",
617 RKey: rkey, // Same rkey as before
618 CID: downvoteResp.CID,
619 Record: map[string]interface{}{
620 "$type": "social.coves.feed.vote",
621 "subject": map[string]interface{}{
622 "uri": postURI,
623 "cid": postCID,
624 },
625 "direction": "down", // Changed direction
626 "createdAt": time.Now().Format(time.RFC3339),
627 },
628 },
629 }
630 if handleErr := voteConsumer.HandleEvent(ctx, &updateEvent); handleErr != nil {
631 t.Fatalf("Failed to handle update event: %v", handleErr)
632 }
633
634 // Verify vote direction changed in AppView
635 t.Logf("\n🔍 Verifying vote direction changed in AppView...")
636 updatedVote, err := voteRepo.GetByURI(ctx, upvoteResp.URI)
637 if err != nil {
638 t.Fatalf("Vote not found after update: %v", err)
639 }
640
641 if updatedVote.Direction != "down" {
642 t.Errorf("Expected direction 'down', got %s", updatedVote.Direction)
643 }
644
645 // Verify post counts updated
646 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
647 if updatedPost.UpvoteCount != 0 {
648 t.Errorf("Expected upvote_count = 0, got %d", updatedPost.UpvoteCount)
649 }
650 if updatedPost.DownvoteCount != 1 {
651 t.Errorf("Expected downvote_count = 1, got %d", updatedPost.DownvoteCount)
652 }
653 if updatedPost.Score != -1 {
654 t.Errorf("Expected score = -1, got %d", updatedPost.Score)
655 }
656
657 t.Logf("✅ TOGGLE DIFFERENT DIRECTION FLOW COMPLETE:")
658 t.Logf(" ✓ Upvote created (score: +1)")
659 t.Logf(" ✓ Changed to downvote (score: -1)")
660 t.Logf(" ✓ Post counts updated correctly")
661}
662
663// TestVoteE2E_DeleteVote tests explicit vote deletion
664func TestVoteE2E_DeleteVote(t *testing.T) {
665 if testing.Short() {
666 t.Skip("Skipping E2E test in short mode")
667 }
668
669 db := setupTestDB(t)
670 defer func() { _ = db.Close() }()
671
672 ctx := context.Background()
673 pdsURL := getTestPDSURL()
674
675 // Setup repositories and services
676 voteRepo := postgres.NewVoteRepository(db)
677 postRepo := postgres.NewPostRepository(db)
678
679 oauthStore := SetupOAuthTestStore(t, db)
680 oauthClient := SetupOAuthTestClient(t, oauthStore)
681 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
682
683 // Create test user
684 testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix())
685 testUserEmail := fmt.Sprintf("delete-%d@test.local", time.Now().Unix())
686 testUserPassword := "test-password-123"
687
688 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
689 if err != nil {
690 t.Skipf("PDS not available: %v", err)
691 }
692
693 testUser := createTestUser(t, db, testUserHandle, userDID)
694
695 // Create test post
696 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "delete-community", "owner.test")
697 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
698 postCID := "bafypost999"
699
700 // Setup OAuth and HTTP server with real PDS access token
701 e2eAuth := NewE2EOAuthMiddleware()
702 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL)
703
704 r := chi.NewRouter()
705 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware)
706 httpServer := httptest.NewServer(r)
707 defer httpServer.Close()
708
709 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
710
711 // Create vote first
712 t.Logf("\n📝 Creating vote to delete...")
713 voteReq := map[string]interface{}{
714 "subject": map[string]interface{}{
715 "uri": postURI,
716 "cid": postCID,
717 },
718 "direction": "up",
719 }
720
721 reqBody, _ := json.Marshal(voteReq)
722 req, _ := http.NewRequest(http.MethodPost,
723 httpServer.URL+"/xrpc/social.coves.feed.vote.create",
724 bytes.NewBuffer(reqBody))
725 req.Header.Set("Content-Type", "application/json")
726 req.Header.Set("Authorization", "Bearer "+token)
727
728 resp, err := http.DefaultClient.Do(req)
729 if err != nil {
730 t.Fatalf("Failed to create vote: %v", err)
731 }
732 var voteResp struct {
733 URI string `json:"uri"`
734 CID string `json:"cid"`
735 }
736 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
737 t.Fatalf("Failed to decode vote response: %v", decodeErr)
738 }
739 if closeErr := resp.Body.Close(); closeErr != nil {
740 t.Logf("Failed to close response body: %v", closeErr)
741 }
742
743 // Index vote
744 rkey := utils.ExtractRKeyFromURI(voteResp.URI)
745 voteEvent := jetstream.JetstreamEvent{
746 Did: userDID,
747 TimeUS: time.Now().UnixMicro(),
748 Kind: "commit",
749 Commit: &jetstream.CommitEvent{
750 Rev: "test-vote-create",
751 Operation: "create",
752 Collection: "social.coves.feed.vote",
753 RKey: rkey,
754 CID: voteResp.CID,
755 Record: map[string]interface{}{
756 "$type": "social.coves.feed.vote",
757 "subject": map[string]interface{}{
758 "uri": postURI,
759 "cid": postCID,
760 },
761 "direction": "up",
762 "createdAt": time.Now().Format(time.RFC3339),
763 },
764 },
765 }
766 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
767 t.Fatalf("Failed to handle vote event: %v", handleErr)
768 }
769
770 t.Logf("✅ Vote created and indexed")
771
772 // Delete vote via XRPC
773 t.Logf("\n📝 Deleting vote via XRPC...")
774 deleteReq := map[string]interface{}{
775 "subject": map[string]interface{}{
776 "uri": postURI,
777 "cid": postCID,
778 },
779 }
780
781 deleteBody, _ := json.Marshal(deleteReq)
782 deleteHttpReq, _ := http.NewRequest(http.MethodPost,
783 httpServer.URL+"/xrpc/social.coves.feed.vote.delete",
784 bytes.NewBuffer(deleteBody))
785 deleteHttpReq.Header.Set("Content-Type", "application/json")
786 deleteHttpReq.Header.Set("Authorization", "Bearer "+token)
787
788 deleteResp, err := http.DefaultClient.Do(deleteHttpReq)
789 if err != nil {
790 t.Fatalf("Failed to delete vote: %v", err)
791 }
792 defer func() {
793 if closeErr := deleteResp.Body.Close(); closeErr != nil {
794 t.Logf("Failed to close response body: %v", closeErr)
795 }
796 }()
797
798 if deleteResp.StatusCode != http.StatusOK {
799 body, _ := io.ReadAll(deleteResp.Body)
800 t.Fatalf("Delete failed: status %d, body: %s", deleteResp.StatusCode, string(body))
801 }
802
803 // Per lexicon, delete returns empty object {}
804 var deleteRespBody map[string]interface{}
805 if decodeErr := json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody); decodeErr != nil {
806 t.Fatalf("Failed to decode delete response: %v", decodeErr)
807 }
808
809 if len(deleteRespBody) != 0 {
810 t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody)
811 }
812
813 t.Logf("✅ Delete vote request succeeded")
814
815 // Simulate Jetstream DELETE event
816 t.Logf("\n🔄 Simulating Jetstream DELETE event...")
817 deleteEvent := jetstream.JetstreamEvent{
818 Did: userDID,
819 TimeUS: time.Now().UnixMicro(),
820 Kind: "commit",
821 Commit: &jetstream.CommitEvent{
822 Rev: "test-vote-delete",
823 Operation: "delete",
824 Collection: "social.coves.feed.vote",
825 RKey: rkey,
826 },
827 }
828 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
829 t.Fatalf("Failed to handle delete event: %v", handleErr)
830 }
831
832 // Verify vote removed from AppView
833 t.Logf("\n🔍 Verifying vote removed from AppView...")
834 _, err = voteRepo.GetByURI(ctx, voteResp.URI)
835 if err == nil {
836 t.Error("Expected vote to be deleted, but it still exists")
837 }
838
839 // Verify post counts reset
840 updatedPost, _ := postRepo.GetByURI(ctx, postURI)
841 if updatedPost.UpvoteCount != 0 {
842 t.Errorf("Expected upvote_count = 0 after delete, got %d", updatedPost.UpvoteCount)
843 }
844 if updatedPost.Score != 0 {
845 t.Errorf("Expected score = 0 after delete, got %d", updatedPost.Score)
846 }
847
848 t.Logf("✅ EXPLICIT DELETE FLOW COMPLETE:")
849 t.Logf(" ✓ Vote created and indexed")
850 t.Logf(" ✓ Vote deleted via XRPC")
851 t.Logf(" ✓ Vote removed from AppView")
852 t.Logf(" ✓ Post counts updated correctly")
853}
854
855// TestVoteE2E_JetstreamIndexing tests real Jetstream firehose consumption
856func TestVoteE2E_JetstreamIndexing(t *testing.T) {
857 if testing.Short() {
858 t.Skip("Skipping E2E test in short mode")
859 }
860
861 db := setupTestDB(t)
862 defer func() { _ = db.Close() }()
863
864 ctx := context.Background()
865 pdsURL := getTestPDSURL()
866
867 // Setup repositories
868 voteRepo := postgres.NewVoteRepository(db)
869
870 // Create test user on PDS
871 testUserHandle := fmt.Sprintf("jetstream-%d.local.coves.dev", time.Now().Unix())
872 testUserEmail := fmt.Sprintf("jetstream-%d@test.local", time.Now().Unix())
873 testUserPassword := "test-password-123"
874
875 accessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
876 if err != nil {
877 t.Skipf("PDS not available: %v", err)
878 }
879
880 testUser := createTestUser(t, db, testUserHandle, userDID)
881
882 // Create test post
883 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "jetstream-community", "owner.test")
884 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
885 postCID := "bafypostjetstream"
886
887 // Write vote directly to PDS
888 t.Logf("\n📝 Writing vote to PDS...")
889 voteRecord := map[string]interface{}{
890 "$type": "social.coves.feed.vote",
891 "subject": map[string]interface{}{
892 "uri": postURI,
893 "cid": postCID,
894 },
895 "direction": "up",
896 "createdAt": time.Now().Format(time.RFC3339),
897 }
898
899 voteURI, voteCID, err := writePDSRecord(pdsURL, accessToken, userDID, "social.coves.feed.vote", "", voteRecord)
900 if err != nil {
901 t.Fatalf("Failed to write vote to PDS: %v", err)
902 }
903
904 t.Logf("✅ Vote written to PDS:")
905 t.Logf(" URI: %s", voteURI)
906 t.Logf(" CID: %s", voteCID)
907
908 // Setup Jetstream consumer
909 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db)
910
911 // Subscribe to Jetstream
912 t.Logf("\n🔄 Subscribing to real Jetstream firehose...")
913 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
914 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
915 pdsHostname = strings.Split(pdsHostname, ":")[0]
916
917 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.feed.vote", pdsHostname)
918 t.Logf(" Jetstream URL: %s", jetstreamURL)
919 t.Logf(" Looking for vote DID: %s", userDID)
920
921 // Channels for event communication
922 eventChan := make(chan *jetstream.JetstreamEvent, 10)
923 errorChan := make(chan error, 1)
924 done := make(chan bool)
925
926 // Start Jetstream consumer in background
927 go func() {
928 err := subscribeToJetstreamForVote(ctx, jetstreamURL, userDID, voteConsumer, eventChan, errorChan, done)
929 if err != nil {
930 errorChan <- err
931 }
932 }()
933
934 // Wait for event or timeout
935 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...")
936
937 select {
938 case event := <-eventChan:
939 t.Logf("✅ Received real Jetstream event!")
940 t.Logf(" Event DID: %s", event.Did)
941 t.Logf(" Collection: %s", event.Commit.Collection)
942 t.Logf(" Operation: %s", event.Commit.Operation)
943 t.Logf(" RKey: %s", event.Commit.RKey)
944
945 // Verify it's our vote
946 if event.Did != userDID {
947 t.Errorf("Expected DID %s, got %s", userDID, event.Did)
948 }
949
950 // Verify indexed in AppView database
951 t.Logf("\n🔍 Querying AppView database...")
952 indexedVote, err := voteRepo.GetByURI(ctx, voteURI)
953 if err != nil {
954 t.Fatalf("Vote not indexed in AppView: %v", err)
955 }
956
957 t.Logf("✅ Vote indexed in AppView:")
958 t.Logf(" VoterDID: %s", indexedVote.VoterDID)
959 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI)
960 t.Logf(" Direction: %s", indexedVote.Direction)
961 t.Logf(" URI: %s", indexedVote.URI)
962
963 // Signal to stop Jetstream consumer
964 close(done)
965
966 case err := <-errorChan:
967 t.Fatalf("Jetstream error: %v", err)
968
969 case <-time.After(30 * time.Second):
970 t.Fatalf("Timeout: No Jetstream event received within 30 seconds")
971 }
972
973 t.Logf("\n✅ TRUE E2E JETSTREAM FLOW COMPLETE:")
974 t.Logf(" PDS → Jetstream → Consumer → AppView ✓")
975}
976
977// subscribeToJetstreamForVote subscribes to real Jetstream firehose for vote events
978func subscribeToJetstreamForVote(
979 ctx context.Context,
980 jetstreamURL string,
981 targetDID string,
982 consumer *jetstream.VoteEventConsumer,
983 eventChan chan<- *jetstream.JetstreamEvent,
984 errorChan chan<- error,
985 done <-chan bool,
986) error {
987 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
988 if err != nil {
989 return fmt.Errorf("failed to connect to Jetstream: %w", err)
990 }
991 defer func() { _ = conn.Close() }()
992
993 // Read messages until we find our event or receive done signal
994 for {
995 select {
996 case <-done:
997 return nil
998 case <-ctx.Done():
999 return ctx.Err()
1000 default:
1001 // Set read deadline to avoid blocking forever
1002 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
1003 return fmt.Errorf("failed to set read deadline: %w", err)
1004 }
1005
1006 var event jetstream.JetstreamEvent
1007 err := conn.ReadJSON(&event)
1008 if err != nil {
1009 // Check if it's a timeout (expected)
1010 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
1011 return nil
1012 }
1013 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
1014 continue // Timeout is expected, keep listening
1015 }
1016 return fmt.Errorf("failed to read Jetstream message: %w", err)
1017 }
1018
1019 // Check if this is the event we're looking for
1020 if event.Did == targetDID && event.Kind == "commit" && event.Commit.Collection == "social.coves.feed.vote" {
1021 // Process the event through the consumer
1022 if err := consumer.HandleEvent(ctx, &event); err != nil {
1023 return fmt.Errorf("failed to process event: %w", err)
1024 }
1025
1026 // Send to channel so test can verify
1027 select {
1028 case eventChan <- &event:
1029 return nil
1030 case <-time.After(1 * time.Second):
1031 return fmt.Errorf("timeout sending event to channel")
1032 }
1033 }
1034 }
1035 }
1036}