A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 "context" 7 "database/sql" 8 "fmt" 9 "testing" 10 "time" 11 12 postgresRepo "Coves/internal/db/postgres" 13) 14 15// TestCommunityBlocking_Indexing tests Jetstream indexing of block events 16func TestCommunityBlocking_Indexing(t *testing.T) { 17 if testing.Short() { 18 t.Skip("Skipping integration test in short mode") 19 } 20 21 ctx := context.Background() 22 db := setupTestDB(t) 23 defer cleanupBlockingTestDB(t, db) 24 25 repo := createBlockingTestCommunityRepo(t, db) 26 // Skip verification in tests 27 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 28 29 // Create test community 30 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 31 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID) 32 33 t.Run("indexes block CREATE event", func(t *testing.T) { 34 userDID := "did:plc:test-user-blocker" 35 rkey := "test-block-1" 36 37 // Simulate Jetstream CREATE event 38 event := &jetstream.JetstreamEvent{ 39 Did: userDID, 40 Kind: "commit", 41 TimeUS: time.Now().UnixMicro(), 42 Commit: &jetstream.CommitEvent{ 43 Rev: "test-rev-1", 44 Operation: "create", 45 Collection: "social.coves.community.block", 46 RKey: rkey, 47 CID: "bafyblock123", 48 Record: map[string]interface{}{ 49 "$type": "social.coves.community.block", 50 "subject": community.DID, 51 "createdAt": time.Now().Format(time.RFC3339), 52 }, 53 }, 54 } 55 56 // Process event 57 err := consumer.HandleEvent(ctx, event) 58 if err != nil { 59 t.Fatalf("Failed to handle block event: %v", err) 60 } 61 62 // Verify block indexed 63 block, err := repo.GetBlock(ctx, userDID, community.DID) 64 if err != nil { 65 t.Fatalf("Failed to get block: %v", err) 66 } 67 68 if block.UserDID != userDID { 69 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 70 } 71 if block.CommunityDID != community.DID { 72 t.Errorf("Expected 
communityDID=%s, got %s", community.DID, block.CommunityDID) 73 } 74 75 // Verify IsBlocked works 76 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 77 if err != nil { 78 t.Fatalf("IsBlocked failed: %v", err) 79 } 80 if !isBlocked { 81 t.Error("Expected IsBlocked=true, got false") 82 } 83 }) 84 85 t.Run("indexes block DELETE event", func(t *testing.T) { 86 userDID := "did:plc:test-user-unblocker" 87 rkey := "test-block-2" 88 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey) 89 90 // First create a block 91 block := &communities.CommunityBlock{ 92 UserDID: userDID, 93 CommunityDID: community.DID, 94 BlockedAt: time.Now(), 95 RecordURI: uri, 96 RecordCID: "bafyblock456", 97 } 98 _, err := repo.BlockCommunity(ctx, block) 99 if err != nil { 100 t.Fatalf("Failed to create block: %v", err) 101 } 102 103 // Simulate DELETE event 104 event := &jetstream.JetstreamEvent{ 105 Did: userDID, 106 Kind: "commit", 107 TimeUS: time.Now().UnixMicro(), 108 Commit: &jetstream.CommitEvent{ 109 Rev: "test-rev-2", 110 Operation: "delete", 111 Collection: "social.coves.community.block", 112 RKey: rkey, 113 }, 114 } 115 116 // Process delete 117 err = consumer.HandleEvent(ctx, event) 118 if err != nil { 119 t.Fatalf("Failed to handle delete event: %v", err) 120 } 121 122 // Verify block removed 123 _, err = repo.GetBlock(ctx, userDID, community.DID) 124 if !communities.IsNotFound(err) { 125 t.Error("Expected block to be deleted") 126 } 127 128 // Verify IsBlocked returns false 129 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 130 if err != nil { 131 t.Fatalf("IsBlocked failed: %v", err) 132 } 133 if isBlocked { 134 t.Error("Expected IsBlocked=false, got true") 135 } 136 }) 137 138 t.Run("block is idempotent", func(t *testing.T) { 139 userDID := "did:plc:test-user-idempotent" 140 rkey := "test-block-3" 141 142 event := &jetstream.JetstreamEvent{ 143 Did: userDID, 144 Kind: "commit", 145 TimeUS: time.Now().UnixMicro(), 146 Commit: 
&jetstream.CommitEvent{ 147 Rev: "test-rev-3", 148 Operation: "create", 149 Collection: "social.coves.community.block", 150 RKey: rkey, 151 CID: "bafyblock789", 152 Record: map[string]interface{}{ 153 "$type": "social.coves.community.block", 154 "subject": community.DID, 155 "createdAt": time.Now().Format(time.RFC3339), 156 }, 157 }, 158 } 159 160 // Process event twice 161 err := consumer.HandleEvent(ctx, event) 162 if err != nil { 163 t.Fatalf("First block failed: %v", err) 164 } 165 166 err = consumer.HandleEvent(ctx, event) 167 if err != nil { 168 t.Fatalf("Second block (idempotent) failed: %v", err) 169 } 170 171 // Should still exist only once 172 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 173 if err != nil { 174 t.Fatalf("ListBlockedCommunities failed: %v", err) 175 } 176 if len(blocks) != 1 { 177 t.Errorf("Expected 1 block, got %d", len(blocks)) 178 } 179 }) 180 181 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) { 182 userDID := "did:plc:test-user-nonexistent" 183 rkey := "test-block-nonexistent" 184 185 // Simulate DELETE event for block that doesn't exist 186 event := &jetstream.JetstreamEvent{ 187 Did: userDID, 188 Kind: "commit", 189 TimeUS: time.Now().UnixMicro(), 190 Commit: &jetstream.CommitEvent{ 191 Rev: "test-rev-99", 192 Operation: "delete", 193 Collection: "social.coves.community.block", 194 RKey: rkey, 195 }, 196 } 197 198 // Should not error (idempotent) 199 err := consumer.HandleEvent(ctx, event) 200 if err != nil { 201 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err) 202 } 203 }) 204} 205 206// TestCommunityBlocking_ListBlocked tests listing blocked communities 207func TestCommunityBlocking_ListBlocked(t *testing.T) { 208 if testing.Short() { 209 t.Skip("Skipping integration test in short mode") 210 } 211 212 ctx := context.Background() 213 db := setupTestDB(t) 214 defer cleanupBlockingTestDB(t, db) 215 216 repo := createBlockingTestCommunityRepo(t, db) 217 
userDID := "did:plc:test-user-list" 218 219 // Create and block 3 communities 220 testCommunities := make([]*communities.Community, 3) 221 for i := 0; i < 3; i++ { 222 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i) 223 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID) 224 225 block := &communities.CommunityBlock{ 226 UserDID: userDID, 227 CommunityDID: testCommunities[i].DID, 228 BlockedAt: time.Now(), 229 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i), 230 RecordCID: fmt.Sprintf("bafyblock%d", i), 231 } 232 _, err := repo.BlockCommunity(ctx, block) 233 if err != nil { 234 t.Fatalf("Failed to block community %d: %v", i, err) 235 } 236 } 237 238 t.Run("lists all blocked communities", func(t *testing.T) { 239 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 240 if err != nil { 241 t.Fatalf("ListBlockedCommunities failed: %v", err) 242 } 243 244 if len(blocks) != 3 { 245 t.Errorf("Expected 3 blocks, got %d", len(blocks)) 246 } 247 248 // Verify all blocks belong to correct user 249 for _, block := range blocks { 250 if block.UserDID != userDID { 251 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 252 } 253 } 254 }) 255 256 t.Run("pagination works correctly", func(t *testing.T) { 257 // Get first 2 258 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0) 259 if err != nil { 260 t.Fatalf("ListBlockedCommunities with limit failed: %v", err) 261 } 262 if len(blocks) != 2 { 263 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks)) 264 } 265 266 // Get next 2 (should only get 1) 267 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2) 268 if err != nil { 269 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err) 270 } 271 if len(blocksPage2) != 1 { 272 t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2)) 273 } 274 }) 275 276 t.Run("returns empty list for user with no blocks", func(t 
*testing.T) { 277 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0) 278 if err != nil { 279 t.Fatalf("ListBlockedCommunities failed: %v", err) 280 } 281 if len(blocks) != 0 { 282 t.Errorf("Expected 0 blocks, got %d", len(blocks)) 283 } 284 }) 285} 286 287// TestCommunityBlocking_IsBlocked tests the fast block check 288func TestCommunityBlocking_IsBlocked(t *testing.T) { 289 if testing.Short() { 290 t.Skip("Skipping integration test in short mode") 291 } 292 293 ctx := context.Background() 294 db := setupTestDB(t) 295 defer cleanupBlockingTestDB(t, db) 296 297 repo := createBlockingTestCommunityRepo(t, db) 298 299 userDID := "did:plc:test-user-isblocked" 300 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 301 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID) 302 303 t.Run("returns false when not blocked", func(t *testing.T) { 304 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 305 if err != nil { 306 t.Fatalf("IsBlocked failed: %v", err) 307 } 308 if isBlocked { 309 t.Error("Expected IsBlocked=false, got true") 310 } 311 }) 312 313 t.Run("returns true when blocked", func(t *testing.T) { 314 // Create block 315 block := &communities.CommunityBlock{ 316 UserDID: userDID, 317 CommunityDID: community.DID, 318 BlockedAt: time.Now(), 319 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID), 320 RecordCID: "bafyblocktest", 321 } 322 _, err := repo.BlockCommunity(ctx, block) 323 if err != nil { 324 t.Fatalf("Failed to create block: %v", err) 325 } 326 327 // Check IsBlocked 328 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 329 if err != nil { 330 t.Fatalf("IsBlocked failed: %v", err) 331 } 332 if !isBlocked { 333 t.Error("Expected IsBlocked=true, got false") 334 } 335 }) 336 337 t.Run("returns false after unblock", func(t *testing.T) { 338 // Unblock 339 err := repo.UnblockCommunity(ctx, userDID, community.DID) 340 if 
err != nil { 341 t.Fatalf("UnblockCommunity failed: %v", err) 342 } 343 344 // Check IsBlocked 345 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 346 if err != nil { 347 t.Fatalf("IsBlocked failed: %v", err) 348 } 349 if isBlocked { 350 t.Error("Expected IsBlocked=false after unblock, got true") 351 } 352 }) 353} 354 355// TestCommunityBlocking_GetBlock tests block retrieval 356func TestCommunityBlocking_GetBlock(t *testing.T) { 357 if testing.Short() { 358 t.Skip("Skipping integration test in short mode") 359 } 360 361 ctx := context.Background() 362 db := setupTestDB(t) 363 defer cleanupBlockingTestDB(t, db) 364 365 repo := createBlockingTestCommunityRepo(t, db) 366 367 userDID := "did:plc:test-user-getblock" 368 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 369 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID) 370 371 t.Run("returns error when block doesn't exist", func(t *testing.T) { 372 _, err := repo.GetBlock(ctx, userDID, community.DID) 373 if !communities.IsNotFound(err) { 374 t.Errorf("Expected ErrBlockNotFound, got: %v", err) 375 } 376 }) 377 378 t.Run("retrieves block by user and community DID", func(t *testing.T) { 379 // Create block 380 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 381 originalBlock := &communities.CommunityBlock{ 382 UserDID: userDID, 383 CommunityDID: community.DID, 384 BlockedAt: time.Now(), 385 RecordURI: recordURI, 386 RecordCID: "bafyblockgettest", 387 } 388 _, err := repo.BlockCommunity(ctx, originalBlock) 389 if err != nil { 390 t.Fatalf("Failed to create block: %v", err) 391 } 392 393 // Retrieve by user+community 394 block, err := repo.GetBlock(ctx, userDID, community.DID) 395 if err != nil { 396 t.Fatalf("GetBlock failed: %v", err) 397 } 398 399 if block.UserDID != userDID { 400 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 401 } 402 if block.CommunityDID != community.DID { 
403 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 404 } 405 if block.RecordURI != recordURI { 406 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 407 } 408 }) 409 410 t.Run("retrieves block by URI", func(t *testing.T) { 411 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 412 413 // Retrieve by URI 414 block, err := repo.GetBlockByURI(ctx, recordURI) 415 if err != nil { 416 t.Fatalf("GetBlockByURI failed: %v", err) 417 } 418 419 if block.RecordURI != recordURI { 420 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 421 } 422 if block.CommunityDID != community.DID { 423 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 424 } 425 }) 426} 427 428// Helper functions for blocking tests 429 430func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository { 431 return postgresRepo.NewCommunityRepository(db) 432} 433 434func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 435 community := &communities.Community{ 436 DID: did, 437 Handle: fmt.Sprintf("!%s@coves.test", name), 438 Name: name, 439 DisplayName: fmt.Sprintf("Test Community %s", name), 440 Description: "Test community for blocking tests", 441 OwnerDID: did, 442 CreatedByDID: "did:plc:test-creator", 443 HostedByDID: "did:plc:test-instance", 444 Visibility: "public", 445 CreatedAt: time.Now(), 446 UpdatedAt: time.Now(), 447 } 448 449 created, err := repo.Create(context.Background(), community) 450 if err != nil { 451 t.Fatalf("Failed to create test community: %v", err) 452 } 453 454 return created 455} 456 457func cleanupBlockingTestDB(t *testing.T, db *sql.DB) { 458 // Clean up test data 459 _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'") 460 if err != nil { 461 t.Logf("Warning: Failed to clean up blocks: %v", err) 462 } 463 464 _, err = 
db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'") 465 if err != nil { 466 t.Logf("Warning: Failed to clean up communities: %v", err) 467 } 468 469 if closeErr := db.Close(); closeErr != nil { 470 t.Logf("Failed to close database: %v", closeErr) 471 } 472}