A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/comments" 6 "Coves/internal/db/postgres" 7 "context" 8 "fmt" 9 "testing" 10 "time" 11) 12 13func TestCommentConsumer_CreateComment(t *testing.T) { 14 db := setupTestDB(t) 15 defer func() { 16 if err := db.Close(); err != nil { 17 t.Logf("Failed to close database: %v", err) 18 } 19 }() 20 21 ctx := context.Background() 22 commentRepo := postgres.NewCommentRepository(db) 23 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 24 25 // Setup test data 26 testUser := createTestUser(t, db, "commenter.test", "did:plc:commenter123") 27 testCommunity, err := createFeedTestCommunity(db, ctx, "testcommunity", "owner.test") 28 if err != nil { 29 t.Fatalf("Failed to create test community: %v", err) 30 } 31 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Test Post", 0, time.Now()) 32 33 t.Run("Create comment on post", func(t *testing.T) { 34 rkey := generateTID() 35 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 36 37 // Simulate Jetstream comment create event 38 event := &jetstream.JetstreamEvent{ 39 Did: testUser.DID, 40 Kind: "commit", 41 Commit: &jetstream.CommitEvent{ 42 Rev: "test-rev", 43 Operation: "create", 44 Collection: "social.coves.feed.comment", 45 RKey: rkey, 46 CID: "bafytest123", 47 Record: map[string]interface{}{ 48 "$type": "social.coves.feed.comment", 49 "content": "This is a test comment on a post!", 50 "reply": map[string]interface{}{ 51 "root": map[string]interface{}{ 52 "uri": testPostURI, 53 "cid": "bafypost", 54 }, 55 "parent": map[string]interface{}{ 56 "uri": testPostURI, 57 "cid": "bafypost", 58 }, 59 }, 60 "createdAt": time.Now().Format(time.RFC3339), 61 }, 62 }, 63 } 64 65 // Handle the event 66 err := consumer.HandleEvent(ctx, event) 67 if err != nil { 68 t.Fatalf("Failed to handle comment create event: %v", err) 69 } 70 71 // Verify comment was indexed 72 comment, err := commentRepo.GetByURI(ctx, uri) 73 if err != nil { 74 t.Fatalf("Failed to get indexed comment: %v", err) 75 } 76 77 if comment.URI != uri { 78 t.Errorf("Expected URI %s, got %s", uri, comment.URI) 79 } 80 81 if comment.CommenterDID != testUser.DID { 82 t.Errorf("Expected commenter %s, got %s", testUser.DID, comment.CommenterDID) 83 } 84 85 if comment.Content != "This is a test comment on a post!" { 86 t.Errorf("Expected content 'This is a test comment on a post!', got %s", comment.Content) 87 } 88 89 if comment.RootURI != testPostURI { 90 t.Errorf("Expected root URI %s, got %s", testPostURI, comment.RootURI) 91 } 92 93 if comment.ParentURI != testPostURI { 94 t.Errorf("Expected parent URI %s, got %s", testPostURI, comment.ParentURI) 95 } 96 97 // Verify post comment count was incremented 98 var commentCount int 99 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&commentCount) 100 if err != nil { 101 t.Fatalf("Failed to get post comment count: %v", err) 102 } 103 104 if commentCount != 1 { 105 t.Errorf("Expected post comment_count to be 1, got %d", commentCount) 106 } 107 }) 108 109 t.Run("Idempotent create - duplicate event", func(t *testing.T) { 110 rkey := generateTID() 111 112 event := &jetstream.JetstreamEvent{ 113 Did: testUser.DID, 114 Kind: "commit", 115 Commit: &jetstream.CommitEvent{ 116 Rev: "test-rev", 117 Operation: "create", 118 Collection: "social.coves.feed.comment", 119 RKey: rkey, 120 CID: "bafytest456", 121 Record: map[string]interface{}{ 122 "$type": "social.coves.feed.comment", 123 "content": "Idempotent test comment", 124 "reply": map[string]interface{}{ 125 "root": map[string]interface{}{ 126 "uri": testPostURI, 127 "cid": "bafypost", 128 }, 129 "parent": map[string]interface{}{ 130 "uri": testPostURI, 131 "cid": "bafypost", 132 }, 133 }, 134 "createdAt": time.Now().Format(time.RFC3339), 135 }, 136 }, 137 } 138 139 // First creation 140 err := consumer.HandleEvent(ctx, event) 141 if err != nil { 142 t.Fatalf("First creation failed: %v", err) 143 } 144 145 // Get initial comment count 146 var initialCount int 147 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount) 148 if err != nil { 149 t.Fatalf("Failed to get initial comment count: %v", err) 150 } 151 152 // Duplicate creation - should be idempotent 153 err = consumer.HandleEvent(ctx, event) 154 if err != nil { 155 t.Fatalf("Duplicate event should be handled gracefully: %v", err) 156 } 157 158 // Verify count wasn't incremented again 159 var finalCount int 160 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount) 161 if err != nil { 162 t.Fatalf("Failed to get final comment count: %v", err) 163 } 164 165 if finalCount != initialCount { 166 t.Errorf("Comment count should not increase on duplicate event. Initial: %d, Final: %d", initialCount, finalCount) 167 } 168 }) 169} 170 171func TestCommentConsumer_Threading(t *testing.T) { 172 db := setupTestDB(t) 173 defer func() { 174 if err := db.Close(); err != nil { 175 t.Logf("Failed to close database: %v", err) 176 } 177 }() 178 179 ctx := context.Background() 180 commentRepo := postgres.NewCommentRepository(db) 181 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 182 183 // Setup test data 184 testUser := createTestUser(t, db, "threader.test", "did:plc:threader123") 185 testCommunity, err := createFeedTestCommunity(db, ctx, "threadcommunity", "owner2.test") 186 if err != nil { 187 t.Fatalf("Failed to create test community: %v", err) 188 } 189 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Threading Test", 0, time.Now()) 190 191 t.Run("Create nested comment replies", func(t *testing.T) { 192 // Create first-level comment on post 193 comment1Rkey := generateTID() 194 comment1URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment1Rkey) 195 196 event1 := &jetstream.JetstreamEvent{ 197 Did: testUser.DID, 198 Kind: "commit", 199 Commit: &jetstream.CommitEvent{ 200 Operation: "create", 201 Collection: "social.coves.feed.comment", 202 RKey: comment1Rkey, 203 CID: "bafycomment1", 204 Record: map[string]interface{}{ 205 "content": "First level comment", 206 "reply": map[string]interface{}{ 207 "root": map[string]interface{}{ 208 "uri": testPostURI, 209 "cid": "bafypost", 210 }, 211 "parent": map[string]interface{}{ 212 "uri": testPostURI, 213 "cid": "bafypost", 214 }, 215 }, 216 "createdAt": time.Now().Format(time.RFC3339), 217 }, 218 }, 219 } 220 221 err := consumer.HandleEvent(ctx, event1) 222 if err != nil { 223 t.Fatalf("Failed to create first-level comment: %v", err) 224 } 225 226 // Create second-level comment (reply to first comment) 227 comment2Rkey := generateTID() 228 comment2URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment2Rkey) 229 230 event2 := &jetstream.JetstreamEvent{ 231 Did: testUser.DID, 232 Kind: "commit", 233 Commit: &jetstream.CommitEvent{ 234 Operation: "create", 235 Collection: "social.coves.feed.comment", 236 RKey: comment2Rkey, 237 CID: "bafycomment2", 238 Record: map[string]interface{}{ 239 "content": "Second level comment (reply to first)", 240 "reply": map[string]interface{}{ 241 "root": map[string]interface{}{ 242 "uri": testPostURI, 243 "cid": "bafypost", 244 }, 245 "parent": map[string]interface{}{ 246 "uri": comment1URI, 247 "cid": "bafycomment1", 248 }, 249 }, 250 "createdAt": time.Now().Add(1 * time.Second).Format(time.RFC3339), 251 }, 252 }, 253 } 254 255 err = consumer.HandleEvent(ctx, event2) 256 if err != nil { 257 t.Fatalf("Failed to create second-level comment: %v", err) 258 } 259 260 // Verify threading structure 261 comment1, err := commentRepo.GetByURI(ctx, comment1URI) 262 if err != nil { 263 t.Fatalf("Failed to get first comment: %v", err) 264 } 265 266 comment2, err := commentRepo.GetByURI(ctx, comment2URI) 267 if err != nil { 268 t.Fatalf("Failed to get second comment: %v", err) 269 } 270 271 // Both should have same root (original post) 272 if comment1.RootURI != testPostURI { 273 t.Errorf("Comment1 root should be post URI, got %s", comment1.RootURI) 274 } 275 276 if comment2.RootURI != testPostURI { 277 t.Errorf("Comment2 root should be post URI, got %s", comment2.RootURI) 278 } 279 280 // Comment1 parent should be post 281 if comment1.ParentURI != testPostURI { 282 t.Errorf("Comment1 parent should be post URI, got %s", comment1.ParentURI) 283 } 284 285 // Comment2 parent should be comment1 286 if comment2.ParentURI != comment1URI { 287 t.Errorf("Comment2 parent should be comment1 URI, got %s", comment2.ParentURI) 288 } 289 290 // Verify reply count on comment1 291 if comment1.ReplyCount != 1 { 292 t.Errorf("Comment1 should have 1 reply, got %d", comment1.ReplyCount) 293 } 294 295 // Query all comments by root 296 allComments, err := commentRepo.ListByRoot(ctx, testPostURI, 100, 0) 297 if err != nil { 298 t.Fatalf("Failed to list comments by root: %v", err) 299 } 300 301 if len(allComments) != 2 { 302 t.Errorf("Expected 2 comments in thread, got %d", len(allComments)) 303 } 304 305 // Query direct replies to post 306 directReplies, err := commentRepo.ListByParent(ctx, testPostURI, 100, 0) 307 if err != nil { 308 t.Fatalf("Failed to list direct replies to post: %v", err) 309 } 310 311 if len(directReplies) != 1 { 312 t.Errorf("Expected 1 direct reply to post, got %d", len(directReplies)) 313 } 314 315 // Query replies to comment1 316 comment1Replies, err := commentRepo.ListByParent(ctx, comment1URI, 100, 0) 317 if err != nil { 318 t.Fatalf("Failed to list replies to comment1: %v", err) 319 } 320 321 if len(comment1Replies) != 1 { 322 t.Errorf("Expected 1 reply to comment1, got %d", len(comment1Replies)) 323 } 324 }) 325} 326 327func TestCommentConsumer_UpdateComment(t *testing.T) { 328 db := setupTestDB(t) 329 defer func() { 330 if err := db.Close(); err != nil { 331 t.Logf("Failed to close database: %v", err) 332 } 333 }() 334 335 ctx := context.Background() 336 commentRepo := postgres.NewCommentRepository(db) 337 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 338 339 // Setup test data 340 testUser := createTestUser(t, db, "editor.test", "did:plc:editor123") 341 testCommunity, err := createFeedTestCommunity(db, ctx, "editcommunity", "owner3.test") 342 if err != nil { 343 t.Fatalf("Failed to create test community: %v", err) 344 } 345 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Edit Test", 0, time.Now()) 346 347 t.Run("Update comment content preserves vote counts", func(t *testing.T) { 348 rkey := generateTID() 349 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 350 351 // Create initial comment 352 createEvent := &jetstream.JetstreamEvent{ 353 Did: testUser.DID, 354 Kind: "commit", 355 Commit: &jetstream.CommitEvent{ 356 Operation: "create", 357 Collection: "social.coves.feed.comment", 358 RKey: rkey, 359 CID: "bafyoriginal", 360 Record: map[string]interface{}{ 361 "content": "Original comment content", 362 "reply": map[string]interface{}{ 363 "root": map[string]interface{}{ 364 "uri": testPostURI, 365 "cid": "bafypost", 366 }, 367 "parent": map[string]interface{}{ 368 "uri": testPostURI, 369 "cid": "bafypost", 370 }, 371 }, 372 "createdAt": time.Now().Format(time.RFC3339), 373 }, 374 }, 375 } 376 377 err := consumer.HandleEvent(ctx, createEvent) 378 if err != nil { 379 t.Fatalf("Failed to create comment: %v", err) 380 } 381 382 // Manually set vote counts to simulate votes 383 _, err = db.ExecContext(ctx, ` 384 UPDATE comments 385 SET upvote_count = 5, downvote_count = 2, score = 3 386 WHERE uri = $1 387 `, uri) 388 if err != nil { 389 t.Fatalf("Failed to set vote counts: %v", err) 390 } 391 392 // Update the comment 393 updateEvent := &jetstream.JetstreamEvent{ 394 Did: testUser.DID, 395 Kind: "commit", 396 Commit: &jetstream.CommitEvent{ 397 Operation: "update", 398 Collection: "social.coves.feed.comment", 399 RKey: rkey, 400 CID: "bafyupdated", 401 Record: map[string]interface{}{ 402 "content": "EDITED: Updated comment content", 403 "reply": map[string]interface{}{ 404 "root": map[string]interface{}{ 405 "uri": testPostURI, 406 "cid": "bafypost", 407 }, 408 "parent": map[string]interface{}{ 409 "uri": testPostURI, 410 "cid": "bafypost", 411 }, 412 }, 413 "createdAt": time.Now().Format(time.RFC3339), 414 }, 415 }, 416 } 417 418 err = consumer.HandleEvent(ctx, updateEvent) 419 if err != nil { 420 t.Fatalf("Failed to update comment: %v", err) 421 } 422 423 // Verify content updated 424 comment, err := commentRepo.GetByURI(ctx, uri) 425 if err != nil { 426 t.Fatalf("Failed to get updated comment: %v", err) 427 } 428 429 if comment.Content != "EDITED: Updated comment content" { 430 t.Errorf("Expected updated content, got %s", comment.Content) 431 } 432 433 // Verify CID updated 434 if comment.CID != "bafyupdated" { 435 t.Errorf("Expected CID to be updated to bafyupdated, got %s", comment.CID) 436 } 437 438 // Verify vote counts preserved 439 if comment.UpvoteCount != 5 { 440 t.Errorf("Expected upvote_count preserved at 5, got %d", comment.UpvoteCount) 441 } 442 443 if comment.DownvoteCount != 2 { 444 t.Errorf("Expected downvote_count preserved at 2, got %d", comment.DownvoteCount) 445 } 446 447 if comment.Score != 3 { 448 t.Errorf("Expected score preserved at 3, got %d", comment.Score) 449 } 450 }) 451} 452 453func TestCommentConsumer_DeleteComment(t *testing.T) { 454 db := setupTestDB(t) 455 defer func() { 456 if err := db.Close(); err != nil { 457 t.Logf("Failed to close database: %v", err) 458 } 459 }() 460 461 ctx := context.Background() 462 commentRepo := postgres.NewCommentRepository(db) 463 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 464 465 // Setup test data 466 testUser := createTestUser(t, db, "deleter.test", "did:plc:deleter123") 467 testCommunity, err := createFeedTestCommunity(db, ctx, "deletecommunity", "owner4.test") 468 if err != nil { 469 t.Fatalf("Failed to create test community: %v", err) 470 } 471 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Delete Test", 0, time.Now()) 472 473 t.Run("Delete comment decrements parent count", func(t *testing.T) { 474 rkey := generateTID() 475 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 476 477 // Create comment 478 createEvent := &jetstream.JetstreamEvent{ 479 Did: testUser.DID, 480 Kind: "commit", 481 Commit: &jetstream.CommitEvent{ 482 Operation: "create", 483 Collection: "social.coves.feed.comment", 484 RKey: rkey, 485 CID: "bafydelete", 486 Record: map[string]interface{}{ 487 "content": "Comment to be deleted", 488 "reply": map[string]interface{}{ 489 "root": map[string]interface{}{ 490 "uri": testPostURI, 491 "cid": "bafypost", 492 }, 493 "parent": map[string]interface{}{ 494 "uri": testPostURI, 495 "cid": "bafypost", 496 }, 497 }, 498 "createdAt": time.Now().Format(time.RFC3339), 499 }, 500 }, 501 } 502 503 err := consumer.HandleEvent(ctx, createEvent) 504 if err != nil { 505 t.Fatalf("Failed to create comment: %v", err) 506 } 507 508 // Get initial post comment count 509 var initialCount int 510 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount) 511 if err != nil { 512 t.Fatalf("Failed to get initial comment count: %v", err) 513 } 514 515 // Delete comment 516 deleteEvent := &jetstream.JetstreamEvent{ 517 Did: testUser.DID, 518 Kind: "commit", 519 Commit: &jetstream.CommitEvent{ 520 Operation: "delete", 521 Collection: "social.coves.feed.comment", 522 RKey: rkey, 523 }, 524 } 525 526 err = consumer.HandleEvent(ctx, deleteEvent) 527 if err != nil { 528 t.Fatalf("Failed to delete comment: %v", err) 529 } 530 531 // Verify soft delete 532 comment, err := commentRepo.GetByURI(ctx, uri) 533 if err != nil { 534 t.Fatalf("Failed to get deleted comment: %v", err) 535 } 536 537 if comment.DeletedAt == nil { 538 t.Error("Expected deleted_at to be set, got nil") 539 } 540 541 // Verify post comment count decremented 542 var finalCount int 543 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount) 544 if err != nil { 545 t.Fatalf("Failed to get final comment count: %v", err) 546 } 547 548 if finalCount != initialCount-1 { 549 t.Errorf("Expected comment count to decrease by 1. Initial: %d, Final: %d", initialCount, finalCount) 550 } 551 }) 552 553 t.Run("Delete is idempotent", func(t *testing.T) { 554 rkey := generateTID() 555 556 // Create comment 557 createEvent := &jetstream.JetstreamEvent{ 558 Did: testUser.DID, 559 Kind: "commit", 560 Commit: &jetstream.CommitEvent{ 561 Operation: "create", 562 Collection: "social.coves.feed.comment", 563 RKey: rkey, 564 CID: "bafyidempdelete", 565 Record: map[string]interface{}{ 566 "content": "Idempotent delete test", 567 "reply": map[string]interface{}{ 568 "root": map[string]interface{}{ 569 "uri": testPostURI, 570 "cid": "bafypost", 571 }, 572 "parent": map[string]interface{}{ 573 "uri": testPostURI, 574 "cid": "bafypost", 575 }, 576 }, 577 "createdAt": time.Now().Format(time.RFC3339), 578 }, 579 }, 580 } 581 582 err := consumer.HandleEvent(ctx, createEvent) 583 if err != nil { 584 t.Fatalf("Failed to create comment: %v", err) 585 } 586 587 // First delete 588 deleteEvent := &jetstream.JetstreamEvent{ 589 Did: testUser.DID, 590 Kind: "commit", 591 Commit: &jetstream.CommitEvent{ 592 Operation: "delete", 593 Collection: "social.coves.feed.comment", 594 RKey: rkey, 595 }, 596 } 597 598 err = consumer.HandleEvent(ctx, deleteEvent) 599 if err != nil { 600 t.Fatalf("First delete failed: %v", err) 601 } 602 603 // Get count after first delete 604 var countAfterFirstDelete int 605 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterFirstDelete) 606 if err != nil { 607 t.Fatalf("Failed to get count after first delete: %v", err) 608 } 609 610 // Second delete (idempotent) 611 err = consumer.HandleEvent(ctx, deleteEvent) 612 if err != nil { 613 t.Fatalf("Second delete should be idempotent: %v", err) 614 } 615 616 // Verify count didn't change 617 var countAfterSecondDelete int 618 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterSecondDelete) 619 if err != nil { 620 t.Fatalf("Failed to get count after second delete: %v", err) 621 } 622 623 if countAfterSecondDelete != countAfterFirstDelete { 624 t.Errorf("Count should not change on duplicate delete. After first: %d, After second: %d", countAfterFirstDelete, countAfterSecondDelete) 625 } 626 }) 627} 628 629func TestCommentConsumer_SecurityValidation(t *testing.T) { 630 db := setupTestDB(t) 631 defer func() { 632 if err := db.Close(); err != nil { 633 t.Logf("Failed to close database: %v", err) 634 } 635 }() 636 637 ctx := context.Background() 638 commentRepo := postgres.NewCommentRepository(db) 639 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 640 641 testUser := createTestUser(t, db, "security.test", "did:plc:security123") 642 testCommunity, err := createFeedTestCommunity(db, ctx, "seccommunity", "owner5.test") 643 if err != nil { 644 t.Fatalf("Failed to create test community: %v", err) 645 } 646 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Security Test", 0, time.Now()) 647 648 t.Run("Reject comment with empty content", func(t *testing.T) { 649 event := &jetstream.JetstreamEvent{ 650 Did: testUser.DID, 651 Kind: "commit", 652 Commit: &jetstream.CommitEvent{ 653 Operation: "create", 654 Collection: "social.coves.feed.comment", 655 RKey: generateTID(), 656 CID: "bafyinvalid", 657 Record: map[string]interface{}{ 658 "content": "", // Empty content 659 "reply": map[string]interface{}{ 660 "root": map[string]interface{}{ 661 "uri": testPostURI, 662 "cid": "bafypost", 663 }, 664 "parent": map[string]interface{}{ 665 "uri": testPostURI, 666 "cid": "bafypost", 667 }, 668 }, 669 "createdAt": time.Now().Format(time.RFC3339), 670 }, 671 }, 672 } 673 674 err := consumer.HandleEvent(ctx, event) 675 if err == nil { 676 t.Error("Expected error for empty content, got nil") 677 } 678 }) 679 680 t.Run("Reject comment with invalid root reference", func(t *testing.T) { 681 event := &jetstream.JetstreamEvent{ 682 Did: testUser.DID, 683 Kind: "commit", 684 Commit: &jetstream.CommitEvent{ 685 Operation: "create", 686 Collection: "social.coves.feed.comment", 687 RKey: generateTID(), 688 CID: "bafyinvalid2", 689 Record: map[string]interface{}{ 690 "content": "Valid content", 691 "reply": map[string]interface{}{ 692 "root": map[string]interface{}{ 693 "uri": "", // Missing URI 694 "cid": "bafypost", 695 }, 696 "parent": map[string]interface{}{ 697 "uri": testPostURI, 698 "cid": "bafypost", 699 }, 700 }, 701 "createdAt": time.Now().Format(time.RFC3339), 702 }, 703 }, 704 } 705 706 err := consumer.HandleEvent(ctx, event) 707 if err == nil { 708 t.Error("Expected error for invalid root reference, got nil") 709 } 710 }) 711 712 t.Run("Reject comment with invalid parent reference", func(t *testing.T) { 713 event := &jetstream.JetstreamEvent{ 714 Did: testUser.DID, 715 Kind: "commit", 716 Commit: &jetstream.CommitEvent{ 717 Operation: "create", 718 Collection: "social.coves.feed.comment", 719 RKey: generateTID(), 720 CID: "bafyinvalid3", 721 Record: map[string]interface{}{ 722 "content": "Valid content", 723 "reply": map[string]interface{}{ 724 "root": map[string]interface{}{ 725 "uri": testPostURI, 726 "cid": "bafypost", 727 }, 728 "parent": map[string]interface{}{ 729 "uri": testPostURI, 730 "cid": "", // Missing CID 731 }, 732 }, 733 "createdAt": time.Now().Format(time.RFC3339), 734 }, 735 }, 736 } 737 738 err := consumer.HandleEvent(ctx, event) 739 if err == nil { 740 t.Error("Expected error for invalid parent reference, got nil") 741 } 742 }) 743 744 t.Run("Reject comment with invalid DID format", func(t *testing.T) { 745 event := &jetstream.JetstreamEvent{ 746 Did: "invalid-did-format", // Bad DID 747 Kind: "commit", 748 Commit: &jetstream.CommitEvent{ 749 Operation: "create", 750 Collection: "social.coves.feed.comment", 751 RKey: generateTID(), 752 CID: "bafyinvalid4", 753 Record: map[string]interface{}{ 754 "content": "Valid content", 755 "reply": map[string]interface{}{ 756 "root": map[string]interface{}{ 757 "uri": testPostURI, 758 "cid": "bafypost", 759 }, 760 "parent": map[string]interface{}{ 761 "uri": testPostURI, 762 "cid": "bafypost", 763 }, 764 }, 765 "createdAt": time.Now().Format(time.RFC3339), 766 }, 767 }, 768 } 769 770 err := consumer.HandleEvent(ctx, event) 771 if err == nil { 772 t.Error("Expected error for invalid DID format, got nil") 773 } 774 }) 775 776 t.Run("Reject comment exceeding max content length", func(t *testing.T) { 777 event := &jetstream.JetstreamEvent{ 778 Did: testUser.DID, 779 Kind: "commit", 780 Commit: &jetstream.CommitEvent{ 781 Operation: "create", 782 Collection: "social.coves.feed.comment", 783 RKey: generateTID(), 784 CID: "bafytoobig", 785 Record: map[string]interface{}{ 786 "content": string(make([]byte, 30001)), // Exceeds 30000 byte limit 787 "reply": map[string]interface{}{ 788 "root": map[string]interface{}{ 789 "uri": testPostURI, 790 "cid": "bafypost", 791 }, 792 "parent": map[string]interface{}{ 793 "uri": testPostURI, 794 "cid": "bafypost", 795 }, 796 }, 797 "createdAt": time.Now().Format(time.RFC3339), 798 }, 799 }, 800 } 801 802 err := consumer.HandleEvent(ctx, event) 803 if err == nil { 804 t.Error("Expected error for oversized content, got nil") 805 } 806 if err != nil && !contains(err.Error(), "exceeds maximum length") { 807 t.Errorf("Expected 'exceeds maximum length' error, got: %v", err) 808 } 809 }) 810 811 t.Run("Reject comment with malformed parent URI", func(t *testing.T) { 812 event := &jetstream.JetstreamEvent{ 813 Did: testUser.DID, 814 Kind: "commit", 815 Commit: &jetstream.CommitEvent{ 816 Operation: "create", 817 Collection: "social.coves.feed.comment", 818 RKey: generateTID(), 819 CID: "bafymalformed", 820 Record: map[string]interface{}{ 821 "content": "Valid content", 822 "reply": map[string]interface{}{ 823 "root": map[string]interface{}{ 824 "uri": testPostURI, 825 "cid": "bafypost", 826 }, 827 "parent": map[string]interface{}{ 828 "uri": "at://malformed", // Invalid: missing collection/rkey 829 "cid": "bafyparent", 830 }, 831 }, 832 "createdAt": time.Now().Format(time.RFC3339), 833 }, 834 }, 835 } 836 837 err := consumer.HandleEvent(ctx, event) 838 if err == nil { 839 t.Error("Expected error for malformed AT-URI, got nil") 840 } 841 if err != nil && !contains(err.Error(), "invalid parent URI") { 842 t.Errorf("Expected 'invalid parent URI' error, got: %v", err) 843 } 844 }) 845 846 t.Run("Reject comment with malformed root URI", func(t *testing.T) { 847 event := &jetstream.JetstreamEvent{ 848 Did: testUser.DID, 849 Kind: "commit", 850 Commit: &jetstream.CommitEvent{ 851 Operation: "create", 852 Collection: "social.coves.feed.comment", 853 RKey: generateTID(), 854 CID: "bafymalformed2", 855 Record: map[string]interface{}{ 856 "content": "Valid content", 857 "reply": map[string]interface{}{ 858 "root": map[string]interface{}{ 859 "uri": "at://did:plc:test123", // Invalid: missing collection/rkey 860 "cid": "bafyroot", 861 }, 862 "parent": map[string]interface{}{ 863 "uri": testPostURI, 864 "cid": "bafyparent", 865 }, 866 }, 867 "createdAt": time.Now().Format(time.RFC3339), 868 }, 869 }, 870 } 871 872 err := consumer.HandleEvent(ctx, event) 873 if err == nil { 874 t.Error("Expected error for malformed AT-URI, got nil") 875 } 876 if err != nil && !contains(err.Error(), "invalid root URI") { 877 t.Errorf("Expected 'invalid root URI' error, got: %v", err) 878 } 879 }) 880} 881 882func TestCommentRepository_Queries(t *testing.T) { 883 db := setupTestDB(t) 884 defer func() { 885 if err := db.Close(); err != nil { 886 t.Logf("Failed to close database: %v", err) 887 } 888 }() 889 890 ctx := context.Background() 891 commentRepo := postgres.NewCommentRepository(db) 892 893 // Clean up any existing test data from previous runs 894 _, err := db.ExecContext(ctx, "DELETE FROM comments WHERE commenter_did LIKE 'did:plc:%'") 895 if err != nil { 896 t.Fatalf("Failed to clean up test comments: %v", err) 897 } 898 899 testUser := createTestUser(t, db, "query.test", "did:plc:query123") 900 testCommunity, err := createFeedTestCommunity(db, ctx, "querycommunity", "owner6.test") 901 if err != nil { 902 t.Fatalf("Failed to create test community: %v", err) 903 } 904 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Query Test", 0, time.Now()) 905 906 // Create a comment tree 907 // Post 908 // |- Comment 1 909 // |- Comment 2 910 // |- Comment 3 911 // |- Comment 4 912 913 comment1 := &comments.Comment{ 914 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/1", testUser.DID), 915 CID: "bafyc1", 916 RKey: "1", 917 CommenterDID: testUser.DID, 918 RootURI: postURI, 919 RootCID: "bafypost", 920 ParentURI: postURI, 921 ParentCID: "bafypost", 922 Content: "Comment 1", 923 Langs: []string{}, 924 CreatedAt: time.Now(), 925 } 926 927 comment2 := &comments.Comment{ 928 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/2", testUser.DID), 929 CID: "bafyc2", 930 RKey: "2", 931 CommenterDID: testUser.DID, 932 RootURI: postURI, 933 RootCID: "bafypost", 934 ParentURI: comment1.URI, 935 ParentCID: "bafyc1", 936 Content: "Comment 2 (reply to 1)", 937 Langs: []string{}, 938 CreatedAt: time.Now().Add(1 * time.Second), 939 } 940 941 comment3 := &comments.Comment{ 942 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/3", testUser.DID), 943 CID: "bafyc3", 944 RKey: "3", 945 CommenterDID: testUser.DID, 946 RootURI: postURI, 947 RootCID: "bafypost", 948 ParentURI: comment1.URI, 949 ParentCID: "bafyc1", 950 Content: "Comment 3 (reply to 1)", 951 Langs: []string{}, 952 CreatedAt: time.Now().Add(2 * time.Second), 953 } 954 955 comment4 := &comments.Comment{ 956 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/4", testUser.DID), 957 CID: "bafyc4", 958 RKey: "4", 959 CommenterDID: testUser.DID, 960 RootURI: postURI, 961 RootCID: "bafypost", 962 ParentURI: postURI, 963 ParentCID: "bafypost", 964 Content: "Comment 4", 965 Langs: []string{}, 966 CreatedAt: time.Now().Add(3 * time.Second), 967 } 968 969 // Create all comments 970 for i, c := range []*comments.Comment{comment1, comment2, comment3, comment4} { 971 if err := commentRepo.Create(ctx, c); err != nil { 972 t.Fatalf("Failed to create comment %d: %v", i+1, err) 973 } 974 t.Logf("Created comment %d: %s", i+1, c.URI) 975 } 976 977 // Verify comments were created 978 verifyCount, err := commentRepo.CountByParent(ctx, postURI) 979 if err != nil { 980 t.Fatalf("Failed to count comments: %v", err) 981 } 982 t.Logf("Direct replies to post after creation: %d", verifyCount) 983 984 t.Run("ListByRoot returns all comments in thread", func(t *testing.T) { 985 comments, err := commentRepo.ListByRoot(ctx, postURI, 100, 0) 986 if err != nil { 987 t.Fatalf("Failed to list by root: %v", err) 988 } 989 990 if len(comments) != 4 { 991 t.Errorf("Expected 4 comments, got %d", len(comments)) 992 } 993 }) 994 995 t.Run("ListByParent returns direct replies", func(t *testing.T) { 996 // Direct replies to post 997 postReplies, err := commentRepo.ListByParent(ctx, postURI, 100, 0) 998 if err != nil { 999 t.Fatalf("Failed to list post replies: %v", err) 1000 } 1001 1002 if len(postReplies) != 2 { 1003 t.Errorf("Expected 2 direct replies to post, got %d", len(postReplies)) 1004 } 1005 1006 // Direct replies to comment1 1007 comment1Replies, err := commentRepo.ListByParent(ctx, comment1.URI, 100, 0) 1008 if err != nil { 1009 t.Fatalf("Failed to list comment1 replies: %v", err) 1010 } 1011 1012 if len(comment1Replies) != 2 { 1013 t.Errorf("Expected 2 direct replies to comment1, got %d", len(comment1Replies)) 1014 } 1015 }) 1016 1017 t.Run("CountByParent returns correct counts", func(t *testing.T) { 1018 postCount, err := commentRepo.CountByParent(ctx, postURI) 1019 if err != nil { 1020 t.Fatalf("Failed to count post replies: %v", err) 1021 } 1022 1023 if postCount != 2 { 1024 t.Errorf("Expected 2 direct replies to post, got %d", postCount) 1025 } 1026 1027 comment1Count, err := commentRepo.CountByParent(ctx, comment1.URI) 1028 if err != nil { 1029 t.Fatalf("Failed to count comment1 replies: %v", err) 1030 } 1031 1032 if comment1Count != 2 { 1033 t.Errorf("Expected 2 direct replies to comment1, got %d", comment1Count) 1034 } 1035 }) 1036 1037 t.Run("ListByCommenter returns user's comments", func(t *testing.T) { 1038 userComments, err := commentRepo.ListByCommenter(ctx, testUser.DID, 100, 0) 1039 if err != nil { 1040 t.Fatalf("Failed to list by commenter: %v", err) 1041 } 1042 1043 if len(userComments) != 4 { 1044 t.Errorf("Expected 4 comments by user, got %d", len(userComments)) 1045 } 1046 }) 1047} 1048 1049// TestCommentConsumer_OutOfOrderReconciliation tests that parent counts are 1050// correctly reconciled when child comments arrive before their parent 1051func TestCommentConsumer_OutOfOrderReconciliation(t *testing.T) { 1052 db := setupTestDB(t) 1053 defer func() { 1054 if err := db.Close(); err != nil { 1055 t.Logf("Failed to close database: %v", err) 1056 } 1057 }() 1058 1059 ctx := context.Background() 1060 commentRepo := postgres.NewCommentRepository(db) 1061 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 1062 1063 testUser := createTestUser(t, db, "outoforder.test", "did:plc:outoforder123") 1064 testCommunity, err := createFeedTestCommunity(db, ctx, "ooo-community", "owner7.test") 1065 if err != nil { 1066 t.Fatalf("Failed to create test community: %v", err) 1067 } 1068 postURI := createTestPost(t, db, testCommunity, testUser.DID, "OOO Test Post", 0, time.Now()) 1069 1070 t.Run("Child arrives before parent - count reconciled", func(t *testing.T) { 1071 // Scenario: User A creates comment C1 on post 1072 // User B creates reply C2 to C1 1073 // Jetstream delivers C2 before C1 (different repos) 1074 // When C1 finally arrives, its reply_count should be 1, not 0 1075 1076 parentRkey := generateTID() 1077 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey) 1078 1079 childRkey := generateTID() 1080 childURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, childRkey) 1081 1082 // Step 1: Index child FIRST (before parent exists) 1083 childEvent := &jetstream.JetstreamEvent{ 1084 Did: testUser.DID, 1085 Kind: "commit", 1086 Commit: &jetstream.CommitEvent{ 1087 Rev: "child-rev", 1088 Operation: "create", 1089 Collection: "social.coves.feed.comment", 1090 RKey: childRkey, 1091 CID: "bafychild", 1092 Record: map[string]interface{}{ 1093 "$type": "social.coves.feed.comment", 1094 "content": "This is a reply to a comment that doesn't exist yet!", 1095 "reply": map[string]interface{}{ 1096 "root": map[string]interface{}{ 1097 "uri": postURI, 1098 "cid": "bafypost", 1099 }, 1100 "parent": map[string]interface{}{ 1101 "uri": parentURI, // Points to parent that doesn't exist yet 1102 "cid": "bafyparent", 1103 }, 1104 }, 1105 "createdAt": time.Now().Format(time.RFC3339), 1106 }, 1107 }, 1108 } 1109 1110 err := consumer.HandleEvent(ctx, childEvent) 1111 if err != nil { 1112 t.Fatalf("Failed to handle child event: %v", err) 1113 } 1114 1115 // Verify child was indexed 1116 childComment, err := commentRepo.GetByURI(ctx, childURI) 1117 if err != nil { 1118 t.Fatalf("Child comment not indexed: %v", err) 1119 } 1120 if childComment.ParentURI != parentURI { 1121 t.Errorf("Expected child parent_uri %s, got %s", parentURI, childComment.ParentURI) 1122 } 1123 1124 // Step 2: Now index parent (arrives late due to Jetstream ordering) 1125 parentEvent := &jetstream.JetstreamEvent{ 1126 Did: testUser.DID, 1127 Kind: "commit", 1128 Commit: &jetstream.CommitEvent{ 1129 Rev: "parent-rev", 1130 Operation: "create", 1131 Collection: "social.coves.feed.comment", 1132 RKey: parentRkey, 1133 CID: "bafyparent", 1134 Record: map[string]interface{}{ 1135 "$type": "social.coves.feed.comment", 1136 "content": "This is the parent comment arriving late", 1137 "reply": map[string]interface{}{ 1138 "root": map[string]interface{}{ 1139 "uri": postURI, 1140 "cid": "bafypost", 1141 }, 1142 "parent": map[string]interface{}{ 1143 "uri": postURI, 1144 "cid": "bafypost", 1145 }, 1146 }, 1147 "createdAt": time.Now().Format(time.RFC3339), 1148 }, 1149 }, 1150 } 1151 1152 err = consumer.HandleEvent(ctx, parentEvent) 1153 if err != nil { 1154 t.Fatalf("Failed to handle parent event: %v", err) 1155 } 1156 1157 // Step 3: Verify parent was indexed with CORRECT reply_count 1158 parentComment, err := commentRepo.GetByURI(ctx, parentURI) 1159 if err != nil { 1160 t.Fatalf("Parent comment not indexed: %v", err) 1161 } 1162 1163 // THIS IS THE KEY TEST: Parent should have reply_count = 1 due to reconciliation 1164 if parentComment.ReplyCount != 1 { 1165 t.Errorf("Expected parent reply_count to be 1 (reconciled), got %d", parentComment.ReplyCount) 1166 t.Logf("This indicates out-of-order reconciliation failed!") 1167 } 1168 1169 // Verify via query as well 1170 count, err := commentRepo.CountByParent(ctx, parentURI) 1171 if err != nil { 1172 t.Fatalf("Failed to count parent replies: %v", err) 1173 } 1174 if count != 1 { 1175 t.Errorf("Expected 1 reply to parent, got %d", count) 1176 } 1177 }) 1178 1179 t.Run("Multiple children arrive before parent", func(t *testing.T) { 1180 parentRkey := generateTID() 1181 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey) 1182 1183 // Index 3 children before parent 1184 for i := 1; i <= 3; i++ { 1185 childRkey := generateTID() 1186 childEvent := &jetstream.JetstreamEvent{ 1187 Did: testUser.DID, 1188 Kind: "commit", 1189 Commit: &jetstream.CommitEvent{ 1190 Rev: fmt.Sprintf("child-%d-rev", i), 1191 Operation: "create", 1192 Collection: "social.coves.feed.comment", 1193 RKey: childRkey, 1194 CID: fmt.Sprintf("bafychild%d", i), 1195 Record: map[string]interface{}{ 1196 "$type": "social.coves.feed.comment", 1197 "content": fmt.Sprintf("Reply %d before parent", i), 1198 "reply": map[string]interface{}{ 1199 "root": map[string]interface{}{ 1200 "uri": postURI, 1201 "cid": "bafypost", 1202 }, 1203 "parent": map[string]interface{}{ 1204 "uri": parentURI, 1205 "cid": "bafyparent2", 1206 }, 1207 }, 1208 "createdAt": time.Now().Format(time.RFC3339), 1209 }, 1210 }, 1211 } 1212 1213 err := consumer.HandleEvent(ctx, childEvent) 1214 if err != nil { 1215 t.Fatalf("Failed to handle child %d event: %v", i, err) 1216 } 1217 } 1218 1219 // Now index parent 1220 parentEvent := &jetstream.JetstreamEvent{ 1221 Did: testUser.DID, 1222 Kind: "commit", 1223 Commit: &jetstream.CommitEvent{ 1224 Rev: "parent2-rev", 1225 Operation: "create", 1226 Collection: "social.coves.feed.comment", 1227 RKey: parentRkey, 1228 CID: "bafyparent2", 1229 Record: map[string]interface{}{ 1230 "$type": "social.coves.feed.comment", 1231 "content": "Parent with 3 pre-existing children", 1232 "reply": map[string]interface{}{ 1233 "root": map[string]interface{}{ 1234 "uri": postURI, 1235 "cid": "bafypost", 1236 }, 1237 "parent": map[string]interface{}{ 1238 "uri": postURI, 1239 "cid": "bafypost", 1240 }, 1241 }, 1242 "createdAt": time.Now().Format(time.RFC3339), 1243 }, 1244 }, 1245 } 1246 1247 err := consumer.HandleEvent(ctx, parentEvent) 1248 if err != nil { 1249 t.Fatalf("Failed to handle parent event: %v", err) 1250 } 1251 1252 // Verify parent has reply_count = 3 1253 parentComment, err := commentRepo.GetByURI(ctx, parentURI) 1254 if err != nil { 1255 t.Fatalf("Parent comment not indexed: %v", err) 1256 } 1257 1258 if parentComment.ReplyCount != 3 { 1259 t.Errorf("Expected parent reply_count to be 3 (reconciled), got %d", parentComment.ReplyCount) 1260 } 1261 }) 1262} 1263 1264// TestCommentConsumer_Resurrection tests that soft-deleted comments can be recreated 1265// In atProto, deleted records' rkeys become available for reuse 1266func TestCommentConsumer_Resurrection(t *testing.T) { 1267 db := setupTestDB(t) 1268 defer func() { 1269 if err := db.Close(); err != nil { 1270 t.Logf("Failed to close database: %v", err) 1271 } 1272 }() 1273 1274 ctx := context.Background() 1275 commentRepo := postgres.NewCommentRepository(db) 1276 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 1277 1278 testUser := createTestUser(t, db, "resurrect.test", "did:plc:resurrect123") 1279 testCommunity, err := createFeedTestCommunity(db, ctx, "resurrect-community", "owner8.test") 1280 if err != nil { 1281 t.Fatalf("Failed to create test community: %v", err) 1282 } 1283 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Resurrection Test", 0, time.Now()) 1284 1285 rkey := generateTID() 1286 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 1287 1288 t.Run("Recreate deleted comment with same rkey", func(t *testing.T) { 1289 // Step 1: Create initial comment 1290 createEvent := &jetstream.JetstreamEvent{ 1291 Did: testUser.DID, 1292 Kind: "commit", 1293 Commit: &jetstream.CommitEvent{ 1294 Rev: "v1", 1295 Operation: "create", 1296 Collection: "social.coves.feed.comment", 1297 RKey: rkey, 1298 CID: "bafyoriginal", 1299 Record: map[string]interface{}{ 1300 "$type": "social.coves.feed.comment", 1301 "content": "Original comment content", 1302 "reply": map[string]interface{}{ 1303 "root": map[string]interface{}{ 1304 "uri": postURI, 1305 "cid": "bafypost", 1306 }, 1307 "parent": map[string]interface{}{ 1308 "uri": postURI, 1309 "cid": "bafypost", 1310 }, 1311 }, 1312 "createdAt": time.Now().Format(time.RFC3339), 1313 }, 1314 }, 1315 } 1316 1317 err := consumer.HandleEvent(ctx, createEvent) 1318 if err != nil { 1319 t.Fatalf("Failed to create initial comment: %v", err) 1320 } 1321 1322 // Verify comment exists 1323 comment, err := commentRepo.GetByURI(ctx, commentURI) 1324 if err != nil { 1325 t.Fatalf("Comment not found after creation: %v", err) 1326 } 1327 if comment.Content != "Original comment content" { 1328 t.Errorf("Expected content 'Original comment content', got '%s'", comment.Content) 1329 } 1330 if comment.DeletedAt != nil { 1331 t.Errorf("Expected deleted_at to be nil, got %v", comment.DeletedAt) 1332 } 1333 1334 // Step 2: Delete the comment 1335 deleteEvent := &jetstream.JetstreamEvent{ 1336 Did: testUser.DID, 1337 Kind: "commit", 1338 Commit: &jetstream.CommitEvent{ 1339 Rev: "v2", 1340 Operation: "delete", 1341 Collection: "social.coves.feed.comment", 1342 RKey: rkey, 1343 }, 1344 } 1345 1346 err = consumer.HandleEvent(ctx, deleteEvent) 1347 if err != nil { 1348 t.Fatalf("Failed to delete comment: %v", err) 1349 } 1350 1351 // Verify comment is soft-deleted 1352 comment, err = commentRepo.GetByURI(ctx, commentURI) 1353 if err != nil { 1354 t.Fatalf("Comment not found after deletion: %v", err) 1355 } 1356 if comment.DeletedAt == nil { 1357 t.Error("Expected deleted_at to be set, got nil") 1358 } 1359 1360 // Step 3: Recreate comment with same rkey (resurrection) 1361 // In atProto, this is a valid operation - user can reuse the rkey 1362 recreateEvent := &jetstream.JetstreamEvent{ 1363 Did: testUser.DID, 1364 Kind: "commit", 1365 Commit: &jetstream.CommitEvent{ 1366 Rev: "v3", 1367 Operation: "create", 1368 Collection: "social.coves.feed.comment", 1369 RKey: rkey, // Same rkey! 1370 CID: "bafyresurrected", 1371 Record: map[string]interface{}{ 1372 "$type": "social.coves.feed.comment", 1373 "content": "Resurrected comment with new content", 1374 "reply": map[string]interface{}{ 1375 "root": map[string]interface{}{ 1376 "uri": postURI, 1377 "cid": "bafypost", 1378 }, 1379 "parent": map[string]interface{}{ 1380 "uri": postURI, 1381 "cid": "bafypost", 1382 }, 1383 }, 1384 "createdAt": time.Now().Format(time.RFC3339), 1385 }, 1386 }, 1387 } 1388 1389 err = consumer.HandleEvent(ctx, recreateEvent) 1390 if err != nil { 1391 t.Fatalf("Failed to resurrect comment: %v", err) 1392 } 1393 1394 // Step 4: Verify comment is resurrected with new content 1395 comment, err = commentRepo.GetByURI(ctx, commentURI) 1396 if err != nil { 1397 t.Fatalf("Comment not found after resurrection: %v", err) 1398 } 1399 1400 if comment.DeletedAt != nil { 1401 t.Errorf("Expected deleted_at to be NULL after resurrection, got %v", comment.DeletedAt) 1402 } 1403 if comment.Content != "Resurrected comment with new content" { 1404 t.Errorf("Expected resurrected content, got '%s'", comment.Content) 1405 } 1406 if comment.CID != "bafyresurrected" { 1407 t.Errorf("Expected CID 'bafyresurrected', got '%s'", comment.CID) 1408 } 1409 1410 // Verify parent count was restored (post should have comment_count = 1) 1411 var postCommentCount int 1412 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&postCommentCount) 1413 if err != nil { 1414 t.Fatalf("Failed to check post comment count: %v", err) 1415 } 1416 if postCommentCount != 1 { 1417 t.Errorf("Expected post comment_count to be 1 after resurrection, got %d", postCommentCount) 1418 } 1419 }) 1420 1421 t.Run("Recreate deleted comment with DIFFERENT parent", func(t *testing.T) { 1422 // Create two posts 1423 post1URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now()) 1424 post2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now()) 1425 1426 rkey2 := generateTID() 1427 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2) 1428 1429 // Step 1: Create comment on Post 1 1430 createEvent := &jetstream.JetstreamEvent{ 1431 Did: testUser.DID, 1432 Kind: "commit", 1433 Commit: &jetstream.CommitEvent{ 1434 Rev: "v1", 1435 Operation: "create", 1436 Collection: "social.coves.feed.comment", 1437 RKey: rkey2, 1438 CID: "bafyv1", 1439 Record: map[string]interface{}{ 1440 "$type": "social.coves.feed.comment", 1441 "content": "Original on Post 1", 1442 "reply": map[string]interface{}{ 1443 "root": map[string]interface{}{ 1444 "uri": post1URI, 1445 "cid": "bafypost1", 1446 }, 1447 "parent": map[string]interface{}{ 1448 "uri": post1URI, 1449 "cid": "bafypost1", 1450 }, 1451 }, 1452 "createdAt": time.Now().Format(time.RFC3339), 1453 }, 1454 }, 1455 } 1456 1457 err := consumer.HandleEvent(ctx, createEvent) 1458 if err != nil { 1459 t.Fatalf("Failed to create comment on Post 1: %v", err) 1460 } 1461 1462 // Verify Post 1 has comment_count = 1 1463 var post1Count int 1464 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count) 1465 if err != nil { 1466 t.Fatalf("Failed to check post 1 count: %v", err) 1467 } 1468 if post1Count != 1 { 1469 t.Errorf("Expected Post 1 comment_count = 1, got %d", post1Count) 1470 } 1471 1472 // Step 2: Delete comment 1473 deleteEvent := &jetstream.JetstreamEvent{ 1474 Did: testUser.DID, 1475 Kind: "commit", 1476 Commit: &jetstream.CommitEvent{ 1477 Rev: "v2", 1478 Operation: "delete", 1479 Collection: "social.coves.feed.comment", 1480 RKey: rkey2, 1481 }, 1482 } 1483 1484 err = consumer.HandleEvent(ctx, deleteEvent) 1485 if err != nil { 1486 t.Fatalf("Failed to delete comment: %v", err) 1487 } 1488 1489 // Verify Post 1 count decremented to 0 1490 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count) 1491 if err != nil { 1492 t.Fatalf("Failed to check post 1 count after delete: %v", err) 1493 } 1494 if post1Count != 0 { 1495 t.Errorf("Expected Post 1 comment_count = 0 after delete, got %d", post1Count) 1496 } 1497 1498 // Step 3: Recreate comment with same rkey but on Post 2 (different parent!) 1499 recreateEvent := &jetstream.JetstreamEvent{ 1500 Did: testUser.DID, 1501 Kind: "commit", 1502 Commit: &jetstream.CommitEvent{ 1503 Rev: "v3", 1504 Operation: "create", 1505 Collection: "social.coves.feed.comment", 1506 RKey: rkey2, // Same rkey! 1507 CID: "bafyv3", 1508 Record: map[string]interface{}{ 1509 "$type": "social.coves.feed.comment", 1510 "content": "New comment on Post 2", 1511 "reply": map[string]interface{}{ 1512 "root": map[string]interface{}{ 1513 "uri": post2URI, // Different root! 1514 "cid": "bafypost2", 1515 }, 1516 "parent": map[string]interface{}{ 1517 "uri": post2URI, // Different parent! 1518 "cid": "bafypost2", 1519 }, 1520 }, 1521 "createdAt": time.Now().Format(time.RFC3339), 1522 }, 1523 }, 1524 } 1525 1526 err = consumer.HandleEvent(ctx, recreateEvent) 1527 if err != nil { 1528 t.Fatalf("Failed to resurrect comment on Post 2: %v", err) 1529 } 1530 1531 // Step 4: Verify threading references updated correctly 1532 comment, err := commentRepo.GetByURI(ctx, commentURI2) 1533 if err != nil { 1534 t.Fatalf("Failed to get resurrected comment: %v", err) 1535 } 1536 1537 // THIS IS THE CRITICAL TEST: Threading refs must point to Post 2, not Post 1 1538 if comment.ParentURI != post2URI { 1539 t.Errorf("Expected parent URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.ParentURI) 1540 } 1541 if comment.RootURI != post2URI { 1542 t.Errorf("Expected root URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.RootURI) 1543 } 1544 if comment.ParentCID != "bafypost2" { 1545 t.Errorf("Expected parent CID 'bafypost2', got '%s'", comment.ParentCID) 1546 } 1547 1548 // Verify counts are correct 1549 var post2Count int 1550 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post2URI).Scan(&post2Count) 1551 if err != nil { 1552 t.Fatalf("Failed to check post 2 count: %v", err) 1553 } 1554 if post2Count != 1 { 1555 t.Errorf("Expected Post 2 comment_count = 1, got %d", post2Count) 1556 } 1557 1558 // Verify Post 1 count still 0 (not incremented by resurrection on Post 2) 1559 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count) 1560 if err != nil { 1561 t.Fatalf("Failed to check post 1 count after resurrection: %v", err) 1562 } 1563 if post1Count != 0 { 1564 t.Errorf("Expected Post 1 comment_count = 0 (unchanged), got %d", post1Count) 1565 } 1566 }) 1567} 1568 1569// TestCommentConsumer_ThreadingImmutability tests that UPDATE events cannot change threading refs 1570func TestCommentConsumer_ThreadingImmutability(t *testing.T) { 1571 db := setupTestDB(t) 1572 defer func() { 1573 if err := db.Close(); err != nil { 1574 t.Logf("Failed to close database: %v", err) 1575 } 1576 }() 1577 1578 ctx := context.Background() 1579 commentRepo := postgres.NewCommentRepository(db) 1580 consumer := jetstream.NewCommentEventConsumer(commentRepo, db) 1581 1582 testUser := createTestUser(t, db, "immutable.test", "did:plc:immutable123") 1583 testCommunity, err := createFeedTestCommunity(db, ctx, "immutable-community", "owner9.test") 1584 if err != nil { 1585 t.Fatalf("Failed to create test community: %v", err) 1586 } 1587 postURI1 := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now()) 1588 postURI2 := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now()) 1589 1590 rkey := generateTID() 1591 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey) 1592 1593 t.Run("Reject UPDATE that changes parent URI", func(t *testing.T) { 1594 // Create comment on Post 1 1595 createEvent := &jetstream.JetstreamEvent{ 1596 Did: testUser.DID, 1597 Kind: "commit", 1598 Commit: &jetstream.CommitEvent{ 1599 Rev: "v1", 1600 Operation: "create", 1601 Collection: "social.coves.feed.comment", 1602 RKey: rkey, 1603 CID: "bafycomment1", 1604 Record: map[string]interface{}{ 1605 "$type": "social.coves.feed.comment", 1606 "content": "Comment on Post 1", 1607 "reply": map[string]interface{}{ 1608 "root": map[string]interface{}{ 1609 "uri": postURI1, 1610 "cid": "bafypost1", 1611 }, 1612 "parent": map[string]interface{}{ 1613 "uri": postURI1, 1614 "cid": "bafypost1", 1615 }, 1616 }, 1617 "createdAt": time.Now().Format(time.RFC3339), 1618 }, 1619 }, 1620 } 1621 1622 err := consumer.HandleEvent(ctx, createEvent) 1623 if err != nil { 1624 t.Fatalf("Failed to create comment: %v", err) 1625 } 1626 1627 // Attempt to update comment to move it to Post 2 (should fail) 1628 updateEvent := &jetstream.JetstreamEvent{ 1629 Did: testUser.DID, 1630 Kind: "commit", 1631 Commit: &jetstream.CommitEvent{ 1632 Rev: "v2", 1633 Operation: "update", 1634 Collection: "social.coves.feed.comment", 1635 RKey: rkey, 1636 CID: "bafycomment2", 1637 Record: map[string]interface{}{ 1638 "$type": "social.coves.feed.comment", 1639 "content": "Trying to hijack this comment to Post 2", 1640 "reply": map[string]interface{}{ 1641 "root": map[string]interface{}{ 1642 "uri": postURI2, // Changed! 1643 "cid": "bafypost2", 1644 }, 1645 "parent": map[string]interface{}{ 1646 "uri": postURI2, // Changed! 1647 "cid": "bafypost2", 1648 }, 1649 }, 1650 "createdAt": time.Now().Format(time.RFC3339), 1651 }, 1652 }, 1653 } 1654 1655 err = consumer.HandleEvent(ctx, updateEvent) 1656 if err == nil { 1657 t.Error("Expected error when changing threading references, got nil") 1658 } 1659 if err != nil && !contains(err.Error(), "threading references cannot be changed") { 1660 t.Errorf("Expected 'threading references cannot be changed' error, got: %v", err) 1661 } 1662 1663 // Verify comment still points to Post 1 1664 comment, err := commentRepo.GetByURI(ctx, commentURI) 1665 if err != nil { 1666 t.Fatalf("Failed to get comment: %v", err) 1667 } 1668 if comment.ParentURI != postURI1 { 1669 t.Errorf("Expected parent URI to remain %s, got %s", postURI1, comment.ParentURI) 1670 } 1671 if comment.RootURI != postURI1 { 1672 t.Errorf("Expected root URI to remain %s, got %s", postURI1, comment.RootURI) 1673 } 1674 // Content should NOT have been updated since the operation was rejected 1675 if comment.Content != "Comment on Post 1" { 1676 t.Errorf("Expected original content, got '%s'", comment.Content) 1677 } 1678 }) 1679 1680 t.Run("Allow UPDATE that only changes content (threading unchanged)", func(t *testing.T) { 1681 rkey2 := generateTID() 1682 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2) 1683 1684 // Create comment 1685 createEvent := &jetstream.JetstreamEvent{ 1686 Did: testUser.DID, 1687 Kind: "commit", 1688 Commit: &jetstream.CommitEvent{ 1689 Rev: "v1", 1690 Operation: "create", 1691 Collection: "social.coves.feed.comment", 1692 RKey: rkey2, 1693 CID: "bafycomment3", 1694 Record: map[string]interface{}{ 1695 "$type": "social.coves.feed.comment", 1696 "content": "Original content", 1697 "reply": map[string]interface{}{ 1698 "root": map[string]interface{}{ 1699 "uri": postURI1, 1700 "cid": "bafypost1", 1701 }, 1702 "parent": map[string]interface{}{ 1703 "uri": postURI1, 1704 "cid": "bafypost1", 1705 }, 1706 }, 1707 "createdAt": time.Now().Format(time.RFC3339), 1708 }, 1709 }, 1710 } 1711 1712 err := consumer.HandleEvent(ctx, createEvent) 1713 if err != nil { 1714 t.Fatalf("Failed to create comment: %v", err) 1715 } 1716 1717 // Update content only (threading unchanged - should succeed) 1718 updateEvent := &jetstream.JetstreamEvent{ 1719 Did: testUser.DID, 1720 Kind: "commit", 1721 Commit: &jetstream.CommitEvent{ 1722 Rev: "v2", 1723 Operation: "update", 1724 Collection: "social.coves.feed.comment", 1725 RKey: rkey2, 1726 CID: "bafycomment4", 1727 Record: map[string]interface{}{ 1728 "$type": "social.coves.feed.comment", 1729 "content": "Updated content", 1730 "reply": map[string]interface{}{ 1731 "root": map[string]interface{}{ 1732 "uri": postURI1, // Same 1733 "cid": "bafypost1", 1734 }, 1735 "parent": map[string]interface{}{ 1736 "uri": postURI1, // Same 1737 "cid": "bafypost1", 1738 }, 1739 }, 1740 "createdAt": time.Now().Format(time.RFC3339), 1741 }, 1742 }, 1743 } 1744 1745 err = consumer.HandleEvent(ctx, updateEvent) 1746 if err != nil { 1747 t.Fatalf("Expected update to succeed when threading unchanged, got error: %v", err) 1748 } 1749 1750 // Verify content was updated 1751 comment, err := commentRepo.GetByURI(ctx, commentURI2) 1752 if err != nil { 1753 t.Fatalf("Failed to get comment: %v", err) 1754 } 1755 if comment.Content != "Updated content" { 1756 t.Errorf("Expected updated content, got '%s'", comment.Content) 1757 } 1758 // Threading should remain unchanged 1759 if comment.ParentURI != postURI1 { 1760 t.Errorf("Expected parent URI %s, got %s", postURI1, comment.ParentURI) 1761 } 1762 }) 1763}