A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 "time" 8 9 "Coves/internal/atproto/jetstream" 10 "Coves/internal/core/users" 11 "Coves/internal/db/postgres" 12) 13 14// TestPostConsumer_CommentCountReconciliation tests that post comment_count 15// is correctly reconciled when comments arrive before the parent post. 16// 17// This addresses the issue identified in comment_consumer.go:362 where the FIXME 18// comment suggests reconciliation is not implemented. This test verifies that 19// the reconciliation logic in post_consumer.go:210-226 works correctly. 20func TestPostConsumer_CommentCountReconciliation(t *testing.T) { 21 db := setupTestDB(t) 22 defer func() { 23 if err := db.Close(); err != nil { 24 t.Logf("Failed to close database: %v", err) 25 } 26 }() 27 28 ctx := context.Background() 29 30 // Set up repositories and consumers 31 postRepo := postgres.NewPostRepository(db) 32 commentRepo := postgres.NewCommentRepository(db) 33 communityRepo := postgres.NewCommunityRepository(db) 34 userRepo := postgres.NewUserRepository(db) 35 userService := users.NewUserService(userRepo, nil, getTestPDSURL()) 36 37 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 38 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 39 40 // Setup test data 41 testUser := createTestUser(t, db, "reconcile.test", "did:plc:reconcile123") 42 testCommunity, err := createFeedTestCommunity(db, ctx, "reconcile-community", "owner.test") 43 if err != nil { 44 t.Fatalf("Failed to create test community: %v", err) 45 } 46 47 t.Run("Single comment arrives before post - count reconciled", func(t *testing.T) { 48 // Scenario: User creates a post 49 // Another user creates a comment on that post 50 // Due to Jetstream ordering, comment event arrives BEFORE post event 51 // When post is finally indexed, comment_count should be 1, not 0 52 53 postRkey := generateTID() 54 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 55 56 commentRkey := generateTID() 57 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, commentRkey) 58 59 // Step 1: Index comment FIRST (before parent post exists) 60 commentEvent := &jetstream.JetstreamEvent{ 61 Did: testUser.DID, 62 Kind: "commit", 63 Commit: &jetstream.CommitEvent{ 64 Rev: "comment-rev", 65 Operation: "create", 66 Collection: "social.coves.community.comment", 67 RKey: commentRkey, 68 CID: "bafycomment", 69 Record: map[string]interface{}{ 70 "$type": "social.coves.community.comment", 71 "content": "Comment arriving before parent post!", 72 "reply": map[string]interface{}{ 73 "root": map[string]interface{}{ 74 "uri": postURI, // Points to post that doesn't exist yet 75 "cid": "bafypost", 76 }, 77 "parent": map[string]interface{}{ 78 "uri": postURI, 79 "cid": "bafypost", 80 }, 81 }, 82 "createdAt": time.Now().Format(time.RFC3339), 83 }, 84 }, 85 } 86 87 err := commentConsumer.HandleEvent(ctx, commentEvent) 88 if err != nil { 89 t.Fatalf("Failed to handle comment event: %v", err) 90 } 91 92 // Verify comment was indexed 93 comment, err := commentRepo.GetByURI(ctx, commentURI) 94 if err != nil { 95 t.Fatalf("Comment not indexed: %v", err) 96 } 97 if comment.ParentURI != postURI { 98 t.Errorf("Expected comment parent_uri %s, got %s", postURI, comment.ParentURI) 99 } 100 101 // Step 2: Now index post (arrives late due to Jetstream ordering) 102 postEvent := &jetstream.JetstreamEvent{ 103 Did: testCommunity, 104 Kind: "commit", 105 Commit: &jetstream.CommitEvent{ 106 Rev: "post-rev", 107 Operation: "create", 108 Collection: "social.coves.community.post", 109 RKey: postRkey, 110 CID: "bafypost", 111 Record: map[string]interface{}{ 112 "$type": "social.coves.community.post", 113 "community": testCommunity, 114 "author": testUser.DID, 115 "title": "Post arriving after comment", 116 "content": "This post's comment arrived first!", 117 "createdAt": time.Now().Format(time.RFC3339), 118 }, 119 }, 120 } 121 122 err = postConsumer.HandleEvent(ctx, postEvent) 123 if err != nil { 124 t.Fatalf("Failed to handle post event: %v", err) 125 } 126 127 // Step 3: Verify post was indexed with CORRECT comment_count 128 post, err := postRepo.GetByURI(ctx, postURI) 129 if err != nil { 130 t.Fatalf("Post not indexed: %v", err) 131 } 132 133 // THIS IS THE KEY TEST: Post should have comment_count = 1 due to reconciliation 134 if post.CommentCount != 1 { 135 t.Errorf("Expected post comment_count to be 1 (reconciled), got %d", post.CommentCount) 136 t.Logf("This indicates the reconciliation logic in post_consumer.go is not working!") 137 t.Logf("The FIXME comment at comment_consumer.go:362 may still be valid.") 138 } 139 140 // Verify via direct query as well 141 var dbCommentCount int 142 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&dbCommentCount) 143 if err != nil { 144 t.Fatalf("Failed to query post comment_count: %v", err) 145 } 146 if dbCommentCount != 1 { 147 t.Errorf("Expected DB comment_count to be 1, got %d", dbCommentCount) 148 } 149 }) 150 151 t.Run("Multiple comments arrive before post - count reconciled to correct total", func(t *testing.T) { 152 postRkey := generateTID() 153 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 154 155 // Step 1: Index 3 comments BEFORE the post exists 156 for i := 1; i <= 3; i++ { 157 commentRkey := generateTID() 158 commentEvent := &jetstream.JetstreamEvent{ 159 Did: testUser.DID, 160 Kind: "commit", 161 Commit: &jetstream.CommitEvent{ 162 Rev: fmt.Sprintf("comment-%d-rev", i), 163 Operation: "create", 164 Collection: "social.coves.community.comment", 165 RKey: commentRkey, 166 CID: fmt.Sprintf("bafycomment%d", i), 167 Record: map[string]interface{}{ 168 "$type": "social.coves.community.comment", 169 "content": fmt.Sprintf("Comment %d before post", i), 170 "reply": map[string]interface{}{ 171 "root": map[string]interface{}{ 172 "uri": postURI, 173 "cid": "bafypost2", 174 }, 175 "parent": map[string]interface{}{ 176 "uri": postURI, 177 "cid": "bafypost2", 178 }, 179 }, 180 "createdAt": time.Now().Format(time.RFC3339), 181 }, 182 }, 183 } 184 185 err := commentConsumer.HandleEvent(ctx, commentEvent) 186 if err != nil { 187 t.Fatalf("Failed to handle comment %d event: %v", i, err) 188 } 189 } 190 191 // Step 2: Now index the post 192 postEvent := &jetstream.JetstreamEvent{ 193 Did: testCommunity, 194 Kind: "commit", 195 Commit: &jetstream.CommitEvent{ 196 Rev: "post2-rev", 197 Operation: "create", 198 Collection: "social.coves.community.post", 199 RKey: postRkey, 200 CID: "bafypost2", 201 Record: map[string]interface{}{ 202 "$type": "social.coves.community.post", 203 "community": testCommunity, 204 "author": testUser.DID, 205 "title": "Post with 3 pre-existing comments", 206 "content": "All 3 comments arrived before this post!", 207 "createdAt": time.Now().Format(time.RFC3339), 208 }, 209 }, 210 } 211 212 err := postConsumer.HandleEvent(ctx, postEvent) 213 if err != nil { 214 t.Fatalf("Failed to handle post event: %v", err) 215 } 216 217 // Step 3: Verify post has comment_count = 3 218 post, err := postRepo.GetByURI(ctx, postURI) 219 if err != nil { 220 t.Fatalf("Post not indexed: %v", err) 221 } 222 223 if post.CommentCount != 3 { 224 t.Errorf("Expected post comment_count to be 3 (reconciled), got %d", post.CommentCount) 225 } 226 }) 227 228 t.Run("Comments before and after post - count remains accurate", func(t *testing.T) { 229 postRkey := generateTID() 230 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 231 232 // Step 1: Index 2 comments BEFORE post 233 for i := 1; i <= 2; i++ { 234 commentRkey := generateTID() 235 commentEvent := &jetstream.JetstreamEvent{ 236 Did: testUser.DID, 237 Kind: "commit", 238 Commit: &jetstream.CommitEvent{ 239 Rev: fmt.Sprintf("before-%d-rev", i), 240 Operation: "create", 241 Collection: "social.coves.community.comment", 242 RKey: commentRkey, 243 CID: fmt.Sprintf("bafybefore%d", i), 244 Record: map[string]interface{}{ 245 "$type": "social.coves.community.comment", 246 "content": fmt.Sprintf("Before comment %d", i), 247 "reply": map[string]interface{}{ 248 "root": map[string]interface{}{ 249 "uri": postURI, 250 "cid": "bafypost3", 251 }, 252 "parent": map[string]interface{}{ 253 "uri": postURI, 254 "cid": "bafypost3", 255 }, 256 }, 257 "createdAt": time.Now().Format(time.RFC3339), 258 }, 259 }, 260 } 261 262 err := commentConsumer.HandleEvent(ctx, commentEvent) 263 if err != nil { 264 t.Fatalf("Failed to handle before-comment %d: %v", i, err) 265 } 266 } 267 268 // Step 2: Index the post (should reconcile to 2) 269 postEvent := &jetstream.JetstreamEvent{ 270 Did: testCommunity, 271 Kind: "commit", 272 Commit: &jetstream.CommitEvent{ 273 Rev: "post3-rev", 274 Operation: "create", 275 Collection: "social.coves.community.post", 276 RKey: postRkey, 277 CID: "bafypost3", 278 Record: map[string]interface{}{ 279 "$type": "social.coves.community.post", 280 "community": testCommunity, 281 "author": testUser.DID, 282 "title": "Post with before and after comments", 283 "content": "Testing mixed ordering", 284 "createdAt": time.Now().Format(time.RFC3339), 285 }, 286 }, 287 } 288 289 err := postConsumer.HandleEvent(ctx, postEvent) 290 if err != nil { 291 t.Fatalf("Failed to handle post event: %v", err) 292 } 293 294 // Verify count is 2 295 post, err := postRepo.GetByURI(ctx, postURI) 296 if err != nil { 297 t.Fatalf("Post not indexed: %v", err) 298 } 299 if post.CommentCount != 2 { 300 t.Errorf("Expected comment_count=2 after reconciliation, got %d", post.CommentCount) 301 } 302 303 // Step 3: Add 1 more comment AFTER post exists 304 commentRkey := generateTID() 305 afterCommentEvent := &jetstream.JetstreamEvent{ 306 Did: testUser.DID, 307 Kind: "commit", 308 Commit: &jetstream.CommitEvent{ 309 Rev: "after-rev", 310 Operation: "create", 311 Collection: "social.coves.community.comment", 312 RKey: commentRkey, 313 CID: "bafyafter", 314 Record: map[string]interface{}{ 315 "$type": "social.coves.community.comment", 316 "content": "Comment after post exists", 317 "reply": map[string]interface{}{ 318 "root": map[string]interface{}{ 319 "uri": postURI, 320 "cid": "bafypost3", 321 }, 322 "parent": map[string]interface{}{ 323 "uri": postURI, 324 "cid": "bafypost3", 325 }, 326 }, 327 "createdAt": time.Now().Format(time.RFC3339), 328 }, 329 }, 330 } 331 332 err = commentConsumer.HandleEvent(ctx, afterCommentEvent) 333 if err != nil { 334 t.Fatalf("Failed to handle after-comment: %v", err) 335 } 336 337 // Verify count incremented to 3 338 post, err = postRepo.GetByURI(ctx, postURI) 339 if err != nil { 340 t.Fatalf("Failed to get post after increment: %v", err) 341 } 342 if post.CommentCount != 3 { 343 t.Errorf("Expected comment_count=3 after increment, got %d", post.CommentCount) 344 } 345 }) 346 347 t.Run("Idempotent post indexing preserves comment_count", func(t *testing.T) { 348 postRkey := generateTID() 349 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 350 351 // Create comment first 352 commentRkey := generateTID() 353 commentEvent := &jetstream.JetstreamEvent{ 354 Did: testUser.DID, 355 Kind: "commit", 356 Commit: &jetstream.CommitEvent{ 357 Rev: "idem-comment-rev", 358 Operation: "create", 359 Collection: "social.coves.community.comment", 360 RKey: commentRkey, 361 CID: "bafyidemcomment", 362 Record: map[string]interface{}{ 363 "$type": "social.coves.community.comment", 364 "content": "Comment for idempotent test", 365 "reply": map[string]interface{}{ 366 "root": map[string]interface{}{ 367 "uri": postURI, 368 "cid": "bafyidempost", 369 }, 370 "parent": map[string]interface{}{ 371 "uri": postURI, 372 "cid": "bafyidempost", 373 }, 374 }, 375 "createdAt": time.Now().Format(time.RFC3339), 376 }, 377 }, 378 } 379 380 err := commentConsumer.HandleEvent(ctx, commentEvent) 381 if err != nil { 382 t.Fatalf("Failed to create comment: %v", err) 383 } 384 385 // Index post (should reconcile to 1) 386 postEvent := &jetstream.JetstreamEvent{ 387 Did: testCommunity, 388 Kind: "commit", 389 Commit: &jetstream.CommitEvent{ 390 Rev: "idem-post-rev", 391 Operation: "create", 392 Collection: "social.coves.community.post", 393 RKey: postRkey, 394 CID: "bafyidempost", 395 Record: map[string]interface{}{ 396 "$type": "social.coves.community.post", 397 "community": testCommunity, 398 "author": testUser.DID, 399 "title": "Idempotent test post", 400 "content": "Testing idempotent indexing", 401 "createdAt": time.Now().Format(time.RFC3339), 402 }, 403 }, 404 } 405 406 err = postConsumer.HandleEvent(ctx, postEvent) 407 if err != nil { 408 t.Fatalf("Failed to index post first time: %v", err) 409 } 410 411 // Verify count is 1 412 post, err := postRepo.GetByURI(ctx, postURI) 413 if err != nil { 414 t.Fatalf("Failed to get post: %v", err) 415 } 416 if post.CommentCount != 1 { 417 t.Errorf("Expected comment_count=1 after first index, got %d", post.CommentCount) 418 } 419 420 // Replay same post event (idempotent - should skip) 421 err = postConsumer.HandleEvent(ctx, postEvent) 422 if err != nil { 423 t.Fatalf("Idempotent post event should not error: %v", err) 424 } 425 426 // Verify count still 1 (not reset to 0) 427 post, err = postRepo.GetByURI(ctx, postURI) 428 if err != nil { 429 t.Fatalf("Failed to get post after replay: %v", err) 430 } 431 if post.CommentCount != 1 { 432 t.Errorf("Expected comment_count=1 after replay (idempotent), got %d", post.CommentCount) 433 } 434 }) 435}