A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 postgresRepo "Coves/internal/db/postgres" 7 "context" 8 "database/sql" 9 "fmt" 10 "testing" 11 "time" 12) 13 14// TestCommunityBlocking_Indexing tests Jetstream indexing of block events 15func TestCommunityBlocking_Indexing(t *testing.T) { 16 if testing.Short() { 17 t.Skip("Skipping integration test in short mode") 18 } 19 20 ctx := context.Background() 21 db := setupTestDB(t) 22 defer cleanupBlockingTestDB(t, db) 23 24 repo := createBlockingTestCommunityRepo(t, db) 25 consumer := jetstream.NewCommunityEventConsumer(repo) 26 27 // Create test community 28 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 29 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID) 30 31 t.Run("indexes block CREATE event", func(t *testing.T) { 32 userDID := "did:plc:test-user-blocker" 33 rkey := "test-block-1" 34 35 // Simulate Jetstream CREATE event 36 event := &jetstream.JetstreamEvent{ 37 Did: userDID, 38 Kind: "commit", 39 TimeUS: time.Now().UnixMicro(), 40 Commit: &jetstream.CommitEvent{ 41 Rev: "test-rev-1", 42 Operation: "create", 43 Collection: "social.coves.community.block", 44 RKey: rkey, 45 CID: "bafyblock123", 46 Record: map[string]interface{}{ 47 "$type": "social.coves.community.block", 48 "subject": community.DID, 49 "createdAt": time.Now().Format(time.RFC3339), 50 }, 51 }, 52 } 53 54 // Process event 55 err := consumer.HandleEvent(ctx, event) 56 if err != nil { 57 t.Fatalf("Failed to handle block event: %v", err) 58 } 59 60 // Verify block indexed 61 block, err := repo.GetBlock(ctx, userDID, community.DID) 62 if err != nil { 63 t.Fatalf("Failed to get block: %v", err) 64 } 65 66 if block.UserDID != userDID { 67 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 68 } 69 if block.CommunityDID != community.DID { 70 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 71 } 72 73 // Verify IsBlocked works 74 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 75 if err != nil { 76 t.Fatalf("IsBlocked failed: %v", err) 77 } 78 if !isBlocked { 79 t.Error("Expected IsBlocked=true, got false") 80 } 81 }) 82 83 t.Run("indexes block DELETE event", func(t *testing.T) { 84 userDID := "did:plc:test-user-unblocker" 85 rkey := "test-block-2" 86 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey) 87 88 // First create a block 89 block := &communities.CommunityBlock{ 90 UserDID: userDID, 91 CommunityDID: community.DID, 92 BlockedAt: time.Now(), 93 RecordURI: uri, 94 RecordCID: "bafyblock456", 95 } 96 _, err := repo.BlockCommunity(ctx, block) 97 if err != nil { 98 t.Fatalf("Failed to create block: %v", err) 99 } 100 101 // Simulate DELETE event 102 event := &jetstream.JetstreamEvent{ 103 Did: userDID, 104 Kind: "commit", 105 TimeUS: time.Now().UnixMicro(), 106 Commit: &jetstream.CommitEvent{ 107 Rev: "test-rev-2", 108 Operation: "delete", 109 Collection: "social.coves.community.block", 110 RKey: rkey, 111 }, 112 } 113 114 // Process delete 115 err = consumer.HandleEvent(ctx, event) 116 if err != nil { 117 t.Fatalf("Failed to handle delete event: %v", err) 118 } 119 120 // Verify block removed 121 _, err = repo.GetBlock(ctx, userDID, community.DID) 122 if !communities.IsNotFound(err) { 123 t.Error("Expected block to be deleted") 124 } 125 126 // Verify IsBlocked returns false 127 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 128 if err != nil { 129 t.Fatalf("IsBlocked failed: %v", err) 130 } 131 if isBlocked { 132 t.Error("Expected IsBlocked=false, got true") 133 } 134 }) 135 136 t.Run("block is idempotent", func(t *testing.T) { 137 userDID := "did:plc:test-user-idempotent" 138 rkey := "test-block-3" 139 140 event := &jetstream.JetstreamEvent{ 141 Did: userDID, 142 Kind: "commit", 143 TimeUS: time.Now().UnixMicro(), 144 Commit: &jetstream.CommitEvent{ 145 Rev: "test-rev-3", 146 Operation: "create", 147 Collection: "social.coves.community.block", 148 RKey: rkey, 149 CID: "bafyblock789", 150 Record: map[string]interface{}{ 151 "$type": "social.coves.community.block", 152 "subject": community.DID, 153 "createdAt": time.Now().Format(time.RFC3339), 154 }, 155 }, 156 } 157 158 // Process event twice 159 err := consumer.HandleEvent(ctx, event) 160 if err != nil { 161 t.Fatalf("First block failed: %v", err) 162 } 163 164 err = consumer.HandleEvent(ctx, event) 165 if err != nil { 166 t.Fatalf("Second block (idempotent) failed: %v", err) 167 } 168 169 // Should still exist only once 170 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 171 if err != nil { 172 t.Fatalf("ListBlockedCommunities failed: %v", err) 173 } 174 if len(blocks) != 1 { 175 t.Errorf("Expected 1 block, got %d", len(blocks)) 176 } 177 }) 178 179 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) { 180 userDID := "did:plc:test-user-nonexistent" 181 rkey := "test-block-nonexistent" 182 183 // Simulate DELETE event for block that doesn't exist 184 event := &jetstream.JetstreamEvent{ 185 Did: userDID, 186 Kind: "commit", 187 TimeUS: time.Now().UnixMicro(), 188 Commit: &jetstream.CommitEvent{ 189 Rev: "test-rev-99", 190 Operation: "delete", 191 Collection: "social.coves.community.block", 192 RKey: rkey, 193 }, 194 } 195 196 // Should not error (idempotent) 197 err := consumer.HandleEvent(ctx, event) 198 if err != nil { 199 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err) 200 } 201 }) 202} 203 204// TestCommunityBlocking_ListBlocked tests listing blocked communities 205func TestCommunityBlocking_ListBlocked(t *testing.T) { 206 if testing.Short() { 207 t.Skip("Skipping integration test in short mode") 208 } 209 210 ctx := context.Background() 211 db := setupTestDB(t) 212 defer cleanupBlockingTestDB(t, db) 213 214 repo := createBlockingTestCommunityRepo(t, db) 215 userDID := "did:plc:test-user-list" 216 217 // Create and block 3 communities 218 testCommunities := make([]*communities.Community, 3) 219 for i := 0; i < 3; i++ { 220 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i) 221 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID) 222 223 block := &communities.CommunityBlock{ 224 UserDID: userDID, 225 CommunityDID: testCommunities[i].DID, 226 BlockedAt: time.Now(), 227 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i), 228 RecordCID: fmt.Sprintf("bafyblock%d", i), 229 } 230 _, err := repo.BlockCommunity(ctx, block) 231 if err != nil { 232 t.Fatalf("Failed to block community %d: %v", i, err) 233 } 234 } 235 236 t.Run("lists all blocked communities", func(t *testing.T) { 237 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 238 if err != nil { 239 t.Fatalf("ListBlockedCommunities failed: %v", err) 240 } 241 242 if len(blocks) != 3 { 243 t.Errorf("Expected 3 blocks, got %d", len(blocks)) 244 } 245 246 // Verify all blocks belong to correct user 247 for _, block := range blocks { 248 if block.UserDID != userDID { 249 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 250 } 251 } 252 }) 253 254 t.Run("pagination works correctly", func(t *testing.T) { 255 // Get first 2 256 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0) 257 if err != nil { 258 t.Fatalf("ListBlockedCommunities with limit failed: %v", err) 259 } 260 if len(blocks) != 2 { 261 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks)) 262 } 263 264 // Get next 2 (should only get 1) 265 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2) 266 if err != nil { 267 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err) 268 } 269 if len(blocksPage2) != 1 { 270 t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2)) 271 } 272 }) 273 274 t.Run("returns empty list for user with no blocks", func(t *testing.T) { 275 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0) 276 if err != nil { 277 t.Fatalf("ListBlockedCommunities failed: %v", err) 278 } 279 if len(blocks) != 0 { 280 t.Errorf("Expected 0 blocks, got %d", len(blocks)) 281 } 282 }) 283} 284 285// TestCommunityBlocking_IsBlocked tests the fast block check 286func TestCommunityBlocking_IsBlocked(t *testing.T) { 287 if testing.Short() { 288 t.Skip("Skipping integration test in short mode") 289 } 290 291 ctx := context.Background() 292 db := setupTestDB(t) 293 defer cleanupBlockingTestDB(t, db) 294 295 repo := createBlockingTestCommunityRepo(t, db) 296 297 userDID := "did:plc:test-user-isblocked" 298 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 299 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID) 300 301 t.Run("returns false when not blocked", func(t *testing.T) { 302 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 303 if err != nil { 304 t.Fatalf("IsBlocked failed: %v", err) 305 } 306 if isBlocked { 307 t.Error("Expected IsBlocked=false, got true") 308 } 309 }) 310 311 t.Run("returns true when blocked", func(t *testing.T) { 312 // Create block 313 block := &communities.CommunityBlock{ 314 UserDID: userDID, 315 CommunityDID: community.DID, 316 BlockedAt: time.Now(), 317 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID), 318 RecordCID: "bafyblocktest", 319 } 320 _, err := repo.BlockCommunity(ctx, block) 321 if err != nil { 322 t.Fatalf("Failed to create block: %v", err) 323 } 324 325 // Check IsBlocked 326 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 327 if err != nil { 328 t.Fatalf("IsBlocked failed: %v", err) 329 } 330 if !isBlocked { 331 t.Error("Expected IsBlocked=true, got false") 332 } 333 }) 334 335 t.Run("returns false after unblock", func(t *testing.T) { 336 // Unblock 337 err := repo.UnblockCommunity(ctx, userDID, community.DID) 338 if err != nil { 339 t.Fatalf("UnblockCommunity failed: %v", err) 340 } 341 342 // Check IsBlocked 343 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 344 if err != nil { 345 t.Fatalf("IsBlocked failed: %v", err) 346 } 347 if isBlocked { 348 t.Error("Expected IsBlocked=false after unblock, got true") 349 } 350 }) 351} 352 353// TestCommunityBlocking_GetBlock tests block retrieval 354func TestCommunityBlocking_GetBlock(t *testing.T) { 355 if testing.Short() { 356 t.Skip("Skipping integration test in short mode") 357 } 358 359 ctx := context.Background() 360 db := setupTestDB(t) 361 defer cleanupBlockingTestDB(t, db) 362 363 repo := createBlockingTestCommunityRepo(t, db) 364 365 userDID := "did:plc:test-user-getblock" 366 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 367 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID) 368 369 t.Run("returns error when block doesn't exist", func(t *testing.T) { 370 _, err := repo.GetBlock(ctx, userDID, community.DID) 371 if !communities.IsNotFound(err) { 372 t.Errorf("Expected ErrBlockNotFound, got: %v", err) 373 } 374 }) 375 376 t.Run("retrieves block by user and community DID", func(t *testing.T) { 377 // Create block 378 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 379 originalBlock := &communities.CommunityBlock{ 380 UserDID: userDID, 381 CommunityDID: community.DID, 382 BlockedAt: time.Now(), 383 RecordURI: recordURI, 384 RecordCID: "bafyblockgettest", 385 } 386 _, err := repo.BlockCommunity(ctx, originalBlock) 387 if err != nil { 388 t.Fatalf("Failed to create block: %v", err) 389 } 390 391 // Retrieve by user+community 392 block, err := repo.GetBlock(ctx, userDID, community.DID) 393 if err != nil { 394 t.Fatalf("GetBlock failed: %v", err) 395 } 396 397 if block.UserDID != userDID { 398 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 399 } 400 if block.CommunityDID != community.DID { 401 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 402 } 403 if block.RecordURI != recordURI { 404 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 405 } 406 }) 407 408 t.Run("retrieves block by URI", func(t *testing.T) { 409 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 410 411 // Retrieve by URI 412 block, err := repo.GetBlockByURI(ctx, recordURI) 413 if err != nil { 414 t.Fatalf("GetBlockByURI failed: %v", err) 415 } 416 417 if block.RecordURI != recordURI { 418 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 419 } 420 if block.CommunityDID != community.DID { 421 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 422 } 423 }) 424} 425 426// Helper functions for blocking tests 427 428func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository { 429 return postgresRepo.NewCommunityRepository(db) 430} 431 432func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 433 community := &communities.Community{ 434 DID: did, 435 Handle: fmt.Sprintf("!%s@coves.test", name), 436 Name: name, 437 DisplayName: fmt.Sprintf("Test Community %s", name), 438 Description: "Test community for blocking tests", 439 OwnerDID: did, 440 CreatedByDID: "did:plc:test-creator", 441 HostedByDID: "did:plc:test-instance", 442 Visibility: "public", 443 CreatedAt: time.Now(), 444 UpdatedAt: time.Now(), 445 } 446 447 created, err := repo.Create(context.Background(), community) 448 if err != nil { 449 t.Fatalf("Failed to create test community: %v", err) 450 } 451 452 return created 453} 454 455func cleanupBlockingTestDB(t *testing.T, db *sql.DB) { 456 // Clean up test data 457 _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'") 458 if err != nil { 459 t.Logf("Warning: Failed to clean up blocks: %v", err) 460 } 461 462 _, err = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'") 463 if err != nil { 464 t.Logf("Warning: Failed to clean up communities: %v", err) 465 } 466 467 if closeErr := db.Close(); closeErr != nil { 468 t.Logf("Failed to close database: %v", closeErr) 469 } 470}