A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/users"
6 "Coves/internal/db/postgres"
7 "context"
8 "fmt"
9 "testing"
10 "time"
11)
12
13// TestPostConsumer_CommentCountReconciliation tests that post comment_count
14// is correctly reconciled when comments arrive before the parent post.
15//
16// This addresses the issue identified in comment_consumer.go:362 where the FIXME
17// comment suggests reconciliation is not implemented. This test verifies that
18// the reconciliation logic in post_consumer.go:210-226 works correctly.
19func TestPostConsumer_CommentCountReconciliation(t *testing.T) {
20 db := setupTestDB(t)
21 defer func() {
22 if err := db.Close(); err != nil {
23 t.Logf("Failed to close database: %v", err)
24 }
25 }()
26
27 ctx := context.Background()
28
29 // Set up repositories and consumers
30 postRepo := postgres.NewPostRepository(db)
31 commentRepo := postgres.NewCommentRepository(db)
32 communityRepo := postgres.NewCommunityRepository(db)
33 userRepo := postgres.NewUserRepository(db)
34 userService := users.NewUserService(userRepo, nil, getTestPDSURL())
35
36 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
37 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
38
39 // Setup test data
40 testUser := createTestUser(t, db, "reconcile.test", "did:plc:reconcile123")
41 testCommunity, err := createFeedTestCommunity(db, ctx, "reconcile-community", "owner.test")
42 if err != nil {
43 t.Fatalf("Failed to create test community: %v", err)
44 }
45
46 t.Run("Single comment arrives before post - count reconciled", func(t *testing.T) {
47 // Scenario: User creates a post
48 // Another user creates a comment on that post
49 // Due to Jetstream ordering, comment event arrives BEFORE post event
50 // When post is finally indexed, comment_count should be 1, not 0
51
52 postRkey := generateTID()
53 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
54
55 commentRkey := generateTID()
56 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, commentRkey)
57
58 // Step 1: Index comment FIRST (before parent post exists)
59 commentEvent := &jetstream.JetstreamEvent{
60 Did: testUser.DID,
61 Kind: "commit",
62 Commit: &jetstream.CommitEvent{
63 Rev: "comment-rev",
64 Operation: "create",
65 Collection: "social.coves.community.comment",
66 RKey: commentRkey,
67 CID: "bafycomment",
68 Record: map[string]interface{}{
69 "$type": "social.coves.community.comment",
70 "content": "Comment arriving before parent post!",
71 "reply": map[string]interface{}{
72 "root": map[string]interface{}{
73 "uri": postURI, // Points to post that doesn't exist yet
74 "cid": "bafypost",
75 },
76 "parent": map[string]interface{}{
77 "uri": postURI,
78 "cid": "bafypost",
79 },
80 },
81 "createdAt": time.Now().Format(time.RFC3339),
82 },
83 },
84 }
85
86 err := commentConsumer.HandleEvent(ctx, commentEvent)
87 if err != nil {
88 t.Fatalf("Failed to handle comment event: %v", err)
89 }
90
91 // Verify comment was indexed
92 comment, err := commentRepo.GetByURI(ctx, commentURI)
93 if err != nil {
94 t.Fatalf("Comment not indexed: %v", err)
95 }
96 if comment.ParentURI != postURI {
97 t.Errorf("Expected comment parent_uri %s, got %s", postURI, comment.ParentURI)
98 }
99
100 // Step 2: Now index post (arrives late due to Jetstream ordering)
101 postEvent := &jetstream.JetstreamEvent{
102 Did: testCommunity,
103 Kind: "commit",
104 Commit: &jetstream.CommitEvent{
105 Rev: "post-rev",
106 Operation: "create",
107 Collection: "social.coves.community.post",
108 RKey: postRkey,
109 CID: "bafypost",
110 Record: map[string]interface{}{
111 "$type": "social.coves.community.post",
112 "community": testCommunity,
113 "author": testUser.DID,
114 "title": "Post arriving after comment",
115 "content": "This post's comment arrived first!",
116 "createdAt": time.Now().Format(time.RFC3339),
117 },
118 },
119 }
120
121 err = postConsumer.HandleEvent(ctx, postEvent)
122 if err != nil {
123 t.Fatalf("Failed to handle post event: %v", err)
124 }
125
126 // Step 3: Verify post was indexed with CORRECT comment_count
127 post, err := postRepo.GetByURI(ctx, postURI)
128 if err != nil {
129 t.Fatalf("Post not indexed: %v", err)
130 }
131
132 // THIS IS THE KEY TEST: Post should have comment_count = 1 due to reconciliation
133 if post.CommentCount != 1 {
134 t.Errorf("Expected post comment_count to be 1 (reconciled), got %d", post.CommentCount)
135 t.Logf("This indicates the reconciliation logic in post_consumer.go is not working!")
136 t.Logf("The FIXME comment at comment_consumer.go:362 may still be valid.")
137 }
138
139 // Verify via direct query as well
140 var dbCommentCount int
141 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&dbCommentCount)
142 if err != nil {
143 t.Fatalf("Failed to query post comment_count: %v", err)
144 }
145 if dbCommentCount != 1 {
146 t.Errorf("Expected DB comment_count to be 1, got %d", dbCommentCount)
147 }
148 })
149
150 t.Run("Multiple comments arrive before post - count reconciled to correct total", func(t *testing.T) {
151 postRkey := generateTID()
152 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
153
154 // Step 1: Index 3 comments BEFORE the post exists
155 for i := 1; i <= 3; i++ {
156 commentRkey := generateTID()
157 commentEvent := &jetstream.JetstreamEvent{
158 Did: testUser.DID,
159 Kind: "commit",
160 Commit: &jetstream.CommitEvent{
161 Rev: fmt.Sprintf("comment-%d-rev", i),
162 Operation: "create",
163 Collection: "social.coves.community.comment",
164 RKey: commentRkey,
165 CID: fmt.Sprintf("bafycomment%d", i),
166 Record: map[string]interface{}{
167 "$type": "social.coves.community.comment",
168 "content": fmt.Sprintf("Comment %d before post", i),
169 "reply": map[string]interface{}{
170 "root": map[string]interface{}{
171 "uri": postURI,
172 "cid": "bafypost2",
173 },
174 "parent": map[string]interface{}{
175 "uri": postURI,
176 "cid": "bafypost2",
177 },
178 },
179 "createdAt": time.Now().Format(time.RFC3339),
180 },
181 },
182 }
183
184 err := commentConsumer.HandleEvent(ctx, commentEvent)
185 if err != nil {
186 t.Fatalf("Failed to handle comment %d event: %v", i, err)
187 }
188 }
189
190 // Step 2: Now index the post
191 postEvent := &jetstream.JetstreamEvent{
192 Did: testCommunity,
193 Kind: "commit",
194 Commit: &jetstream.CommitEvent{
195 Rev: "post2-rev",
196 Operation: "create",
197 Collection: "social.coves.community.post",
198 RKey: postRkey,
199 CID: "bafypost2",
200 Record: map[string]interface{}{
201 "$type": "social.coves.community.post",
202 "community": testCommunity,
203 "author": testUser.DID,
204 "title": "Post with 3 pre-existing comments",
205 "content": "All 3 comments arrived before this post!",
206 "createdAt": time.Now().Format(time.RFC3339),
207 },
208 },
209 }
210
211 err := postConsumer.HandleEvent(ctx, postEvent)
212 if err != nil {
213 t.Fatalf("Failed to handle post event: %v", err)
214 }
215
216 // Step 3: Verify post has comment_count = 3
217 post, err := postRepo.GetByURI(ctx, postURI)
218 if err != nil {
219 t.Fatalf("Post not indexed: %v", err)
220 }
221
222 if post.CommentCount != 3 {
223 t.Errorf("Expected post comment_count to be 3 (reconciled), got %d", post.CommentCount)
224 }
225 })
226
227 t.Run("Comments before and after post - count remains accurate", func(t *testing.T) {
228 postRkey := generateTID()
229 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
230
231 // Step 1: Index 2 comments BEFORE post
232 for i := 1; i <= 2; i++ {
233 commentRkey := generateTID()
234 commentEvent := &jetstream.JetstreamEvent{
235 Did: testUser.DID,
236 Kind: "commit",
237 Commit: &jetstream.CommitEvent{
238 Rev: fmt.Sprintf("before-%d-rev", i),
239 Operation: "create",
240 Collection: "social.coves.community.comment",
241 RKey: commentRkey,
242 CID: fmt.Sprintf("bafybefore%d", i),
243 Record: map[string]interface{}{
244 "$type": "social.coves.community.comment",
245 "content": fmt.Sprintf("Before comment %d", i),
246 "reply": map[string]interface{}{
247 "root": map[string]interface{}{
248 "uri": postURI,
249 "cid": "bafypost3",
250 },
251 "parent": map[string]interface{}{
252 "uri": postURI,
253 "cid": "bafypost3",
254 },
255 },
256 "createdAt": time.Now().Format(time.RFC3339),
257 },
258 },
259 }
260
261 err := commentConsumer.HandleEvent(ctx, commentEvent)
262 if err != nil {
263 t.Fatalf("Failed to handle before-comment %d: %v", i, err)
264 }
265 }
266
267 // Step 2: Index the post (should reconcile to 2)
268 postEvent := &jetstream.JetstreamEvent{
269 Did: testCommunity,
270 Kind: "commit",
271 Commit: &jetstream.CommitEvent{
272 Rev: "post3-rev",
273 Operation: "create",
274 Collection: "social.coves.community.post",
275 RKey: postRkey,
276 CID: "bafypost3",
277 Record: map[string]interface{}{
278 "$type": "social.coves.community.post",
279 "community": testCommunity,
280 "author": testUser.DID,
281 "title": "Post with before and after comments",
282 "content": "Testing mixed ordering",
283 "createdAt": time.Now().Format(time.RFC3339),
284 },
285 },
286 }
287
288 err := postConsumer.HandleEvent(ctx, postEvent)
289 if err != nil {
290 t.Fatalf("Failed to handle post event: %v", err)
291 }
292
293 // Verify count is 2
294 post, err := postRepo.GetByURI(ctx, postURI)
295 if err != nil {
296 t.Fatalf("Post not indexed: %v", err)
297 }
298 if post.CommentCount != 2 {
299 t.Errorf("Expected comment_count=2 after reconciliation, got %d", post.CommentCount)
300 }
301
302 // Step 3: Add 1 more comment AFTER post exists
303 commentRkey := generateTID()
304 afterCommentEvent := &jetstream.JetstreamEvent{
305 Did: testUser.DID,
306 Kind: "commit",
307 Commit: &jetstream.CommitEvent{
308 Rev: "after-rev",
309 Operation: "create",
310 Collection: "social.coves.community.comment",
311 RKey: commentRkey,
312 CID: "bafyafter",
313 Record: map[string]interface{}{
314 "$type": "social.coves.community.comment",
315 "content": "Comment after post exists",
316 "reply": map[string]interface{}{
317 "root": map[string]interface{}{
318 "uri": postURI,
319 "cid": "bafypost3",
320 },
321 "parent": map[string]interface{}{
322 "uri": postURI,
323 "cid": "bafypost3",
324 },
325 },
326 "createdAt": time.Now().Format(time.RFC3339),
327 },
328 },
329 }
330
331 err = commentConsumer.HandleEvent(ctx, afterCommentEvent)
332 if err != nil {
333 t.Fatalf("Failed to handle after-comment: %v", err)
334 }
335
336 // Verify count incremented to 3
337 post, err = postRepo.GetByURI(ctx, postURI)
338 if err != nil {
339 t.Fatalf("Failed to get post after increment: %v", err)
340 }
341 if post.CommentCount != 3 {
342 t.Errorf("Expected comment_count=3 after increment, got %d", post.CommentCount)
343 }
344 })
345
346 t.Run("Idempotent post indexing preserves comment_count", func(t *testing.T) {
347 postRkey := generateTID()
348 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
349
350 // Create comment first
351 commentRkey := generateTID()
352 commentEvent := &jetstream.JetstreamEvent{
353 Did: testUser.DID,
354 Kind: "commit",
355 Commit: &jetstream.CommitEvent{
356 Rev: "idem-comment-rev",
357 Operation: "create",
358 Collection: "social.coves.community.comment",
359 RKey: commentRkey,
360 CID: "bafyidemcomment",
361 Record: map[string]interface{}{
362 "$type": "social.coves.community.comment",
363 "content": "Comment for idempotent test",
364 "reply": map[string]interface{}{
365 "root": map[string]interface{}{
366 "uri": postURI,
367 "cid": "bafyidempost",
368 },
369 "parent": map[string]interface{}{
370 "uri": postURI,
371 "cid": "bafyidempost",
372 },
373 },
374 "createdAt": time.Now().Format(time.RFC3339),
375 },
376 },
377 }
378
379 err := commentConsumer.HandleEvent(ctx, commentEvent)
380 if err != nil {
381 t.Fatalf("Failed to create comment: %v", err)
382 }
383
384 // Index post (should reconcile to 1)
385 postEvent := &jetstream.JetstreamEvent{
386 Did: testCommunity,
387 Kind: "commit",
388 Commit: &jetstream.CommitEvent{
389 Rev: "idem-post-rev",
390 Operation: "create",
391 Collection: "social.coves.community.post",
392 RKey: postRkey,
393 CID: "bafyidempost",
394 Record: map[string]interface{}{
395 "$type": "social.coves.community.post",
396 "community": testCommunity,
397 "author": testUser.DID,
398 "title": "Idempotent test post",
399 "content": "Testing idempotent indexing",
400 "createdAt": time.Now().Format(time.RFC3339),
401 },
402 },
403 }
404
405 err = postConsumer.HandleEvent(ctx, postEvent)
406 if err != nil {
407 t.Fatalf("Failed to index post first time: %v", err)
408 }
409
410 // Verify count is 1
411 post, err := postRepo.GetByURI(ctx, postURI)
412 if err != nil {
413 t.Fatalf("Failed to get post: %v", err)
414 }
415 if post.CommentCount != 1 {
416 t.Errorf("Expected comment_count=1 after first index, got %d", post.CommentCount)
417 }
418
419 // Replay same post event (idempotent - should skip)
420 err = postConsumer.HandleEvent(ctx, postEvent)
421 if err != nil {
422 t.Fatalf("Idempotent post event should not error: %v", err)
423 }
424
425 // Verify count still 1 (not reset to 0)
426 post, err = postRepo.GetByURI(ctx, postURI)
427 if err != nil {
428 t.Fatalf("Failed to get post after replay: %v", err)
429 }
430 if post.CommentCount != 1 {
431 t.Errorf("Expected comment_count=1 after replay (idempotent), got %d", post.CommentCount)
432 }
433 })
434}