A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/core/communities" 7 "Coves/internal/db/postgres" 8 "context" 9 "errors" 10 "fmt" 11 "testing" 12 "time" 13) 14 15func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) { 16 db := setupTestDB(t) 17 defer func() { 18 if err := db.Close(); err != nil { 19 t.Logf("Failed to close database: %v", err) 20 } 21 }() 22 23 repo := postgres.NewCommunityRepository(db) 24 ctx := context.Background() 25 26 t.Run("creates community from firehose event", func(t *testing.T) { 27 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 28 communityDID := generateTestDID(uniqueSuffix) 29 communityName := fmt.Sprintf("test-community-%s", uniqueSuffix) 30 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName) 31 32 // Set up mock resolver for this test DID 33 mockResolver := newMockIdentityResolver() 34 mockResolver.resolutions[communityDID] = expectedHandle 35 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 36 37 // Simulate a Jetstream commit event 38 event := &jetstream.JetstreamEvent{ 39 Did: communityDID, 40 TimeUS: time.Now().UnixMicro(), 41 Kind: "commit", 42 Commit: &jetstream.CommitEvent{ 43 Rev: "rev123", 44 Operation: "create", 45 Collection: "social.coves.community.profile", 46 RKey: "self", 47 CID: "bafy123abc", 48 Record: map[string]interface{}{ 49 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record 50 // These are resolved/computed by AppView, not stored in immutable records 51 "name": communityName, 52 "displayName": "Test Community", 53 "description": "A test community", 54 "owner": "did:web:coves.local", 55 "createdBy": "did:plc:user123", 56 "hostedBy": "did:web:coves.local", 57 "visibility": "public", 58 "federation": map[string]interface{}{ 59 "allowExternalDiscovery": true, 60 }, 61 "createdAt": time.Now().Format(time.RFC3339), 62 }, 63 }, 64 } 
65 66 // Handle the event 67 if err := consumer.HandleEvent(ctx, event); err != nil { 68 t.Fatalf("Failed to handle event: %v", err) 69 } 70 71 // Verify community was indexed 72 community, err := repo.GetByDID(ctx, communityDID) 73 if err != nil { 74 t.Fatalf("Failed to get indexed community: %v", err) 75 } 76 77 if community.DID != communityDID { 78 t.Errorf("Expected DID %s, got %s", communityDID, community.DID) 79 } 80 if community.DisplayName != "Test Community" { 81 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName) 82 } 83 if community.Visibility != "public" { 84 t.Errorf("Expected Visibility 'public', got %s", community.Visibility) 85 } 86 }) 87 88 t.Run("updates existing community", func(t *testing.T) { 89 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 90 communityDID := generateTestDID(uniqueSuffix) 91 communityName := fmt.Sprintf("update-test-%s", uniqueSuffix) 92 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName) 93 94 // Set up mock resolver for this test DID 95 mockResolver := newMockIdentityResolver() 96 mockResolver.resolutions[communityDID] = expectedHandle 97 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 98 99 // Create initial community 100 initialCommunity := &communities.Community{ 101 DID: communityDID, 102 Handle: expectedHandle, 103 Name: communityName, 104 DisplayName: "Original Name", 105 Description: "Original description", 106 OwnerDID: "did:web:coves.local", 107 CreatedByDID: "did:plc:user123", 108 HostedByDID: "did:web:coves.local", 109 Visibility: "public", 110 AllowExternalDiscovery: true, 111 CreatedAt: time.Now(), 112 UpdatedAt: time.Now(), 113 } 114 115 if _, err := repo.Create(ctx, initialCommunity); err != nil { 116 t.Fatalf("Failed to create initial community: %v", err) 117 } 118 119 // Simulate update event 120 updateEvent := &jetstream.JetstreamEvent{ 121 Did: communityDID, 122 TimeUS: time.Now().UnixMicro(), 123 
Kind: "commit", 124 Commit: &jetstream.CommitEvent{ 125 Rev: "rev124", 126 Operation: "update", 127 Collection: "social.coves.community.profile", 128 RKey: "self", 129 CID: "bafy456def", 130 Record: map[string]interface{}{ 131 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record 132 // These are resolved/computed by AppView, not stored in immutable records 133 "name": "update-test", 134 "displayName": "Updated Name", 135 "description": "Updated description", 136 "owner": "did:web:coves.local", 137 "createdBy": "did:plc:user123", 138 "hostedBy": "did:web:coves.local", 139 "visibility": "unlisted", 140 "federation": map[string]interface{}{ 141 "allowExternalDiscovery": false, 142 }, 143 "createdAt": time.Now().Format(time.RFC3339), 144 }, 145 }, 146 } 147 148 // Handle the update 149 if err := consumer.HandleEvent(ctx, updateEvent); err != nil { 150 t.Fatalf("Failed to handle update event: %v", err) 151 } 152 153 // Verify community was updated 154 updated, err := repo.GetByDID(ctx, communityDID) 155 if err != nil { 156 t.Fatalf("Failed to get updated community: %v", err) 157 } 158 159 if updated.DisplayName != "Updated Name" { 160 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName) 161 } 162 if updated.Description != "Updated description" { 163 t.Errorf("Expected Description 'Updated description', got %s", updated.Description) 164 } 165 if updated.Visibility != "unlisted" { 166 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility) 167 } 168 if updated.AllowExternalDiscovery { 169 t.Error("Expected AllowExternalDiscovery to be false") 170 } 171 }) 172 173 t.Run("deletes community", func(t *testing.T) { 174 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 175 communityDID := generateTestDID(uniqueSuffix) 176 communityName := fmt.Sprintf("delete-test-%s", uniqueSuffix) 177 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName) 178 179 // Set up mock resolver for this test DID 180 
mockResolver := newMockIdentityResolver() 181 mockResolver.resolutions[communityDID] = expectedHandle 182 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 183 184 // Create community to delete 185 community := &communities.Community{ 186 DID: communityDID, 187 Handle: expectedHandle, 188 Name: communityName, 189 OwnerDID: "did:web:coves.local", 190 CreatedByDID: "did:plc:user123", 191 HostedByDID: "did:web:coves.local", 192 Visibility: "public", 193 CreatedAt: time.Now(), 194 UpdatedAt: time.Now(), 195 } 196 197 if _, err := repo.Create(ctx, community); err != nil { 198 t.Fatalf("Failed to create community: %v", err) 199 } 200 201 // Simulate delete event 202 deleteEvent := &jetstream.JetstreamEvent{ 203 Did: communityDID, 204 TimeUS: time.Now().UnixMicro(), 205 Kind: "commit", 206 Commit: &jetstream.CommitEvent{ 207 Rev: "rev125", 208 Operation: "delete", 209 Collection: "social.coves.community.profile", 210 RKey: "self", 211 }, 212 } 213 214 // Handle the delete 215 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil { 216 t.Fatalf("Failed to handle delete event: %v", err) 217 } 218 219 // Verify community was deleted 220 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound { 221 t.Errorf("Expected ErrCommunityNotFound, got: %v", err) 222 } 223 }) 224} 225 226func TestCommunityConsumer_HandleSubscription(t *testing.T) { 227 db := setupTestDB(t) 228 defer func() { 229 if err := db.Close(); err != nil { 230 t.Logf("Failed to close database: %v", err) 231 } 232 }() 233 234 repo := postgres.NewCommunityRepository(db) 235 ctx := context.Background() 236 237 t.Run("creates subscription from event", func(t *testing.T) { 238 // Create a community first 239 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 240 communityDID := generateTestDID(uniqueSuffix) 241 communityName := fmt.Sprintf("sub-test-%s", uniqueSuffix) 242 expectedHandle := 
fmt.Sprintf("%s.community.coves.local", communityName) 243 244 // Set up mock resolver for this test DID 245 mockResolver := newMockIdentityResolver() 246 mockResolver.resolutions[communityDID] = expectedHandle 247 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 248 249 community := &communities.Community{ 250 DID: communityDID, 251 Handle: expectedHandle, 252 Name: communityName, 253 OwnerDID: "did:web:coves.local", 254 CreatedByDID: "did:plc:user123", 255 HostedByDID: "did:web:coves.local", 256 Visibility: "public", 257 CreatedAt: time.Now(), 258 UpdatedAt: time.Now(), 259 } 260 261 if _, err := repo.Create(ctx, community); err != nil { 262 t.Fatalf("Failed to create community: %v", err) 263 } 264 265 // Simulate subscription event 266 // IMPORTANT: Use correct collection name (record type, not XRPC procedure) 267 userDID := "did:plc:subscriber123" 268 subEvent := &jetstream.JetstreamEvent{ 269 Did: userDID, 270 TimeUS: time.Now().UnixMicro(), 271 Kind: "commit", 272 Commit: &jetstream.CommitEvent{ 273 Rev: "rev200", 274 Operation: "create", 275 Collection: "social.coves.community.subscription", // Updated to communities namespace 276 RKey: "sub123", 277 CID: "bafy789ghi", 278 Record: map[string]interface{}{ 279 "subject": communityDID, // Using 'subject' per atProto conventions 280 "contentVisibility": 3, 281 "createdAt": time.Now().Format(time.RFC3339), 282 }, 283 }, 284 } 285 286 // Handle the subscription 287 if err := consumer.HandleEvent(ctx, subEvent); err != nil { 288 t.Fatalf("Failed to handle subscription event: %v", err) 289 } 290 291 // Verify subscription was created 292 subscription, err := repo.GetSubscription(ctx, userDID, communityDID) 293 if err != nil { 294 t.Fatalf("Failed to get subscription: %v", err) 295 } 296 297 if subscription.UserDID != userDID { 298 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID) 299 } 300 if subscription.CommunityDID != communityDID { 301 
t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID) 302 } 303 304 // Verify subscriber count was incremented 305 updated, err := repo.GetByDID(ctx, communityDID) 306 if err != nil { 307 t.Fatalf("Failed to get community: %v", err) 308 } 309 310 if updated.SubscriberCount != 1 { 311 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount) 312 } 313 }) 314} 315 316func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) { 317 db := setupTestDB(t) 318 defer func() { 319 if err := db.Close(); err != nil { 320 t.Logf("Failed to close database: %v", err) 321 } 322 }() 323 324 repo := postgres.NewCommunityRepository(db) 325 // Use mock resolver (though these tests don't create communities, so it won't be called) 326 mockResolver := newMockIdentityResolver() 327 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 328 ctx := context.Background() 329 330 t.Run("ignores identity events", func(t *testing.T) { 331 event := &jetstream.JetstreamEvent{ 332 Did: "did:plc:user123", 333 TimeUS: time.Now().UnixMicro(), 334 Kind: "identity", 335 Identity: &jetstream.IdentityEvent{ 336 Did: "did:plc:user123", 337 Handle: "alice.bsky.social", 338 }, 339 } 340 341 err := consumer.HandleEvent(ctx, event) 342 if err != nil { 343 t.Errorf("Expected no error for identity event, got: %v", err) 344 } 345 }) 346 347 t.Run("ignores non-community collections", func(t *testing.T) { 348 event := &jetstream.JetstreamEvent{ 349 Did: "did:plc:user123", 350 TimeUS: time.Now().UnixMicro(), 351 Kind: "commit", 352 Commit: &jetstream.CommitEvent{ 353 Rev: "rev300", 354 Operation: "create", 355 Collection: "app.bsky.communityFeed.post", 356 RKey: "post123", 357 Record: map[string]interface{}{ 358 "text": "Hello world", 359 }, 360 }, 361 } 362 363 err := consumer.HandleEvent(ctx, event) 364 if err != nil { 365 t.Errorf("Expected no error for non-community event, got: %v", err) 366 } 367 }) 368} 369 370// 
mockIdentityResolver is a test double for identity resolution 371type mockIdentityResolver struct { 372 resolutions map[string]string 373 lastDID string 374 callCount int 375 shouldFail bool 376} 377 378func newMockIdentityResolver() *mockIdentityResolver { 379 return &mockIdentityResolver{ 380 resolutions: make(map[string]string), 381 } 382} 383 384func (m *mockIdentityResolver) Resolve(ctx context.Context, did string) (*identity.Identity, error) { 385 m.callCount++ 386 m.lastDID = did 387 388 if m.shouldFail { 389 return nil, errors.New("mock PLC resolution failure") 390 } 391 392 handle, ok := m.resolutions[did] 393 if !ok { 394 return nil, fmt.Errorf("no resolution configured for DID: %s", did) 395 } 396 397 return &identity.Identity{ 398 DID: did, 399 Handle: handle, 400 PDSURL: "https://pds.example.com", 401 ResolvedAt: time.Now(), 402 Method: identity.MethodHTTPS, 403 }, nil 404} 405 406func TestCommunityConsumer_PLCHandleResolution(t *testing.T) { 407 db := setupTestDB(t) 408 defer func() { 409 if err := db.Close(); err != nil { 410 t.Logf("Failed to close database: %v", err) 411 } 412 }() 413 414 repo := postgres.NewCommunityRepository(db) 415 ctx := context.Background() 416 417 t.Run("resolves handle from PLC successfully", func(t *testing.T) { 418 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 419 communityDID := generateTestDID(uniqueSuffix) 420 communityName := fmt.Sprintf("test-plc-%s", uniqueSuffix) 421 expectedHandle := fmt.Sprintf("%s.community.coves.social", communityName) 422 423 // Create mock resolver 424 mockResolver := newMockIdentityResolver() 425 mockResolver.resolutions[communityDID] = expectedHandle 426 427 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 428 429 // Simulate Jetstream event without handle in record 430 event := &jetstream.JetstreamEvent{ 431 Did: communityDID, 432 TimeUS: time.Now().UnixMicro(), 433 Kind: "commit", 434 Commit: &jetstream.CommitEvent{ 435 Rev: 
"rev123", 436 Operation: "create", 437 Collection: "social.coves.community.profile", 438 RKey: "self", 439 CID: "bafy123abc", 440 Record: map[string]interface{}{ 441 // No handle field - should trigger PLC resolution 442 "name": communityName, 443 "displayName": "Test PLC Community", 444 "description": "Testing PLC resolution", 445 "owner": "did:web:coves.local", 446 "createdBy": "did:plc:user123", 447 "hostedBy": "did:web:coves.local", 448 "visibility": "public", 449 "federation": map[string]interface{}{ 450 "allowExternalDiscovery": true, 451 }, 452 "createdAt": time.Now().Format(time.RFC3339), 453 }, 454 }, 455 } 456 457 // Handle the event 458 if err := consumer.HandleEvent(ctx, event); err != nil { 459 t.Fatalf("Failed to handle event: %v", err) 460 } 461 462 // Verify mock was called 463 if mockResolver.callCount != 1 { 464 t.Errorf("Expected 1 PLC resolution call, got %d", mockResolver.callCount) 465 } 466 if mockResolver.lastDID != communityDID { 467 t.Errorf("Expected PLC resolution for DID %s, got %s", communityDID, mockResolver.lastDID) 468 } 469 470 // Verify community was indexed with PLC-resolved handle 471 community, err := repo.GetByDID(ctx, communityDID) 472 if err != nil { 473 t.Fatalf("Failed to get indexed community: %v", err) 474 } 475 476 if community.Handle != expectedHandle { 477 t.Errorf("Expected handle %s from PLC, got %s", expectedHandle, community.Handle) 478 } 479 }) 480 481 t.Run("fails when PLC resolution fails (no fallback)", func(t *testing.T) { 482 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 483 communityDID := generateTestDID(uniqueSuffix) 484 communityName := fmt.Sprintf("test-plc-fail-%s", uniqueSuffix) 485 486 // Create mock resolver that fails 487 mockResolver := newMockIdentityResolver() 488 mockResolver.shouldFail = true 489 490 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver) 491 492 // Simulate Jetstream event without handle in record 493 event := 
&jetstream.JetstreamEvent{ 494 Did: communityDID, 495 TimeUS: time.Now().UnixMicro(), 496 Kind: "commit", 497 Commit: &jetstream.CommitEvent{ 498 Rev: "rev456", 499 Operation: "create", 500 Collection: "social.coves.community.profile", 501 RKey: "self", 502 CID: "bafy456def", 503 Record: map[string]interface{}{ 504 "name": communityName, 505 "displayName": "Test PLC Failure", 506 "description": "Testing PLC failure", 507 "owner": "did:web:coves.local", 508 "createdBy": "did:plc:user123", 509 "hostedBy": "did:web:coves.local", 510 "visibility": "public", 511 "federation": map[string]interface{}{ 512 "allowExternalDiscovery": true, 513 }, 514 "createdAt": time.Now().Format(time.RFC3339), 515 }, 516 }, 517 } 518 519 // Handle the event - should fail 520 err := consumer.HandleEvent(ctx, event) 521 if err == nil { 522 t.Fatal("Expected error when PLC resolution fails, got nil") 523 } 524 525 // Verify error message indicates PLC failure 526 expectedErrSubstring := "failed to resolve handle from PLC" 527 if !contains(err.Error(), expectedErrSubstring) { 528 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err) 529 } 530 531 // Verify community was NOT indexed 532 _, err = repo.GetByDID(ctx, communityDID) 533 if !communities.IsNotFound(err) { 534 t.Errorf("Expected community NOT to be indexed when PLC fails, but got: %v", err) 535 } 536 537 // Verify mock was called (failure happened during resolution, not before) 538 if mockResolver.callCount != 1 { 539 t.Errorf("Expected 1 PLC resolution attempt, got %d", mockResolver.callCount) 540 } 541 }) 542 543 t.Run("test mode rejects invalid hostedBy format", func(t *testing.T) { 544 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 545 communityDID := generateTestDID(uniqueSuffix) 546 communityName := fmt.Sprintf("test-invalid-hosted-%s", uniqueSuffix) 547 548 // No identity resolver (test mode) 549 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 550 551 // 
Event with invalid hostedBy format (not did:web) 552 event := &jetstream.JetstreamEvent{ 553 Did: communityDID, 554 TimeUS: time.Now().UnixMicro(), 555 Kind: "commit", 556 Commit: &jetstream.CommitEvent{ 557 Rev: "rev789", 558 Operation: "create", 559 Collection: "social.coves.community.profile", 560 RKey: "self", 561 CID: "bafy789ghi", 562 Record: map[string]interface{}{ 563 "name": communityName, 564 "displayName": "Test Invalid HostedBy", 565 "description": "Testing validation", 566 "owner": "did:web:coves.local", 567 "createdBy": "did:plc:user123", 568 "hostedBy": "did:plc:invalid", // Invalid format - not did:web 569 "visibility": "public", 570 "federation": map[string]interface{}{ 571 "allowExternalDiscovery": true, 572 }, 573 "createdAt": time.Now().Format(time.RFC3339), 574 }, 575 }, 576 } 577 578 // Handle the event - should fail due to empty handle 579 err := consumer.HandleEvent(ctx, event) 580 if err == nil { 581 t.Fatal("Expected error for invalid hostedBy format in test mode, got nil") 582 } 583 584 // Verify error is about handle being required 585 expectedErrSubstring := "handle is required" 586 if !contains(err.Error(), expectedErrSubstring) { 587 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err) 588 } 589 }) 590}