A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 postgresRepo "Coves/internal/db/postgres" 7 "context" 8 "database/sql" 9 "fmt" 10 "testing" 11 "time" 12) 13 14// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed 15// from Jetstream events and stored in the AppView database 16func TestSubscriptionIndexing_ContentVisibility(t *testing.T) { 17 if testing.Short() { 18 t.Skip("Skipping integration test in short mode") 19 } 20 21 ctx := context.Background() 22 db := setupTestDB(t) 23 defer cleanupTestDB(t, db) 24 25 repo := createTestCommunityRepo(t, db) 26 consumer := jetstream.NewCommunityEventConsumer(repo) 27 28 // Create a test community first (with unique DID) 29 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 30 community := createTestCommunity(t, repo, "test-community-visibility", testDID) 31 32 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) { 33 userDID := "did:plc:test-user-123" 34 rkey := "test-sub-1" 35 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey 36 37 // Simulate Jetstream CREATE event for subscription 38 event := &jetstream.JetstreamEvent{ 39 Did: userDID, 40 Kind: "commit", 41 TimeUS: time.Now().UnixMicro(), 42 Commit: &jetstream.CommitEvent{ 43 Rev: "test-rev-1", 44 Operation: "create", 45 Collection: "social.coves.community.subscription", // CORRECT collection name 46 RKey: rkey, 47 CID: "bafytest123", 48 Record: map[string]interface{}{ 49 "$type": "social.coves.community.subscription", 50 "subject": community.DID, 51 "createdAt": time.Now().Format(time.RFC3339), 52 "contentVisibility": float64(5), // JSON numbers decode as float64 53 }, 54 }, 55 } 56 57 // Process event through consumer 58 err := consumer.HandleEvent(ctx, event) 59 if err != nil { 60 t.Fatalf("Failed to handle subscription event: %v", err) 61 } 62 63 // Verify subscription was indexed with correct contentVisibility 64 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 65 if err != nil { 66 t.Fatalf("Failed to get subscription: %v", err) 67 } 68 69 if subscription.ContentVisibility != 5 { 70 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility) 71 } 72 73 if subscription.UserDID != userDID { 74 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID) 75 } 76 77 if subscription.CommunityDID != community.DID { 78 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID) 79 } 80 81 if subscription.RecordURI != uri { 82 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI) 83 } 84 85 t.Logf("✓ Subscription indexed with contentVisibility=5") 86 }) 87 88 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) { 89 userDID := "did:plc:test-user-default" 90 rkey := "test-sub-default" 91 92 // Simulate Jetstream CREATE event WITHOUT contentVisibility field 93 event := &jetstream.JetstreamEvent{ 94 Did: userDID, 95 Kind: "commit", 96 TimeUS: time.Now().UnixMicro(), 97 Commit: &jetstream.CommitEvent{ 98 Rev: "test-rev-default", 99 Operation: "create", 100 Collection: "social.coves.community.subscription", 101 RKey: rkey, 102 CID: "bafydefault", 103 Record: map[string]interface{}{ 104 "$type": "social.coves.community.subscription", 105 "subject": community.DID, 106 "createdAt": time.Now().Format(time.RFC3339), 107 // contentVisibility NOT provided 108 }, 109 }, 110 } 111 112 // Process event 113 err := consumer.HandleEvent(ctx, event) 114 if err != nil { 115 t.Fatalf("Failed to handle subscription event: %v", err) 116 } 117 118 // Verify defaults to 3 119 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 120 if err != nil { 121 t.Fatalf("Failed to get subscription: %v", err) 122 } 123 124 if subscription.ContentVisibility != 3 { 125 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility) 126 } 127 128 t.Logf("✓ Subscription defaulted to contentVisibility=3") 129 }) 130 131 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) { 132 testCases := []struct { 133 input float64 134 expected int 135 name string 136 }{ 137 {input: 0, expected: 1, name: "zero clamped to 1"}, 138 {input: -5, expected: 1, name: "negative clamped to 1"}, 139 {input: 10, expected: 5, name: "10 clamped to 5"}, 140 {input: 100, expected: 5, name: "100 clamped to 5"}, 141 {input: 1, expected: 1, name: "1 stays 1"}, 142 {input: 3, expected: 3, name: "3 stays 3"}, 143 {input: 5, expected: 5, name: "5 stays 5"}, 144 } 145 146 for i, tc := range testCases { 147 t.Run(tc.name, func(t *testing.T) { 148 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i) 149 rkey := fmt.Sprintf("test-sub-clamp-%d", i) 150 151 event := &jetstream.JetstreamEvent{ 152 Did: userDID, 153 Kind: "commit", 154 TimeUS: time.Now().UnixMicro(), 155 Commit: &jetstream.CommitEvent{ 156 Rev: "test-rev-clamp", 157 Operation: "create", 158 Collection: "social.coves.community.subscription", 159 RKey: rkey, 160 CID: "bafyclamp", 161 Record: map[string]interface{}{ 162 "$type": "social.coves.community.subscription", 163 "subject": community.DID, 164 "createdAt": time.Now().Format(time.RFC3339), 165 "contentVisibility": tc.input, 166 }, 167 }, 168 } 169 170 err := consumer.HandleEvent(ctx, event) 171 if err != nil { 172 t.Fatalf("Failed to handle subscription event: %v", err) 173 } 174 175 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 176 if err != nil { 177 t.Fatalf("Failed to get subscription: %v", err) 178 } 179 180 if subscription.ContentVisibility != tc.expected { 181 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility) 182 } 183 184 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility) 185 }) 186 } 187 }) 188 189 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) { 190 userDID := "did:plc:test-idempotent" 191 rkey := "test-sub-idempotent" 192 193 event := &jetstream.JetstreamEvent{ 194 Did: userDID, 195 Kind: "commit", 196 TimeUS: time.Now().UnixMicro(), 197 Commit: &jetstream.CommitEvent{ 198 Rev: "test-rev-idempotent", 199 Operation: "create", 200 Collection: "social.coves.community.subscription", 201 RKey: rkey, 202 CID: "bafyidempotent", 203 Record: map[string]interface{}{ 204 "$type": "social.coves.community.subscription", 205 "subject": community.DID, 206 "createdAt": time.Now().Format(time.RFC3339), 207 "contentVisibility": float64(4), 208 }, 209 }, 210 } 211 212 // Process first time 213 err := consumer.HandleEvent(ctx, event) 214 if err != nil { 215 t.Fatalf("Failed to handle first subscription event: %v", err) 216 } 217 218 // Process again (Jetstream replay scenario) 219 err = consumer.HandleEvent(ctx, event) 220 if err != nil { 221 t.Errorf("Idempotency failed: second event should not error, got: %v", err) 222 } 223 224 // Verify only one subscription exists 225 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 226 if err != nil { 227 t.Fatalf("Failed to get subscription: %v", err) 228 } 229 230 if subscription.ContentVisibility != 4 { 231 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility) 232 } 233 234 t.Logf("✓ Duplicate events handled idempotently") 235 }) 236} 237 238// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling 239func TestSubscriptionIndexing_DeleteOperations(t *testing.T) { 240 if testing.Short() { 241 t.Skip("Skipping integration test in short mode") 242 } 243 244 ctx := context.Background() 245 db := setupTestDB(t) 246 defer cleanupTestDB(t, db) 247 248 repo := createTestCommunityRepo(t, db) 249 consumer := jetstream.NewCommunityEventConsumer(repo) 250 251 // Create test community (with unique DID) 252 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano()) 253 community := createTestCommunity(t, repo, "test-unsubscribe", testDID) 254 255 t.Run("deletes subscription when DELETE event received", func(t *testing.T) { 256 userDID := "did:plc:test-user-delete" 257 rkey := "test-sub-delete" 258 259 // First, create a subscription 260 createEvent := &jetstream.JetstreamEvent{ 261 Did: userDID, 262 Kind: "commit", 263 TimeUS: time.Now().UnixMicro(), 264 Commit: &jetstream.CommitEvent{ 265 Rev: "test-rev-create", 266 Operation: "create", 267 Collection: "social.coves.community.subscription", 268 RKey: rkey, 269 CID: "bafycreate", 270 Record: map[string]interface{}{ 271 "$type": "social.coves.community.subscription", 272 "subject": community.DID, 273 "createdAt": time.Now().Format(time.RFC3339), 274 "contentVisibility": float64(3), 275 }, 276 }, 277 } 278 279 err := consumer.HandleEvent(ctx, createEvent) 280 if err != nil { 281 t.Fatalf("Failed to create subscription: %v", err) 282 } 283 284 // Verify subscription exists 285 _, err = repo.GetSubscription(ctx, userDID, community.DID) 286 if err != nil { 287 t.Fatalf("Subscription should exist: %v", err) 288 } 289 290 // Now send DELETE event (unsubscribe) 291 // IMPORTANT: DELETE operations don't include record data in Jetstream 292 deleteEvent := &jetstream.JetstreamEvent{ 293 Did: userDID, 294 Kind: "commit", 295 TimeUS: time.Now().UnixMicro(), 296 Commit: &jetstream.CommitEvent{ 297 Rev: "test-rev-delete", 298 Operation: "delete", 299 Collection: "social.coves.community.subscription", 300 RKey: rkey, 301 CID: "", // No CID on deletes 302 Record: nil, // No record data on deletes 303 }, 304 } 305 306 err = consumer.HandleEvent(ctx, deleteEvent) 307 if err != nil { 308 t.Fatalf("Failed to handle delete event: %v", err) 309 } 310 311 // Verify subscription was deleted 312 _, err = repo.GetSubscription(ctx, userDID, community.DID) 313 if err == nil { 314 t.Errorf("Subscription should have been deleted") 315 } 316 if !communities.IsNotFound(err) { 317 t.Errorf("Expected NotFound error, got: %v", err) 318 } 319 320 t.Logf("✓ Subscription deleted successfully") 321 }) 322 323 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) { 324 userDID := "did:plc:test-user-noexist" 325 rkey := "test-sub-noexist" 326 327 // Try to delete a subscription that doesn't exist 328 deleteEvent := &jetstream.JetstreamEvent{ 329 Did: userDID, 330 Kind: "commit", 331 TimeUS: time.Now().UnixMicro(), 332 Commit: &jetstream.CommitEvent{ 333 Rev: "test-rev-noexist", 334 Operation: "delete", 335 Collection: "social.coves.community.subscription", 336 RKey: rkey, 337 CID: "", 338 Record: nil, 339 }, 340 } 341 342 // Should not error (idempotent) 343 err := consumer.HandleEvent(ctx, deleteEvent) 344 if err != nil { 345 t.Errorf("Deleting non-existent subscription should not error, got: %v", err) 346 } 347 348 t.Logf("✓ Idempotent delete handled gracefully") 349 }) 350} 351 352// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically 353func TestSubscriptionIndexing_SubscriberCount(t *testing.T) { 354 if testing.Short() { 355 t.Skip("Skipping integration test in short mode") 356 } 357 358 ctx := context.Background() 359 db := setupTestDB(t) 360 defer cleanupTestDB(t, db) 361 362 repo := createTestCommunityRepo(t, db) 363 consumer := jetstream.NewCommunityEventConsumer(repo) 364 365 // Create test community (with unique DID) 366 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano()) 367 community := createTestCommunity(t, repo, "test-subscriber-count", testDID) 368 369 // Verify initial subscriber count is 0 370 comm, err := repo.GetByDID(ctx, community.DID) 371 if err != nil { 372 t.Fatalf("Failed to get community: %v", err) 373 } 374 if comm.SubscriberCount != 0 { 375 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount) 376 } 377 378 t.Run("increments subscriber count on subscribe", func(t *testing.T) { 379 userDID := "did:plc:test-user-count1" 380 rkey := "test-sub-count1" 381 382 event := &jetstream.JetstreamEvent{ 383 Did: userDID, 384 Kind: "commit", 385 TimeUS: time.Now().UnixMicro(), 386 Commit: &jetstream.CommitEvent{ 387 Rev: "test-rev-count", 388 Operation: "create", 389 Collection: "social.coves.community.subscription", 390 RKey: rkey, 391 CID: "bafycount", 392 Record: map[string]interface{}{ 393 "$type": "social.coves.community.subscription", 394 "subject": community.DID, 395 "createdAt": time.Now().Format(time.RFC3339), 396 "contentVisibility": float64(3), 397 }, 398 }, 399 } 400 401 err := consumer.HandleEvent(ctx, event) 402 if err != nil { 403 t.Fatalf("Failed to handle subscription: %v", err) 404 } 405 406 // Check subscriber count incremented 407 comm, err := repo.GetByDID(ctx, community.DID) 408 if err != nil { 409 t.Fatalf("Failed to get community: %v", err) 410 } 411 412 if comm.SubscriberCount != 1 { 413 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount) 414 } 415 416 t.Logf("✓ Subscriber count incremented to 1") 417 }) 418 419 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) { 420 userDID := "did:plc:test-user-count1" // Same user from above 421 rkey := "test-sub-count1" 422 423 // Send DELETE event 424 deleteEvent := &jetstream.JetstreamEvent{ 425 Did: userDID, 426 Kind: "commit", 427 TimeUS: time.Now().UnixMicro(), 428 Commit: &jetstream.CommitEvent{ 429 Rev: "test-rev-unsub", 430 Operation: "delete", 431 Collection: "social.coves.community.subscription", 432 RKey: rkey, 433 CID: "", 434 Record: nil, 435 }, 436 } 437 438 err := consumer.HandleEvent(ctx, deleteEvent) 439 if err != nil { 440 t.Fatalf("Failed to handle unsubscribe: %v", err) 441 } 442 443 // Check subscriber count decremented back to 0 444 comm, err := repo.GetByDID(ctx, community.DID) 445 if err != nil { 446 t.Fatalf("Failed to get community: %v", err) 447 } 448 449 if comm.SubscriberCount != 0 { 450 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount) 451 } 452 453 t.Logf("✓ Subscriber count decremented to 0") 454 }) 455} 456 457// Helper functions 458 459func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 460 t.Helper() 461 462 // Add timestamp to make handles unique across test runs 463 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano()) 464 465 community := &communities.Community{ 466 DID: did, 467 Handle: uniqueHandle, 468 Name: name, 469 DisplayName: "Test Community " + name, 470 Description: "Test community for subscription indexing", 471 OwnerDID: did, 472 CreatedByDID: "did:plc:test-creator", 473 HostedByDID: "did:plc:test-instance", 474 Visibility: "public", 475 CreatedAt: time.Now(), 476 UpdatedAt: time.Now(), 477 } 478 479 created, err := repo.Create(context.Background(), community) 480 if err != nil { 481 t.Fatalf("Failed to create test community: %v", err) 482 } 483 484 return created 485} 486 487func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository { 488 t.Helper() 489 // Import the postgres package to create a repo 490 return postgresRepo.NewCommunityRepository(db.(*sql.DB)) 491} 492 493func cleanupTestDB(t *testing.T, db interface{}) { 494 t.Helper() 495 sqlDB := db.(*sql.DB) 496 if err := sqlDB.Close(); err != nil { 497 t.Logf("Failed to close database: %v", err) 498 } 499}