A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/comments"
6 "Coves/internal/core/communities"
7 "Coves/internal/core/users"
8 "Coves/internal/db/postgres"
9 "context"
10 "fmt"
11 "sync"
12 "testing"
13 "time"
14)
15
16// TestConcurrentVoting_MultipleUsersOnSamePost tests race conditions when multiple users
17// vote on the same post simultaneously
18func TestConcurrentVoting_MultipleUsersOnSamePost(t *testing.T) {
19 if testing.Short() {
20 t.Skip("Skipping integration test in short mode")
21 }
22
23 db := setupTestDB(t)
24 defer func() {
25 if err := db.Close(); err != nil {
26 t.Logf("Failed to close database: %v", err)
27 }
28 }()
29
30 ctx := context.Background()
31 voteRepo := postgres.NewVoteRepository(db)
32 postRepo := postgres.NewPostRepository(db)
33 userRepo := postgres.NewUserRepository(db)
34 userService := users.NewUserService(userRepo, nil, "http://localhost:3001")
35 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
36
37 // Use fixed timestamp
38 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC)
39
40 // Setup: Create test community and post
41 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-votes", "owner.test")
42 if err != nil {
43 t.Fatalf("Failed to create test community: %v", err)
44 }
45
46 testUser := createTestUser(t, db, "author.test", "did:plc:author123")
47 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Post for concurrent voting", 0, fixedTime)
48
49 t.Run("Multiple users upvoting same post concurrently", func(t *testing.T) {
50 const numVoters = 20
51 var wg sync.WaitGroup
52 wg.Add(numVoters)
53
54 // Channel to collect errors
55 errors := make(chan error, numVoters)
56
57 // Create voters and vote concurrently
58 for i := 0; i < numVoters; i++ {
59 go func(voterIndex int) {
60 defer wg.Done()
61
62 voterDID := fmt.Sprintf("did:plc:voter%d", voterIndex)
63 voterHandle := fmt.Sprintf("voter%d.test", voterIndex)
64
65 // Create user
66 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{
67 DID: voterDID,
68 Handle: voterHandle,
69 PDSURL: "http://localhost:3001",
70 })
71 if createErr != nil {
72 errors <- fmt.Errorf("voter %d: failed to create user: %w", voterIndex, createErr)
73 return
74 }
75
76 // Create vote
77 voteRKey := generateTID()
78 voteEvent := &jetstream.JetstreamEvent{
79 Did: voterDID,
80 Kind: "commit",
81 Commit: &jetstream.CommitEvent{
82 Rev: fmt.Sprintf("rev-%d", voterIndex),
83 Operation: "create",
84 Collection: "social.coves.feed.vote",
85 RKey: voteRKey,
86 CID: fmt.Sprintf("bafyvote%d", voterIndex),
87 Record: map[string]interface{}{
88 "$type": "social.coves.feed.vote",
89 "subject": map[string]interface{}{
90 "uri": postURI,
91 "cid": "bafypost",
92 },
93 "direction": "up",
94 "createdAt": fixedTime.Format(time.RFC3339),
95 },
96 },
97 }
98
99 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil {
100 errors <- fmt.Errorf("voter %d: failed to handle vote event: %w", voterIndex, handleErr)
101 return
102 }
103 }(i)
104 }
105
106 // Wait for all goroutines to complete
107 wg.Wait()
108 close(errors)
109
110 // Check for errors
111 var errorCount int
112 for err := range errors {
113 t.Logf("Error during concurrent voting: %v", err)
114 errorCount++
115 }
116
117 if errorCount > 0 {
118 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount)
119 }
120
121 // Verify post vote counts are correct
122 post, err := postRepo.GetByURI(ctx, postURI)
123 if err != nil {
124 t.Fatalf("Failed to get post: %v", err)
125 }
126
127 if post.UpvoteCount != numVoters {
128 t.Errorf("Expected upvote_count = %d, got %d (possible race condition in count update)", numVoters, post.UpvoteCount)
129 }
130
131 if post.Score != numVoters {
132 t.Errorf("Expected score = %d, got %d (possible race condition in score calculation)", numVoters, post.Score)
133 }
134
135 // CRITICAL: Verify actual vote records in database to detect race conditions
136 // This catches issues that aggregate counts might miss (e.g., duplicate votes, lost votes)
137 var actualVoteCount int
138 var distinctVoterCount int
139 err = db.QueryRow("SELECT COUNT(*), COUNT(DISTINCT voter_did) FROM votes WHERE subject_uri = $1 AND direction = 'up'", postURI).
140 Scan(&actualVoteCount, &distinctVoterCount)
141 if err != nil {
142 t.Fatalf("Failed to query vote records: %v", err)
143 }
144
145 if actualVoteCount != numVoters {
146 t.Errorf("Expected %d vote records in database, got %d (possible race condition: votes lost or duplicated)", numVoters, actualVoteCount)
147 }
148
149 if distinctVoterCount != numVoters {
150 t.Errorf("Expected %d distinct voters, got %d (possible race condition: duplicate votes from same voter)", numVoters, distinctVoterCount)
151 }
152
153 t.Logf("✓ %d concurrent upvotes processed correctly:", numVoters)
154 t.Logf(" - Post counts: upvote_count=%d, score=%d", post.UpvoteCount, post.Score)
155 t.Logf(" - Database records: %d votes from %d distinct voters (no duplicates)", actualVoteCount, distinctVoterCount)
156 })
157
158 t.Run("Concurrent upvotes and downvotes on same post", func(t *testing.T) {
159 // Create a new post for this test
160 testPost2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post for mixed voting", 0, fixedTime)
161
162 const numUpvoters = 15
163 const numDownvoters = 10
164 const totalVoters = numUpvoters + numDownvoters
165
166 var wg sync.WaitGroup
167 wg.Add(totalVoters)
168 errors := make(chan error, totalVoters)
169
170 // Upvoters
171 for i := 0; i < numUpvoters; i++ {
172 go func(voterIndex int) {
173 defer wg.Done()
174
175 voterDID := fmt.Sprintf("did:plc:upvoter%d", voterIndex)
176 voterHandle := fmt.Sprintf("upvoter%d.test", voterIndex)
177
178 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{
179 DID: voterDID,
180 Handle: voterHandle,
181 PDSURL: "http://localhost:3001",
182 })
183 if createErr != nil {
184 errors <- fmt.Errorf("upvoter %d: failed to create user: %w", voterIndex, createErr)
185 return
186 }
187
188 voteRKey := generateTID()
189 voteEvent := &jetstream.JetstreamEvent{
190 Did: voterDID,
191 Kind: "commit",
192 Commit: &jetstream.CommitEvent{
193 Rev: fmt.Sprintf("rev-up-%d", voterIndex),
194 Operation: "create",
195 Collection: "social.coves.feed.vote",
196 RKey: voteRKey,
197 CID: fmt.Sprintf("bafyup%d", voterIndex),
198 Record: map[string]interface{}{
199 "$type": "social.coves.feed.vote",
200 "subject": map[string]interface{}{
201 "uri": testPost2URI,
202 "cid": "bafypost2",
203 },
204 "direction": "up",
205 "createdAt": fixedTime.Format(time.RFC3339),
206 },
207 },
208 }
209
210 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil {
211 errors <- fmt.Errorf("upvoter %d: failed to handle event: %w", voterIndex, handleErr)
212 }
213 }(i)
214 }
215
216 // Downvoters
217 for i := 0; i < numDownvoters; i++ {
218 go func(voterIndex int) {
219 defer wg.Done()
220
221 voterDID := fmt.Sprintf("did:plc:downvoter%d", voterIndex)
222 voterHandle := fmt.Sprintf("downvoter%d.test", voterIndex)
223
224 _, createErr := userService.CreateUser(ctx, users.CreateUserRequest{
225 DID: voterDID,
226 Handle: voterHandle,
227 PDSURL: "http://localhost:3001",
228 })
229 if createErr != nil {
230 errors <- fmt.Errorf("downvoter %d: failed to create user: %w", voterIndex, createErr)
231 return
232 }
233
234 voteRKey := generateTID()
235 voteEvent := &jetstream.JetstreamEvent{
236 Did: voterDID,
237 Kind: "commit",
238 Commit: &jetstream.CommitEvent{
239 Rev: fmt.Sprintf("rev-down-%d", voterIndex),
240 Operation: "create",
241 Collection: "social.coves.feed.vote",
242 RKey: voteRKey,
243 CID: fmt.Sprintf("bafydown%d", voterIndex),
244 Record: map[string]interface{}{
245 "$type": "social.coves.feed.vote",
246 "subject": map[string]interface{}{
247 "uri": testPost2URI,
248 "cid": "bafypost2",
249 },
250 "direction": "down",
251 "createdAt": fixedTime.Format(time.RFC3339),
252 },
253 },
254 }
255
256 if handleErr := voteConsumer.HandleEvent(ctx, voteEvent); handleErr != nil {
257 errors <- fmt.Errorf("downvoter %d: failed to handle event: %w", voterIndex, handleErr)
258 }
259 }(i)
260 }
261
262 wg.Wait()
263 close(errors)
264
265 // Check for errors
266 var errorCount int
267 for err := range errors {
268 t.Logf("Error during concurrent mixed voting: %v", err)
269 errorCount++
270 }
271
272 if errorCount > 0 {
273 t.Errorf("Expected no errors during concurrent voting, got %d errors", errorCount)
274 }
275
276 // Verify counts
277 post, err := postRepo.GetByURI(ctx, testPost2URI)
278 if err != nil {
279 t.Fatalf("Failed to get post: %v", err)
280 }
281
282 expectedScore := numUpvoters - numDownvoters
283 if post.UpvoteCount != numUpvoters {
284 t.Errorf("Expected upvote_count = %d, got %d", numUpvoters, post.UpvoteCount)
285 }
286 if post.DownvoteCount != numDownvoters {
287 t.Errorf("Expected downvote_count = %d, got %d", numDownvoters, post.DownvoteCount)
288 }
289 if post.Score != expectedScore {
290 t.Errorf("Expected score = %d, got %d", expectedScore, post.Score)
291 }
292
293 // CRITICAL: Verify actual vote records to detect race conditions
294 var actualUpvotes, actualDownvotes, distinctUpvoters, distinctDownvoters int
295 err = db.QueryRow(`
296 SELECT
297 COUNT(*) FILTER (WHERE direction = 'up'),
298 COUNT(*) FILTER (WHERE direction = 'down'),
299 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'up'),
300 COUNT(DISTINCT voter_did) FILTER (WHERE direction = 'down')
301 FROM votes WHERE subject_uri = $1
302 `, testPost2URI).Scan(&actualUpvotes, &actualDownvotes, &distinctUpvoters, &distinctDownvoters)
303 if err != nil {
304 t.Fatalf("Failed to query vote records: %v", err)
305 }
306
307 if actualUpvotes != numUpvoters {
308 t.Errorf("Expected %d upvote records, got %d (possible race condition)", numUpvoters, actualUpvotes)
309 }
310 if actualDownvotes != numDownvoters {
311 t.Errorf("Expected %d downvote records, got %d (possible race condition)", numDownvoters, actualDownvotes)
312 }
313 if distinctUpvoters != numUpvoters {
314 t.Errorf("Expected %d distinct upvoters, got %d (duplicate votes detected)", numUpvoters, distinctUpvoters)
315 }
316 if distinctDownvoters != numDownvoters {
317 t.Errorf("Expected %d distinct downvoters, got %d (duplicate votes detected)", numDownvoters, distinctDownvoters)
318 }
319
320 t.Logf("✓ Concurrent mixed voting processed correctly:")
321 t.Logf(" - Post counts: upvotes=%d, downvotes=%d, score=%d", post.UpvoteCount, post.DownvoteCount, post.Score)
322 t.Logf(" - Database records: %d upvotes from %d voters, %d downvotes from %d voters (no duplicates)",
323 actualUpvotes, distinctUpvoters, actualDownvotes, distinctDownvoters)
324 })
325}
326
327// TestConcurrentCommenting_MultipleUsersOnSamePost tests race conditions when multiple users
328// comment on the same post simultaneously
329func TestConcurrentCommenting_MultipleUsersOnSamePost(t *testing.T) {
330 if testing.Short() {
331 t.Skip("Skipping integration test in short mode")
332 }
333
334 db := setupTestDB(t)
335 defer func() {
336 if err := db.Close(); err != nil {
337 t.Logf("Failed to close database: %v", err)
338 }
339 }()
340
341 ctx := context.Background()
342 commentRepo := postgres.NewCommentRepository(db)
343 postRepo := postgres.NewPostRepository(db)
344 userRepo := postgres.NewUserRepository(db)
345 communityRepo := postgres.NewCommunityRepository(db)
346 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
347
348 fixedTime := time.Date(2025, 11, 16, 12, 0, 0, 0, time.UTC)
349
350 // Setup: Create test community and post
351 testCommunity, err := createFeedTestCommunity(db, ctx, "concurrent-comments", "owner.test")
352 if err != nil {
353 t.Fatalf("Failed to create test community: %v", err)
354 }
355
356 testUser := createTestUser(t, db, "author.test", "did:plc:author456")
357 postURI := createTestPost(t, db, testCommunity, testUser.DID, "Post for concurrent commenting", 0, fixedTime)
358
359 t.Run("Multiple users commenting simultaneously", func(t *testing.T) {
360 const numCommenters = 25
361 var wg sync.WaitGroup
362 wg.Add(numCommenters)
363
364 errors := make(chan error, numCommenters)
365 commentURIs := make(chan string, numCommenters)
366
367 for i := 0; i < numCommenters; i++ {
368 go func(commenterIndex int) {
369 defer wg.Done()
370
371 commenterDID := fmt.Sprintf("did:plc:commenter%d", commenterIndex)
372 commentRKey := fmt.Sprintf("%s-comment%d", generateTID(), commenterIndex)
373 commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", commenterDID, commentRKey)
374
375 commentEvent := &jetstream.JetstreamEvent{
376 Did: commenterDID,
377 Kind: "commit",
378 Commit: &jetstream.CommitEvent{
379 Rev: fmt.Sprintf("rev-comment-%d", commenterIndex),
380 Operation: "create",
381 Collection: "social.coves.community.comment",
382 RKey: commentRKey,
383 CID: fmt.Sprintf("bafycomment%d", commenterIndex),
384 Record: map[string]interface{}{
385 "$type": "social.coves.community.comment",
386 "content": fmt.Sprintf("Concurrent comment #%d", commenterIndex),
387 "reply": map[string]interface{}{
388 "root": map[string]interface{}{
389 "uri": postURI,
390 "cid": "bafypost",
391 },
392 "parent": map[string]interface{}{
393 "uri": postURI,
394 "cid": "bafypost",
395 },
396 },
397 "createdAt": fixedTime.Add(time.Duration(commenterIndex) * time.Millisecond).Format(time.RFC3339),
398 },
399 },
400 }
401
402 if handleErr := commentConsumer.HandleEvent(ctx, commentEvent); handleErr != nil {
403 errors <- fmt.Errorf("commenter %d: failed to handle comment event: %w", commenterIndex, handleErr)
404 return
405 }
406
407 commentURIs <- commentURI
408 }(i)
409 }
410
411 wg.Wait()
412 close(errors)
413 close(commentURIs)
414
415 // Check for errors
416 var errorCount int
417 for err := range errors {
418 t.Logf("Error during concurrent commenting: %v", err)
419 errorCount++
420 }
421
422 if errorCount > 0 {
423 t.Errorf("Expected no errors during concurrent commenting, got %d errors", errorCount)
424 }
425
426 // Verify post comment count updated correctly
427 post, err := postRepo.GetByURI(ctx, postURI)
428 if err != nil {
429 t.Fatalf("Failed to get post: %v", err)
430 }
431
432 if post.CommentCount != numCommenters {
433 t.Errorf("Expected comment_count = %d, got %d (possible race condition in count update)", numCommenters, post.CommentCount)
434 }
435
436 // CRITICAL: Verify actual comment records to detect race conditions
437 var actualCommentCount int
438 var distinctCommenters int
439 err = db.QueryRow(`
440 SELECT COUNT(*), COUNT(DISTINCT author_did)
441 FROM comments
442 WHERE post_uri = $1 AND parent_comment_uri IS NULL
443 `, postURI).Scan(&actualCommentCount, &distinctCommenters)
444 if err != nil {
445 t.Fatalf("Failed to query comment records: %v", err)
446 }
447
448 if actualCommentCount != numCommenters {
449 t.Errorf("Expected %d comment records in database, got %d (possible race condition: comments lost or duplicated)", numCommenters, actualCommentCount)
450 }
451
452 if distinctCommenters != numCommenters {
453 t.Errorf("Expected %d distinct commenters, got %d (possible duplicate comments from same author)", numCommenters, distinctCommenters)
454 }
455
456 // Verify all comments are retrievable via service
457 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
458 response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{
459 PostURI: postURI,
460 Sort: "new",
461 Depth: 10,
462 Limit: 100,
463 ViewerDID: nil,
464 })
465 if err != nil {
466 t.Fatalf("Failed to get comments: %v", err)
467 }
468
469 if len(response.Comments) != numCommenters {
470 t.Errorf("Expected %d comments in response, got %d", numCommenters, len(response.Comments))
471 }
472
473 t.Logf("✓ %d concurrent comments processed correctly:", numCommenters)
474 t.Logf(" - Post comment_count: %d", post.CommentCount)
475 t.Logf(" - Database records: %d comments from %d distinct authors (no duplicates)", actualCommentCount, distinctCommenters)
476 })
477
478 t.Run("Concurrent replies to same comment", func(t *testing.T) {
479 // Create a parent comment first
480 parentCommentRKey := generateTID()
481 parentCommentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, parentCommentRKey)
482
483 parentEvent := &jetstream.JetstreamEvent{
484 Did: testUser.DID,
485 Kind: "commit",
486 Commit: &jetstream.CommitEvent{
487 Rev: "parent-rev",
488 Operation: "create",
489 Collection: "social.coves.community.comment",
490 RKey: parentCommentRKey,
491 CID: "bafyparent",
492 Record: map[string]interface{}{
493 "$type": "social.coves.community.comment",
494 "content": "Parent comment for replies",
495 "reply": map[string]interface{}{
496 "root": map[string]interface{}{
497 "uri": postURI,
498 "cid": "bafypost",
499 },
500 "parent": map[string]interface{}{
501 "uri": postURI,
502 "cid": "bafypost",
503 },
504 },
505 "createdAt": fixedTime.Format(time.RFC3339),
506 },
507 },
508 }
509
510 if err := commentConsumer.HandleEvent(ctx, parentEvent); err != nil {
511 t.Fatalf("Failed to create parent comment: %v", err)
512 }
513
514 // Now create concurrent replies
515 const numRepliers = 15
516 var wg sync.WaitGroup
517 wg.Add(numRepliers)
518 errors := make(chan error, numRepliers)
519
520 for i := 0; i < numRepliers; i++ {
521 go func(replierIndex int) {
522 defer wg.Done()
523
524 replierDID := fmt.Sprintf("did:plc:replier%d", replierIndex)
525 replyRKey := fmt.Sprintf("%s-reply%d", generateTID(), replierIndex)
526
527 replyEvent := &jetstream.JetstreamEvent{
528 Did: replierDID,
529 Kind: "commit",
530 Commit: &jetstream.CommitEvent{
531 Rev: fmt.Sprintf("rev-reply-%d", replierIndex),
532 Operation: "create",
533 Collection: "social.coves.community.comment",
534 RKey: replyRKey,
535 CID: fmt.Sprintf("bafyreply%d", replierIndex),
536 Record: map[string]interface{}{
537 "$type": "social.coves.community.comment",
538 "content": fmt.Sprintf("Concurrent reply #%d", replierIndex),
539 "reply": map[string]interface{}{
540 "root": map[string]interface{}{
541 "uri": postURI,
542 "cid": "bafypost",
543 },
544 "parent": map[string]interface{}{
545 "uri": parentCommentURI,
546 "cid": "bafyparent",
547 },
548 },
549 "createdAt": fixedTime.Add(time.Duration(replierIndex) * time.Millisecond).Format(time.RFC3339),
550 },
551 },
552 }
553
554 if handleErr := commentConsumer.HandleEvent(ctx, replyEvent); handleErr != nil {
555 errors <- fmt.Errorf("replier %d: failed to handle reply event: %w", replierIndex, handleErr)
556 }
557 }(i)
558 }
559
560 wg.Wait()
561 close(errors)
562
563 // Check for errors
564 var errorCount int
565 for err := range errors {
566 t.Logf("Error during concurrent replying: %v", err)
567 errorCount++
568 }
569
570 if errorCount > 0 {
571 t.Errorf("Expected no errors during concurrent replying, got %d errors", errorCount)
572 }
573
574 // Verify parent comment reply count
575 parentComment, err := commentRepo.GetByURI(ctx, parentCommentURI)
576 if err != nil {
577 t.Fatalf("Failed to get parent comment: %v", err)
578 }
579
580 if parentComment.ReplyCount != numRepliers {
581 t.Errorf("Expected reply_count = %d on parent comment, got %d (possible race condition)", numRepliers, parentComment.ReplyCount)
582 }
583
584 t.Logf("✓ %d concurrent replies processed correctly, reply_count=%d", numRepliers, parentComment.ReplyCount)
585 })
586}
587
588// TestConcurrentCommunityCreation tests race conditions when multiple goroutines
589// try to create communities with the same handle
590func TestConcurrentCommunityCreation_DuplicateHandle(t *testing.T) {
591 if testing.Short() {
592 t.Skip("Skipping integration test in short mode")
593 }
594
595 db := setupTestDB(t)
596 defer func() {
597 if err := db.Close(); err != nil {
598 t.Logf("Failed to close database: %v", err)
599 }
600 }()
601
602 ctx := context.Background()
603 repo := postgres.NewCommunityRepository(db)
604
605 t.Run("Concurrent creation with same handle should fail", func(t *testing.T) {
606 const numAttempts = 10
607 sameHandle := fmt.Sprintf("duplicate-handle-%d.test.coves.social", time.Now().UnixNano())
608
609 var wg sync.WaitGroup
610 wg.Add(numAttempts)
611
612 type result struct {
613 err error
614 success bool
615 }
616 results := make(chan result, numAttempts)
617
618 for i := 0; i < numAttempts; i++ {
619 go func(attemptIndex int) {
620 defer wg.Done()
621
622 // Each attempt uses a unique DID but same handle
623 uniqueDID := fmt.Sprintf("did:plc:dup-community-%d-%d", time.Now().UnixNano(), attemptIndex)
624
625 community := &communities.Community{
626 DID: uniqueDID,
627 Handle: sameHandle, // SAME HANDLE
628 Name: fmt.Sprintf("dup-test-%d", attemptIndex),
629 DisplayName: fmt.Sprintf("Duplicate Test %d", attemptIndex),
630 Description: "Testing duplicate handle prevention",
631 OwnerDID: "did:web:test.local",
632 CreatedByDID: "did:plc:creator",
633 HostedByDID: "did:web:test.local",
634 Visibility: "public",
635 CreatedAt: time.Now(),
636 UpdatedAt: time.Now(),
637 }
638
639 _, createErr := repo.Create(ctx, community)
640 results <- result{
641 success: createErr == nil,
642 err: createErr,
643 }
644 }(i)
645 }
646
647 wg.Wait()
648 close(results)
649
650 // Collect results
651 successCount := 0
652 duplicateErrors := 0
653
654 for res := range results {
655 if res.success {
656 successCount++
657 } else if communities.IsConflict(res.err) {
658 duplicateErrors++
659 } else {
660 t.Logf("Unexpected error type: %v", res.err)
661 }
662 }
663
664 // CRITICAL: Exactly ONE should succeed, rest should fail with duplicate error
665 if successCount != 1 {
666 t.Errorf("Expected exactly 1 successful creation, got %d (DATABASE CONSTRAINT VIOLATION - race condition detected)", successCount)
667 }
668
669 if duplicateErrors != numAttempts-1 {
670 t.Errorf("Expected %d duplicate errors, got %d", numAttempts-1, duplicateErrors)
671 }
672
673 t.Logf("✓ Duplicate handle protection: %d successful, %d duplicate errors (database constraint working)", successCount, duplicateErrors)
674 })
675
676 t.Run("Concurrent creation with different handles should succeed", func(t *testing.T) {
677 const numAttempts = 10
678 var wg sync.WaitGroup
679 wg.Add(numAttempts)
680
681 errors := make(chan error, numAttempts)
682
683 for i := 0; i < numAttempts; i++ {
684 go func(attemptIndex int) {
685 defer wg.Done()
686
687 uniqueSuffix := fmt.Sprintf("%d-%d", time.Now().UnixNano(), attemptIndex)
688 community := &communities.Community{
689 DID: generateTestDID(uniqueSuffix),
690 Handle: fmt.Sprintf("unique-handle-%s.test.coves.social", uniqueSuffix),
691 Name: fmt.Sprintf("unique-test-%s", uniqueSuffix),
692 DisplayName: fmt.Sprintf("Unique Test %d", attemptIndex),
693 Description: "Testing concurrent unique handle creation",
694 OwnerDID: "did:web:test.local",
695 CreatedByDID: "did:plc:creator",
696 HostedByDID: "did:web:test.local",
697 Visibility: "public",
698 CreatedAt: time.Now(),
699 UpdatedAt: time.Now(),
700 }
701
702 _, createErr := repo.Create(ctx, community)
703 if createErr != nil {
704 errors <- createErr
705 }
706 }(i)
707 }
708
709 wg.Wait()
710 close(errors)
711
712 // All should succeed
713 var errorCount int
714 for err := range errors {
715 t.Logf("Error during concurrent unique creation: %v", err)
716 errorCount++
717 }
718
719 if errorCount > 0 {
720 t.Errorf("Expected all %d creations to succeed, but %d failed", numAttempts, errorCount)
721 }
722
723 t.Logf("✓ All %d concurrent community creations with unique handles succeeded", numAttempts)
724 })
725}
726
727// TestConcurrentSubscription tests race conditions when multiple users subscribe
728// to the same community simultaneously
729func TestConcurrentSubscription_RaceConditions(t *testing.T) {
730 if testing.Short() {
731 t.Skip("Skipping integration test in short mode")
732 }
733
734 db := setupTestDB(t)
735 defer func() {
736 if err := db.Close(); err != nil {
737 t.Logf("Failed to close database: %v", err)
738 }
739 }()
740
741 ctx := context.Background()
742 communityRepo := postgres.NewCommunityRepository(db)
743 consumer := jetstream.NewCommunityEventConsumer(communityRepo, "did:web:coves.local", true, nil)
744
745 // Create test community
746 testDID := fmt.Sprintf("did:plc:test-sub-race-%d", time.Now().UnixNano())
747 community := &communities.Community{
748 DID: testDID,
749 Handle: fmt.Sprintf("sub-race-%d.test.coves.social", time.Now().UnixNano()),
750 Name: "sub-race-test",
751 DisplayName: "Subscription Race Test",
752 Description: "Testing subscription race conditions",
753 OwnerDID: "did:plc:owner",
754 CreatedByDID: "did:plc:creator",
755 HostedByDID: "did:web:coves.local",
756 Visibility: "public",
757 CreatedAt: time.Now(),
758 UpdatedAt: time.Now(),
759 }
760
761 created, err := communityRepo.Create(ctx, community)
762 if err != nil {
763 t.Fatalf("Failed to create test community: %v", err)
764 }
765
766 t.Run("Multiple users subscribing concurrently", func(t *testing.T) {
767 const numSubscribers = 30
768 var wg sync.WaitGroup
769 wg.Add(numSubscribers)
770
771 errors := make(chan error, numSubscribers)
772
773 for i := 0; i < numSubscribers; i++ {
774 go func(subscriberIndex int) {
775 defer wg.Done()
776
777 userDID := fmt.Sprintf("did:plc:subscriber%d", subscriberIndex)
778 rkey := fmt.Sprintf("sub-%d", subscriberIndex)
779
780 event := &jetstream.JetstreamEvent{
781 Did: userDID,
782 Kind: "commit",
783 TimeUS: time.Now().UnixMicro(),
784 Commit: &jetstream.CommitEvent{
785 Rev: fmt.Sprintf("rev-%d", subscriberIndex),
786 Operation: "create",
787 Collection: "social.coves.community.subscription",
788 RKey: rkey,
789 CID: fmt.Sprintf("bafysub%d", subscriberIndex),
790 Record: map[string]interface{}{
791 "$type": "social.coves.community.subscription",
792 "subject": created.DID,
793 "createdAt": time.Now().Format(time.RFC3339),
794 "contentVisibility": float64(3),
795 },
796 },
797 }
798
799 if handleErr := consumer.HandleEvent(ctx, event); handleErr != nil {
800 errors <- fmt.Errorf("subscriber %d: failed to subscribe: %w", subscriberIndex, handleErr)
801 }
802 }(i)
803 }
804
805 wg.Wait()
806 close(errors)
807
808 // Check for errors
809 var errorCount int
810 for err := range errors {
811 t.Logf("Error during concurrent subscription: %v", err)
812 errorCount++
813 }
814
815 if errorCount > 0 {
816 t.Errorf("Expected no errors during concurrent subscription, got %d errors", errorCount)
817 }
818
819 // Verify subscriber count is correct
820 updatedCommunity, err := communityRepo.GetByDID(ctx, created.DID)
821 if err != nil {
822 t.Fatalf("Failed to get updated community: %v", err)
823 }
824
825 if updatedCommunity.SubscriberCount != numSubscribers {
826 t.Errorf("Expected subscriber_count = %d, got %d (RACE CONDITION in subscriber count update)", numSubscribers, updatedCommunity.SubscriberCount)
827 }
828
829 // CRITICAL: Verify actual subscription records to detect race conditions
830 var actualSubscriptionCount int
831 var distinctSubscribers int
832 err = db.QueryRow(`
833 SELECT COUNT(*), COUNT(DISTINCT user_did)
834 FROM community_subscriptions
835 WHERE community_did = $1
836 `, created.DID).Scan(&actualSubscriptionCount, &distinctSubscribers)
837 if err != nil {
838 t.Fatalf("Failed to query subscription records: %v", err)
839 }
840
841 if actualSubscriptionCount != numSubscribers {
842 t.Errorf("Expected %d subscription records, got %d (possible race condition: subscriptions lost or duplicated)", numSubscribers, actualSubscriptionCount)
843 }
844
845 if distinctSubscribers != numSubscribers {
846 t.Errorf("Expected %d distinct subscribers, got %d (possible duplicate subscriptions)", numSubscribers, distinctSubscribers)
847 }
848
849 t.Logf("✓ %d concurrent subscriptions processed correctly:", numSubscribers)
850 t.Logf(" - Community subscriber_count: %d", updatedCommunity.SubscriberCount)
851 t.Logf(" - Database records: %d subscriptions from %d distinct users (no duplicates)", actualSubscriptionCount, distinctSubscribers)
852 })
853
854 t.Run("Concurrent subscribe and unsubscribe", func(t *testing.T) {
855 // Create new community for this test
856 testDID2 := fmt.Sprintf("did:plc:test-sub-unsub-%d", time.Now().UnixNano())
857 community2 := &communities.Community{
858 DID: testDID2,
859 Handle: fmt.Sprintf("sub-unsub-%d.test.coves.social", time.Now().UnixNano()),
860 Name: "sub-unsub-test",
861 DisplayName: "Subscribe/Unsubscribe Race Test",
862 Description: "Testing concurrent subscribe/unsubscribe",
863 OwnerDID: "did:plc:owner",
864 CreatedByDID: "did:plc:creator",
865 HostedByDID: "did:web:coves.local",
866 Visibility: "public",
867 CreatedAt: time.Now(),
868 UpdatedAt: time.Now(),
869 }
870
871 created2, err := communityRepo.Create(ctx, community2)
872 if err != nil {
873 t.Fatalf("Failed to create test community: %v", err)
874 }
875
876 const numUsers = 20
877 var wg sync.WaitGroup
878 wg.Add(numUsers * 2) // Each user subscribes then unsubscribes
879
880 errors := make(chan error, numUsers*2)
881
882 for i := 0; i < numUsers; i++ {
883 go func(userIndex int) {
884 userDID := fmt.Sprintf("did:plc:subunsubuser%d", userIndex)
885 rkey := fmt.Sprintf("subunsub-%d", userIndex)
886
887 // Subscribe
888 subscribeEvent := &jetstream.JetstreamEvent{
889 Did: userDID,
890 Kind: "commit",
891 TimeUS: time.Now().UnixMicro(),
892 Commit: &jetstream.CommitEvent{
893 Rev: fmt.Sprintf("rev-sub-%d", userIndex),
894 Operation: "create",
895 Collection: "social.coves.community.subscription",
896 RKey: rkey,
897 CID: fmt.Sprintf("bafysubscribe%d", userIndex),
898 Record: map[string]interface{}{
899 "$type": "social.coves.community.subscription",
900 "subject": created2.DID,
901 "createdAt": time.Now().Format(time.RFC3339),
902 "contentVisibility": float64(3),
903 },
904 },
905 }
906
907 if handleErr := consumer.HandleEvent(ctx, subscribeEvent); handleErr != nil {
908 errors <- fmt.Errorf("user %d: subscribe failed: %w", userIndex, handleErr)
909 }
910 wg.Done()
911
912 // Small delay to ensure subscribe happens first
913 time.Sleep(10 * time.Millisecond)
914
915 // Unsubscribe
916 unsubscribeEvent := &jetstream.JetstreamEvent{
917 Did: userDID,
918 Kind: "commit",
919 TimeUS: time.Now().UnixMicro(),
920 Commit: &jetstream.CommitEvent{
921 Rev: fmt.Sprintf("rev-unsub-%d", userIndex),
922 Operation: "delete",
923 Collection: "social.coves.community.subscription",
924 RKey: rkey,
925 CID: "",
926 Record: nil,
927 },
928 }
929
930 if handleErr := consumer.HandleEvent(ctx, unsubscribeEvent); handleErr != nil {
931 errors <- fmt.Errorf("user %d: unsubscribe failed: %w", userIndex, handleErr)
932 }
933 wg.Done()
934 }(i)
935 }
936
937 wg.Wait()
938 close(errors)
939
940 // Check for errors
941 var errorCount int
942 for err := range errors {
943 t.Logf("Error during concurrent sub/unsub: %v", err)
944 errorCount++
945 }
946
947 if errorCount > 0 {
948 t.Errorf("Expected no errors during concurrent sub/unsub, got %d errors", errorCount)
949 }
950
951 // Final subscriber count should be 0 (all unsubscribed)
952 finalCommunity, err := communityRepo.GetByDID(ctx, created2.DID)
953 if err != nil {
954 t.Fatalf("Failed to get final community: %v", err)
955 }
956
957 if finalCommunity.SubscriberCount != 0 {
958 t.Errorf("Expected subscriber_count = 0 after all unsubscribed, got %d (RACE CONDITION detected)", finalCommunity.SubscriberCount)
959 }
960
961 // CRITICAL: Verify no subscription records remain in database
962 var remainingSubscriptions int
963 err = db.QueryRow(`
964 SELECT COUNT(*)
965 FROM community_subscriptions
966 WHERE community_did = $1
967 `, created2.DID).Scan(&remainingSubscriptions)
968 if err != nil {
969 t.Fatalf("Failed to query subscription records: %v", err)
970 }
971
972 if remainingSubscriptions != 0 {
973 t.Errorf("Expected 0 subscription records after all unsubscribed, got %d (orphaned subscriptions detected)", remainingSubscriptions)
974 }
975
976 t.Logf("✓ Concurrent subscribe/unsubscribe handled correctly:")
977 t.Logf(" - Community subscriber_count: %d", finalCommunity.SubscriberCount)
978 t.Logf(" - Database records: %d subscriptions remaining (clean unsubscribe)", remainingSubscriptions)
979 })
980}