A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/routes" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/atproto/utils" 7 "Coves/internal/core/votes" 8 "Coves/internal/db/postgres" 9 "bytes" 10 "context" 11 "database/sql" 12 "encoding/json" 13 "fmt" 14 "io" 15 "net" 16 "net/http" 17 "net/http/httptest" 18 "os" 19 "strings" 20 "testing" 21 "time" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25 _ "github.com/lib/pq" 26 "github.com/pressly/goose/v3" 27) 28 29// TestVoteE2E_CreateUpvote tests the full vote creation flow with a real local PDS 30// Flow: Client → XRPC → PDS Write → Jetstream → Consumer → AppView 31func TestVoteE2E_CreateUpvote(t *testing.T) { 32 // Skip in short mode since this requires real PDS 33 if testing.Short() { 34 t.Skip("Skipping E2E test in short mode") 35 } 36 37 // Setup test database 38 dbURL := os.Getenv("TEST_DATABASE_URL") 39 if dbURL == "" { 40 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 41 } 42 43 db, err := sql.Open("postgres", dbURL) 44 if err != nil { 45 t.Fatalf("Failed to connect to test database: %v", err) 46 } 47 defer func() { 48 if closeErr := db.Close(); closeErr != nil { 49 t.Logf("Failed to close database: %v", closeErr) 50 } 51 }() 52 53 // Run migrations 54 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 55 t.Fatalf("Failed to set goose dialect: %v", dialectErr) 56 } 57 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 58 t.Fatalf("Failed to run migrations: %v", migrateErr) 59 } 60 61 // Check if PDS is running 62 pdsURL := os.Getenv("PDS_URL") 63 if pdsURL == "" { 64 pdsURL = "http://localhost:3001" 65 } 66 67 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 68 if err != nil { 69 t.Skipf("PDS not running at %s: %v", pdsURL, err) 70 } 71 func() { 72 if closeErr := healthResp.Body.Close(); closeErr != nil { 73 t.Logf("Failed to close health response: %v", closeErr) 74 } 75 }() 76 77 ctx := context.Background() 78 79 // Setup repositories 80 voteRepo := postgres.NewVoteRepository(db) 81 postRepo := postgres.NewPostRepository(db) 82 83 // Setup OAuth client and store for vote service 84 oauthStore := SetupOAuthTestStore(t, db) 85 oauthClient := SetupOAuthTestClient(t, oauthStore) 86 87 // Setup services 88 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 89 90 // Create test user on PDS 91 testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix()) 92 testUserEmail := fmt.Sprintf("voter-%d@test.local", time.Now().Unix()) 93 testUserPassword := "test-password-123" 94 95 t.Logf("Creating test user on PDS: %s", testUserHandle) 96 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 97 if err != nil { 98 t.Fatalf("Failed to create test user on PDS: %v", err) 99 } 100 t.Logf("Test user created: DID=%s", userDID) 101 102 // Index user in AppView 103 testUser := createTestUser(t, db, testUserHandle, userDID) 104 105 // Create test post to vote on 106 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test") 107 if err != nil { 108 t.Fatalf("Failed to create test community: %v", err) 109 } 110 111 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 112 postCID := "bafypost123" 113 114 // Setup OAuth middleware with real PDS access token 115 e2eAuth := NewE2EOAuthMiddleware() 116 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 117 118 // Setup HTTP server with XRPC routes 119 r := chi.NewRouter() 120 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 121 httpServer := httptest.NewServer(r) 122 defer httpServer.Close() 123 124 // Setup Jetstream consumer 125 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 126 127 // ==================================================================================== 128 // TEST: Create upvote on post 129 // ==================================================================================== 130 t.Logf("\n📝 Creating upvote via XRPC endpoint...") 131 132 voteReq := map[string]interface{}{ 133 "subject": map[string]interface{}{ 134 "uri": postURI, 135 "cid": postCID, 136 }, 137 "direction": "up", 138 } 139 140 reqBody, marshalErr := json.Marshal(voteReq) 141 if marshalErr != nil { 142 t.Fatalf("Failed to marshal request: %v", marshalErr) 143 } 144 145 req, err := http.NewRequest(http.MethodPost, 146 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 147 bytes.NewBuffer(reqBody)) 148 if err != nil { 149 t.Fatalf("Failed to create request: %v", err) 150 } 151 req.Header.Set("Content-Type", "application/json") 152 req.Header.Set("Authorization", "Bearer "+token) 153 154 resp, err := http.DefaultClient.Do(req) 155 if err != nil { 156 t.Fatalf("Failed to POST vote: %v", err) 157 } 158 defer func() { _ = resp.Body.Close() }() 159 160 if resp.StatusCode != http.StatusOK { 161 body, readErr := io.ReadAll(resp.Body) 162 if readErr != nil { 163 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 164 } 165 t.Logf("XRPC Vote Failed") 166 t.Logf(" Status: %d", resp.StatusCode) 167 t.Logf(" Response: %s", string(body)) 168 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 169 } 170 171 var voteResp struct { 172 URI string `json:"uri"` 173 CID string `json:"cid"` 174 } 175 176 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil { 177 t.Fatalf("Failed to decode vote response: %v", decodeErr) 178 } 179 180 t.Logf("✅ XRPC response received:") 181 t.Logf(" URI: %s", voteResp.URI) 182 t.Logf(" CID: %s", voteResp.CID) 183 184 // Verify vote record was written to PDS 185 t.Logf("\n🔍 Verifying vote record on PDS...") 186 rkey := utils.ExtractRKeyFromURI(voteResp.URI) 187 collection := "social.coves.feed.vote" 188 189 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 190 pdsURL, userDID, collection, rkey)) 191 if pdsErr != nil { 192 t.Fatalf("Failed to fetch vote record from PDS: %v", pdsErr) 193 } 194 defer func() { 195 if closeErr := pdsResp.Body.Close(); closeErr != nil { 196 t.Logf("Failed to close PDS response: %v", closeErr) 197 } 198 }() 199 200 if pdsResp.StatusCode != http.StatusOK { 201 body, _ := io.ReadAll(pdsResp.Body) 202 t.Fatalf("Vote record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body)) 203 } 204 205 var pdsRecord struct { 206 Value map[string]interface{} `json:"value"` 207 CID string `json:"cid"` 208 } 209 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 210 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 211 } 212 213 t.Logf("✅ Vote record found on PDS:") 214 t.Logf(" CID: %s", pdsRecord.CID) 215 t.Logf(" Direction: %v", pdsRecord.Value["direction"]) 216 217 // Verify direction 218 if pdsRecord.Value["direction"] != "up" { 219 t.Errorf("Expected direction 'up', got %v", pdsRecord.Value["direction"]) 220 } 221 222 // Simulate Jetstream consumer indexing the vote 223 t.Logf("\n🔄 Simulating Jetstream consumer indexing vote...") 224 voteEvent := jetstream.JetstreamEvent{ 225 Did: userDID, 226 TimeUS: time.Now().UnixMicro(), 227 Kind: "commit", 228 Commit: &jetstream.CommitEvent{ 229 Rev: "test-vote-rev", 230 Operation: "create", 231 Collection: "social.coves.feed.vote", 232 RKey: rkey, 233 CID: pdsRecord.CID, 234 Record: map[string]interface{}{ 235 "$type": "social.coves.feed.vote", 236 "subject": map[string]interface{}{ 237 "uri": postURI, 238 "cid": postCID, 239 }, 240 "direction": "up", 241 "createdAt": time.Now().Format(time.RFC3339), 242 }, 243 }, 244 } 245 246 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 247 t.Fatalf("Failed to handle vote event: %v", handleErr) 248 } 249 250 // Verify vote was indexed in AppView 251 t.Logf("\n🔍 Verifying vote indexed in AppView...") 252 indexedVote, err := voteRepo.GetByURI(ctx, voteResp.URI) 253 if err != nil { 254 t.Fatalf("Vote not indexed in AppView: %v", err) 255 } 256 257 t.Logf("✅ Vote indexed in AppView:") 258 t.Logf(" VoterDID: %s", indexedVote.VoterDID) 259 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI) 260 t.Logf(" Direction: %s", indexedVote.Direction) 261 t.Logf(" URI: %s", indexedVote.URI) 262 263 // Verify vote details 264 if indexedVote.VoterDID != userDID { 265 t.Errorf("Expected voter_did %s, got %s", userDID, indexedVote.VoterDID) 266 } 267 if indexedVote.SubjectURI != postURI { 268 t.Errorf("Expected subject_uri %s, got %s", postURI, indexedVote.SubjectURI) 269 } 270 if indexedVote.Direction != "up" { 271 t.Errorf("Expected direction 'up', got %s", indexedVote.Direction) 272 } 273 274 // Verify post counts updated 275 t.Logf("\n🔍 Verifying post vote counts updated...") 276 updatedPost, err := postRepo.GetByURI(ctx, postURI) 277 if err != nil { 278 t.Fatalf("Failed to get updated post: %v", err) 279 } 280 281 if updatedPost.UpvoteCount != 1 { 282 t.Errorf("Expected upvote_count = 1, got %d", updatedPost.UpvoteCount) 283 } 284 if updatedPost.Score != 1 { 285 t.Errorf("Expected score = 1, got %d", updatedPost.Score) 286 } 287 288 t.Logf("✅ TRUE E2E UPVOTE FLOW COMPLETE:") 289 t.Logf(" Client → XRPC → PDS Write → Jetstream → Consumer → AppView ✓") 290 t.Logf(" ✓ Vote written to PDS") 291 t.Logf(" ✓ Vote indexed in AppView") 292 t.Logf(" ✓ Post vote counts updated") 293} 294 295// TestVoteE2E_ToggleSameDirection tests voting twice in same direction (toggle off) 296func TestVoteE2E_ToggleSameDirection(t *testing.T) { 297 if testing.Short() { 298 t.Skip("Skipping E2E test in short mode") 299 } 300 301 db := setupTestDB(t) 302 defer func() { _ = db.Close() }() 303 304 ctx := context.Background() 305 pdsURL := getTestPDSURL() 306 307 // Setup repositories and services 308 voteRepo := postgres.NewVoteRepository(db) 309 postRepo := postgres.NewPostRepository(db) 310 311 oauthStore := SetupOAuthTestStore(t, db) 312 oauthClient := SetupOAuthTestClient(t, oauthStore) 313 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 314 315 // Create test user 316 testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix()) 317 testUserEmail := fmt.Sprintf("toggle-%d@test.local", time.Now().Unix()) 318 testUserPassword := "test-password-123" 319 320 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 321 if err != nil { 322 t.Skipf("PDS not available: %v", err) 323 } 324 325 testUser := createTestUser(t, db, testUserHandle, userDID) 326 327 // Create test post 328 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "toggle-community", "owner.test") 329 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 330 postCID := "bafypost456" 331 332 // Setup OAuth and HTTP server with real PDS access token 333 e2eAuth := NewE2EOAuthMiddleware() 334 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 335 336 r := chi.NewRouter() 337 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 338 httpServer := httptest.NewServer(r) 339 defer httpServer.Close() 340 341 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 342 343 // First upvote 344 t.Logf("\n📝 Creating first upvote...") 345 voteReq := map[string]interface{}{ 346 "subject": map[string]interface{}{ 347 "uri": postURI, 348 "cid": postCID, 349 }, 350 "direction": "up", 351 } 352 353 reqBody, _ := json.Marshal(voteReq) 354 req, _ := http.NewRequest(http.MethodPost, 355 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 356 bytes.NewBuffer(reqBody)) 357 req.Header.Set("Content-Type", "application/json") 358 req.Header.Set("Authorization", "Bearer "+token) 359 360 resp, err := http.DefaultClient.Do(req) 361 if err != nil { 362 t.Fatalf("Failed to create first vote: %v", err) 363 } 364 365 var firstVoteResp struct { 366 URI string `json:"uri"` 367 CID string `json:"cid"` 368 } 369 if decodeErr := json.NewDecoder(resp.Body).Decode(&firstVoteResp); decodeErr != nil { 370 t.Fatalf("Failed to decode first vote response: %v", decodeErr) 371 } 372 if closeErr := resp.Body.Close(); closeErr != nil { 373 t.Logf("Failed to close response body: %v", closeErr) 374 } 375 376 t.Logf("✅ First vote created: %s", firstVoteResp.URI) 377 378 // Index first vote 379 rkey := utils.ExtractRKeyFromURI(firstVoteResp.URI) 380 voteEvent := jetstream.JetstreamEvent{ 381 Did: userDID, 382 TimeUS: time.Now().UnixMicro(), 383 Kind: "commit", 384 Commit: &jetstream.CommitEvent{ 385 Rev: "test-vote-rev-1", 386 Operation: "create", 387 Collection: "social.coves.feed.vote", 388 RKey: rkey, 389 CID: firstVoteResp.CID, 390 Record: map[string]interface{}{ 391 "$type": "social.coves.feed.vote", 392 "subject": map[string]interface{}{ 393 "uri": postURI, 394 "cid": postCID, 395 }, 396 "direction": "up", 397 "createdAt": time.Now().Format(time.RFC3339), 398 }, 399 }, 400 } 401 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 402 t.Fatalf("Failed to handle first vote event: %v", handleErr) 403 } 404 405 // Second upvote (same direction) - should toggle off (delete) 406 t.Logf("\n📝 Creating second upvote (toggle off)...") 407 req2, _ := http.NewRequest(http.MethodPost, 408 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 409 bytes.NewBuffer(reqBody)) 410 req2.Header.Set("Content-Type", "application/json") 411 req2.Header.Set("Authorization", "Bearer "+token) 412 413 resp2, err := http.DefaultClient.Do(req2) 414 if err != nil { 415 t.Fatalf("Failed to toggle vote: %v", err) 416 } 417 defer func() { 418 if closeErr := resp2.Body.Close(); closeErr != nil { 419 t.Logf("Failed to close response body: %v", closeErr) 420 } 421 }() 422 423 if resp2.StatusCode != http.StatusOK { 424 body, _ := io.ReadAll(resp2.Body) 425 t.Fatalf("Expected 200, got %d: %s", resp2.StatusCode, string(body)) 426 } 427 428 t.Logf("✅ Second vote request completed (toggle)") 429 430 // Simulate Jetstream DELETE event 431 t.Logf("\n🔄 Simulating Jetstream DELETE event...") 432 deleteEvent := jetstream.JetstreamEvent{ 433 Did: userDID, 434 TimeUS: time.Now().UnixMicro(), 435 Kind: "commit", 436 Commit: &jetstream.CommitEvent{ 437 Rev: "test-vote-rev-2", 438 Operation: "delete", 439 Collection: "social.coves.feed.vote", 440 RKey: rkey, 441 }, 442 } 443 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 444 t.Fatalf("Failed to handle delete event: %v", handleErr) 445 } 446 447 // Verify vote was removed from AppView 448 t.Logf("\n🔍 Verifying vote removed from AppView...") 449 _, err = voteRepo.GetByURI(ctx, firstVoteResp.URI) 450 if err == nil { 451 t.Error("Expected vote to be deleted, but it still exists") 452 } 453 454 // Verify post counts reset 455 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 456 if updatedPost.UpvoteCount != 0 { 457 t.Errorf("Expected upvote_count = 0 after toggle, got %d", updatedPost.UpvoteCount) 458 } 459 460 t.Logf("✅ TOGGLE SAME DIRECTION FLOW COMPLETE:") 461 t.Logf(" ✓ First vote created and indexed") 462 t.Logf(" ✓ Second vote toggled off (deleted)") 463 t.Logf(" ✓ Post counts updated correctly") 464} 465 466// TestVoteE2E_ToggleDifferentDirection tests changing vote direction 467func TestVoteE2E_ToggleDifferentDirection(t *testing.T) { 468 if testing.Short() { 469 t.Skip("Skipping E2E test in short mode") 470 } 471 472 db := setupTestDB(t) 473 defer func() { _ = db.Close() }() 474 475 ctx := context.Background() 476 pdsURL := getTestPDSURL() 477 478 // Setup repositories and services 479 voteRepo := postgres.NewVoteRepository(db) 480 postRepo := postgres.NewPostRepository(db) 481 482 oauthStore := SetupOAuthTestStore(t, db) 483 oauthClient := SetupOAuthTestClient(t, oauthStore) 484 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 485 486 // Create test user 487 testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix()) 488 testUserEmail := fmt.Sprintf("flip-%d@test.local", time.Now().Unix()) 489 testUserPassword := "test-password-123" 490 491 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 492 if err != nil { 493 t.Skipf("PDS not available: %v", err) 494 } 495 496 testUser := createTestUser(t, db, testUserHandle, userDID) 497 498 // Create test post 499 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "flip-community", "owner.test") 500 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 501 postCID := "bafypost789" 502 503 // Setup OAuth and HTTP server with real PDS access token 504 e2eAuth := NewE2EOAuthMiddleware() 505 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 506 507 r := chi.NewRouter() 508 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 509 httpServer := httptest.NewServer(r) 510 defer httpServer.Close() 511 512 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 513 514 // Create upvote 515 t.Logf("\n📝 Creating upvote...") 516 upvoteReq := map[string]interface{}{ 517 "subject": map[string]interface{}{ 518 "uri": postURI, 519 "cid": postCID, 520 }, 521 "direction": "up", 522 } 523 524 reqBody, _ := json.Marshal(upvoteReq) 525 req, _ := http.NewRequest(http.MethodPost, 526 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 527 bytes.NewBuffer(reqBody)) 528 req.Header.Set("Content-Type", "application/json") 529 req.Header.Set("Authorization", "Bearer "+token) 530 531 resp, err := http.DefaultClient.Do(req) 532 if err != nil { 533 t.Fatalf("Failed to create upvote: %v", err) 534 } 535 var upvoteResp struct { 536 URI string `json:"uri"` 537 CID string `json:"cid"` 538 } 539 if decodeErr := json.NewDecoder(resp.Body).Decode(&upvoteResp); decodeErr != nil { 540 t.Fatalf("Failed to decode upvote response: %v", decodeErr) 541 } 542 if closeErr := resp.Body.Close(); closeErr != nil { 543 t.Logf("Failed to close response body: %v", closeErr) 544 } 545 546 // Index upvote 547 rkey := utils.ExtractRKeyFromURI(upvoteResp.URI) 548 upvoteEvent := jetstream.JetstreamEvent{ 549 Did: userDID, 550 TimeUS: time.Now().UnixMicro(), 551 Kind: "commit", 552 Commit: &jetstream.CommitEvent{ 553 Rev: "test-vote-rev-up", 554 Operation: "create", 555 Collection: "social.coves.feed.vote", 556 RKey: rkey, 557 CID: upvoteResp.CID, 558 Record: map[string]interface{}{ 559 "$type": "social.coves.feed.vote", 560 "subject": map[string]interface{}{ 561 "uri": postURI, 562 "cid": postCID, 563 }, 564 "direction": "up", 565 "createdAt": time.Now().Format(time.RFC3339), 566 }, 567 }, 568 } 569 if handleErr := voteConsumer.HandleEvent(ctx, &upvoteEvent); handleErr != nil { 570 t.Fatalf("Failed to handle upvote event: %v", handleErr) 571 } 572 573 t.Logf("✅ Upvote created and indexed") 574 575 // Change to downvote 576 t.Logf("\n📝 Changing to downvote...") 577 downvoteReq := map[string]interface{}{ 578 "subject": map[string]interface{}{ 579 "uri": postURI, 580 "cid": postCID, 581 }, 582 "direction": "down", 583 } 584 585 reqBody2, _ := json.Marshal(downvoteReq) 586 req2, _ := http.NewRequest(http.MethodPost, 587 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 588 bytes.NewBuffer(reqBody2)) 589 req2.Header.Set("Content-Type", "application/json") 590 req2.Header.Set("Authorization", "Bearer "+token) 591 592 resp2, err := http.DefaultClient.Do(req2) 593 if err != nil { 594 t.Fatalf("Failed to create downvote: %v", err) 595 } 596 var downvoteResp struct { 597 URI string `json:"uri"` 598 CID string `json:"cid"` 599 } 600 if decodeErr := json.NewDecoder(resp2.Body).Decode(&downvoteResp); decodeErr != nil { 601 t.Fatalf("Failed to decode downvote response: %v", decodeErr) 602 } 603 if closeErr := resp2.Body.Close(); closeErr != nil { 604 t.Logf("Failed to close response body: %v", closeErr) 605 } 606 607 // The service flow for direction change is: 608 // 1. DELETE old vote on PDS 609 // 2. CREATE new vote with NEW rkey on PDS 610 // So we simulate DELETE + CREATE events (not UPDATE) 611 612 // Simulate Jetstream DELETE event for old vote 613 t.Logf("\n🔄 Simulating Jetstream DELETE event for old upvote...") 614 deleteEvent := jetstream.JetstreamEvent{ 615 Did: userDID, 616 TimeUS: time.Now().UnixMicro(), 617 Kind: "commit", 618 Commit: &jetstream.CommitEvent{ 619 Rev: "test-vote-rev-delete", 620 Operation: "delete", 621 Collection: "social.coves.feed.vote", 622 RKey: rkey, // Old upvote rkey 623 }, 624 } 625 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 626 t.Fatalf("Failed to handle delete event: %v", handleErr) 627 } 628 629 // Simulate Jetstream CREATE event for new downvote 630 t.Logf("\n🔄 Simulating Jetstream CREATE event for new downvote...") 631 newRkey := utils.ExtractRKeyFromURI(downvoteResp.URI) 632 createEvent := jetstream.JetstreamEvent{ 633 Did: userDID, 634 TimeUS: time.Now().UnixMicro(), 635 Kind: "commit", 636 Commit: &jetstream.CommitEvent{ 637 Rev: "test-vote-rev-down", 638 Operation: "create", 639 Collection: "social.coves.feed.vote", 640 RKey: newRkey, // NEW rkey from downvote response 641 CID: downvoteResp.CID, 642 Record: map[string]interface{}{ 643 "$type": "social.coves.feed.vote", 644 "subject": map[string]interface{}{ 645 "uri": postURI, 646 "cid": postCID, 647 }, 648 "direction": "down", 649 "createdAt": time.Now().Format(time.RFC3339), 650 }, 651 }, 652 } 653 if handleErr := voteConsumer.HandleEvent(ctx, &createEvent); handleErr != nil { 654 t.Fatalf("Failed to handle create event: %v", handleErr) 655 } 656 657 // Verify old upvote was deleted 658 t.Logf("\n🔍 Verifying old upvote was deleted...") 659 _, err = voteRepo.GetByURI(ctx, upvoteResp.URI) 660 if err == nil { 661 t.Error("Expected old upvote to be deleted, but it still exists") 662 } 663 664 // Verify new downvote was indexed 665 t.Logf("\n🔍 Verifying new downvote indexed in AppView...") 666 newVote, err := voteRepo.GetByURI(ctx, downvoteResp.URI) 667 if err != nil { 668 t.Fatalf("New downvote not found: %v", err) 669 } 670 671 if newVote.Direction != "down" { 672 t.Errorf("Expected direction 'down', got %s", newVote.Direction) 673 } 674 675 // Verify post counts updated 676 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 677 if updatedPost.UpvoteCount != 0 { 678 t.Errorf("Expected upvote_count = 0, got %d", updatedPost.UpvoteCount) 679 } 680 if updatedPost.DownvoteCount != 1 { 681 t.Errorf("Expected downvote_count = 1, got %d", updatedPost.DownvoteCount) 682 } 683 if updatedPost.Score != -1 { 684 t.Errorf("Expected score = -1, got %d", updatedPost.Score) 685 } 686 687 t.Logf("✅ TOGGLE DIFFERENT DIRECTION FLOW COMPLETE:") 688 t.Logf(" ✓ Upvote created (score: +1)") 689 t.Logf(" ✓ Changed to downvote (score: -1)") 690 t.Logf(" ✓ Post counts updated correctly") 691} 692 693// TestVoteE2E_DeleteVote tests explicit vote deletion 694func TestVoteE2E_DeleteVote(t *testing.T) { 695 if testing.Short() { 696 t.Skip("Skipping E2E test in short mode") 697 } 698 699 db := setupTestDB(t) 700 defer func() { _ = db.Close() }() 701 702 ctx := context.Background() 703 pdsURL := getTestPDSURL() 704 705 // Setup repositories and services 706 voteRepo := postgres.NewVoteRepository(db) 707 postRepo := postgres.NewPostRepository(db) 708 709 oauthStore := SetupOAuthTestStore(t, db) 710 oauthClient := SetupOAuthTestClient(t, oauthStore) 711 voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 712 713 // Create test user 714 testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix()) 715 testUserEmail := fmt.Sprintf("delete-%d@test.local", time.Now().Unix()) 716 testUserPassword := "test-password-123" 717 718 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 719 if err != nil { 720 t.Skipf("PDS not available: %v", err) 721 } 722 723 testUser := createTestUser(t, db, testUserHandle, userDID) 724 725 // Create test post 726 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "delete-community", "owner.test") 727 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 728 postCID := "bafypost999" 729 730 // Setup OAuth and HTTP server with real PDS access token 731 e2eAuth := NewE2EOAuthMiddleware() 732 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 733 734 r := chi.NewRouter() 735 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 736 httpServer := httptest.NewServer(r) 737 defer httpServer.Close() 738 739 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 740 741 // Create vote first 742 t.Logf("\n📝 Creating vote to delete...") 743 voteReq := map[string]interface{}{ 744 "subject": map[string]interface{}{ 745 "uri": postURI, 746 "cid": postCID, 747 }, 748 "direction": "up", 749 } 750 751 reqBody, _ := json.Marshal(voteReq) 752 req, _ := http.NewRequest(http.MethodPost, 753 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 754 bytes.NewBuffer(reqBody)) 755 req.Header.Set("Content-Type", "application/json") 756 req.Header.Set("Authorization", "Bearer "+token) 757 758 resp, err := http.DefaultClient.Do(req) 759 if err != nil { 760 t.Fatalf("Failed to create vote: %v", err) 761 } 762 var voteResp struct { 763 URI string `json:"uri"` 764 CID string `json:"cid"` 765 } 766 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil { 767 t.Fatalf("Failed to decode vote response: %v", decodeErr) 768 } 769 if closeErr := resp.Body.Close(); closeErr != nil { 770 t.Logf("Failed to close response body: %v", closeErr) 771 } 772 773 // Index vote 774 rkey := utils.ExtractRKeyFromURI(voteResp.URI) 775 voteEvent := jetstream.JetstreamEvent{ 776 Did: userDID, 777 TimeUS: time.Now().UnixMicro(), 778 Kind: "commit", 779 Commit: &jetstream.CommitEvent{ 780 Rev: "test-vote-create", 781 Operation: "create", 782 Collection: "social.coves.feed.vote", 783 RKey: rkey, 784 CID: voteResp.CID, 785 Record: map[string]interface{}{ 786 "$type": "social.coves.feed.vote", 787 "subject": map[string]interface{}{ 788 "uri": postURI, 789 "cid": postCID, 790 }, 791 "direction": "up", 792 "createdAt": time.Now().Format(time.RFC3339), 793 }, 794 }, 795 } 796 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 797 t.Fatalf("Failed to handle vote event: %v", handleErr) 798 } 799 800 t.Logf("✅ Vote created and indexed") 801 802 // Delete vote via XRPC 803 t.Logf("\n📝 Deleting vote via XRPC...") 804 deleteReq := map[string]interface{}{ 805 "subject": map[string]interface{}{ 806 "uri": postURI, 807 "cid": postCID, 808 }, 809 } 810 811 deleteBody, _ := json.Marshal(deleteReq) 812 deleteHttpReq, _ := http.NewRequest(http.MethodPost, 813 httpServer.URL+"/xrpc/social.coves.feed.vote.delete", 814 bytes.NewBuffer(deleteBody)) 815 deleteHttpReq.Header.Set("Content-Type", "application/json") 816 deleteHttpReq.Header.Set("Authorization", "Bearer "+token) 817 818 deleteResp, err := http.DefaultClient.Do(deleteHttpReq) 819 if err != nil { 820 t.Fatalf("Failed to delete vote: %v", err) 821 } 822 defer func() { 823 if closeErr := deleteResp.Body.Close(); closeErr != nil { 824 t.Logf("Failed to close response body: %v", closeErr) 825 } 826 }() 827 828 if deleteResp.StatusCode != http.StatusOK { 829 body, _ := io.ReadAll(deleteResp.Body) 830 t.Fatalf("Delete failed: status %d, body: %s", deleteResp.StatusCode, string(body)) 831 } 832 833 // Per lexicon, delete returns empty object {} 834 var deleteRespBody map[string]interface{} 835 if decodeErr := json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody); decodeErr != nil { 836 t.Fatalf("Failed to decode delete response: %v", decodeErr) 837 } 838 839 if len(deleteRespBody) != 0 { 840 t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody) 841 } 842 843 t.Logf("✅ Delete vote request succeeded") 844 845 // Simulate Jetstream DELETE event 846 t.Logf("\n🔄 Simulating Jetstream DELETE event...") 847 deleteEvent := jetstream.JetstreamEvent{ 848 Did: userDID, 849 TimeUS: time.Now().UnixMicro(), 850 Kind: "commit", 851 Commit: &jetstream.CommitEvent{ 852 Rev: "test-vote-delete", 853 Operation: "delete", 854 Collection: "social.coves.feed.vote", 855 RKey: rkey, 856 }, 857 } 858 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 859 t.Fatalf("Failed to handle delete event: %v", handleErr) 860 } 861 862 // Verify vote removed from AppView 863 t.Logf("\n🔍 Verifying vote removed from AppView...") 864 _, err = voteRepo.GetByURI(ctx, voteResp.URI) 865 if err == nil { 866 t.Error("Expected vote to be deleted, but it still exists") 867 } 868 869 // Verify post counts reset 870 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 871 if updatedPost.UpvoteCount != 0 { 872 t.Errorf("Expected upvote_count = 0 after delete, got %d", updatedPost.UpvoteCount) 873 } 874 if updatedPost.Score != 0 { 875 t.Errorf("Expected score = 0 after delete, got %d", updatedPost.Score) 876 } 877 878 t.Logf("✅ EXPLICIT DELETE FLOW COMPLETE:") 879 t.Logf(" ✓ Vote created and indexed") 880 t.Logf(" ✓ Vote deleted via XRPC") 881 t.Logf(" ✓ Vote removed from AppView") 882 t.Logf(" ✓ Post counts updated correctly") 883} 884 885// TestVoteE2E_JetstreamIndexing tests real Jetstream firehose consumption 886func TestVoteE2E_JetstreamIndexing(t *testing.T) { 887 if testing.Short() { 888 t.Skip("Skipping E2E test in short mode") 889 } 890 891 db := setupTestDB(t) 892 defer func() { _ = db.Close() }() 893 894 ctx := context.Background() 895 pdsURL := getTestPDSURL() 896 897 // Setup repositories 898 voteRepo := postgres.NewVoteRepository(db) 899 900 // Create test user on PDS 901 testUserHandle := fmt.Sprintf("jetstream-%d.local.coves.dev", time.Now().Unix()) 902 testUserEmail := fmt.Sprintf("jetstream-%d@test.local", time.Now().Unix()) 903 testUserPassword := "test-password-123" 904 905 accessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 906 if err != nil { 907 t.Skipf("PDS not available: %v", err) 908 } 909 910 testUser := createTestUser(t, db, testUserHandle, userDID) 911 912 // Create test post 913 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "jetstream-community", "owner.test") 914 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 915 postCID := "bafypostjetstream" 916 917 // Write vote directly to PDS 918 t.Logf("\n📝 Writing vote to PDS...") 919 voteRecord := map[string]interface{}{ 920 "$type": "social.coves.feed.vote", 921 "subject": map[string]interface{}{ 922 "uri": postURI, 923 "cid": postCID, 924 }, 925 "direction": "up", 926 "createdAt": time.Now().Format(time.RFC3339), 927 } 928 929 voteURI, voteCID, err := writePDSRecord(pdsURL, accessToken, userDID, "social.coves.feed.vote", "", voteRecord) 930 if err != nil { 931 t.Fatalf("Failed to write vote to PDS: %v", err) 932 } 933 934 t.Logf("✅ Vote written to PDS:") 935 t.Logf(" URI: %s", voteURI) 936 t.Logf(" CID: %s", voteCID) 937 938 // Setup Jetstream consumer 939 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 940 941 // Subscribe to Jetstream 942 t.Logf("\n🔄 Subscribing to real Jetstream firehose...") 943 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 944 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 945 pdsHostname = strings.Split(pdsHostname, ":")[0] 946 947 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.feed.vote", pdsHostname) 948 t.Logf(" Jetstream URL: %s", jetstreamURL) 949 t.Logf(" Looking for vote DID: %s", userDID) 950 951 // Channels for event communication 952 eventChan := make(chan *jetstream.JetstreamEvent, 10) 953 errorChan := make(chan error, 1) 954 done := make(chan bool) 955 956 // Start Jetstream consumer in background 957 go func() { 958 err := subscribeToJetstreamForVote(ctx, jetstreamURL, userDID, voteConsumer, eventChan, errorChan, done) 959 if err != nil { 960 errorChan <- err 961 } 962 }() 963 964 // Wait for event or timeout 965 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 966 967 select { 968 case event := <-eventChan: 969 t.Logf("✅ Received real Jetstream event!") 970 t.Logf(" Event DID: %s", event.Did) 971 t.Logf(" Collection: %s", event.Commit.Collection) 972 t.Logf(" Operation: %s", event.Commit.Operation) 973 t.Logf(" RKey: %s", event.Commit.RKey) 974 975 // Verify it's our vote 976 if event.Did != userDID { 977 t.Errorf("Expected DID %s, got %s", userDID, event.Did) 978 } 979 980 // Verify indexed in AppView database 981 t.Logf("\n🔍 Querying AppView database...") 982 indexedVote, err := voteRepo.GetByURI(ctx, voteURI) 983 if err != nil { 984 t.Fatalf("Vote not indexed in AppView: %v", err) 985 } 986 987 t.Logf("✅ Vote indexed in AppView:") 988 t.Logf(" VoterDID: %s", indexedVote.VoterDID) 989 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI) 990 t.Logf(" Direction: %s", indexedVote.Direction) 991 t.Logf(" URI: %s", indexedVote.URI) 992 993 // Signal to stop Jetstream consumer 994 close(done) 995 996 case err := <-errorChan: 997 t.Fatalf("Jetstream error: %v", err) 998 999 case <-time.After(30 * time.Second): 1000 t.Fatalf("Timeout: No Jetstream event received within 30 seconds") 1001 } 1002 1003 t.Logf("\n✅ TRUE E2E JETSTREAM FLOW COMPLETE:") 1004 t.Logf(" PDS → Jetstream → Consumer → AppView ✓") 1005} 1006 1007// subscribeToJetstreamForVote subscribes to real Jetstream firehose for vote events 1008func subscribeToJetstreamForVote( 1009 ctx context.Context, 1010 jetstreamURL string, 1011 targetDID string, 1012 consumer *jetstream.VoteEventConsumer, 1013 eventChan chan<- *jetstream.JetstreamEvent, 1014 errorChan chan<- error, 1015 done <-chan bool, 1016) error { 1017 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 1018 if err != nil { 1019 return fmt.Errorf("failed to connect to Jetstream: %w", err) 1020 } 1021 defer func() { _ = conn.Close() }() 1022 1023 // Read messages until we find our event or receive done signal 1024 for { 1025 select { 1026 case <-done: 1027 return nil 1028 case <-ctx.Done(): 1029 return ctx.Err() 1030 default: 1031 // Set read deadline to avoid blocking forever 1032 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 1033 return fmt.Errorf("failed to set read deadline: %w", err) 1034 } 1035 1036 var event jetstream.JetstreamEvent 1037 err := conn.ReadJSON(&event) 1038 if err != nil { 1039 // Check if it's a timeout (expected) 1040 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 1041 return nil 1042 } 1043 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 1044 continue // Timeout is expected, keep listening 1045 } 1046 return fmt.Errorf("failed to read Jetstream message: %w", err) 1047 } 1048 1049 // Check if this is the event we're looking for 1050 if event.Did == targetDID && event.Kind == "commit" && event.Commit.Collection == "social.coves.feed.vote" { 1051 // Process the event through the consumer 1052 if err := consumer.HandleEvent(ctx, &event); err != nil { 1053 return fmt.Errorf("failed to process event: %w", err) 1054 } 1055 1056 // Send to channel so test can verify 1057 select { 1058 case eventChan <- &event: 1059 return nil 1060 case <-time.After(1 * time.Second): 1061 return fmt.Errorf("timeout sending event to channel") 1062 } 1063 } 1064 } 1065 } 1066}