A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "testing" 8 "time" 9 10 "Coves/internal/atproto/identity" 11 "Coves/internal/atproto/jetstream" 12 "Coves/internal/core/communities" 13 "Coves/internal/db/postgres" 14) 15 16func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) { 17 db := setupTestDB(t) 18 defer func() { 19 if err := db.Close(); err != nil { 20 t.Logf("Failed to close database: %v", err) 21 } 22 }() 23 24 repo := postgres.NewCommunityRepository(db) 25 ctx := context.Background() 26 27 t.Run("creates community from firehose event", func(t *testing.T) { 28 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 29 communityDID := generateTestDID(uniqueSuffix) 30 communityName := fmt.Sprintf("test-community-%s", uniqueSuffix) 31 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName) 32 33 // Set up mock resolver for this test DID 34 mockResolver := newMockIdentityResolver() 35 mockResolver.resolutions[communityDID] = expectedHandle 36 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 37 38 // Simulate a Jetstream commit event 39 event := &jetstream.JetstreamEvent{ 40 Did: communityDID, 41 TimeUS: time.Now().UnixMicro(), 42 Kind: "commit", 43 Commit: &jetstream.CommitEvent{ 44 Rev: "rev123", 45 Operation: "create", 46 Collection: "social.coves.community.profile", 47 RKey: "self", 48 CID: "bafy123abc", 49 Record: map[string]interface{}{ 50 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record 51 // These are resolved/computed by AppView, not stored in immutable records 52 "name": communityName, 53 "displayName": "Test Community", 54 "description": "A test community", 55 "owner": "did:web:coves.local", 56 "createdBy": "did:plc:user123", 57 "hostedBy": "did:web:coves.local", 58 "visibility": "public", 59 "federation": map[string]interface{}{ 60 "allowExternalDiscovery": true, 61 }, 62 "createdAt": time.Now().Format(time.RFC3339), 63 }, 64 }, 65 } 66 67 // Handle the event 68 if err := consumer.HandleEvent(ctx, event); err != nil { 69 t.Fatalf("Failed to handle event: %v", err) 70 } 71 72 // Verify community was indexed 73 community, err := repo.GetByDID(ctx, communityDID) 74 if err != nil { 75 t.Fatalf("Failed to get indexed community: %v", err) 76 } 77 78 if community.DID != communityDID { 79 t.Errorf("Expected DID %s, got %s", communityDID, community.DID) 80 } 81 if community.DisplayName != "Test Community" { 82 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName) 83 } 84 if community.Visibility != "public" { 85 t.Errorf("Expected Visibility 'public', got %s", community.Visibility) 86 } 87 }) 88 89 t.Run("updates existing community", func(t *testing.T) { 90 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 91 communityDID := generateTestDID(uniqueSuffix) 92 communityName := fmt.Sprintf("update-test-%s", uniqueSuffix) 93 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName) 94 95 // Set up mock resolver for this test DID 96 mockResolver := newMockIdentityResolver() 97 mockResolver.resolutions[communityDID] = expectedHandle 98 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 99 100 // Create initial community 101 initialCommunity := &communities.Community{ 102 DID: communityDID, 103 Handle: expectedHandle, 104 Name: communityName, 105 DisplayName: "Original Name", 106 Description: "Original description", 107 OwnerDID: "did:web:coves.local", 108 CreatedByDID: "did:plc:user123", 109 HostedByDID: "did:web:coves.local", 110 Visibility: "public", 111 AllowExternalDiscovery: true, 112 CreatedAt: time.Now(), 113 UpdatedAt: time.Now(), 114 } 115 116 if _, err := repo.Create(ctx, initialCommunity); err != nil { 117 t.Fatalf("Failed to create initial community: %v", err) 118 } 119 120 // Simulate update event 121 updateEvent := &jetstream.JetstreamEvent{ 122 Did: communityDID, 123 TimeUS: time.Now().UnixMicro(), 124 Kind: "commit", 125 Commit: &jetstream.CommitEvent{ 126 Rev: "rev124", 127 Operation: "update", 128 Collection: "social.coves.community.profile", 129 RKey: "self", 130 CID: "bafy456def", 131 Record: map[string]interface{}{ 132 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record 133 // These are resolved/computed by AppView, not stored in immutable records 134 "name": "update-test", 135 "displayName": "Updated Name", 136 "description": "Updated description", 137 "owner": "did:web:coves.local", 138 "createdBy": "did:plc:user123", 139 "hostedBy": "did:web:coves.local", 140 "visibility": "unlisted", 141 "federation": map[string]interface{}{ 142 "allowExternalDiscovery": false, 143 }, 144 "createdAt": time.Now().Format(time.RFC3339), 145 }, 146 }, 147 } 148 149 // Handle the update 150 if err := consumer.HandleEvent(ctx, updateEvent); err != nil { 151 t.Fatalf("Failed to handle update event: %v", err) 152 } 153 154 // Verify community was updated 155 updated, err := repo.GetByDID(ctx, communityDID) 156 if err != nil { 157 t.Fatalf("Failed to get updated community: %v", err) 158 } 159 160 if updated.DisplayName != "Updated Name" { 161 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName) 162 } 163 if updated.Description != "Updated description" { 164 t.Errorf("Expected Description 'Updated description', got %s", updated.Description) 165 } 166 if updated.Visibility != "unlisted" { 167 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility) 168 } 169 if updated.AllowExternalDiscovery { 170 t.Error("Expected AllowExternalDiscovery to be false") 171 } 172 }) 173 174 t.Run("deletes community", func(t *testing.T) { 175 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 176 communityDID := generateTestDID(uniqueSuffix) 177 communityName := fmt.Sprintf("delete-test-%s", uniqueSuffix) 178 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName) 179 180 // Set up mock resolver for this test DID 181 mockResolver := newMockIdentityResolver() 182 mockResolver.resolutions[communityDID] = expectedHandle 183 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 184 185 // Create community to delete 186 community := &communities.Community{ 187 DID: communityDID, 188 Handle: expectedHandle, 189 Name: communityName, 190 OwnerDID: "did:web:coves.local", 191 CreatedByDID: "did:plc:user123", 192 HostedByDID: "did:web:coves.local", 193 Visibility: "public", 194 CreatedAt: time.Now(), 195 UpdatedAt: time.Now(), 196 } 197 198 if _, err := repo.Create(ctx, community); err != nil { 199 t.Fatalf("Failed to create community: %v", err) 200 } 201 202 // Simulate delete event 203 deleteEvent := &jetstream.JetstreamEvent{ 204 Did: communityDID, 205 TimeUS: time.Now().UnixMicro(), 206 Kind: "commit", 207 Commit: &jetstream.CommitEvent{ 208 Rev: "rev125", 209 Operation: "delete", 210 Collection: "social.coves.community.profile", 211 RKey: "self", 212 }, 213 } 214 215 // Handle the delete 216 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil { 217 t.Fatalf("Failed to handle delete event: %v", err) 218 } 219 220 // Verify community was deleted 221 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound { 222 t.Errorf("Expected ErrCommunityNotFound, got: %v", err) 223 } 224 }) 225} 226 227func TestCommunityConsumer_HandleSubscription(t *testing.T) { 228 db := setupTestDB(t) 229 defer func() { 230 if err := db.Close(); err != nil { 231 t.Logf("Failed to close database: %v", err) 232 } 233 }() 234 235 repo := postgres.NewCommunityRepository(db) 236 ctx := context.Background() 237 238 t.Run("creates subscription from event", func(t *testing.T) { 239 // Create a community first 240 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 241 communityDID := generateTestDID(uniqueSuffix) 242 communityName := fmt.Sprintf("sub-test-%s", uniqueSuffix) 243 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName) 244 245 // Set up mock resolver for this test DID 246 mockResolver := newMockIdentityResolver() 247 mockResolver.resolutions[communityDID] = expectedHandle 248 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 249 250 community := &communities.Community{ 251 DID: communityDID, 252 Handle: expectedHandle, 253 Name: communityName, 254 OwnerDID: "did:web:coves.local", 255 CreatedByDID: "did:plc:user123", 256 HostedByDID: "did:web:coves.local", 257 Visibility: "public", 258 CreatedAt: time.Now(), 259 UpdatedAt: time.Now(), 260 } 261 262 if _, err := repo.Create(ctx, community); err != nil { 263 t.Fatalf("Failed to create community: %v", err) 264 } 265 266 // Simulate subscription event 267 // IMPORTANT: Use correct collection name (record type, not XRPC procedure) 268 userDID := "did:plc:subscriber123" 269 subEvent := &jetstream.JetstreamEvent{ 270 Did: userDID, 271 TimeUS: time.Now().UnixMicro(), 272 Kind: "commit", 273 Commit: &jetstream.CommitEvent{ 274 Rev: "rev200", 275 Operation: "create", 276 Collection: "social.coves.community.subscription", // Updated to communities namespace 277 RKey: "sub123", 278 CID: "bafy789ghi", 279 Record: map[string]interface{}{ 280 "subject": communityDID, // Using 'subject' per atProto conventions 281 "contentVisibility": 3, 282 "createdAt": time.Now().Format(time.RFC3339), 283 }, 284 }, 285 } 286 287 // Handle the subscription 288 if err := consumer.HandleEvent(ctx, subEvent); err != nil { 289 t.Fatalf("Failed to handle subscription event: %v", err) 290 } 291 292 // Verify subscription was created 293 subscription, err := repo.GetSubscription(ctx, userDID, communityDID) 294 if err != nil { 295 t.Fatalf("Failed to get subscription: %v", err) 296 } 297 298 if subscription.UserDID != userDID { 299 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID) 300 } 301 if subscription.CommunityDID != communityDID { 302 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID) 303 } 304 305 // Verify subscriber count was incremented 306 updated, err := repo.GetByDID(ctx, communityDID) 307 if err != nil { 308 t.Fatalf("Failed to get community: %v", err) 309 } 310 311 if updated.SubscriberCount != 1 { 312 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount) 313 } 314 }) 315} 316 317func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) { 318 db := setupTestDB(t) 319 defer func() { 320 if err := db.Close(); err != nil { 321 t.Logf("Failed to close database: %v", err) 322 } 323 }() 324 325 repo := postgres.NewCommunityRepository(db) 326 // Use mock resolver (though these tests don't create communities, so it won't be called) 327 mockResolver := newMockIdentityResolver() 328 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 329 ctx := context.Background() 330 331 t.Run("ignores identity events", func(t *testing.T) { 332 event := &jetstream.JetstreamEvent{ 333 Did: "did:plc:user123", 334 TimeUS: time.Now().UnixMicro(), 335 Kind: "identity", 336 Identity: &jetstream.IdentityEvent{ 337 Did: "did:plc:user123", 338 Handle: "alice.bsky.social", 339 }, 340 } 341 342 err := consumer.HandleEvent(ctx, event) 343 if err != nil { 344 t.Errorf("Expected no error for identity event, got: %v", err) 345 } 346 }) 347 348 t.Run("ignores non-community collections", func(t *testing.T) { 349 event := &jetstream.JetstreamEvent{ 350 Did: "did:plc:user123", 351 TimeUS: time.Now().UnixMicro(), 352 Kind: "commit", 353 Commit: &jetstream.CommitEvent{ 354 Rev: "rev300", 355 Operation: "create", 356 Collection: "app.bsky.communityFeed.post", 357 RKey: "post123", 358 Record: map[string]interface{}{ 359 "text": "Hello world", 360 }, 361 }, 362 } 363 364 err := consumer.HandleEvent(ctx, event) 365 if err != nil { 366 t.Errorf("Expected no error for non-community event, got: %v", err) 367 } 368 }) 369} 370 371// mockIdentityResolver is a test double for identity resolution 372type mockIdentityResolver struct { 373 resolutions map[string]string 374 lastDID string 375 callCount int 376 shouldFail bool 377} 378 379func newMockIdentityResolver() *mockIdentityResolver { 380 return &mockIdentityResolver{ 381 resolutions: make(map[string]string), 382 } 383} 384 385func (m *mockIdentityResolver) Resolve(ctx context.Context, did string) (*identity.Identity, error) { 386 m.callCount++ 387 m.lastDID = did 388 389 if m.shouldFail { 390 return nil, errors.New("mock PLC resolution failure") 391 } 392 393 handle, ok := m.resolutions[did] 394 if !ok { 395 return nil, fmt.Errorf("no resolution configured for DID: %s", did) 396 } 397 398 return &identity.Identity{ 399 DID: did, 400 Handle: handle, 401 PDSURL: "https://pds.example.com", 402 ResolvedAt: time.Now(), 403 Method: identity.MethodHTTPS, 404 }, nil 405} 406 407func TestCommunityConsumer_PLCHandleResolution(t *testing.T) { 408 db := setupTestDB(t) 409 defer func() { 410 if err := db.Close(); err != nil { 411 t.Logf("Failed to close database: %v", err) 412 } 413 }() 414 415 repo := postgres.NewCommunityRepository(db) 416 ctx := context.Background() 417 418 t.Run("resolves handle from PLC successfully", func(t *testing.T) { 419 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 420 communityDID := generateTestDID(uniqueSuffix) 421 communityName := fmt.Sprintf("test-plc-%s", uniqueSuffix) 422 expectedHandle := fmt.Sprintf("%s.community.coves.social", communityName) 423 424 // Create mock resolver 425 mockResolver := newMockIdentityResolver() 426 mockResolver.resolutions[communityDID] = expectedHandle 427 428 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 429 430 // Simulate Jetstream event without handle in record 431 event := &jetstream.JetstreamEvent{ 432 Did: communityDID, 433 TimeUS: time.Now().UnixMicro(), 434 Kind: "commit", 435 Commit: &jetstream.CommitEvent{ 436 Rev: "rev123", 437 Operation: "create", 438 Collection: "social.coves.community.profile", 439 RKey: "self", 440 CID: "bafy123abc", 441 Record: map[string]interface{}{ 442 // No handle field - should trigger PLC resolution 443 "name": communityName, 444 "displayName": "Test PLC Community", 445 "description": "Testing PLC resolution", 446 "owner": "did:web:coves.local", 447 "createdBy": "did:plc:user123", 448 "hostedBy": "did:web:coves.local", 449 "visibility": "public", 450 "federation": map[string]interface{}{ 451 "allowExternalDiscovery": true, 452 }, 453 "createdAt": time.Now().Format(time.RFC3339), 454 }, 455 }, 456 } 457 458 // Handle the event 459 if err := consumer.HandleEvent(ctx, event); err != nil { 460 t.Fatalf("Failed to handle event: %v", err) 461 } 462 463 // Verify mock was called 464 if mockResolver.callCount != 1 { 465 t.Errorf("Expected 1 PLC resolution call, got %d", mockResolver.callCount) 466 } 467 if mockResolver.lastDID != communityDID { 468 t.Errorf("Expected PLC resolution for DID %s, got %s", communityDID, mockResolver.lastDID) 469 } 470 471 // Verify community was indexed with PLC-resolved handle 472 community, err := repo.GetByDID(ctx, communityDID) 473 if err != nil { 474 t.Fatalf("Failed to get indexed community: %v", err) 475 } 476 477 if community.Handle != expectedHandle { 478 t.Errorf("Expected handle %s from PLC, got %s", expectedHandle, community.Handle) 479 } 480 }) 481 482 t.Run("fails when PLC resolution fails (no fallback)", func(t *testing.T) { 483 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 484 communityDID := generateTestDID(uniqueSuffix) 485 communityName := fmt.Sprintf("test-plc-fail-%s", uniqueSuffix) 486 487 // Create mock resolver that fails 488 mockResolver := newMockIdentityResolver() 489 mockResolver.shouldFail = true 490 491 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 492 493 // Simulate Jetstream event without handle in record 494 event := &jetstream.JetstreamEvent{ 495 Did: communityDID, 496 TimeUS: time.Now().UnixMicro(), 497 Kind: "commit", 498 Commit: &jetstream.CommitEvent{ 499 Rev: "rev456", 500 Operation: "create", 501 Collection: "social.coves.community.profile", 502 RKey: "self", 503 CID: "bafy456def", 504 Record: map[string]interface{}{ 505 "name": communityName, 506 "displayName": "Test PLC Failure", 507 "description": "Testing PLC failure", 508 "owner": "did:web:coves.local", 509 "createdBy": "did:plc:user123", 510 "hostedBy": "did:web:coves.local", 511 "visibility": "public", 512 "federation": map[string]interface{}{ 513 "allowExternalDiscovery": true, 514 }, 515 "createdAt": time.Now().Format(time.RFC3339), 516 }, 517 }, 518 } 519 520 // Handle the event - should fail 521 err := consumer.HandleEvent(ctx, event) 522 if err == nil { 523 t.Fatal("Expected error when PLC resolution fails, got nil") 524 } 525 526 // Verify error message indicates PLC failure 527 expectedErrSubstring := "failed to resolve handle from PLC" 528 if !contains(err.Error(), expectedErrSubstring) { 529 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err) 530 } 531 532 // Verify community was NOT indexed 533 _, err = repo.GetByDID(ctx, communityDID) 534 if !communities.IsNotFound(err) { 535 t.Errorf("Expected community NOT to be indexed when PLC fails, but got: %v", err) 536 } 537 538 // Verify mock was called (failure happened during resolution, not before) 539 if mockResolver.callCount != 1 { 540 t.Errorf("Expected 1 PLC resolution attempt, got %d", mockResolver.callCount) 541 } 542 }) 543 544 t.Run("test mode rejects invalid hostedBy format", func(t *testing.T) { 545 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 546 communityDID := generateTestDID(uniqueSuffix) 547 communityName := fmt.Sprintf("test-invalid-hosted-%s", uniqueSuffix) 548 549 // No identity resolver (test mode) 550 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 551 552 // Event with invalid hostedBy format (not did:web) 553 event := &jetstream.JetstreamEvent{ 554 Did: communityDID, 555 TimeUS: time.Now().UnixMicro(), 556 Kind: "commit", 557 Commit: &jetstream.CommitEvent{ 558 Rev: "rev789", 559 Operation: "create", 560 Collection: "social.coves.community.profile", 561 RKey: "self", 562 CID: "bafy789ghi", 563 Record: map[string]interface{}{ 564 "name": communityName, 565 "displayName": "Test Invalid HostedBy", 566 "description": "Testing validation", 567 "owner": "did:web:coves.local", 568 "createdBy": "did:plc:user123", 569 "hostedBy": "did:plc:invalid", // Invalid format - not did:web 570 "visibility": "public", 571 "federation": map[string]interface{}{ 572 "allowExternalDiscovery": true, 573 }, 574 "createdAt": time.Now().Format(time.RFC3339), 575 }, 576 }, 577 } 578 579 // Handle the event - should fail due to empty handle 580 err := consumer.HandleEvent(ctx, event) 581 if err == nil { 582 t.Fatal("Expected error for invalid hostedBy format in test mode, got nil") 583 } 584 585 // Verify error is about handle being required 586 expectedErrSubstring := "handle is required" 587 if !contains(err.Error(), expectedErrSubstring) { 588 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err) 589 } 590 }) 591}