A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "context"
5 "fmt"
6 "testing"
7 "time"
8
9 "Coves/internal/atproto/jetstream"
10 "Coves/internal/core/users"
11 "Coves/internal/db/postgres"
12)
13
14// TestPostConsumer_CommentCountReconciliation tests that post comment_count
15// is correctly reconciled when comments arrive before the parent post.
16//
17// This addresses the issue identified in comment_consumer.go:362 where the FIXME
18// comment suggests reconciliation is not implemented. This test verifies that
19// the reconciliation logic in post_consumer.go:210-226 works correctly.
20func TestPostConsumer_CommentCountReconciliation(t *testing.T) {
21 db := setupTestDB(t)
22 defer func() {
23 if err := db.Close(); err != nil {
24 t.Logf("Failed to close database: %v", err)
25 }
26 }()
27
28 ctx := context.Background()
29
30 // Set up repositories and consumers
31 postRepo := postgres.NewPostRepository(db)
32 commentRepo := postgres.NewCommentRepository(db)
33 communityRepo := postgres.NewCommunityRepository(db)
34 userRepo := postgres.NewUserRepository(db)
35 userService := users.NewUserService(userRepo, nil, getTestPDSURL())
36
37 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
38 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
39
40 // Setup test data
41 testUser := createTestUser(t, db, "reconcile.test", "did:plc:reconcile123")
42 testCommunity, err := createFeedTestCommunity(db, ctx, "reconcile-community", "owner.test")
43 if err != nil {
44 t.Fatalf("Failed to create test community: %v", err)
45 }
46
47 t.Run("Single comment arrives before post - count reconciled", func(t *testing.T) {
48 // Scenario: User creates a post
49 // Another user creates a comment on that post
50 // Due to Jetstream ordering, comment event arrives BEFORE post event
51 // When post is finally indexed, comment_count should be 1, not 0
52
53 postRkey := generateTID()
54 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
55
56 commentRkey := generateTID()
57 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, commentRkey)
58
59 // Step 1: Index comment FIRST (before parent post exists)
60 commentEvent := &jetstream.JetstreamEvent{
61 Did: testUser.DID,
62 Kind: "commit",
63 Commit: &jetstream.CommitEvent{
64 Rev: "comment-rev",
65 Operation: "create",
66 Collection: "social.coves.community.comment",
67 RKey: commentRkey,
68 CID: "bafycomment",
69 Record: map[string]interface{}{
70 "$type": "social.coves.community.comment",
71 "content": "Comment arriving before parent post!",
72 "reply": map[string]interface{}{
73 "root": map[string]interface{}{
74 "uri": postURI, // Points to post that doesn't exist yet
75 "cid": "bafypost",
76 },
77 "parent": map[string]interface{}{
78 "uri": postURI,
79 "cid": "bafypost",
80 },
81 },
82 "createdAt": time.Now().Format(time.RFC3339),
83 },
84 },
85 }
86
87 err := commentConsumer.HandleEvent(ctx, commentEvent)
88 if err != nil {
89 t.Fatalf("Failed to handle comment event: %v", err)
90 }
91
92 // Verify comment was indexed
93 comment, err := commentRepo.GetByURI(ctx, commentURI)
94 if err != nil {
95 t.Fatalf("Comment not indexed: %v", err)
96 }
97 if comment.ParentURI != postURI {
98 t.Errorf("Expected comment parent_uri %s, got %s", postURI, comment.ParentURI)
99 }
100
101 // Step 2: Now index post (arrives late due to Jetstream ordering)
102 postEvent := &jetstream.JetstreamEvent{
103 Did: testCommunity,
104 Kind: "commit",
105 Commit: &jetstream.CommitEvent{
106 Rev: "post-rev",
107 Operation: "create",
108 Collection: "social.coves.community.post",
109 RKey: postRkey,
110 CID: "bafypost",
111 Record: map[string]interface{}{
112 "$type": "social.coves.community.post",
113 "community": testCommunity,
114 "author": testUser.DID,
115 "title": "Post arriving after comment",
116 "content": "This post's comment arrived first!",
117 "createdAt": time.Now().Format(time.RFC3339),
118 },
119 },
120 }
121
122 err = postConsumer.HandleEvent(ctx, postEvent)
123 if err != nil {
124 t.Fatalf("Failed to handle post event: %v", err)
125 }
126
127 // Step 3: Verify post was indexed with CORRECT comment_count
128 post, err := postRepo.GetByURI(ctx, postURI)
129 if err != nil {
130 t.Fatalf("Post not indexed: %v", err)
131 }
132
133 // THIS IS THE KEY TEST: Post should have comment_count = 1 due to reconciliation
134 if post.CommentCount != 1 {
135 t.Errorf("Expected post comment_count to be 1 (reconciled), got %d", post.CommentCount)
136 t.Logf("This indicates the reconciliation logic in post_consumer.go is not working!")
137 t.Logf("The FIXME comment at comment_consumer.go:362 may still be valid.")
138 }
139
140 // Verify via direct query as well
141 var dbCommentCount int
142 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&dbCommentCount)
143 if err != nil {
144 t.Fatalf("Failed to query post comment_count: %v", err)
145 }
146 if dbCommentCount != 1 {
147 t.Errorf("Expected DB comment_count to be 1, got %d", dbCommentCount)
148 }
149 })
150
151 t.Run("Multiple comments arrive before post - count reconciled to correct total", func(t *testing.T) {
152 postRkey := generateTID()
153 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
154
155 // Step 1: Index 3 comments BEFORE the post exists
156 for i := 1; i <= 3; i++ {
157 commentRkey := generateTID()
158 commentEvent := &jetstream.JetstreamEvent{
159 Did: testUser.DID,
160 Kind: "commit",
161 Commit: &jetstream.CommitEvent{
162 Rev: fmt.Sprintf("comment-%d-rev", i),
163 Operation: "create",
164 Collection: "social.coves.community.comment",
165 RKey: commentRkey,
166 CID: fmt.Sprintf("bafycomment%d", i),
167 Record: map[string]interface{}{
168 "$type": "social.coves.community.comment",
169 "content": fmt.Sprintf("Comment %d before post", i),
170 "reply": map[string]interface{}{
171 "root": map[string]interface{}{
172 "uri": postURI,
173 "cid": "bafypost2",
174 },
175 "parent": map[string]interface{}{
176 "uri": postURI,
177 "cid": "bafypost2",
178 },
179 },
180 "createdAt": time.Now().Format(time.RFC3339),
181 },
182 },
183 }
184
185 err := commentConsumer.HandleEvent(ctx, commentEvent)
186 if err != nil {
187 t.Fatalf("Failed to handle comment %d event: %v", i, err)
188 }
189 }
190
191 // Step 2: Now index the post
192 postEvent := &jetstream.JetstreamEvent{
193 Did: testCommunity,
194 Kind: "commit",
195 Commit: &jetstream.CommitEvent{
196 Rev: "post2-rev",
197 Operation: "create",
198 Collection: "social.coves.community.post",
199 RKey: postRkey,
200 CID: "bafypost2",
201 Record: map[string]interface{}{
202 "$type": "social.coves.community.post",
203 "community": testCommunity,
204 "author": testUser.DID,
205 "title": "Post with 3 pre-existing comments",
206 "content": "All 3 comments arrived before this post!",
207 "createdAt": time.Now().Format(time.RFC3339),
208 },
209 },
210 }
211
212 err := postConsumer.HandleEvent(ctx, postEvent)
213 if err != nil {
214 t.Fatalf("Failed to handle post event: %v", err)
215 }
216
217 // Step 3: Verify post has comment_count = 3
218 post, err := postRepo.GetByURI(ctx, postURI)
219 if err != nil {
220 t.Fatalf("Post not indexed: %v", err)
221 }
222
223 if post.CommentCount != 3 {
224 t.Errorf("Expected post comment_count to be 3 (reconciled), got %d", post.CommentCount)
225 }
226 })
227
228 t.Run("Comments before and after post - count remains accurate", func(t *testing.T) {
229 postRkey := generateTID()
230 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
231
232 // Step 1: Index 2 comments BEFORE post
233 for i := 1; i <= 2; i++ {
234 commentRkey := generateTID()
235 commentEvent := &jetstream.JetstreamEvent{
236 Did: testUser.DID,
237 Kind: "commit",
238 Commit: &jetstream.CommitEvent{
239 Rev: fmt.Sprintf("before-%d-rev", i),
240 Operation: "create",
241 Collection: "social.coves.community.comment",
242 RKey: commentRkey,
243 CID: fmt.Sprintf("bafybefore%d", i),
244 Record: map[string]interface{}{
245 "$type": "social.coves.community.comment",
246 "content": fmt.Sprintf("Before comment %d", i),
247 "reply": map[string]interface{}{
248 "root": map[string]interface{}{
249 "uri": postURI,
250 "cid": "bafypost3",
251 },
252 "parent": map[string]interface{}{
253 "uri": postURI,
254 "cid": "bafypost3",
255 },
256 },
257 "createdAt": time.Now().Format(time.RFC3339),
258 },
259 },
260 }
261
262 err := commentConsumer.HandleEvent(ctx, commentEvent)
263 if err != nil {
264 t.Fatalf("Failed to handle before-comment %d: %v", i, err)
265 }
266 }
267
268 // Step 2: Index the post (should reconcile to 2)
269 postEvent := &jetstream.JetstreamEvent{
270 Did: testCommunity,
271 Kind: "commit",
272 Commit: &jetstream.CommitEvent{
273 Rev: "post3-rev",
274 Operation: "create",
275 Collection: "social.coves.community.post",
276 RKey: postRkey,
277 CID: "bafypost3",
278 Record: map[string]interface{}{
279 "$type": "social.coves.community.post",
280 "community": testCommunity,
281 "author": testUser.DID,
282 "title": "Post with before and after comments",
283 "content": "Testing mixed ordering",
284 "createdAt": time.Now().Format(time.RFC3339),
285 },
286 },
287 }
288
289 err := postConsumer.HandleEvent(ctx, postEvent)
290 if err != nil {
291 t.Fatalf("Failed to handle post event: %v", err)
292 }
293
294 // Verify count is 2
295 post, err := postRepo.GetByURI(ctx, postURI)
296 if err != nil {
297 t.Fatalf("Post not indexed: %v", err)
298 }
299 if post.CommentCount != 2 {
300 t.Errorf("Expected comment_count=2 after reconciliation, got %d", post.CommentCount)
301 }
302
303 // Step 3: Add 1 more comment AFTER post exists
304 commentRkey := generateTID()
305 afterCommentEvent := &jetstream.JetstreamEvent{
306 Did: testUser.DID,
307 Kind: "commit",
308 Commit: &jetstream.CommitEvent{
309 Rev: "after-rev",
310 Operation: "create",
311 Collection: "social.coves.community.comment",
312 RKey: commentRkey,
313 CID: "bafyafter",
314 Record: map[string]interface{}{
315 "$type": "social.coves.community.comment",
316 "content": "Comment after post exists",
317 "reply": map[string]interface{}{
318 "root": map[string]interface{}{
319 "uri": postURI,
320 "cid": "bafypost3",
321 },
322 "parent": map[string]interface{}{
323 "uri": postURI,
324 "cid": "bafypost3",
325 },
326 },
327 "createdAt": time.Now().Format(time.RFC3339),
328 },
329 },
330 }
331
332 err = commentConsumer.HandleEvent(ctx, afterCommentEvent)
333 if err != nil {
334 t.Fatalf("Failed to handle after-comment: %v", err)
335 }
336
337 // Verify count incremented to 3
338 post, err = postRepo.GetByURI(ctx, postURI)
339 if err != nil {
340 t.Fatalf("Failed to get post after increment: %v", err)
341 }
342 if post.CommentCount != 3 {
343 t.Errorf("Expected comment_count=3 after increment, got %d", post.CommentCount)
344 }
345 })
346
347 t.Run("Idempotent post indexing preserves comment_count", func(t *testing.T) {
348 postRkey := generateTID()
349 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey)
350
351 // Create comment first
352 commentRkey := generateTID()
353 commentEvent := &jetstream.JetstreamEvent{
354 Did: testUser.DID,
355 Kind: "commit",
356 Commit: &jetstream.CommitEvent{
357 Rev: "idem-comment-rev",
358 Operation: "create",
359 Collection: "social.coves.community.comment",
360 RKey: commentRkey,
361 CID: "bafyidemcomment",
362 Record: map[string]interface{}{
363 "$type": "social.coves.community.comment",
364 "content": "Comment for idempotent test",
365 "reply": map[string]interface{}{
366 "root": map[string]interface{}{
367 "uri": postURI,
368 "cid": "bafyidempost",
369 },
370 "parent": map[string]interface{}{
371 "uri": postURI,
372 "cid": "bafyidempost",
373 },
374 },
375 "createdAt": time.Now().Format(time.RFC3339),
376 },
377 },
378 }
379
380 err := commentConsumer.HandleEvent(ctx, commentEvent)
381 if err != nil {
382 t.Fatalf("Failed to create comment: %v", err)
383 }
384
385 // Index post (should reconcile to 1)
386 postEvent := &jetstream.JetstreamEvent{
387 Did: testCommunity,
388 Kind: "commit",
389 Commit: &jetstream.CommitEvent{
390 Rev: "idem-post-rev",
391 Operation: "create",
392 Collection: "social.coves.community.post",
393 RKey: postRkey,
394 CID: "bafyidempost",
395 Record: map[string]interface{}{
396 "$type": "social.coves.community.post",
397 "community": testCommunity,
398 "author": testUser.DID,
399 "title": "Idempotent test post",
400 "content": "Testing idempotent indexing",
401 "createdAt": time.Now().Format(time.RFC3339),
402 },
403 },
404 }
405
406 err = postConsumer.HandleEvent(ctx, postEvent)
407 if err != nil {
408 t.Fatalf("Failed to index post first time: %v", err)
409 }
410
411 // Verify count is 1
412 post, err := postRepo.GetByURI(ctx, postURI)
413 if err != nil {
414 t.Fatalf("Failed to get post: %v", err)
415 }
416 if post.CommentCount != 1 {
417 t.Errorf("Expected comment_count=1 after first index, got %d", post.CommentCount)
418 }
419
420 // Replay same post event (idempotent - should skip)
421 err = postConsumer.HandleEvent(ctx, postEvent)
422 if err != nil {
423 t.Fatalf("Idempotent post event should not error: %v", err)
424 }
425
426 // Verify count still 1 (not reset to 0)
427 post, err = postRepo.GetByURI(ctx, postURI)
428 if err != nil {
429 t.Fatalf("Failed to get post after replay: %v", err)
430 }
431 if post.CommentCount != 1 {
432 t.Errorf("Expected comment_count=1 after replay (idempotent), got %d", post.CommentCount)
433 }
434 })
435}