A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 "time" 8 9 "Coves/internal/atproto/jetstream" 10 "Coves/internal/core/comments" 11 "Coves/internal/db/postgres" 12) 13 14func TestCommentConsumer_CreateComment(t *testing.T) { 15 db := setupTestDB(t) 16 defer func() { 17 if err := db.Close(); err != nil { 18 t.Logf("Failed to close database: %v", err) 19 } 20 }() 21 22 ctx := context.Background() 23 commentRepo := postgres.NewCommentRepository(db) 24 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 25 26 // Setup test data 27 testUser := createTestUser(t, db, "commenter.test", "did:plc:commenter123") 28 testCommunity, err := createFeedTestCommunity(db, ctx, "testcommunity", "owner.test") 29 if err != nil { 30 t.Fatalf("Failed to create test community: %v", err) 31 } 32 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Test Post", 0, time.Now()) 33 34 t.Run("Create comment on post", func(t *testing.T) { 35 rkey := generateTID() 36 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 37 38 // Simulate Jetstream comment create event 39 event := &jetstream.JetstreamEvent{ 40 Did: testUser.DID, 41 Kind: "commit", 42 Commit: &jetstream.CommitEvent{ 43 Rev: "test-rev", 44 Operation: "create", 45 Collection: "social.coves.feed.comment", 46 RKey: rkey, 47 CID: "bafytest123", 48 Record: map[string]interface{}{ 49 "$type": "social.coves.feed.comment", 50 "content": "This is a test comment on a post!", 51 "reply": map[string]interface{}{ 52 "root": map[string]interface{}{ 53 "uri": testPostURI, 54 "cid": "bafypost", 55 }, 56 "parent": map[string]interface{}{ 57 "uri": testPostURI, 58 "cid": "bafypost", 59 }, 60 }, 61 "createdAt": time.Now().Format(time.RFC3339), 62 }, 63 }, 64 } 65 66 // Handle the event 67 err := consumer.HandleEvent(ctx, event) 68 if err != nil { 69 t.Fatalf("Failed to handle comment create event: %v", err) 70 } 71 72 // Verify comment was indexed 73 comment, err := commentRepo.GetByURI(ctx, uri) 74 if err != nil { 75 t.Fatalf("Failed to get indexed comment: %v", err) 76 } 77 78 if comment.URI != uri { 79 t.Errorf("Expected URI %s, got %s", uri, comment.URI) 80 } 81 82 if comment.CommenterDID != testUser.DID { 83 t.Errorf("Expected commenter %s, got %s", testUser.DID, comment.CommenterDID) 84 } 85 86 if comment.Content != "This is a test comment on a post!" { 87 t.Errorf("Expected content 'This is a test comment on a post!', got %s", comment.Content) 88 } 89 90 if comment.RootURI != testPostURI { 91 t.Errorf("Expected root URI %s, got %s", testPostURI, comment.RootURI) 92 } 93 94 if comment.ParentURI != testPostURI { 95 t.Errorf("Expected parent URI %s, got %s", testPostURI, comment.ParentURI) 96 } 97 98 // Verify post comment count was incremented 99 var commentCount int 100 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&commentCount) 101 if err != nil { 102 t.Fatalf("Failed to get post comment count: %v", err) 103 } 104 105 if commentCount != 1 { 106 t.Errorf("Expected post comment_count to be 1, got %d", commentCount) 107 } 108 }) 109 110 t.Run("Idempotent create - duplicate event", func(t *testing.T) { 111 rkey := generateTID() 112 113 event := &jetstream.JetstreamEvent{ 114 Did: testUser.DID, 115 Kind: "commit", 116 Commit: &jetstream.CommitEvent{ 117 Rev: "test-rev", 118 Operation: "create", 119 Collection: "social.coves.feed.comment", 120 RKey: rkey, 121 CID: "bafytest456", 122 Record: map[string]interface{}{ 123 "$type": "social.coves.feed.comment", 124 "content": "Idempotent test comment", 125 "reply": map[string]interface{}{ 126 "root": map[string]interface{}{ 127 "uri": testPostURI, 128 "cid": "bafypost", 129 }, 130 "parent": map[string]interface{}{ 131 "uri": testPostURI, 132 "cid": "bafypost", 133 }, 134 }, 135 "createdAt": time.Now().Format(time.RFC3339), 136 }, 137 }, 138 } 139 140 // First creation 141 err := consumer.HandleEvent(ctx, event) 142 if err != nil { 143 t.Fatalf("First creation failed: %v", err) 144 } 145 146 // Get initial comment count 147 var initialCount int 148 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount) 149 if err != nil { 150 t.Fatalf("Failed to get initial comment count: %v", err) 151 } 152 153 // Duplicate creation - should be idempotent 154 err = consumer.HandleEvent(ctx, event) 155 if err != nil { 156 t.Fatalf("Duplicate event should be handled gracefully: %v", err) 157 } 158 159 // Verify count wasn't incremented again 160 var finalCount int 161 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount) 162 if err != nil { 163 t.Fatalf("Failed to get final comment count: %v", err) 164 } 165 166 if finalCount != initialCount { 167 t.Errorf("Comment count should not increase on duplicate event. Initial: %d, Final: %d", initialCount, finalCount) 168 } 169 }) 170} 171 172func TestCommentConsumer_Threading(t *testing.T) { 173 db := setupTestDB(t) 174 defer func() { 175 if err := db.Close(); err != nil { 176 t.Logf("Failed to close database: %v", err) 177 } 178 }() 179 180 ctx := context.Background() 181 commentRepo := postgres.NewCommentRepository(db) 182 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 183 184 // Setup test data 185 testUser := createTestUser(t, db, "threader.test", "did:plc:threader123") 186 testCommunity, err := createFeedTestCommunity(db, ctx, "threadcommunity", "owner2.test") 187 if err != nil { 188 t.Fatalf("Failed to create test community: %v", err) 189 } 190 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Threading Test", 0, time.Now()) 191 192 t.Run("Create nested comment replies", func(t *testing.T) { 193 // Create first-level comment on post 194 comment1Rkey := generateTID() 195 comment1URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment1Rkey) 196 197 event1 := &jetstream.JetstreamEvent{ 198 Did: testUser.DID, 199 Kind: "commit", 200 Commit: &jetstream.CommitEvent{ 201 Operation: "create", 202 Collection: "social.coves.feed.comment", 203 RKey: comment1Rkey, 204 CID: "bafycomment1", 205 Record: map[string]interface{}{ 206 "content": "First level comment", 207 "reply": map[string]interface{}{ 208 "root": map[string]interface{}{ 209 "uri": testPostURI, 210 "cid": "bafypost", 211 }, 212 "parent": map[string]interface{}{ 213 "uri": testPostURI, 214 "cid": "bafypost", 215 }, 216 }, 217 "createdAt": time.Now().Format(time.RFC3339), 218 }, 219 }, 220 } 221 222 err := consumer.HandleEvent(ctx, event1) 223 if err != nil { 224 t.Fatalf("Failed to create first-level comment: %v", err) 225 } 226 227 // Create second-level comment (reply to first comment) 228 comment2Rkey := generateTID() 229 comment2URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment2Rkey) 230 231 event2 := &jetstream.JetstreamEvent{ 232 Did: testUser.DID, 233 Kind: "commit", 234 Commit: &jetstream.CommitEvent{ 235 Operation: "create", 236 Collection: "social.coves.feed.comment", 237 RKey: comment2Rkey, 238 CID: "bafycomment2", 239 Record: map[string]interface{}{ 240 "content": "Second level comment (reply to first)", 241 "reply": map[string]interface{}{ 242 "root": map[string]interface{}{ 243 "uri": testPostURI, 244 "cid": "bafypost", 245 }, 246 "parent": map[string]interface{}{ 247 "uri": comment1URI, 248 "cid": "bafycomment1", 249 }, 250 }, 251 "createdAt": time.Now().Add(1 * time.Second).Format(time.RFC3339), 252 }, 253 }, 254 } 255 256 err = consumer.HandleEvent(ctx, event2) 257 if err != nil { 258 t.Fatalf("Failed to create second-level comment: %v", err) 259 } 260 261 // Verify threading structure 262 comment1, err := commentRepo.GetByURI(ctx, comment1URI) 263 if err != nil { 264 t.Fatalf("Failed to get first comment: %v", err) 265 } 266 267 comment2, err := commentRepo.GetByURI(ctx, comment2URI) 268 if err != nil { 269 t.Fatalf("Failed to get second comment: %v", err) 270 } 271 272 // Both should have same root (original post) 273 if comment1.RootURI != testPostURI { 274 t.Errorf("Comment1 root should be post URI, got %s", comment1.RootURI) 275 } 276 277 if comment2.RootURI != testPostURI { 278 t.Errorf("Comment2 root should be post URI, got %s", comment2.RootURI) 279 } 280 281 // Comment1 parent should be post 282 if comment1.ParentURI != testPostURI { 283 t.Errorf("Comment1 parent should be post URI, got %s", comment1.ParentURI) 284 } 285 286 // Comment2 parent should be comment1 287 if comment2.ParentURI != comment1URI { 288 t.Errorf("Comment2 parent should be comment1 URI, got %s", comment2.ParentURI) 289 } 290 291 // Verify reply count on comment1 292 if comment1.ReplyCount != 1 { 293 t.Errorf("Comment1 should have 1 reply, got %d", comment1.ReplyCount) 294 } 295 296 // Query all comments by root 297 allComments, err := commentRepo.ListByRoot(ctx, testPostURI, 100, 0) 298 if err != nil { 299 t.Fatalf("Failed to list comments by root: %v", err) 300 } 301 302 if len(allComments) != 2 { 303 t.Errorf("Expected 2 comments in thread, got %d", len(allComments)) 304 } 305 306 // Query direct replies to post 307 directReplies, err := commentRepo.ListByParent(ctx, testPostURI, 100, 0) 308 if err != nil { 309 t.Fatalf("Failed to list direct replies to post: %v", err) 310 } 311 312 if len(directReplies) != 1 { 313 t.Errorf("Expected 1 direct reply to post, got %d", len(directReplies)) 314 } 315 316 // Query replies to comment1 317 comment1Replies, err := commentRepo.ListByParent(ctx, comment1URI, 100, 0) 318 if err != nil { 319 t.Fatalf("Failed to list replies to comment1: %v", err) 320 } 321 322 if len(comment1Replies) != 1 { 323 t.Errorf("Expected 1 reply to comment1, got %d", len(comment1Replies)) 324 } 325 }) 326} 327 328func TestCommentConsumer_UpdateComment(t *testing.T) { 329 db := setupTestDB(t) 330 defer func() { 331 if err := db.Close(); err != nil { 332 t.Logf("Failed to close database: %v", err) 333 } 334 }() 335 336 ctx := context.Background() 337 commentRepo := postgres.NewCommentRepository(db) 338 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 339 340 // Setup test data 341 testUser := createTestUser(t, db, "editor.test", "did:plc:editor123") 342 testCommunity, err := createFeedTestCommunity(db, ctx, "editcommunity", "owner3.test") 343 if err != nil { 344 t.Fatalf("Failed to create test community: %v", err) 345 } 346 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Edit Test", 0, time.Now()) 347 348 t.Run("Update comment content preserves vote counts", func(t *testing.T) { 349 rkey := generateTID() 350 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 351 352 // Create initial comment 353 createEvent := &jetstream.JetstreamEvent{ 354 Did: testUser.DID, 355 Kind: "commit", 356 Commit: &jetstream.CommitEvent{ 357 Operation: "create", 358 Collection: "social.coves.feed.comment", 359 RKey: rkey, 360 CID: "bafyoriginal", 361 Record: map[string]interface{}{ 362 "content": "Original comment content", 363 "reply": map[string]interface{}{ 364 "root": map[string]interface{}{ 365 "uri": testPostURI, 366 "cid": "bafypost", 367 }, 368 "parent": map[string]interface{}{ 369 "uri": testPostURI, 370 "cid": "bafypost", 371 }, 372 }, 373 "createdAt": time.Now().Format(time.RFC3339), 374 }, 375 }, 376 } 377 378 err := consumer.HandleEvent(ctx, createEvent) 379 if err != nil { 380 t.Fatalf("Failed to create comment: %v", err) 381 } 382 383 // Manually set vote counts to simulate votes 384 _, err = db.ExecContext(ctx, ` 385 UPDATE comments 386 SET upvote_count = 5, downvote_count = 2, score = 3 387 WHERE uri = $1 388 `, uri) 389 if err != nil { 390 t.Fatalf("Failed to set vote counts: %v", err) 391 } 392 393 // Update the comment 394 updateEvent := &jetstream.JetstreamEvent{ 395 Did: testUser.DID, 396 Kind: "commit", 397 Commit: &jetstream.CommitEvent{ 398 Operation: "update", 399 Collection: "social.coves.feed.comment", 400 RKey: rkey, 401 CID: "bafyupdated", 402 Record: map[string]interface{}{ 403 "content": "EDITED: Updated comment content", 404 "reply": map[string]interface{}{ 405 "root": map[string]interface{}{ 406 "uri": testPostURI, 407 "cid": "bafypost", 408 }, 409 "parent": map[string]interface{}{ 410 "uri": testPostURI, 411 "cid": "bafypost", 412 }, 413 }, 414 "createdAt": time.Now().Format(time.RFC3339), 415 }, 416 }, 417 } 418 419 err = consumer.HandleEvent(ctx, updateEvent) 420 if err != nil { 421 t.Fatalf("Failed to update comment: %v", err) 422 } 423 424 // Verify content updated 425 comment, err := commentRepo.GetByURI(ctx, uri) 426 if err != nil { 427 t.Fatalf("Failed to get updated comment: %v", err) 428 } 429 430 if comment.Content != "EDITED: Updated comment content" { 431 t.Errorf("Expected updated content, got %s", comment.Content) 432 } 433 434 // Verify CID updated 435 if comment.CID != "bafyupdated" { 436 t.Errorf("Expected CID to be updated to bafyupdated, got %s", comment.CID) 437 } 438 439 // Verify vote counts preserved 440 if comment.UpvoteCount != 5 { 441 t.Errorf("Expected upvote_count preserved at 5, got %d", comment.UpvoteCount) 442 } 443 444 if comment.DownvoteCount != 2 { 445 t.Errorf("Expected downvote_count preserved at 2, got %d", comment.DownvoteCount) 446 } 447 448 if comment.Score != 3 { 449 t.Errorf("Expected score preserved at 3, got %d", comment.Score) 450 } 451 }) 452} 453 454func TestCommentConsumer_DeleteComment(t *testing.T) { 455 db := setupTestDB(t) 456 defer func() { 457 if err := db.Close(); err != nil { 458 t.Logf("Failed to close database: %v", err) 459 } 460 }() 461 462 ctx := context.Background() 463 commentRepo := postgres.NewCommentRepository(db) 464 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 465 466 // Setup test data 467 testUser := createTestUser(t, db, "deleter.test", "did:plc:deleter123") 468 testCommunity, err := createFeedTestCommunity(db, ctx, "deletecommunity", "owner4.test") 469 if err != nil { 470 t.Fatalf("Failed to create test community: %v", err) 471 } 472 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Delete Test", 0, time.Now()) 473 474 t.Run("Delete comment decrements parent count", func(t *testing.T) { 475 rkey := generateTID() 476 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 477 478 // Create comment 479 createEvent := &jetstream.JetstreamEvent{ 480 Did: testUser.DID, 481 Kind: "commit", 482 Commit: &jetstream.CommitEvent{ 483 Operation: "create", 484 Collection: "social.coves.feed.comment", 485 RKey: rkey, 486 CID: "bafydelete", 487 Record: map[string]interface{}{ 488 "content": "Comment to be deleted", 489 "reply": map[string]interface{}{ 490 "root": map[string]interface{}{ 491 "uri": testPostURI, 492 "cid": "bafypost", 493 }, 494 "parent": map[string]interface{}{ 495 "uri": testPostURI, 496 "cid": "bafypost", 497 }, 498 }, 499 "createdAt": time.Now().Format(time.RFC3339), 500 }, 501 }, 502 } 503 504 err := consumer.HandleEvent(ctx, createEvent) 505 if err != nil { 506 t.Fatalf("Failed to create comment: %v", err) 507 } 508 509 // Get initial post comment count 510 var initialCount int 511 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount) 512 if err != nil { 513 t.Fatalf("Failed to get initial comment count: %v", err) 514 } 515 516 // Delete comment 517 deleteEvent := &jetstream.JetstreamEvent{ 518 Did: testUser.DID, 519 Kind: "commit", 520 Commit: &jetstream.CommitEvent{ 521 Operation: "delete", 522 Collection: "social.coves.feed.comment", 523 RKey: rkey, 524 }, 525 } 526 527 err = consumer.HandleEvent(ctx, deleteEvent) 528 if err != nil { 529 t.Fatalf("Failed to delete comment: %v", err) 530 } 531 532 // Verify soft delete 533 comment, err := commentRepo.GetByURI(ctx, uri) 534 if err != nil { 535 t.Fatalf("Failed to get deleted comment: %v", err) 536 } 537 538 if comment.DeletedAt == nil { 539 t.Error("Expected deleted_at to be set, got nil") 540 } 541 542 // Verify post comment count decremented 543 var finalCount int 544 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount) 545 if err != nil { 546 t.Fatalf("Failed to get final comment count: %v", err) 547 } 548 549 if finalCount != initialCount-1 { 550 t.Errorf("Expected comment count to decrease by 1. Initial: %d, Final: %d", initialCount, finalCount) 551 } 552 }) 553 554 t.Run("Delete is idempotent", func(t *testing.T) { 555 rkey := generateTID() 556 557 // Create comment 558 createEvent := &jetstream.JetstreamEvent{ 559 Did: testUser.DID, 560 Kind: "commit", 561 Commit: &jetstream.CommitEvent{ 562 Operation: "create", 563 Collection: "social.coves.feed.comment", 564 RKey: rkey, 565 CID: "bafyidempdelete", 566 Record: map[string]interface{}{ 567 "content": "Idempotent delete test", 568 "reply": map[string]interface{}{ 569 "root": map[string]interface{}{ 570 "uri": testPostURI, 571 "cid": "bafypost", 572 }, 573 "parent": map[string]interface{}{ 574 "uri": testPostURI, 575 "cid": "bafypost", 576 }, 577 }, 578 "createdAt": time.Now().Format(time.RFC3339), 579 }, 580 }, 581 } 582 583 err := consumer.HandleEvent(ctx, createEvent) 584 if err != nil { 585 t.Fatalf("Failed to create comment: %v", err) 586 } 587 588 // First delete 589 deleteEvent := &jetstream.JetstreamEvent{ 590 Did: testUser.DID, 591 Kind: "commit", 592 Commit: &jetstream.CommitEvent{ 593 Operation: "delete", 594 Collection: "social.coves.feed.comment", 595 RKey: rkey, 596 }, 597 } 598 599 err = consumer.HandleEvent(ctx, deleteEvent) 600 if err != nil { 601 t.Fatalf("First delete failed: %v", err) 602 } 603 604 // Get count after first delete 605 var countAfterFirstDelete int 606 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterFirstDelete) 607 if err != nil { 608 t.Fatalf("Failed to get count after first delete: %v", err) 609 } 610 611 // Second delete (idempotent) 612 err = consumer.HandleEvent(ctx, deleteEvent) 613 if err != nil { 614 t.Fatalf("Second delete should be idempotent: %v", err) 615 } 616 617 // Verify count didn't change 618 var countAfterSecondDelete int 619 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterSecondDelete) 620 if err != nil { 621 t.Fatalf("Failed to get count after second delete: %v", err) 622 } 623 624 if countAfterSecondDelete != countAfterFirstDelete { 625 t.Errorf("Count should not change on duplicate delete. After first: %d, After second: %d", countAfterFirstDelete, countAfterSecondDelete) 626 } 627 }) 628} 629 630func TestCommentConsumer_SecurityValidation(t *testing.T) { 631 db := setupTestDB(t) 632 defer func() { 633 if err := db.Close(); err != nil { 634 t.Logf("Failed to close database: %v", err) 635 } 636 }() 637 638 ctx := context.Background() 639 commentRepo := postgres.NewCommentRepository(db) 640 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 641 642 testUser := createTestUser(t, db, "security.test", "did:plc:security123") 643 testCommunity, err := createFeedTestCommunity(db, ctx, "seccommunity", "owner5.test") 644 if err != nil { 645 t.Fatalf("Failed to create test community: %v", err) 646 } 647 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Security Test", 0, time.Now()) 648 649 t.Run("Reject comment with empty content", func(t *testing.T) { 650 event := &jetstream.JetstreamEvent{ 651 Did: testUser.DID, 652 Kind: "commit", 653 Commit: &jetstream.CommitEvent{ 654 Operation: "create", 655 Collection: "social.coves.feed.comment", 656 RKey: generateTID(), 657 CID: "bafyinvalid", 658 Record: map[string]interface{}{ 659 "content": "", // Empty content 660 "reply": map[string]interface{}{ 661 "root": map[string]interface{}{ 662 "uri": testPostURI, 663 "cid": "bafypost", 664 }, 665 "parent": map[string]interface{}{ 666 "uri": testPostURI, 667 "cid": "bafypost", 668 }, 669 }, 670 "createdAt": time.Now().Format(time.RFC3339), 671 }, 672 }, 673 } 674 675 err := consumer.HandleEvent(ctx, event) 676 if err == nil { 677 t.Error("Expected error for empty content, got nil") 678 } 679 }) 680 681 t.Run("Reject comment with invalid root reference", func(t *testing.T) { 682 event := &jetstream.JetstreamEvent{ 683 Did: testUser.DID, 684 Kind: "commit", 685 Commit: &jetstream.CommitEvent{ 686 Operation: "create", 687 Collection: "social.coves.feed.comment", 688 RKey: generateTID(), 689 CID: "bafyinvalid2", 690 Record: map[string]interface{}{ 691 "content": "Valid content", 692 "reply": map[string]interface{}{ 693 "root": map[string]interface{}{ 694 "uri": "", // Missing URI 695 "cid": "bafypost", 696 }, 697 "parent": map[string]interface{}{ 698 "uri": testPostURI, 699 "cid": "bafypost", 700 }, 701 }, 702 "createdAt": time.Now().Format(time.RFC3339), 703 }, 704 }, 705 } 706 707 err := consumer.HandleEvent(ctx, event) 708 if err == nil { 709 t.Error("Expected error for invalid root reference, got nil") 710 } 711 }) 712 713 t.Run("Reject comment with invalid parent reference", func(t *testing.T) { 714 event := &jetstream.JetstreamEvent{ 715 Did: testUser.DID, 716 Kind: "commit", 717 Commit: &jetstream.CommitEvent{ 718 Operation: "create", 719 Collection: "social.coves.feed.comment", 720 RKey: generateTID(), 721 CID: "bafyinvalid3", 722 Record: map[string]interface{}{ 723 "content": "Valid content", 724 "reply": map[string]interface{}{ 725 "root": map[string]interface{}{ 726 "uri": testPostURI, 727 "cid": "bafypost", 728 }, 729 "parent": map[string]interface{}{ 730 "uri": testPostURI, 731 "cid": "", // Missing CID 732 }, 733 }, 734 "createdAt": time.Now().Format(time.RFC3339), 735 }, 736 }, 737 } 738 739 err := consumer.HandleEvent(ctx, event) 740 if err == nil { 741 t.Error("Expected error for invalid parent reference, got nil") 742 } 743 }) 744 745 t.Run("Reject comment with invalid DID format", func(t *testing.T) { 746 event := &jetstream.JetstreamEvent{ 747 Did: "invalid-did-format", // Bad DID 748 Kind: "commit", 749 Commit: &jetstream.CommitEvent{ 750 Operation: "create", 751 Collection: "social.coves.feed.comment", 752 RKey: generateTID(), 753 CID: "bafyinvalid4", 754 Record: map[string]interface{}{ 755 "content": "Valid content", 756 "reply": map[string]interface{}{ 757 "root": map[string]interface{}{ 758 "uri": testPostURI, 759 "cid": "bafypost", 760 }, 761 "parent": map[string]interface{}{ 762 "uri": testPostURI, 763 "cid": "bafypost", 764 }, 765 }, 766 "createdAt": time.Now().Format(time.RFC3339), 767 }, 768 }, 769 } 770 771 err := consumer.HandleEvent(ctx, event) 772 if err == nil { 773 t.Error("Expected error for invalid DID format, got nil") 774 } 775 }) 776 777 t.Run("Reject comment exceeding max content length", func(t *testing.T) { 778 event := &jetstream.JetstreamEvent{ 779 Did: testUser.DID, 780 Kind: "commit", 781 Commit: &jetstream.CommitEvent{ 782 Operation: "create", 783 Collection: "social.coves.feed.comment", 784 RKey: generateTID(), 785 CID: "bafytoobig", 786 Record: map[string]interface{}{ 787 "content": string(make([]byte, 30001)), // Exceeds 30000 byte limit 788 "reply": map[string]interface{}{ 789 "root": map[string]interface{}{ 790 "uri": testPostURI, 791 "cid": "bafypost", 792 }, 793 "parent": map[string]interface{}{ 794 "uri": testPostURI, 795 "cid": "bafypost", 796 }, 797 }, 798 "createdAt": time.Now().Format(time.RFC3339), 799 }, 800 }, 801 } 802 803 err := consumer.HandleEvent(ctx, event) 804 if err == nil { 805 t.Error("Expected error for oversized content, got nil") 806 } 807 if err != nil && !contains(err.Error(), "exceeds maximum length") { 808 t.Errorf("Expected 'exceeds maximum length' error, got: %v", err) 809 } 810 }) 811 812 t.Run("Reject comment with malformed parent URI", func(t *testing.T) { 813 event := &jetstream.JetstreamEvent{ 814 Did: testUser.DID, 815 Kind: "commit", 816 Commit: &jetstream.CommitEvent{ 817 Operation: "create", 818 Collection: "social.coves.feed.comment", 819 RKey: generateTID(), 820 CID: "bafymalformed", 821 Record: map[string]interface{}{ 822 "content": "Valid content", 823 "reply": map[string]interface{}{ 824 "root": map[string]interface{}{ 825 "uri": testPostURI, 826 "cid": "bafypost", 827 }, 828 "parent": map[string]interface{}{ 829 "uri": "at://malformed", // Invalid: missing collection/rkey 830 "cid": "bafyparent", 831 }, 832 }, 833 "createdAt": time.Now().Format(time.RFC3339), 834 }, 835 }, 836 } 837 838 err := consumer.HandleEvent(ctx, event) 839 if err == nil { 840 t.Error("Expected error for malformed AT-URI, got nil") 841 } 842 if err != nil && !contains(err.Error(), "invalid parent URI") { 843 t.Errorf("Expected 'invalid parent URI' error, got: %v", err) 844 } 845 }) 846 847 t.Run("Reject comment with malformed root URI", func(t *testing.T) { 848 event := &jetstream.JetstreamEvent{ 849 Did: testUser.DID, 850 Kind: "commit", 851 Commit: &jetstream.CommitEvent{ 852 Operation: "create", 853 Collection: "social.coves.feed.comment", 854 RKey: generateTID(), 855 CID: "bafymalformed2", 856 Record: map[string]interface{}{ 857 "content": "Valid content", 858 "reply": map[string]interface{}{ 859 "root": map[string]interface{}{ 860 "uri": "at://did:plc:test123", // Invalid: missing collection/rkey 861 "cid": "bafyroot", 862 }, 863 "parent": map[string]interface{}{ 864 "uri": testPostURI, 865 "cid": "bafyparent", 866 }, 867 }, 868 "createdAt": time.Now().Format(time.RFC3339), 869 }, 870 }, 871 } 872 873 err := consumer.HandleEvent(ctx, event) 874 if err == nil { 875 t.Error("Expected error for malformed AT-URI, got nil") 876 } 877 if err != nil && !contains(err.Error(), "invalid root URI") { 878 t.Errorf("Expected 'invalid root URI' error, got: %v", err) 879 } 880 }) 881} 882 883func TestCommentRepository_Queries(t *testing.T) { 884 db := setupTestDB(t) 885 defer func() { 886 if err := db.Close(); err != nil { 887 t.Logf("Failed to close database: %v", err) 888 } 889 }() 890 891 ctx := context.Background() 892 commentRepo := postgres.NewCommentRepository(db) 893 894 // Clean up any existing test data from previous runs 895 _, err := db.ExecContext(ctx, "DELETE FROM comments WHERE commenter_did LIKE 'did:plc:%'") 896 if err != nil { 897 t.Fatalf("Failed to clean up test comments: %v", err) 898 } 899 900 testUser := createTestUser(t, db, "query.test", "did:plc:query123") 901 testCommunity, err := createFeedTestCommunity(db, ctx, "querycommunity", "owner6.test") 902 if err != nil { 903 t.Fatalf("Failed to create test community: %v", err) 904 } 905 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Query Test", 0, time.Now()) 906 907 // Create a comment tree 908 // Post 909 // |- Comment 1 910 // |- Comment 2 911 // |- Comment 3 912 // |- Comment 4 913 914 comment1 := &comments.Comment{ 915 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/1", testUser.DID), 916 CID: "bafyc1", 917 RKey: "1", 918 CommenterDID: testUser.DID, 919 RootURI: postURI, 920 RootCID: "bafypost", 921 ParentURI: postURI, 922 ParentCID: "bafypost", 923 Content: "Comment 1", 924 Langs: []string{}, 925 CreatedAt: time.Now(), 926 } 927 928 comment2 := &comments.Comment{ 929 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/2", testUser.DID), 930 CID: "bafyc2", 931 RKey: "2", 932 CommenterDID: testUser.DID, 933 RootURI: postURI, 934 RootCID: "bafypost", 935 ParentURI: comment1.URI, 936 ParentCID: "bafyc1", 937 Content: "Comment 2 (reply to 1)", 938 Langs: []string{}, 939 CreatedAt: time.Now().Add(1 * time.Second), 940 } 941 942 comment3 := &comments.Comment{ 943 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/3", testUser.DID), 944 CID: "bafyc3", 945 RKey: "3", 946 CommenterDID: testUser.DID, 947 RootURI: postURI, 948 RootCID: "bafypost", 949 ParentURI: comment1.URI, 950 ParentCID: "bafyc1", 951 Content: "Comment 3 (reply to 1)", 952 Langs: []string{}, 953 CreatedAt: time.Now().Add(2 * time.Second), 954 } 955 956 comment4 := &comments.Comment{ 957 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/4", testUser.DID), 958 CID: "bafyc4", 959 RKey: "4", 960 CommenterDID: testUser.DID, 961 RootURI: postURI, 962 RootCID: "bafypost", 963 ParentURI: postURI, 964 ParentCID: "bafypost", 965 Content: "Comment 4", 966 Langs: []string{}, 967 CreatedAt: time.Now().Add(3 * time.Second), 968 } 969 970 // Create all comments 971 for i, c := range []*comments.Comment{comment1, comment2, comment3, comment4} { 972 if err := commentRepo.Create(ctx, c); err != nil { 973 t.Fatalf("Failed to create comment %d: %v", i+1, err) 974 } 975 t.Logf("Created comment %d: %s", i+1, c.URI) 976 } 977 978 // Verify comments were created 979 verifyCount, err := commentRepo.CountByParent(ctx, postURI) 980 if err != nil { 981 t.Fatalf("Failed to count comments: %v", err) 982 } 983 t.Logf("Direct replies to post after creation: %d", verifyCount) 984 985 t.Run("ListByRoot returns all comments in thread", func(t *testing.T) { 986 comments, err := commentRepo.ListByRoot(ctx, postURI, 100, 0) 987 if err != nil { 988 t.Fatalf("Failed to list by root: %v", err) 989 } 990 991 if len(comments) != 4 { 992 t.Errorf("Expected 4 comments, got %d", len(comments)) 993 } 994 }) 995 996 t.Run("ListByParent returns direct replies", func(t *testing.T) { 997 // Direct replies to post 998 postReplies, err := commentRepo.ListByParent(ctx, postURI, 100, 0) 999 if err != nil { 1000 t.Fatalf("Failed to list post replies: %v", err) 1001 } 1002 1003 if len(postReplies) != 2 { 1004 t.Errorf("Expected 2 direct replies to post, got %d", len(postReplies)) 1005 } 1006 1007 // Direct replies to comment1 1008 comment1Replies, err := commentRepo.ListByParent(ctx, comment1.URI, 100, 0) 1009 if err != nil { 1010 t.Fatalf("Failed to list comment1 replies: %v", err) 1011 } 1012 1013 if len(comment1Replies) != 2 { 1014 t.Errorf("Expected 2 direct replies to comment1, got %d", len(comment1Replies)) 1015 } 1016 }) 1017 1018 t.Run("CountByParent returns correct counts", func(t *testing.T) { 1019 postCount, err := commentRepo.CountByParent(ctx, postURI) 1020 if err != nil { 1021 t.Fatalf("Failed to count post replies: %v", err) 1022 } 1023 1024 if postCount != 2 { 1025 t.Errorf("Expected 2 direct replies to post, got %d", postCount) 1026 } 1027 1028 comment1Count, err := commentRepo.CountByParent(ctx, comment1.URI) 1029 if err != nil { 1030 t.Fatalf("Failed to count comment1 replies: %v", err) 1031 } 1032 1033 if comment1Count != 2 { 1034 t.Errorf("Expected 2 direct replies to comment1, got %d", comment1Count) 1035 } 1036 }) 1037 1038 t.Run("ListByCommenter returns user's comments", func(t *testing.T) { 1039 userComments, err := commentRepo.ListByCommenter(ctx, testUser.DID, 100, 0) 1040 if err != nil { 1041 t.Fatalf("Failed to list by commenter: %v", err) 1042 } 1043 1044 if len(userComments) != 4 { 1045 t.Errorf("Expected 4 comments by user, got %d", len(userComments)) 1046 } 1047 }) 1048} 1049 1050// TestCommentConsumer_OutOfOrderReconciliation tests that parent counts are 1051// correctly reconciled when child comments arrive before their parent 1052func TestCommentConsumer_OutOfOrderReconciliation(t *testing.T) { 1053 db := setupTestDB(t) 1054 defer func() { 1055 if err := db.Close(); err != nil { 1056 t.Logf("Failed to close database: %v", err) 1057 } 1058 }() 1059 1060 ctx := context.Background() 1061 commentRepo := postgres.NewCommentRepository(db) 1062 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 1063 1064 testUser := createTestUser(t, db, "outoforder.test", "did:plc:outoforder123") 1065 testCommunity, err := createFeedTestCommunity(db, ctx, "ooo-community", "owner7.test") 1066 if err != nil { 1067 t.Fatalf("Failed to create test community: %v", err) 1068 } 1069 postURI := createTestPost(t, db, testCommunity, testUser.DID, "OOO Test Post", 0, time.Now()) 1070 1071 t.Run("Child arrives before parent - count reconciled", func(t *testing.T) { 1072 // Scenario: User A creates comment C1 on post 1073 // User B creates reply C2 to C1 1074 // Jetstream delivers C2 before C1 (different repos) 1075 // When C1 finally arrives, its reply_count should be 1, not 0 1076 1077 parentRkey := generateTID() 1078 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey) 1079 1080 childRkey := generateTID() 1081 childURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, childRkey) 1082 1083 // Step 1: Index child FIRST (before parent exists) 1084 childEvent := &jetstream.JetstreamEvent{ 1085 Did: testUser.DID, 1086 Kind: "commit", 1087 Commit: &jetstream.CommitEvent{ 1088 Rev: "child-rev", 1089 Operation: "create", 1090 Collection: "social.coves.feed.comment", 1091 RKey: childRkey, 1092 CID: "bafychild", 1093 Record: map[string]interface{}{ 1094 "$type": "social.coves.feed.comment", 1095 "content": "This is a reply to a comment that doesn't exist yet!", 1096 "reply": map[string]interface{}{ 1097 "root": map[string]interface{}{ 1098 "uri": postURI, 1099 "cid": "bafypost", 1100 }, 1101 "parent": map[string]interface{}{ 1102 "uri": parentURI, // Points to parent that doesn't exist yet 1103 "cid": "bafyparent", 1104 }, 1105 }, 1106 "createdAt": time.Now().Format(time.RFC3339), 1107 }, 1108 }, 1109 } 1110 1111 err := consumer.HandleEvent(ctx, childEvent) 1112 if err != nil { 1113 t.Fatalf("Failed to handle child event: %v", err) 1114 } 1115 1116 // Verify child was indexed 1117 childComment, err := commentRepo.GetByURI(ctx, childURI) 1118 if err != nil { 1119 t.Fatalf("Child comment not indexed: %v", err) 1120 } 1121 if childComment.ParentURI != parentURI { 1122 t.Errorf("Expected child parent_uri %s, got %s", parentURI, childComment.ParentURI) 1123 } 1124 1125 // Step 2: Now index parent (arrives late due to Jetstream ordering) 1126 parentEvent := &jetstream.JetstreamEvent{ 1127 Did: testUser.DID, 1128 Kind: "commit", 1129 Commit: &jetstream.CommitEvent{ 1130 Rev: "parent-rev", 1131 Operation: "create", 1132 Collection: "social.coves.feed.comment", 1133 RKey: parentRkey, 1134 CID: "bafyparent", 1135 Record: map[string]interface{}{ 1136 "$type": "social.coves.feed.comment", 1137 "content": "This is the parent comment arriving late", 1138 "reply": map[string]interface{}{ 1139 "root": map[string]interface{}{ 1140 "uri": postURI, 1141 "cid": "bafypost", 1142 }, 1143 "parent": map[string]interface{}{ 1144 "uri": postURI, 1145 "cid": "bafypost", 1146 }, 1147 }, 1148 "createdAt": time.Now().Format(time.RFC3339), 1149 }, 1150 }, 1151 } 1152 1153 err = consumer.HandleEvent(ctx, parentEvent) 1154 if err != nil { 1155 t.Fatalf("Failed to handle parent event: %v", err) 1156 } 1157 1158 // Step 3: Verify parent was indexed with CORRECT reply_count 1159 parentComment, err := commentRepo.GetByURI(ctx, parentURI) 1160 if err != nil { 1161 t.Fatalf("Parent comment not indexed: %v", err) 1162 } 1163 1164 // THIS IS THE KEY TEST: Parent should have reply_count = 1 due to reconciliation 1165 if parentComment.ReplyCount != 1 { 1166 t.Errorf("Expected parent reply_count to be 1 (reconciled), got %d", parentComment.ReplyCount) 1167 t.Logf("This indicates out-of-order reconciliation failed!") 1168 } 1169 1170 // Verify via query as well 1171 count, err := commentRepo.CountByParent(ctx, parentURI) 1172 if err != nil { 1173 t.Fatalf("Failed to count parent replies: %v", err) 1174 } 1175 if count != 1 { 1176 t.Errorf("Expected 1 reply to parent, got %d", count) 1177 } 1178 }) 1179 1180 t.Run("Multiple children arrive before parent", func(t *testing.T) { 1181 parentRkey := generateTID() 1182 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey) 1183 1184 // Index 3 children before parent 1185 for i := 1; i <= 3; i++ { 1186 childRkey := generateTID() 1187 childEvent := &jetstream.JetstreamEvent{ 1188 Did: testUser.DID, 1189 Kind: "commit", 1190 Commit: &jetstream.CommitEvent{ 1191 Rev: fmt.Sprintf("child-%d-rev", i), 1192 Operation: "create", 1193 Collection: "social.coves.feed.comment", 1194 RKey: childRkey, 1195 CID: fmt.Sprintf("bafychild%d", i), 1196 Record: map[string]interface{}{ 1197 "$type": "social.coves.feed.comment", 1198 "content": fmt.Sprintf("Reply %d before parent", i), 1199 "reply": map[string]interface{}{ 1200 "root": map[string]interface{}{ 1201 "uri": postURI, 1202 "cid": "bafypost", 1203 }, 1204 "parent": map[string]interface{}{ 1205 "uri": parentURI, 1206 "cid": "bafyparent2", 1207 }, 1208 }, 1209 "createdAt": time.Now().Format(time.RFC3339), 1210 }, 1211 }, 1212 } 1213 1214 err := consumer.HandleEvent(ctx, childEvent) 1215 if err != nil { 1216 t.Fatalf("Failed to handle child %d event: %v", i, err) 1217 } 1218 } 1219 1220 // Now index parent 1221 parentEvent := &jetstream.JetstreamEvent{ 1222 Did: testUser.DID, 1223 Kind: "commit", 1224 Commit: &jetstream.CommitEvent{ 1225 Rev: "parent2-rev", 1226 Operation: "create", 1227 Collection: "social.coves.feed.comment", 1228 RKey: parentRkey, 1229 CID: "bafyparent2", 1230 Record: map[string]interface{}{ 1231 "$type": "social.coves.feed.comment", 1232 "content": "Parent with 3 pre-existing children", 1233 "reply": map[string]interface{}{ 1234 "root": map[string]interface{}{ 1235 "uri": postURI, 1236 "cid": "bafypost", 1237 }, 1238 "parent": map[string]interface{}{ 1239 "uri": postURI, 1240 "cid": "bafypost", 1241 }, 1242 }, 1243 "createdAt": time.Now().Format(time.RFC3339), 1244 }, 1245 }, 1246 } 1247 1248 err := consumer.HandleEvent(ctx, parentEvent) 1249 if err != nil { 1250 t.Fatalf("Failed to handle parent event: %v", err) 1251 } 1252 1253 // Verify parent has reply_count = 3 1254 parentComment, err := commentRepo.GetByURI(ctx, parentURI) 1255 if err != nil { 1256 t.Fatalf("Parent comment not indexed: %v", err) 1257 } 1258 1259 if parentComment.ReplyCount != 3 { 1260 t.Errorf("Expected parent reply_count to be 3 (reconciled), got %d", parentComment.ReplyCount) 1261 } 1262 }) 1263} 1264 1265// TestCommentConsumer_Resurrection tests that soft-deleted comments can be recreated 1266// In atProto, deleted records' rkeys become available for reuse 1267func TestCommentConsumer_Resurrection(t *testing.T) { 1268 db := setupTestDB(t) 1269 defer func() { 1270 if err := db.Close(); err != nil { 1271 t.Logf("Failed to close database: %v", err) 1272 } 1273 }() 1274 1275 ctx := context.Background() 1276 commentRepo := postgres.NewCommentRepository(db) 1277 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 1278 1279 testUser := createTestUser(t, db, "resurrect.test", "did:plc:resurrect123") 1280 testCommunity, err := createFeedTestCommunity(db, ctx, "resurrect-community", "owner8.test") 1281 if err != nil { 1282 t.Fatalf("Failed to create test community: %v", err) 1283 } 1284 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Resurrection Test", 0, time.Now()) 1285 1286 rkey := generateTID() 1287 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 1288 1289 t.Run("Recreate deleted comment with same rkey", func(t *testing.T) { 1290 // Step 1: Create initial comment 1291 createEvent := &jetstream.JetstreamEvent{ 1292 Did: testUser.DID, 1293 Kind: "commit", 1294 Commit: &jetstream.CommitEvent{ 1295 Rev: "v1", 1296 Operation: "create", 1297 Collection: "social.coves.feed.comment", 1298 RKey: rkey, 1299 CID: "bafyoriginal", 1300 Record: map[string]interface{}{ 1301 "$type": "social.coves.feed.comment", 1302 "content": "Original comment content", 1303 "reply": map[string]interface{}{ 1304 "root": map[string]interface{}{ 1305 "uri": postURI, 1306 "cid": "bafypost", 1307 }, 1308 "parent": map[string]interface{}{ 1309 "uri": postURI, 1310 "cid": "bafypost", 1311 }, 1312 }, 1313 "createdAt": time.Now().Format(time.RFC3339), 1314 }, 1315 }, 1316 } 1317 1318 err := consumer.HandleEvent(ctx, createEvent) 1319 if err != nil { 1320 t.Fatalf("Failed to create initial comment: %v", err) 1321 } 1322 1323 // Verify comment exists 1324 comment, err := commentRepo.GetByURI(ctx, commentURI) 1325 if err != nil { 1326 t.Fatalf("Comment not found after creation: %v", err) 1327 } 1328 if comment.Content != "Original comment content" { 1329 t.Errorf("Expected content 'Original comment content', got '%s'", comment.Content) 1330 } 1331 if comment.DeletedAt != nil { 1332 t.Errorf("Expected deleted_at to be nil, got %v", comment.DeletedAt) 1333 } 1334 1335 // Step 2: Delete the comment 1336 deleteEvent := &jetstream.JetstreamEvent{ 1337 Did: testUser.DID, 1338 Kind: "commit", 1339 Commit: &jetstream.CommitEvent{ 1340 Rev: "v2", 1341 Operation: "delete", 1342 Collection: "social.coves.feed.comment", 1343 RKey: rkey, 1344 }, 1345 } 1346 1347 err = consumer.HandleEvent(ctx, deleteEvent) 1348 if err != nil { 1349 t.Fatalf("Failed to delete comment: %v", err) 1350 } 1351 1352 // Verify comment is soft-deleted 1353 comment, err = commentRepo.GetByURI(ctx, commentURI) 1354 if err != nil { 1355 t.Fatalf("Comment not found after deletion: %v", err) 1356 } 1357 if comment.DeletedAt == nil { 1358 t.Error("Expected deleted_at to be set, got nil") 1359 } 1360 1361 // Step 3: Recreate comment with same rkey (resurrection) 1362 // In atProto, this is a valid operation - user can reuse the rkey 1363 recreateEvent := &jetstream.JetstreamEvent{ 1364 Did: testUser.DID, 1365 Kind: "commit", 1366 Commit: &jetstream.CommitEvent{ 1367 Rev: "v3", 1368 Operation: "create", 1369 Collection: "social.coves.feed.comment", 1370 RKey: rkey, // Same rkey! 1371 CID: "bafyresurrected", 1372 Record: map[string]interface{}{ 1373 "$type": "social.coves.feed.comment", 1374 "content": "Resurrected comment with new content", 1375 "reply": map[string]interface{}{ 1376 "root": map[string]interface{}{ 1377 "uri": postURI, 1378 "cid": "bafypost", 1379 }, 1380 "parent": map[string]interface{}{ 1381 "uri": postURI, 1382 "cid": "bafypost", 1383 }, 1384 }, 1385 "createdAt": time.Now().Format(time.RFC3339), 1386 }, 1387 }, 1388 } 1389 1390 err = consumer.HandleEvent(ctx, recreateEvent) 1391 if err != nil { 1392 t.Fatalf("Failed to resurrect comment: %v", err) 1393 } 1394 1395 // Step 4: Verify comment is resurrected with new content 1396 comment, err = commentRepo.GetByURI(ctx, commentURI) 1397 if err != nil { 1398 t.Fatalf("Comment not found after resurrection: %v", err) 1399 } 1400 1401 if comment.DeletedAt != nil { 1402 t.Errorf("Expected deleted_at to be NULL after resurrection, got %v", comment.DeletedAt) 1403 } 1404 if comment.Content != "Resurrected comment with new content" { 1405 t.Errorf("Expected resurrected content, got '%s'", comment.Content) 1406 } 1407 if comment.CID != "bafyresurrected" { 1408 t.Errorf("Expected CID 'bafyresurrected', got '%s'", comment.CID) 1409 } 1410 1411 // Verify parent count was restored (post should have comment_count = 1) 1412 var postCommentCount int 1413 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&postCommentCount) 1414 if err != nil { 1415 t.Fatalf("Failed to check post comment count: %v", err) 1416 } 1417 if postCommentCount != 1 { 1418 t.Errorf("Expected post comment_count to be 1 after resurrection, got %d", postCommentCount) 1419 } 1420 }) 1421 1422 t.Run("Recreate deleted comment with DIFFERENT parent", func(t *testing.T) { 1423 // Create two posts 1424 post1URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now()) 1425 post2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now()) 1426 1427 rkey2 := generateTID() 1428 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2) 1429 1430 // Step 1: Create comment on Post 1 1431 createEvent := &jetstream.JetstreamEvent{ 1432 Did: testUser.DID, 1433 Kind: "commit", 1434 Commit: &jetstream.CommitEvent{ 1435 Rev: "v1", 1436 Operation: "create", 1437 Collection: "social.coves.feed.comment", 1438 RKey: rkey2, 1439 CID: "bafyv1", 1440 Record: map[string]interface{}{ 1441 "$type": "social.coves.feed.comment", 1442 "content": "Original on Post 1", 1443 "reply": map[string]interface{}{ 1444 "root": map[string]interface{}{ 1445 "uri": post1URI, 1446 "cid": "bafypost1", 1447 }, 1448 "parent": map[string]interface{}{ 1449 "uri": post1URI, 1450 "cid": "bafypost1", 1451 }, 1452 }, 1453 "createdAt": time.Now().Format(time.RFC3339), 1454 }, 1455 }, 1456 } 1457 1458 err := consumer.HandleEvent(ctx, createEvent) 1459 if err != nil { 1460 t.Fatalf("Failed to create comment on Post 1: %v", err) 1461 } 1462 1463 // Verify Post 1 has comment_count = 1 1464 var post1Count int 1465 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count) 1466 if err != nil { 1467 t.Fatalf("Failed to check post 1 count: %v", err) 1468 } 1469 if post1Count != 1 { 1470 t.Errorf("Expected Post 1 comment_count = 1, got %d", post1Count) 1471 } 1472 1473 // Step 2: Delete comment 1474 deleteEvent := &jetstream.JetstreamEvent{ 1475 Did: testUser.DID, 1476 Kind: "commit", 1477 Commit: &jetstream.CommitEvent{ 1478 Rev: "v2", 1479 Operation: "delete", 1480 Collection: "social.coves.feed.comment", 1481 RKey: rkey2, 1482 }, 1483 } 1484 1485 err = consumer.HandleEvent(ctx, deleteEvent) 1486 if err != nil { 1487 t.Fatalf("Failed to delete comment: %v", err) 1488 } 1489 1490 // Verify Post 1 count decremented to 0 1491 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count) 1492 if err != nil { 1493 t.Fatalf("Failed to check post 1 count after delete: %v", err) 1494 } 1495 if post1Count != 0 { 1496 t.Errorf("Expected Post 1 comment_count = 0 after delete, got %d", post1Count) 1497 } 1498 1499 // Step 3: Recreate comment with same rkey but on Post 2 (different parent!) 1500 recreateEvent := &jetstream.JetstreamEvent{ 1501 Did: testUser.DID, 1502 Kind: "commit", 1503 Commit: &jetstream.CommitEvent{ 1504 Rev: "v3", 1505 Operation: "create", 1506 Collection: "social.coves.feed.comment", 1507 RKey: rkey2, // Same rkey! 1508 CID: "bafyv3", 1509 Record: map[string]interface{}{ 1510 "$type": "social.coves.feed.comment", 1511 "content": "New comment on Post 2", 1512 "reply": map[string]interface{}{ 1513 "root": map[string]interface{}{ 1514 "uri": post2URI, // Different root! 1515 "cid": "bafypost2", 1516 }, 1517 "parent": map[string]interface{}{ 1518 "uri": post2URI, // Different parent! 1519 "cid": "bafypost2", 1520 }, 1521 }, 1522 "createdAt": time.Now().Format(time.RFC3339), 1523 }, 1524 }, 1525 } 1526 1527 err = consumer.HandleEvent(ctx, recreateEvent) 1528 if err != nil { 1529 t.Fatalf("Failed to resurrect comment on Post 2: %v", err) 1530 } 1531 1532 // Step 4: Verify threading references updated correctly 1533 comment, err := commentRepo.GetByURI(ctx, commentURI2) 1534 if err != nil { 1535 t.Fatalf("Failed to get resurrected comment: %v", err) 1536 } 1537 1538 // THIS IS THE CRITICAL TEST: Threading refs must point to Post 2, not Post 1 1539 if comment.ParentURI != post2URI { 1540 t.Errorf("Expected parent URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.ParentURI) 1541 } 1542 if comment.RootURI != post2URI { 1543 t.Errorf("Expected root URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.RootURI) 1544 } 1545 if comment.ParentCID != "bafypost2" { 1546 t.Errorf("Expected parent CID 'bafypost2', got '%s'", comment.ParentCID) 1547 } 1548 1549 // Verify counts are correct 1550 var post2Count int 1551 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post2URI).Scan(&post2Count) 1552 if err != nil { 1553 t.Fatalf("Failed to check post 2 count: %v", err) 1554 } 1555 if post2Count != 1 { 1556 t.Errorf("Expected Post 2 comment_count = 1, got %d", post2Count) 1557 } 1558 1559 // Verify Post 1 count still 0 (not incremented by resurrection on Post 2) 1560 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count) 1561 if err != nil { 1562 t.Fatalf("Failed to check post 1 count after resurrection: %v", err) 1563 } 1564 if post1Count != 0 { 1565 t.Errorf("Expected Post 1 comment_count = 0 (unchanged), got %d", post1Count) 1566 } 1567 }) 1568} 1569 1570// TestCommentConsumer_ThreadingImmutability tests that UPDATE events cannot change threading refs 1571func TestCommentConsumer_ThreadingImmutability(t *testing.T) { 1572 db := setupTestDB(t) 1573 defer func() { 1574 if err := db.Close(); err != nil { 1575 t.Logf("Failed to close database: %v", err) 1576 } 1577 }() 1578 1579 ctx := context.Background() 1580 commentRepo := postgres.NewCommentRepository(db) 1581 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 1582 1583 testUser := createTestUser(t, db, "immutable.test", "did:plc:immutable123") 1584 testCommunity, err := createFeedTestCommunity(db, ctx, "immutable-community", "owner9.test") 1585 if err != nil { 1586 t.Fatalf("Failed to create test community: %v", err) 1587 } 1588 postURI1 := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now()) 1589 postURI2 := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now()) 1590 1591 rkey := generateTID() 1592 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 1593 1594 t.Run("Reject UPDATE that changes parent URI", func(t *testing.T) { 1595 // Create comment on Post 1 1596 createEvent := &jetstream.JetstreamEvent{ 1597 Did: testUser.DID, 1598 Kind: "commit", 1599 Commit: &jetstream.CommitEvent{ 1600 Rev: "v1", 1601 Operation: "create", 1602 Collection: "social.coves.feed.comment", 1603 RKey: rkey, 1604 CID: "bafycomment1", 1605 Record: map[string]interface{}{ 1606 "$type": "social.coves.feed.comment", 1607 "content": "Comment on Post 1", 1608 "reply": map[string]interface{}{ 1609 "root": map[string]interface{}{ 1610 "uri": postURI1, 1611 "cid": "bafypost1", 1612 }, 1613 "parent": map[string]interface{}{ 1614 "uri": postURI1, 1615 "cid": "bafypost1", 1616 }, 1617 }, 1618 "createdAt": time.Now().Format(time.RFC3339), 1619 }, 1620 }, 1621 } 1622 1623 err := consumer.HandleEvent(ctx, createEvent) 1624 if err != nil { 1625 t.Fatalf("Failed to create comment: %v", err) 1626 } 1627 1628 // Attempt to update comment to move it to Post 2 (should fail) 1629 updateEvent := &jetstream.JetstreamEvent{ 1630 Did: testUser.DID, 1631 Kind: "commit", 1632 Commit: &jetstream.CommitEvent{ 1633 Rev: "v2", 1634 Operation: "update", 1635 Collection: "social.coves.feed.comment", 1636 RKey: rkey, 1637 CID: "bafycomment2", 1638 Record: map[string]interface{}{ 1639 "$type": "social.coves.feed.comment", 1640 "content": "Trying to hijack this comment to Post 2", 1641 "reply": map[string]interface{}{ 1642 "root": map[string]interface{}{ 1643 "uri": postURI2, // Changed! 1644 "cid": "bafypost2", 1645 }, 1646 "parent": map[string]interface{}{ 1647 "uri": postURI2, // Changed! 1648 "cid": "bafypost2", 1649 }, 1650 }, 1651 "createdAt": time.Now().Format(time.RFC3339), 1652 }, 1653 }, 1654 } 1655 1656 err = consumer.HandleEvent(ctx, updateEvent) 1657 if err == nil { 1658 t.Error("Expected error when changing threading references, got nil") 1659 } 1660 if err != nil && !contains(err.Error(), "threading references cannot be changed") { 1661 t.Errorf("Expected 'threading references cannot be changed' error, got: %v", err) 1662 } 1663 1664 // Verify comment still points to Post 1 1665 comment, err := commentRepo.GetByURI(ctx, commentURI) 1666 if err != nil { 1667 t.Fatalf("Failed to get comment: %v", err) 1668 } 1669 if comment.ParentURI != postURI1 { 1670 t.Errorf("Expected parent URI to remain %s, got %s", postURI1, comment.ParentURI) 1671 } 1672 if comment.RootURI != postURI1 { 1673 t.Errorf("Expected root URI to remain %s, got %s", postURI1, comment.RootURI) 1674 } 1675 // Content should NOT have been updated since the operation was rejected 1676 if comment.Content != "Comment on Post 1" { 1677 t.Errorf("Expected original content, got '%s'", comment.Content) 1678 } 1679 }) 1680 1681 t.Run("Allow UPDATE that only changes content (threading unchanged)", func(t *testing.T) { 1682 rkey2 := generateTID() 1683 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2) 1684 1685 // Create comment 1686 createEvent := &jetstream.JetstreamEvent{ 1687 Did: testUser.DID, 1688 Kind: "commit", 1689 Commit: &jetstream.CommitEvent{ 1690 Rev: "v1", 1691 Operation: "create", 1692 Collection: "social.coves.feed.comment", 1693 RKey: rkey2, 1694 CID: "bafycomment3", 1695 Record: map[string]interface{}{ 1696 "$type": "social.coves.feed.comment", 1697 "content": "Original content", 1698 "reply": map[string]interface{}{ 1699 "root": map[string]interface{}{ 1700 "uri": postURI1, 1701 "cid": "bafypost1", 1702 }, 1703 "parent": map[string]interface{}{ 1704 "uri": postURI1, 1705 "cid": "bafypost1", 1706 }, 1707 }, 1708 "createdAt": time.Now().Format(time.RFC3339), 1709 }, 1710 }, 1711 } 1712 1713 err := consumer.HandleEvent(ctx, createEvent) 1714 if err != nil { 1715 t.Fatalf("Failed to create comment: %v", err) 1716 } 1717 1718 // Update content only (threading unchanged - should succeed) 1719 updateEvent := &jetstream.JetstreamEvent{ 1720 Did: testUser.DID, 1721 Kind: "commit", 1722 Commit: &jetstream.CommitEvent{ 1723 Rev: "v2", 1724 Operation: "update", 1725 Collection: "social.coves.feed.comment", 1726 RKey: rkey2, 1727 CID: "bafycomment4", 1728 Record: map[string]interface{}{ 1729 "$type": "social.coves.feed.comment", 1730 "content": "Updated content", 1731 "reply": map[string]interface{}{ 1732 "root": map[string]interface{}{ 1733 "uri": postURI1, // Same 1734 "cid": "bafypost1", 1735 }, 1736 "parent": map[string]interface{}{ 1737 "uri": postURI1, // Same 1738 "cid": "bafypost1", 1739 }, 1740 }, 1741 "createdAt": time.Now().Format(time.RFC3339), 1742 }, 1743 }, 1744 } 1745 1746 err = consumer.HandleEvent(ctx, updateEvent) 1747 if err != nil { 1748 t.Fatalf("Expected update to succeed when threading unchanged, got error: %v", err) 1749 } 1750 1751 // Verify content was updated 1752 comment, err := commentRepo.GetByURI(ctx, commentURI2) 1753 if err != nil { 1754 t.Fatalf("Failed to get comment: %v", err) 1755 } 1756 if comment.Content != "Updated content" { 1757 t.Errorf("Expected updated content, got '%s'", comment.Content) 1758 } 1759 // Threading should remain unchanged 1760 if comment.ParentURI != postURI1 { 1761 t.Errorf("Expected parent URI %s, got %s", postURI1, comment.ParentURI) 1762 } 1763 }) 1764}