A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/routes" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/atproto/utils" 7 "Coves/internal/core/votes" 8 "Coves/internal/db/postgres" 9 "bytes" 10 "context" 11 "database/sql" 12 "encoding/json" 13 "fmt" 14 "io" 15 "net" 16 "net/http" 17 "net/http/httptest" 18 "os" 19 "strings" 20 "testing" 21 "time" 22 23 "github.com/go-chi/chi/v5" 24 "github.com/gorilla/websocket" 25 _ "github.com/lib/pq" 26 "github.com/pressly/goose/v3" 27) 28 29// TestVoteE2E_CreateUpvote tests the full vote creation flow with a real local PDS 30// Flow: Client → XRPC → PDS Write → Jetstream → Consumer → AppView 31func TestVoteE2E_CreateUpvote(t *testing.T) { 32 // Skip in short mode since this requires real PDS 33 if testing.Short() { 34 t.Skip("Skipping E2E test in short mode") 35 } 36 37 // Setup test database 38 dbURL := os.Getenv("TEST_DATABASE_URL") 39 if dbURL == "" { 40 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 41 } 42 43 db, err := sql.Open("postgres", dbURL) 44 if err != nil { 45 t.Fatalf("Failed to connect to test database: %v", err) 46 } 47 defer func() { 48 if closeErr := db.Close(); closeErr != nil { 49 t.Logf("Failed to close database: %v", closeErr) 50 } 51 }() 52 53 // Run migrations 54 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 55 t.Fatalf("Failed to set goose dialect: %v", dialectErr) 56 } 57 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 58 t.Fatalf("Failed to run migrations: %v", migrateErr) 59 } 60 61 // Check if PDS is running 62 pdsURL := os.Getenv("PDS_URL") 63 if pdsURL == "" { 64 pdsURL = "http://localhost:3001" 65 } 66 67 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 68 if err != nil { 69 t.Skipf("PDS not running at %s: %v", pdsURL, err) 70 } 71 func() { 72 if closeErr := healthResp.Body.Close(); closeErr != nil { 73 t.Logf("Failed to close health response: %v", closeErr) 74 } 75 }() 76 77 ctx := 
context.Background() 78 79 // Setup repositories 80 voteRepo := postgres.NewVoteRepository(db) 81 postRepo := postgres.NewPostRepository(db) 82 83 // Setup services with password-based PDS client factory for E2E testing 84 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 85 86 // Create test user on PDS 87 testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix()) 88 testUserEmail := fmt.Sprintf("voter-%d@test.local", time.Now().Unix()) 89 testUserPassword := "test-password-123" 90 91 t.Logf("Creating test user on PDS: %s", testUserHandle) 92 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 93 if err != nil { 94 t.Fatalf("Failed to create test user on PDS: %v", err) 95 } 96 t.Logf("Test user created: DID=%s", userDID) 97 98 // Index user in AppView 99 testUser := createTestUser(t, db, testUserHandle, userDID) 100 101 // Create test post to vote on 102 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test") 103 if err != nil { 104 t.Fatalf("Failed to create test community: %v", err) 105 } 106 107 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 108 postCID := "bafypost123" 109 110 // Setup OAuth middleware with real PDS access token 111 e2eAuth := NewE2EOAuthMiddleware() 112 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 113 114 // Setup HTTP server with XRPC routes 115 r := chi.NewRouter() 116 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 117 httpServer := httptest.NewServer(r) 118 defer httpServer.Close() 119 120 // Setup Jetstream consumer 121 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 122 123 // ==================================================================================== 124 // TEST: Create upvote on post 125 // 
==================================================================================== 126 t.Logf("\n📝 Creating upvote via XRPC endpoint...") 127 128 voteReq := map[string]interface{}{ 129 "subject": map[string]interface{}{ 130 "uri": postURI, 131 "cid": postCID, 132 }, 133 "direction": "up", 134 } 135 136 reqBody, marshalErr := json.Marshal(voteReq) 137 if marshalErr != nil { 138 t.Fatalf("Failed to marshal request: %v", marshalErr) 139 } 140 141 req, err := http.NewRequest(http.MethodPost, 142 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 143 bytes.NewBuffer(reqBody)) 144 if err != nil { 145 t.Fatalf("Failed to create request: %v", err) 146 } 147 req.Header.Set("Content-Type", "application/json") 148 req.Header.Set("Authorization", "Bearer "+token) 149 150 resp, err := http.DefaultClient.Do(req) 151 if err != nil { 152 t.Fatalf("Failed to POST vote: %v", err) 153 } 154 defer func() { _ = resp.Body.Close() }() 155 156 if resp.StatusCode != http.StatusOK { 157 body, readErr := io.ReadAll(resp.Body) 158 if readErr != nil { 159 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 160 } 161 t.Logf("XRPC Vote Failed") 162 t.Logf(" Status: %d", resp.StatusCode) 163 t.Logf(" Response: %s", string(body)) 164 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 165 } 166 167 var voteResp struct { 168 URI string `json:"uri"` 169 CID string `json:"cid"` 170 } 171 172 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil { 173 t.Fatalf("Failed to decode vote response: %v", decodeErr) 174 } 175 176 t.Logf("✅ XRPC response received:") 177 t.Logf(" URI: %s", voteResp.URI) 178 t.Logf(" CID: %s", voteResp.CID) 179 180 // Verify vote record was written to PDS 181 t.Logf("\n🔍 Verifying vote record on PDS...") 182 rkey := utils.ExtractRKeyFromURI(voteResp.URI) 183 collection := "social.coves.feed.vote" 184 185 pdsResp, pdsErr := 
http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 186 pdsURL, userDID, collection, rkey)) 187 if pdsErr != nil { 188 t.Fatalf("Failed to fetch vote record from PDS: %v", pdsErr) 189 } 190 defer func() { 191 if closeErr := pdsResp.Body.Close(); closeErr != nil { 192 t.Logf("Failed to close PDS response: %v", closeErr) 193 } 194 }() 195 196 if pdsResp.StatusCode != http.StatusOK { 197 body, _ := io.ReadAll(pdsResp.Body) 198 t.Fatalf("Vote record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body)) 199 } 200 201 var pdsRecord struct { 202 Value map[string]interface{} `json:"value"` 203 CID string `json:"cid"` 204 } 205 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 206 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 207 } 208 209 t.Logf("✅ Vote record found on PDS:") 210 t.Logf(" CID: %s", pdsRecord.CID) 211 t.Logf(" Direction: %v", pdsRecord.Value["direction"]) 212 213 // Verify direction 214 if pdsRecord.Value["direction"] != "up" { 215 t.Errorf("Expected direction 'up', got %v", pdsRecord.Value["direction"]) 216 } 217 218 // Simulate Jetstream consumer indexing the vote 219 t.Logf("\n🔄 Simulating Jetstream consumer indexing vote...") 220 voteEvent := jetstream.JetstreamEvent{ 221 Did: userDID, 222 TimeUS: time.Now().UnixMicro(), 223 Kind: "commit", 224 Commit: &jetstream.CommitEvent{ 225 Rev: "test-vote-rev", 226 Operation: "create", 227 Collection: "social.coves.feed.vote", 228 RKey: rkey, 229 CID: pdsRecord.CID, 230 Record: map[string]interface{}{ 231 "$type": "social.coves.feed.vote", 232 "subject": map[string]interface{}{ 233 "uri": postURI, 234 "cid": postCID, 235 }, 236 "direction": "up", 237 "createdAt": time.Now().Format(time.RFC3339), 238 }, 239 }, 240 } 241 242 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 243 t.Fatalf("Failed to handle vote event: %v", handleErr) 244 } 245 246 // Verify vote was indexed in AppView 247 
t.Logf("\n🔍 Verifying vote indexed in AppView...") 248 indexedVote, err := voteRepo.GetByURI(ctx, voteResp.URI) 249 if err != nil { 250 t.Fatalf("Vote not indexed in AppView: %v", err) 251 } 252 253 t.Logf("✅ Vote indexed in AppView:") 254 t.Logf(" VoterDID: %s", indexedVote.VoterDID) 255 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI) 256 t.Logf(" Direction: %s", indexedVote.Direction) 257 t.Logf(" URI: %s", indexedVote.URI) 258 259 // Verify vote details 260 if indexedVote.VoterDID != userDID { 261 t.Errorf("Expected voter_did %s, got %s", userDID, indexedVote.VoterDID) 262 } 263 if indexedVote.SubjectURI != postURI { 264 t.Errorf("Expected subject_uri %s, got %s", postURI, indexedVote.SubjectURI) 265 } 266 if indexedVote.Direction != "up" { 267 t.Errorf("Expected direction 'up', got %s", indexedVote.Direction) 268 } 269 270 // Verify post counts updated 271 t.Logf("\n🔍 Verifying post vote counts updated...") 272 updatedPost, err := postRepo.GetByURI(ctx, postURI) 273 if err != nil { 274 t.Fatalf("Failed to get updated post: %v", err) 275 } 276 277 if updatedPost.UpvoteCount != 1 { 278 t.Errorf("Expected upvote_count = 1, got %d", updatedPost.UpvoteCount) 279 } 280 if updatedPost.Score != 1 { 281 t.Errorf("Expected score = 1, got %d", updatedPost.Score) 282 } 283 284 t.Logf("✅ TRUE E2E UPVOTE FLOW COMPLETE:") 285 t.Logf(" Client → XRPC → PDS Write → Jetstream → Consumer → AppView ✓") 286 t.Logf(" ✓ Vote written to PDS") 287 t.Logf(" ✓ Vote indexed in AppView") 288 t.Logf(" ✓ Post vote counts updated") 289} 290 291// TestVoteE2E_ToggleSameDirection tests voting twice in same direction (toggle off) 292func TestVoteE2E_ToggleSameDirection(t *testing.T) { 293 if testing.Short() { 294 t.Skip("Skipping E2E test in short mode") 295 } 296 297 db := setupTestDB(t) 298 defer func() { _ = db.Close() }() 299 300 ctx := context.Background() 301 pdsURL := getTestPDSURL() 302 303 // Setup repositories and services 304 voteRepo := postgres.NewVoteRepository(db) 305 postRepo := 
postgres.NewPostRepository(db) 306 307 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 308 309 // Create test user 310 testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix()) 311 testUserEmail := fmt.Sprintf("toggle-%d@test.local", time.Now().Unix()) 312 testUserPassword := "test-password-123" 313 314 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 315 if err != nil { 316 t.Skipf("PDS not available: %v", err) 317 } 318 319 testUser := createTestUser(t, db, testUserHandle, userDID) 320 321 // Create test post 322 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "toggle-community", "owner.test") 323 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 324 postCID := "bafypost456" 325 326 // Setup OAuth and HTTP server with real PDS access token 327 e2eAuth := NewE2EOAuthMiddleware() 328 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 329 330 r := chi.NewRouter() 331 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 332 httpServer := httptest.NewServer(r) 333 defer httpServer.Close() 334 335 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 336 337 // First upvote 338 t.Logf("\n📝 Creating first upvote...") 339 voteReq := map[string]interface{}{ 340 "subject": map[string]interface{}{ 341 "uri": postURI, 342 "cid": postCID, 343 }, 344 "direction": "up", 345 } 346 347 reqBody, _ := json.Marshal(voteReq) 348 req, _ := http.NewRequest(http.MethodPost, 349 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 350 bytes.NewBuffer(reqBody)) 351 req.Header.Set("Content-Type", "application/json") 352 req.Header.Set("Authorization", "Bearer "+token) 353 354 resp, err := http.DefaultClient.Do(req) 355 if err != nil { 356 t.Fatalf("Failed to create first vote: %v", err) 357 } 358 359 var firstVoteResp struct { 360 URI string `json:"uri"` 361 CID 
string `json:"cid"` 362 } 363 if decodeErr := json.NewDecoder(resp.Body).Decode(&firstVoteResp); decodeErr != nil { 364 t.Fatalf("Failed to decode first vote response: %v", decodeErr) 365 } 366 if closeErr := resp.Body.Close(); closeErr != nil { 367 t.Logf("Failed to close response body: %v", closeErr) 368 } 369 370 t.Logf("✅ First vote created: %s", firstVoteResp.URI) 371 372 // Index first vote 373 rkey := utils.ExtractRKeyFromURI(firstVoteResp.URI) 374 voteEvent := jetstream.JetstreamEvent{ 375 Did: userDID, 376 TimeUS: time.Now().UnixMicro(), 377 Kind: "commit", 378 Commit: &jetstream.CommitEvent{ 379 Rev: "test-vote-rev-1", 380 Operation: "create", 381 Collection: "social.coves.feed.vote", 382 RKey: rkey, 383 CID: firstVoteResp.CID, 384 Record: map[string]interface{}{ 385 "$type": "social.coves.feed.vote", 386 "subject": map[string]interface{}{ 387 "uri": postURI, 388 "cid": postCID, 389 }, 390 "direction": "up", 391 "createdAt": time.Now().Format(time.RFC3339), 392 }, 393 }, 394 } 395 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 396 t.Fatalf("Failed to handle first vote event: %v", handleErr) 397 } 398 399 // Second upvote (same direction) - should toggle off (delete) 400 t.Logf("\n📝 Creating second upvote (toggle off)...") 401 req2, _ := http.NewRequest(http.MethodPost, 402 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 403 bytes.NewBuffer(reqBody)) 404 req2.Header.Set("Content-Type", "application/json") 405 req2.Header.Set("Authorization", "Bearer "+token) 406 407 resp2, err := http.DefaultClient.Do(req2) 408 if err != nil { 409 t.Fatalf("Failed to toggle vote: %v", err) 410 } 411 defer func() { 412 if closeErr := resp2.Body.Close(); closeErr != nil { 413 t.Logf("Failed to close response body: %v", closeErr) 414 } 415 }() 416 417 if resp2.StatusCode != http.StatusOK { 418 body, _ := io.ReadAll(resp2.Body) 419 t.Fatalf("Expected 200, got %d: %s", resp2.StatusCode, string(body)) 420 } 421 422 t.Logf("✅ Second vote 
request completed (toggle)") 423 424 // Simulate Jetstream DELETE event 425 t.Logf("\n🔄 Simulating Jetstream DELETE event...") 426 deleteEvent := jetstream.JetstreamEvent{ 427 Did: userDID, 428 TimeUS: time.Now().UnixMicro(), 429 Kind: "commit", 430 Commit: &jetstream.CommitEvent{ 431 Rev: "test-vote-rev-2", 432 Operation: "delete", 433 Collection: "social.coves.feed.vote", 434 RKey: rkey, 435 }, 436 } 437 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 438 t.Fatalf("Failed to handle delete event: %v", handleErr) 439 } 440 441 // Verify vote was removed from AppView 442 t.Logf("\n🔍 Verifying vote removed from AppView...") 443 _, err = voteRepo.GetByURI(ctx, firstVoteResp.URI) 444 if err == nil { 445 t.Error("Expected vote to be deleted, but it still exists") 446 } 447 448 // Verify post counts reset 449 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 450 if updatedPost.UpvoteCount != 0 { 451 t.Errorf("Expected upvote_count = 0 after toggle, got %d", updatedPost.UpvoteCount) 452 } 453 454 t.Logf("✅ TOGGLE SAME DIRECTION FLOW COMPLETE:") 455 t.Logf(" ✓ First vote created and indexed") 456 t.Logf(" ✓ Second vote toggled off (deleted)") 457 t.Logf(" ✓ Post counts updated correctly") 458} 459 460// TestVoteE2E_ToggleDifferentDirection tests changing vote direction 461func TestVoteE2E_ToggleDifferentDirection(t *testing.T) { 462 if testing.Short() { 463 t.Skip("Skipping E2E test in short mode") 464 } 465 466 db := setupTestDB(t) 467 defer func() { _ = db.Close() }() 468 469 ctx := context.Background() 470 pdsURL := getTestPDSURL() 471 472 // Setup repositories and services 473 voteRepo := postgres.NewVoteRepository(db) 474 postRepo := postgres.NewPostRepository(db) 475 476 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 477 478 // Create test user 479 testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix()) 480 testUserEmail := fmt.Sprintf("flip-%d@test.local", 
time.Now().Unix()) 481 testUserPassword := "test-password-123" 482 483 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 484 if err != nil { 485 t.Skipf("PDS not available: %v", err) 486 } 487 488 testUser := createTestUser(t, db, testUserHandle, userDID) 489 490 // Create test post 491 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "flip-community", "owner.test") 492 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 493 postCID := "bafypost789" 494 495 // Setup OAuth and HTTP server with real PDS access token 496 e2eAuth := NewE2EOAuthMiddleware() 497 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 498 499 r := chi.NewRouter() 500 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 501 httpServer := httptest.NewServer(r) 502 defer httpServer.Close() 503 504 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 505 506 // Create upvote 507 t.Logf("\n📝 Creating upvote...") 508 upvoteReq := map[string]interface{}{ 509 "subject": map[string]interface{}{ 510 "uri": postURI, 511 "cid": postCID, 512 }, 513 "direction": "up", 514 } 515 516 reqBody, _ := json.Marshal(upvoteReq) 517 req, _ := http.NewRequest(http.MethodPost, 518 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 519 bytes.NewBuffer(reqBody)) 520 req.Header.Set("Content-Type", "application/json") 521 req.Header.Set("Authorization", "Bearer "+token) 522 523 resp, err := http.DefaultClient.Do(req) 524 if err != nil { 525 t.Fatalf("Failed to create upvote: %v", err) 526 } 527 var upvoteResp struct { 528 URI string `json:"uri"` 529 CID string `json:"cid"` 530 } 531 if decodeErr := json.NewDecoder(resp.Body).Decode(&upvoteResp); decodeErr != nil { 532 t.Fatalf("Failed to decode upvote response: %v", decodeErr) 533 } 534 if closeErr := resp.Body.Close(); closeErr != nil { 535 t.Logf("Failed to close response body: %v", closeErr) 536 } 537 538 // Index 
upvote 539 rkey := utils.ExtractRKeyFromURI(upvoteResp.URI) 540 upvoteEvent := jetstream.JetstreamEvent{ 541 Did: userDID, 542 TimeUS: time.Now().UnixMicro(), 543 Kind: "commit", 544 Commit: &jetstream.CommitEvent{ 545 Rev: "test-vote-rev-up", 546 Operation: "create", 547 Collection: "social.coves.feed.vote", 548 RKey: rkey, 549 CID: upvoteResp.CID, 550 Record: map[string]interface{}{ 551 "$type": "social.coves.feed.vote", 552 "subject": map[string]interface{}{ 553 "uri": postURI, 554 "cid": postCID, 555 }, 556 "direction": "up", 557 "createdAt": time.Now().Format(time.RFC3339), 558 }, 559 }, 560 } 561 if handleErr := voteConsumer.HandleEvent(ctx, &upvoteEvent); handleErr != nil { 562 t.Fatalf("Failed to handle upvote event: %v", handleErr) 563 } 564 565 t.Logf("✅ Upvote created and indexed") 566 567 // Change to downvote 568 t.Logf("\n📝 Changing to downvote...") 569 downvoteReq := map[string]interface{}{ 570 "subject": map[string]interface{}{ 571 "uri": postURI, 572 "cid": postCID, 573 }, 574 "direction": "down", 575 } 576 577 reqBody2, _ := json.Marshal(downvoteReq) 578 req2, _ := http.NewRequest(http.MethodPost, 579 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 580 bytes.NewBuffer(reqBody2)) 581 req2.Header.Set("Content-Type", "application/json") 582 req2.Header.Set("Authorization", "Bearer "+token) 583 584 resp2, err := http.DefaultClient.Do(req2) 585 if err != nil { 586 t.Fatalf("Failed to create downvote: %v", err) 587 } 588 var downvoteResp struct { 589 URI string `json:"uri"` 590 CID string `json:"cid"` 591 } 592 if decodeErr := json.NewDecoder(resp2.Body).Decode(&downvoteResp); decodeErr != nil { 593 t.Fatalf("Failed to decode downvote response: %v", decodeErr) 594 } 595 if closeErr := resp2.Body.Close(); closeErr != nil { 596 t.Logf("Failed to close response body: %v", closeErr) 597 } 598 599 // The service flow for direction change is: 600 // 1. DELETE old vote on PDS 601 // 2. 
CREATE new vote with NEW rkey on PDS 602 // So we simulate DELETE + CREATE events (not UPDATE) 603 604 // Simulate Jetstream DELETE event for old vote 605 t.Logf("\n🔄 Simulating Jetstream DELETE event for old upvote...") 606 deleteEvent := jetstream.JetstreamEvent{ 607 Did: userDID, 608 TimeUS: time.Now().UnixMicro(), 609 Kind: "commit", 610 Commit: &jetstream.CommitEvent{ 611 Rev: "test-vote-rev-delete", 612 Operation: "delete", 613 Collection: "social.coves.feed.vote", 614 RKey: rkey, // Old upvote rkey 615 }, 616 } 617 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 618 t.Fatalf("Failed to handle delete event: %v", handleErr) 619 } 620 621 // Simulate Jetstream CREATE event for new downvote 622 t.Logf("\n🔄 Simulating Jetstream CREATE event for new downvote...") 623 newRkey := utils.ExtractRKeyFromURI(downvoteResp.URI) 624 createEvent := jetstream.JetstreamEvent{ 625 Did: userDID, 626 TimeUS: time.Now().UnixMicro(), 627 Kind: "commit", 628 Commit: &jetstream.CommitEvent{ 629 Rev: "test-vote-rev-down", 630 Operation: "create", 631 Collection: "social.coves.feed.vote", 632 RKey: newRkey, // NEW rkey from downvote response 633 CID: downvoteResp.CID, 634 Record: map[string]interface{}{ 635 "$type": "social.coves.feed.vote", 636 "subject": map[string]interface{}{ 637 "uri": postURI, 638 "cid": postCID, 639 }, 640 "direction": "down", 641 "createdAt": time.Now().Format(time.RFC3339), 642 }, 643 }, 644 } 645 if handleErr := voteConsumer.HandleEvent(ctx, &createEvent); handleErr != nil { 646 t.Fatalf("Failed to handle create event: %v", handleErr) 647 } 648 649 // Verify old upvote was deleted 650 t.Logf("\n🔍 Verifying old upvote was deleted...") 651 _, err = voteRepo.GetByURI(ctx, upvoteResp.URI) 652 if err == nil { 653 t.Error("Expected old upvote to be deleted, but it still exists") 654 } 655 656 // Verify new downvote was indexed 657 t.Logf("\n🔍 Verifying new downvote indexed in AppView...") 658 newVote, err := voteRepo.GetByURI(ctx, 
downvoteResp.URI) 659 if err != nil { 660 t.Fatalf("New downvote not found: %v", err) 661 } 662 663 if newVote.Direction != "down" { 664 t.Errorf("Expected direction 'down', got %s", newVote.Direction) 665 } 666 667 // Verify post counts updated 668 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 669 if updatedPost.UpvoteCount != 0 { 670 t.Errorf("Expected upvote_count = 0, got %d", updatedPost.UpvoteCount) 671 } 672 if updatedPost.DownvoteCount != 1 { 673 t.Errorf("Expected downvote_count = 1, got %d", updatedPost.DownvoteCount) 674 } 675 if updatedPost.Score != -1 { 676 t.Errorf("Expected score = -1, got %d", updatedPost.Score) 677 } 678 679 t.Logf("✅ TOGGLE DIFFERENT DIRECTION FLOW COMPLETE:") 680 t.Logf(" ✓ Upvote created (score: +1)") 681 t.Logf(" ✓ Changed to downvote (score: -1)") 682 t.Logf(" ✓ Post counts updated correctly") 683} 684 685// TestVoteE2E_DeleteVote tests explicit vote deletion 686func TestVoteE2E_DeleteVote(t *testing.T) { 687 if testing.Short() { 688 t.Skip("Skipping E2E test in short mode") 689 } 690 691 db := setupTestDB(t) 692 defer func() { _ = db.Close() }() 693 694 ctx := context.Background() 695 pdsURL := getTestPDSURL() 696 697 // Setup repositories and services 698 voteRepo := postgres.NewVoteRepository(db) 699 postRepo := postgres.NewPostRepository(db) 700 701 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 702 703 // Create test user 704 testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix()) 705 testUserEmail := fmt.Sprintf("delete-%d@test.local", time.Now().Unix()) 706 testUserPassword := "test-password-123" 707 708 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 709 if err != nil { 710 t.Skipf("PDS not available: %v", err) 711 } 712 713 testUser := createTestUser(t, db, testUserHandle, userDID) 714 715 // Create test post 716 testCommunityDID, _ := createFeedTestCommunity(db, ctx, 
"delete-community", "owner.test") 717 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 718 postCID := "bafypost999" 719 720 // Setup OAuth and HTTP server with real PDS access token 721 e2eAuth := NewE2EOAuthMiddleware() 722 token := e2eAuth.AddUserWithPDSToken(userDID, pdsAccessToken, pdsURL) 723 724 r := chi.NewRouter() 725 routes.RegisterVoteRoutes(r, voteService, e2eAuth.OAuthAuthMiddleware) 726 httpServer := httptest.NewServer(r) 727 defer httpServer.Close() 728 729 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 730 731 // Create vote first 732 t.Logf("\n📝 Creating vote to delete...") 733 voteReq := map[string]interface{}{ 734 "subject": map[string]interface{}{ 735 "uri": postURI, 736 "cid": postCID, 737 }, 738 "direction": "up", 739 } 740 741 reqBody, _ := json.Marshal(voteReq) 742 req, _ := http.NewRequest(http.MethodPost, 743 httpServer.URL+"/xrpc/social.coves.feed.vote.create", 744 bytes.NewBuffer(reqBody)) 745 req.Header.Set("Content-Type", "application/json") 746 req.Header.Set("Authorization", "Bearer "+token) 747 748 resp, err := http.DefaultClient.Do(req) 749 if err != nil { 750 t.Fatalf("Failed to create vote: %v", err) 751 } 752 var voteResp struct { 753 URI string `json:"uri"` 754 CID string `json:"cid"` 755 } 756 if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil { 757 t.Fatalf("Failed to decode vote response: %v", decodeErr) 758 } 759 if closeErr := resp.Body.Close(); closeErr != nil { 760 t.Logf("Failed to close response body: %v", closeErr) 761 } 762 763 // Index vote 764 rkey := utils.ExtractRKeyFromURI(voteResp.URI) 765 voteEvent := jetstream.JetstreamEvent{ 766 Did: userDID, 767 TimeUS: time.Now().UnixMicro(), 768 Kind: "commit", 769 Commit: &jetstream.CommitEvent{ 770 Rev: "test-vote-create", 771 Operation: "create", 772 Collection: "social.coves.feed.vote", 773 RKey: rkey, 774 CID: voteResp.CID, 775 Record: map[string]interface{}{ 776 "$type": 
"social.coves.feed.vote", 777 "subject": map[string]interface{}{ 778 "uri": postURI, 779 "cid": postCID, 780 }, 781 "direction": "up", 782 "createdAt": time.Now().Format(time.RFC3339), 783 }, 784 }, 785 } 786 if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil { 787 t.Fatalf("Failed to handle vote event: %v", handleErr) 788 } 789 790 t.Logf("✅ Vote created and indexed") 791 792 // Delete vote via XRPC 793 t.Logf("\n📝 Deleting vote via XRPC...") 794 deleteReq := map[string]interface{}{ 795 "subject": map[string]interface{}{ 796 "uri": postURI, 797 "cid": postCID, 798 }, 799 } 800 801 deleteBody, _ := json.Marshal(deleteReq) 802 deleteHttpReq, _ := http.NewRequest(http.MethodPost, 803 httpServer.URL+"/xrpc/social.coves.feed.vote.delete", 804 bytes.NewBuffer(deleteBody)) 805 deleteHttpReq.Header.Set("Content-Type", "application/json") 806 deleteHttpReq.Header.Set("Authorization", "Bearer "+token) 807 808 deleteResp, err := http.DefaultClient.Do(deleteHttpReq) 809 if err != nil { 810 t.Fatalf("Failed to delete vote: %v", err) 811 } 812 defer func() { 813 if closeErr := deleteResp.Body.Close(); closeErr != nil { 814 t.Logf("Failed to close response body: %v", closeErr) 815 } 816 }() 817 818 if deleteResp.StatusCode != http.StatusOK { 819 body, _ := io.ReadAll(deleteResp.Body) 820 t.Fatalf("Delete failed: status %d, body: %s", deleteResp.StatusCode, string(body)) 821 } 822 823 // Per lexicon, delete returns empty object {} 824 var deleteRespBody map[string]interface{} 825 if decodeErr := json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody); decodeErr != nil { 826 t.Fatalf("Failed to decode delete response: %v", decodeErr) 827 } 828 829 if len(deleteRespBody) != 0 { 830 t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody) 831 } 832 833 t.Logf("✅ Delete vote request succeeded") 834 835 // Simulate Jetstream DELETE event 836 t.Logf("\n🔄 Simulating Jetstream DELETE event...") 837 deleteEvent := jetstream.JetstreamEvent{ 838 Did: 
userDID, 839 TimeUS: time.Now().UnixMicro(), 840 Kind: "commit", 841 Commit: &jetstream.CommitEvent{ 842 Rev: "test-vote-delete", 843 Operation: "delete", 844 Collection: "social.coves.feed.vote", 845 RKey: rkey, 846 }, 847 } 848 if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 849 t.Fatalf("Failed to handle delete event: %v", handleErr) 850 } 851 852 // Verify vote removed from AppView 853 t.Logf("\n🔍 Verifying vote removed from AppView...") 854 _, err = voteRepo.GetByURI(ctx, voteResp.URI) 855 if err == nil { 856 t.Error("Expected vote to be deleted, but it still exists") 857 } 858 859 // Verify post counts reset 860 updatedPost, _ := postRepo.GetByURI(ctx, postURI) 861 if updatedPost.UpvoteCount != 0 { 862 t.Errorf("Expected upvote_count = 0 after delete, got %d", updatedPost.UpvoteCount) 863 } 864 if updatedPost.Score != 0 { 865 t.Errorf("Expected score = 0 after delete, got %d", updatedPost.Score) 866 } 867 868 t.Logf("✅ EXPLICIT DELETE FLOW COMPLETE:") 869 t.Logf(" ✓ Vote created and indexed") 870 t.Logf(" ✓ Vote deleted via XRPC") 871 t.Logf(" ✓ Vote removed from AppView") 872 t.Logf(" ✓ Post counts updated correctly") 873} 874 875// TestVoteE2E_JetstreamIndexing tests real Jetstream firehose consumption 876func TestVoteE2E_JetstreamIndexing(t *testing.T) { 877 if testing.Short() { 878 t.Skip("Skipping E2E test in short mode") 879 } 880 881 db := setupTestDB(t) 882 defer func() { _ = db.Close() }() 883 884 ctx := context.Background() 885 pdsURL := getTestPDSURL() 886 887 // Setup repositories 888 voteRepo := postgres.NewVoteRepository(db) 889 890 // Create test user on PDS 891 testUserHandle := fmt.Sprintf("jetstream-%d.local.coves.dev", time.Now().Unix()) 892 testUserEmail := fmt.Sprintf("jetstream-%d@test.local", time.Now().Unix()) 893 testUserPassword := "test-password-123" 894 895 accessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 896 if err != nil { 897 t.Skipf("PDS not 
available: %v", err) 898 } 899 900 testUser := createTestUser(t, db, testUserHandle, userDID) 901 902 // Create test post 903 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "jetstream-community", "owner.test") 904 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 905 postCID := "bafypostjetstream" 906 907 // Write vote directly to PDS 908 t.Logf("\n📝 Writing vote to PDS...") 909 voteRecord := map[string]interface{}{ 910 "$type": "social.coves.feed.vote", 911 "subject": map[string]interface{}{ 912 "uri": postURI, 913 "cid": postCID, 914 }, 915 "direction": "up", 916 "createdAt": time.Now().Format(time.RFC3339), 917 } 918 919 voteURI, voteCID, err := writePDSRecord(pdsURL, accessToken, userDID, "social.coves.feed.vote", "", voteRecord) 920 if err != nil { 921 t.Fatalf("Failed to write vote to PDS: %v", err) 922 } 923 924 t.Logf("✅ Vote written to PDS:") 925 t.Logf(" URI: %s", voteURI) 926 t.Logf(" CID: %s", voteCID) 927 928 // Setup Jetstream consumer 929 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, nil, db) 930 931 // Subscribe to Jetstream 932 t.Logf("\n🔄 Subscribing to real Jetstream firehose...") 933 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 934 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 935 pdsHostname = strings.Split(pdsHostname, ":")[0] 936 937 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.feed.vote", pdsHostname) 938 t.Logf(" Jetstream URL: %s", jetstreamURL) 939 t.Logf(" Looking for vote DID: %s", userDID) 940 941 // Channels for event communication 942 eventChan := make(chan *jetstream.JetstreamEvent, 10) 943 errorChan := make(chan error, 1) 944 done := make(chan bool) 945 946 // Start Jetstream consumer in background 947 go func() { 948 err := subscribeToJetstreamForVote(ctx, jetstreamURL, userDID, voteConsumer, eventChan, errorChan, done) 949 if err != nil { 950 errorChan <- err 951 } 952 }() 953 954 // Wait for event or timeout 
955 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 956 957 select { 958 case event := <-eventChan: 959 t.Logf("✅ Received real Jetstream event!") 960 t.Logf(" Event DID: %s", event.Did) 961 t.Logf(" Collection: %s", event.Commit.Collection) 962 t.Logf(" Operation: %s", event.Commit.Operation) 963 t.Logf(" RKey: %s", event.Commit.RKey) 964 965 // Verify it's our vote 966 if event.Did != userDID { 967 t.Errorf("Expected DID %s, got %s", userDID, event.Did) 968 } 969 970 // Verify indexed in AppView database 971 t.Logf("\n🔍 Querying AppView database...") 972 indexedVote, err := voteRepo.GetByURI(ctx, voteURI) 973 if err != nil { 974 t.Fatalf("Vote not indexed in AppView: %v", err) 975 } 976 977 t.Logf("✅ Vote indexed in AppView:") 978 t.Logf(" VoterDID: %s", indexedVote.VoterDID) 979 t.Logf(" SubjectURI: %s", indexedVote.SubjectURI) 980 t.Logf(" Direction: %s", indexedVote.Direction) 981 t.Logf(" URI: %s", indexedVote.URI) 982 983 // Signal to stop Jetstream consumer 984 close(done) 985 986 case err := <-errorChan: 987 t.Fatalf("Jetstream error: %v", err) 988 989 case <-time.After(30 * time.Second): 990 t.Fatalf("Timeout: No Jetstream event received within 30 seconds") 991 } 992 993 t.Logf("\n✅ TRUE E2E JETSTREAM FLOW COMPLETE:") 994 t.Logf(" PDS → Jetstream → Consumer → AppView ✓") 995} 996 997// subscribeToJetstreamForVote subscribes to real Jetstream firehose for vote events 998func subscribeToJetstreamForVote( 999 ctx context.Context, 1000 jetstreamURL string, 1001 targetDID string, 1002 consumer *jetstream.VoteEventConsumer, 1003 eventChan chan<- *jetstream.JetstreamEvent, 1004 errorChan chan<- error, 1005 done <-chan bool, 1006) error { 1007 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 1008 if err != nil { 1009 return fmt.Errorf("failed to connect to Jetstream: %w", err) 1010 } 1011 defer func() { _ = conn.Close() }() 1012 1013 // Read messages until we find our event or receive done signal 1014 for { 1015 select { 1016 case 
<-done: 1017 return nil 1018 case <-ctx.Done(): 1019 return ctx.Err() 1020 default: 1021 // Set read deadline to avoid blocking forever 1022 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 1023 return fmt.Errorf("failed to set read deadline: %w", err) 1024 } 1025 1026 var event jetstream.JetstreamEvent 1027 err := conn.ReadJSON(&event) 1028 if err != nil { 1029 // Check if it's a timeout (expected) 1030 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 1031 return nil 1032 } 1033 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 1034 continue // Timeout is expected, keep listening 1035 } 1036 return fmt.Errorf("failed to read Jetstream message: %w", err) 1037 } 1038 1039 // Check if this is the event we're looking for 1040 if event.Did == targetDID && event.Kind == "commit" && event.Commit.Collection == "social.coves.feed.vote" { 1041 // Process the event through the consumer 1042 if err := consumer.HandleEvent(ctx, &event); err != nil { 1043 return fmt.Errorf("failed to process event: %w", err) 1044 } 1045 1046 // Send to channel so test can verify 1047 select { 1048 case eventChan <- &event: 1049 return nil 1050 case <-time.After(1 * time.Second): 1051 return fmt.Errorf("timeout sending event to channel") 1052 } 1053 } 1054 } 1055 } 1056}