A community-based topic aggregation platform built on atproto
at main 14 kB view raw
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/users" 6 "Coves/internal/db/postgres" 7 "context" 8 "fmt" 9 "testing" 10 "time" 11) 12 13// TestPostConsumer_CommentCountReconciliation tests that post comment_count 14// is correctly reconciled when comments arrive before the parent post. 15// 16// This addresses the issue identified in comment_consumer.go:362 where the FIXME 17// comment suggests reconciliation is not implemented. This test verifies that 18// the reconciliation logic in post_consumer.go:210-226 works correctly. 19func TestPostConsumer_CommentCountReconciliation(t *testing.T) { 20 db := setupTestDB(t) 21 defer func() { 22 if err := db.Close(); err != nil { 23 t.Logf("Failed to close database: %v", err) 24 } 25 }() 26 27 ctx := context.Background() 28 29 // Set up repositories and consumers 30 postRepo := postgres.NewPostRepository(db) 31 commentRepo := postgres.NewCommentRepository(db) 32 communityRepo := postgres.NewCommunityRepository(db) 33 userRepo := postgres.NewUserRepository(db) 34 userService := users.NewUserService(userRepo, nil, getTestPDSURL()) 35 36 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 37 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 38 39 // Setup test data 40 testUser := createTestUser(t, db, "reconcile.test", "did:plc:reconcile123") 41 testCommunity, err := createFeedTestCommunity(db, ctx, "reconcile-community", "owner.test") 42 if err != nil { 43 t.Fatalf("Failed to create test community: %v", err) 44 } 45 46 t.Run("Single comment arrives before post - count reconciled", func(t *testing.T) { 47 // Scenario: User creates a post 48 // Another user creates a comment on that post 49 // Due to Jetstream ordering, comment event arrives BEFORE post event 50 // When post is finally indexed, comment_count should be 1, not 0 51 52 postRkey := generateTID() 53 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", 
testCommunity, postRkey) 54 55 commentRkey := generateTID() 56 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, commentRkey) 57 58 // Step 1: Index comment FIRST (before parent post exists) 59 commentEvent := &jetstream.JetstreamEvent{ 60 Did: testUser.DID, 61 Kind: "commit", 62 Commit: &jetstream.CommitEvent{ 63 Rev: "comment-rev", 64 Operation: "create", 65 Collection: "social.coves.community.comment", 66 RKey: commentRkey, 67 CID: "bafycomment", 68 Record: map[string]interface{}{ 69 "$type": "social.coves.community.comment", 70 "content": "Comment arriving before parent post!", 71 "reply": map[string]interface{}{ 72 "root": map[string]interface{}{ 73 "uri": postURI, // Points to post that doesn't exist yet 74 "cid": "bafypost", 75 }, 76 "parent": map[string]interface{}{ 77 "uri": postURI, 78 "cid": "bafypost", 79 }, 80 }, 81 "createdAt": time.Now().Format(time.RFC3339), 82 }, 83 }, 84 } 85 86 err := commentConsumer.HandleEvent(ctx, commentEvent) 87 if err != nil { 88 t.Fatalf("Failed to handle comment event: %v", err) 89 } 90 91 // Verify comment was indexed 92 comment, err := commentRepo.GetByURI(ctx, commentURI) 93 if err != nil { 94 t.Fatalf("Comment not indexed: %v", err) 95 } 96 if comment.ParentURI != postURI { 97 t.Errorf("Expected comment parent_uri %s, got %s", postURI, comment.ParentURI) 98 } 99 100 // Step 2: Now index post (arrives late due to Jetstream ordering) 101 postEvent := &jetstream.JetstreamEvent{ 102 Did: testCommunity, 103 Kind: "commit", 104 Commit: &jetstream.CommitEvent{ 105 Rev: "post-rev", 106 Operation: "create", 107 Collection: "social.coves.community.post", 108 RKey: postRkey, 109 CID: "bafypost", 110 Record: map[string]interface{}{ 111 "$type": "social.coves.community.post", 112 "community": testCommunity, 113 "author": testUser.DID, 114 "title": "Post arriving after comment", 115 "content": "This post's comment arrived first!", 116 "createdAt": time.Now().Format(time.RFC3339), 117 }, 118 }, 119 
} 120 121 err = postConsumer.HandleEvent(ctx, postEvent) 122 if err != nil { 123 t.Fatalf("Failed to handle post event: %v", err) 124 } 125 126 // Step 3: Verify post was indexed with CORRECT comment_count 127 post, err := postRepo.GetByURI(ctx, postURI) 128 if err != nil { 129 t.Fatalf("Post not indexed: %v", err) 130 } 131 132 // THIS IS THE KEY TEST: Post should have comment_count = 1 due to reconciliation 133 if post.CommentCount != 1 { 134 t.Errorf("Expected post comment_count to be 1 (reconciled), got %d", post.CommentCount) 135 t.Logf("This indicates the reconciliation logic in post_consumer.go is not working!") 136 t.Logf("The FIXME comment at comment_consumer.go:362 may still be valid.") 137 } 138 139 // Verify via direct query as well 140 var dbCommentCount int 141 err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&dbCommentCount) 142 if err != nil { 143 t.Fatalf("Failed to query post comment_count: %v", err) 144 } 145 if dbCommentCount != 1 { 146 t.Errorf("Expected DB comment_count to be 1, got %d", dbCommentCount) 147 } 148 }) 149 150 t.Run("Multiple comments arrive before post - count reconciled to correct total", func(t *testing.T) { 151 postRkey := generateTID() 152 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 153 154 // Step 1: Index 3 comments BEFORE the post exists 155 for i := 1; i <= 3; i++ { 156 commentRkey := generateTID() 157 commentEvent := &jetstream.JetstreamEvent{ 158 Did: testUser.DID, 159 Kind: "commit", 160 Commit: &jetstream.CommitEvent{ 161 Rev: fmt.Sprintf("comment-%d-rev", i), 162 Operation: "create", 163 Collection: "social.coves.community.comment", 164 RKey: commentRkey, 165 CID: fmt.Sprintf("bafycomment%d", i), 166 Record: map[string]interface{}{ 167 "$type": "social.coves.community.comment", 168 "content": fmt.Sprintf("Comment %d before post", i), 169 "reply": map[string]interface{}{ 170 "root": map[string]interface{}{ 171 "uri": postURI, 
172 "cid": "bafypost2", 173 }, 174 "parent": map[string]interface{}{ 175 "uri": postURI, 176 "cid": "bafypost2", 177 }, 178 }, 179 "createdAt": time.Now().Format(time.RFC3339), 180 }, 181 }, 182 } 183 184 err := commentConsumer.HandleEvent(ctx, commentEvent) 185 if err != nil { 186 t.Fatalf("Failed to handle comment %d event: %v", i, err) 187 } 188 } 189 190 // Step 2: Now index the post 191 postEvent := &jetstream.JetstreamEvent{ 192 Did: testCommunity, 193 Kind: "commit", 194 Commit: &jetstream.CommitEvent{ 195 Rev: "post2-rev", 196 Operation: "create", 197 Collection: "social.coves.community.post", 198 RKey: postRkey, 199 CID: "bafypost2", 200 Record: map[string]interface{}{ 201 "$type": "social.coves.community.post", 202 "community": testCommunity, 203 "author": testUser.DID, 204 "title": "Post with 3 pre-existing comments", 205 "content": "All 3 comments arrived before this post!", 206 "createdAt": time.Now().Format(time.RFC3339), 207 }, 208 }, 209 } 210 211 err := postConsumer.HandleEvent(ctx, postEvent) 212 if err != nil { 213 t.Fatalf("Failed to handle post event: %v", err) 214 } 215 216 // Step 3: Verify post has comment_count = 3 217 post, err := postRepo.GetByURI(ctx, postURI) 218 if err != nil { 219 t.Fatalf("Post not indexed: %v", err) 220 } 221 222 if post.CommentCount != 3 { 223 t.Errorf("Expected post comment_count to be 3 (reconciled), got %d", post.CommentCount) 224 } 225 }) 226 227 t.Run("Comments before and after post - count remains accurate", func(t *testing.T) { 228 postRkey := generateTID() 229 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 230 231 // Step 1: Index 2 comments BEFORE post 232 for i := 1; i <= 2; i++ { 233 commentRkey := generateTID() 234 commentEvent := &jetstream.JetstreamEvent{ 235 Did: testUser.DID, 236 Kind: "commit", 237 Commit: &jetstream.CommitEvent{ 238 Rev: fmt.Sprintf("before-%d-rev", i), 239 Operation: "create", 240 Collection: "social.coves.community.comment", 241 RKey: 
commentRkey, 242 CID: fmt.Sprintf("bafybefore%d", i), 243 Record: map[string]interface{}{ 244 "$type": "social.coves.community.comment", 245 "content": fmt.Sprintf("Before comment %d", i), 246 "reply": map[string]interface{}{ 247 "root": map[string]interface{}{ 248 "uri": postURI, 249 "cid": "bafypost3", 250 }, 251 "parent": map[string]interface{}{ 252 "uri": postURI, 253 "cid": "bafypost3", 254 }, 255 }, 256 "createdAt": time.Now().Format(time.RFC3339), 257 }, 258 }, 259 } 260 261 err := commentConsumer.HandleEvent(ctx, commentEvent) 262 if err != nil { 263 t.Fatalf("Failed to handle before-comment %d: %v", i, err) 264 } 265 } 266 267 // Step 2: Index the post (should reconcile to 2) 268 postEvent := &jetstream.JetstreamEvent{ 269 Did: testCommunity, 270 Kind: "commit", 271 Commit: &jetstream.CommitEvent{ 272 Rev: "post3-rev", 273 Operation: "create", 274 Collection: "social.coves.community.post", 275 RKey: postRkey, 276 CID: "bafypost3", 277 Record: map[string]interface{}{ 278 "$type": "social.coves.community.post", 279 "community": testCommunity, 280 "author": testUser.DID, 281 "title": "Post with before and after comments", 282 "content": "Testing mixed ordering", 283 "createdAt": time.Now().Format(time.RFC3339), 284 }, 285 }, 286 } 287 288 err := postConsumer.HandleEvent(ctx, postEvent) 289 if err != nil { 290 t.Fatalf("Failed to handle post event: %v", err) 291 } 292 293 // Verify count is 2 294 post, err := postRepo.GetByURI(ctx, postURI) 295 if err != nil { 296 t.Fatalf("Post not indexed: %v", err) 297 } 298 if post.CommentCount != 2 { 299 t.Errorf("Expected comment_count=2 after reconciliation, got %d", post.CommentCount) 300 } 301 302 // Step 3: Add 1 more comment AFTER post exists 303 commentRkey := generateTID() 304 afterCommentEvent := &jetstream.JetstreamEvent{ 305 Did: testUser.DID, 306 Kind: "commit", 307 Commit: &jetstream.CommitEvent{ 308 Rev: "after-rev", 309 Operation: "create", 310 Collection: "social.coves.community.comment", 311 RKey: 
commentRkey, 312 CID: "bafyafter", 313 Record: map[string]interface{}{ 314 "$type": "social.coves.community.comment", 315 "content": "Comment after post exists", 316 "reply": map[string]interface{}{ 317 "root": map[string]interface{}{ 318 "uri": postURI, 319 "cid": "bafypost3", 320 }, 321 "parent": map[string]interface{}{ 322 "uri": postURI, 323 "cid": "bafypost3", 324 }, 325 }, 326 "createdAt": time.Now().Format(time.RFC3339), 327 }, 328 }, 329 } 330 331 err = commentConsumer.HandleEvent(ctx, afterCommentEvent) 332 if err != nil { 333 t.Fatalf("Failed to handle after-comment: %v", err) 334 } 335 336 // Verify count incremented to 3 337 post, err = postRepo.GetByURI(ctx, postURI) 338 if err != nil { 339 t.Fatalf("Failed to get post after increment: %v", err) 340 } 341 if post.CommentCount != 3 { 342 t.Errorf("Expected comment_count=3 after increment, got %d", post.CommentCount) 343 } 344 }) 345 346 t.Run("Idempotent post indexing preserves comment_count", func(t *testing.T) { 347 postRkey := generateTID() 348 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 349 350 // Create comment first 351 commentRkey := generateTID() 352 commentEvent := &jetstream.JetstreamEvent{ 353 Did: testUser.DID, 354 Kind: "commit", 355 Commit: &jetstream.CommitEvent{ 356 Rev: "idem-comment-rev", 357 Operation: "create", 358 Collection: "social.coves.community.comment", 359 RKey: commentRkey, 360 CID: "bafyidemcomment", 361 Record: map[string]interface{}{ 362 "$type": "social.coves.community.comment", 363 "content": "Comment for idempotent test", 364 "reply": map[string]interface{}{ 365 "root": map[string]interface{}{ 366 "uri": postURI, 367 "cid": "bafyidempost", 368 }, 369 "parent": map[string]interface{}{ 370 "uri": postURI, 371 "cid": "bafyidempost", 372 }, 373 }, 374 "createdAt": time.Now().Format(time.RFC3339), 375 }, 376 }, 377 } 378 379 err := commentConsumer.HandleEvent(ctx, commentEvent) 380 if err != nil { 381 t.Fatalf("Failed to create 
comment: %v", err) 382 } 383 384 // Index post (should reconcile to 1) 385 postEvent := &jetstream.JetstreamEvent{ 386 Did: testCommunity, 387 Kind: "commit", 388 Commit: &jetstream.CommitEvent{ 389 Rev: "idem-post-rev", 390 Operation: "create", 391 Collection: "social.coves.community.post", 392 RKey: postRkey, 393 CID: "bafyidempost", 394 Record: map[string]interface{}{ 395 "$type": "social.coves.community.post", 396 "community": testCommunity, 397 "author": testUser.DID, 398 "title": "Idempotent test post", 399 "content": "Testing idempotent indexing", 400 "createdAt": time.Now().Format(time.RFC3339), 401 }, 402 }, 403 } 404 405 err = postConsumer.HandleEvent(ctx, postEvent) 406 if err != nil { 407 t.Fatalf("Failed to index post first time: %v", err) 408 } 409 410 // Verify count is 1 411 post, err := postRepo.GetByURI(ctx, postURI) 412 if err != nil { 413 t.Fatalf("Failed to get post: %v", err) 414 } 415 if post.CommentCount != 1 { 416 t.Errorf("Expected comment_count=1 after first index, got %d", post.CommentCount) 417 } 418 419 // Replay same post event (idempotent - should skip) 420 err = postConsumer.HandleEvent(ctx, postEvent) 421 if err != nil { 422 t.Fatalf("Idempotent post event should not error: %v", err) 423 } 424 425 // Verify count still 1 (not reset to 0) 426 post, err = postRepo.GetByURI(ctx, postURI) 427 if err != nil { 428 t.Fatalf("Failed to get post after replay: %v", err) 429 } 430 if post.CommentCount != 1 { 431 t.Errorf("Expected comment_count=1 after replay (idempotent), got %d", post.CommentCount) 432 } 433 }) 434}