A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "testing" 8 "time" 9 10 "Coves/internal/atproto/jetstream" 11 "Coves/internal/core/communities" 12 13 postgresRepo "Coves/internal/db/postgres" 14) 15 16// TestCommunityBlocking_Indexing tests Jetstream indexing of block events 17func TestCommunityBlocking_Indexing(t *testing.T) { 18 if testing.Short() { 19 t.Skip("Skipping integration test in short mode") 20 } 21 22 ctx := context.Background() 23 db := setupTestDB(t) 24 defer cleanupBlockingTestDB(t, db) 25 26 repo := createBlockingTestCommunityRepo(t, db) 27 // Skip verification in tests 28 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 29 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 30 31 // Create test community 32 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 33 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID) 34 35 t.Run("indexes block CREATE event", func(t *testing.T) { 36 userDID := "did:plc:test-user-blocker" 37 rkey := "test-block-1" 38 39 // Simulate Jetstream CREATE event 40 event := &jetstream.JetstreamEvent{ 41 Did: userDID, 42 Kind: "commit", 43 TimeUS: time.Now().UnixMicro(), 44 Commit: &jetstream.CommitEvent{ 45 Rev: "test-rev-1", 46 Operation: "create", 47 Collection: "social.coves.community.block", 48 RKey: rkey, 49 CID: "bafyblock123", 50 Record: map[string]interface{}{ 51 "$type": "social.coves.community.block", 52 "subject": community.DID, 53 "createdAt": time.Now().Format(time.RFC3339), 54 }, 55 }, 56 } 57 58 // Process event 59 err := consumer.HandleEvent(ctx, event) 60 if err != nil { 61 t.Fatalf("Failed to handle block event: %v", err) 62 } 63 64 // Verify block indexed 65 block, err := repo.GetBlock(ctx, userDID, community.DID) 66 if err != nil { 67 t.Fatalf("Failed to get block: %v", err) 68 } 69 70 if block.UserDID != userDID { 71 t.Errorf("Expected userDID=%s, got 
%s", userDID, block.UserDID) 72 } 73 if block.CommunityDID != community.DID { 74 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 75 } 76 77 // Verify IsBlocked works 78 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 79 if err != nil { 80 t.Fatalf("IsBlocked failed: %v", err) 81 } 82 if !isBlocked { 83 t.Error("Expected IsBlocked=true, got false") 84 } 85 }) 86 87 t.Run("indexes block DELETE event", func(t *testing.T) { 88 userDID := "did:plc:test-user-unblocker" 89 rkey := "test-block-2" 90 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey) 91 92 // First create a block 93 block := &communities.CommunityBlock{ 94 UserDID: userDID, 95 CommunityDID: community.DID, 96 BlockedAt: time.Now(), 97 RecordURI: uri, 98 RecordCID: "bafyblock456", 99 } 100 _, err := repo.BlockCommunity(ctx, block) 101 if err != nil { 102 t.Fatalf("Failed to create block: %v", err) 103 } 104 105 // Simulate DELETE event 106 event := &jetstream.JetstreamEvent{ 107 Did: userDID, 108 Kind: "commit", 109 TimeUS: time.Now().UnixMicro(), 110 Commit: &jetstream.CommitEvent{ 111 Rev: "test-rev-2", 112 Operation: "delete", 113 Collection: "social.coves.community.block", 114 RKey: rkey, 115 }, 116 } 117 118 // Process delete 119 err = consumer.HandleEvent(ctx, event) 120 if err != nil { 121 t.Fatalf("Failed to handle delete event: %v", err) 122 } 123 124 // Verify block removed 125 _, err = repo.GetBlock(ctx, userDID, community.DID) 126 if !communities.IsNotFound(err) { 127 t.Error("Expected block to be deleted") 128 } 129 130 // Verify IsBlocked returns false 131 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 132 if err != nil { 133 t.Fatalf("IsBlocked failed: %v", err) 134 } 135 if isBlocked { 136 t.Error("Expected IsBlocked=false, got true") 137 } 138 }) 139 140 t.Run("block is idempotent", func(t *testing.T) { 141 userDID := "did:plc:test-user-idempotent" 142 rkey := "test-block-3" 143 144 event := 
&jetstream.JetstreamEvent{ 145 Did: userDID, 146 Kind: "commit", 147 TimeUS: time.Now().UnixMicro(), 148 Commit: &jetstream.CommitEvent{ 149 Rev: "test-rev-3", 150 Operation: "create", 151 Collection: "social.coves.community.block", 152 RKey: rkey, 153 CID: "bafyblock789", 154 Record: map[string]interface{}{ 155 "$type": "social.coves.community.block", 156 "subject": community.DID, 157 "createdAt": time.Now().Format(time.RFC3339), 158 }, 159 }, 160 } 161 162 // Process event twice 163 err := consumer.HandleEvent(ctx, event) 164 if err != nil { 165 t.Fatalf("First block failed: %v", err) 166 } 167 168 err = consumer.HandleEvent(ctx, event) 169 if err != nil { 170 t.Fatalf("Second block (idempotent) failed: %v", err) 171 } 172 173 // Should still exist only once 174 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 175 if err != nil { 176 t.Fatalf("ListBlockedCommunities failed: %v", err) 177 } 178 if len(blocks) != 1 { 179 t.Errorf("Expected 1 block, got %d", len(blocks)) 180 } 181 }) 182 183 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) { 184 userDID := "did:plc:test-user-nonexistent" 185 rkey := "test-block-nonexistent" 186 187 // Simulate DELETE event for block that doesn't exist 188 event := &jetstream.JetstreamEvent{ 189 Did: userDID, 190 Kind: "commit", 191 TimeUS: time.Now().UnixMicro(), 192 Commit: &jetstream.CommitEvent{ 193 Rev: "test-rev-99", 194 Operation: "delete", 195 Collection: "social.coves.community.block", 196 RKey: rkey, 197 }, 198 } 199 200 // Should not error (idempotent) 201 err := consumer.HandleEvent(ctx, event) 202 if err != nil { 203 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err) 204 } 205 }) 206} 207 208// TestCommunityBlocking_ListBlocked tests listing blocked communities 209func TestCommunityBlocking_ListBlocked(t *testing.T) { 210 if testing.Short() { 211 t.Skip("Skipping integration test in short mode") 212 } 213 214 ctx := context.Background() 215 db := 
setupTestDB(t) 216 defer cleanupBlockingTestDB(t, db) 217 218 repo := createBlockingTestCommunityRepo(t, db) 219 userDID := "did:plc:test-user-list" 220 221 // Create and block 3 communities 222 testCommunities := make([]*communities.Community, 3) 223 for i := 0; i < 3; i++ { 224 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i) 225 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID) 226 227 block := &communities.CommunityBlock{ 228 UserDID: userDID, 229 CommunityDID: testCommunities[i].DID, 230 BlockedAt: time.Now(), 231 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i), 232 RecordCID: fmt.Sprintf("bafyblock%d", i), 233 } 234 _, err := repo.BlockCommunity(ctx, block) 235 if err != nil { 236 t.Fatalf("Failed to block community %d: %v", i, err) 237 } 238 } 239 240 t.Run("lists all blocked communities", func(t *testing.T) { 241 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 242 if err != nil { 243 t.Fatalf("ListBlockedCommunities failed: %v", err) 244 } 245 246 if len(blocks) != 3 { 247 t.Errorf("Expected 3 blocks, got %d", len(blocks)) 248 } 249 250 // Verify all blocks belong to correct user 251 for _, block := range blocks { 252 if block.UserDID != userDID { 253 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 254 } 255 } 256 }) 257 258 t.Run("pagination works correctly", func(t *testing.T) { 259 // Get first 2 260 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0) 261 if err != nil { 262 t.Fatalf("ListBlockedCommunities with limit failed: %v", err) 263 } 264 if len(blocks) != 2 { 265 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks)) 266 } 267 268 // Get next 2 (should only get 1) 269 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2) 270 if err != nil { 271 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err) 272 } 273 if len(blocksPage2) != 1 { 274 t.Errorf("Expected 1 block on page 2, 
got %d", len(blocksPage2)) 275 } 276 }) 277 278 t.Run("returns empty list for user with no blocks", func(t *testing.T) { 279 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0) 280 if err != nil { 281 t.Fatalf("ListBlockedCommunities failed: %v", err) 282 } 283 if len(blocks) != 0 { 284 t.Errorf("Expected 0 blocks, got %d", len(blocks)) 285 } 286 }) 287} 288 289// TestCommunityBlocking_IsBlocked tests the fast block check 290func TestCommunityBlocking_IsBlocked(t *testing.T) { 291 if testing.Short() { 292 t.Skip("Skipping integration test in short mode") 293 } 294 295 ctx := context.Background() 296 db := setupTestDB(t) 297 defer cleanupBlockingTestDB(t, db) 298 299 repo := createBlockingTestCommunityRepo(t, db) 300 301 userDID := "did:plc:test-user-isblocked" 302 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 303 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID) 304 305 t.Run("returns false when not blocked", func(t *testing.T) { 306 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 307 if err != nil { 308 t.Fatalf("IsBlocked failed: %v", err) 309 } 310 if isBlocked { 311 t.Error("Expected IsBlocked=false, got true") 312 } 313 }) 314 315 t.Run("returns true when blocked", func(t *testing.T) { 316 // Create block 317 block := &communities.CommunityBlock{ 318 UserDID: userDID, 319 CommunityDID: community.DID, 320 BlockedAt: time.Now(), 321 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID), 322 RecordCID: "bafyblocktest", 323 } 324 _, err := repo.BlockCommunity(ctx, block) 325 if err != nil { 326 t.Fatalf("Failed to create block: %v", err) 327 } 328 329 // Check IsBlocked 330 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 331 if err != nil { 332 t.Fatalf("IsBlocked failed: %v", err) 333 } 334 if !isBlocked { 335 t.Error("Expected IsBlocked=true, got false") 336 } 337 }) 338 339 t.Run("returns false after 
unblock", func(t *testing.T) { 340 // Unblock 341 err := repo.UnblockCommunity(ctx, userDID, community.DID) 342 if err != nil { 343 t.Fatalf("UnblockCommunity failed: %v", err) 344 } 345 346 // Check IsBlocked 347 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 348 if err != nil { 349 t.Fatalf("IsBlocked failed: %v", err) 350 } 351 if isBlocked { 352 t.Error("Expected IsBlocked=false after unblock, got true") 353 } 354 }) 355} 356 357// TestCommunityBlocking_GetBlock tests block retrieval 358func TestCommunityBlocking_GetBlock(t *testing.T) { 359 if testing.Short() { 360 t.Skip("Skipping integration test in short mode") 361 } 362 363 ctx := context.Background() 364 db := setupTestDB(t) 365 defer cleanupBlockingTestDB(t, db) 366 367 repo := createBlockingTestCommunityRepo(t, db) 368 369 userDID := "did:plc:test-user-getblock" 370 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 371 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID) 372 373 t.Run("returns error when block doesn't exist", func(t *testing.T) { 374 _, err := repo.GetBlock(ctx, userDID, community.DID) 375 if !communities.IsNotFound(err) { 376 t.Errorf("Expected ErrBlockNotFound, got: %v", err) 377 } 378 }) 379 380 t.Run("retrieves block by user and community DID", func(t *testing.T) { 381 // Create block 382 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 383 originalBlock := &communities.CommunityBlock{ 384 UserDID: userDID, 385 CommunityDID: community.DID, 386 BlockedAt: time.Now(), 387 RecordURI: recordURI, 388 RecordCID: "bafyblockgettest", 389 } 390 _, err := repo.BlockCommunity(ctx, originalBlock) 391 if err != nil { 392 t.Fatalf("Failed to create block: %v", err) 393 } 394 395 // Retrieve by user+community 396 block, err := repo.GetBlock(ctx, userDID, community.DID) 397 if err != nil { 398 t.Fatalf("GetBlock failed: %v", err) 399 } 400 401 if block.UserDID != userDID { 402 
t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 403 } 404 if block.CommunityDID != community.DID { 405 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 406 } 407 if block.RecordURI != recordURI { 408 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 409 } 410 }) 411 412 t.Run("retrieves block by URI", func(t *testing.T) { 413 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 414 415 // Retrieve by URI 416 block, err := repo.GetBlockByURI(ctx, recordURI) 417 if err != nil { 418 t.Fatalf("GetBlockByURI failed: %v", err) 419 } 420 421 if block.RecordURI != recordURI { 422 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 423 } 424 if block.CommunityDID != community.DID { 425 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 426 } 427 }) 428} 429 430// Helper functions for blocking tests 431 432func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository { 433 return postgresRepo.NewCommunityRepository(db) 434} 435 436func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 437 community := &communities.Community{ 438 DID: did, 439 Handle: fmt.Sprintf("!%s@coves.test", name), 440 Name: name, 441 DisplayName: fmt.Sprintf("Test Community %s", name), 442 Description: "Test community for blocking tests", 443 OwnerDID: did, 444 CreatedByDID: "did:plc:test-creator", 445 HostedByDID: "did:plc:test-instance", 446 Visibility: "public", 447 CreatedAt: time.Now(), 448 UpdatedAt: time.Now(), 449 } 450 451 created, err := repo.Create(context.Background(), community) 452 if err != nil { 453 t.Fatalf("Failed to create test community: %v", err) 454 } 455 456 return created 457} 458 459func cleanupBlockingTestDB(t *testing.T, db *sql.DB) { 460 // Clean up test data 461 _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 
'did:plc:test-%'") 462 if err != nil { 463 t.Logf("Warning: Failed to clean up blocks: %v", err) 464 } 465 466 _, err = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'") 467 if err != nil { 468 t.Logf("Warning: Failed to clean up communities: %v", err) 469 } 470 471 if closeErr := db.Close(); closeErr != nil { 472 t.Logf("Failed to close database: %v", closeErr) 473 } 474}