A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/comments" 6 "Coves/internal/core/communities" 7 "Coves/internal/core/users" 8 "Coves/internal/db/postgres" 9 "context" 10 "fmt" 11 "sync" 12 "testing" 13 "time" 14) 15 16// TestConcurrentVoting_MultipleUsersOnSamePost tests race conditions when multiple users 17// vote on the same post simultaneously 18func TestConcurrentVoting_MultipleUsersOnSamePost(t *testing.T) { 19 if testing.Short() { 20 t.Skip("Skipping integration test in short mode") 21 } 22 23 db := setupTestDB(t) 24 defer func() { 25 if err := db.Close(); err != nil { 26 t.Logf("Failed to close database: %v", err) 27 } 28 }() 29 30 ctx := context.Background() 31 voteRepo := postgres.NewVoteRepository(db) 32 postRepo := postgres.NewPostRepository(db) 33 userRepo := postgres.NewUserRepository(db) 34 userService := users.NewUserService(userRepo, nil, "http://localhost:3001") 35 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 36 37 // Use fixed timestamp 38 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC) 39 40 // Setup: Create test community and post 41 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-votes", "owner.test") 42 if err != nil { 43 t.Fatalf("Failed to create test community: %v", err) 44 } 45 46 testUser := createTestUser(t, db, "author.test", "did:plc:author123") 47 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Post for concurrent voting", 0, fixedTime) 48 49 t.Run("Multiple users upvoting same post concurrently", func(t *testing.T) { 50 const numVoters = 20 51 var wg sync.WaitGroup 52 wg.Add(numVoters) 53 54 // Channel to collect errors 55 errors := make(chan error, numVoters) 56 57 // Create voters and vote concurrently 58 for i := 0; i < numVoters; i++ { 59 go func(voterIndex int) { 60 defer wg.Done() 61 62 voterDID := fmt.Sprintf("did:plc:voter%d", voterIndex) 63 voterHandle := fmt.Sprintf("voter%d.test", voterIndex) 64 65 // 
Create user 66 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{ 67 DID: voterDID, 68 Handle: voterHandle, 69 PDSURL: "http://localhost:3001", 70 }) 71 if createErr != nil { 72 errors <- fmt.Errorf("voter %d: failed to create user: %w", voterIndex, createErr) 73 return 74 } 75 76 // Create vote 77 voteRKey := generateTID() 78 voteEvent := &jetstream.JetstreamEvent{ 79 Did: voterDID, 80 Kind: "commit", 81 Commit: &jetstream.CommitEvent{ 82 Rev: fmt.Sprintf("rev-%d", voterIndex), 83 Operation: "create", 84 Collection: "social.coves.feed.vote", 85 RKey: voteRKey, 86 CID: fmt.Sprintf("bafyvote%d", voterIndex), 87 Record: map[string]interface{}{ 88 "$type": "social.coves.feed.vote", 89 "subject": map[string]interface{}{ 90 "uri": postURI, 91 "cid": "bafypost", 92 }, 93 "direction": "up", 94 "createdAt": fixedTime.Format(time.RFC3339), 95 }, 96 }, 97 } 98 99 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil { 100 errors <- fmt.Errorf("voter %d: failed to handle vote event: %w", voterIndex, handleErr) 101 return 102 } 103 }(i) 104 } 105 106 // Wait for all goroutines to complete 107 wg.Wait() 108 close(errors) 109 110 // Check for errors 111 var errorCount int 112 for err := range errors { 113 t.Logf("Error during concurrent voting: %v", err) 114 errorCount++ 115 } 116 117 if errorCount > 0 { 118 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount) 119 } 120 121 // Verify post vote counts are correct 122 post, err := postRepo.GetByURI(ctx, postURI) 123 if err != nil { 124 t.Fatalf("Failed to get post: %v", err) 125 } 126 127 if post.UpvoteCount != numVoters { 128 t.Errorf("Expected upvote_count = %d, got %d (possible race condition in count update)", numVoters, post.UpvoteCount) 129 } 130 131 if post.Score != numVoters { 132 t.Errorf("Expected score = %d, got %d (possible race condition in score calculation)", numVoters, post.Score) 133 } 134 135 // CRITICAL: Verify actual vote records in database 
to detect race conditions 136 // This catches issues that aggregate counts might miss (e.g., duplicate votes, lost votes) 137 var actualVoteCount int 138 var distinctVoterCount int 139 err = db.QueryRow("SELECT COUNT(*), COUNT(DISTINCT voter_did) FROM votes WHERE subject_uri = $1 AND direction = 'up'", postURI). 140 Scan(&actualVoteCount, &distinctVoterCount) 141 if err != nil { 142 t.Fatalf("Failed to query vote records: %v", err) 143 } 144 145 if actualVoteCount != numVoters { 146 t.Errorf("Expected %d vote records in database, got %d (possible race condition: votes lost or duplicated)", numVoters, actualVoteCount) 147 } 148 149 if distinctVoterCount != numVoters { 150 t.Errorf("Expected %d distinct voters, got %d (possible race condition: duplicate votes from same voter)", numVoters, distinctVoterCount) 151 } 152 153 t.Logf("✓ %d concurrent upvotes processed correctly:", numVoters) 154 t.Logf(" - Post counts: upvote_count=%d, score=%d", post.UpvoteCount, post.Score) 155 t.Logf(" - Database records: %d votes from %d distinct voters (no duplicates)", actualVoteCount, distinctVoterCount) 156 }) 157 158 t.Run("Concurrent upvotes and downvotes on same post", func(t *testing.T) { 159 // Create a new post for this test 160 testPost2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post for mixed voting", 0, fixedTime) 161 162 const numUpvoters = 15 163 const numDownvoters = 10 164 const totalVoters = numUpvoters + numDownvoters 165 166 var wg sync.WaitGroup 167 wg.Add(totalVoters) 168 errors := make(chan error, totalVoters) 169 170 // Upvoters 171 for i := 0; i < numUpvoters; i++ { 172 go func(voterIndex int) { 173 defer wg.Done() 174 175 voterDID := fmt.Sprintf("did:plc:upvoter%d", voterIndex) 176 voterHandle := fmt.Sprintf("upvoter%d.test", voterIndex) 177 178 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{ 179 DID: voterDID, 180 Handle: voterHandle, 181 PDSURL: "http://localhost:3001", 182 }) 183 if createErr != nil { 184 errors <- 
fmt.Errorf("upvoter %d: failed to create user: %w", voterIndex, createErr) 185 return 186 } 187 188 voteRKey := generateTID() 189 voteEvent := &jetstream.JetstreamEvent{ 190 Did: voterDID, 191 Kind: "commit", 192 Commit: &jetstream.CommitEvent{ 193 Rev: fmt.Sprintf("rev-up-%d", voterIndex), 194 Operation: "create", 195 Collection: "social.coves.feed.vote", 196 RKey: voteRKey, 197 CID: fmt.Sprintf("bafyup%d", voterIndex), 198 Record: map[string]interface{}{ 199 "$type": "social.coves.feed.vote", 200 "subject": map[string]interface{}{ 201 "uri": testPost2URI, 202 "cid": "bafypost2", 203 }, 204 "direction": "up", 205 "createdAt": fixedTime.Format(time.RFC3339), 206 }, 207 }, 208 } 209 210 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil { 211 errors <- fmt.Errorf("upvoter %d: failed to handle event: %w", voterIndex, handleErr) 212 } 213 }(i) 214 } 215 216 // Downvoters 217 for i := 0; i < numDownvoters; i++ { 218 go func(voterIndex int) { 219 defer wg.Done() 220 221 voterDID := fmt.Sprintf("did:plc:downvoter%d", voterIndex) 222 voterHandle := fmt.Sprintf("downvoter%d.test", voterIndex) 223 224 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{ 225 DID: voterDID, 226 Handle: voterHandle, 227 PDSURL: "http://localhost:3001", 228 }) 229 if createErr != nil { 230 errors <- fmt.Errorf("downvoter %d: failed to create user: %w", voterIndex, createErr) 231 return 232 } 233 234 voteRKey := generateTID() 235 voteEvent := &jetstream.JetstreamEvent{ 236 Did: voterDID, 237 Kind: "commit", 238 Commit: &jetstream.CommitEvent{ 239 Rev: fmt.Sprintf("rev-down-%d", voterIndex), 240 Operation: "create", 241 Collection: "social.coves.feed.vote", 242 RKey: voteRKey, 243 CID: fmt.Sprintf("bafydown%d", voterIndex), 244 Record: map[string]interface{}{ 245 "$type": "social.coves.feed.vote", 246 "subject": map[string]interface{}{ 247 "uri": testPost2URI, 248 "cid": "bafypost2", 249 }, 250 "direction": "down", 251 "createdAt": 
fixedTime.Format(time.RFC3339), 252 }, 253 }, 254 } 255 256 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil { 257 errors <- fmt.Errorf("downvoter %d: failed to handle event: %w", voterIndex, handleErr) 258 } 259 }(i) 260 } 261 262 wg.Wait() 263 close(errors) 264 265 // Check for errors 266 var errorCount int 267 for err := range errors { 268 t.Logf("Error during concurrent mixed voting: %v", err) 269 errorCount++ 270 } 271 272 if errorCount > 0 { 273 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount) 274 } 275 276 // Verify counts 277 post, err := postRepo.GetByURI(ctx, testPost2URI) 278 if err != nil { 279 t.Fatalf("Failed to get post: %v", err) 280 } 281 282 expectedScore := numUpvoters - numDownvoters 283 if post.UpvoteCount != numUpvoters { 284 t.Errorf("Expected upvote_count = %d, got %d", numUpvoters, post.UpvoteCount) 285 } 286 if post.DownvoteCount != numDownvoters { 287 t.Errorf("Expected downvote_count = %d, got %d", numDownvoters, post.DownvoteCount) 288 } 289 if post.Score != expectedScore { 290 t.Errorf("Expected score = %d, got %d", expectedScore, post.Score) 291 } 292 293 // CRITICAL: Verify actual vote records to detect race conditions 294 var actualUpvotes, actualDownvotes, distinctUpvoters, distinctDownvoters int 295 err = db.QueryRow(` 296 SELECT 297 COUNT(*) FILTER (WHERE direction = 'up'), 298 COUNT(*) FILTER (WHERE direction = 'down'), 299 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'up'), 300 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'down') 301 FROM votes WHERE subject_uri = $1 302 `, testPost2URI).Scan(&actualUpvotes, &actualDownvotes, &distinctUpvoters, &distinctDownvoters) 303 if err != nil { 304 t.Fatalf("Failed to query vote records: %v", err) 305 } 306 307 if actualUpvotes != numUpvoters { 308 t.Errorf("Expected %d upvote records, got %d (possible race condition)", numUpvoters, actualUpvotes) 309 } 310 if actualDownvotes != numDownvoters { 311 
t.Errorf("Expected %d downvote records, got %d (possible race condition)", numDownvoters, actualDownvotes) 312 } 313 if distinctUpvoters != numUpvoters { 314 t.Errorf("Expected %d distinct upvoters, got %d (duplicate votes detected)", numUpvoters, distinctUpvoters) 315 } 316 if distinctDownvoters != numDownvoters { 317 t.Errorf("Expected %d distinct downvoters, got %d (duplicate votes detected)", numDownvoters, distinctDownvoters) 318 } 319 320 t.Logf("✓ Concurrent mixed voting processed correctly:") 321 t.Logf(" - Post counts: upvotes=%d, downvotes=%d, score=%d", post.UpvoteCount, post.DownvoteCount, post.Score) 322 t.Logf(" - Database records: %d upvotes from %d voters, %d downvotes from %d voters (no duplicates)", 323 actualUpvotes, distinctUpvoters, actualDownvotes, distinctDownvoters) 324 }) 325} 326 327// TestConcurrentCommenting_MultipleUsersOnSamePost tests race conditions when multiple users 328// comment on the same post simultaneously 329func TestConcurrentCommenting_MultipleUsersOnSamePost(t *testing.T) { 330 if testing.Short() { 331 t.Skip("Skipping integration test in short mode") 332 } 333 334 db := setupTestDB(t) 335 defer func() { 336 if err := db.Close(); err != nil { 337 t.Logf("Failed to close database: %v", err) 338 } 339 }() 340 341 ctx := context.Background() 342 commentRepo := postgres.NewCommentRepository(db) 343 postRepo := postgres.NewPostRepository(db) 344 userRepo := postgres.NewUserRepository(db) 345 communityRepo := postgres.NewCommunityRepository(db) 346 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 347 348 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC) 349 350 // Setup: Create test community and post 351 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-comments", "owner.test") 352 if err != nil { 353 t.Fatalf("Failed to create test community: %v", err) 354 } 355 356 testUser := createTestUser(t, db, "author.test", "did:plc:author456") 357 postURI := createTestPost(t, db, 
testCommunity, testUser.DID, "Post for concurrent commenting", 0, fixedTime) 358 359 t.Run("Multiple users commenting simultaneously", func(t *testing.T) { 360 const numCommenters = 25 361 var wg sync.WaitGroup 362 wg.Add(numCommenters) 363 364 errors := make(chan error, numCommenters) 365 commentURIs := make(chan string, numCommenters) 366 367 for i := 0; i < numCommenters; i++ { 368 go func(commenterIndex int) { 369 defer wg.Done() 370 371 commenterDID := fmt.Sprintf("did:plc:commenter%d", commenterIndex) 372 commentRKey := fmt.Sprintf("%s-comment%d", generateTID(), commenterIndex) 373 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", commenterDID, commentRKey) 374 375 commentEvent := &jetstream.JetstreamEvent{ 376 Did: commenterDID, 377 Kind: "commit", 378 Commit: &jetstream.CommitEvent{ 379 Rev: fmt.Sprintf("rev-comment-%d", commenterIndex), 380 Operation: "create", 381 Collection: "social.coves.community.comment", 382 RKey: commentRKey, 383 CID: fmt.Sprintf("bafycomment%d", commenterIndex), 384 Record: map[string]interface{}{ 385 "$type": "social.coves.community.comment", 386 "content": fmt.Sprintf("Concurrent comment #%d", commenterIndex), 387 "reply": map[string]interface{}{ 388 "root": map[string]interface{}{ 389 "uri": postURI, 390 "cid": "bafypost", 391 }, 392 "parent": map[string]interface{}{ 393 "uri": postURI, 394 "cid": "bafypost", 395 }, 396 }, 397 "createdAt": fixedTime.Add(time.Duration(commenterIndex) * time.Millisecond).Format(time.RFC3339), 398 }, 399 }, 400 } 401 402 if handleErr := commentConsumer.HandleEvent(ctx, commentEvent); handleErr != nil { 403 errors <- fmt.Errorf("commenter %d: failed to handle comment event: %w", commenterIndex, handleErr) 404 return 405 } 406 407 commentURIs <- commentURI 408 }(i) 409 } 410 411 wg.Wait() 412 close(errors) 413 close(commentURIs) 414 415 // Check for errors 416 var errorCount int 417 for err := range errors { 418 t.Logf("Error during concurrent commenting: %v", err) 419 
errorCount++ 420 } 421 422 if errorCount > 0 { 423 t.Errorf("Expected no errors during concurrent commenting, got %d errors", errorCount) 424 } 425 426 // Verify post comment count updated correctly 427 post, err := postRepo.GetByURI(ctx, postURI) 428 if err != nil { 429 t.Fatalf("Failed to get post: %v", err) 430 } 431 432 if post.CommentCount != numCommenters { 433 t.Errorf("Expected comment_count = %d, got %d (possible race condition in count update)", numCommenters, post.CommentCount) 434 } 435 436 // CRITICAL: Verify actual comment records to detect race conditions 437 var actualCommentCount int 438 var distinctCommenters int 439 err = db.QueryRow(` 440 SELECT COUNT(*), COUNT(DISTINCT author_did) 441 FROM comments 442 WHERE post_uri = $1 AND parent_comment_uri IS NULL 443 `, postURI).Scan(&actualCommentCount, &distinctCommenters) 444 if err != nil { 445 t.Fatalf("Failed to query comment records: %v", err) 446 } 447 448 if actualCommentCount != numCommenters { 449 t.Errorf("Expected %d comment records in database, got %d (possible race condition: comments lost or duplicated)", numCommenters, actualCommentCount) 450 } 451 452 if distinctCommenters != numCommenters { 453 t.Errorf("Expected %d distinct commenters, got %d (possible duplicate comments from same author)", numCommenters, distinctCommenters) 454 } 455 456 // Verify all comments are retrievable via service 457 // Use factory constructor with nil factory - this test only uses the read path (GetComments) 458 commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil) 459 response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{ 460 PostURI: postURI, 461 Sort: "new", 462 Depth: 10, 463 Limit: 100, 464 ViewerDID: nil, 465 }) 466 if err != nil { 467 t.Fatalf("Failed to get comments: %v", err) 468 } 469 470 if len(response.Comments) != numCommenters { 471 t.Errorf("Expected %d comments in response, got %d", numCommenters, 
len(response.Comments)) 472 } 473 474 t.Logf("✓ %d concurrent comments processed correctly:", numCommenters) 475 t.Logf(" - Post comment_count: %d", post.CommentCount) 476 t.Logf(" - Database records: %d comments from %d distinct authors (no duplicates)", actualCommentCount, distinctCommenters) 477 }) 478 479 t.Run("Concurrent replies to same comment", func(t *testing.T) { 480 // Create a parent comment first 481 parentCommentRKey := generateTID() 482 parentCommentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, parentCommentRKey) 483 484 parentEvent := &jetstream.JetstreamEvent{ 485 Did: testUser.DID, 486 Kind: "commit", 487 Commit: &jetstream.CommitEvent{ 488 Rev: "parent-rev", 489 Operation: "create", 490 Collection: "social.coves.community.comment", 491 RKey: parentCommentRKey, 492 CID: "bafyparent", 493 Record: map[string]interface{}{ 494 "$type": "social.coves.community.comment", 495 "content": "Parent comment for replies", 496 "reply": map[string]interface{}{ 497 "root": map[string]interface{}{ 498 "uri": postURI, 499 "cid": "bafypost", 500 }, 501 "parent": map[string]interface{}{ 502 "uri": postURI, 503 "cid": "bafypost", 504 }, 505 }, 506 "createdAt": fixedTime.Format(time.RFC3339), 507 }, 508 }, 509 } 510 511 if err := commentConsumer.HandleEvent(ctx, parentEvent); err != nil { 512 t.Fatalf("Failed to create parent comment: %v", err) 513 } 514 515 // Now create concurrent replies 516 const numRepliers = 15 517 var wg sync.WaitGroup 518 wg.Add(numRepliers) 519 errors := make(chan error, numRepliers) 520 521 for i := 0; i < numRepliers; i++ { 522 go func(replierIndex int) { 523 defer wg.Done() 524 525 replierDID := fmt.Sprintf("did:plc:replier%d", replierIndex) 526 replyRKey := fmt.Sprintf("%s-reply%d", generateTID(), replierIndex) 527 528 replyEvent := &jetstream.JetstreamEvent{ 529 Did: replierDID, 530 Kind: "commit", 531 Commit: &jetstream.CommitEvent{ 532 Rev: fmt.Sprintf("rev-reply-%d", replierIndex), 533 Operation: 
"create", 534 Collection: "social.coves.community.comment", 535 RKey: replyRKey, 536 CID: fmt.Sprintf("bafyreply%d", replierIndex), 537 Record: map[string]interface{}{ 538 "$type": "social.coves.community.comment", 539 "content": fmt.Sprintf("Concurrent reply #%d", replierIndex), 540 "reply": map[string]interface{}{ 541 "root": map[string]interface{}{ 542 "uri": postURI, 543 "cid": "bafypost", 544 }, 545 "parent": map[string]interface{}{ 546 "uri": parentCommentURI, 547 "cid": "bafyparent", 548 }, 549 }, 550 "createdAt": fixedTime.Add(time.Duration(replierIndex) * time.Millisecond).Format(time.RFC3339), 551 }, 552 }, 553 } 554 555 if handleErr := commentConsumer.HandleEvent(ctx, replyEvent); handleErr != nil { 556 errors <- fmt.Errorf("replier %d: failed to handle reply event: %w", replierIndex, handleErr) 557 } 558 }(i) 559 } 560 561 wg.Wait() 562 close(errors) 563 564 // Check for errors 565 var errorCount int 566 for err := range errors { 567 t.Logf("Error during concurrent replying: %v", err) 568 errorCount++ 569 } 570 571 if errorCount > 0 { 572 t.Errorf("Expected no errors during concurrent replying, got %d errors", errorCount) 573 } 574 575 // Verify parent comment reply count 576 parentComment, err := commentRepo.GetByURI(ctx, parentCommentURI) 577 if err != nil { 578 t.Fatalf("Failed to get parent comment: %v", err) 579 } 580 581 if parentComment.ReplyCount != numRepliers { 582 t.Errorf("Expected reply_count = %d on parent comment, got %d (possible race condition)", numRepliers, parentComment.ReplyCount) 583 } 584 585 t.Logf("✓ %d concurrent replies processed correctly, reply_count=%d", numRepliers, parentComment.ReplyCount) 586 }) 587} 588 589// TestConcurrentCommunityCreation tests race conditions when multiple goroutines 590// try to create communities with the same handle 591func TestConcurrentCommunityCreation_DuplicateHandle(t *testing.T) { 592 if testing.Short() { 593 t.Skip("Skipping integration test in short mode") 594 } 595 596 db := 
setupTestDB(t) 597 defer func() { 598 if err := db.Close(); err != nil { 599 t.Logf("Failed to close database: %v", err) 600 } 601 }() 602 603 ctx := context.Background() 604 repo := postgres.NewCommunityRepository(db) 605 606 t.Run("Concurrent creation with same handle should fail", func(t *testing.T) { 607 const numAttempts = 10 608 sameHandle := fmt.Sprintf("duplicate-handle-%d.test.coves.social", time.Now().UnixNano()) 609 610 var wg sync.WaitGroup 611 wg.Add(numAttempts) 612 613 type result struct { 614 err error 615 success bool 616 } 617 results := make(chan result, numAttempts) 618 619 for i := 0; i < numAttempts; i++ { 620 go func(attemptIndex int) { 621 defer wg.Done() 622 623 // Each attempt uses a unique DID but same handle 624 uniqueDID := fmt.Sprintf("did:plc:dup-community-%d-%d", time.Now().UnixNano(), attemptIndex) 625 626 community := &communities.Community{ 627 DID: uniqueDID, 628 Handle: sameHandle, // SAME HANDLE 629 Name: fmt.Sprintf("dup-test-%d", attemptIndex), 630 DisplayName: fmt.Sprintf("Duplicate Test %d", attemptIndex), 631 Description: "Testing duplicate handle prevention", 632 OwnerDID: "did:web:test.local", 633 CreatedByDID: "did:plc:creator", 634 HostedByDID: "did:web:test.local", 635 Visibility: "public", 636 CreatedAt: time.Now(), 637 UpdatedAt: time.Now(), 638 } 639 640 _, createErr := repo.Create(ctx, community) 641 results <- result{ 642 success: createErr == nil, 643 err: createErr, 644 } 645 }(i) 646 } 647 648 wg.Wait() 649 close(results) 650 651 // Collect results 652 successCount := 0 653 duplicateErrors := 0 654 655 for res := range results { 656 if res.success { 657 successCount++ 658 } else if communities.IsConflict(res.err) { 659 duplicateErrors++ 660 } else { 661 t.Logf("Unexpected error type: %v", res.err) 662 } 663 } 664 665 // CRITICAL: Exactly ONE should succeed, rest should fail with duplicate error 666 if successCount != 1 { 667 t.Errorf("Expected exactly 1 successful creation, got %d (DATABASE CONSTRAINT 
VIOLATION - race condition detected)", successCount) 668 } 669 670 if duplicateErrors != numAttempts-1 { 671 t.Errorf("Expected %d duplicate errors, got %d", numAttempts-1, duplicateErrors) 672 } 673 674 t.Logf("✓ Duplicate handle protection: %d successful, %d duplicate errors (database constraint working)", successCount, duplicateErrors) 675 }) 676 677 t.Run("Concurrent creation with different handles should succeed", func(t *testing.T) { 678 const numAttempts = 10 679 var wg sync.WaitGroup 680 wg.Add(numAttempts) 681 682 errors := make(chan error, numAttempts) 683 684 for i := 0; i < numAttempts; i++ { 685 go func(attemptIndex int) { 686 defer wg.Done() 687 688 uniqueSuffix := fmt.Sprintf("%d-%d", time.Now().UnixNano(), attemptIndex) 689 community := &communities.Community{ 690 DID: generateTestDID(uniqueSuffix), 691 Handle: fmt.Sprintf("unique-handle-%s.test.coves.social", uniqueSuffix), 692 Name: fmt.Sprintf("unique-test-%s", uniqueSuffix), 693 DisplayName: fmt.Sprintf("Unique Test %d", attemptIndex), 694 Description: "Testing concurrent unique handle creation", 695 OwnerDID: "did:web:test.local", 696 CreatedByDID: "did:plc:creator", 697 HostedByDID: "did:web:test.local", 698 Visibility: "public", 699 CreatedAt: time.Now(), 700 UpdatedAt: time.Now(), 701 } 702 703 _, createErr := repo.Create(ctx, community) 704 if createErr != nil { 705 errors <- createErr 706 } 707 }(i) 708 } 709 710 wg.Wait() 711 close(errors) 712 713 // All should succeed 714 var errorCount int 715 for err := range errors { 716 t.Logf("Error during concurrent unique creation: %v", err) 717 errorCount++ 718 } 719 720 if errorCount > 0 { 721 t.Errorf("Expected all %d creations to succeed, but %d failed", numAttempts, errorCount) 722 } 723 724 t.Logf("✓ All %d concurrent community creations with unique handles succeeded", numAttempts) 725 }) 726} 727 728// TestConcurrentSubscription tests race conditions when multiple users subscribe 729// to the same community simultaneously 730func 
TestConcurrentSubscription_RaceConditions(t *testing.T) { 731 if testing.Short() { 732 t.Skip("Skipping integration test in short mode") 733 } 734 735 db := setupTestDB(t) 736 defer func() { 737 if err := db.Close(); err != nil { 738 t.Logf("Failed to close database: %v", err) 739 } 740 }() 741 742 ctx := context.Background() 743 communityRepo := postgres.NewCommunityRepository(db) 744 consumer := jetstream.NewCommunityEventConsumer(communityRepo, "did:web:coves.local", true, nil) 745 746 // Create test community 747 testDID := fmt.Sprintf("did:plc:test-sub-race-%d", time.Now().UnixNano()) 748 community := &communities.Community{ 749 DID: testDID, 750 Handle: fmt.Sprintf("sub-race-%d.test.coves.social", time.Now().UnixNano()), 751 Name: "sub-race-test", 752 DisplayName: "Subscription Race Test", 753 Description: "Testing subscription race conditions", 754 OwnerDID: "did:plc:owner", 755 CreatedByDID: "did:plc:creator", 756 HostedByDID: "did:web:coves.local", 757 Visibility: "public", 758 CreatedAt: time.Now(), 759 UpdatedAt: time.Now(), 760 } 761 762 created, err := communityRepo.Create(ctx, community) 763 if err != nil { 764 t.Fatalf("Failed to create test community: %v", err) 765 } 766 767 t.Run("Multiple users subscribing concurrently", func(t *testing.T) { 768 const numSubscribers = 30 769 var wg sync.WaitGroup 770 wg.Add(numSubscribers) 771 772 errors := make(chan error, numSubscribers) 773 774 for i := 0; i < numSubscribers; i++ { 775 go func(subscriberIndex int) { 776 defer wg.Done() 777 778 userDID := fmt.Sprintf("did:plc:subscriber%d", subscriberIndex) 779 rkey := fmt.Sprintf("sub-%d", subscriberIndex) 780 781 event := &jetstream.JetstreamEvent{ 782 Did: userDID, 783 Kind: "commit", 784 TimeUS: time.Now().UnixMicro(), 785 Commit: &jetstream.CommitEvent{ 786 Rev: fmt.Sprintf("rev-%d", subscriberIndex), 787 Operation: "create", 788 Collection: "social.coves.community.subscription", 789 RKey: rkey, 790 CID: fmt.Sprintf("bafysub%d", subscriberIndex), 791 
Record: map[string]interface{}{ 792 "$type": "social.coves.community.subscription", 793 "subject": created.DID, 794 "createdAt": time.Now().Format(time.RFC3339), 795 "contentVisibility": float64(3), 796 }, 797 }, 798 } 799 800 if handleErr := consumer.HandleEvent(ctx, event); handleErr != nil { 801 errors <- fmt.Errorf("subscriber %d: failed to subscribe: %w", subscriberIndex, handleErr) 802 } 803 }(i) 804 } 805 806 wg.Wait() 807 close(errors) 808 809 // Check for errors 810 var errorCount int 811 for err := range errors { 812 t.Logf("Error during concurrent subscription: %v", err) 813 errorCount++ 814 } 815 816 if errorCount > 0 { 817 t.Errorf("Expected no errors during concurrent subscription, got %d errors", errorCount) 818 } 819 820 // Verify subscriber count is correct 821 updatedCommunity, err := communityRepo.GetByDID(ctx, created.DID) 822 if err != nil { 823 t.Fatalf("Failed to get updated community: %v", err) 824 } 825 826 if updatedCommunity.SubscriberCount != numSubscribers { 827 t.Errorf("Expected subscriber_count = %d, got %d (RACE CONDITION in subscriber count update)", numSubscribers, updatedCommunity.SubscriberCount) 828 } 829 830 // CRITICAL: Verify actual subscription records to detect race conditions 831 var actualSubscriptionCount int 832 var distinctSubscribers int 833 err = db.QueryRow(` 834 SELECT COUNT(*), COUNT(DISTINCT user_did) 835 FROM community_subscriptions 836 WHERE community_did = $1 837 `, created.DID).Scan(&actualSubscriptionCount, &distinctSubscribers) 838 if err != nil { 839 t.Fatalf("Failed to query subscription records: %v", err) 840 } 841 842 if actualSubscriptionCount != numSubscribers { 843 t.Errorf("Expected %d subscription records, got %d (possible race condition: subscriptions lost or duplicated)", numSubscribers, actualSubscriptionCount) 844 } 845 846 if distinctSubscribers != numSubscribers { 847 t.Errorf("Expected %d distinct subscribers, got %d (possible duplicate subscriptions)", numSubscribers, distinctSubscribers) 
848 } 849 850 t.Logf("✓ %d concurrent subscriptions processed correctly:", numSubscribers) 851 t.Logf(" - Community subscriber_count: %d", updatedCommunity.SubscriberCount) 852 t.Logf(" - Database records: %d subscriptions from %d distinct users (no duplicates)", actualSubscriptionCount, distinctSubscribers) 853 }) 854 855 t.Run("Concurrent subscribe and unsubscribe", func(t *testing.T) { 856 // Create new community for this test 857 testDID2 := fmt.Sprintf("did:plc:test-sub-unsub-%d", time.Now().UnixNano()) 858 community2 := &communities.Community{ 859 DID: testDID2, 860 Handle: fmt.Sprintf("sub-unsub-%d.test.coves.social", time.Now().UnixNano()), 861 Name: "sub-unsub-test", 862 DisplayName: "Subscribe/Unsubscribe Race Test", 863 Description: "Testing concurrent subscribe/unsubscribe", 864 OwnerDID: "did:plc:owner", 865 CreatedByDID: "did:plc:creator", 866 HostedByDID: "did:web:coves.local", 867 Visibility: "public", 868 CreatedAt: time.Now(), 869 UpdatedAt: time.Now(), 870 } 871 872 created2, err := communityRepo.Create(ctx, community2) 873 if err != nil { 874 t.Fatalf("Failed to create test community: %v", err) 875 } 876 877 const numUsers = 20 878 var wg sync.WaitGroup 879 wg.Add(numUsers * 2) // Each user subscribes then unsubscribes 880 881 errors := make(chan error, numUsers*2) 882 883 for i := 0; i < numUsers; i++ { 884 go func(userIndex int) { 885 userDID := fmt.Sprintf("did:plc:subunsubuser%d", userIndex) 886 rkey := fmt.Sprintf("subunsub-%d", userIndex) 887 888 // Subscribe 889 subscribeEvent := &jetstream.JetstreamEvent{ 890 Did: userDID, 891 Kind: "commit", 892 TimeUS: time.Now().UnixMicro(), 893 Commit: &jetstream.CommitEvent{ 894 Rev: fmt.Sprintf("rev-sub-%d", userIndex), 895 Operation: "create", 896 Collection: "social.coves.community.subscription", 897 RKey: rkey, 898 CID: fmt.Sprintf("bafysubscribe%d", userIndex), 899 Record: map[string]interface{}{ 900 "$type": "social.coves.community.subscription", 901 "subject": created2.DID, 902 "createdAt": 
time.Now().Format(time.RFC3339), 903 "contentVisibility": float64(3), 904 }, 905 }, 906 } 907 908 if handleErr := consumer.HandleEvent(ctx, subscribeEvent); handleErr != nil { 909 errors <- fmt.Errorf("user %d: subscribe failed: %w", userIndex, handleErr) 910 } 911 wg.Done() 912 913 // Small delay to ensure subscribe happens first 914 time.Sleep(10 * time.Millisecond) 915 916 // Unsubscribe 917 unsubscribeEvent := &jetstream.JetstreamEvent{ 918 Did: userDID, 919 Kind: "commit", 920 TimeUS: time.Now().UnixMicro(), 921 Commit: &jetstream.CommitEvent{ 922 Rev: fmt.Sprintf("rev-unsub-%d", userIndex), 923 Operation: "delete", 924 Collection: "social.coves.community.subscription", 925 RKey: rkey, 926 CID: "", 927 Record: nil, 928 }, 929 } 930 931 if handleErr := consumer.HandleEvent(ctx, unsubscribeEvent); handleErr != nil { 932 errors <- fmt.Errorf("user %d: unsubscribe failed: %w", userIndex, handleErr) 933 } 934 wg.Done() 935 }(i) 936 } 937 938 wg.Wait() 939 close(errors) 940 941 // Check for errors 942 var errorCount int 943 for err := range errors { 944 t.Logf("Error during concurrent sub/unsub: %v", err) 945 errorCount++ 946 } 947 948 if errorCount > 0 { 949 t.Errorf("Expected no errors during concurrent sub/unsub, got %d errors", errorCount) 950 } 951 952 // Final subscriber count should be 0 (all unsubscribed) 953 finalCommunity, err := communityRepo.GetByDID(ctx, created2.DID) 954 if err != nil { 955 t.Fatalf("Failed to get final community: %v", err) 956 } 957 958 if finalCommunity.SubscriberCount != 0 { 959 t.Errorf("Expected subscriber_count = 0 after all unsubscribed, got %d (RACE CONDITION detected)", finalCommunity.SubscriberCount) 960 } 961 962 // CRITICAL: Verify no subscription records remain in database 963 var remainingSubscriptions int 964 err = db.QueryRow(` 965 SELECT COUNT(*) 966 FROM community_subscriptions 967 WHERE community_did = $1 968 `, created2.DID).Scan(&remainingSubscriptions) 969 if err != nil { 970 t.Fatalf("Failed to query subscription 
records: %v", err) 971 } 972 973 if remainingSubscriptions != 0 { 974 t.Errorf("Expected 0 subscription records after all unsubscribed, got %d (orphaned subscriptions detected)", remainingSubscriptions) 975 } 976 977 t.Logf("✓ Concurrent subscribe/unsubscribe handled correctly:") 978 t.Logf(" - Community subscriber_count: %d", finalCommunity.SubscriberCount) 979 t.Logf(" - Database records: %d subscriptions remaining (clean unsubscribe)", remainingSubscriptions) 980 }) 981}