A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/comments" 6 "Coves/internal/core/communities" 7 "Coves/internal/core/users" 8 "Coves/internal/db/postgres" 9 "context" 10 "fmt" 11 "sync" 12 "testing" 13 "time" 14) 15 16// TestConcurrentVoting_MultipleUsersOnSamePost tests race conditions when multiple users 17// vote on the same post simultaneously 18func TestConcurrentVoting_MultipleUsersOnSamePost(t *testing.T) { 19 if testing.Short() { 20 t.Skip("Skipping integration test in short mode") 21 } 22 23 db := setupTestDB(t) 24 defer func() { 25 if err := db.Close(); err != nil { 26 t.Logf("Failed to close database: %v", err) 27 } 28 }() 29 30 ctx := context.Background() 31 voteRepo := postgres.NewVoteRepository(db) 32 postRepo := postgres.NewPostRepository(db) 33 userRepo := postgres.NewUserRepository(db) 34 userService := users.NewUserService(userRepo, nil, "http://localhost:3001") 35 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 36 37 // Use fixed timestamp 38 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC) 39 40 // Setup: Create test community and post 41 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-votes", "owner.test") 42 if err != nil { 43 t.Fatalf("Failed to create test community: %v", err) 44 } 45 46 testUser := createTestUser(t, db, "author.test", "did:plc:author123") 47 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Post for concurrent voting", 0, fixedTime) 48 49 t.Run("Multiple users upvoting same post concurrently", func(t *testing.T) { 50 const numVoters = 20 51 var wg sync.WaitGroup 52 wg.Add(numVoters) 53 54 // Channel to collect errors 55 errors := make(chan error, numVoters) 56 57 // Create voters and vote concurrently 58 for i := 0; i < numVoters; i++ { 59 go func(voterIndex int) { 60 defer wg.Done() 61 62 voterDID := fmt.Sprintf("did:plc:voter%d", voterIndex) 63 voterHandle := fmt.Sprintf("voter%d.test", voterIndex) 64 65 // 
Create user 66 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{ 67 DID: voterDID, 68 Handle: voterHandle, 69 PDSURL: "http://localhost:3001", 70 }) 71 if createErr != nil { 72 errors <- fmt.Errorf("voter %d: failed to create user: %w", voterIndex, createErr) 73 return 74 } 75 76 // Create vote 77 voteRKey := generateTID() 78 voteEvent := &jetstream.JetstreamEvent{ 79 Did: voterDID, 80 Kind: "commit", 81 Commit: &jetstream.CommitEvent{ 82 Rev: fmt.Sprintf("rev-%d", voterIndex), 83 Operation: "create", 84 Collection: "social.coves.feed.vote", 85 RKey: voteRKey, 86 CID: fmt.Sprintf("bafyvote%d", voterIndex), 87 Record: map[string]interface{}{ 88 "$type": "social.coves.feed.vote", 89 "subject": map[string]interface{}{ 90 "uri": postURI, 91 "cid": "bafypost", 92 }, 93 "direction": "up", 94 "createdAt": fixedTime.Format(time.RFC3339), 95 }, 96 }, 97 } 98 99 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil { 100 errors <- fmt.Errorf("voter %d: failed to handle vote event: %w", voterIndex, handleErr) 101 return 102 } 103 }(i) 104 } 105 106 // Wait for all goroutines to complete 107 wg.Wait() 108 close(errors) 109 110 // Check for errors 111 var errorCount int 112 for err := range errors { 113 t.Logf("Error during concurrent voting: %v", err) 114 errorCount++ 115 } 116 117 if errorCount > 0 { 118 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount) 119 } 120 121 // Verify post vote counts are correct 122 post, err := postRepo.GetByURI(ctx, postURI) 123 if err != nil { 124 t.Fatalf("Failed to get post: %v", err) 125 } 126 127 if post.UpvoteCount != numVoters { 128 t.Errorf("Expected upvote_count = %d, got %d (possible race condition in count update)", numVoters, post.UpvoteCount) 129 } 130 131 if post.Score != numVoters { 132 t.Errorf("Expected score = %d, got %d (possible race condition in score calculation)", numVoters, post.Score) 133 } 134 135 // CRITICAL: Verify actual vote records in database 
to detect race conditions 136 // This catches issues that aggregate counts might miss (e.g., duplicate votes, lost votes) 137 var actualVoteCount int 138 var distinctVoterCount int 139 err = db.QueryRow("SELECT COUNT(*), COUNT(DISTINCT voter_did) FROM votes WHERE subject_uri = $1 AND direction = 'up'", postURI). 140 Scan(&actualVoteCount, &distinctVoterCount) 141 if err != nil { 142 t.Fatalf("Failed to query vote records: %v", err) 143 } 144 145 if actualVoteCount != numVoters { 146 t.Errorf("Expected %d vote records in database, got %d (possible race condition: votes lost or duplicated)", numVoters, actualVoteCount) 147 } 148 149 if distinctVoterCount != numVoters { 150 t.Errorf("Expected %d distinct voters, got %d (possible race condition: duplicate votes from same voter)", numVoters, distinctVoterCount) 151 } 152 153 t.Logf("✓ %d concurrent upvotes processed correctly:", numVoters) 154 t.Logf(" - Post counts: upvote_count=%d, score=%d", post.UpvoteCount, post.Score) 155 t.Logf(" - Database records: %d votes from %d distinct voters (no duplicates)", actualVoteCount, distinctVoterCount) 156 }) 157 158 t.Run("Concurrent upvotes and downvotes on same post", func(t *testing.T) { 159 // Create a new post for this test 160 testPost2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post for mixed voting", 0, fixedTime) 161 162 const numUpvoters = 15 163 const numDownvoters = 10 164 const totalVoters = numUpvoters + numDownvoters 165 166 var wg sync.WaitGroup 167 wg.Add(totalVoters) 168 errors := make(chan error, totalVoters) 169 170 // Upvoters 171 for i := 0; i < numUpvoters; i++ { 172 go func(voterIndex int) { 173 defer wg.Done() 174 175 voterDID := fmt.Sprintf("did:plc:upvoter%d", voterIndex) 176 voterHandle := fmt.Sprintf("upvoter%d.test", voterIndex) 177 178 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{ 179 DID: voterDID, 180 Handle: voterHandle, 181 PDSURL: "http://localhost:3001", 182 }) 183 if createErr != nil { 184 errors <- 
fmt.Errorf("upvoter %d: failed to create user: %w", voterIndex, createErr) 185 return 186 } 187 188 voteRKey := generateTID() 189 voteEvent := &jetstream.JetstreamEvent{ 190 Did: voterDID, 191 Kind: "commit", 192 Commit: &jetstream.CommitEvent{ 193 Rev: fmt.Sprintf("rev-up-%d", voterIndex), 194 Operation: "create", 195 Collection: "social.coves.feed.vote", 196 RKey: voteRKey, 197 CID: fmt.Sprintf("bafyup%d", voterIndex), 198 Record: map[string]interface{}{ 199 "$type": "social.coves.feed.vote", 200 "subject": map[string]interface{}{ 201 "uri": testPost2URI, 202 "cid": "bafypost2", 203 }, 204 "direction": "up", 205 "createdAt": fixedTime.Format(time.RFC3339), 206 }, 207 }, 208 } 209 210 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil { 211 errors <- fmt.Errorf("upvoter %d: failed to handle event: %w", voterIndex, handleErr) 212 } 213 }(i) 214 } 215 216 // Downvoters 217 for i := 0; i < numDownvoters; i++ { 218 go func(voterIndex int) { 219 defer wg.Done() 220 221 voterDID := fmt.Sprintf("did:plc:downvoter%d", voterIndex) 222 voterHandle := fmt.Sprintf("downvoter%d.test", voterIndex) 223 224 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{ 225 DID: voterDID, 226 Handle: voterHandle, 227 PDSURL: "http://localhost:3001", 228 }) 229 if createErr != nil { 230 errors <- fmt.Errorf("downvoter %d: failed to create user: %w", voterIndex, createErr) 231 return 232 } 233 234 voteRKey := generateTID() 235 voteEvent := &jetstream.JetstreamEvent{ 236 Did: voterDID, 237 Kind: "commit", 238 Commit: &jetstream.CommitEvent{ 239 Rev: fmt.Sprintf("rev-down-%d", voterIndex), 240 Operation: "create", 241 Collection: "social.coves.feed.vote", 242 RKey: voteRKey, 243 CID: fmt.Sprintf("bafydown%d", voterIndex), 244 Record: map[string]interface{}{ 245 "$type": "social.coves.feed.vote", 246 "subject": map[string]interface{}{ 247 "uri": testPost2URI, 248 "cid": "bafypost2", 249 }, 250 "direction": "down", 251 "createdAt": 
fixedTime.Format(time.RFC3339), 252 }, 253 }, 254 } 255 256 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil { 257 errors <- fmt.Errorf("downvoter %d: failed to handle event: %w", voterIndex, handleErr) 258 } 259 }(i) 260 } 261 262 wg.Wait() 263 close(errors) 264 265 // Check for errors 266 var errorCount int 267 for err := range errors { 268 t.Logf("Error during concurrent mixed voting: %v", err) 269 errorCount++ 270 } 271 272 if errorCount > 0 { 273 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount) 274 } 275 276 // Verify counts 277 post, err := postRepo.GetByURI(ctx, testPost2URI) 278 if err != nil { 279 t.Fatalf("Failed to get post: %v", err) 280 } 281 282 expectedScore := numUpvoters - numDownvoters 283 if post.UpvoteCount != numUpvoters { 284 t.Errorf("Expected upvote_count = %d, got %d", numUpvoters, post.UpvoteCount) 285 } 286 if post.DownvoteCount != numDownvoters { 287 t.Errorf("Expected downvote_count = %d, got %d", numDownvoters, post.DownvoteCount) 288 } 289 if post.Score != expectedScore { 290 t.Errorf("Expected score = %d, got %d", expectedScore, post.Score) 291 } 292 293 // CRITICAL: Verify actual vote records to detect race conditions 294 var actualUpvotes, actualDownvotes, distinctUpvoters, distinctDownvoters int 295 err = db.QueryRow(` 296 SELECT 297 COUNT(*) FILTER (WHERE direction = 'up'), 298 COUNT(*) FILTER (WHERE direction = 'down'), 299 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'up'), 300 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'down') 301 FROM votes WHERE subject_uri = $1 302 `, testPost2URI).Scan(&actualUpvotes, &actualDownvotes, &distinctUpvoters, &distinctDownvoters) 303 if err != nil { 304 t.Fatalf("Failed to query vote records: %v", err) 305 } 306 307 if actualUpvotes != numUpvoters { 308 t.Errorf("Expected %d upvote records, got %d (possible race condition)", numUpvoters, actualUpvotes) 309 } 310 if actualDownvotes != numDownvoters { 311 
t.Errorf("Expected %d downvote records, got %d (possible race condition)", numDownvoters, actualDownvotes) 312 } 313 if distinctUpvoters != numUpvoters { 314 t.Errorf("Expected %d distinct upvoters, got %d (duplicate votes detected)", numUpvoters, distinctUpvoters) 315 } 316 if distinctDownvoters != numDownvoters { 317 t.Errorf("Expected %d distinct downvoters, got %d (duplicate votes detected)", numDownvoters, distinctDownvoters) 318 } 319 320 t.Logf("✓ Concurrent mixed voting processed correctly:") 321 t.Logf(" - Post counts: upvotes=%d, downvotes=%d, score=%d", post.UpvoteCount, post.DownvoteCount, post.Score) 322 t.Logf(" - Database records: %d upvotes from %d voters, %d downvotes from %d voters (no duplicates)", 323 actualUpvotes, distinctUpvoters, actualDownvotes, distinctDownvoters) 324 }) 325} 326 327// TestConcurrentCommenting_MultipleUsersOnSamePost tests race conditions when multiple users 328// comment on the same post simultaneously 329func TestConcurrentCommenting_MultipleUsersOnSamePost(t *testing.T) { 330 if testing.Short() { 331 t.Skip("Skipping integration test in short mode") 332 } 333 334 db := setupTestDB(t) 335 defer func() { 336 if err := db.Close(); err != nil { 337 t.Logf("Failed to close database: %v", err) 338 } 339 }() 340 341 ctx := context.Background() 342 commentRepo := postgres.NewCommentRepository(db) 343 postRepo := postgres.NewPostRepository(db) 344 userRepo := postgres.NewUserRepository(db) 345 communityRepo := postgres.NewCommunityRepository(db) 346 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 347 348 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC) 349 350 // Setup: Create test community and post 351 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-comments", "owner.test") 352 if err != nil { 353 t.Fatalf("Failed to create test community: %v", err) 354 } 355 356 testUser := createTestUser(t, db, "author.test", "did:plc:author456") 357 postURI := createTestPost(t, db, 
testCommunity, testUser.DID, "Post for concurrent commenting", 0, fixedTime) 358 359 t.Run("Multiple users commenting simultaneously", func(t *testing.T) { 360 const numCommenters = 25 361 var wg sync.WaitGroup 362 wg.Add(numCommenters) 363 364 errors := make(chan error, numCommenters) 365 commentURIs := make(chan string, numCommenters) 366 367 for i := 0; i < numCommenters; i++ { 368 go func(commenterIndex int) { 369 defer wg.Done() 370 371 commenterDID := fmt.Sprintf("did:plc:commenter%d", commenterIndex) 372 commentRKey := fmt.Sprintf("%s-comment%d", generateTID(), commenterIndex) 373 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", commenterDID, commentRKey) 374 375 commentEvent := &jetstream.JetstreamEvent{ 376 Did: commenterDID, 377 Kind: "commit", 378 Commit: &jetstream.CommitEvent{ 379 Rev: fmt.Sprintf("rev-comment-%d", commenterIndex), 380 Operation: "create", 381 Collection: "social.coves.community.comment", 382 RKey: commentRKey, 383 CID: fmt.Sprintf("bafycomment%d", commenterIndex), 384 Record: map[string]interface{}{ 385 "$type": "social.coves.community.comment", 386 "content": fmt.Sprintf("Concurrent comment #%d", commenterIndex), 387 "reply": map[string]interface{}{ 388 "root": map[string]interface{}{ 389 "uri": postURI, 390 "cid": "bafypost", 391 }, 392 "parent": map[string]interface{}{ 393 "uri": postURI, 394 "cid": "bafypost", 395 }, 396 }, 397 "createdAt": fixedTime.Add(time.Duration(commenterIndex) * time.Millisecond).Format(time.RFC3339), 398 }, 399 }, 400 } 401 402 if handleErr := commentConsumer.HandleEvent(ctx, commentEvent); handleErr != nil { 403 errors <- fmt.Errorf("commenter %d: failed to handle comment event: %w", commenterIndex, handleErr) 404 return 405 } 406 407 commentURIs <- commentURI 408 }(i) 409 } 410 411 wg.Wait() 412 close(errors) 413 close(commentURIs) 414 415 // Check for errors 416 var errorCount int 417 for err := range errors { 418 t.Logf("Error during concurrent commenting: %v", err) 419 
errorCount++ 420 } 421 422 if errorCount > 0 { 423 t.Errorf("Expected no errors during concurrent commenting, got %d errors", errorCount) 424 } 425 426 // Verify post comment count updated correctly 427 post, err := postRepo.GetByURI(ctx, postURI) 428 if err != nil { 429 t.Fatalf("Failed to get post: %v", err) 430 } 431 432 if post.CommentCount != numCommenters { 433 t.Errorf("Expected comment_count = %d, got %d (possible race condition in count update)", numCommenters, post.CommentCount) 434 } 435 436 // CRITICAL: Verify actual comment records to detect race conditions 437 var actualCommentCount int 438 var distinctCommenters int 439 err = db.QueryRow(` 440 SELECT COUNT(*), COUNT(DISTINCT author_did) 441 FROM comments 442 WHERE post_uri = $1 AND parent_comment_uri IS NULL 443 `, postURI).Scan(&actualCommentCount, &distinctCommenters) 444 if err != nil { 445 t.Fatalf("Failed to query comment records: %v", err) 446 } 447 448 if actualCommentCount != numCommenters { 449 t.Errorf("Expected %d comment records in database, got %d (possible race condition: comments lost or duplicated)", numCommenters, actualCommentCount) 450 } 451 452 if distinctCommenters != numCommenters { 453 t.Errorf("Expected %d distinct commenters, got %d (possible duplicate comments from same author)", numCommenters, distinctCommenters) 454 } 455 456 // Verify all comments are retrievable via service 457 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 458 response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{ 459 PostURI: postURI, 460 Sort: "new", 461 Depth: 10, 462 Limit: 100, 463 ViewerDID: nil, 464 }) 465 if err != nil { 466 t.Fatalf("Failed to get comments: %v", err) 467 } 468 469 if len(response.Comments) != numCommenters { 470 t.Errorf("Expected %d comments in response, got %d", numCommenters, len(response.Comments)) 471 } 472 473 t.Logf("✓ %d concurrent comments processed correctly:", numCommenters) 474 t.Logf(" - Post 
comment_count: %d", post.CommentCount) 475 t.Logf(" - Database records: %d comments from %d distinct authors (no duplicates)", actualCommentCount, distinctCommenters) 476 }) 477 478 t.Run("Concurrent replies to same comment", func(t *testing.T) { 479 // Create a parent comment first 480 parentCommentRKey := generateTID() 481 parentCommentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, parentCommentRKey) 482 483 parentEvent := &jetstream.JetstreamEvent{ 484 Did: testUser.DID, 485 Kind: "commit", 486 Commit: &jetstream.CommitEvent{ 487 Rev: "parent-rev", 488 Operation: "create", 489 Collection: "social.coves.community.comment", 490 RKey: parentCommentRKey, 491 CID: "bafyparent", 492 Record: map[string]interface{}{ 493 "$type": "social.coves.community.comment", 494 "content": "Parent comment for replies", 495 "reply": map[string]interface{}{ 496 "root": map[string]interface{}{ 497 "uri": postURI, 498 "cid": "bafypost", 499 }, 500 "parent": map[string]interface{}{ 501 "uri": postURI, 502 "cid": "bafypost", 503 }, 504 }, 505 "createdAt": fixedTime.Format(time.RFC3339), 506 }, 507 }, 508 } 509 510 if err := commentConsumer.HandleEvent(ctx, parentEvent); err != nil { 511 t.Fatalf("Failed to create parent comment: %v", err) 512 } 513 514 // Now create concurrent replies 515 const numRepliers = 15 516 var wg sync.WaitGroup 517 wg.Add(numRepliers) 518 errors := make(chan error, numRepliers) 519 520 for i := 0; i < numRepliers; i++ { 521 go func(replierIndex int) { 522 defer wg.Done() 523 524 replierDID := fmt.Sprintf("did:plc:replier%d", replierIndex) 525 replyRKey := fmt.Sprintf("%s-reply%d", generateTID(), replierIndex) 526 527 replyEvent := &jetstream.JetstreamEvent{ 528 Did: replierDID, 529 Kind: "commit", 530 Commit: &jetstream.CommitEvent{ 531 Rev: fmt.Sprintf("rev-reply-%d", replierIndex), 532 Operation: "create", 533 Collection: "social.coves.community.comment", 534 RKey: replyRKey, 535 CID: fmt.Sprintf("bafyreply%d", replierIndex), 536 
Record: map[string]interface{}{ 537 "$type": "social.coves.community.comment", 538 "content": fmt.Sprintf("Concurrent reply #%d", replierIndex), 539 "reply": map[string]interface{}{ 540 "root": map[string]interface{}{ 541 "uri": postURI, 542 "cid": "bafypost", 543 }, 544 "parent": map[string]interface{}{ 545 "uri": parentCommentURI, 546 "cid": "bafyparent", 547 }, 548 }, 549 "createdAt": fixedTime.Add(time.Duration(replierIndex) * time.Millisecond).Format(time.RFC3339), 550 }, 551 }, 552 } 553 554 if handleErr := commentConsumer.HandleEvent(ctx, replyEvent); handleErr != nil { 555 errors <- fmt.Errorf("replier %d: failed to handle reply event: %w", replierIndex, handleErr) 556 } 557 }(i) 558 } 559 560 wg.Wait() 561 close(errors) 562 563 // Check for errors 564 var errorCount int 565 for err := range errors { 566 t.Logf("Error during concurrent replying: %v", err) 567 errorCount++ 568 } 569 570 if errorCount > 0 { 571 t.Errorf("Expected no errors during concurrent replying, got %d errors", errorCount) 572 } 573 574 // Verify parent comment reply count 575 parentComment, err := commentRepo.GetByURI(ctx, parentCommentURI) 576 if err != nil { 577 t.Fatalf("Failed to get parent comment: %v", err) 578 } 579 580 if parentComment.ReplyCount != numRepliers { 581 t.Errorf("Expected reply_count = %d on parent comment, got %d (possible race condition)", numRepliers, parentComment.ReplyCount) 582 } 583 584 t.Logf("✓ %d concurrent replies processed correctly, reply_count=%d", numRepliers, parentComment.ReplyCount) 585 }) 586} 587 588// TestConcurrentCommunityCreation tests race conditions when multiple goroutines 589// try to create communities with the same handle 590func TestConcurrentCommunityCreation_DuplicateHandle(t *testing.T) { 591 if testing.Short() { 592 t.Skip("Skipping integration test in short mode") 593 } 594 595 db := setupTestDB(t) 596 defer func() { 597 if err := db.Close(); err != nil { 598 t.Logf("Failed to close database: %v", err) 599 } 600 }() 601 602 ctx 
:= context.Background() 603 repo := postgres.NewCommunityRepository(db) 604 605 t.Run("Concurrent creation with same handle should fail", func(t *testing.T) { 606 const numAttempts = 10 607 sameHandle := fmt.Sprintf("duplicate-handle-%d.test.coves.social", time.Now().UnixNano()) 608 609 var wg sync.WaitGroup 610 wg.Add(numAttempts) 611 612 type result struct { 613 err error 614 success bool 615 } 616 results := make(chan result, numAttempts) 617 618 for i := 0; i < numAttempts; i++ { 619 go func(attemptIndex int) { 620 defer wg.Done() 621 622 // Each attempt uses a unique DID but same handle 623 uniqueDID := fmt.Sprintf("did:plc:dup-community-%d-%d", time.Now().UnixNano(), attemptIndex) 624 625 community := &communities.Community{ 626 DID: uniqueDID, 627 Handle: sameHandle, // SAME HANDLE 628 Name: fmt.Sprintf("dup-test-%d", attemptIndex), 629 DisplayName: fmt.Sprintf("Duplicate Test %d", attemptIndex), 630 Description: "Testing duplicate handle prevention", 631 OwnerDID: "did:web:test.local", 632 CreatedByDID: "did:plc:creator", 633 HostedByDID: "did:web:test.local", 634 Visibility: "public", 635 CreatedAt: time.Now(), 636 UpdatedAt: time.Now(), 637 } 638 639 _, createErr := repo.Create(ctx, community) 640 results <- result{ 641 success: createErr == nil, 642 err: createErr, 643 } 644 }(i) 645 } 646 647 wg.Wait() 648 close(results) 649 650 // Collect results 651 successCount := 0 652 duplicateErrors := 0 653 654 for res := range results { 655 if res.success { 656 successCount++ 657 } else if communities.IsConflict(res.err) { 658 duplicateErrors++ 659 } else { 660 t.Logf("Unexpected error type: %v", res.err) 661 } 662 } 663 664 // CRITICAL: Exactly ONE should succeed, rest should fail with duplicate error 665 if successCount != 1 { 666 t.Errorf("Expected exactly 1 successful creation, got %d (DATABASE CONSTRAINT VIOLATION - race condition detected)", successCount) 667 } 668 669 if duplicateErrors != numAttempts-1 { 670 t.Errorf("Expected %d duplicate errors, got 
%d", numAttempts-1, duplicateErrors) 671 } 672 673 t.Logf("✓ Duplicate handle protection: %d successful, %d duplicate errors (database constraint working)", successCount, duplicateErrors) 674 }) 675 676 t.Run("Concurrent creation with different handles should succeed", func(t *testing.T) { 677 const numAttempts = 10 678 var wg sync.WaitGroup 679 wg.Add(numAttempts) 680 681 errors := make(chan error, numAttempts) 682 683 for i := 0; i < numAttempts; i++ { 684 go func(attemptIndex int) { 685 defer wg.Done() 686 687 uniqueSuffix := fmt.Sprintf("%d-%d", time.Now().UnixNano(), attemptIndex) 688 community := &communities.Community{ 689 DID: generateTestDID(uniqueSuffix), 690 Handle: fmt.Sprintf("unique-handle-%s.test.coves.social", uniqueSuffix), 691 Name: fmt.Sprintf("unique-test-%s", uniqueSuffix), 692 DisplayName: fmt.Sprintf("Unique Test %d", attemptIndex), 693 Description: "Testing concurrent unique handle creation", 694 OwnerDID: "did:web:test.local", 695 CreatedByDID: "did:plc:creator", 696 HostedByDID: "did:web:test.local", 697 Visibility: "public", 698 CreatedAt: time.Now(), 699 UpdatedAt: time.Now(), 700 } 701 702 _, createErr := repo.Create(ctx, community) 703 if createErr != nil { 704 errors <- createErr 705 } 706 }(i) 707 } 708 709 wg.Wait() 710 close(errors) 711 712 // All should succeed 713 var errorCount int 714 for err := range errors { 715 t.Logf("Error during concurrent unique creation: %v", err) 716 errorCount++ 717 } 718 719 if errorCount > 0 { 720 t.Errorf("Expected all %d creations to succeed, but %d failed", numAttempts, errorCount) 721 } 722 723 t.Logf("✓ All %d concurrent community creations with unique handles succeeded", numAttempts) 724 }) 725} 726 727// TestConcurrentSubscription tests race conditions when multiple users subscribe 728// to the same community simultaneously 729func TestConcurrentSubscription_RaceConditions(t *testing.T) { 730 if testing.Short() { 731 t.Skip("Skipping integration test in short mode") 732 } 733 734 db := 
setupTestDB(t) 735 defer func() { 736 if err := db.Close(); err != nil { 737 t.Logf("Failed to close database: %v", err) 738 } 739 }() 740 741 ctx := context.Background() 742 communityRepo := postgres.NewCommunityRepository(db) 743 consumer := jetstream.NewCommunityEventConsumer(communityRepo, "did:web:coves.local", true, nil) 744 745 // Create test community 746 testDID := fmt.Sprintf("did:plc:test-sub-race-%d", time.Now().UnixNano()) 747 community := &communities.Community{ 748 DID: testDID, 749 Handle: fmt.Sprintf("sub-race-%d.test.coves.social", time.Now().UnixNano()), 750 Name: "sub-race-test", 751 DisplayName: "Subscription Race Test", 752 Description: "Testing subscription race conditions", 753 OwnerDID: "did:plc:owner", 754 CreatedByDID: "did:plc:creator", 755 HostedByDID: "did:web:coves.local", 756 Visibility: "public", 757 CreatedAt: time.Now(), 758 UpdatedAt: time.Now(), 759 } 760 761 created, err := communityRepo.Create(ctx, community) 762 if err != nil { 763 t.Fatalf("Failed to create test community: %v", err) 764 } 765 766 t.Run("Multiple users subscribing concurrently", func(t *testing.T) { 767 const numSubscribers = 30 768 var wg sync.WaitGroup 769 wg.Add(numSubscribers) 770 771 errors := make(chan error, numSubscribers) 772 773 for i := 0; i < numSubscribers; i++ { 774 go func(subscriberIndex int) { 775 defer wg.Done() 776 777 userDID := fmt.Sprintf("did:plc:subscriber%d", subscriberIndex) 778 rkey := fmt.Sprintf("sub-%d", subscriberIndex) 779 780 event := &jetstream.JetstreamEvent{ 781 Did: userDID, 782 Kind: "commit", 783 TimeUS: time.Now().UnixMicro(), 784 Commit: &jetstream.CommitEvent{ 785 Rev: fmt.Sprintf("rev-%d", subscriberIndex), 786 Operation: "create", 787 Collection: "social.coves.community.subscription", 788 RKey: rkey, 789 CID: fmt.Sprintf("bafysub%d", subscriberIndex), 790 Record: map[string]interface{}{ 791 "$type": "social.coves.community.subscription", 792 "subject": created.DID, 793 "createdAt": time.Now().Format(time.RFC3339), 
794 "contentVisibility": float64(3), 795 }, 796 }, 797 } 798 799 if handleErr := consumer.HandleEvent(ctx, event); handleErr != nil { 800 errors <- fmt.Errorf("subscriber %d: failed to subscribe: %w", subscriberIndex, handleErr) 801 } 802 }(i) 803 } 804 805 wg.Wait() 806 close(errors) 807 808 // Check for errors 809 var errorCount int 810 for err := range errors { 811 t.Logf("Error during concurrent subscription: %v", err) 812 errorCount++ 813 } 814 815 if errorCount > 0 { 816 t.Errorf("Expected no errors during concurrent subscription, got %d errors", errorCount) 817 } 818 819 // Verify subscriber count is correct 820 updatedCommunity, err := communityRepo.GetByDID(ctx, created.DID) 821 if err != nil { 822 t.Fatalf("Failed to get updated community: %v", err) 823 } 824 825 if updatedCommunity.SubscriberCount != numSubscribers { 826 t.Errorf("Expected subscriber_count = %d, got %d (RACE CONDITION in subscriber count update)", numSubscribers, updatedCommunity.SubscriberCount) 827 } 828 829 // CRITICAL: Verify actual subscription records to detect race conditions 830 var actualSubscriptionCount int 831 var distinctSubscribers int 832 err = db.QueryRow(` 833 SELECT COUNT(*), COUNT(DISTINCT user_did) 834 FROM community_subscriptions 835 WHERE community_did = $1 836 `, created.DID).Scan(&actualSubscriptionCount, &distinctSubscribers) 837 if err != nil { 838 t.Fatalf("Failed to query subscription records: %v", err) 839 } 840 841 if actualSubscriptionCount != numSubscribers { 842 t.Errorf("Expected %d subscription records, got %d (possible race condition: subscriptions lost or duplicated)", numSubscribers, actualSubscriptionCount) 843 } 844 845 if distinctSubscribers != numSubscribers { 846 t.Errorf("Expected %d distinct subscribers, got %d (possible duplicate subscriptions)", numSubscribers, distinctSubscribers) 847 } 848 849 t.Logf("✓ %d concurrent subscriptions processed correctly:", numSubscribers) 850 t.Logf(" - Community subscriber_count: %d", 
updatedCommunity.SubscriberCount) 851 t.Logf(" - Database records: %d subscriptions from %d distinct users (no duplicates)", actualSubscriptionCount, distinctSubscribers) 852 }) 853 854 t.Run("Concurrent subscribe and unsubscribe", func(t *testing.T) { 855 // Create new community for this test 856 testDID2 := fmt.Sprintf("did:plc:test-sub-unsub-%d", time.Now().UnixNano()) 857 community2 := &communities.Community{ 858 DID: testDID2, 859 Handle: fmt.Sprintf("sub-unsub-%d.test.coves.social", time.Now().UnixNano()), 860 Name: "sub-unsub-test", 861 DisplayName: "Subscribe/Unsubscribe Race Test", 862 Description: "Testing concurrent subscribe/unsubscribe", 863 OwnerDID: "did:plc:owner", 864 CreatedByDID: "did:plc:creator", 865 HostedByDID: "did:web:coves.local", 866 Visibility: "public", 867 CreatedAt: time.Now(), 868 UpdatedAt: time.Now(), 869 } 870 871 created2, err := communityRepo.Create(ctx, community2) 872 if err != nil { 873 t.Fatalf("Failed to create test community: %v", err) 874 } 875 876 const numUsers = 20 877 var wg sync.WaitGroup 878 wg.Add(numUsers * 2) // Each user subscribes then unsubscribes 879 880 errors := make(chan error, numUsers*2) 881 882 for i := 0; i < numUsers; i++ { 883 go func(userIndex int) { 884 userDID := fmt.Sprintf("did:plc:subunsubuser%d", userIndex) 885 rkey := fmt.Sprintf("subunsub-%d", userIndex) 886 887 // Subscribe 888 subscribeEvent := &jetstream.JetstreamEvent{ 889 Did: userDID, 890 Kind: "commit", 891 TimeUS: time.Now().UnixMicro(), 892 Commit: &jetstream.CommitEvent{ 893 Rev: fmt.Sprintf("rev-sub-%d", userIndex), 894 Operation: "create", 895 Collection: "social.coves.community.subscription", 896 RKey: rkey, 897 CID: fmt.Sprintf("bafysubscribe%d", userIndex), 898 Record: map[string]interface{}{ 899 "$type": "social.coves.community.subscription", 900 "subject": created2.DID, 901 "createdAt": time.Now().Format(time.RFC3339), 902 "contentVisibility": float64(3), 903 }, 904 }, 905 } 906 907 if handleErr := consumer.HandleEvent(ctx, 
subscribeEvent); handleErr != nil { 908 errors <- fmt.Errorf("user %d: subscribe failed: %w", userIndex, handleErr) 909 } 910 wg.Done() 911 912 // Small delay to ensure subscribe happens first 913 time.Sleep(10 * time.Millisecond) 914 915 // Unsubscribe 916 unsubscribeEvent := &jetstream.JetstreamEvent{ 917 Did: userDID, 918 Kind: "commit", 919 TimeUS: time.Now().UnixMicro(), 920 Commit: &jetstream.CommitEvent{ 921 Rev: fmt.Sprintf("rev-unsub-%d", userIndex), 922 Operation: "delete", 923 Collection: "social.coves.community.subscription", 924 RKey: rkey, 925 CID: "", 926 Record: nil, 927 }, 928 } 929 930 if handleErr := consumer.HandleEvent(ctx, unsubscribeEvent); handleErr != nil { 931 errors <- fmt.Errorf("user %d: unsubscribe failed: %w", userIndex, handleErr) 932 } 933 wg.Done() 934 }(i) 935 } 936 937 wg.Wait() 938 close(errors) 939 940 // Check for errors 941 var errorCount int 942 for err := range errors { 943 t.Logf("Error during concurrent sub/unsub: %v", err) 944 errorCount++ 945 } 946 947 if errorCount > 0 { 948 t.Errorf("Expected no errors during concurrent sub/unsub, got %d errors", errorCount) 949 } 950 951 // Final subscriber count should be 0 (all unsubscribed) 952 finalCommunity, err := communityRepo.GetByDID(ctx, created2.DID) 953 if err != nil { 954 t.Fatalf("Failed to get final community: %v", err) 955 } 956 957 if finalCommunity.SubscriberCount != 0 { 958 t.Errorf("Expected subscriber_count = 0 after all unsubscribed, got %d (RACE CONDITION detected)", finalCommunity.SubscriberCount) 959 } 960 961 // CRITICAL: Verify no subscription records remain in database 962 var remainingSubscriptions int 963 err = db.QueryRow(` 964 SELECT COUNT(*) 965 FROM community_subscriptions 966 WHERE community_did = $1 967 `, created2.DID).Scan(&remainingSubscriptions) 968 if err != nil { 969 t.Fatalf("Failed to query subscription records: %v", err) 970 } 971 972 if remainingSubscriptions != 0 { 973 t.Errorf("Expected 0 subscription records after all unsubscribed, got 
%d (orphaned subscriptions detected)", remainingSubscriptions) 974 } 975 976 t.Logf("✓ Concurrent subscribe/unsubscribe handled correctly:") 977 t.Logf(" - Community subscriber_count: %d", finalCommunity.SubscriberCount) 978 t.Logf(" - Database records: %d subscriptions remaining (clean unsubscribe)", remainingSubscriptions) 979 }) 980}