A community-based topic aggregation platform built on atproto
at main 16 kB view raw
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 "context" 7 "database/sql" 8 "fmt" 9 "testing" 10 "time" 11 12 postgresRepo "Coves/internal/db/postgres" 13) 14 15// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed 16// from Jetstream events and stored in the AppView database 17func TestSubscriptionIndexing_ContentVisibility(t *testing.T) { 18 if testing.Short() { 19 t.Skip("Skipping integration test in short mode") 20 } 21 22 ctx := context.Background() 23 db := setupTestDB(t) 24 defer cleanupTestDB(t, db) 25 26 repo := createTestCommunityRepo(t, db) 27 // Skip verification in tests 28 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 29 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 30 31 // Create a test community first (with unique DID) 32 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 33 community := createTestCommunity(t, repo, "test-community-visibility", testDID) 34 35 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) { 36 userDID := "did:plc:test-user-123" 37 rkey := "test-sub-1" 38 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey 39 40 // Simulate Jetstream CREATE event for subscription 41 event := &jetstream.JetstreamEvent{ 42 Did: userDID, 43 Kind: "commit", 44 TimeUS: time.Now().UnixMicro(), 45 Commit: &jetstream.CommitEvent{ 46 Rev: "test-rev-1", 47 Operation: "create", 48 Collection: "social.coves.community.subscription", // CORRECT collection name 49 RKey: rkey, 50 CID: "bafytest123", 51 Record: map[string]interface{}{ 52 "$type": "social.coves.community.subscription", 53 "subject": community.DID, 54 "createdAt": time.Now().Format(time.RFC3339), 55 "contentVisibility": float64(5), // JSON numbers decode as float64 56 }, 57 }, 58 } 59 60 // Process event through consumer 61 err := 
consumer.HandleEvent(ctx, event) 62 if err != nil { 63 t.Fatalf("Failed to handle subscription event: %v", err) 64 } 65 66 // Verify subscription was indexed with correct contentVisibility 67 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 68 if err != nil { 69 t.Fatalf("Failed to get subscription: %v", err) 70 } 71 72 if subscription.ContentVisibility != 5 { 73 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility) 74 } 75 76 if subscription.UserDID != userDID { 77 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID) 78 } 79 80 if subscription.CommunityDID != community.DID { 81 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID) 82 } 83 84 if subscription.RecordURI != uri { 85 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI) 86 } 87 88 t.Logf("✓ Subscription indexed with contentVisibility=5") 89 }) 90 91 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) { 92 userDID := "did:plc:test-user-default" 93 rkey := "test-sub-default" 94 95 // Simulate Jetstream CREATE event WITHOUT contentVisibility field 96 event := &jetstream.JetstreamEvent{ 97 Did: userDID, 98 Kind: "commit", 99 TimeUS: time.Now().UnixMicro(), 100 Commit: &jetstream.CommitEvent{ 101 Rev: "test-rev-default", 102 Operation: "create", 103 Collection: "social.coves.community.subscription", 104 RKey: rkey, 105 CID: "bafydefault", 106 Record: map[string]interface{}{ 107 "$type": "social.coves.community.subscription", 108 "subject": community.DID, 109 "createdAt": time.Now().Format(time.RFC3339), 110 // contentVisibility NOT provided 111 }, 112 }, 113 } 114 115 // Process event 116 err := consumer.HandleEvent(ctx, event) 117 if err != nil { 118 t.Fatalf("Failed to handle subscription event: %v", err) 119 } 120 121 // Verify defaults to 3 122 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 123 if err != nil { 124 t.Fatalf("Failed to get 
subscription: %v", err) 125 } 126 127 if subscription.ContentVisibility != 3 { 128 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility) 129 } 130 131 t.Logf("✓ Subscription defaulted to contentVisibility=3") 132 }) 133 134 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) { 135 testCases := []struct { 136 name string 137 input float64 138 expected int 139 }{ 140 {input: 0, expected: 1, name: "zero clamped to 1"}, 141 {input: -5, expected: 1, name: "negative clamped to 1"}, 142 {input: 10, expected: 5, name: "10 clamped to 5"}, 143 {input: 100, expected: 5, name: "100 clamped to 5"}, 144 {input: 1, expected: 1, name: "1 stays 1"}, 145 {input: 3, expected: 3, name: "3 stays 3"}, 146 {input: 5, expected: 5, name: "5 stays 5"}, 147 } 148 149 for i, tc := range testCases { 150 t.Run(tc.name, func(t *testing.T) { 151 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i) 152 rkey := fmt.Sprintf("test-sub-clamp-%d", i) 153 154 event := &jetstream.JetstreamEvent{ 155 Did: userDID, 156 Kind: "commit", 157 TimeUS: time.Now().UnixMicro(), 158 Commit: &jetstream.CommitEvent{ 159 Rev: "test-rev-clamp", 160 Operation: "create", 161 Collection: "social.coves.community.subscription", 162 RKey: rkey, 163 CID: "bafyclamp", 164 Record: map[string]interface{}{ 165 "$type": "social.coves.community.subscription", 166 "subject": community.DID, 167 "createdAt": time.Now().Format(time.RFC3339), 168 "contentVisibility": tc.input, 169 }, 170 }, 171 } 172 173 err := consumer.HandleEvent(ctx, event) 174 if err != nil { 175 t.Fatalf("Failed to handle subscription event: %v", err) 176 } 177 178 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 179 if err != nil { 180 t.Fatalf("Failed to get subscription: %v", err) 181 } 182 183 if subscription.ContentVisibility != tc.expected { 184 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility) 185 } 186 187 t.Logf("✓ Input %.0f 
clamped to %d", tc.input, subscription.ContentVisibility) 188 }) 189 } 190 }) 191 192 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) { 193 userDID := "did:plc:test-idempotent" 194 rkey := "test-sub-idempotent" 195 196 event := &jetstream.JetstreamEvent{ 197 Did: userDID, 198 Kind: "commit", 199 TimeUS: time.Now().UnixMicro(), 200 Commit: &jetstream.CommitEvent{ 201 Rev: "test-rev-idempotent", 202 Operation: "create", 203 Collection: "social.coves.community.subscription", 204 RKey: rkey, 205 CID: "bafyidempotent", 206 Record: map[string]interface{}{ 207 "$type": "social.coves.community.subscription", 208 "subject": community.DID, 209 "createdAt": time.Now().Format(time.RFC3339), 210 "contentVisibility": float64(4), 211 }, 212 }, 213 } 214 215 // Process first time 216 err := consumer.HandleEvent(ctx, event) 217 if err != nil { 218 t.Fatalf("Failed to handle first subscription event: %v", err) 219 } 220 221 // Process again (Jetstream replay scenario) 222 err = consumer.HandleEvent(ctx, event) 223 if err != nil { 224 t.Errorf("Idempotency failed: second event should not error, got: %v", err) 225 } 226 227 // Verify only one subscription exists 228 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 229 if err != nil { 230 t.Fatalf("Failed to get subscription: %v", err) 231 } 232 233 if subscription.ContentVisibility != 4 { 234 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility) 235 } 236 237 t.Logf("✓ Duplicate events handled idempotently") 238 }) 239} 240 241// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling 242func TestSubscriptionIndexing_DeleteOperations(t *testing.T) { 243 if testing.Short() { 244 t.Skip("Skipping integration test in short mode") 245 } 246 247 ctx := context.Background() 248 db := setupTestDB(t) 249 defer cleanupTestDB(t, db) 250 251 repo := createTestCommunityRepo(t, db) 252 // Skip verification in tests 253 // Pass nil for 
identity resolver - not needed since consumer constructs handles from DIDs 254 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 255 256 // Create test community (with unique DID) 257 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano()) 258 community := createTestCommunity(t, repo, "test-unsubscribe", testDID) 259 260 t.Run("deletes subscription when DELETE event received", func(t *testing.T) { 261 userDID := "did:plc:test-user-delete" 262 rkey := "test-sub-delete" 263 264 // First, create a subscription 265 createEvent := &jetstream.JetstreamEvent{ 266 Did: userDID, 267 Kind: "commit", 268 TimeUS: time.Now().UnixMicro(), 269 Commit: &jetstream.CommitEvent{ 270 Rev: "test-rev-create", 271 Operation: "create", 272 Collection: "social.coves.community.subscription", 273 RKey: rkey, 274 CID: "bafycreate", 275 Record: map[string]interface{}{ 276 "$type": "social.coves.community.subscription", 277 "subject": community.DID, 278 "createdAt": time.Now().Format(time.RFC3339), 279 "contentVisibility": float64(3), 280 }, 281 }, 282 } 283 284 err := consumer.HandleEvent(ctx, createEvent) 285 if err != nil { 286 t.Fatalf("Failed to create subscription: %v", err) 287 } 288 289 // Verify subscription exists 290 _, err = repo.GetSubscription(ctx, userDID, community.DID) 291 if err != nil { 292 t.Fatalf("Subscription should exist: %v", err) 293 } 294 295 // Now send DELETE event (unsubscribe) 296 // IMPORTANT: DELETE operations don't include record data in Jetstream 297 deleteEvent := &jetstream.JetstreamEvent{ 298 Did: userDID, 299 Kind: "commit", 300 TimeUS: time.Now().UnixMicro(), 301 Commit: &jetstream.CommitEvent{ 302 Rev: "test-rev-delete", 303 Operation: "delete", 304 Collection: "social.coves.community.subscription", 305 RKey: rkey, 306 CID: "", // No CID on deletes 307 Record: nil, // No record data on deletes 308 }, 309 } 310 311 err = consumer.HandleEvent(ctx, deleteEvent) 312 if err != nil { 313 t.Fatalf("Failed to 
handle delete event: %v", err) 314 } 315 316 // Verify subscription was deleted 317 _, err = repo.GetSubscription(ctx, userDID, community.DID) 318 if err == nil { 319 t.Errorf("Subscription should have been deleted") 320 } 321 if !communities.IsNotFound(err) { 322 t.Errorf("Expected NotFound error, got: %v", err) 323 } 324 325 t.Logf("✓ Subscription deleted successfully") 326 }) 327 328 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) { 329 userDID := "did:plc:test-user-noexist" 330 rkey := "test-sub-noexist" 331 332 // Try to delete a subscription that doesn't exist 333 deleteEvent := &jetstream.JetstreamEvent{ 334 Did: userDID, 335 Kind: "commit", 336 TimeUS: time.Now().UnixMicro(), 337 Commit: &jetstream.CommitEvent{ 338 Rev: "test-rev-noexist", 339 Operation: "delete", 340 Collection: "social.coves.community.subscription", 341 RKey: rkey, 342 CID: "", 343 Record: nil, 344 }, 345 } 346 347 // Should not error (idempotent) 348 err := consumer.HandleEvent(ctx, deleteEvent) 349 if err != nil { 350 t.Errorf("Deleting non-existent subscription should not error, got: %v", err) 351 } 352 353 t.Logf("✓ Idempotent delete handled gracefully") 354 }) 355} 356 357// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically 358func TestSubscriptionIndexing_SubscriberCount(t *testing.T) { 359 if testing.Short() { 360 t.Skip("Skipping integration test in short mode") 361 } 362 363 ctx := context.Background() 364 db := setupTestDB(t) 365 defer cleanupTestDB(t, db) 366 367 repo := createTestCommunityRepo(t, db) 368 // Skip verification in tests 369 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 370 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 371 372 // Create test community (with unique DID) 373 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano()) 374 community := createTestCommunity(t, repo, 
"test-subscriber-count", testDID) 375 376 // Verify initial subscriber count is 0 377 comm, err := repo.GetByDID(ctx, community.DID) 378 if err != nil { 379 t.Fatalf("Failed to get community: %v", err) 380 } 381 if comm.SubscriberCount != 0 { 382 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount) 383 } 384 385 t.Run("increments subscriber count on subscribe", func(t *testing.T) { 386 userDID := "did:plc:test-user-count1" 387 rkey := "test-sub-count1" 388 389 event := &jetstream.JetstreamEvent{ 390 Did: userDID, 391 Kind: "commit", 392 TimeUS: time.Now().UnixMicro(), 393 Commit: &jetstream.CommitEvent{ 394 Rev: "test-rev-count", 395 Operation: "create", 396 Collection: "social.coves.community.subscription", 397 RKey: rkey, 398 CID: "bafycount", 399 Record: map[string]interface{}{ 400 "$type": "social.coves.community.subscription", 401 "subject": community.DID, 402 "createdAt": time.Now().Format(time.RFC3339), 403 "contentVisibility": float64(3), 404 }, 405 }, 406 } 407 408 err := consumer.HandleEvent(ctx, event) 409 if err != nil { 410 t.Fatalf("Failed to handle subscription: %v", err) 411 } 412 413 // Check subscriber count incremented 414 comm, err := repo.GetByDID(ctx, community.DID) 415 if err != nil { 416 t.Fatalf("Failed to get community: %v", err) 417 } 418 419 if comm.SubscriberCount != 1 { 420 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount) 421 } 422 423 t.Logf("✓ Subscriber count incremented to 1") 424 }) 425 426 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) { 427 userDID := "did:plc:test-user-count1" // Same user from above 428 rkey := "test-sub-count1" 429 430 // Send DELETE event 431 deleteEvent := &jetstream.JetstreamEvent{ 432 Did: userDID, 433 Kind: "commit", 434 TimeUS: time.Now().UnixMicro(), 435 Commit: &jetstream.CommitEvent{ 436 Rev: "test-rev-unsub", 437 Operation: "delete", 438 Collection: "social.coves.community.subscription", 439 RKey: rkey, 440 CID: "", 441 Record: 
nil, 442 }, 443 } 444 445 err := consumer.HandleEvent(ctx, deleteEvent) 446 if err != nil { 447 t.Fatalf("Failed to handle unsubscribe: %v", err) 448 } 449 450 // Check subscriber count decremented back to 0 451 comm, err := repo.GetByDID(ctx, community.DID) 452 if err != nil { 453 t.Fatalf("Failed to get community: %v", err) 454 } 455 456 if comm.SubscriberCount != 0 { 457 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount) 458 } 459 460 t.Logf("✓ Subscriber count decremented to 0") 461 }) 462} 463 464// Helper functions 465 466func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 467 t.Helper() 468 469 // Add timestamp to make handles unique across test runs 470 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano()) 471 472 community := &communities.Community{ 473 DID: did, 474 Handle: uniqueHandle, 475 Name: name, 476 DisplayName: "Test Community " + name, 477 Description: "Test community for subscription indexing", 478 OwnerDID: did, 479 CreatedByDID: "did:plc:test-creator", 480 HostedByDID: "did:plc:test-instance", 481 Visibility: "public", 482 CreatedAt: time.Now(), 483 UpdatedAt: time.Now(), 484 } 485 486 created, err := repo.Create(context.Background(), community) 487 if err != nil { 488 t.Fatalf("Failed to create test community: %v", err) 489 } 490 491 return created 492} 493 494func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository { 495 t.Helper() 496 // Import the postgres package to create a repo 497 return postgresRepo.NewCommunityRepository(db.(*sql.DB)) 498} 499 500func cleanupTestDB(t *testing.T, db interface{}) { 501 t.Helper() 502 sqlDB := db.(*sql.DB) 503 if err := sqlDB.Close(); err != nil { 504 t.Logf("Failed to close database: %v", err) 505 } 506}