A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "testing" 8 "time" 9 10 "Coves/internal/atproto/jetstream" 11 "Coves/internal/core/communities" 12 13 postgresRepo "Coves/internal/db/postgres" 14) 15 16// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed 17// from Jetstream events and stored in the AppView database 18func TestSubscriptionIndexing_ContentVisibility(t *testing.T) { 19 if testing.Short() { 20 t.Skip("Skipping integration test in short mode") 21 } 22 23 ctx := context.Background() 24 db := setupTestDB(t) 25 defer cleanupTestDB(t, db) 26 27 repo := createTestCommunityRepo(t, db) 28 // Skip verification in tests 29 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 30 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 31 32 // Create a test community first (with unique DID) 33 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 34 community := createTestCommunity(t, repo, "test-community-visibility", testDID) 35 36 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) { 37 userDID := "did:plc:test-user-123" 38 rkey := "test-sub-1" 39 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey 40 41 // Simulate Jetstream CREATE event for subscription 42 event := &jetstream.JetstreamEvent{ 43 Did: userDID, 44 Kind: "commit", 45 TimeUS: time.Now().UnixMicro(), 46 Commit: &jetstream.CommitEvent{ 47 Rev: "test-rev-1", 48 Operation: "create", 49 Collection: "social.coves.community.subscription", // CORRECT collection name 50 RKey: rkey, 51 CID: "bafytest123", 52 Record: map[string]interface{}{ 53 "$type": "social.coves.community.subscription", 54 "subject": community.DID, 55 "createdAt": time.Now().Format(time.RFC3339), 56 "contentVisibility": float64(5), // JSON numbers decode as float64 57 }, 58 }, 59 } 60 61 // Process event through consumer 62 err := consumer.HandleEvent(ctx, event) 63 if err != nil { 64 t.Fatalf("Failed to handle subscription event: %v", err) 65 } 66 67 // Verify subscription was indexed with correct contentVisibility 68 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 69 if err != nil { 70 t.Fatalf("Failed to get subscription: %v", err) 71 } 72 73 if subscription.ContentVisibility != 5 { 74 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility) 75 } 76 77 if subscription.UserDID != userDID { 78 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID) 79 } 80 81 if subscription.CommunityDID != community.DID { 82 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID) 83 } 84 85 if subscription.RecordURI != uri { 86 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI) 87 } 88 89 t.Logf("✓ Subscription indexed with contentVisibility=5") 90 }) 91 92 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) { 93 userDID := "did:plc:test-user-default" 94 rkey := "test-sub-default" 95 96 // Simulate Jetstream CREATE event WITHOUT contentVisibility field 97 event := &jetstream.JetstreamEvent{ 98 Did: userDID, 99 Kind: "commit", 100 TimeUS: time.Now().UnixMicro(), 101 Commit: &jetstream.CommitEvent{ 102 Rev: "test-rev-default", 103 Operation: "create", 104 Collection: "social.coves.community.subscription", 105 RKey: rkey, 106 CID: "bafydefault", 107 Record: map[string]interface{}{ 108 "$type": "social.coves.community.subscription", 109 "subject": community.DID, 110 "createdAt": time.Now().Format(time.RFC3339), 111 // contentVisibility NOT provided 112 }, 113 }, 114 } 115 116 // Process event 117 err := consumer.HandleEvent(ctx, event) 118 if err != nil { 119 t.Fatalf("Failed to handle subscription event: %v", err) 120 } 121 122 // Verify defaults to 3 123 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 124 if err != nil { 125 t.Fatalf("Failed to get subscription: %v", err) 126 } 127 128 if subscription.ContentVisibility != 3 { 129 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility) 130 } 131 132 t.Logf("✓ Subscription defaulted to contentVisibility=3") 133 }) 134 135 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) { 136 testCases := []struct { 137 name string 138 input float64 139 expected int 140 }{ 141 {input: 0, expected: 1, name: "zero clamped to 1"}, 142 {input: -5, expected: 1, name: "negative clamped to 1"}, 143 {input: 10, expected: 5, name: "10 clamped to 5"}, 144 {input: 100, expected: 5, name: "100 clamped to 5"}, 145 {input: 1, expected: 1, name: "1 stays 1"}, 146 {input: 3, expected: 3, name: "3 stays 3"}, 147 {input: 5, expected: 5, name: "5 stays 5"}, 148 } 149 150 for i, tc := range testCases { 151 t.Run(tc.name, func(t *testing.T) { 152 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i) 153 rkey := fmt.Sprintf("test-sub-clamp-%d", i) 154 155 event := &jetstream.JetstreamEvent{ 156 Did: userDID, 157 Kind: "commit", 158 TimeUS: time.Now().UnixMicro(), 159 Commit: &jetstream.CommitEvent{ 160 Rev: "test-rev-clamp", 161 Operation: "create", 162 Collection: "social.coves.community.subscription", 163 RKey: rkey, 164 CID: "bafyclamp", 165 Record: map[string]interface{}{ 166 "$type": "social.coves.community.subscription", 167 "subject": community.DID, 168 "createdAt": time.Now().Format(time.RFC3339), 169 "contentVisibility": tc.input, 170 }, 171 }, 172 } 173 174 err := consumer.HandleEvent(ctx, event) 175 if err != nil { 176 t.Fatalf("Failed to handle subscription event: %v", err) 177 } 178 179 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 180 if err != nil { 181 t.Fatalf("Failed to get subscription: %v", err) 182 } 183 184 if subscription.ContentVisibility != tc.expected { 185 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility) 186 } 187 188 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility) 189 }) 190 } 191 }) 192 193 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) { 194 userDID := "did:plc:test-idempotent" 195 rkey := "test-sub-idempotent" 196 197 event := &jetstream.JetstreamEvent{ 198 Did: userDID, 199 Kind: "commit", 200 TimeUS: time.Now().UnixMicro(), 201 Commit: &jetstream.CommitEvent{ 202 Rev: "test-rev-idempotent", 203 Operation: "create", 204 Collection: "social.coves.community.subscription", 205 RKey: rkey, 206 CID: "bafyidempotent", 207 Record: map[string]interface{}{ 208 "$type": "social.coves.community.subscription", 209 "subject": community.DID, 210 "createdAt": time.Now().Format(time.RFC3339), 211 "contentVisibility": float64(4), 212 }, 213 }, 214 } 215 216 // Process first time 217 err := consumer.HandleEvent(ctx, event) 218 if err != nil { 219 t.Fatalf("Failed to handle first subscription event: %v", err) 220 } 221 222 // Process again (Jetstream replay scenario) 223 err = consumer.HandleEvent(ctx, event) 224 if err != nil { 225 t.Errorf("Idempotency failed: second event should not error, got: %v", err) 226 } 227 228 // Verify only one subscription exists 229 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 230 if err != nil { 231 t.Fatalf("Failed to get subscription: %v", err) 232 } 233 234 if subscription.ContentVisibility != 4 { 235 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility) 236 } 237 238 t.Logf("✓ Duplicate events handled idempotently") 239 }) 240} 241 242// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling 243func TestSubscriptionIndexing_DeleteOperations(t *testing.T) { 244 if testing.Short() { 245 t.Skip("Skipping integration test in short mode") 246 } 247 248 ctx := context.Background() 249 db := setupTestDB(t) 250 defer cleanupTestDB(t, db) 251 252 repo := createTestCommunityRepo(t, db) 253 // Skip verification in tests 254 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 255 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 256 257 // Create test community (with unique DID) 258 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano()) 259 community := createTestCommunity(t, repo, "test-unsubscribe", testDID) 260 261 t.Run("deletes subscription when DELETE event received", func(t *testing.T) { 262 userDID := "did:plc:test-user-delete" 263 rkey := "test-sub-delete" 264 265 // First, create a subscription 266 createEvent := &jetstream.JetstreamEvent{ 267 Did: userDID, 268 Kind: "commit", 269 TimeUS: time.Now().UnixMicro(), 270 Commit: &jetstream.CommitEvent{ 271 Rev: "test-rev-create", 272 Operation: "create", 273 Collection: "social.coves.community.subscription", 274 RKey: rkey, 275 CID: "bafycreate", 276 Record: map[string]interface{}{ 277 "$type": "social.coves.community.subscription", 278 "subject": community.DID, 279 "createdAt": time.Now().Format(time.RFC3339), 280 "contentVisibility": float64(3), 281 }, 282 }, 283 } 284 285 err := consumer.HandleEvent(ctx, createEvent) 286 if err != nil { 287 t.Fatalf("Failed to create subscription: %v", err) 288 } 289 290 // Verify subscription exists 291 _, err = repo.GetSubscription(ctx, userDID, community.DID) 292 if err != nil { 293 t.Fatalf("Subscription should exist: %v", err) 294 } 295 296 // Now send DELETE event (unsubscribe) 297 // IMPORTANT: DELETE operations don't include record data in Jetstream 298 deleteEvent := &jetstream.JetstreamEvent{ 299 Did: userDID, 300 Kind: "commit", 301 TimeUS: time.Now().UnixMicro(), 302 Commit: &jetstream.CommitEvent{ 303 Rev: "test-rev-delete", 304 Operation: "delete", 305 Collection: "social.coves.community.subscription", 306 RKey: rkey, 307 CID: "", // No CID on deletes 308 Record: nil, // No record data on deletes 309 }, 310 } 311 312 err = consumer.HandleEvent(ctx, deleteEvent) 313 if err != nil { 314 t.Fatalf("Failed to handle delete event: %v", err) 315 } 316 317 // Verify subscription was deleted 318 _, err = repo.GetSubscription(ctx, userDID, community.DID) 319 if err == nil { 320 t.Errorf("Subscription should have been deleted") 321 } 322 if !communities.IsNotFound(err) { 323 t.Errorf("Expected NotFound error, got: %v", err) 324 } 325 326 t.Logf("✓ Subscription deleted successfully") 327 }) 328 329 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) { 330 userDID := "did:plc:test-user-noexist" 331 rkey := "test-sub-noexist" 332 333 // Try to delete a subscription that doesn't exist 334 deleteEvent := &jetstream.JetstreamEvent{ 335 Did: userDID, 336 Kind: "commit", 337 TimeUS: time.Now().UnixMicro(), 338 Commit: &jetstream.CommitEvent{ 339 Rev: "test-rev-noexist", 340 Operation: "delete", 341 Collection: "social.coves.community.subscription", 342 RKey: rkey, 343 CID: "", 344 Record: nil, 345 }, 346 } 347 348 // Should not error (idempotent) 349 err := consumer.HandleEvent(ctx, deleteEvent) 350 if err != nil { 351 t.Errorf("Deleting non-existent subscription should not error, got: %v", err) 352 } 353 354 t.Logf("✓ Idempotent delete handled gracefully") 355 }) 356} 357 358// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically 359func TestSubscriptionIndexing_SubscriberCount(t *testing.T) { 360 if testing.Short() { 361 t.Skip("Skipping integration test in short mode") 362 } 363 364 ctx := context.Background() 365 db := setupTestDB(t) 366 defer cleanupTestDB(t, db) 367 368 repo := createTestCommunityRepo(t, db) 369 // Skip verification in tests 370 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 371 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 372 373 // Create test community (with unique DID) 374 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano()) 375 community := createTestCommunity(t, repo, "test-subscriber-count", testDID) 376 377 // Verify initial subscriber count is 0 378 comm, err := repo.GetByDID(ctx, community.DID) 379 if err != nil { 380 t.Fatalf("Failed to get community: %v", err) 381 } 382 if comm.SubscriberCount != 0 { 383 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount) 384 } 385 386 t.Run("increments subscriber count on subscribe", func(t *testing.T) { 387 userDID := "did:plc:test-user-count1" 388 rkey := "test-sub-count1" 389 390 event := &jetstream.JetstreamEvent{ 391 Did: userDID, 392 Kind: "commit", 393 TimeUS: time.Now().UnixMicro(), 394 Commit: &jetstream.CommitEvent{ 395 Rev: "test-rev-count", 396 Operation: "create", 397 Collection: "social.coves.community.subscription", 398 RKey: rkey, 399 CID: "bafycount", 400 Record: map[string]interface{}{ 401 "$type": "social.coves.community.subscription", 402 "subject": community.DID, 403 "createdAt": time.Now().Format(time.RFC3339), 404 "contentVisibility": float64(3), 405 }, 406 }, 407 } 408 409 err := consumer.HandleEvent(ctx, event) 410 if err != nil { 411 t.Fatalf("Failed to handle subscription: %v", err) 412 } 413 414 // Check subscriber count incremented 415 comm, err := repo.GetByDID(ctx, community.DID) 416 if err != nil { 417 t.Fatalf("Failed to get community: %v", err) 418 } 419 420 if comm.SubscriberCount != 1 { 421 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount) 422 } 423 424 t.Logf("✓ Subscriber count incremented to 1") 425 }) 426 427 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) { 428 userDID := "did:plc:test-user-count1" // Same user from above 429 rkey := "test-sub-count1" 430 431 // Send DELETE event 432 deleteEvent := &jetstream.JetstreamEvent{ 433 Did: userDID, 434 Kind: "commit", 435 TimeUS: time.Now().UnixMicro(), 436 Commit: &jetstream.CommitEvent{ 437 Rev: "test-rev-unsub", 438 Operation: "delete", 439 Collection: "social.coves.community.subscription", 440 RKey: rkey, 441 CID: "", 442 Record: nil, 443 }, 444 } 445 446 err := consumer.HandleEvent(ctx, deleteEvent) 447 if err != nil { 448 t.Fatalf("Failed to handle unsubscribe: %v", err) 449 } 450 451 // Check subscriber count decremented back to 0 452 comm, err := repo.GetByDID(ctx, community.DID) 453 if err != nil { 454 t.Fatalf("Failed to get community: %v", err) 455 } 456 457 if comm.SubscriberCount != 0 { 458 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount) 459 } 460 461 t.Logf("✓ Subscriber count decremented to 0") 462 }) 463} 464 465// Helper functions 466 467func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 468 t.Helper() 469 470 // Add timestamp to make handles unique across test runs 471 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano()) 472 473 community := &communities.Community{ 474 DID: did, 475 Handle: uniqueHandle, 476 Name: name, 477 DisplayName: "Test Community " + name, 478 Description: "Test community for subscription indexing", 479 OwnerDID: did, 480 CreatedByDID: "did:plc:test-creator", 481 HostedByDID: "did:plc:test-instance", 482 Visibility: "public", 483 CreatedAt: time.Now(), 484 UpdatedAt: time.Now(), 485 } 486 487 created, err := repo.Create(context.Background(), community) 488 if err != nil { 489 t.Fatalf("Failed to create test community: %v", err) 490 } 491 492 return created 493} 494 495func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository { 496 t.Helper() 497 // Import the postgres package to create a repo 498 return postgresRepo.NewCommunityRepository(db.(*sql.DB)) 499} 500 501func cleanupTestDB(t *testing.T, db interface{}) { 502 t.Helper() 503 sqlDB := db.(*sql.DB) 504 if err := sqlDB.Close(); err != nil { 505 t.Logf("Failed to close database: %v", err) 506 } 507}