A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "context"
5 "fmt"
6 "testing"
7 "time"
8
9 "Coves/internal/atproto/jetstream"
10 "Coves/internal/core/comments"
11 "Coves/internal/db/postgres"
12)
13
14func TestCommentConsumer_CreateComment(t *testing.T) {
15 db := setupTestDB(t)
16 defer func() {
17 if err := db.Close(); err != nil {
18 t.Logf("Failed to close database: %v", err)
19 }
20 }()
21
22 ctx := context.Background()
23 commentRepo := postgres.NewCommentRepository(db)
24 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
25
26 // Setup test data
27 testUser := createTestUser(t, db, "commenter.test", "did:plc:commenter123")
28 testCommunity, err := createFeedTestCommunity(db, ctx, "testcommunity", "owner.test")
29 if err != nil {
30 t.Fatalf("Failed to create test community: %v", err)
31 }
32 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Test Post", 0, time.Now())
33
34 t.Run("Create comment on post", func(t *testing.T) {
35 rkey := generateTID()
36 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
37
38 // Simulate Jetstream comment create event
39 event := &jetstream.JetstreamEvent{
40 Did: testUser.DID,
41 Kind: "commit",
42 Commit: &jetstream.CommitEvent{
43 Rev: "test-rev",
44 Operation: "create",
45 Collection: "social.coves.feed.comment",
46 RKey: rkey,
47 CID: "bafytest123",
48 Record: map[string]interface{}{
49 "$type": "social.coves.feed.comment",
50 "content": "This is a test comment on a post!",
51 "reply": map[string]interface{}{
52 "root": map[string]interface{}{
53 "uri": testPostURI,
54 "cid": "bafypost",
55 },
56 "parent": map[string]interface{}{
57 "uri": testPostURI,
58 "cid": "bafypost",
59 },
60 },
61 "createdAt": time.Now().Format(time.RFC3339),
62 },
63 },
64 }
65
66 // Handle the event
67 err := consumer.HandleEvent(ctx, event)
68 if err != nil {
69 t.Fatalf("Failed to handle comment create event: %v", err)
70 }
71
72 // Verify comment was indexed
73 comment, err := commentRepo.GetByURI(ctx, uri)
74 if err != nil {
75 t.Fatalf("Failed to get indexed comment: %v", err)
76 }
77
78 if comment.URI != uri {
79 t.Errorf("Expected URI %s, got %s", uri, comment.URI)
80 }
81
82 if comment.CommenterDID != testUser.DID {
83 t.Errorf("Expected commenter %s, got %s", testUser.DID, comment.CommenterDID)
84 }
85
86 if comment.Content != "This is a test comment on a post!" {
87 t.Errorf("Expected content 'This is a test comment on a post!', got %s", comment.Content)
88 }
89
90 if comment.RootURI != testPostURI {
91 t.Errorf("Expected root URI %s, got %s", testPostURI, comment.RootURI)
92 }
93
94 if comment.ParentURI != testPostURI {
95 t.Errorf("Expected parent URI %s, got %s", testPostURI, comment.ParentURI)
96 }
97
98 // Verify post comment count was incremented
99 var commentCount int
100 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&commentCount)
101 if err != nil {
102 t.Fatalf("Failed to get post comment count: %v", err)
103 }
104
105 if commentCount != 1 {
106 t.Errorf("Expected post comment_count to be 1, got %d", commentCount)
107 }
108 })
109
110 t.Run("Idempotent create - duplicate event", func(t *testing.T) {
111 rkey := generateTID()
112
113 event := &jetstream.JetstreamEvent{
114 Did: testUser.DID,
115 Kind: "commit",
116 Commit: &jetstream.CommitEvent{
117 Rev: "test-rev",
118 Operation: "create",
119 Collection: "social.coves.feed.comment",
120 RKey: rkey,
121 CID: "bafytest456",
122 Record: map[string]interface{}{
123 "$type": "social.coves.feed.comment",
124 "content": "Idempotent test comment",
125 "reply": map[string]interface{}{
126 "root": map[string]interface{}{
127 "uri": testPostURI,
128 "cid": "bafypost",
129 },
130 "parent": map[string]interface{}{
131 "uri": testPostURI,
132 "cid": "bafypost",
133 },
134 },
135 "createdAt": time.Now().Format(time.RFC3339),
136 },
137 },
138 }
139
140 // First creation
141 err := consumer.HandleEvent(ctx, event)
142 if err != nil {
143 t.Fatalf("First creation failed: %v", err)
144 }
145
146 // Get initial comment count
147 var initialCount int
148 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount)
149 if err != nil {
150 t.Fatalf("Failed to get initial comment count: %v", err)
151 }
152
153 // Duplicate creation - should be idempotent
154 err = consumer.HandleEvent(ctx, event)
155 if err != nil {
156 t.Fatalf("Duplicate event should be handled gracefully: %v", err)
157 }
158
159 // Verify count wasn't incremented again
160 var finalCount int
161 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount)
162 if err != nil {
163 t.Fatalf("Failed to get final comment count: %v", err)
164 }
165
166 if finalCount != initialCount {
167 t.Errorf("Comment count should not increase on duplicate event. Initial: %d, Final: %d", initialCount, finalCount)
168 }
169 })
170}
171
172func TestCommentConsumer_Threading(t *testing.T) {
173 db := setupTestDB(t)
174 defer func() {
175 if err := db.Close(); err != nil {
176 t.Logf("Failed to close database: %v", err)
177 }
178 }()
179
180 ctx := context.Background()
181 commentRepo := postgres.NewCommentRepository(db)
182 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
183
184 // Setup test data
185 testUser := createTestUser(t, db, "threader.test", "did:plc:threader123")
186 testCommunity, err := createFeedTestCommunity(db, ctx, "threadcommunity", "owner2.test")
187 if err != nil {
188 t.Fatalf("Failed to create test community: %v", err)
189 }
190 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Threading Test", 0, time.Now())
191
192 t.Run("Create nested comment replies", func(t *testing.T) {
193 // Create first-level comment on post
194 comment1Rkey := generateTID()
195 comment1URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment1Rkey)
196
197 event1 := &jetstream.JetstreamEvent{
198 Did: testUser.DID,
199 Kind: "commit",
200 Commit: &jetstream.CommitEvent{
201 Operation: "create",
202 Collection: "social.coves.feed.comment",
203 RKey: comment1Rkey,
204 CID: "bafycomment1",
205 Record: map[string]interface{}{
206 "content": "First level comment",
207 "reply": map[string]interface{}{
208 "root": map[string]interface{}{
209 "uri": testPostURI,
210 "cid": "bafypost",
211 },
212 "parent": map[string]interface{}{
213 "uri": testPostURI,
214 "cid": "bafypost",
215 },
216 },
217 "createdAt": time.Now().Format(time.RFC3339),
218 },
219 },
220 }
221
222 err := consumer.HandleEvent(ctx, event1)
223 if err != nil {
224 t.Fatalf("Failed to create first-level comment: %v", err)
225 }
226
227 // Create second-level comment (reply to first comment)
228 comment2Rkey := generateTID()
229 comment2URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment2Rkey)
230
231 event2 := &jetstream.JetstreamEvent{
232 Did: testUser.DID,
233 Kind: "commit",
234 Commit: &jetstream.CommitEvent{
235 Operation: "create",
236 Collection: "social.coves.feed.comment",
237 RKey: comment2Rkey,
238 CID: "bafycomment2",
239 Record: map[string]interface{}{
240 "content": "Second level comment (reply to first)",
241 "reply": map[string]interface{}{
242 "root": map[string]interface{}{
243 "uri": testPostURI,
244 "cid": "bafypost",
245 },
246 "parent": map[string]interface{}{
247 "uri": comment1URI,
248 "cid": "bafycomment1",
249 },
250 },
251 "createdAt": time.Now().Add(1 * time.Second).Format(time.RFC3339),
252 },
253 },
254 }
255
256 err = consumer.HandleEvent(ctx, event2)
257 if err != nil {
258 t.Fatalf("Failed to create second-level comment: %v", err)
259 }
260
261 // Verify threading structure
262 comment1, err := commentRepo.GetByURI(ctx, comment1URI)
263 if err != nil {
264 t.Fatalf("Failed to get first comment: %v", err)
265 }
266
267 comment2, err := commentRepo.GetByURI(ctx, comment2URI)
268 if err != nil {
269 t.Fatalf("Failed to get second comment: %v", err)
270 }
271
272 // Both should have same root (original post)
273 if comment1.RootURI != testPostURI {
274 t.Errorf("Comment1 root should be post URI, got %s", comment1.RootURI)
275 }
276
277 if comment2.RootURI != testPostURI {
278 t.Errorf("Comment2 root should be post URI, got %s", comment2.RootURI)
279 }
280
281 // Comment1 parent should be post
282 if comment1.ParentURI != testPostURI {
283 t.Errorf("Comment1 parent should be post URI, got %s", comment1.ParentURI)
284 }
285
286 // Comment2 parent should be comment1
287 if comment2.ParentURI != comment1URI {
288 t.Errorf("Comment2 parent should be comment1 URI, got %s", comment2.ParentURI)
289 }
290
291 // Verify reply count on comment1
292 if comment1.ReplyCount != 1 {
293 t.Errorf("Comment1 should have 1 reply, got %d", comment1.ReplyCount)
294 }
295
296 // Query all comments by root
297 allComments, err := commentRepo.ListByRoot(ctx, testPostURI, 100, 0)
298 if err != nil {
299 t.Fatalf("Failed to list comments by root: %v", err)
300 }
301
302 if len(allComments) != 2 {
303 t.Errorf("Expected 2 comments in thread, got %d", len(allComments))
304 }
305
306 // Query direct replies to post
307 directReplies, err := commentRepo.ListByParent(ctx, testPostURI, 100, 0)
308 if err != nil {
309 t.Fatalf("Failed to list direct replies to post: %v", err)
310 }
311
312 if len(directReplies) != 1 {
313 t.Errorf("Expected 1 direct reply to post, got %d", len(directReplies))
314 }
315
316 // Query replies to comment1
317 comment1Replies, err := commentRepo.ListByParent(ctx, comment1URI, 100, 0)
318 if err != nil {
319 t.Fatalf("Failed to list replies to comment1: %v", err)
320 }
321
322 if len(comment1Replies) != 1 {
323 t.Errorf("Expected 1 reply to comment1, got %d", len(comment1Replies))
324 }
325 })
326}
327
328func TestCommentConsumer_UpdateComment(t *testing.T) {
329 db := setupTestDB(t)
330 defer func() {
331 if err := db.Close(); err != nil {
332 t.Logf("Failed to close database: %v", err)
333 }
334 }()
335
336 ctx := context.Background()
337 commentRepo := postgres.NewCommentRepository(db)
338 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
339
340 // Setup test data
341 testUser := createTestUser(t, db, "editor.test", "did:plc:editor123")
342 testCommunity, err := createFeedTestCommunity(db, ctx, "editcommunity", "owner3.test")
343 if err != nil {
344 t.Fatalf("Failed to create test community: %v", err)
345 }
346 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Edit Test", 0, time.Now())
347
348 t.Run("Update comment content preserves vote counts", func(t *testing.T) {
349 rkey := generateTID()
350 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
351
352 // Create initial comment
353 createEvent := &jetstream.JetstreamEvent{
354 Did: testUser.DID,
355 Kind: "commit",
356 Commit: &jetstream.CommitEvent{
357 Operation: "create",
358 Collection: "social.coves.feed.comment",
359 RKey: rkey,
360 CID: "bafyoriginal",
361 Record: map[string]interface{}{
362 "content": "Original comment content",
363 "reply": map[string]interface{}{
364 "root": map[string]interface{}{
365 "uri": testPostURI,
366 "cid": "bafypost",
367 },
368 "parent": map[string]interface{}{
369 "uri": testPostURI,
370 "cid": "bafypost",
371 },
372 },
373 "createdAt": time.Now().Format(time.RFC3339),
374 },
375 },
376 }
377
378 err := consumer.HandleEvent(ctx, createEvent)
379 if err != nil {
380 t.Fatalf("Failed to create comment: %v", err)
381 }
382
383 // Manually set vote counts to simulate votes
384 _, err = db.ExecContext(ctx, `
385 UPDATE comments
386 SET upvote_count = 5, downvote_count = 2, score = 3
387 WHERE uri = $1
388 `, uri)
389 if err != nil {
390 t.Fatalf("Failed to set vote counts: %v", err)
391 }
392
393 // Update the comment
394 updateEvent := &jetstream.JetstreamEvent{
395 Did: testUser.DID,
396 Kind: "commit",
397 Commit: &jetstream.CommitEvent{
398 Operation: "update",
399 Collection: "social.coves.feed.comment",
400 RKey: rkey,
401 CID: "bafyupdated",
402 Record: map[string]interface{}{
403 "content": "EDITED: Updated comment content",
404 "reply": map[string]interface{}{
405 "root": map[string]interface{}{
406 "uri": testPostURI,
407 "cid": "bafypost",
408 },
409 "parent": map[string]interface{}{
410 "uri": testPostURI,
411 "cid": "bafypost",
412 },
413 },
414 "createdAt": time.Now().Format(time.RFC3339),
415 },
416 },
417 }
418
419 err = consumer.HandleEvent(ctx, updateEvent)
420 if err != nil {
421 t.Fatalf("Failed to update comment: %v", err)
422 }
423
424 // Verify content updated
425 comment, err := commentRepo.GetByURI(ctx, uri)
426 if err != nil {
427 t.Fatalf("Failed to get updated comment: %v", err)
428 }
429
430 if comment.Content != "EDITED: Updated comment content" {
431 t.Errorf("Expected updated content, got %s", comment.Content)
432 }
433
434 // Verify CID updated
435 if comment.CID != "bafyupdated" {
436 t.Errorf("Expected CID to be updated to bafyupdated, got %s", comment.CID)
437 }
438
439 // Verify vote counts preserved
440 if comment.UpvoteCount != 5 {
441 t.Errorf("Expected upvote_count preserved at 5, got %d", comment.UpvoteCount)
442 }
443
444 if comment.DownvoteCount != 2 {
445 t.Errorf("Expected downvote_count preserved at 2, got %d", comment.DownvoteCount)
446 }
447
448 if comment.Score != 3 {
449 t.Errorf("Expected score preserved at 3, got %d", comment.Score)
450 }
451 })
452}
453
454func TestCommentConsumer_DeleteComment(t *testing.T) {
455 db := setupTestDB(t)
456 defer func() {
457 if err := db.Close(); err != nil {
458 t.Logf("Failed to close database: %v", err)
459 }
460 }()
461
462 ctx := context.Background()
463 commentRepo := postgres.NewCommentRepository(db)
464 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
465
466 // Setup test data
467 testUser := createTestUser(t, db, "deleter.test", "did:plc:deleter123")
468 testCommunity, err := createFeedTestCommunity(db, ctx, "deletecommunity", "owner4.test")
469 if err != nil {
470 t.Fatalf("Failed to create test community: %v", err)
471 }
472 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Delete Test", 0, time.Now())
473
474 t.Run("Delete comment decrements parent count", func(t *testing.T) {
475 rkey := generateTID()
476 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
477
478 // Create comment
479 createEvent := &jetstream.JetstreamEvent{
480 Did: testUser.DID,
481 Kind: "commit",
482 Commit: &jetstream.CommitEvent{
483 Operation: "create",
484 Collection: "social.coves.feed.comment",
485 RKey: rkey,
486 CID: "bafydelete",
487 Record: map[string]interface{}{
488 "content": "Comment to be deleted",
489 "reply": map[string]interface{}{
490 "root": map[string]interface{}{
491 "uri": testPostURI,
492 "cid": "bafypost",
493 },
494 "parent": map[string]interface{}{
495 "uri": testPostURI,
496 "cid": "bafypost",
497 },
498 },
499 "createdAt": time.Now().Format(time.RFC3339),
500 },
501 },
502 }
503
504 err := consumer.HandleEvent(ctx, createEvent)
505 if err != nil {
506 t.Fatalf("Failed to create comment: %v", err)
507 }
508
509 // Get initial post comment count
510 var initialCount int
511 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount)
512 if err != nil {
513 t.Fatalf("Failed to get initial comment count: %v", err)
514 }
515
516 // Delete comment
517 deleteEvent := &jetstream.JetstreamEvent{
518 Did: testUser.DID,
519 Kind: "commit",
520 Commit: &jetstream.CommitEvent{
521 Operation: "delete",
522 Collection: "social.coves.feed.comment",
523 RKey: rkey,
524 },
525 }
526
527 err = consumer.HandleEvent(ctx, deleteEvent)
528 if err != nil {
529 t.Fatalf("Failed to delete comment: %v", err)
530 }
531
532 // Verify soft delete
533 comment, err := commentRepo.GetByURI(ctx, uri)
534 if err != nil {
535 t.Fatalf("Failed to get deleted comment: %v", err)
536 }
537
538 if comment.DeletedAt == nil {
539 t.Error("Expected deleted_at to be set, got nil")
540 }
541
542 // Verify post comment count decremented
543 var finalCount int
544 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount)
545 if err != nil {
546 t.Fatalf("Failed to get final comment count: %v", err)
547 }
548
549 if finalCount != initialCount-1 {
550 t.Errorf("Expected comment count to decrease by 1. Initial: %d, Final: %d", initialCount, finalCount)
551 }
552 })
553
554 t.Run("Delete is idempotent", func(t *testing.T) {
555 rkey := generateTID()
556
557 // Create comment
558 createEvent := &jetstream.JetstreamEvent{
559 Did: testUser.DID,
560 Kind: "commit",
561 Commit: &jetstream.CommitEvent{
562 Operation: "create",
563 Collection: "social.coves.feed.comment",
564 RKey: rkey,
565 CID: "bafyidempdelete",
566 Record: map[string]interface{}{
567 "content": "Idempotent delete test",
568 "reply": map[string]interface{}{
569 "root": map[string]interface{}{
570 "uri": testPostURI,
571 "cid": "bafypost",
572 },
573 "parent": map[string]interface{}{
574 "uri": testPostURI,
575 "cid": "bafypost",
576 },
577 },
578 "createdAt": time.Now().Format(time.RFC3339),
579 },
580 },
581 }
582
583 err := consumer.HandleEvent(ctx, createEvent)
584 if err != nil {
585 t.Fatalf("Failed to create comment: %v", err)
586 }
587
588 // First delete
589 deleteEvent := &jetstream.JetstreamEvent{
590 Did: testUser.DID,
591 Kind: "commit",
592 Commit: &jetstream.CommitEvent{
593 Operation: "delete",
594 Collection: "social.coves.feed.comment",
595 RKey: rkey,
596 },
597 }
598
599 err = consumer.HandleEvent(ctx, deleteEvent)
600 if err != nil {
601 t.Fatalf("First delete failed: %v", err)
602 }
603
604 // Get count after first delete
605 var countAfterFirstDelete int
606 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterFirstDelete)
607 if err != nil {
608 t.Fatalf("Failed to get count after first delete: %v", err)
609 }
610
611 // Second delete (idempotent)
612 err = consumer.HandleEvent(ctx, deleteEvent)
613 if err != nil {
614 t.Fatalf("Second delete should be idempotent: %v", err)
615 }
616
617 // Verify count didn't change
618 var countAfterSecondDelete int
619 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterSecondDelete)
620 if err != nil {
621 t.Fatalf("Failed to get count after second delete: %v", err)
622 }
623
624 if countAfterSecondDelete != countAfterFirstDelete {
625 t.Errorf("Count should not change on duplicate delete. After first: %d, After second: %d", countAfterFirstDelete, countAfterSecondDelete)
626 }
627 })
628}
629
630func TestCommentConsumer_SecurityValidation(t *testing.T) {
631 db := setupTestDB(t)
632 defer func() {
633 if err := db.Close(); err != nil {
634 t.Logf("Failed to close database: %v", err)
635 }
636 }()
637
638 ctx := context.Background()
639 commentRepo := postgres.NewCommentRepository(db)
640 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
641
642 testUser := createTestUser(t, db, "security.test", "did:plc:security123")
643 testCommunity, err := createFeedTestCommunity(db, ctx, "seccommunity", "owner5.test")
644 if err != nil {
645 t.Fatalf("Failed to create test community: %v", err)
646 }
647 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Security Test", 0, time.Now())
648
649 t.Run("Reject comment with empty content", func(t *testing.T) {
650 event := &jetstream.JetstreamEvent{
651 Did: testUser.DID,
652 Kind: "commit",
653 Commit: &jetstream.CommitEvent{
654 Operation: "create",
655 Collection: "social.coves.feed.comment",
656 RKey: generateTID(),
657 CID: "bafyinvalid",
658 Record: map[string]interface{}{
659 "content": "", // Empty content
660 "reply": map[string]interface{}{
661 "root": map[string]interface{}{
662 "uri": testPostURI,
663 "cid": "bafypost",
664 },
665 "parent": map[string]interface{}{
666 "uri": testPostURI,
667 "cid": "bafypost",
668 },
669 },
670 "createdAt": time.Now().Format(time.RFC3339),
671 },
672 },
673 }
674
675 err := consumer.HandleEvent(ctx, event)
676 if err == nil {
677 t.Error("Expected error for empty content, got nil")
678 }
679 })
680
681 t.Run("Reject comment with invalid root reference", func(t *testing.T) {
682 event := &jetstream.JetstreamEvent{
683 Did: testUser.DID,
684 Kind: "commit",
685 Commit: &jetstream.CommitEvent{
686 Operation: "create",
687 Collection: "social.coves.feed.comment",
688 RKey: generateTID(),
689 CID: "bafyinvalid2",
690 Record: map[string]interface{}{
691 "content": "Valid content",
692 "reply": map[string]interface{}{
693 "root": map[string]interface{}{
694 "uri": "", // Missing URI
695 "cid": "bafypost",
696 },
697 "parent": map[string]interface{}{
698 "uri": testPostURI,
699 "cid": "bafypost",
700 },
701 },
702 "createdAt": time.Now().Format(time.RFC3339),
703 },
704 },
705 }
706
707 err := consumer.HandleEvent(ctx, event)
708 if err == nil {
709 t.Error("Expected error for invalid root reference, got nil")
710 }
711 })
712
713 t.Run("Reject comment with invalid parent reference", func(t *testing.T) {
714 event := &jetstream.JetstreamEvent{
715 Did: testUser.DID,
716 Kind: "commit",
717 Commit: &jetstream.CommitEvent{
718 Operation: "create",
719 Collection: "social.coves.feed.comment",
720 RKey: generateTID(),
721 CID: "bafyinvalid3",
722 Record: map[string]interface{}{
723 "content": "Valid content",
724 "reply": map[string]interface{}{
725 "root": map[string]interface{}{
726 "uri": testPostURI,
727 "cid": "bafypost",
728 },
729 "parent": map[string]interface{}{
730 "uri": testPostURI,
731 "cid": "", // Missing CID
732 },
733 },
734 "createdAt": time.Now().Format(time.RFC3339),
735 },
736 },
737 }
738
739 err := consumer.HandleEvent(ctx, event)
740 if err == nil {
741 t.Error("Expected error for invalid parent reference, got nil")
742 }
743 })
744
745 t.Run("Reject comment with invalid DID format", func(t *testing.T) {
746 event := &jetstream.JetstreamEvent{
747 Did: "invalid-did-format", // Bad DID
748 Kind: "commit",
749 Commit: &jetstream.CommitEvent{
750 Operation: "create",
751 Collection: "social.coves.feed.comment",
752 RKey: generateTID(),
753 CID: "bafyinvalid4",
754 Record: map[string]interface{}{
755 "content": "Valid content",
756 "reply": map[string]interface{}{
757 "root": map[string]interface{}{
758 "uri": testPostURI,
759 "cid": "bafypost",
760 },
761 "parent": map[string]interface{}{
762 "uri": testPostURI,
763 "cid": "bafypost",
764 },
765 },
766 "createdAt": time.Now().Format(time.RFC3339),
767 },
768 },
769 }
770
771 err := consumer.HandleEvent(ctx, event)
772 if err == nil {
773 t.Error("Expected error for invalid DID format, got nil")
774 }
775 })
776
777 t.Run("Reject comment exceeding max content length", func(t *testing.T) {
778 event := &jetstream.JetstreamEvent{
779 Did: testUser.DID,
780 Kind: "commit",
781 Commit: &jetstream.CommitEvent{
782 Operation: "create",
783 Collection: "social.coves.feed.comment",
784 RKey: generateTID(),
785 CID: "bafytoobig",
786 Record: map[string]interface{}{
787 "content": string(make([]byte, 30001)), // Exceeds 30000 byte limit
788 "reply": map[string]interface{}{
789 "root": map[string]interface{}{
790 "uri": testPostURI,
791 "cid": "bafypost",
792 },
793 "parent": map[string]interface{}{
794 "uri": testPostURI,
795 "cid": "bafypost",
796 },
797 },
798 "createdAt": time.Now().Format(time.RFC3339),
799 },
800 },
801 }
802
803 err := consumer.HandleEvent(ctx, event)
804 if err == nil {
805 t.Error("Expected error for oversized content, got nil")
806 }
807 if err != nil && !contains(err.Error(), "exceeds maximum length") {
808 t.Errorf("Expected 'exceeds maximum length' error, got: %v", err)
809 }
810 })
811
812 t.Run("Reject comment with malformed parent URI", func(t *testing.T) {
813 event := &jetstream.JetstreamEvent{
814 Did: testUser.DID,
815 Kind: "commit",
816 Commit: &jetstream.CommitEvent{
817 Operation: "create",
818 Collection: "social.coves.feed.comment",
819 RKey: generateTID(),
820 CID: "bafymalformed",
821 Record: map[string]interface{}{
822 "content": "Valid content",
823 "reply": map[string]interface{}{
824 "root": map[string]interface{}{
825 "uri": testPostURI,
826 "cid": "bafypost",
827 },
828 "parent": map[string]interface{}{
829 "uri": "at://malformed", // Invalid: missing collection/rkey
830 "cid": "bafyparent",
831 },
832 },
833 "createdAt": time.Now().Format(time.RFC3339),
834 },
835 },
836 }
837
838 err := consumer.HandleEvent(ctx, event)
839 if err == nil {
840 t.Error("Expected error for malformed AT-URI, got nil")
841 }
842 if err != nil && !contains(err.Error(), "invalid parent URI") {
843 t.Errorf("Expected 'invalid parent URI' error, got: %v", err)
844 }
845 })
846
847 t.Run("Reject comment with malformed root URI", func(t *testing.T) {
848 event := &jetstream.JetstreamEvent{
849 Did: testUser.DID,
850 Kind: "commit",
851 Commit: &jetstream.CommitEvent{
852 Operation: "create",
853 Collection: "social.coves.feed.comment",
854 RKey: generateTID(),
855 CID: "bafymalformed2",
856 Record: map[string]interface{}{
857 "content": "Valid content",
858 "reply": map[string]interface{}{
859 "root": map[string]interface{}{
860 "uri": "at://did:plc:test123", // Invalid: missing collection/rkey
861 "cid": "bafyroot",
862 },
863 "parent": map[string]interface{}{
864 "uri": testPostURI,
865 "cid": "bafyparent",
866 },
867 },
868 "createdAt": time.Now().Format(time.RFC3339),
869 },
870 },
871 }
872
873 err := consumer.HandleEvent(ctx, event)
874 if err == nil {
875 t.Error("Expected error for malformed AT-URI, got nil")
876 }
877 if err != nil && !contains(err.Error(), "invalid root URI") {
878 t.Errorf("Expected 'invalid root URI' error, got: %v", err)
879 }
880 })
881}
882
883func TestCommentRepository_Queries(t *testing.T) {
884 db := setupTestDB(t)
885 defer func() {
886 if err := db.Close(); err != nil {
887 t.Logf("Failed to close database: %v", err)
888 }
889 }()
890
891 ctx := context.Background()
892 commentRepo := postgres.NewCommentRepository(db)
893
894 // Clean up any existing test data from previous runs
895 _, err := db.ExecContext(ctx, "DELETE FROM comments WHERE commenter_did LIKE 'did:plc:%'")
896 if err != nil {
897 t.Fatalf("Failed to clean up test comments: %v", err)
898 }
899
900 testUser := createTestUser(t, db, "query.test", "did:plc:query123")
901 testCommunity, err := createFeedTestCommunity(db, ctx, "querycommunity", "owner6.test")
902 if err != nil {
903 t.Fatalf("Failed to create test community: %v", err)
904 }
905 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Query Test", 0, time.Now())
906
907 // Create a comment tree
908 // Post
909 // |- Comment 1
910 // |- Comment 2
911 // |- Comment 3
912 // |- Comment 4
913
914 comment1 := &comments.Comment{
915 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/1", testUser.DID),
916 CID: "bafyc1",
917 RKey: "1",
918 CommenterDID: testUser.DID,
919 RootURI: postURI,
920 RootCID: "bafypost",
921 ParentURI: postURI,
922 ParentCID: "bafypost",
923 Content: "Comment 1",
924 Langs: []string{},
925 CreatedAt: time.Now(),
926 }
927
928 comment2 := &comments.Comment{
929 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/2", testUser.DID),
930 CID: "bafyc2",
931 RKey: "2",
932 CommenterDID: testUser.DID,
933 RootURI: postURI,
934 RootCID: "bafypost",
935 ParentURI: comment1.URI,
936 ParentCID: "bafyc1",
937 Content: "Comment 2 (reply to 1)",
938 Langs: []string{},
939 CreatedAt: time.Now().Add(1 * time.Second),
940 }
941
942 comment3 := &comments.Comment{
943 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/3", testUser.DID),
944 CID: "bafyc3",
945 RKey: "3",
946 CommenterDID: testUser.DID,
947 RootURI: postURI,
948 RootCID: "bafypost",
949 ParentURI: comment1.URI,
950 ParentCID: "bafyc1",
951 Content: "Comment 3 (reply to 1)",
952 Langs: []string{},
953 CreatedAt: time.Now().Add(2 * time.Second),
954 }
955
956 comment4 := &comments.Comment{
957 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/4", testUser.DID),
958 CID: "bafyc4",
959 RKey: "4",
960 CommenterDID: testUser.DID,
961 RootURI: postURI,
962 RootCID: "bafypost",
963 ParentURI: postURI,
964 ParentCID: "bafypost",
965 Content: "Comment 4",
966 Langs: []string{},
967 CreatedAt: time.Now().Add(3 * time.Second),
968 }
969
970 // Create all comments
971 for i, c := range []*comments.Comment{comment1, comment2, comment3, comment4} {
972 if err := commentRepo.Create(ctx, c); err != nil {
973 t.Fatalf("Failed to create comment %d: %v", i+1, err)
974 }
975 t.Logf("Created comment %d: %s", i+1, c.URI)
976 }
977
978 // Verify comments were created
979 verifyCount, err := commentRepo.CountByParent(ctx, postURI)
980 if err != nil {
981 t.Fatalf("Failed to count comments: %v", err)
982 }
983 t.Logf("Direct replies to post after creation: %d", verifyCount)
984
985 t.Run("ListByRoot returns all comments in thread", func(t *testing.T) {
986 comments, err := commentRepo.ListByRoot(ctx, postURI, 100, 0)
987 if err != nil {
988 t.Fatalf("Failed to list by root: %v", err)
989 }
990
991 if len(comments) != 4 {
992 t.Errorf("Expected 4 comments, got %d", len(comments))
993 }
994 })
995
996 t.Run("ListByParent returns direct replies", func(t *testing.T) {
997 // Direct replies to post
998 postReplies, err := commentRepo.ListByParent(ctx, postURI, 100, 0)
999 if err != nil {
1000 t.Fatalf("Failed to list post replies: %v", err)
1001 }
1002
1003 if len(postReplies) != 2 {
1004 t.Errorf("Expected 2 direct replies to post, got %d", len(postReplies))
1005 }
1006
1007 // Direct replies to comment1
1008 comment1Replies, err := commentRepo.ListByParent(ctx, comment1.URI, 100, 0)
1009 if err != nil {
1010 t.Fatalf("Failed to list comment1 replies: %v", err)
1011 }
1012
1013 if len(comment1Replies) != 2 {
1014 t.Errorf("Expected 2 direct replies to comment1, got %d", len(comment1Replies))
1015 }
1016 })
1017
1018 t.Run("CountByParent returns correct counts", func(t *testing.T) {
1019 postCount, err := commentRepo.CountByParent(ctx, postURI)
1020 if err != nil {
1021 t.Fatalf("Failed to count post replies: %v", err)
1022 }
1023
1024 if postCount != 2 {
1025 t.Errorf("Expected 2 direct replies to post, got %d", postCount)
1026 }
1027
1028 comment1Count, err := commentRepo.CountByParent(ctx, comment1.URI)
1029 if err != nil {
1030 t.Fatalf("Failed to count comment1 replies: %v", err)
1031 }
1032
1033 if comment1Count != 2 {
1034 t.Errorf("Expected 2 direct replies to comment1, got %d", comment1Count)
1035 }
1036 })
1037
1038 t.Run("ListByCommenter returns user's comments", func(t *testing.T) {
1039 userComments, err := commentRepo.ListByCommenter(ctx, testUser.DID, 100, 0)
1040 if err != nil {
1041 t.Fatalf("Failed to list by commenter: %v", err)
1042 }
1043
1044 if len(userComments) != 4 {
1045 t.Errorf("Expected 4 comments by user, got %d", len(userComments))
1046 }
1047 })
1048}
1049
1050// TestCommentConsumer_OutOfOrderReconciliation tests that parent counts are
1051// correctly reconciled when child comments arrive before their parent
1052func TestCommentConsumer_OutOfOrderReconciliation(t *testing.T) {
1053 db := setupTestDB(t)
1054 defer func() {
1055 if err := db.Close(); err != nil {
1056 t.Logf("Failed to close database: %v", err)
1057 }
1058 }()
1059
1060 ctx := context.Background()
1061 commentRepo := postgres.NewCommentRepository(db)
1062 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
1063
1064 testUser := createTestUser(t, db, "outoforder.test", "did:plc:outoforder123")
1065 testCommunity, err := createFeedTestCommunity(db, ctx, "ooo-community", "owner7.test")
1066 if err != nil {
1067 t.Fatalf("Failed to create test community: %v", err)
1068 }
1069 postURI := createTestPost(t, db, testCommunity, testUser.DID, "OOO Test Post", 0, time.Now())
1070
1071 t.Run("Child arrives before parent - count reconciled", func(t *testing.T) {
1072 // Scenario: User A creates comment C1 on post
1073 // User B creates reply C2 to C1
1074 // Jetstream delivers C2 before C1 (different repos)
1075 // When C1 finally arrives, its reply_count should be 1, not 0
1076
1077 parentRkey := generateTID()
1078 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey)
1079
1080 childRkey := generateTID()
1081 childURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, childRkey)
1082
1083 // Step 1: Index child FIRST (before parent exists)
1084 childEvent := &jetstream.JetstreamEvent{
1085 Did: testUser.DID,
1086 Kind: "commit",
1087 Commit: &jetstream.CommitEvent{
1088 Rev: "child-rev",
1089 Operation: "create",
1090 Collection: "social.coves.feed.comment",
1091 RKey: childRkey,
1092 CID: "bafychild",
1093 Record: map[string]interface{}{
1094 "$type": "social.coves.feed.comment",
1095 "content": "This is a reply to a comment that doesn't exist yet!",
1096 "reply": map[string]interface{}{
1097 "root": map[string]interface{}{
1098 "uri": postURI,
1099 "cid": "bafypost",
1100 },
1101 "parent": map[string]interface{}{
1102 "uri": parentURI, // Points to parent that doesn't exist yet
1103 "cid": "bafyparent",
1104 },
1105 },
1106 "createdAt": time.Now().Format(time.RFC3339),
1107 },
1108 },
1109 }
1110
1111 err := consumer.HandleEvent(ctx, childEvent)
1112 if err != nil {
1113 t.Fatalf("Failed to handle child event: %v", err)
1114 }
1115
1116 // Verify child was indexed
1117 childComment, err := commentRepo.GetByURI(ctx, childURI)
1118 if err != nil {
1119 t.Fatalf("Child comment not indexed: %v", err)
1120 }
1121 if childComment.ParentURI != parentURI {
1122 t.Errorf("Expected child parent_uri %s, got %s", parentURI, childComment.ParentURI)
1123 }
1124
1125 // Step 2: Now index parent (arrives late due to Jetstream ordering)
1126 parentEvent := &jetstream.JetstreamEvent{
1127 Did: testUser.DID,
1128 Kind: "commit",
1129 Commit: &jetstream.CommitEvent{
1130 Rev: "parent-rev",
1131 Operation: "create",
1132 Collection: "social.coves.feed.comment",
1133 RKey: parentRkey,
1134 CID: "bafyparent",
1135 Record: map[string]interface{}{
1136 "$type": "social.coves.feed.comment",
1137 "content": "This is the parent comment arriving late",
1138 "reply": map[string]interface{}{
1139 "root": map[string]interface{}{
1140 "uri": postURI,
1141 "cid": "bafypost",
1142 },
1143 "parent": map[string]interface{}{
1144 "uri": postURI,
1145 "cid": "bafypost",
1146 },
1147 },
1148 "createdAt": time.Now().Format(time.RFC3339),
1149 },
1150 },
1151 }
1152
1153 err = consumer.HandleEvent(ctx, parentEvent)
1154 if err != nil {
1155 t.Fatalf("Failed to handle parent event: %v", err)
1156 }
1157
1158 // Step 3: Verify parent was indexed with CORRECT reply_count
1159 parentComment, err := commentRepo.GetByURI(ctx, parentURI)
1160 if err != nil {
1161 t.Fatalf("Parent comment not indexed: %v", err)
1162 }
1163
1164 // THIS IS THE KEY TEST: Parent should have reply_count = 1 due to reconciliation
1165 if parentComment.ReplyCount != 1 {
1166 t.Errorf("Expected parent reply_count to be 1 (reconciled), got %d", parentComment.ReplyCount)
1167 t.Logf("This indicates out-of-order reconciliation failed!")
1168 }
1169
1170 // Verify via query as well
1171 count, err := commentRepo.CountByParent(ctx, parentURI)
1172 if err != nil {
1173 t.Fatalf("Failed to count parent replies: %v", err)
1174 }
1175 if count != 1 {
1176 t.Errorf("Expected 1 reply to parent, got %d", count)
1177 }
1178 })
1179
1180 t.Run("Multiple children arrive before parent", func(t *testing.T) {
1181 parentRkey := generateTID()
1182 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey)
1183
1184 // Index 3 children before parent
1185 for i := 1; i <= 3; i++ {
1186 childRkey := generateTID()
1187 childEvent := &jetstream.JetstreamEvent{
1188 Did: testUser.DID,
1189 Kind: "commit",
1190 Commit: &jetstream.CommitEvent{
1191 Rev: fmt.Sprintf("child-%d-rev", i),
1192 Operation: "create",
1193 Collection: "social.coves.feed.comment",
1194 RKey: childRkey,
1195 CID: fmt.Sprintf("bafychild%d", i),
1196 Record: map[string]interface{}{
1197 "$type": "social.coves.feed.comment",
1198 "content": fmt.Sprintf("Reply %d before parent", i),
1199 "reply": map[string]interface{}{
1200 "root": map[string]interface{}{
1201 "uri": postURI,
1202 "cid": "bafypost",
1203 },
1204 "parent": map[string]interface{}{
1205 "uri": parentURI,
1206 "cid": "bafyparent2",
1207 },
1208 },
1209 "createdAt": time.Now().Format(time.RFC3339),
1210 },
1211 },
1212 }
1213
1214 err := consumer.HandleEvent(ctx, childEvent)
1215 if err != nil {
1216 t.Fatalf("Failed to handle child %d event: %v", i, err)
1217 }
1218 }
1219
1220 // Now index parent
1221 parentEvent := &jetstream.JetstreamEvent{
1222 Did: testUser.DID,
1223 Kind: "commit",
1224 Commit: &jetstream.CommitEvent{
1225 Rev: "parent2-rev",
1226 Operation: "create",
1227 Collection: "social.coves.feed.comment",
1228 RKey: parentRkey,
1229 CID: "bafyparent2",
1230 Record: map[string]interface{}{
1231 "$type": "social.coves.feed.comment",
1232 "content": "Parent with 3 pre-existing children",
1233 "reply": map[string]interface{}{
1234 "root": map[string]interface{}{
1235 "uri": postURI,
1236 "cid": "bafypost",
1237 },
1238 "parent": map[string]interface{}{
1239 "uri": postURI,
1240 "cid": "bafypost",
1241 },
1242 },
1243 "createdAt": time.Now().Format(time.RFC3339),
1244 },
1245 },
1246 }
1247
1248 err := consumer.HandleEvent(ctx, parentEvent)
1249 if err != nil {
1250 t.Fatalf("Failed to handle parent event: %v", err)
1251 }
1252
1253 // Verify parent has reply_count = 3
1254 parentComment, err := commentRepo.GetByURI(ctx, parentURI)
1255 if err != nil {
1256 t.Fatalf("Parent comment not indexed: %v", err)
1257 }
1258
1259 if parentComment.ReplyCount != 3 {
1260 t.Errorf("Expected parent reply_count to be 3 (reconciled), got %d", parentComment.ReplyCount)
1261 }
1262 })
1263}
1264
1265// TestCommentConsumer_Resurrection tests that soft-deleted comments can be recreated
1266// In atProto, deleted records' rkeys become available for reuse
1267func TestCommentConsumer_Resurrection(t *testing.T) {
1268 db := setupTestDB(t)
1269 defer func() {
1270 if err := db.Close(); err != nil {
1271 t.Logf("Failed to close database: %v", err)
1272 }
1273 }()
1274
1275 ctx := context.Background()
1276 commentRepo := postgres.NewCommentRepository(db)
1277 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
1278
1279 testUser := createTestUser(t, db, "resurrect.test", "did:plc:resurrect123")
1280 testCommunity, err := createFeedTestCommunity(db, ctx, "resurrect-community", "owner8.test")
1281 if err != nil {
1282 t.Fatalf("Failed to create test community: %v", err)
1283 }
1284 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Resurrection Test", 0, time.Now())
1285
1286 rkey := generateTID()
1287 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
1288
1289 t.Run("Recreate deleted comment with same rkey", func(t *testing.T) {
1290 // Step 1: Create initial comment
1291 createEvent := &jetstream.JetstreamEvent{
1292 Did: testUser.DID,
1293 Kind: "commit",
1294 Commit: &jetstream.CommitEvent{
1295 Rev: "v1",
1296 Operation: "create",
1297 Collection: "social.coves.feed.comment",
1298 RKey: rkey,
1299 CID: "bafyoriginal",
1300 Record: map[string]interface{}{
1301 "$type": "social.coves.feed.comment",
1302 "content": "Original comment content",
1303 "reply": map[string]interface{}{
1304 "root": map[string]interface{}{
1305 "uri": postURI,
1306 "cid": "bafypost",
1307 },
1308 "parent": map[string]interface{}{
1309 "uri": postURI,
1310 "cid": "bafypost",
1311 },
1312 },
1313 "createdAt": time.Now().Format(time.RFC3339),
1314 },
1315 },
1316 }
1317
1318 err := consumer.HandleEvent(ctx, createEvent)
1319 if err != nil {
1320 t.Fatalf("Failed to create initial comment: %v", err)
1321 }
1322
1323 // Verify comment exists
1324 comment, err := commentRepo.GetByURI(ctx, commentURI)
1325 if err != nil {
1326 t.Fatalf("Comment not found after creation: %v", err)
1327 }
1328 if comment.Content != "Original comment content" {
1329 t.Errorf("Expected content 'Original comment content', got '%s'", comment.Content)
1330 }
1331 if comment.DeletedAt != nil {
1332 t.Errorf("Expected deleted_at to be nil, got %v", comment.DeletedAt)
1333 }
1334
1335 // Step 2: Delete the comment
1336 deleteEvent := &jetstream.JetstreamEvent{
1337 Did: testUser.DID,
1338 Kind: "commit",
1339 Commit: &jetstream.CommitEvent{
1340 Rev: "v2",
1341 Operation: "delete",
1342 Collection: "social.coves.feed.comment",
1343 RKey: rkey,
1344 },
1345 }
1346
1347 err = consumer.HandleEvent(ctx, deleteEvent)
1348 if err != nil {
1349 t.Fatalf("Failed to delete comment: %v", err)
1350 }
1351
1352 // Verify comment is soft-deleted
1353 comment, err = commentRepo.GetByURI(ctx, commentURI)
1354 if err != nil {
1355 t.Fatalf("Comment not found after deletion: %v", err)
1356 }
1357 if comment.DeletedAt == nil {
1358 t.Error("Expected deleted_at to be set, got nil")
1359 }
1360
1361 // Step 3: Recreate comment with same rkey (resurrection)
1362 // In atProto, this is a valid operation - user can reuse the rkey
1363 recreateEvent := &jetstream.JetstreamEvent{
1364 Did: testUser.DID,
1365 Kind: "commit",
1366 Commit: &jetstream.CommitEvent{
1367 Rev: "v3",
1368 Operation: "create",
1369 Collection: "social.coves.feed.comment",
1370 RKey: rkey, // Same rkey!
1371 CID: "bafyresurrected",
1372 Record: map[string]interface{}{
1373 "$type": "social.coves.feed.comment",
1374 "content": "Resurrected comment with new content",
1375 "reply": map[string]interface{}{
1376 "root": map[string]interface{}{
1377 "uri": postURI,
1378 "cid": "bafypost",
1379 },
1380 "parent": map[string]interface{}{
1381 "uri": postURI,
1382 "cid": "bafypost",
1383 },
1384 },
1385 "createdAt": time.Now().Format(time.RFC3339),
1386 },
1387 },
1388 }
1389
1390 err = consumer.HandleEvent(ctx, recreateEvent)
1391 if err != nil {
1392 t.Fatalf("Failed to resurrect comment: %v", err)
1393 }
1394
1395 // Step 4: Verify comment is resurrected with new content
1396 comment, err = commentRepo.GetByURI(ctx, commentURI)
1397 if err != nil {
1398 t.Fatalf("Comment not found after resurrection: %v", err)
1399 }
1400
1401 if comment.DeletedAt != nil {
1402 t.Errorf("Expected deleted_at to be NULL after resurrection, got %v", comment.DeletedAt)
1403 }
1404 if comment.Content != "Resurrected comment with new content" {
1405 t.Errorf("Expected resurrected content, got '%s'", comment.Content)
1406 }
1407 if comment.CID != "bafyresurrected" {
1408 t.Errorf("Expected CID 'bafyresurrected', got '%s'", comment.CID)
1409 }
1410
1411 // Verify parent count was restored (post should have comment_count = 1)
1412 var postCommentCount int
1413 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&postCommentCount)
1414 if err != nil {
1415 t.Fatalf("Failed to check post comment count: %v", err)
1416 }
1417 if postCommentCount != 1 {
1418 t.Errorf("Expected post comment_count to be 1 after resurrection, got %d", postCommentCount)
1419 }
1420 })
1421
1422 t.Run("Recreate deleted comment with DIFFERENT parent", func(t *testing.T) {
1423 // Create two posts
1424 post1URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now())
1425 post2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now())
1426
1427 rkey2 := generateTID()
1428 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2)
1429
1430 // Step 1: Create comment on Post 1
1431 createEvent := &jetstream.JetstreamEvent{
1432 Did: testUser.DID,
1433 Kind: "commit",
1434 Commit: &jetstream.CommitEvent{
1435 Rev: "v1",
1436 Operation: "create",
1437 Collection: "social.coves.feed.comment",
1438 RKey: rkey2,
1439 CID: "bafyv1",
1440 Record: map[string]interface{}{
1441 "$type": "social.coves.feed.comment",
1442 "content": "Original on Post 1",
1443 "reply": map[string]interface{}{
1444 "root": map[string]interface{}{
1445 "uri": post1URI,
1446 "cid": "bafypost1",
1447 },
1448 "parent": map[string]interface{}{
1449 "uri": post1URI,
1450 "cid": "bafypost1",
1451 },
1452 },
1453 "createdAt": time.Now().Format(time.RFC3339),
1454 },
1455 },
1456 }
1457
1458 err := consumer.HandleEvent(ctx, createEvent)
1459 if err != nil {
1460 t.Fatalf("Failed to create comment on Post 1: %v", err)
1461 }
1462
1463 // Verify Post 1 has comment_count = 1
1464 var post1Count int
1465 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
1466 if err != nil {
1467 t.Fatalf("Failed to check post 1 count: %v", err)
1468 }
1469 if post1Count != 1 {
1470 t.Errorf("Expected Post 1 comment_count = 1, got %d", post1Count)
1471 }
1472
1473 // Step 2: Delete comment
1474 deleteEvent := &jetstream.JetstreamEvent{
1475 Did: testUser.DID,
1476 Kind: "commit",
1477 Commit: &jetstream.CommitEvent{
1478 Rev: "v2",
1479 Operation: "delete",
1480 Collection: "social.coves.feed.comment",
1481 RKey: rkey2,
1482 },
1483 }
1484
1485 err = consumer.HandleEvent(ctx, deleteEvent)
1486 if err != nil {
1487 t.Fatalf("Failed to delete comment: %v", err)
1488 }
1489
1490 // Verify Post 1 count decremented to 0
1491 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
1492 if err != nil {
1493 t.Fatalf("Failed to check post 1 count after delete: %v", err)
1494 }
1495 if post1Count != 0 {
1496 t.Errorf("Expected Post 1 comment_count = 0 after delete, got %d", post1Count)
1497 }
1498
1499 // Step 3: Recreate comment with same rkey but on Post 2 (different parent!)
1500 recreateEvent := &jetstream.JetstreamEvent{
1501 Did: testUser.DID,
1502 Kind: "commit",
1503 Commit: &jetstream.CommitEvent{
1504 Rev: "v3",
1505 Operation: "create",
1506 Collection: "social.coves.feed.comment",
1507 RKey: rkey2, // Same rkey!
1508 CID: "bafyv3",
1509 Record: map[string]interface{}{
1510 "$type": "social.coves.feed.comment",
1511 "content": "New comment on Post 2",
1512 "reply": map[string]interface{}{
1513 "root": map[string]interface{}{
1514 "uri": post2URI, // Different root!
1515 "cid": "bafypost2",
1516 },
1517 "parent": map[string]interface{}{
1518 "uri": post2URI, // Different parent!
1519 "cid": "bafypost2",
1520 },
1521 },
1522 "createdAt": time.Now().Format(time.RFC3339),
1523 },
1524 },
1525 }
1526
1527 err = consumer.HandleEvent(ctx, recreateEvent)
1528 if err != nil {
1529 t.Fatalf("Failed to resurrect comment on Post 2: %v", err)
1530 }
1531
1532 // Step 4: Verify threading references updated correctly
1533 comment, err := commentRepo.GetByURI(ctx, commentURI2)
1534 if err != nil {
1535 t.Fatalf("Failed to get resurrected comment: %v", err)
1536 }
1537
1538 // THIS IS THE CRITICAL TEST: Threading refs must point to Post 2, not Post 1
1539 if comment.ParentURI != post2URI {
1540 t.Errorf("Expected parent URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.ParentURI)
1541 }
1542 if comment.RootURI != post2URI {
1543 t.Errorf("Expected root URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.RootURI)
1544 }
1545 if comment.ParentCID != "bafypost2" {
1546 t.Errorf("Expected parent CID 'bafypost2', got '%s'", comment.ParentCID)
1547 }
1548
1549 // Verify counts are correct
1550 var post2Count int
1551 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post2URI).Scan(&post2Count)
1552 if err != nil {
1553 t.Fatalf("Failed to check post 2 count: %v", err)
1554 }
1555 if post2Count != 1 {
1556 t.Errorf("Expected Post 2 comment_count = 1, got %d", post2Count)
1557 }
1558
1559 // Verify Post 1 count still 0 (not incremented by resurrection on Post 2)
1560 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
1561 if err != nil {
1562 t.Fatalf("Failed to check post 1 count after resurrection: %v", err)
1563 }
1564 if post1Count != 0 {
1565 t.Errorf("Expected Post 1 comment_count = 0 (unchanged), got %d", post1Count)
1566 }
1567 })
1568}
1569
1570// TestCommentConsumer_ThreadingImmutability tests that UPDATE events cannot change threading refs
1571func TestCommentConsumer_ThreadingImmutability(t *testing.T) {
1572 db := setupTestDB(t)
1573 defer func() {
1574 if err := db.Close(); err != nil {
1575 t.Logf("Failed to close database: %v", err)
1576 }
1577 }()
1578
1579 ctx := context.Background()
1580 commentRepo := postgres.NewCommentRepository(db)
1581 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
1582
1583 testUser := createTestUser(t, db, "immutable.test", "did:plc:immutable123")
1584 testCommunity, err := createFeedTestCommunity(db, ctx, "immutable-community", "owner9.test")
1585 if err != nil {
1586 t.Fatalf("Failed to create test community: %v", err)
1587 }
1588 postURI1 := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now())
1589 postURI2 := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now())
1590
1591 rkey := generateTID()
1592 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
1593
1594 t.Run("Reject UPDATE that changes parent URI", func(t *testing.T) {
1595 // Create comment on Post 1
1596 createEvent := &jetstream.JetstreamEvent{
1597 Did: testUser.DID,
1598 Kind: "commit",
1599 Commit: &jetstream.CommitEvent{
1600 Rev: "v1",
1601 Operation: "create",
1602 Collection: "social.coves.feed.comment",
1603 RKey: rkey,
1604 CID: "bafycomment1",
1605 Record: map[string]interface{}{
1606 "$type": "social.coves.feed.comment",
1607 "content": "Comment on Post 1",
1608 "reply": map[string]interface{}{
1609 "root": map[string]interface{}{
1610 "uri": postURI1,
1611 "cid": "bafypost1",
1612 },
1613 "parent": map[string]interface{}{
1614 "uri": postURI1,
1615 "cid": "bafypost1",
1616 },
1617 },
1618 "createdAt": time.Now().Format(time.RFC3339),
1619 },
1620 },
1621 }
1622
1623 err := consumer.HandleEvent(ctx, createEvent)
1624 if err != nil {
1625 t.Fatalf("Failed to create comment: %v", err)
1626 }
1627
1628 // Attempt to update comment to move it to Post 2 (should fail)
1629 updateEvent := &jetstream.JetstreamEvent{
1630 Did: testUser.DID,
1631 Kind: "commit",
1632 Commit: &jetstream.CommitEvent{
1633 Rev: "v2",
1634 Operation: "update",
1635 Collection: "social.coves.feed.comment",
1636 RKey: rkey,
1637 CID: "bafycomment2",
1638 Record: map[string]interface{}{
1639 "$type": "social.coves.feed.comment",
1640 "content": "Trying to hijack this comment to Post 2",
1641 "reply": map[string]interface{}{
1642 "root": map[string]interface{}{
1643 "uri": postURI2, // Changed!
1644 "cid": "bafypost2",
1645 },
1646 "parent": map[string]interface{}{
1647 "uri": postURI2, // Changed!
1648 "cid": "bafypost2",
1649 },
1650 },
1651 "createdAt": time.Now().Format(time.RFC3339),
1652 },
1653 },
1654 }
1655
1656 err = consumer.HandleEvent(ctx, updateEvent)
1657 if err == nil {
1658 t.Error("Expected error when changing threading references, got nil")
1659 }
1660 if err != nil && !contains(err.Error(), "threading references cannot be changed") {
1661 t.Errorf("Expected 'threading references cannot be changed' error, got: %v", err)
1662 }
1663
1664 // Verify comment still points to Post 1
1665 comment, err := commentRepo.GetByURI(ctx, commentURI)
1666 if err != nil {
1667 t.Fatalf("Failed to get comment: %v", err)
1668 }
1669 if comment.ParentURI != postURI1 {
1670 t.Errorf("Expected parent URI to remain %s, got %s", postURI1, comment.ParentURI)
1671 }
1672 if comment.RootURI != postURI1 {
1673 t.Errorf("Expected root URI to remain %s, got %s", postURI1, comment.RootURI)
1674 }
1675 // Content should NOT have been updated since the operation was rejected
1676 if comment.Content != "Comment on Post 1" {
1677 t.Errorf("Expected original content, got '%s'", comment.Content)
1678 }
1679 })
1680
1681 t.Run("Allow UPDATE that only changes content (threading unchanged)", func(t *testing.T) {
1682 rkey2 := generateTID()
1683 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2)
1684
1685 // Create comment
1686 createEvent := &jetstream.JetstreamEvent{
1687 Did: testUser.DID,
1688 Kind: "commit",
1689 Commit: &jetstream.CommitEvent{
1690 Rev: "v1",
1691 Operation: "create",
1692 Collection: "social.coves.feed.comment",
1693 RKey: rkey2,
1694 CID: "bafycomment3",
1695 Record: map[string]interface{}{
1696 "$type": "social.coves.feed.comment",
1697 "content": "Original content",
1698 "reply": map[string]interface{}{
1699 "root": map[string]interface{}{
1700 "uri": postURI1,
1701 "cid": "bafypost1",
1702 },
1703 "parent": map[string]interface{}{
1704 "uri": postURI1,
1705 "cid": "bafypost1",
1706 },
1707 },
1708 "createdAt": time.Now().Format(time.RFC3339),
1709 },
1710 },
1711 }
1712
1713 err := consumer.HandleEvent(ctx, createEvent)
1714 if err != nil {
1715 t.Fatalf("Failed to create comment: %v", err)
1716 }
1717
1718 // Update content only (threading unchanged - should succeed)
1719 updateEvent := &jetstream.JetstreamEvent{
1720 Did: testUser.DID,
1721 Kind: "commit",
1722 Commit: &jetstream.CommitEvent{
1723 Rev: "v2",
1724 Operation: "update",
1725 Collection: "social.coves.feed.comment",
1726 RKey: rkey2,
1727 CID: "bafycomment4",
1728 Record: map[string]interface{}{
1729 "$type": "social.coves.feed.comment",
1730 "content": "Updated content",
1731 "reply": map[string]interface{}{
1732 "root": map[string]interface{}{
1733 "uri": postURI1, // Same
1734 "cid": "bafypost1",
1735 },
1736 "parent": map[string]interface{}{
1737 "uri": postURI1, // Same
1738 "cid": "bafypost1",
1739 },
1740 },
1741 "createdAt": time.Now().Format(time.RFC3339),
1742 },
1743 },
1744 }
1745
1746 err = consumer.HandleEvent(ctx, updateEvent)
1747 if err != nil {
1748 t.Fatalf("Expected update to succeed when threading unchanged, got error: %v", err)
1749 }
1750
1751 // Verify content was updated
1752 comment, err := commentRepo.GetByURI(ctx, commentURI2)
1753 if err != nil {
1754 t.Fatalf("Failed to get comment: %v", err)
1755 }
1756 if comment.Content != "Updated content" {
1757 t.Errorf("Expected updated content, got '%s'", comment.Content)
1758 }
1759 // Threading should remain unchanged
1760 if comment.ParentURI != postURI1 {
1761 t.Errorf("Expected parent URI %s, got %s", postURI1, comment.ParentURI)
1762 }
1763 })
1764}