A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/comments"
6 "Coves/internal/db/postgres"
7 "context"
8 "fmt"
9 "testing"
10 "time"
11)
12
13func TestCommentConsumer_CreateComment(t *testing.T) {
14 db := setupTestDB(t)
15 defer func() {
16 if err := db.Close(); err != nil {
17 t.Logf("Failed to close database: %v", err)
18 }
19 }()
20
21 ctx := context.Background()
22 commentRepo := postgres.NewCommentRepository(db)
23 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
24
25 // Setup test data
26 testUser := createTestUser(t, db, "commenter.test", "did:plc:commenter123")
27 testCommunity, err := createFeedTestCommunity(db, ctx, "testcommunity", "owner.test")
28 if err != nil {
29 t.Fatalf("Failed to create test community: %v", err)
30 }
31 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Test Post", 0, time.Now())
32
33 t.Run("Create comment on post", func(t *testing.T) {
34 rkey := generateTID()
35 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
36
37 // Simulate Jetstream comment create event
38 event := &jetstream.JetstreamEvent{
39 Did: testUser.DID,
40 Kind: "commit",
41 Commit: &jetstream.CommitEvent{
42 Rev: "test-rev",
43 Operation: "create",
44 Collection: "social.coves.feed.comment",
45 RKey: rkey,
46 CID: "bafytest123",
47 Record: map[string]interface{}{
48 "$type": "social.coves.feed.comment",
49 "content": "This is a test comment on a post!",
50 "reply": map[string]interface{}{
51 "root": map[string]interface{}{
52 "uri": testPostURI,
53 "cid": "bafypost",
54 },
55 "parent": map[string]interface{}{
56 "uri": testPostURI,
57 "cid": "bafypost",
58 },
59 },
60 "createdAt": time.Now().Format(time.RFC3339),
61 },
62 },
63 }
64
65 // Handle the event
66 err := consumer.HandleEvent(ctx, event)
67 if err != nil {
68 t.Fatalf("Failed to handle comment create event: %v", err)
69 }
70
71 // Verify comment was indexed
72 comment, err := commentRepo.GetByURI(ctx, uri)
73 if err != nil {
74 t.Fatalf("Failed to get indexed comment: %v", err)
75 }
76
77 if comment.URI != uri {
78 t.Errorf("Expected URI %s, got %s", uri, comment.URI)
79 }
80
81 if comment.CommenterDID != testUser.DID {
82 t.Errorf("Expected commenter %s, got %s", testUser.DID, comment.CommenterDID)
83 }
84
85 if comment.Content != "This is a test comment on a post!" {
86 t.Errorf("Expected content 'This is a test comment on a post!', got %s", comment.Content)
87 }
88
89 if comment.RootURI != testPostURI {
90 t.Errorf("Expected root URI %s, got %s", testPostURI, comment.RootURI)
91 }
92
93 if comment.ParentURI != testPostURI {
94 t.Errorf("Expected parent URI %s, got %s", testPostURI, comment.ParentURI)
95 }
96
97 // Verify post comment count was incremented
98 var commentCount int
99 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&commentCount)
100 if err != nil {
101 t.Fatalf("Failed to get post comment count: %v", err)
102 }
103
104 if commentCount != 1 {
105 t.Errorf("Expected post comment_count to be 1, got %d", commentCount)
106 }
107 })
108
109 t.Run("Idempotent create - duplicate event", func(t *testing.T) {
110 rkey := generateTID()
111
112 event := &jetstream.JetstreamEvent{
113 Did: testUser.DID,
114 Kind: "commit",
115 Commit: &jetstream.CommitEvent{
116 Rev: "test-rev",
117 Operation: "create",
118 Collection: "social.coves.feed.comment",
119 RKey: rkey,
120 CID: "bafytest456",
121 Record: map[string]interface{}{
122 "$type": "social.coves.feed.comment",
123 "content": "Idempotent test comment",
124 "reply": map[string]interface{}{
125 "root": map[string]interface{}{
126 "uri": testPostURI,
127 "cid": "bafypost",
128 },
129 "parent": map[string]interface{}{
130 "uri": testPostURI,
131 "cid": "bafypost",
132 },
133 },
134 "createdAt": time.Now().Format(time.RFC3339),
135 },
136 },
137 }
138
139 // First creation
140 err := consumer.HandleEvent(ctx, event)
141 if err != nil {
142 t.Fatalf("First creation failed: %v", err)
143 }
144
145 // Get initial comment count
146 var initialCount int
147 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount)
148 if err != nil {
149 t.Fatalf("Failed to get initial comment count: %v", err)
150 }
151
152 // Duplicate creation - should be idempotent
153 err = consumer.HandleEvent(ctx, event)
154 if err != nil {
155 t.Fatalf("Duplicate event should be handled gracefully: %v", err)
156 }
157
158 // Verify count wasn't incremented again
159 var finalCount int
160 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount)
161 if err != nil {
162 t.Fatalf("Failed to get final comment count: %v", err)
163 }
164
165 if finalCount != initialCount {
166 t.Errorf("Comment count should not increase on duplicate event. Initial: %d, Final: %d", initialCount, finalCount)
167 }
168 })
169}
170
171func TestCommentConsumer_Threading(t *testing.T) {
172 db := setupTestDB(t)
173 defer func() {
174 if err := db.Close(); err != nil {
175 t.Logf("Failed to close database: %v", err)
176 }
177 }()
178
179 ctx := context.Background()
180 commentRepo := postgres.NewCommentRepository(db)
181 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
182
183 // Setup test data
184 testUser := createTestUser(t, db, "threader.test", "did:plc:threader123")
185 testCommunity, err := createFeedTestCommunity(db, ctx, "threadcommunity", "owner2.test")
186 if err != nil {
187 t.Fatalf("Failed to create test community: %v", err)
188 }
189 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Threading Test", 0, time.Now())
190
191 t.Run("Create nested comment replies", func(t *testing.T) {
192 // Create first-level comment on post
193 comment1Rkey := generateTID()
194 comment1URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment1Rkey)
195
196 event1 := &jetstream.JetstreamEvent{
197 Did: testUser.DID,
198 Kind: "commit",
199 Commit: &jetstream.CommitEvent{
200 Operation: "create",
201 Collection: "social.coves.feed.comment",
202 RKey: comment1Rkey,
203 CID: "bafycomment1",
204 Record: map[string]interface{}{
205 "content": "First level comment",
206 "reply": map[string]interface{}{
207 "root": map[string]interface{}{
208 "uri": testPostURI,
209 "cid": "bafypost",
210 },
211 "parent": map[string]interface{}{
212 "uri": testPostURI,
213 "cid": "bafypost",
214 },
215 },
216 "createdAt": time.Now().Format(time.RFC3339),
217 },
218 },
219 }
220
221 err := consumer.HandleEvent(ctx, event1)
222 if err != nil {
223 t.Fatalf("Failed to create first-level comment: %v", err)
224 }
225
226 // Create second-level comment (reply to first comment)
227 comment2Rkey := generateTID()
228 comment2URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment2Rkey)
229
230 event2 := &jetstream.JetstreamEvent{
231 Did: testUser.DID,
232 Kind: "commit",
233 Commit: &jetstream.CommitEvent{
234 Operation: "create",
235 Collection: "social.coves.feed.comment",
236 RKey: comment2Rkey,
237 CID: "bafycomment2",
238 Record: map[string]interface{}{
239 "content": "Second level comment (reply to first)",
240 "reply": map[string]interface{}{
241 "root": map[string]interface{}{
242 "uri": testPostURI,
243 "cid": "bafypost",
244 },
245 "parent": map[string]interface{}{
246 "uri": comment1URI,
247 "cid": "bafycomment1",
248 },
249 },
250 "createdAt": time.Now().Add(1 * time.Second).Format(time.RFC3339),
251 },
252 },
253 }
254
255 err = consumer.HandleEvent(ctx, event2)
256 if err != nil {
257 t.Fatalf("Failed to create second-level comment: %v", err)
258 }
259
260 // Verify threading structure
261 comment1, err := commentRepo.GetByURI(ctx, comment1URI)
262 if err != nil {
263 t.Fatalf("Failed to get first comment: %v", err)
264 }
265
266 comment2, err := commentRepo.GetByURI(ctx, comment2URI)
267 if err != nil {
268 t.Fatalf("Failed to get second comment: %v", err)
269 }
270
271 // Both should have same root (original post)
272 if comment1.RootURI != testPostURI {
273 t.Errorf("Comment1 root should be post URI, got %s", comment1.RootURI)
274 }
275
276 if comment2.RootURI != testPostURI {
277 t.Errorf("Comment2 root should be post URI, got %s", comment2.RootURI)
278 }
279
280 // Comment1 parent should be post
281 if comment1.ParentURI != testPostURI {
282 t.Errorf("Comment1 parent should be post URI, got %s", comment1.ParentURI)
283 }
284
285 // Comment2 parent should be comment1
286 if comment2.ParentURI != comment1URI {
287 t.Errorf("Comment2 parent should be comment1 URI, got %s", comment2.ParentURI)
288 }
289
290 // Verify reply count on comment1
291 if comment1.ReplyCount != 1 {
292 t.Errorf("Comment1 should have 1 reply, got %d", comment1.ReplyCount)
293 }
294
295 // Query all comments by root
296 allComments, err := commentRepo.ListByRoot(ctx, testPostURI, 100, 0)
297 if err != nil {
298 t.Fatalf("Failed to list comments by root: %v", err)
299 }
300
301 if len(allComments) != 2 {
302 t.Errorf("Expected 2 comments in thread, got %d", len(allComments))
303 }
304
305 // Query direct replies to post
306 directReplies, err := commentRepo.ListByParent(ctx, testPostURI, 100, 0)
307 if err != nil {
308 t.Fatalf("Failed to list direct replies to post: %v", err)
309 }
310
311 if len(directReplies) != 1 {
312 t.Errorf("Expected 1 direct reply to post, got %d", len(directReplies))
313 }
314
315 // Query replies to comment1
316 comment1Replies, err := commentRepo.ListByParent(ctx, comment1URI, 100, 0)
317 if err != nil {
318 t.Fatalf("Failed to list replies to comment1: %v", err)
319 }
320
321 if len(comment1Replies) != 1 {
322 t.Errorf("Expected 1 reply to comment1, got %d", len(comment1Replies))
323 }
324 })
325}
326
327func TestCommentConsumer_UpdateComment(t *testing.T) {
328 db := setupTestDB(t)
329 defer func() {
330 if err := db.Close(); err != nil {
331 t.Logf("Failed to close database: %v", err)
332 }
333 }()
334
335 ctx := context.Background()
336 commentRepo := postgres.NewCommentRepository(db)
337 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
338
339 // Setup test data
340 testUser := createTestUser(t, db, "editor.test", "did:plc:editor123")
341 testCommunity, err := createFeedTestCommunity(db, ctx, "editcommunity", "owner3.test")
342 if err != nil {
343 t.Fatalf("Failed to create test community: %v", err)
344 }
345 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Edit Test", 0, time.Now())
346
347 t.Run("Update comment content preserves vote counts", func(t *testing.T) {
348 rkey := generateTID()
349 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
350
351 // Create initial comment
352 createEvent := &jetstream.JetstreamEvent{
353 Did: testUser.DID,
354 Kind: "commit",
355 Commit: &jetstream.CommitEvent{
356 Operation: "create",
357 Collection: "social.coves.feed.comment",
358 RKey: rkey,
359 CID: "bafyoriginal",
360 Record: map[string]interface{}{
361 "content": "Original comment content",
362 "reply": map[string]interface{}{
363 "root": map[string]interface{}{
364 "uri": testPostURI,
365 "cid": "bafypost",
366 },
367 "parent": map[string]interface{}{
368 "uri": testPostURI,
369 "cid": "bafypost",
370 },
371 },
372 "createdAt": time.Now().Format(time.RFC3339),
373 },
374 },
375 }
376
377 err := consumer.HandleEvent(ctx, createEvent)
378 if err != nil {
379 t.Fatalf("Failed to create comment: %v", err)
380 }
381
382 // Manually set vote counts to simulate votes
383 _, err = db.ExecContext(ctx, `
384 UPDATE comments
385 SET upvote_count = 5, downvote_count = 2, score = 3
386 WHERE uri = $1
387 `, uri)
388 if err != nil {
389 t.Fatalf("Failed to set vote counts: %v", err)
390 }
391
392 // Update the comment
393 updateEvent := &jetstream.JetstreamEvent{
394 Did: testUser.DID,
395 Kind: "commit",
396 Commit: &jetstream.CommitEvent{
397 Operation: "update",
398 Collection: "social.coves.feed.comment",
399 RKey: rkey,
400 CID: "bafyupdated",
401 Record: map[string]interface{}{
402 "content": "EDITED: Updated comment content",
403 "reply": map[string]interface{}{
404 "root": map[string]interface{}{
405 "uri": testPostURI,
406 "cid": "bafypost",
407 },
408 "parent": map[string]interface{}{
409 "uri": testPostURI,
410 "cid": "bafypost",
411 },
412 },
413 "createdAt": time.Now().Format(time.RFC3339),
414 },
415 },
416 }
417
418 err = consumer.HandleEvent(ctx, updateEvent)
419 if err != nil {
420 t.Fatalf("Failed to update comment: %v", err)
421 }
422
423 // Verify content updated
424 comment, err := commentRepo.GetByURI(ctx, uri)
425 if err != nil {
426 t.Fatalf("Failed to get updated comment: %v", err)
427 }
428
429 if comment.Content != "EDITED: Updated comment content" {
430 t.Errorf("Expected updated content, got %s", comment.Content)
431 }
432
433 // Verify CID updated
434 if comment.CID != "bafyupdated" {
435 t.Errorf("Expected CID to be updated to bafyupdated, got %s", comment.CID)
436 }
437
438 // Verify vote counts preserved
439 if comment.UpvoteCount != 5 {
440 t.Errorf("Expected upvote_count preserved at 5, got %d", comment.UpvoteCount)
441 }
442
443 if comment.DownvoteCount != 2 {
444 t.Errorf("Expected downvote_count preserved at 2, got %d", comment.DownvoteCount)
445 }
446
447 if comment.Score != 3 {
448 t.Errorf("Expected score preserved at 3, got %d", comment.Score)
449 }
450 })
451}
452
453func TestCommentConsumer_DeleteComment(t *testing.T) {
454 db := setupTestDB(t)
455 defer func() {
456 if err := db.Close(); err != nil {
457 t.Logf("Failed to close database: %v", err)
458 }
459 }()
460
461 ctx := context.Background()
462 commentRepo := postgres.NewCommentRepository(db)
463 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
464
465 // Setup test data
466 testUser := createTestUser(t, db, "deleter.test", "did:plc:deleter123")
467 testCommunity, err := createFeedTestCommunity(db, ctx, "deletecommunity", "owner4.test")
468 if err != nil {
469 t.Fatalf("Failed to create test community: %v", err)
470 }
471 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Delete Test", 0, time.Now())
472
473 t.Run("Delete comment decrements parent count", func(t *testing.T) {
474 rkey := generateTID()
475 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
476
477 // Create comment
478 createEvent := &jetstream.JetstreamEvent{
479 Did: testUser.DID,
480 Kind: "commit",
481 Commit: &jetstream.CommitEvent{
482 Operation: "create",
483 Collection: "social.coves.feed.comment",
484 RKey: rkey,
485 CID: "bafydelete",
486 Record: map[string]interface{}{
487 "content": "Comment to be deleted",
488 "reply": map[string]interface{}{
489 "root": map[string]interface{}{
490 "uri": testPostURI,
491 "cid": "bafypost",
492 },
493 "parent": map[string]interface{}{
494 "uri": testPostURI,
495 "cid": "bafypost",
496 },
497 },
498 "createdAt": time.Now().Format(time.RFC3339),
499 },
500 },
501 }
502
503 err := consumer.HandleEvent(ctx, createEvent)
504 if err != nil {
505 t.Fatalf("Failed to create comment: %v", err)
506 }
507
508 // Get initial post comment count
509 var initialCount int
510 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount)
511 if err != nil {
512 t.Fatalf("Failed to get initial comment count: %v", err)
513 }
514
515 // Delete comment
516 deleteEvent := &jetstream.JetstreamEvent{
517 Did: testUser.DID,
518 Kind: "commit",
519 Commit: &jetstream.CommitEvent{
520 Operation: "delete",
521 Collection: "social.coves.feed.comment",
522 RKey: rkey,
523 },
524 }
525
526 err = consumer.HandleEvent(ctx, deleteEvent)
527 if err != nil {
528 t.Fatalf("Failed to delete comment: %v", err)
529 }
530
531 // Verify soft delete
532 comment, err := commentRepo.GetByURI(ctx, uri)
533 if err != nil {
534 t.Fatalf("Failed to get deleted comment: %v", err)
535 }
536
537 if comment.DeletedAt == nil {
538 t.Error("Expected deleted_at to be set, got nil")
539 }
540
541 // Verify post comment count decremented
542 var finalCount int
543 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount)
544 if err != nil {
545 t.Fatalf("Failed to get final comment count: %v", err)
546 }
547
548 if finalCount != initialCount-1 {
549 t.Errorf("Expected comment count to decrease by 1. Initial: %d, Final: %d", initialCount, finalCount)
550 }
551 })
552
553 t.Run("Delete is idempotent", func(t *testing.T) {
554 rkey := generateTID()
555
556 // Create comment
557 createEvent := &jetstream.JetstreamEvent{
558 Did: testUser.DID,
559 Kind: "commit",
560 Commit: &jetstream.CommitEvent{
561 Operation: "create",
562 Collection: "social.coves.feed.comment",
563 RKey: rkey,
564 CID: "bafyidempdelete",
565 Record: map[string]interface{}{
566 "content": "Idempotent delete test",
567 "reply": map[string]interface{}{
568 "root": map[string]interface{}{
569 "uri": testPostURI,
570 "cid": "bafypost",
571 },
572 "parent": map[string]interface{}{
573 "uri": testPostURI,
574 "cid": "bafypost",
575 },
576 },
577 "createdAt": time.Now().Format(time.RFC3339),
578 },
579 },
580 }
581
582 err := consumer.HandleEvent(ctx, createEvent)
583 if err != nil {
584 t.Fatalf("Failed to create comment: %v", err)
585 }
586
587 // First delete
588 deleteEvent := &jetstream.JetstreamEvent{
589 Did: testUser.DID,
590 Kind: "commit",
591 Commit: &jetstream.CommitEvent{
592 Operation: "delete",
593 Collection: "social.coves.feed.comment",
594 RKey: rkey,
595 },
596 }
597
598 err = consumer.HandleEvent(ctx, deleteEvent)
599 if err != nil {
600 t.Fatalf("First delete failed: %v", err)
601 }
602
603 // Get count after first delete
604 var countAfterFirstDelete int
605 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterFirstDelete)
606 if err != nil {
607 t.Fatalf("Failed to get count after first delete: %v", err)
608 }
609
610 // Second delete (idempotent)
611 err = consumer.HandleEvent(ctx, deleteEvent)
612 if err != nil {
613 t.Fatalf("Second delete should be idempotent: %v", err)
614 }
615
616 // Verify count didn't change
617 var countAfterSecondDelete int
618 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterSecondDelete)
619 if err != nil {
620 t.Fatalf("Failed to get count after second delete: %v", err)
621 }
622
623 if countAfterSecondDelete != countAfterFirstDelete {
624 t.Errorf("Count should not change on duplicate delete. After first: %d, After second: %d", countAfterFirstDelete, countAfterSecondDelete)
625 }
626 })
627}
628
629func TestCommentConsumer_SecurityValidation(t *testing.T) {
630 db := setupTestDB(t)
631 defer func() {
632 if err := db.Close(); err != nil {
633 t.Logf("Failed to close database: %v", err)
634 }
635 }()
636
637 ctx := context.Background()
638 commentRepo := postgres.NewCommentRepository(db)
639 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
640
641 testUser := createTestUser(t, db, "security.test", "did:plc:security123")
642 testCommunity, err := createFeedTestCommunity(db, ctx, "seccommunity", "owner5.test")
643 if err != nil {
644 t.Fatalf("Failed to create test community: %v", err)
645 }
646 testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Security Test", 0, time.Now())
647
648 t.Run("Reject comment with empty content", func(t *testing.T) {
649 event := &jetstream.JetstreamEvent{
650 Did: testUser.DID,
651 Kind: "commit",
652 Commit: &jetstream.CommitEvent{
653 Operation: "create",
654 Collection: "social.coves.feed.comment",
655 RKey: generateTID(),
656 CID: "bafyinvalid",
657 Record: map[string]interface{}{
658 "content": "", // Empty content
659 "reply": map[string]interface{}{
660 "root": map[string]interface{}{
661 "uri": testPostURI,
662 "cid": "bafypost",
663 },
664 "parent": map[string]interface{}{
665 "uri": testPostURI,
666 "cid": "bafypost",
667 },
668 },
669 "createdAt": time.Now().Format(time.RFC3339),
670 },
671 },
672 }
673
674 err := consumer.HandleEvent(ctx, event)
675 if err == nil {
676 t.Error("Expected error for empty content, got nil")
677 }
678 })
679
680 t.Run("Reject comment with invalid root reference", func(t *testing.T) {
681 event := &jetstream.JetstreamEvent{
682 Did: testUser.DID,
683 Kind: "commit",
684 Commit: &jetstream.CommitEvent{
685 Operation: "create",
686 Collection: "social.coves.feed.comment",
687 RKey: generateTID(),
688 CID: "bafyinvalid2",
689 Record: map[string]interface{}{
690 "content": "Valid content",
691 "reply": map[string]interface{}{
692 "root": map[string]interface{}{
693 "uri": "", // Missing URI
694 "cid": "bafypost",
695 },
696 "parent": map[string]interface{}{
697 "uri": testPostURI,
698 "cid": "bafypost",
699 },
700 },
701 "createdAt": time.Now().Format(time.RFC3339),
702 },
703 },
704 }
705
706 err := consumer.HandleEvent(ctx, event)
707 if err == nil {
708 t.Error("Expected error for invalid root reference, got nil")
709 }
710 })
711
712 t.Run("Reject comment with invalid parent reference", func(t *testing.T) {
713 event := &jetstream.JetstreamEvent{
714 Did: testUser.DID,
715 Kind: "commit",
716 Commit: &jetstream.CommitEvent{
717 Operation: "create",
718 Collection: "social.coves.feed.comment",
719 RKey: generateTID(),
720 CID: "bafyinvalid3",
721 Record: map[string]interface{}{
722 "content": "Valid content",
723 "reply": map[string]interface{}{
724 "root": map[string]interface{}{
725 "uri": testPostURI,
726 "cid": "bafypost",
727 },
728 "parent": map[string]interface{}{
729 "uri": testPostURI,
730 "cid": "", // Missing CID
731 },
732 },
733 "createdAt": time.Now().Format(time.RFC3339),
734 },
735 },
736 }
737
738 err := consumer.HandleEvent(ctx, event)
739 if err == nil {
740 t.Error("Expected error for invalid parent reference, got nil")
741 }
742 })
743
744 t.Run("Reject comment with invalid DID format", func(t *testing.T) {
745 event := &jetstream.JetstreamEvent{
746 Did: "invalid-did-format", // Bad DID
747 Kind: "commit",
748 Commit: &jetstream.CommitEvent{
749 Operation: "create",
750 Collection: "social.coves.feed.comment",
751 RKey: generateTID(),
752 CID: "bafyinvalid4",
753 Record: map[string]interface{}{
754 "content": "Valid content",
755 "reply": map[string]interface{}{
756 "root": map[string]interface{}{
757 "uri": testPostURI,
758 "cid": "bafypost",
759 },
760 "parent": map[string]interface{}{
761 "uri": testPostURI,
762 "cid": "bafypost",
763 },
764 },
765 "createdAt": time.Now().Format(time.RFC3339),
766 },
767 },
768 }
769
770 err := consumer.HandleEvent(ctx, event)
771 if err == nil {
772 t.Error("Expected error for invalid DID format, got nil")
773 }
774 })
775
776 t.Run("Reject comment exceeding max content length", func(t *testing.T) {
777 event := &jetstream.JetstreamEvent{
778 Did: testUser.DID,
779 Kind: "commit",
780 Commit: &jetstream.CommitEvent{
781 Operation: "create",
782 Collection: "social.coves.feed.comment",
783 RKey: generateTID(),
784 CID: "bafytoobig",
785 Record: map[string]interface{}{
786 "content": string(make([]byte, 30001)), // Exceeds 30000 byte limit
787 "reply": map[string]interface{}{
788 "root": map[string]interface{}{
789 "uri": testPostURI,
790 "cid": "bafypost",
791 },
792 "parent": map[string]interface{}{
793 "uri": testPostURI,
794 "cid": "bafypost",
795 },
796 },
797 "createdAt": time.Now().Format(time.RFC3339),
798 },
799 },
800 }
801
802 err := consumer.HandleEvent(ctx, event)
803 if err == nil {
804 t.Error("Expected error for oversized content, got nil")
805 }
806 if err != nil && !contains(err.Error(), "exceeds maximum length") {
807 t.Errorf("Expected 'exceeds maximum length' error, got: %v", err)
808 }
809 })
810
811 t.Run("Reject comment with malformed parent URI", func(t *testing.T) {
812 event := &jetstream.JetstreamEvent{
813 Did: testUser.DID,
814 Kind: "commit",
815 Commit: &jetstream.CommitEvent{
816 Operation: "create",
817 Collection: "social.coves.feed.comment",
818 RKey: generateTID(),
819 CID: "bafymalformed",
820 Record: map[string]interface{}{
821 "content": "Valid content",
822 "reply": map[string]interface{}{
823 "root": map[string]interface{}{
824 "uri": testPostURI,
825 "cid": "bafypost",
826 },
827 "parent": map[string]interface{}{
828 "uri": "at://malformed", // Invalid: missing collection/rkey
829 "cid": "bafyparent",
830 },
831 },
832 "createdAt": time.Now().Format(time.RFC3339),
833 },
834 },
835 }
836
837 err := consumer.HandleEvent(ctx, event)
838 if err == nil {
839 t.Error("Expected error for malformed AT-URI, got nil")
840 }
841 if err != nil && !contains(err.Error(), "invalid parent URI") {
842 t.Errorf("Expected 'invalid parent URI' error, got: %v", err)
843 }
844 })
845
846 t.Run("Reject comment with malformed root URI", func(t *testing.T) {
847 event := &jetstream.JetstreamEvent{
848 Did: testUser.DID,
849 Kind: "commit",
850 Commit: &jetstream.CommitEvent{
851 Operation: "create",
852 Collection: "social.coves.feed.comment",
853 RKey: generateTID(),
854 CID: "bafymalformed2",
855 Record: map[string]interface{}{
856 "content": "Valid content",
857 "reply": map[string]interface{}{
858 "root": map[string]interface{}{
859 "uri": "at://did:plc:test123", // Invalid: missing collection/rkey
860 "cid": "bafyroot",
861 },
862 "parent": map[string]interface{}{
863 "uri": testPostURI,
864 "cid": "bafyparent",
865 },
866 },
867 "createdAt": time.Now().Format(time.RFC3339),
868 },
869 },
870 }
871
872 err := consumer.HandleEvent(ctx, event)
873 if err == nil {
874 t.Error("Expected error for malformed AT-URI, got nil")
875 }
876 if err != nil && !contains(err.Error(), "invalid root URI") {
877 t.Errorf("Expected 'invalid root URI' error, got: %v", err)
878 }
879 })
880}
881
882func TestCommentRepository_Queries(t *testing.T) {
883 db := setupTestDB(t)
884 defer func() {
885 if err := db.Close(); err != nil {
886 t.Logf("Failed to close database: %v", err)
887 }
888 }()
889
890 ctx := context.Background()
891 commentRepo := postgres.NewCommentRepository(db)
892
893 // Clean up any existing test data from previous runs
894 _, err := db.ExecContext(ctx, "DELETE FROM comments WHERE commenter_did LIKE 'did:plc:%'")
895 if err != nil {
896 t.Fatalf("Failed to clean up test comments: %v", err)
897 }
898
899 testUser := createTestUser(t, db, "query.test", "did:plc:query123")
900 testCommunity, err := createFeedTestCommunity(db, ctx, "querycommunity", "owner6.test")
901 if err != nil {
902 t.Fatalf("Failed to create test community: %v", err)
903 }
904 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Query Test", 0, time.Now())
905
906 // Create a comment tree
907 // Post
908 // |- Comment 1
909 // |- Comment 2
910 // |- Comment 3
911 // |- Comment 4
912
913 comment1 := &comments.Comment{
914 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/1", testUser.DID),
915 CID: "bafyc1",
916 RKey: "1",
917 CommenterDID: testUser.DID,
918 RootURI: postURI,
919 RootCID: "bafypost",
920 ParentURI: postURI,
921 ParentCID: "bafypost",
922 Content: "Comment 1",
923 Langs: []string{},
924 CreatedAt: time.Now(),
925 }
926
927 comment2 := &comments.Comment{
928 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/2", testUser.DID),
929 CID: "bafyc2",
930 RKey: "2",
931 CommenterDID: testUser.DID,
932 RootURI: postURI,
933 RootCID: "bafypost",
934 ParentURI: comment1.URI,
935 ParentCID: "bafyc1",
936 Content: "Comment 2 (reply to 1)",
937 Langs: []string{},
938 CreatedAt: time.Now().Add(1 * time.Second),
939 }
940
941 comment3 := &comments.Comment{
942 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/3", testUser.DID),
943 CID: "bafyc3",
944 RKey: "3",
945 CommenterDID: testUser.DID,
946 RootURI: postURI,
947 RootCID: "bafypost",
948 ParentURI: comment1.URI,
949 ParentCID: "bafyc1",
950 Content: "Comment 3 (reply to 1)",
951 Langs: []string{},
952 CreatedAt: time.Now().Add(2 * time.Second),
953 }
954
955 comment4 := &comments.Comment{
956 URI: fmt.Sprintf("at://%s/social.coves.feed.comment/4", testUser.DID),
957 CID: "bafyc4",
958 RKey: "4",
959 CommenterDID: testUser.DID,
960 RootURI: postURI,
961 RootCID: "bafypost",
962 ParentURI: postURI,
963 ParentCID: "bafypost",
964 Content: "Comment 4",
965 Langs: []string{},
966 CreatedAt: time.Now().Add(3 * time.Second),
967 }
968
969 // Create all comments
970 for i, c := range []*comments.Comment{comment1, comment2, comment3, comment4} {
971 if err := commentRepo.Create(ctx, c); err != nil {
972 t.Fatalf("Failed to create comment %d: %v", i+1, err)
973 }
974 t.Logf("Created comment %d: %s", i+1, c.URI)
975 }
976
977 // Verify comments were created
978 verifyCount, err := commentRepo.CountByParent(ctx, postURI)
979 if err != nil {
980 t.Fatalf("Failed to count comments: %v", err)
981 }
982 t.Logf("Direct replies to post after creation: %d", verifyCount)
983
984 t.Run("ListByRoot returns all comments in thread", func(t *testing.T) {
985 comments, err := commentRepo.ListByRoot(ctx, postURI, 100, 0)
986 if err != nil {
987 t.Fatalf("Failed to list by root: %v", err)
988 }
989
990 if len(comments) != 4 {
991 t.Errorf("Expected 4 comments, got %d", len(comments))
992 }
993 })
994
995 t.Run("ListByParent returns direct replies", func(t *testing.T) {
996 // Direct replies to post
997 postReplies, err := commentRepo.ListByParent(ctx, postURI, 100, 0)
998 if err != nil {
999 t.Fatalf("Failed to list post replies: %v", err)
1000 }
1001
1002 if len(postReplies) != 2 {
1003 t.Errorf("Expected 2 direct replies to post, got %d", len(postReplies))
1004 }
1005
1006 // Direct replies to comment1
1007 comment1Replies, err := commentRepo.ListByParent(ctx, comment1.URI, 100, 0)
1008 if err != nil {
1009 t.Fatalf("Failed to list comment1 replies: %v", err)
1010 }
1011
1012 if len(comment1Replies) != 2 {
1013 t.Errorf("Expected 2 direct replies to comment1, got %d", len(comment1Replies))
1014 }
1015 })
1016
1017 t.Run("CountByParent returns correct counts", func(t *testing.T) {
1018 postCount, err := commentRepo.CountByParent(ctx, postURI)
1019 if err != nil {
1020 t.Fatalf("Failed to count post replies: %v", err)
1021 }
1022
1023 if postCount != 2 {
1024 t.Errorf("Expected 2 direct replies to post, got %d", postCount)
1025 }
1026
1027 comment1Count, err := commentRepo.CountByParent(ctx, comment1.URI)
1028 if err != nil {
1029 t.Fatalf("Failed to count comment1 replies: %v", err)
1030 }
1031
1032 if comment1Count != 2 {
1033 t.Errorf("Expected 2 direct replies to comment1, got %d", comment1Count)
1034 }
1035 })
1036
1037 t.Run("ListByCommenter returns user's comments", func(t *testing.T) {
1038 userComments, err := commentRepo.ListByCommenter(ctx, testUser.DID, 100, 0)
1039 if err != nil {
1040 t.Fatalf("Failed to list by commenter: %v", err)
1041 }
1042
1043 if len(userComments) != 4 {
1044 t.Errorf("Expected 4 comments by user, got %d", len(userComments))
1045 }
1046 })
1047}
1048
1049// TestCommentConsumer_OutOfOrderReconciliation tests that parent counts are
1050// correctly reconciled when child comments arrive before their parent
1051func TestCommentConsumer_OutOfOrderReconciliation(t *testing.T) {
1052 db := setupTestDB(t)
1053 defer func() {
1054 if err := db.Close(); err != nil {
1055 t.Logf("Failed to close database: %v", err)
1056 }
1057 }()
1058
1059 ctx := context.Background()
1060 commentRepo := postgres.NewCommentRepository(db)
1061 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
1062
1063 testUser := createTestUser(t, db, "outoforder.test", "did:plc:outoforder123")
1064 testCommunity, err := createFeedTestCommunity(db, ctx, "ooo-community", "owner7.test")
1065 if err != nil {
1066 t.Fatalf("Failed to create test community: %v", err)
1067 }
1068 postURI := createTestPost(t, db, testCommunity, testUser.DID, "OOO Test Post", 0, time.Now())
1069
1070 t.Run("Child arrives before parent - count reconciled", func(t *testing.T) {
1071 // Scenario: User A creates comment C1 on post
1072 // User B creates reply C2 to C1
1073 // Jetstream delivers C2 before C1 (different repos)
1074 // When C1 finally arrives, its reply_count should be 1, not 0
1075
1076 parentRkey := generateTID()
1077 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey)
1078
1079 childRkey := generateTID()
1080 childURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, childRkey)
1081
1082 // Step 1: Index child FIRST (before parent exists)
1083 childEvent := &jetstream.JetstreamEvent{
1084 Did: testUser.DID,
1085 Kind: "commit",
1086 Commit: &jetstream.CommitEvent{
1087 Rev: "child-rev",
1088 Operation: "create",
1089 Collection: "social.coves.feed.comment",
1090 RKey: childRkey,
1091 CID: "bafychild",
1092 Record: map[string]interface{}{
1093 "$type": "social.coves.feed.comment",
1094 "content": "This is a reply to a comment that doesn't exist yet!",
1095 "reply": map[string]interface{}{
1096 "root": map[string]interface{}{
1097 "uri": postURI,
1098 "cid": "bafypost",
1099 },
1100 "parent": map[string]interface{}{
1101 "uri": parentURI, // Points to parent that doesn't exist yet
1102 "cid": "bafyparent",
1103 },
1104 },
1105 "createdAt": time.Now().Format(time.RFC3339),
1106 },
1107 },
1108 }
1109
1110 err := consumer.HandleEvent(ctx, childEvent)
1111 if err != nil {
1112 t.Fatalf("Failed to handle child event: %v", err)
1113 }
1114
1115 // Verify child was indexed
1116 childComment, err := commentRepo.GetByURI(ctx, childURI)
1117 if err != nil {
1118 t.Fatalf("Child comment not indexed: %v", err)
1119 }
1120 if childComment.ParentURI != parentURI {
1121 t.Errorf("Expected child parent_uri %s, got %s", parentURI, childComment.ParentURI)
1122 }
1123
1124 // Step 2: Now index parent (arrives late due to Jetstream ordering)
1125 parentEvent := &jetstream.JetstreamEvent{
1126 Did: testUser.DID,
1127 Kind: "commit",
1128 Commit: &jetstream.CommitEvent{
1129 Rev: "parent-rev",
1130 Operation: "create",
1131 Collection: "social.coves.feed.comment",
1132 RKey: parentRkey,
1133 CID: "bafyparent",
1134 Record: map[string]interface{}{
1135 "$type": "social.coves.feed.comment",
1136 "content": "This is the parent comment arriving late",
1137 "reply": map[string]interface{}{
1138 "root": map[string]interface{}{
1139 "uri": postURI,
1140 "cid": "bafypost",
1141 },
1142 "parent": map[string]interface{}{
1143 "uri": postURI,
1144 "cid": "bafypost",
1145 },
1146 },
1147 "createdAt": time.Now().Format(time.RFC3339),
1148 },
1149 },
1150 }
1151
1152 err = consumer.HandleEvent(ctx, parentEvent)
1153 if err != nil {
1154 t.Fatalf("Failed to handle parent event: %v", err)
1155 }
1156
1157 // Step 3: Verify parent was indexed with CORRECT reply_count
1158 parentComment, err := commentRepo.GetByURI(ctx, parentURI)
1159 if err != nil {
1160 t.Fatalf("Parent comment not indexed: %v", err)
1161 }
1162
1163 // THIS IS THE KEY TEST: Parent should have reply_count = 1 due to reconciliation
1164 if parentComment.ReplyCount != 1 {
1165 t.Errorf("Expected parent reply_count to be 1 (reconciled), got %d", parentComment.ReplyCount)
1166 t.Logf("This indicates out-of-order reconciliation failed!")
1167 }
1168
1169 // Verify via query as well
1170 count, err := commentRepo.CountByParent(ctx, parentURI)
1171 if err != nil {
1172 t.Fatalf("Failed to count parent replies: %v", err)
1173 }
1174 if count != 1 {
1175 t.Errorf("Expected 1 reply to parent, got %d", count)
1176 }
1177 })
1178
1179 t.Run("Multiple children arrive before parent", func(t *testing.T) {
1180 parentRkey := generateTID()
1181 parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey)
1182
1183 // Index 3 children before parent
1184 for i := 1; i <= 3; i++ {
1185 childRkey := generateTID()
1186 childEvent := &jetstream.JetstreamEvent{
1187 Did: testUser.DID,
1188 Kind: "commit",
1189 Commit: &jetstream.CommitEvent{
1190 Rev: fmt.Sprintf("child-%d-rev", i),
1191 Operation: "create",
1192 Collection: "social.coves.feed.comment",
1193 RKey: childRkey,
1194 CID: fmt.Sprintf("bafychild%d", i),
1195 Record: map[string]interface{}{
1196 "$type": "social.coves.feed.comment",
1197 "content": fmt.Sprintf("Reply %d before parent", i),
1198 "reply": map[string]interface{}{
1199 "root": map[string]interface{}{
1200 "uri": postURI,
1201 "cid": "bafypost",
1202 },
1203 "parent": map[string]interface{}{
1204 "uri": parentURI,
1205 "cid": "bafyparent2",
1206 },
1207 },
1208 "createdAt": time.Now().Format(time.RFC3339),
1209 },
1210 },
1211 }
1212
1213 err := consumer.HandleEvent(ctx, childEvent)
1214 if err != nil {
1215 t.Fatalf("Failed to handle child %d event: %v", i, err)
1216 }
1217 }
1218
1219 // Now index parent
1220 parentEvent := &jetstream.JetstreamEvent{
1221 Did: testUser.DID,
1222 Kind: "commit",
1223 Commit: &jetstream.CommitEvent{
1224 Rev: "parent2-rev",
1225 Operation: "create",
1226 Collection: "social.coves.feed.comment",
1227 RKey: parentRkey,
1228 CID: "bafyparent2",
1229 Record: map[string]interface{}{
1230 "$type": "social.coves.feed.comment",
1231 "content": "Parent with 3 pre-existing children",
1232 "reply": map[string]interface{}{
1233 "root": map[string]interface{}{
1234 "uri": postURI,
1235 "cid": "bafypost",
1236 },
1237 "parent": map[string]interface{}{
1238 "uri": postURI,
1239 "cid": "bafypost",
1240 },
1241 },
1242 "createdAt": time.Now().Format(time.RFC3339),
1243 },
1244 },
1245 }
1246
1247 err := consumer.HandleEvent(ctx, parentEvent)
1248 if err != nil {
1249 t.Fatalf("Failed to handle parent event: %v", err)
1250 }
1251
1252 // Verify parent has reply_count = 3
1253 parentComment, err := commentRepo.GetByURI(ctx, parentURI)
1254 if err != nil {
1255 t.Fatalf("Parent comment not indexed: %v", err)
1256 }
1257
1258 if parentComment.ReplyCount != 3 {
1259 t.Errorf("Expected parent reply_count to be 3 (reconciled), got %d", parentComment.ReplyCount)
1260 }
1261 })
1262}
1263
1264// TestCommentConsumer_Resurrection tests that soft-deleted comments can be recreated
1265// In atProto, deleted records' rkeys become available for reuse
1266func TestCommentConsumer_Resurrection(t *testing.T) {
1267 db := setupTestDB(t)
1268 defer func() {
1269 if err := db.Close(); err != nil {
1270 t.Logf("Failed to close database: %v", err)
1271 }
1272 }()
1273
1274 ctx := context.Background()
1275 commentRepo := postgres.NewCommentRepository(db)
1276 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
1277
1278 testUser := createTestUser(t, db, "resurrect.test", "did:plc:resurrect123")
1279 testCommunity, err := createFeedTestCommunity(db, ctx, "resurrect-community", "owner8.test")
1280 if err != nil {
1281 t.Fatalf("Failed to create test community: %v", err)
1282 }
1283 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Resurrection Test", 0, time.Now())
1284
1285 rkey := generateTID()
1286 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
1287
1288 t.Run("Recreate deleted comment with same rkey", func(t *testing.T) {
1289 // Step 1: Create initial comment
1290 createEvent := &jetstream.JetstreamEvent{
1291 Did: testUser.DID,
1292 Kind: "commit",
1293 Commit: &jetstream.CommitEvent{
1294 Rev: "v1",
1295 Operation: "create",
1296 Collection: "social.coves.feed.comment",
1297 RKey: rkey,
1298 CID: "bafyoriginal",
1299 Record: map[string]interface{}{
1300 "$type": "social.coves.feed.comment",
1301 "content": "Original comment content",
1302 "reply": map[string]interface{}{
1303 "root": map[string]interface{}{
1304 "uri": postURI,
1305 "cid": "bafypost",
1306 },
1307 "parent": map[string]interface{}{
1308 "uri": postURI,
1309 "cid": "bafypost",
1310 },
1311 },
1312 "createdAt": time.Now().Format(time.RFC3339),
1313 },
1314 },
1315 }
1316
1317 err := consumer.HandleEvent(ctx, createEvent)
1318 if err != nil {
1319 t.Fatalf("Failed to create initial comment: %v", err)
1320 }
1321
1322 // Verify comment exists
1323 comment, err := commentRepo.GetByURI(ctx, commentURI)
1324 if err != nil {
1325 t.Fatalf("Comment not found after creation: %v", err)
1326 }
1327 if comment.Content != "Original comment content" {
1328 t.Errorf("Expected content 'Original comment content', got '%s'", comment.Content)
1329 }
1330 if comment.DeletedAt != nil {
1331 t.Errorf("Expected deleted_at to be nil, got %v", comment.DeletedAt)
1332 }
1333
1334 // Step 2: Delete the comment
1335 deleteEvent := &jetstream.JetstreamEvent{
1336 Did: testUser.DID,
1337 Kind: "commit",
1338 Commit: &jetstream.CommitEvent{
1339 Rev: "v2",
1340 Operation: "delete",
1341 Collection: "social.coves.feed.comment",
1342 RKey: rkey,
1343 },
1344 }
1345
1346 err = consumer.HandleEvent(ctx, deleteEvent)
1347 if err != nil {
1348 t.Fatalf("Failed to delete comment: %v", err)
1349 }
1350
1351 // Verify comment is soft-deleted
1352 comment, err = commentRepo.GetByURI(ctx, commentURI)
1353 if err != nil {
1354 t.Fatalf("Comment not found after deletion: %v", err)
1355 }
1356 if comment.DeletedAt == nil {
1357 t.Error("Expected deleted_at to be set, got nil")
1358 }
1359
1360 // Step 3: Recreate comment with same rkey (resurrection)
1361 // In atProto, this is a valid operation - user can reuse the rkey
1362 recreateEvent := &jetstream.JetstreamEvent{
1363 Did: testUser.DID,
1364 Kind: "commit",
1365 Commit: &jetstream.CommitEvent{
1366 Rev: "v3",
1367 Operation: "create",
1368 Collection: "social.coves.feed.comment",
1369 RKey: rkey, // Same rkey!
1370 CID: "bafyresurrected",
1371 Record: map[string]interface{}{
1372 "$type": "social.coves.feed.comment",
1373 "content": "Resurrected comment with new content",
1374 "reply": map[string]interface{}{
1375 "root": map[string]interface{}{
1376 "uri": postURI,
1377 "cid": "bafypost",
1378 },
1379 "parent": map[string]interface{}{
1380 "uri": postURI,
1381 "cid": "bafypost",
1382 },
1383 },
1384 "createdAt": time.Now().Format(time.RFC3339),
1385 },
1386 },
1387 }
1388
1389 err = consumer.HandleEvent(ctx, recreateEvent)
1390 if err != nil {
1391 t.Fatalf("Failed to resurrect comment: %v", err)
1392 }
1393
1394 // Step 4: Verify comment is resurrected with new content
1395 comment, err = commentRepo.GetByURI(ctx, commentURI)
1396 if err != nil {
1397 t.Fatalf("Comment not found after resurrection: %v", err)
1398 }
1399
1400 if comment.DeletedAt != nil {
1401 t.Errorf("Expected deleted_at to be NULL after resurrection, got %v", comment.DeletedAt)
1402 }
1403 if comment.Content != "Resurrected comment with new content" {
1404 t.Errorf("Expected resurrected content, got '%s'", comment.Content)
1405 }
1406 if comment.CID != "bafyresurrected" {
1407 t.Errorf("Expected CID 'bafyresurrected', got '%s'", comment.CID)
1408 }
1409
1410 // Verify parent count was restored (post should have comment_count = 1)
1411 var postCommentCount int
1412 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&postCommentCount)
1413 if err != nil {
1414 t.Fatalf("Failed to check post comment count: %v", err)
1415 }
1416 if postCommentCount != 1 {
1417 t.Errorf("Expected post comment_count to be 1 after resurrection, got %d", postCommentCount)
1418 }
1419 })
1420
1421 t.Run("Recreate deleted comment with DIFFERENT parent", func(t *testing.T) {
1422 // Create two posts
1423 post1URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now())
1424 post2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now())
1425
1426 rkey2 := generateTID()
1427 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2)
1428
1429 // Step 1: Create comment on Post 1
1430 createEvent := &jetstream.JetstreamEvent{
1431 Did: testUser.DID,
1432 Kind: "commit",
1433 Commit: &jetstream.CommitEvent{
1434 Rev: "v1",
1435 Operation: "create",
1436 Collection: "social.coves.feed.comment",
1437 RKey: rkey2,
1438 CID: "bafyv1",
1439 Record: map[string]interface{}{
1440 "$type": "social.coves.feed.comment",
1441 "content": "Original on Post 1",
1442 "reply": map[string]interface{}{
1443 "root": map[string]interface{}{
1444 "uri": post1URI,
1445 "cid": "bafypost1",
1446 },
1447 "parent": map[string]interface{}{
1448 "uri": post1URI,
1449 "cid": "bafypost1",
1450 },
1451 },
1452 "createdAt": time.Now().Format(time.RFC3339),
1453 },
1454 },
1455 }
1456
1457 err := consumer.HandleEvent(ctx, createEvent)
1458 if err != nil {
1459 t.Fatalf("Failed to create comment on Post 1: %v", err)
1460 }
1461
1462 // Verify Post 1 has comment_count = 1
1463 var post1Count int
1464 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
1465 if err != nil {
1466 t.Fatalf("Failed to check post 1 count: %v", err)
1467 }
1468 if post1Count != 1 {
1469 t.Errorf("Expected Post 1 comment_count = 1, got %d", post1Count)
1470 }
1471
1472 // Step 2: Delete comment
1473 deleteEvent := &jetstream.JetstreamEvent{
1474 Did: testUser.DID,
1475 Kind: "commit",
1476 Commit: &jetstream.CommitEvent{
1477 Rev: "v2",
1478 Operation: "delete",
1479 Collection: "social.coves.feed.comment",
1480 RKey: rkey2,
1481 },
1482 }
1483
1484 err = consumer.HandleEvent(ctx, deleteEvent)
1485 if err != nil {
1486 t.Fatalf("Failed to delete comment: %v", err)
1487 }
1488
1489 // Verify Post 1 count decremented to 0
1490 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
1491 if err != nil {
1492 t.Fatalf("Failed to check post 1 count after delete: %v", err)
1493 }
1494 if post1Count != 0 {
1495 t.Errorf("Expected Post 1 comment_count = 0 after delete, got %d", post1Count)
1496 }
1497
1498 // Step 3: Recreate comment with same rkey but on Post 2 (different parent!)
1499 recreateEvent := &jetstream.JetstreamEvent{
1500 Did: testUser.DID,
1501 Kind: "commit",
1502 Commit: &jetstream.CommitEvent{
1503 Rev: "v3",
1504 Operation: "create",
1505 Collection: "social.coves.feed.comment",
1506 RKey: rkey2, // Same rkey!
1507 CID: "bafyv3",
1508 Record: map[string]interface{}{
1509 "$type": "social.coves.feed.comment",
1510 "content": "New comment on Post 2",
1511 "reply": map[string]interface{}{
1512 "root": map[string]interface{}{
1513 "uri": post2URI, // Different root!
1514 "cid": "bafypost2",
1515 },
1516 "parent": map[string]interface{}{
1517 "uri": post2URI, // Different parent!
1518 "cid": "bafypost2",
1519 },
1520 },
1521 "createdAt": time.Now().Format(time.RFC3339),
1522 },
1523 },
1524 }
1525
1526 err = consumer.HandleEvent(ctx, recreateEvent)
1527 if err != nil {
1528 t.Fatalf("Failed to resurrect comment on Post 2: %v", err)
1529 }
1530
1531 // Step 4: Verify threading references updated correctly
1532 comment, err := commentRepo.GetByURI(ctx, commentURI2)
1533 if err != nil {
1534 t.Fatalf("Failed to get resurrected comment: %v", err)
1535 }
1536
1537 // THIS IS THE CRITICAL TEST: Threading refs must point to Post 2, not Post 1
1538 if comment.ParentURI != post2URI {
1539 t.Errorf("Expected parent URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.ParentURI)
1540 }
1541 if comment.RootURI != post2URI {
1542 t.Errorf("Expected root URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.RootURI)
1543 }
1544 if comment.ParentCID != "bafypost2" {
1545 t.Errorf("Expected parent CID 'bafypost2', got '%s'", comment.ParentCID)
1546 }
1547
1548 // Verify counts are correct
1549 var post2Count int
1550 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post2URI).Scan(&post2Count)
1551 if err != nil {
1552 t.Fatalf("Failed to check post 2 count: %v", err)
1553 }
1554 if post2Count != 1 {
1555 t.Errorf("Expected Post 2 comment_count = 1, got %d", post2Count)
1556 }
1557
1558 // Verify Post 1 count still 0 (not incremented by resurrection on Post 2)
1559 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
1560 if err != nil {
1561 t.Fatalf("Failed to check post 1 count after resurrection: %v", err)
1562 }
1563 if post1Count != 0 {
1564 t.Errorf("Expected Post 1 comment_count = 0 (unchanged), got %d", post1Count)
1565 }
1566 })
1567}
1568
1569// TestCommentConsumer_ThreadingImmutability tests that UPDATE events cannot change threading refs
1570func TestCommentConsumer_ThreadingImmutability(t *testing.T) {
1571 db := setupTestDB(t)
1572 defer func() {
1573 if err := db.Close(); err != nil {
1574 t.Logf("Failed to close database: %v", err)
1575 }
1576 }()
1577
1578 ctx := context.Background()
1579 commentRepo := postgres.NewCommentRepository(db)
1580 consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
1581
1582 testUser := createTestUser(t, db, "immutable.test", "did:plc:immutable123")
1583 testCommunity, err := createFeedTestCommunity(db, ctx, "immutable-community", "owner9.test")
1584 if err != nil {
1585 t.Fatalf("Failed to create test community: %v", err)
1586 }
1587 postURI1 := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now())
1588 postURI2 := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now())
1589
1590 rkey := generateTID()
1591 commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
1592
1593 t.Run("Reject UPDATE that changes parent URI", func(t *testing.T) {
1594 // Create comment on Post 1
1595 createEvent := &jetstream.JetstreamEvent{
1596 Did: testUser.DID,
1597 Kind: "commit",
1598 Commit: &jetstream.CommitEvent{
1599 Rev: "v1",
1600 Operation: "create",
1601 Collection: "social.coves.feed.comment",
1602 RKey: rkey,
1603 CID: "bafycomment1",
1604 Record: map[string]interface{}{
1605 "$type": "social.coves.feed.comment",
1606 "content": "Comment on Post 1",
1607 "reply": map[string]interface{}{
1608 "root": map[string]interface{}{
1609 "uri": postURI1,
1610 "cid": "bafypost1",
1611 },
1612 "parent": map[string]interface{}{
1613 "uri": postURI1,
1614 "cid": "bafypost1",
1615 },
1616 },
1617 "createdAt": time.Now().Format(time.RFC3339),
1618 },
1619 },
1620 }
1621
1622 err := consumer.HandleEvent(ctx, createEvent)
1623 if err != nil {
1624 t.Fatalf("Failed to create comment: %v", err)
1625 }
1626
1627 // Attempt to update comment to move it to Post 2 (should fail)
1628 updateEvent := &jetstream.JetstreamEvent{
1629 Did: testUser.DID,
1630 Kind: "commit",
1631 Commit: &jetstream.CommitEvent{
1632 Rev: "v2",
1633 Operation: "update",
1634 Collection: "social.coves.feed.comment",
1635 RKey: rkey,
1636 CID: "bafycomment2",
1637 Record: map[string]interface{}{
1638 "$type": "social.coves.feed.comment",
1639 "content": "Trying to hijack this comment to Post 2",
1640 "reply": map[string]interface{}{
1641 "root": map[string]interface{}{
1642 "uri": postURI2, // Changed!
1643 "cid": "bafypost2",
1644 },
1645 "parent": map[string]interface{}{
1646 "uri": postURI2, // Changed!
1647 "cid": "bafypost2",
1648 },
1649 },
1650 "createdAt": time.Now().Format(time.RFC3339),
1651 },
1652 },
1653 }
1654
1655 err = consumer.HandleEvent(ctx, updateEvent)
1656 if err == nil {
1657 t.Error("Expected error when changing threading references, got nil")
1658 }
1659 if err != nil && !contains(err.Error(), "threading references cannot be changed") {
1660 t.Errorf("Expected 'threading references cannot be changed' error, got: %v", err)
1661 }
1662
1663 // Verify comment still points to Post 1
1664 comment, err := commentRepo.GetByURI(ctx, commentURI)
1665 if err != nil {
1666 t.Fatalf("Failed to get comment: %v", err)
1667 }
1668 if comment.ParentURI != postURI1 {
1669 t.Errorf("Expected parent URI to remain %s, got %s", postURI1, comment.ParentURI)
1670 }
1671 if comment.RootURI != postURI1 {
1672 t.Errorf("Expected root URI to remain %s, got %s", postURI1, comment.RootURI)
1673 }
1674 // Content should NOT have been updated since the operation was rejected
1675 if comment.Content != "Comment on Post 1" {
1676 t.Errorf("Expected original content, got '%s'", comment.Content)
1677 }
1678 })
1679
1680 t.Run("Allow UPDATE that only changes content (threading unchanged)", func(t *testing.T) {
1681 rkey2 := generateTID()
1682 commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2)
1683
1684 // Create comment
1685 createEvent := &jetstream.JetstreamEvent{
1686 Did: testUser.DID,
1687 Kind: "commit",
1688 Commit: &jetstream.CommitEvent{
1689 Rev: "v1",
1690 Operation: "create",
1691 Collection: "social.coves.feed.comment",
1692 RKey: rkey2,
1693 CID: "bafycomment3",
1694 Record: map[string]interface{}{
1695 "$type": "social.coves.feed.comment",
1696 "content": "Original content",
1697 "reply": map[string]interface{}{
1698 "root": map[string]interface{}{
1699 "uri": postURI1,
1700 "cid": "bafypost1",
1701 },
1702 "parent": map[string]interface{}{
1703 "uri": postURI1,
1704 "cid": "bafypost1",
1705 },
1706 },
1707 "createdAt": time.Now().Format(time.RFC3339),
1708 },
1709 },
1710 }
1711
1712 err := consumer.HandleEvent(ctx, createEvent)
1713 if err != nil {
1714 t.Fatalf("Failed to create comment: %v", err)
1715 }
1716
1717 // Update content only (threading unchanged - should succeed)
1718 updateEvent := &jetstream.JetstreamEvent{
1719 Did: testUser.DID,
1720 Kind: "commit",
1721 Commit: &jetstream.CommitEvent{
1722 Rev: "v2",
1723 Operation: "update",
1724 Collection: "social.coves.feed.comment",
1725 RKey: rkey2,
1726 CID: "bafycomment4",
1727 Record: map[string]interface{}{
1728 "$type": "social.coves.feed.comment",
1729 "content": "Updated content",
1730 "reply": map[string]interface{}{
1731 "root": map[string]interface{}{
1732 "uri": postURI1, // Same
1733 "cid": "bafypost1",
1734 },
1735 "parent": map[string]interface{}{
1736 "uri": postURI1, // Same
1737 "cid": "bafypost1",
1738 },
1739 },
1740 "createdAt": time.Now().Format(time.RFC3339),
1741 },
1742 },
1743 }
1744
1745 err = consumer.HandleEvent(ctx, updateEvent)
1746 if err != nil {
1747 t.Fatalf("Expected update to succeed when threading unchanged, got error: %v", err)
1748 }
1749
1750 // Verify content was updated
1751 comment, err := commentRepo.GetByURI(ctx, commentURI2)
1752 if err != nil {
1753 t.Fatalf("Failed to get comment: %v", err)
1754 }
1755 if comment.Content != "Updated content" {
1756 t.Errorf("Expected updated content, got '%s'", comment.Content)
1757 }
1758 // Threading should remain unchanged
1759 if comment.ParentURI != postURI1 {
1760 t.Errorf("Expected parent URI %s, got %s", postURI1, comment.ParentURI)
1761 }
1762 })
1763}