A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 postgresRepo "Coves/internal/db/postgres" 7 "context" 8 "database/sql" 9 "fmt" 10 "testing" 11 "time" 12) 13 14// TestCommunityBlocking_Indexing tests Jetstream indexing of block events 15func TestCommunityBlocking_Indexing(t *testing.T) { 16 if testing.Short() { 17 t.Skip("Skipping integration test in short mode") 18 } 19 20 ctx := context.Background() 21 db := setupTestDB(t) 22 defer cleanupBlockingTestDB(t, db) 23 24 repo := createBlockingTestCommunityRepo(t, db) 25 // Skip verification in tests 26 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 27 28 // Create test community 29 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 30 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID) 31 32 t.Run("indexes block CREATE event", func(t *testing.T) { 33 userDID := "did:plc:test-user-blocker" 34 rkey := "test-block-1" 35 36 // Simulate Jetstream CREATE event 37 event := &jetstream.JetstreamEvent{ 38 Did: userDID, 39 Kind: "commit", 40 TimeUS: time.Now().UnixMicro(), 41 Commit: &jetstream.CommitEvent{ 42 Rev: "test-rev-1", 43 Operation: "create", 44 Collection: "social.coves.community.block", 45 RKey: rkey, 46 CID: "bafyblock123", 47 Record: map[string]interface{}{ 48 "$type": "social.coves.community.block", 49 "subject": community.DID, 50 "createdAt": time.Now().Format(time.RFC3339), 51 }, 52 }, 53 } 54 55 // Process event 56 err := consumer.HandleEvent(ctx, event) 57 if err != nil { 58 t.Fatalf("Failed to handle block event: %v", err) 59 } 60 61 // Verify block indexed 62 block, err := repo.GetBlock(ctx, userDID, community.DID) 63 if err != nil { 64 t.Fatalf("Failed to get block: %v", err) 65 } 66 67 if block.UserDID != userDID { 68 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 69 } 70 if block.CommunityDID != community.DID { 71 t.Errorf("Expected 
communityDID=%s, got %s", community.DID, block.CommunityDID) 72 } 73 74 // Verify IsBlocked works 75 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 76 if err != nil { 77 t.Fatalf("IsBlocked failed: %v", err) 78 } 79 if !isBlocked { 80 t.Error("Expected IsBlocked=true, got false") 81 } 82 }) 83 84 t.Run("indexes block DELETE event", func(t *testing.T) { 85 userDID := "did:plc:test-user-unblocker" 86 rkey := "test-block-2" 87 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey) 88 89 // First create a block 90 block := &communities.CommunityBlock{ 91 UserDID: userDID, 92 CommunityDID: community.DID, 93 BlockedAt: time.Now(), 94 RecordURI: uri, 95 RecordCID: "bafyblock456", 96 } 97 _, err := repo.BlockCommunity(ctx, block) 98 if err != nil { 99 t.Fatalf("Failed to create block: %v", err) 100 } 101 102 // Simulate DELETE event 103 event := &jetstream.JetstreamEvent{ 104 Did: userDID, 105 Kind: "commit", 106 TimeUS: time.Now().UnixMicro(), 107 Commit: &jetstream.CommitEvent{ 108 Rev: "test-rev-2", 109 Operation: "delete", 110 Collection: "social.coves.community.block", 111 RKey: rkey, 112 }, 113 } 114 115 // Process delete 116 err = consumer.HandleEvent(ctx, event) 117 if err != nil { 118 t.Fatalf("Failed to handle delete event: %v", err) 119 } 120 121 // Verify block removed 122 _, err = repo.GetBlock(ctx, userDID, community.DID) 123 if !communities.IsNotFound(err) { 124 t.Error("Expected block to be deleted") 125 } 126 127 // Verify IsBlocked returns false 128 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 129 if err != nil { 130 t.Fatalf("IsBlocked failed: %v", err) 131 } 132 if isBlocked { 133 t.Error("Expected IsBlocked=false, got true") 134 } 135 }) 136 137 t.Run("block is idempotent", func(t *testing.T) { 138 userDID := "did:plc:test-user-idempotent" 139 rkey := "test-block-3" 140 141 event := &jetstream.JetstreamEvent{ 142 Did: userDID, 143 Kind: "commit", 144 TimeUS: time.Now().UnixMicro(), 145 Commit: 
&jetstream.CommitEvent{ 146 Rev: "test-rev-3", 147 Operation: "create", 148 Collection: "social.coves.community.block", 149 RKey: rkey, 150 CID: "bafyblock789", 151 Record: map[string]interface{}{ 152 "$type": "social.coves.community.block", 153 "subject": community.DID, 154 "createdAt": time.Now().Format(time.RFC3339), 155 }, 156 }, 157 } 158 159 // Process event twice 160 err := consumer.HandleEvent(ctx, event) 161 if err != nil { 162 t.Fatalf("First block failed: %v", err) 163 } 164 165 err = consumer.HandleEvent(ctx, event) 166 if err != nil { 167 t.Fatalf("Second block (idempotent) failed: %v", err) 168 } 169 170 // Should still exist only once 171 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 172 if err != nil { 173 t.Fatalf("ListBlockedCommunities failed: %v", err) 174 } 175 if len(blocks) != 1 { 176 t.Errorf("Expected 1 block, got %d", len(blocks)) 177 } 178 }) 179 180 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) { 181 userDID := "did:plc:test-user-nonexistent" 182 rkey := "test-block-nonexistent" 183 184 // Simulate DELETE event for block that doesn't exist 185 event := &jetstream.JetstreamEvent{ 186 Did: userDID, 187 Kind: "commit", 188 TimeUS: time.Now().UnixMicro(), 189 Commit: &jetstream.CommitEvent{ 190 Rev: "test-rev-99", 191 Operation: "delete", 192 Collection: "social.coves.community.block", 193 RKey: rkey, 194 }, 195 } 196 197 // Should not error (idempotent) 198 err := consumer.HandleEvent(ctx, event) 199 if err != nil { 200 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err) 201 } 202 }) 203} 204 205// TestCommunityBlocking_ListBlocked tests listing blocked communities 206func TestCommunityBlocking_ListBlocked(t *testing.T) { 207 if testing.Short() { 208 t.Skip("Skipping integration test in short mode") 209 } 210 211 ctx := context.Background() 212 db := setupTestDB(t) 213 defer cleanupBlockingTestDB(t, db) 214 215 repo := createBlockingTestCommunityRepo(t, db) 216 
userDID := "did:plc:test-user-list" 217 218 // Create and block 3 communities 219 testCommunities := make([]*communities.Community, 3) 220 for i := 0; i < 3; i++ { 221 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i) 222 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID) 223 224 block := &communities.CommunityBlock{ 225 UserDID: userDID, 226 CommunityDID: testCommunities[i].DID, 227 BlockedAt: time.Now(), 228 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i), 229 RecordCID: fmt.Sprintf("bafyblock%d", i), 230 } 231 _, err := repo.BlockCommunity(ctx, block) 232 if err != nil { 233 t.Fatalf("Failed to block community %d: %v", i, err) 234 } 235 } 236 237 t.Run("lists all blocked communities", func(t *testing.T) { 238 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 239 if err != nil { 240 t.Fatalf("ListBlockedCommunities failed: %v", err) 241 } 242 243 if len(blocks) != 3 { 244 t.Errorf("Expected 3 blocks, got %d", len(blocks)) 245 } 246 247 // Verify all blocks belong to correct user 248 for _, block := range blocks { 249 if block.UserDID != userDID { 250 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 251 } 252 } 253 }) 254 255 t.Run("pagination works correctly", func(t *testing.T) { 256 // Get first 2 257 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0) 258 if err != nil { 259 t.Fatalf("ListBlockedCommunities with limit failed: %v", err) 260 } 261 if len(blocks) != 2 { 262 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks)) 263 } 264 265 // Get next 2 (should only get 1) 266 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2) 267 if err != nil { 268 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err) 269 } 270 if len(blocksPage2) != 1 { 271 t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2)) 272 } 273 }) 274 275 t.Run("returns empty list for user with no blocks", func(t 
*testing.T) { 276 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0) 277 if err != nil { 278 t.Fatalf("ListBlockedCommunities failed: %v", err) 279 } 280 if len(blocks) != 0 { 281 t.Errorf("Expected 0 blocks, got %d", len(blocks)) 282 } 283 }) 284} 285 286// TestCommunityBlocking_IsBlocked tests the fast block check 287func TestCommunityBlocking_IsBlocked(t *testing.T) { 288 if testing.Short() { 289 t.Skip("Skipping integration test in short mode") 290 } 291 292 ctx := context.Background() 293 db := setupTestDB(t) 294 defer cleanupBlockingTestDB(t, db) 295 296 repo := createBlockingTestCommunityRepo(t, db) 297 298 userDID := "did:plc:test-user-isblocked" 299 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 300 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID) 301 302 t.Run("returns false when not blocked", func(t *testing.T) { 303 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 304 if err != nil { 305 t.Fatalf("IsBlocked failed: %v", err) 306 } 307 if isBlocked { 308 t.Error("Expected IsBlocked=false, got true") 309 } 310 }) 311 312 t.Run("returns true when blocked", func(t *testing.T) { 313 // Create block 314 block := &communities.CommunityBlock{ 315 UserDID: userDID, 316 CommunityDID: community.DID, 317 BlockedAt: time.Now(), 318 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID), 319 RecordCID: "bafyblocktest", 320 } 321 _, err := repo.BlockCommunity(ctx, block) 322 if err != nil { 323 t.Fatalf("Failed to create block: %v", err) 324 } 325 326 // Check IsBlocked 327 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 328 if err != nil { 329 t.Fatalf("IsBlocked failed: %v", err) 330 } 331 if !isBlocked { 332 t.Error("Expected IsBlocked=true, got false") 333 } 334 }) 335 336 t.Run("returns false after unblock", func(t *testing.T) { 337 // Unblock 338 err := repo.UnblockCommunity(ctx, userDID, community.DID) 339 if 
err != nil { 340 t.Fatalf("UnblockCommunity failed: %v", err) 341 } 342 343 // Check IsBlocked 344 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 345 if err != nil { 346 t.Fatalf("IsBlocked failed: %v", err) 347 } 348 if isBlocked { 349 t.Error("Expected IsBlocked=false after unblock, got true") 350 } 351 }) 352} 353 354// TestCommunityBlocking_GetBlock tests block retrieval 355func TestCommunityBlocking_GetBlock(t *testing.T) { 356 if testing.Short() { 357 t.Skip("Skipping integration test in short mode") 358 } 359 360 ctx := context.Background() 361 db := setupTestDB(t) 362 defer cleanupBlockingTestDB(t, db) 363 364 repo := createBlockingTestCommunityRepo(t, db) 365 366 userDID := "did:plc:test-user-getblock" 367 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 368 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID) 369 370 t.Run("returns error when block doesn't exist", func(t *testing.T) { 371 _, err := repo.GetBlock(ctx, userDID, community.DID) 372 if !communities.IsNotFound(err) { 373 t.Errorf("Expected ErrBlockNotFound, got: %v", err) 374 } 375 }) 376 377 t.Run("retrieves block by user and community DID", func(t *testing.T) { 378 // Create block 379 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 380 originalBlock := &communities.CommunityBlock{ 381 UserDID: userDID, 382 CommunityDID: community.DID, 383 BlockedAt: time.Now(), 384 RecordURI: recordURI, 385 RecordCID: "bafyblockgettest", 386 } 387 _, err := repo.BlockCommunity(ctx, originalBlock) 388 if err != nil { 389 t.Fatalf("Failed to create block: %v", err) 390 } 391 392 // Retrieve by user+community 393 block, err := repo.GetBlock(ctx, userDID, community.DID) 394 if err != nil { 395 t.Fatalf("GetBlock failed: %v", err) 396 } 397 398 if block.UserDID != userDID { 399 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 400 } 401 if block.CommunityDID != community.DID { 
402 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 403 } 404 if block.RecordURI != recordURI { 405 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 406 } 407 }) 408 409 t.Run("retrieves block by URI", func(t *testing.T) { 410 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 411 412 // Retrieve by URI 413 block, err := repo.GetBlockByURI(ctx, recordURI) 414 if err != nil { 415 t.Fatalf("GetBlockByURI failed: %v", err) 416 } 417 418 if block.RecordURI != recordURI { 419 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 420 } 421 if block.CommunityDID != community.DID { 422 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 423 } 424 }) 425} 426 427// Helper functions for blocking tests 428 429func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository { 430 return postgresRepo.NewCommunityRepository(db) 431} 432 433func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 434 community := &communities.Community{ 435 DID: did, 436 Handle: fmt.Sprintf("!%s@coves.test", name), 437 Name: name, 438 DisplayName: fmt.Sprintf("Test Community %s", name), 439 Description: "Test community for blocking tests", 440 OwnerDID: did, 441 CreatedByDID: "did:plc:test-creator", 442 HostedByDID: "did:plc:test-instance", 443 Visibility: "public", 444 CreatedAt: time.Now(), 445 UpdatedAt: time.Now(), 446 } 447 448 created, err := repo.Create(context.Background(), community) 449 if err != nil { 450 t.Fatalf("Failed to create test community: %v", err) 451 } 452 453 return created 454} 455 456func cleanupBlockingTestDB(t *testing.T, db *sql.DB) { 457 // Clean up test data 458 _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'") 459 if err != nil { 460 t.Logf("Warning: Failed to clean up blocks: %v", err) 461 } 462 463 _, err = 
db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'") 464 if err != nil { 465 t.Logf("Warning: Failed to clean up communities: %v", err) 466 } 467 468 if closeErr := db.Close(); closeErr != nil { 469 t.Logf("Failed to close database: %v", closeErr) 470 } 471}