···
t.Logf("Failed to close response body: %v", closeErr)
607
-
// Simulate Jetstream UPDATE event (PDS updates the existing record)
608
-
t.Logf("\n๐ Simulating Jetstream UPDATE event...")
609
-
updateEvent := jetstream.JetstreamEvent{
607
+
// The service flow for direction change is:
608
+
// 1. DELETE old vote on PDS
609
+
// 2. CREATE new vote with NEW rkey on PDS
610
+
// So we simulate DELETE + CREATE events (not UPDATE)
612
+
// Simulate Jetstream DELETE event for old vote
613
+
t.Logf("\n๐ Simulating Jetstream DELETE event for old upvote...")
614
+
deleteEvent := jetstream.JetstreamEvent{
616
+
TimeUS: time.Now().UnixMicro(),
618
+
Commit: &jetstream.CommitEvent{
619
+
Rev: "test-vote-rev-delete",
620
+
Operation: "delete",
621
+
Collection: "social.coves.feed.vote",
622
+
RKey: rkey, // Old upvote rkey
625
+
if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
626
+
t.Fatalf("Failed to handle delete event: %v", handleErr)
629
+
// Simulate Jetstream CREATE event for new downvote
630
+
t.Logf("\n๐ Simulating Jetstream CREATE event for new downvote...")
631
+
newRkey := utils.ExtractRKeyFromURI(downvoteResp.URI)
632
+
createEvent := jetstream.JetstreamEvent{
TimeUS: time.Now().UnixMicro(),
Commit: &jetstream.CommitEvent{
Rev: "test-vote-rev-down",
615
-
Operation: "update",
638
+
Operation: "create",
Collection: "social.coves.feed.vote",
617
-
RKey: rkey, // Same rkey as before
640
+
RKey: newRkey, // NEW rkey from downvote response
Record: map[string]interface{}{
"$type": "social.coves.feed.vote",
···
625
-
"direction": "down", // Changed direction
648
+
"direction": "down",
"createdAt": time.Now().Format(time.RFC3339),
630
-
if handleErr := voteConsumer.HandleEvent(ctx, &updateEvent); handleErr != nil {
631
-
t.Fatalf("Failed to handle update event: %v", handleErr)
653
+
if handleErr := voteConsumer.HandleEvent(ctx, &createEvent); handleErr != nil {
654
+
t.Fatalf("Failed to handle create event: %v", handleErr)
657
+
// Verify old upvote was deleted
658
+
t.Logf("\n๐ Verifying old upvote was deleted...")
659
+
_, err = voteRepo.GetByURI(ctx, upvoteResp.URI)
661
+
t.Error("Expected old upvote to be deleted, but it still exists")
634
-
// Verify vote direction changed in AppView
635
-
t.Logf("\n๐ Verifying vote direction changed in AppView...")
636
-
updatedVote, err := voteRepo.GetByURI(ctx, upvoteResp.URI)
664
+
// Verify new downvote was indexed
665
+
t.Logf("\n๐ Verifying new downvote indexed in AppView...")
666
+
newVote, err := voteRepo.GetByURI(ctx, downvoteResp.URI)
638
-
t.Fatalf("Vote not found after update: %v", err)
668
+
t.Fatalf("New downvote not found: %v", err)
641
-
if updatedVote.Direction != "down" {
642
-
t.Errorf("Expected direction 'down', got %s", updatedVote.Direction)
671
+
if newVote.Direction != "down" {
672
+
t.Errorf("Expected direction 'down', got %s", newVote.Direction)
// Verify post counts updated