A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 "context" 7 "database/sql" 8 "fmt" 9 "testing" 10 "time" 11 12 postgresRepo "Coves/internal/db/postgres" 13) 14 15// TestCommunityBlocking_Indexing tests Jetstream indexing of block events 16func TestCommunityBlocking_Indexing(t *testing.T) { 17 if testing.Short() { 18 t.Skip("Skipping integration test in short mode") 19 } 20 21 ctx := context.Background() 22 db := setupTestDB(t) 23 defer cleanupBlockingTestDB(t, db) 24 25 repo := createBlockingTestCommunityRepo(t, db) 26 // Skip verification in tests 27 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 28 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 29 30 // Create test community 31 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 32 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID) 33 34 t.Run("indexes block CREATE event", func(t *testing.T) { 35 userDID := "did:plc:test-user-blocker" 36 rkey := "test-block-1" 37 38 // Simulate Jetstream CREATE event 39 event := &jetstream.JetstreamEvent{ 40 Did: userDID, 41 Kind: "commit", 42 TimeUS: time.Now().UnixMicro(), 43 Commit: &jetstream.CommitEvent{ 44 Rev: "test-rev-1", 45 Operation: "create", 46 Collection: "social.coves.community.block", 47 RKey: rkey, 48 CID: "bafyblock123", 49 Record: map[string]interface{}{ 50 "$type": "social.coves.community.block", 51 "subject": community.DID, 52 "createdAt": time.Now().Format(time.RFC3339), 53 }, 54 }, 55 } 56 57 // Process event 58 err := consumer.HandleEvent(ctx, event) 59 if err != nil { 60 t.Fatalf("Failed to handle block event: %v", err) 61 } 62 63 // Verify block indexed 64 block, err := repo.GetBlock(ctx, userDID, community.DID) 65 if err != nil { 66 t.Fatalf("Failed to get block: %v", err) 67 } 68 69 if block.UserDID != userDID { 70 t.Errorf("Expected userDID=%s, got 
%s", userDID, block.UserDID) 71 } 72 if block.CommunityDID != community.DID { 73 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 74 } 75 76 // Verify IsBlocked works 77 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 78 if err != nil { 79 t.Fatalf("IsBlocked failed: %v", err) 80 } 81 if !isBlocked { 82 t.Error("Expected IsBlocked=true, got false") 83 } 84 }) 85 86 t.Run("indexes block DELETE event", func(t *testing.T) { 87 userDID := "did:plc:test-user-unblocker" 88 rkey := "test-block-2" 89 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey) 90 91 // First create a block 92 block := &communities.CommunityBlock{ 93 UserDID: userDID, 94 CommunityDID: community.DID, 95 BlockedAt: time.Now(), 96 RecordURI: uri, 97 RecordCID: "bafyblock456", 98 } 99 _, err := repo.BlockCommunity(ctx, block) 100 if err != nil { 101 t.Fatalf("Failed to create block: %v", err) 102 } 103 104 // Simulate DELETE event 105 event := &jetstream.JetstreamEvent{ 106 Did: userDID, 107 Kind: "commit", 108 TimeUS: time.Now().UnixMicro(), 109 Commit: &jetstream.CommitEvent{ 110 Rev: "test-rev-2", 111 Operation: "delete", 112 Collection: "social.coves.community.block", 113 RKey: rkey, 114 }, 115 } 116 117 // Process delete 118 err = consumer.HandleEvent(ctx, event) 119 if err != nil { 120 t.Fatalf("Failed to handle delete event: %v", err) 121 } 122 123 // Verify block removed 124 _, err = repo.GetBlock(ctx, userDID, community.DID) 125 if !communities.IsNotFound(err) { 126 t.Error("Expected block to be deleted") 127 } 128 129 // Verify IsBlocked returns false 130 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 131 if err != nil { 132 t.Fatalf("IsBlocked failed: %v", err) 133 } 134 if isBlocked { 135 t.Error("Expected IsBlocked=false, got true") 136 } 137 }) 138 139 t.Run("block is idempotent", func(t *testing.T) { 140 userDID := "did:plc:test-user-idempotent" 141 rkey := "test-block-3" 142 143 event := 
&jetstream.JetstreamEvent{ 144 Did: userDID, 145 Kind: "commit", 146 TimeUS: time.Now().UnixMicro(), 147 Commit: &jetstream.CommitEvent{ 148 Rev: "test-rev-3", 149 Operation: "create", 150 Collection: "social.coves.community.block", 151 RKey: rkey, 152 CID: "bafyblock789", 153 Record: map[string]interface{}{ 154 "$type": "social.coves.community.block", 155 "subject": community.DID, 156 "createdAt": time.Now().Format(time.RFC3339), 157 }, 158 }, 159 } 160 161 // Process event twice 162 err := consumer.HandleEvent(ctx, event) 163 if err != nil { 164 t.Fatalf("First block failed: %v", err) 165 } 166 167 err = consumer.HandleEvent(ctx, event) 168 if err != nil { 169 t.Fatalf("Second block (idempotent) failed: %v", err) 170 } 171 172 // Should still exist only once 173 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 174 if err != nil { 175 t.Fatalf("ListBlockedCommunities failed: %v", err) 176 } 177 if len(blocks) != 1 { 178 t.Errorf("Expected 1 block, got %d", len(blocks)) 179 } 180 }) 181 182 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) { 183 userDID := "did:plc:test-user-nonexistent" 184 rkey := "test-block-nonexistent" 185 186 // Simulate DELETE event for block that doesn't exist 187 event := &jetstream.JetstreamEvent{ 188 Did: userDID, 189 Kind: "commit", 190 TimeUS: time.Now().UnixMicro(), 191 Commit: &jetstream.CommitEvent{ 192 Rev: "test-rev-99", 193 Operation: "delete", 194 Collection: "social.coves.community.block", 195 RKey: rkey, 196 }, 197 } 198 199 // Should not error (idempotent) 200 err := consumer.HandleEvent(ctx, event) 201 if err != nil { 202 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err) 203 } 204 }) 205} 206 207// TestCommunityBlocking_ListBlocked tests listing blocked communities 208func TestCommunityBlocking_ListBlocked(t *testing.T) { 209 if testing.Short() { 210 t.Skip("Skipping integration test in short mode") 211 } 212 213 ctx := context.Background() 214 db := 
setupTestDB(t) 215 defer cleanupBlockingTestDB(t, db) 216 217 repo := createBlockingTestCommunityRepo(t, db) 218 userDID := "did:plc:test-user-list" 219 220 // Create and block 3 communities 221 testCommunities := make([]*communities.Community, 3) 222 for i := 0; i < 3; i++ { 223 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i) 224 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID) 225 226 block := &communities.CommunityBlock{ 227 UserDID: userDID, 228 CommunityDID: testCommunities[i].DID, 229 BlockedAt: time.Now(), 230 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i), 231 RecordCID: fmt.Sprintf("bafyblock%d", i), 232 } 233 _, err := repo.BlockCommunity(ctx, block) 234 if err != nil { 235 t.Fatalf("Failed to block community %d: %v", i, err) 236 } 237 } 238 239 t.Run("lists all blocked communities", func(t *testing.T) { 240 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0) 241 if err != nil { 242 t.Fatalf("ListBlockedCommunities failed: %v", err) 243 } 244 245 if len(blocks) != 3 { 246 t.Errorf("Expected 3 blocks, got %d", len(blocks)) 247 } 248 249 // Verify all blocks belong to correct user 250 for _, block := range blocks { 251 if block.UserDID != userDID { 252 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 253 } 254 } 255 }) 256 257 t.Run("pagination works correctly", func(t *testing.T) { 258 // Get first 2 259 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0) 260 if err != nil { 261 t.Fatalf("ListBlockedCommunities with limit failed: %v", err) 262 } 263 if len(blocks) != 2 { 264 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks)) 265 } 266 267 // Get next 2 (should only get 1) 268 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2) 269 if err != nil { 270 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err) 271 } 272 if len(blocksPage2) != 1 { 273 t.Errorf("Expected 1 block on page 2, 
got %d", len(blocksPage2)) 274 } 275 }) 276 277 t.Run("returns empty list for user with no blocks", func(t *testing.T) { 278 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0) 279 if err != nil { 280 t.Fatalf("ListBlockedCommunities failed: %v", err) 281 } 282 if len(blocks) != 0 { 283 t.Errorf("Expected 0 blocks, got %d", len(blocks)) 284 } 285 }) 286} 287 288// TestCommunityBlocking_IsBlocked tests the fast block check 289func TestCommunityBlocking_IsBlocked(t *testing.T) { 290 if testing.Short() { 291 t.Skip("Skipping integration test in short mode") 292 } 293 294 ctx := context.Background() 295 db := setupTestDB(t) 296 defer cleanupBlockingTestDB(t, db) 297 298 repo := createBlockingTestCommunityRepo(t, db) 299 300 userDID := "did:plc:test-user-isblocked" 301 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 302 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID) 303 304 t.Run("returns false when not blocked", func(t *testing.T) { 305 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 306 if err != nil { 307 t.Fatalf("IsBlocked failed: %v", err) 308 } 309 if isBlocked { 310 t.Error("Expected IsBlocked=false, got true") 311 } 312 }) 313 314 t.Run("returns true when blocked", func(t *testing.T) { 315 // Create block 316 block := &communities.CommunityBlock{ 317 UserDID: userDID, 318 CommunityDID: community.DID, 319 BlockedAt: time.Now(), 320 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID), 321 RecordCID: "bafyblocktest", 322 } 323 _, err := repo.BlockCommunity(ctx, block) 324 if err != nil { 325 t.Fatalf("Failed to create block: %v", err) 326 } 327 328 // Check IsBlocked 329 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 330 if err != nil { 331 t.Fatalf("IsBlocked failed: %v", err) 332 } 333 if !isBlocked { 334 t.Error("Expected IsBlocked=true, got false") 335 } 336 }) 337 338 t.Run("returns false after 
unblock", func(t *testing.T) { 339 // Unblock 340 err := repo.UnblockCommunity(ctx, userDID, community.DID) 341 if err != nil { 342 t.Fatalf("UnblockCommunity failed: %v", err) 343 } 344 345 // Check IsBlocked 346 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID) 347 if err != nil { 348 t.Fatalf("IsBlocked failed: %v", err) 349 } 350 if isBlocked { 351 t.Error("Expected IsBlocked=false after unblock, got true") 352 } 353 }) 354} 355 356// TestCommunityBlocking_GetBlock tests block retrieval 357func TestCommunityBlocking_GetBlock(t *testing.T) { 358 if testing.Short() { 359 t.Skip("Skipping integration test in short mode") 360 } 361 362 ctx := context.Background() 363 db := setupTestDB(t) 364 defer cleanupBlockingTestDB(t, db) 365 366 repo := createBlockingTestCommunityRepo(t, db) 367 368 userDID := "did:plc:test-user-getblock" 369 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 370 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID) 371 372 t.Run("returns error when block doesn't exist", func(t *testing.T) { 373 _, err := repo.GetBlock(ctx, userDID, community.DID) 374 if !communities.IsNotFound(err) { 375 t.Errorf("Expected ErrBlockNotFound, got: %v", err) 376 } 377 }) 378 379 t.Run("retrieves block by user and community DID", func(t *testing.T) { 380 // Create block 381 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 382 originalBlock := &communities.CommunityBlock{ 383 UserDID: userDID, 384 CommunityDID: community.DID, 385 BlockedAt: time.Now(), 386 RecordURI: recordURI, 387 RecordCID: "bafyblockgettest", 388 } 389 _, err := repo.BlockCommunity(ctx, originalBlock) 390 if err != nil { 391 t.Fatalf("Failed to create block: %v", err) 392 } 393 394 // Retrieve by user+community 395 block, err := repo.GetBlock(ctx, userDID, community.DID) 396 if err != nil { 397 t.Fatalf("GetBlock failed: %v", err) 398 } 399 400 if block.UserDID != userDID { 401 
t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID) 402 } 403 if block.CommunityDID != community.DID { 404 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 405 } 406 if block.RecordURI != recordURI { 407 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 408 } 409 }) 410 411 t.Run("retrieves block by URI", func(t *testing.T) { 412 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID) 413 414 // Retrieve by URI 415 block, err := repo.GetBlockByURI(ctx, recordURI) 416 if err != nil { 417 t.Fatalf("GetBlockByURI failed: %v", err) 418 } 419 420 if block.RecordURI != recordURI { 421 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI) 422 } 423 if block.CommunityDID != community.DID { 424 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID) 425 } 426 }) 427} 428 429// Helper functions for blocking tests 430 431func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository { 432 return postgresRepo.NewCommunityRepository(db) 433} 434 435func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 436 community := &communities.Community{ 437 DID: did, 438 Handle: fmt.Sprintf("!%s@coves.test", name), 439 Name: name, 440 DisplayName: fmt.Sprintf("Test Community %s", name), 441 Description: "Test community for blocking tests", 442 OwnerDID: did, 443 CreatedByDID: "did:plc:test-creator", 444 HostedByDID: "did:plc:test-instance", 445 Visibility: "public", 446 CreatedAt: time.Now(), 447 UpdatedAt: time.Now(), 448 } 449 450 created, err := repo.Create(context.Background(), community) 451 if err != nil { 452 t.Fatalf("Failed to create test community: %v", err) 453 } 454 455 return created 456} 457 458func cleanupBlockingTestDB(t *testing.T, db *sql.DB) { 459 // Clean up test data 460 _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 
'did:plc:test-%'") 461 if err != nil { 462 t.Logf("Warning: Failed to clean up blocks: %v", err) 463 } 464 465 _, err = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'") 466 if err != nil { 467 t.Logf("Warning: Failed to clean up communities: %v", err) 468 } 469 470 if closeErr := db.Close(); closeErr != nil { 471 t.Logf("Failed to close database: %v", closeErr) 472 } 473}