A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/routes" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/atproto/utils" 7 "Coves/internal/core/votes" 8 "Coves/internal/db/postgres" 9 "bytes" 10 "context" 11 "database/sql" 12 "encoding/json" 13 "fmt" 14 "io" 15 "net" 16 "net/http" 17 "net/http/httptest" 18 "os" 19 "strings" 20 "testing" 21 "time" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25 _ "github.com/lib/pq" 26 "github.com/pressly/goose/v3" 27) 28 29// TestVoteE2E_CreateUpvote tests the full vote creation flow with a real local PDS 30// Flow: Client → XRPC → PDS Write → Jetstream → Consumer → AppView 31func TestVoteE2E_CreateUpvote(t *testing.T) { 32 // Skip in short mode since this requires real PDS 33 if testing.Short() { 34 t.Skip("Skipping E2E test in short mode") 35 } 36 37 // Setup test database 38 dbURL := os.Getenv("TEST_DATABASE_URL") 39 if dbURL == "" { 40 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 41 } 42 43 db, err := sql.Open("postgres", dbURL) 44 if err != nil { 45 t.Fatalf("Failed to connect to test database: %v", err) 46 } 47 defer func() { 48 if closeErr := db.Close(); closeErr != nil { 49 t.Logf("Failed to close database: %v", closeErr) 50 } 51 }() 52 53 // Run migrations 54 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 55 t.Fatalf("Failed to set goose dialect: %v", dialectErr) 56 } 57 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 58 t.Fatalf("Failed to run migrations: %v", migrateErr) 59 } 60 61 // Check if PDS is running 62 pdsURL := os.Getenv("PDS_URL") 63 if pdsURL == "" { 64 pdsURL = "http://localhost:3001" 65 } 66 67 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 68 if err != nil { 69 t.Skipf("PDS not running at %s: %v", pdsURL, err) 70 } 71 func() { 72 if closeErr := healthResp.Body.Close(); closeErr != nil { 73 t.Logf("Failed to close health response: %v", closeErr) 74 } 75 }() 76 77 ctx := context.Background() 78 79 // Setup repositories 80 voteRepo := postgres.NewVoteRepository(db) 81 postRepo := postgres.NewPostRepository(db) 82 83 // Setup OAuth client and store for vote service 84 oauthStore := SetupOAuthTestStore(t, db) 85 oauthClient := SetupOAuthTestClient(t, oauthStore) 86 87 // Setup services 88 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 89 90 // Create test user on PDS 91 testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix()) 92 testUserEmail := fmt.Sprintf("voter-%d@test.local", time.Now().Unix()) 93 testUserPassword := "test-password-123" 94 95 t.Logf("Creating test user on PDS: %s", testUserHandle) 96 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 97 if err != nil { 98 t.Fatalf("Failed to create test user on PDS: %v", err) 99 } 100 t.Logf("Test user created: DID=%s", userDID) 101 102 // Index user in AppView 103 testUser := createTestUser(t, db, testUserHandle, userDID) 104 105 // Create test post to vote on 106 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test") 107 if err != nil { 108 t.Fatalf("Failed to create test community: %v", err) 109 } 110 111 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 112 postCID := "bafypost123" 113 114 // Setup OAuth middleware with real PDS access token 115 e2eAuth := NewE2EOAuthMiddleware() 116 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 117 118 // Setup HTTP server with XRPC routes 119 r := chi.NewRouter() 120 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 121 httpServer := httptest.NewServer(r) 122 defer httpServer.Close() 123 124 // Setup Jetstream consumer 125 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 126 127 // ==================================================================================== 128 // TEST: Create upvote on post 129 // ==================================================================================== 130 t.Logf("\n📝 Creating upvote via XRPC endpoint...") 131 132 voteReq := map[string]interface{}{ 133 "subject": map[string]interface{}{ 134 "uri": postURI, 135 "cid": postCID, 136 }, 137 "direction": "up", 138 } 139 140 reqBody, marshalErr := json.Marshal(voteReq) 141 if marshalErr != nil { 142 t.Fatalf("Failed to marshal request: %v", marshalErr) 143 } 144 145 req, err := http.NewRequest(http.MethodPost, 146 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 147 bytes.NewBuffer(reqBody)) 148 if err != nil { 149 t.Fatalf("Failed to create request: %v", err) 150 } 151 req.Header.Set("Content-Type", "application/json") 152 req.Header.Set("Authorization", "Bearer "+token) 153 154 resp, err := http.DefaultClient.Do(req) 155 if err != nil { 156 t.Fatalf("Failed to POST vote: %v", err) 157 } 158 defer func() { _ = resp.Body.Close() }() 159 160 if resp.StatusCode != http.StatusOK { 161 body, readErr := io.ReadAll(resp.Body) 162 if readErr != nil { 163 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 164 } 165 t.Logf("XRPC Vote Failed") 166 t.Logf(" Status: %d", resp.StatusCode) 167 t.Logf(" Response: %s", string(body)) 168 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 169 } 170 171 var voteResp struct { 172 URI string `json:"uri"` 173 CID string `json:"cid"` 174 } 175 176 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil { 177 t.Fatalf("Failed to decode vote response: %v", decodeErr) 178 } 179 180 t.Logf("✅ XRPC response received:") 181 t.Logf(" URI: %s", voteResp.URI) 182 t.Logf(" CID: %s", voteResp.CID) 183 184 // Verify vote record was written to PDS 185 t.Logf("\n🔍 Verifying vote record on PDS...") 186 rkey := utils.ExtractRKeyFromURI(voteResp.URI) 187 collection := "social.coves.feed.vote" 188 189 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 190 pdsURL, userDID, collection, rkey)) 191 if pdsErr != nil { 192 t.Fatalf("Failed to fetch vote record from PDS: %v", pdsErr) 193 } 194 defer func() { 195 if closeErr := pdsResp.Body.Close(); closeErr != nil { 196 t.Logf("Failed to close PDS response: %v", closeErr) 197 } 198 }() 199 200 if pdsResp.StatusCode != http.StatusOK { 201 body, _ := io.ReadAll(pdsResp.Body) 202 t.Fatalf("Vote record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body)) 203 } 204 205 var pdsRecord struct { 206 Value map[string]interface{} `json:"value"` 207 CID string `json:"cid"` 208 } 209 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 210 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 211 } 212 213 t.Logf("✅ Vote record found on PDS:") 214 t.Logf(" CID: %s", pdsRecord.CID) 215 t.Logf(" Direction: %v", pdsRecord.Value["direction"]) 216 217 // Verify direction 218 if pdsRecord.Value["direction"] != "up" { 219 t.Errorf("Expected direction 'up', got %v", pdsRecord.Value["direction"]) 220 } 221 222 // Simulate Jetstream consumer indexing the vote 223 t.Logf("\n🔄 Simulating Jetstream consumer indexing vote...") 224 voteEvent := jetstream.JetstreamEvent{ 225 Did: userDID, 226 TimeUS: time.Now().UnixMicro(), 227 Kind: "commit", 228 Commit: &jetstream.CommitEvent{ 229 Rev: "test-vote-rev", 230 Operation: "create", 231 Collection: "social.coves.feed.vote", 232 RKey: rkey, 233 CID: pdsRecord.CID, 234 Record: map[string]interface{}{ 235 "$type": "social.coves.feed.vote", 236 "subject": map[string]interface{}{ 237 "uri": postURI, 238 "cid": postCID, 239 }, 240 "direction": "up", 241 "createdAt": time.Now().Format(time.RFC3339), 242 }, 243 }, 244 } 245 246 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 247 t.Fatalf("Failed to handle vote event: %v", handleErr) 248 } 249 250 // Verify vote was indexed in AppView 251 t.Logf("\n🔍 Verifying vote indexed in AppView...") 252 indexedVote, err := voteRepo.GetByURI(ctx, voteResp.URI) 253 if err != nil { 254 t.Fatalf("Vote not indexed in AppView: %v", err) 255 } 256 257 t.Logf("✅ Vote indexed in AppView:") 258 t.Logf(" VoterDID: %s", indexedVote.VoterDID) 259 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI) 260 t.Logf(" Direction: %s", indexedVote.Direction) 261 t.Logf(" URI: %s", indexedVote.URI) 262 263 // Verify vote details 264 if indexedVote.VoterDID != userDID { 265 t.Errorf("Expected voter_did %s, got %s", userDID, indexedVote.VoterDID) 266 } 267 if indexedVote.SubjectURI != postURI { 268 t.Errorf("Expected subject_uri %s, got %s", postURI, indexedVote.SubjectURI) 269 } 270 if indexedVote.Direction != "up" { 271 t.Errorf("Expected direction 'up', got %s", indexedVote.Direction) 272 } 273 274 // Verify post counts updated 275 t.Logf("\n🔍 Verifying post vote counts updated...") 276 updatedPost, err := postRepo.GetByURI(ctx, postURI) 277 if err != nil { 278 t.Fatalf("Failed to get updated post: %v", err) 279 } 280 281 if updatedPost.UpvoteCount != 1 { 282 t.Errorf("Expected upvote_count = 1, got %d", updatedPost.UpvoteCount) 283 } 284 if updatedPost.Score != 1 { 285 t.Errorf("Expected score = 1, got %d", updatedPost.Score) 286 } 287 288 t.Logf("✅ TRUE E2E UPVOTE FLOW COMPLETE:") 289 t.Logf(" Client → XRPC → PDS Write → Jetstream → Consumer → AppView ✓") 290 t.Logf(" ✓ Vote written to PDS") 291 t.Logf(" ✓ Vote indexed in AppView") 292 t.Logf(" ✓ Post vote counts updated") 293} 294 295// TestVoteE2E_ToggleSameDirection tests voting twice in same direction (toggle off) 296func TestVoteE2E_ToggleSameDirection(t *testing.T) { 297 if testing.Short() { 298 t.Skip("Skipping E2E test in short mode") 299 } 300 301 db := setupTestDB(t) 302 defer func() { _ = db.Close() }() 303 304 ctx := context.Background() 305 pdsURL := getTestPDSURL() 306 307 // Setup repositories and services 308 voteRepo := postgres.NewVoteRepository(db) 309 postRepo := postgres.NewPostRepository(db) 310 311 oauthStore := SetupOAuthTestStore(t, db) 312 oauthClient := SetupOAuthTestClient(t, oauthStore) 313 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 314 315 // Create test user 316 testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix()) 317 testUserEmail := fmt.Sprintf("toggle-%d@test.local", time.Now().Unix()) 318 testUserPassword := "test-password-123" 319 320 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 321 if err != nil { 322 t.Skipf("PDS not available: %v", err) 323 } 324 325 testUser := createTestUser(t, db, testUserHandle, userDID) 326 327 // Create test post 328 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "toggle-community", "owner.test") 329 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 330 postCID := "bafypost456" 331 332 // Setup OAuth and HTTP server with real PDS access token 333 e2eAuth := NewE2EOAuthMiddleware() 334 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 335 336 r := chi.NewRouter() 337 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 338 httpServer := httptest.NewServer(r) 339 defer httpServer.Close() 340 341 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 342 343 // First upvote 344 t.Logf("\n📝 Creating first upvote...") 345 voteReq := map[string]interface{}{ 346 "subject": map[string]interface{}{ 347 "uri": postURI, 348 "cid": postCID, 349 }, 350 "direction": "up", 351 } 352 353 reqBody, _ := json.Marshal(voteReq) 354 req, _ := http.NewRequest(http.MethodPost, 355 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 356 bytes.NewBuffer(reqBody)) 357 req.Header.Set("Content-Type", "application/json") 358 req.Header.Set("Authorization", "Bearer "+token) 359 360 resp, err := http.DefaultClient.Do(req) 361 if err != nil { 362 t.Fatalf("Failed to create first vote: %v", err) 363 } 364 365 var firstVoteResp struct { 366 URI string `json:"uri"` 367 CID string `json:"cid"` 368 } 369 if decodeErr := json.NewDecoder(resp.Body).Decode(&firstVoteResp); decodeErr != nil { 370 t.Fatalf("Failed to decode first vote response: %v", decodeErr) 371 } 372 if closeErr := resp.Body.Close(); closeErr != nil { 373 t.Logf("Failed to close response body: %v", closeErr) 374 } 375 376 t.Logf("✅ First vote created: %s", firstVoteResp.URI) 377 378 // Index first vote 379 rkey := utils.ExtractRKeyFromURI(firstVoteResp.URI) 380 voteEvent := jetstream.JetstreamEvent{ 381 Did: userDID, 382 TimeUS: time.Now().UnixMicro(), 383 Kind: "commit", 384 Commit: &jetstream.CommitEvent{ 385 Rev: "test-vote-rev-1", 386 Operation: "create", 387 Collection: "social.coves.feed.vote", 388 RKey: rkey, 389 CID: firstVoteResp.CID, 390 Record: map[string]interface{}{ 391 "$type": "social.coves.feed.vote", 392 "subject": map[string]interface{}{ 393 "uri": postURI, 394 "cid": postCID, 395 }, 396 "direction": "up", 397 "createdAt": time.Now().Format(time.RFC3339), 398 }, 399 }, 400 } 401 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 402 t.Fatalf("Failed to handle first vote event: %v", handleErr) 403 } 404 405 // Second upvote (same direction) - should toggle off (delete) 406 t.Logf("\n📝 Creating second upvote (toggle off)...") 407 req2, _ := http.NewRequest(http.MethodPost, 408 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 409 bytes.NewBuffer(reqBody)) 410 req2.Header.Set("Content-Type", "application/json") 411 req2.Header.Set("Authorization", "Bearer "+token) 412 413 resp2, err := http.DefaultClient.Do(req2) 414 if err != nil { 415 t.Fatalf("Failed to toggle vote: %v", err) 416 } 417 defer func() { 418 if closeErr := resp2.Body.Close(); closeErr != nil { 419 t.Logf("Failed to close response body: %v", closeErr) 420 } 421 }() 422 423 if resp2.StatusCode != http.StatusOK { 424 body, _ := io.ReadAll(resp2.Body) 425 t.Fatalf("Expected 200, got %d: %s", resp2.StatusCode, string(body)) 426 } 427 428 t.Logf("✅ Second vote request completed (toggle)") 429 430 // Simulate Jetstream DELETE event 431 t.Logf("\n🔄 Simulating Jetstream DELETE event...") 432 deleteEvent := jetstream.JetstreamEvent{ 433 Did: userDID, 434 TimeUS: time.Now().UnixMicro(), 435 Kind: "commit", 436 Commit: &jetstream.CommitEvent{ 437 Rev: "test-vote-rev-2", 438 Operation: "delete", 439 Collection: "social.coves.feed.vote", 440 RKey: rkey, 441 }, 442 } 443 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 444 t.Fatalf("Failed to handle delete event: %v", handleErr) 445 } 446 447 // Verify vote was removed from AppView 448 t.Logf("\n🔍 Verifying vote removed from AppView...") 449 _, err = voteRepo.GetByURI(ctx, firstVoteResp.URI) 450 if err == nil { 451 t.Error("Expected vote to be deleted, but it still exists") 452 } 453 454 // Verify post counts reset 455 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 456 if updatedPost.UpvoteCount != 0 { 457 t.Errorf("Expected upvote_count = 0 after toggle, got %d", updatedPost.UpvoteCount) 458 } 459 460 t.Logf("✅ TOGGLE SAME DIRECTION FLOW COMPLETE:") 461 t.Logf(" ✓ First vote created and indexed") 462 t.Logf(" ✓ Second vote toggled off (deleted)") 463 t.Logf(" ✓ Post counts updated correctly") 464} 465 466// TestVoteE2E_ToggleDifferentDirection tests changing vote direction 467func TestVoteE2E_ToggleDifferentDirection(t *testing.T) { 468 if testing.Short() { 469 t.Skip("Skipping E2E test in short mode") 470 } 471 472 db := setupTestDB(t) 473 defer func() { _ = db.Close() }() 474 475 ctx := context.Background() 476 pdsURL := getTestPDSURL() 477 478 // Setup repositories and services 479 voteRepo := postgres.NewVoteRepository(db) 480 postRepo := postgres.NewPostRepository(db) 481 482 oauthStore := SetupOAuthTestStore(t, db) 483 oauthClient := SetupOAuthTestClient(t, oauthStore) 484 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 485 486 // Create test user 487 testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix()) 488 testUserEmail := fmt.Sprintf("flip-%d@test.local", time.Now().Unix()) 489 testUserPassword := "test-password-123" 490 491 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 492 if err != nil { 493 t.Skipf("PDS not available: %v", err) 494 } 495 496 testUser := createTestUser(t, db, testUserHandle, userDID) 497 498 // Create test post 499 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "flip-community", "owner.test") 500 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 501 postCID := "bafypost789" 502 503 // Setup OAuth and HTTP server with real PDS access token 504 e2eAuth := NewE2EOAuthMiddleware() 505 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 506 507 r := chi.NewRouter() 508 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 509 httpServer := httptest.NewServer(r) 510 defer httpServer.Close() 511 512 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 513 514 // Create upvote 515 t.Logf("\n📝 Creating upvote...") 516 upvoteReq := map[string]interface{}{ 517 "subject": map[string]interface{}{ 518 "uri": postURI, 519 "cid": postCID, 520 }, 521 "direction": "up", 522 } 523 524 reqBody, _ := json.Marshal(upvoteReq) 525 req, _ := http.NewRequest(http.MethodPost, 526 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 527 bytes.NewBuffer(reqBody)) 528 req.Header.Set("Content-Type", "application/json") 529 req.Header.Set("Authorization", "Bearer "+token) 530 531 resp, err := http.DefaultClient.Do(req) 532 if err != nil { 533 t.Fatalf("Failed to create upvote: %v", err) 534 } 535 var upvoteResp struct { 536 URI string `json:"uri"` 537 CID string `json:"cid"` 538 } 539 if decodeErr := json.NewDecoder(resp.Body).Decode(&upvoteResp); decodeErr != nil { 540 t.Fatalf("Failed to decode upvote response: %v", decodeErr) 541 } 542 if closeErr := resp.Body.Close(); closeErr != nil { 543 t.Logf("Failed to close response body: %v", closeErr) 544 } 545 546 // Index upvote 547 rkey := utils.ExtractRKeyFromURI(upvoteResp.URI) 548 upvoteEvent := jetstream.JetstreamEvent{ 549 Did: userDID, 550 TimeUS: time.Now().UnixMicro(), 551 Kind: "commit", 552 Commit: &jetstream.CommitEvent{ 553 Rev: "test-vote-rev-up", 554 Operation: "create", 555 Collection: "social.coves.feed.vote", 556 RKey: rkey, 557 CID: upvoteResp.CID, 558 Record: map[string]interface{}{ 559 "$type": "social.coves.feed.vote", 560 "subject": map[string]interface{}{ 561 "uri": postURI, 562 "cid": postCID, 563 }, 564 "direction": "up", 565 "createdAt": time.Now().Format(time.RFC3339), 566 }, 567 }, 568 } 569 if handleErr := voteConsumer.HandleEvent(ctx, &upvoteEvent); handleErr != nil { 570 t.Fatalf("Failed to handle upvote event: %v", handleErr) 571 } 572 573 t.Logf("✅ Upvote created and indexed") 574 575 // Change to downvote 576 t.Logf("\n📝 Changing to downvote...") 577 downvoteReq := map[string]interface{}{ 578 "subject": map[string]interface{}{ 579 "uri": postURI, 580 "cid": postCID, 581 }, 582 "direction": "down", 583 } 584 585 reqBody2, _ := json.Marshal(downvoteReq) 586 req2, _ := http.NewRequest(http.MethodPost, 587 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 588 bytes.NewBuffer(reqBody2)) 589 req2.Header.Set("Content-Type", "application/json") 590 req2.Header.Set("Authorization", "Bearer "+token) 591 592 resp2, err := http.DefaultClient.Do(req2) 593 if err != nil { 594 t.Fatalf("Failed to create downvote: %v", err) 595 } 596 var downvoteResp struct { 597 URI string `json:"uri"` 598 CID string `json:"cid"` 599 } 600 if decodeErr := json.NewDecoder(resp2.Body).Decode(&downvoteResp); decodeErr != nil { 601 t.Fatalf("Failed to decode downvote response: %v", decodeErr) 602 } 603 if closeErr := resp2.Body.Close(); closeErr != nil { 604 t.Logf("Failed to close response body: %v", closeErr) 605 } 606 607 // Simulate Jetstream UPDATE event (PDS updates the existing record) 608 t.Logf("\n🔄 Simulating Jetstream UPDATE event...") 609 updateEvent := jetstream.JetstreamEvent{ 610 Did: userDID, 611 TimeUS: time.Now().UnixMicro(), 612 Kind: "commit", 613 Commit: &jetstream.CommitEvent{ 614 Rev: "test-vote-rev-down", 615 Operation: "update", 616 Collection: "social.coves.feed.vote", 617 RKey: rkey, // Same rkey as before 618 CID: downvoteResp.CID, 619 Record: map[string]interface{}{ 620 "$type": "social.coves.feed.vote", 621 "subject": map[string]interface{}{ 622 "uri": postURI, 623 "cid": postCID, 624 }, 625 "direction": "down", // Changed direction 626 "createdAt": time.Now().Format(time.RFC3339), 627 }, 628 }, 629 } 630 if handleErr := voteConsumer.HandleEvent(ctx, &updateEvent); handleErr != nil { 631 t.Fatalf("Failed to handle update event: %v", handleErr) 632 } 633 634 // Verify vote direction changed in AppView 635 t.Logf("\n🔍 Verifying vote direction changed in AppView...") 636 updatedVote, err := voteRepo.GetByURI(ctx, upvoteResp.URI) 637 if err != nil { 638 t.Fatalf("Vote not found after update: %v", err) 639 } 640 641 if updatedVote.Direction != "down" { 642 t.Errorf("Expected direction 'down', got %s", updatedVote.Direction) 643 } 644 645 // Verify post counts updated 646 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 647 if updatedPost.UpvoteCount != 0 { 648 t.Errorf("Expected upvote_count = 0, got %d", updatedPost.UpvoteCount) 649 } 650 if updatedPost.DownvoteCount != 1 { 651 t.Errorf("Expected downvote_count = 1, got %d", updatedPost.DownvoteCount) 652 } 653 if updatedPost.Score != -1 { 654 t.Errorf("Expected score = -1, got %d", updatedPost.Score) 655 } 656 657 t.Logf("✅ TOGGLE DIFFERENT DIRECTION FLOW COMPLETE:") 658 t.Logf(" ✓ Upvote created (score: +1)") 659 t.Logf(" ✓ Changed to downvote (score: -1)") 660 t.Logf(" ✓ Post counts updated correctly") 661} 662 663// TestVoteE2E_DeleteVote tests explicit vote deletion 664func TestVoteE2E_DeleteVote(t *testing.T) { 665 if testing.Short() { 666 t.Skip("Skipping E2E test in short mode") 667 } 668 669 db := setupTestDB(t) 670 defer func() { _ = db.Close() }() 671 672 ctx := context.Background() 673 pdsURL := getTestPDSURL() 674 675 // Setup repositories and services 676 voteRepo := postgres.NewVoteRepository(db) 677 postRepo := postgres.NewPostRepository(db) 678 679 oauthStore := SetupOAuthTestStore(t, db) 680 oauthClient := SetupOAuthTestClient(t, oauthStore) 681 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 682 683 // Create test user 684 testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix()) 685 testUserEmail := fmt.Sprintf("delete-%d@test.local", time.Now().Unix()) 686 testUserPassword := "test-password-123" 687 688 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 689 if err != nil { 690 t.Skipf("PDS not available: %v", err) 691 } 692 693 testUser := createTestUser(t, db, testUserHandle, userDID) 694 695 // Create test post 696 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "delete-community", "owner.test") 697 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 698 postCID := "bafypost999" 699 700 // Setup OAuth and HTTP server with real PDS access token 701 e2eAuth := NewE2EOAuthMiddleware() 702 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 703 704 r := chi.NewRouter() 705 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 706 httpServer := httptest.NewServer(r) 707 defer httpServer.Close() 708 709 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 710 711 // Create vote first 712 t.Logf("\n📝 Creating vote to delete...") 713 voteReq := map[string]interface{}{ 714 "subject": map[string]interface{}{ 715 "uri": postURI, 716 "cid": postCID, 717 }, 718 "direction": "up", 719 } 720 721 reqBody, _ := json.Marshal(voteReq) 722 req, _ := http.NewRequest(http.MethodPost, 723 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 724 bytes.NewBuffer(reqBody)) 725 req.Header.Set("Content-Type", "application/json") 726 req.Header.Set("Authorization", "Bearer "+token) 727 728 resp, err := http.DefaultClient.Do(req) 729 if err != nil { 730 t.Fatalf("Failed to create vote: %v", err) 731 } 732 var voteResp struct { 733 URI string `json:"uri"` 734 CID string `json:"cid"` 735 } 736 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil { 737 t.Fatalf("Failed to decode vote response: %v", decodeErr) 738 } 739 if closeErr := resp.Body.Close(); closeErr != nil { 740 t.Logf("Failed to close response body: %v", closeErr) 741 } 742 743 // Index vote 744 rkey := utils.ExtractRKeyFromURI(voteResp.URI) 745 voteEvent := jetstream.JetstreamEvent{ 746 Did: userDID, 747 TimeUS: time.Now().UnixMicro(), 748 Kind: "commit", 749 Commit: &jetstream.CommitEvent{ 750 Rev: "test-vote-create", 751 Operation: "create", 752 Collection: "social.coves.feed.vote", 753 RKey: rkey, 754 CID: voteResp.CID, 755 Record: map[string]interface{}{ 756 "$type": "social.coves.feed.vote", 757 "subject": map[string]interface{}{ 758 "uri": postURI, 759 "cid": postCID, 760 }, 761 "direction": "up", 762 "createdAt": time.Now().Format(time.RFC3339), 763 }, 764 }, 765 } 766 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 767 t.Fatalf("Failed to handle vote event: %v", handleErr) 768 } 769 770 t.Logf("✅ Vote created and indexed") 771 772 // Delete vote via XRPC 773 t.Logf("\n📝 Deleting vote via XRPC...") 774 deleteReq := map[string]interface{}{ 775 "subject": map[string]interface{}{ 776 "uri": postURI, 777 "cid": postCID, 778 }, 779 } 780 781 deleteBody, _ := json.Marshal(deleteReq) 782 deleteHttpReq, _ := http.NewRequest(http.MethodPost, 783 httpServer.URL+"/xrpc/social.coves.feed.vote.delete", 784 bytes.NewBuffer(deleteBody)) 785 deleteHttpReq.Header.Set("Content-Type", "application/json") 786 deleteHttpReq.Header.Set("Authorization", "Bearer "+token) 787 788 deleteResp, err := http.DefaultClient.Do(deleteHttpReq) 789 if err != nil { 790 t.Fatalf("Failed to delete vote: %v", err) 791 } 792 defer func() { 793 if closeErr := deleteResp.Body.Close(); closeErr != nil { 794 t.Logf("Failed to close response body: %v", closeErr) 795 } 796 }() 797 798 if deleteResp.StatusCode != http.StatusOK { 799 body, _ := io.ReadAll(deleteResp.Body) 800 t.Fatalf("Delete failed: status %d, body: %s", deleteResp.StatusCode, string(body)) 801 } 802 803 // Per lexicon, delete returns empty object {} 804 var deleteRespBody map[string]interface{} 805 if decodeErr := json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody); decodeErr != nil { 806 t.Fatalf("Failed to decode delete response: %v", decodeErr) 807 } 808 809 if len(deleteRespBody) != 0 { 810 t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody) 811 } 812 813 t.Logf("✅ Delete vote request succeeded") 814 815 // Simulate Jetstream DELETE event 816 t.Logf("\n🔄 Simulating Jetstream DELETE event...") 817 deleteEvent := jetstream.JetstreamEvent{ 818 Did: userDID, 819 TimeUS: time.Now().UnixMicro(), 820 Kind: "commit", 821 Commit: &jetstream.CommitEvent{ 822 Rev: "test-vote-delete", 823 Operation: "delete", 824 Collection: "social.coves.feed.vote", 825 RKey: rkey, 826 }, 827 } 828 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 829 t.Fatalf("Failed to handle delete event: %v", handleErr) 830 } 831 832 // Verify vote removed from AppView 833 t.Logf("\n🔍 Verifying vote removed from AppView...") 834 _, err = voteRepo.GetByURI(ctx, voteResp.URI) 835 if err == nil { 836 t.Error("Expected vote to be deleted, but it still exists") 837 } 838 839 // Verify post counts reset 840 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 841 if updatedPost.UpvoteCount != 0 { 842 t.Errorf("Expected upvote_count = 0 after delete, got %d", updatedPost.UpvoteCount) 843 } 844 if updatedPost.Score != 0 { 845 t.Errorf("Expected score = 0 after delete, got %d", updatedPost.Score) 846 } 847 848 t.Logf("✅ EXPLICIT DELETE FLOW COMPLETE:") 849 t.Logf(" ✓ Vote created and indexed") 850 t.Logf(" ✓ Vote deleted via XRPC") 851 t.Logf(" ✓ Vote removed from AppView") 852 t.Logf(" ✓ Post counts updated correctly") 853} 854 855// TestVoteE2E_JetstreamIndexing tests real Jetstream firehose consumption 856func TestVoteE2E_JetstreamIndexing(t *testing.T) { 857 if testing.Short() { 858 t.Skip("Skipping E2E test in short mode") 859 } 860 861 db := setupTestDB(t) 862 defer func() { _ = db.Close() }() 863 864 ctx := context.Background() 865 pdsURL := getTestPDSURL() 866 867 // Setup repositories 868 voteRepo := postgres.NewVoteRepository(db) 869 870 // Create test user on PDS 871 testUserHandle := fmt.Sprintf("jetstream-%d.local.coves.dev", time.Now().Unix()) 872 testUserEmail := fmt.Sprintf("jetstream-%d@test.local", time.Now().Unix()) 873 testUserPassword := "test-password-123" 874 875 accessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 876 if err != nil { 877 t.Skipf("PDS not available: %v", err) 878 } 879 880 testUser := createTestUser(t, db, testUserHandle, userDID) 881 882 // Create test post 883 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "jetstream-community", "owner.test") 884 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 885 postCID := "bafypostjetstream" 886 887 // Write vote directly to PDS 888 t.Logf("\n📝 Writing vote to PDS...") 889 voteRecord := map[string]interface{}{ 890 "$type": "social.coves.feed.vote", 891 "subject": map[string]interface{}{ 892 "uri": postURI, 893 "cid": postCID, 894 }, 895 "direction": "up", 896 "createdAt": time.Now().Format(time.RFC3339), 897 } 898 899 voteURI, voteCID, err := writePDSRecord(pdsURL, accessToken, userDID, "social.coves.feed.vote", "", voteRecord) 900 if err != nil { 901 t.Fatalf("Failed to write vote to PDS: %v", err) 902 } 903 904 t.Logf("✅ Vote written to PDS:") 905 t.Logf(" URI: %s", voteURI) 906 t.Logf(" CID: %s", voteCID) 907 908 // Setup Jetstream consumer 909 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 910 911 // Subscribe to Jetstream 912 t.Logf("\n🔄 Subscribing to real Jetstream firehose...") 913 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 914 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 915 pdsHostname = strings.Split(pdsHostname, ":")[0] 916 917 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.feed.vote", pdsHostname) 918 t.Logf(" Jetstream URL: %s", jetstreamURL) 919 t.Logf(" Looking for vote DID: %s", userDID) 920 921 // Channels for event communication 922 eventChan := make(chan *jetstream.JetstreamEvent, 10) 923 errorChan := make(chan error, 1) 924 done := make(chan bool) 925 926 // Start Jetstream consumer in background 927 go func() { 928 err := subscribeToJetstreamForVote(ctx, jetstreamURL, userDID, voteConsumer, eventChan, errorChan, done) 929 if err != nil { 930 errorChan <- err 931 } 932 }() 933 934 // Wait for event or timeout 935 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 936 937 select { 938 case event := <-eventChan: 939 t.Logf("✅ Received real Jetstream event!") 940 t.Logf(" Event DID: %s", event.Did) 941 t.Logf(" Collection: %s", event.Commit.Collection) 942 t.Logf(" Operation: %s", event.Commit.Operation) 943 t.Logf(" RKey: %s", event.Commit.RKey) 944 945 // Verify it's our vote 946 if event.Did != userDID { 947 t.Errorf("Expected DID %s, got %s", userDID, event.Did) 948 } 949 950 // Verify indexed in AppView database 951 t.Logf("\n🔍 Querying AppView database...") 952 indexedVote, err := voteRepo.GetByURI(ctx, voteURI) 953 if err != nil { 954 t.Fatalf("Vote not indexed in AppView: %v", err) 955 } 956 957 t.Logf("✅ Vote indexed in AppView:") 958 t.Logf(" VoterDID: %s", indexedVote.VoterDID) 959 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI) 960 t.Logf(" Direction: %s", indexedVote.Direction) 961 t.Logf(" URI: %s", indexedVote.URI) 962 963 // Signal to stop Jetstream consumer 964 close(done) 965 966 case err := <-errorChan: 967 t.Fatalf("Jetstream error: %v", err) 968 969 case <-time.After(30 * time.Second): 970 t.Fatalf("Timeout: No Jetstream event received within 30 seconds") 971 } 972 973 t.Logf("\n✅ TRUE E2E JETSTREAM FLOW COMPLETE:") 974 t.Logf(" PDS → Jetstream → Consumer → AppView ✓") 975} 976 977// subscribeToJetstreamForVote subscribes to real Jetstream firehose for vote events 978func subscribeToJetstreamForVote( 979 ctx context.Context, 980 jetstreamURL string, 981 targetDID string, 982 consumer *jetstream.VoteEventConsumer, 983 eventChan chan<- *jetstream.JetstreamEvent, 984 errorChan chan<- error, 985 done <-chan bool, 986) error { 987 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 988 if err != nil { 989 return fmt.Errorf("failed to connect to Jetstream: %w", err) 990 } 991 defer func() { _ = conn.Close() }() 992 993 // Read messages until we find our event or receive done signal 994 for { 995 select { 996 case <-done: 997 return nil 998 case <-ctx.Done(): 999 return ctx.Err() 1000 default: 1001 // Set read deadline to avoid blocking forever 1002 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 1003 return fmt.Errorf("failed to set read deadline: %w", err) 1004 } 1005 1006 var event jetstream.JetstreamEvent 1007 err := conn.ReadJSON(&event) 1008 if err != nil { 1009 // Check if it's a timeout (expected) 1010 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 1011 return nil 1012 } 1013 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 1014 continue // Timeout is expected, keep listening 1015 } 1016 return fmt.Errorf("failed to read Jetstream message: %w", err) 1017 } 1018 1019 // Check if this is the event we're looking for 1020 if event.Did == targetDID && event.Kind == "commit" && event.Commit.Collection == "social.coves.feed.vote" { 1021 // Process the event through the consumer 1022 if err := consumer.HandleEvent(ctx, &event); err != nil { 1023 return fmt.Errorf("failed to process event: %w", err) 1024 } 1025 1026 // Send to channel so test can verify 1027 select { 1028 case eventChan <- &event: 1029 return nil 1030 case <-time.After(1 * time.Second): 1031 return fmt.Errorf("timeout sending event to channel") 1032 } 1033 } 1034 } 1035 } 1036}