A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/comments"
6 "Coves/internal/core/communities"
7 "Coves/internal/core/users"
8 "Coves/internal/db/postgres"
9 "context"
10 "fmt"
11 "sync"
12 "testing"
13 "time"
14)
15
16// TestConcurrentVoting_MultipleUsersOnSamePost tests race conditions when multiple users
17// vote on the same post simultaneously
18func TestConcurrentVoting_MultipleUsersOnSamePost(t *testing.T) {
19 if testing.Short() {
20 t.Skip("Skipping integration test in short mode")
21 }
22
23 db := setupTestDB(t)
24 defer func() {
25 if err := db.Close(); err != nil {
26 t.Logf("Failed to close database: %v", err)
27 }
28 }()
29
30 ctx := context.Background()
31 voteRepo := postgres.NewVoteRepository(db)
32 postRepo := postgres.NewPostRepository(db)
33 userRepo := postgres.NewUserRepository(db)
34 userService := users.NewUserService(userRepo, nil, "http://localhost:3001")
35 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
36
37 // Use fixed timestamp
38 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC)
39
40 // Setup: Create test community and post
41 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-votes", "owner.test")
42 if err != nil {
43 t.Fatalf("Failed to create test community: %v", err)
44 }
45
46 testUser := createTestUser(t, db, "author.test", "did:plc:author123")
47 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Post for concurrent voting", 0, fixedTime)
48
49 t.Run("Multiple users upvoting same post concurrently", func(t *testing.T) {
50 const numVoters = 20
51 var wg sync.WaitGroup
52 wg.Add(numVoters)
53
54 // Channel to collect errors
55 errors := make(chan error, numVoters)
56
57 // Create voters and vote concurrently
58 for i := 0; i < numVoters; i++ {
59 go func(voterIndex int) {
60 defer wg.Done()
61
62 voterDID := fmt.Sprintf("did:plc:voter%d", voterIndex)
63 voterHandle := fmt.Sprintf("voter%d.test", voterIndex)
64
65 // Create user
66 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{
67 DID: voterDID,
68 Handle: voterHandle,
69 PDSURL: "http://localhost:3001",
70 })
71 if createErr != nil {
72 errors <- fmt.Errorf("voter %d: failed to create user: %w", voterIndex, createErr)
73 return
74 }
75
76 // Create vote
77 voteRKey := generateTID()
78 voteEvent := &jetstream.JetstreamEvent{
79 Did: voterDID,
80 Kind: "commit",
81 Commit: &jetstream.CommitEvent{
82 Rev: fmt.Sprintf("rev-%d", voterIndex),
83 Operation: "create",
84 Collection: "social.coves.feed.vote",
85 RKey: voteRKey,
86 CID: fmt.Sprintf("bafyvote%d", voterIndex),
87 Record: map[string]interface{}{
88 "$type": "social.coves.feed.vote",
89 "subject": map[string]interface{}{
90 "uri": postURI,
91 "cid": "bafypost",
92 },
93 "direction": "up",
94 "createdAt": fixedTime.Format(time.RFC3339),
95 },
96 },
97 }
98
99 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil {
100 errors <- fmt.Errorf("voter %d: failed to handle vote event: %w", voterIndex, handleErr)
101 return
102 }
103 }(i)
104 }
105
106 // Wait for all goroutines to complete
107 wg.Wait()
108 close(errors)
109
110 // Check for errors
111 var errorCount int
112 for err := range errors {
113 t.Logf("Error during concurrent voting: %v", err)
114 errorCount++
115 }
116
117 if errorCount > 0 {
118 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount)
119 }
120
121 // Verify post vote counts are correct
122 post, err := postRepo.GetByURI(ctx, postURI)
123 if err != nil {
124 t.Fatalf("Failed to get post: %v", err)
125 }
126
127 if post.UpvoteCount != numVoters {
128 t.Errorf("Expected upvote_count = %d, got %d (possible race condition in count update)", numVoters, post.UpvoteCount)
129 }
130
131 if post.Score != numVoters {
132 t.Errorf("Expected score = %d, got %d (possible race condition in score calculation)", numVoters, post.Score)
133 }
134
135 // CRITICAL: Verify actual vote records in database to detect race conditions
136 // This catches issues that aggregate counts might miss (e.g., duplicate votes, lost votes)
137 var actualVoteCount int
138 var distinctVoterCount int
139 err = db.QueryRow("SELECT COUNT(*), COUNT(DISTINCT voter_did) FROM votes WHERE subject_uri = $1 AND direction = 'up'", postURI).
140 Scan(&actualVoteCount, &distinctVoterCount)
141 if err != nil {
142 t.Fatalf("Failed to query vote records: %v", err)
143 }
144
145 if actualVoteCount != numVoters {
146 t.Errorf("Expected %d vote records in database, got %d (possible race condition: votes lost or duplicated)", numVoters, actualVoteCount)
147 }
148
149 if distinctVoterCount != numVoters {
150 t.Errorf("Expected %d distinct voters, got %d (possible race condition: duplicate votes from same voter)", numVoters, distinctVoterCount)
151 }
152
153 t.Logf("✓ %d concurrent upvotes processed correctly:", numVoters)
154 t.Logf(" - Post counts: upvote_count=%d, score=%d", post.UpvoteCount, post.Score)
155 t.Logf(" - Database records: %d votes from %d distinct voters (no duplicates)", actualVoteCount, distinctVoterCount)
156 })
157
158 t.Run("Concurrent upvotes and downvotes on same post", func(t *testing.T) {
159 // Create a new post for this test
160 testPost2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post for mixed voting", 0, fixedTime)
161
162 const numUpvoters = 15
163 const numDownvoters = 10
164 const totalVoters = numUpvoters + numDownvoters
165
166 var wg sync.WaitGroup
167 wg.Add(totalVoters)
168 errors := make(chan error, totalVoters)
169
170 // Upvoters
171 for i := 0; i < numUpvoters; i++ {
172 go func(voterIndex int) {
173 defer wg.Done()
174
175 voterDID := fmt.Sprintf("did:plc:upvoter%d", voterIndex)
176 voterHandle := fmt.Sprintf("upvoter%d.test", voterIndex)
177
178 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{
179 DID: voterDID,
180 Handle: voterHandle,
181 PDSURL: "http://localhost:3001",
182 })
183 if createErr != nil {
184 errors <- fmt.Errorf("upvoter %d: failed to create user: %w", voterIndex, createErr)
185 return
186 }
187
188 voteRKey := generateTID()
189 voteEvent := &jetstream.JetstreamEvent{
190 Did: voterDID,
191 Kind: "commit",
192 Commit: &jetstream.CommitEvent{
193 Rev: fmt.Sprintf("rev-up-%d", voterIndex),
194 Operation: "create",
195 Collection: "social.coves.feed.vote",
196 RKey: voteRKey,
197 CID: fmt.Sprintf("bafyup%d", voterIndex),
198 Record: map[string]interface{}{
199 "$type": "social.coves.feed.vote",
200 "subject": map[string]interface{}{
201 "uri": testPost2URI,
202 "cid": "bafypost2",
203 },
204 "direction": "up",
205 "createdAt": fixedTime.Format(time.RFC3339),
206 },
207 },
208 }
209
210 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil {
211 errors <- fmt.Errorf("upvoter %d: failed to handle event: %w", voterIndex, handleErr)
212 }
213 }(i)
214 }
215
216 // Downvoters
217 for i := 0; i < numDownvoters; i++ {
218 go func(voterIndex int) {
219 defer wg.Done()
220
221 voterDID := fmt.Sprintf("did:plc:downvoter%d", voterIndex)
222 voterHandle := fmt.Sprintf("downvoter%d.test", voterIndex)
223
224 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{
225 DID: voterDID,
226 Handle: voterHandle,
227 PDSURL: "http://localhost:3001",
228 })
229 if createErr != nil {
230 errors <- fmt.Errorf("downvoter %d: failed to create user: %w", voterIndex, createErr)
231 return
232 }
233
234 voteRKey := generateTID()
235 voteEvent := &jetstream.JetstreamEvent{
236 Did: voterDID,
237 Kind: "commit",
238 Commit: &jetstream.CommitEvent{
239 Rev: fmt.Sprintf("rev-down-%d", voterIndex),
240 Operation: "create",
241 Collection: "social.coves.feed.vote",
242 RKey: voteRKey,
243 CID: fmt.Sprintf("bafydown%d", voterIndex),
244 Record: map[string]interface{}{
245 "$type": "social.coves.feed.vote",
246 "subject": map[string]interface{}{
247 "uri": testPost2URI,
248 "cid": "bafypost2",
249 },
250 "direction": "down",
251 "createdAt": fixedTime.Format(time.RFC3339),
252 },
253 },
254 }
255
256 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil {
257 errors <- fmt.Errorf("downvoter %d: failed to handle event: %w", voterIndex, handleErr)
258 }
259 }(i)
260 }
261
262 wg.Wait()
263 close(errors)
264
265 // Check for errors
266 var errorCount int
267 for err := range errors {
268 t.Logf("Error during concurrent mixed voting: %v", err)
269 errorCount++
270 }
271
272 if errorCount > 0 {
273 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount)
274 }
275
276 // Verify counts
277 post, err := postRepo.GetByURI(ctx, testPost2URI)
278 if err != nil {
279 t.Fatalf("Failed to get post: %v", err)
280 }
281
282 expectedScore := numUpvoters - numDownvoters
283 if post.UpvoteCount != numUpvoters {
284 t.Errorf("Expected upvote_count = %d, got %d", numUpvoters, post.UpvoteCount)
285 }
286 if post.DownvoteCount != numDownvoters {
287 t.Errorf("Expected downvote_count = %d, got %d", numDownvoters, post.DownvoteCount)
288 }
289 if post.Score != expectedScore {
290 t.Errorf("Expected score = %d, got %d", expectedScore, post.Score)
291 }
292
293 // CRITICAL: Verify actual vote records to detect race conditions
294 var actualUpvotes, actualDownvotes, distinctUpvoters, distinctDownvoters int
295 err = db.QueryRow(`
296 SELECT
297 COUNT(*) FILTER (WHERE direction = 'up'),
298 COUNT(*) FILTER (WHERE direction = 'down'),
299 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'up'),
300 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'down')
301 FROM votes WHERE subject_uri = $1
302 `, testPost2URI).Scan(&actualUpvotes, &actualDownvotes, &distinctUpvoters, &distinctDownvoters)
303 if err != nil {
304 t.Fatalf("Failed to query vote records: %v", err)
305 }
306
307 if actualUpvotes != numUpvoters {
308 t.Errorf("Expected %d upvote records, got %d (possible race condition)", numUpvoters, actualUpvotes)
309 }
310 if actualDownvotes != numDownvoters {
311 t.Errorf("Expected %d downvote records, got %d (possible race condition)", numDownvoters, actualDownvotes)
312 }
313 if distinctUpvoters != numUpvoters {
314 t.Errorf("Expected %d distinct upvoters, got %d (duplicate votes detected)", numUpvoters, distinctUpvoters)
315 }
316 if distinctDownvoters != numDownvoters {
317 t.Errorf("Expected %d distinct downvoters, got %d (duplicate votes detected)", numDownvoters, distinctDownvoters)
318 }
319
320 t.Logf("✓ Concurrent mixed voting processed correctly:")
321 t.Logf(" - Post counts: upvotes=%d, downvotes=%d, score=%d", post.UpvoteCount, post.DownvoteCount, post.Score)
322 t.Logf(" - Database records: %d upvotes from %d voters, %d downvotes from %d voters (no duplicates)",
323 actualUpvotes, distinctUpvoters, actualDownvotes, distinctDownvoters)
324 })
325}
326
327// TestConcurrentCommenting_MultipleUsersOnSamePost tests race conditions when multiple users
328// comment on the same post simultaneously
329func TestConcurrentCommenting_MultipleUsersOnSamePost(t *testing.T) {
330 if testing.Short() {
331 t.Skip("Skipping integration test in short mode")
332 }
333
334 db := setupTestDB(t)
335 defer func() {
336 if err := db.Close(); err != nil {
337 t.Logf("Failed to close database: %v", err)
338 }
339 }()
340
341 ctx := context.Background()
342 commentRepo := postgres.NewCommentRepository(db)
343 postRepo := postgres.NewPostRepository(db)
344 userRepo := postgres.NewUserRepository(db)
345 communityRepo := postgres.NewCommunityRepository(db)
346 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
347
348 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC)
349
350 // Setup: Create test community and post
351 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-comments", "owner.test")
352 if err != nil {
353 t.Fatalf("Failed to create test community: %v", err)
354 }
355
356 testUser := createTestUser(t, db, "author.test", "did:plc:author456")
357 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Post for concurrent commenting", 0, fixedTime)
358
359 t.Run("Multiple users commenting simultaneously", func(t *testing.T) {
360 const numCommenters = 25
361 var wg sync.WaitGroup
362 wg.Add(numCommenters)
363
364 errors := make(chan error, numCommenters)
365 commentURIs := make(chan string, numCommenters)
366
367 for i := 0; i < numCommenters; i++ {
368 go func(commenterIndex int) {
369 defer wg.Done()
370
371 commenterDID := fmt.Sprintf("did:plc:commenter%d", commenterIndex)
372 commentRKey := fmt.Sprintf("%s-comment%d", generateTID(), commenterIndex)
373 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", commenterDID, commentRKey)
374
375 commentEvent := &jetstream.JetstreamEvent{
376 Did: commenterDID,
377 Kind: "commit",
378 Commit: &jetstream.CommitEvent{
379 Rev: fmt.Sprintf("rev-comment-%d", commenterIndex),
380 Operation: "create",
381 Collection: "social.coves.community.comment",
382 RKey: commentRKey,
383 CID: fmt.Sprintf("bafycomment%d", commenterIndex),
384 Record: map[string]interface{}{
385 "$type": "social.coves.community.comment",
386 "content": fmt.Sprintf("Concurrent comment #%d", commenterIndex),
387 "reply": map[string]interface{}{
388 "root": map[string]interface{}{
389 "uri": postURI,
390 "cid": "bafypost",
391 },
392 "parent": map[string]interface{}{
393 "uri": postURI,
394 "cid": "bafypost",
395 },
396 },
397 "createdAt": fixedTime.Add(time.Duration(commenterIndex) * time.Millisecond).Format(time.RFC3339),
398 },
399 },
400 }
401
402 if handleErr := commentConsumer.HandleEvent(ctx, commentEvent); handleErr != nil {
403 errors <- fmt.Errorf("commenter %d: failed to handle comment event: %w", commenterIndex, handleErr)
404 return
405 }
406
407 commentURIs <- commentURI
408 }(i)
409 }
410
411 wg.Wait()
412 close(errors)
413 close(commentURIs)
414
415 // Check for errors
416 var errorCount int
417 for err := range errors {
418 t.Logf("Error during concurrent commenting: %v", err)
419 errorCount++
420 }
421
422 if errorCount > 0 {
423 t.Errorf("Expected no errors during concurrent commenting, got %d errors", errorCount)
424 }
425
426 // Verify post comment count updated correctly
427 post, err := postRepo.GetByURI(ctx, postURI)
428 if err != nil {
429 t.Fatalf("Failed to get post: %v", err)
430 }
431
432 if post.CommentCount != numCommenters {
433 t.Errorf("Expected comment_count = %d, got %d (possible race condition in count update)", numCommenters, post.CommentCount)
434 }
435
436 // CRITICAL: Verify actual comment records to detect race conditions
437 var actualCommentCount int
438 var distinctCommenters int
439 err = db.QueryRow(`
440 SELECT COUNT(*), COUNT(DISTINCT author_did)
441 FROM comments
442 WHERE post_uri = $1 AND parent_comment_uri IS NULL
443 `, postURI).Scan(&actualCommentCount, &distinctCommenters)
444 if err != nil {
445 t.Fatalf("Failed to query comment records: %v", err)
446 }
447
448 if actualCommentCount != numCommenters {
449 t.Errorf("Expected %d comment records in database, got %d (possible race condition: comments lost or duplicated)", numCommenters, actualCommentCount)
450 }
451
452 if distinctCommenters != numCommenters {
453 t.Errorf("Expected %d distinct commenters, got %d (possible duplicate comments from same author)", numCommenters, distinctCommenters)
454 }
455
456 // Verify all comments are retrievable via service
457 // Use factory constructor with nil factory - this test only uses the read path (GetComments)
458 commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
459 response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{
460 PostURI: postURI,
461 Sort: "new",
462 Depth: 10,
463 Limit: 100,
464 ViewerDID: nil,
465 })
466 if err != nil {
467 t.Fatalf("Failed to get comments: %v", err)
468 }
469
470 if len(response.Comments) != numCommenters {
471 t.Errorf("Expected %d comments in response, got %d", numCommenters, len(response.Comments))
472 }
473
474 t.Logf("✓ %d concurrent comments processed correctly:", numCommenters)
475 t.Logf(" - Post comment_count: %d", post.CommentCount)
476 t.Logf(" - Database records: %d comments from %d distinct authors (no duplicates)", actualCommentCount, distinctCommenters)
477 })
478
479 t.Run("Concurrent replies to same comment", func(t *testing.T) {
480 // Create a parent comment first
481 parentCommentRKey := generateTID()
482 parentCommentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, parentCommentRKey)
483
484 parentEvent := &jetstream.JetstreamEvent{
485 Did: testUser.DID,
486 Kind: "commit",
487 Commit: &jetstream.CommitEvent{
488 Rev: "parent-rev",
489 Operation: "create",
490 Collection: "social.coves.community.comment",
491 RKey: parentCommentRKey,
492 CID: "bafyparent",
493 Record: map[string]interface{}{
494 "$type": "social.coves.community.comment",
495 "content": "Parent comment for replies",
496 "reply": map[string]interface{}{
497 "root": map[string]interface{}{
498 "uri": postURI,
499 "cid": "bafypost",
500 },
501 "parent": map[string]interface{}{
502 "uri": postURI,
503 "cid": "bafypost",
504 },
505 },
506 "createdAt": fixedTime.Format(time.RFC3339),
507 },
508 },
509 }
510
511 if err := commentConsumer.HandleEvent(ctx, parentEvent); err != nil {
512 t.Fatalf("Failed to create parent comment: %v", err)
513 }
514
515 // Now create concurrent replies
516 const numRepliers = 15
517 var wg sync.WaitGroup
518 wg.Add(numRepliers)
519 errors := make(chan error, numRepliers)
520
521 for i := 0; i < numRepliers; i++ {
522 go func(replierIndex int) {
523 defer wg.Done()
524
525 replierDID := fmt.Sprintf("did:plc:replier%d", replierIndex)
526 replyRKey := fmt.Sprintf("%s-reply%d", generateTID(), replierIndex)
527
528 replyEvent := &jetstream.JetstreamEvent{
529 Did: replierDID,
530 Kind: "commit",
531 Commit: &jetstream.CommitEvent{
532 Rev: fmt.Sprintf("rev-reply-%d", replierIndex),
533 Operation: "create",
534 Collection: "social.coves.community.comment",
535 RKey: replyRKey,
536 CID: fmt.Sprintf("bafyreply%d", replierIndex),
537 Record: map[string]interface{}{
538 "$type": "social.coves.community.comment",
539 "content": fmt.Sprintf("Concurrent reply #%d", replierIndex),
540 "reply": map[string]interface{}{
541 "root": map[string]interface{}{
542 "uri": postURI,
543 "cid": "bafypost",
544 },
545 "parent": map[string]interface{}{
546 "uri": parentCommentURI,
547 "cid": "bafyparent",
548 },
549 },
550 "createdAt": fixedTime.Add(time.Duration(replierIndex) * time.Millisecond).Format(time.RFC3339),
551 },
552 },
553 }
554
555 if handleErr := commentConsumer.HandleEvent(ctx, replyEvent); handleErr != nil {
556 errors <- fmt.Errorf("replier %d: failed to handle reply event: %w", replierIndex, handleErr)
557 }
558 }(i)
559 }
560
561 wg.Wait()
562 close(errors)
563
564 // Check for errors
565 var errorCount int
566 for err := range errors {
567 t.Logf("Error during concurrent replying: %v", err)
568 errorCount++
569 }
570
571 if errorCount > 0 {
572 t.Errorf("Expected no errors during concurrent replying, got %d errors", errorCount)
573 }
574
575 // Verify parent comment reply count
576 parentComment, err := commentRepo.GetByURI(ctx, parentCommentURI)
577 if err != nil {
578 t.Fatalf("Failed to get parent comment: %v", err)
579 }
580
581 if parentComment.ReplyCount != numRepliers {
582 t.Errorf("Expected reply_count = %d on parent comment, got %d (possible race condition)", numRepliers, parentComment.ReplyCount)
583 }
584
585 t.Logf("✓ %d concurrent replies processed correctly, reply_count=%d", numRepliers, parentComment.ReplyCount)
586 })
587}
588
589// TestConcurrentCommunityCreation tests race conditions when multiple goroutines
590// try to create communities with the same handle
591func TestConcurrentCommunityCreation_DuplicateHandle(t *testing.T) {
592 if testing.Short() {
593 t.Skip("Skipping integration test in short mode")
594 }
595
596 db := setupTestDB(t)
597 defer func() {
598 if err := db.Close(); err != nil {
599 t.Logf("Failed to close database: %v", err)
600 }
601 }()
602
603 ctx := context.Background()
604 repo := postgres.NewCommunityRepository(db)
605
606 t.Run("Concurrent creation with same handle should fail", func(t *testing.T) {
607 const numAttempts = 10
608 sameHandle := fmt.Sprintf("duplicate-handle-%d.test.coves.social", time.Now().UnixNano())
609
610 var wg sync.WaitGroup
611 wg.Add(numAttempts)
612
613 type result struct {
614 err error
615 success bool
616 }
617 results := make(chan result, numAttempts)
618
619 for i := 0; i < numAttempts; i++ {
620 go func(attemptIndex int) {
621 defer wg.Done()
622
623 // Each attempt uses a unique DID but same handle
624 uniqueDID := fmt.Sprintf("did:plc:dup-community-%d-%d", time.Now().UnixNano(), attemptIndex)
625
626 community := &communities.Community{
627 DID: uniqueDID,
628 Handle: sameHandle, // SAME HANDLE
629 Name: fmt.Sprintf("dup-test-%d", attemptIndex),
630 DisplayName: fmt.Sprintf("Duplicate Test %d", attemptIndex),
631 Description: "Testing duplicate handle prevention",
632 OwnerDID: "did:web:test.local",
633 CreatedByDID: "did:plc:creator",
634 HostedByDID: "did:web:test.local",
635 Visibility: "public",
636 CreatedAt: time.Now(),
637 UpdatedAt: time.Now(),
638 }
639
640 _, createErr := repo.Create(ctx, community)
641 results <- result{
642 success: createErr == nil,
643 err: createErr,
644 }
645 }(i)
646 }
647
648 wg.Wait()
649 close(results)
650
651 // Collect results
652 successCount := 0
653 duplicateErrors := 0
654
655 for res := range results {
656 if res.success {
657 successCount++
658 } else if communities.IsConflict(res.err) {
659 duplicateErrors++
660 } else {
661 t.Logf("Unexpected error type: %v", res.err)
662 }
663 }
664
665 // CRITICAL: Exactly ONE should succeed, rest should fail with duplicate error
666 if successCount != 1 {
667 t.Errorf("Expected exactly 1 successful creation, got %d (DATABASE CONSTRAINT VIOLATION - race condition detected)", successCount)
668 }
669
670 if duplicateErrors != numAttempts-1 {
671 t.Errorf("Expected %d duplicate errors, got %d", numAttempts-1, duplicateErrors)
672 }
673
674 t.Logf("✓ Duplicate handle protection: %d successful, %d duplicate errors (database constraint working)", successCount, duplicateErrors)
675 })
676
677 t.Run("Concurrent creation with different handles should succeed", func(t *testing.T) {
678 const numAttempts = 10
679 var wg sync.WaitGroup
680 wg.Add(numAttempts)
681
682 errors := make(chan error, numAttempts)
683
684 for i := 0; i < numAttempts; i++ {
685 go func(attemptIndex int) {
686 defer wg.Done()
687
688 uniqueSuffix := fmt.Sprintf("%d-%d", time.Now().UnixNano(), attemptIndex)
689 community := &communities.Community{
690 DID: generateTestDID(uniqueSuffix),
691 Handle: fmt.Sprintf("unique-handle-%s.test.coves.social", uniqueSuffix),
692 Name: fmt.Sprintf("unique-test-%s", uniqueSuffix),
693 DisplayName: fmt.Sprintf("Unique Test %d", attemptIndex),
694 Description: "Testing concurrent unique handle creation",
695 OwnerDID: "did:web:test.local",
696 CreatedByDID: "did:plc:creator",
697 HostedByDID: "did:web:test.local",
698 Visibility: "public",
699 CreatedAt: time.Now(),
700 UpdatedAt: time.Now(),
701 }
702
703 _, createErr := repo.Create(ctx, community)
704 if createErr != nil {
705 errors <- createErr
706 }
707 }(i)
708 }
709
710 wg.Wait()
711 close(errors)
712
713 // All should succeed
714 var errorCount int
715 for err := range errors {
716 t.Logf("Error during concurrent unique creation: %v", err)
717 errorCount++
718 }
719
720 if errorCount > 0 {
721 t.Errorf("Expected all %d creations to succeed, but %d failed", numAttempts, errorCount)
722 }
723
724 t.Logf("✓ All %d concurrent community creations with unique handles succeeded", numAttempts)
725 })
726}
727
728// TestConcurrentSubscription tests race conditions when multiple users subscribe
729// to the same community simultaneously
730func TestConcurrentSubscription_RaceConditions(t *testing.T) {
731 if testing.Short() {
732 t.Skip("Skipping integration test in short mode")
733 }
734
735 db := setupTestDB(t)
736 defer func() {
737 if err := db.Close(); err != nil {
738 t.Logf("Failed to close database: %v", err)
739 }
740 }()
741
742 ctx := context.Background()
743 communityRepo := postgres.NewCommunityRepository(db)
744 consumer := jetstream.NewCommunityEventConsumer(communityRepo, "did:web:coves.local", true, nil)
745
746 // Create test community
747 testDID := fmt.Sprintf("did:plc:test-sub-race-%d", time.Now().UnixNano())
748 community := &communities.Community{
749 DID: testDID,
750 Handle: fmt.Sprintf("sub-race-%d.test.coves.social", time.Now().UnixNano()),
751 Name: "sub-race-test",
752 DisplayName: "Subscription Race Test",
753 Description: "Testing subscription race conditions",
754 OwnerDID: "did:plc:owner",
755 CreatedByDID: "did:plc:creator",
756 HostedByDID: "did:web:coves.local",
757 Visibility: "public",
758 CreatedAt: time.Now(),
759 UpdatedAt: time.Now(),
760 }
761
762 created, err := communityRepo.Create(ctx, community)
763 if err != nil {
764 t.Fatalf("Failed to create test community: %v", err)
765 }
766
767 t.Run("Multiple users subscribing concurrently", func(t *testing.T) {
768 const numSubscribers = 30
769 var wg sync.WaitGroup
770 wg.Add(numSubscribers)
771
772 errors := make(chan error, numSubscribers)
773
774 for i := 0; i < numSubscribers; i++ {
775 go func(subscriberIndex int) {
776 defer wg.Done()
777
778 userDID := fmt.Sprintf("did:plc:subscriber%d", subscriberIndex)
779 rkey := fmt.Sprintf("sub-%d", subscriberIndex)
780
781 event := &jetstream.JetstreamEvent{
782 Did: userDID,
783 Kind: "commit",
784 TimeUS: time.Now().UnixMicro(),
785 Commit: &jetstream.CommitEvent{
786 Rev: fmt.Sprintf("rev-%d", subscriberIndex),
787 Operation: "create",
788 Collection: "social.coves.community.subscription",
789 RKey: rkey,
790 CID: fmt.Sprintf("bafysub%d", subscriberIndex),
791 Record: map[string]interface{}{
792 "$type": "social.coves.community.subscription",
793 "subject": created.DID,
794 "createdAt": time.Now().Format(time.RFC3339),
795 "contentVisibility": float64(3),
796 },
797 },
798 }
799
800 if handleErr := consumer.HandleEvent(ctx, event); handleErr != nil {
801 errors <- fmt.Errorf("subscriber %d: failed to subscribe: %w", subscriberIndex, handleErr)
802 }
803 }(i)
804 }
805
806 wg.Wait()
807 close(errors)
808
809 // Check for errors
810 var errorCount int
811 for err := range errors {
812 t.Logf("Error during concurrent subscription: %v", err)
813 errorCount++
814 }
815
816 if errorCount > 0 {
817 t.Errorf("Expected no errors during concurrent subscription, got %d errors", errorCount)
818 }
819
820 // Verify subscriber count is correct
821 updatedCommunity, err := communityRepo.GetByDID(ctx, created.DID)
822 if err != nil {
823 t.Fatalf("Failed to get updated community: %v", err)
824 }
825
826 if updatedCommunity.SubscriberCount != numSubscribers {
827 t.Errorf("Expected subscriber_count = %d, got %d (RACE CONDITION in subscriber count update)", numSubscribers, updatedCommunity.SubscriberCount)
828 }
829
830 // CRITICAL: Verify actual subscription records to detect race conditions
831 var actualSubscriptionCount int
832 var distinctSubscribers int
833 err = db.QueryRow(`
834 SELECT COUNT(*), COUNT(DISTINCT user_did)
835 FROM community_subscriptions
836 WHERE community_did = $1
837 `, created.DID).Scan(&actualSubscriptionCount, &distinctSubscribers)
838 if err != nil {
839 t.Fatalf("Failed to query subscription records: %v", err)
840 }
841
842 if actualSubscriptionCount != numSubscribers {
843 t.Errorf("Expected %d subscription records, got %d (possible race condition: subscriptions lost or duplicated)", numSubscribers, actualSubscriptionCount)
844 }
845
846 if distinctSubscribers != numSubscribers {
847 t.Errorf("Expected %d distinct subscribers, got %d (possible duplicate subscriptions)", numSubscribers, distinctSubscribers)
848 }
849
850 t.Logf("✓ %d concurrent subscriptions processed correctly:", numSubscribers)
851 t.Logf(" - Community subscriber_count: %d", updatedCommunity.SubscriberCount)
852 t.Logf(" - Database records: %d subscriptions from %d distinct users (no duplicates)", actualSubscriptionCount, distinctSubscribers)
853 })
854
855 t.Run("Concurrent subscribe and unsubscribe", func(t *testing.T) {
856 // Create new community for this test
857 testDID2 := fmt.Sprintf("did:plc:test-sub-unsub-%d", time.Now().UnixNano())
858 community2 := &communities.Community{
859 DID: testDID2,
860 Handle: fmt.Sprintf("sub-unsub-%d.test.coves.social", time.Now().UnixNano()),
861 Name: "sub-unsub-test",
862 DisplayName: "Subscribe/Unsubscribe Race Test",
863 Description: "Testing concurrent subscribe/unsubscribe",
864 OwnerDID: "did:plc:owner",
865 CreatedByDID: "did:plc:creator",
866 HostedByDID: "did:web:coves.local",
867 Visibility: "public",
868 CreatedAt: time.Now(),
869 UpdatedAt: time.Now(),
870 }
871
872 created2, err := communityRepo.Create(ctx, community2)
873 if err != nil {
874 t.Fatalf("Failed to create test community: %v", err)
875 }
876
877 const numUsers = 20
878 var wg sync.WaitGroup
879 wg.Add(numUsers * 2) // Each user subscribes then unsubscribes
880
881 errors := make(chan error, numUsers*2)
882
883 for i := 0; i < numUsers; i++ {
884 go func(userIndex int) {
885 userDID := fmt.Sprintf("did:plc:subunsubuser%d", userIndex)
886 rkey := fmt.Sprintf("subunsub-%d", userIndex)
887
888 // Subscribe
889 subscribeEvent := &jetstream.JetstreamEvent{
890 Did: userDID,
891 Kind: "commit",
892 TimeUS: time.Now().UnixMicro(),
893 Commit: &jetstream.CommitEvent{
894 Rev: fmt.Sprintf("rev-sub-%d", userIndex),
895 Operation: "create",
896 Collection: "social.coves.community.subscription",
897 RKey: rkey,
898 CID: fmt.Sprintf("bafysubscribe%d", userIndex),
899 Record: map[string]interface{}{
900 "$type": "social.coves.community.subscription",
901 "subject": created2.DID,
902 "createdAt": time.Now().Format(time.RFC3339),
903 "contentVisibility": float64(3),
904 },
905 },
906 }
907
908 if handleErr := consumer.HandleEvent(ctx, subscribeEvent); handleErr != nil {
909 errors <- fmt.Errorf("user %d: subscribe failed: %w", userIndex, handleErr)
910 }
911 wg.Done()
912
913 // Small delay to ensure subscribe happens first
914 time.Sleep(10 * time.Millisecond)
915
916 // Unsubscribe
917 unsubscribeEvent := &jetstream.JetstreamEvent{
918 Did: userDID,
919 Kind: "commit",
920 TimeUS: time.Now().UnixMicro(),
921 Commit: &jetstream.CommitEvent{
922 Rev: fmt.Sprintf("rev-unsub-%d", userIndex),
923 Operation: "delete",
924 Collection: "social.coves.community.subscription",
925 RKey: rkey,
926 CID: "",
927 Record: nil,
928 },
929 }
930
931 if handleErr := consumer.HandleEvent(ctx, unsubscribeEvent); handleErr != nil {
932 errors <- fmt.Errorf("user %d: unsubscribe failed: %w", userIndex, handleErr)
933 }
934 wg.Done()
935 }(i)
936 }
937
938 wg.Wait()
939 close(errors)
940
941 // Check for errors
942 var errorCount int
943 for err := range errors {
944 t.Logf("Error during concurrent sub/unsub: %v", err)
945 errorCount++
946 }
947
948 if errorCount > 0 {
949 t.Errorf("Expected no errors during concurrent sub/unsub, got %d errors", errorCount)
950 }
951
952 // Final subscriber count should be 0 (all unsubscribed)
953 finalCommunity, err := communityRepo.GetByDID(ctx, created2.DID)
954 if err != nil {
955 t.Fatalf("Failed to get final community: %v", err)
956 }
957
958 if finalCommunity.SubscriberCount != 0 {
959 t.Errorf("Expected subscriber_count = 0 after all unsubscribed, got %d (RACE CONDITION detected)", finalCommunity.SubscriberCount)
960 }
961
962 // CRITICAL: Verify no subscription records remain in database
963 var remainingSubscriptions int
964 err = db.QueryRow(`
965 SELECT COUNT(*)
966 FROM community_subscriptions
967 WHERE community_did = $1
968 `, created2.DID).Scan(&remainingSubscriptions)
969 if err != nil {
970 t.Fatalf("Failed to query subscription records: %v", err)
971 }
972
973 if remainingSubscriptions != 0 {
974 t.Errorf("Expected 0 subscription records after all unsubscribed, got %d (orphaned subscriptions detected)", remainingSubscriptions)
975 }
976
977 t.Logf("✓ Concurrent subscribe/unsubscribe handled correctly:")
978 t.Logf(" - Community subscriber_count: %d", finalCommunity.SubscriberCount)
979 t.Logf(" - Database records: %d subscriptions remaining (clean unsubscribe)", remainingSubscriptions)
980 })
981}