A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 postgresRepo "Coves/internal/db/postgres" 7 "context" 8 "database/sql" 9 "fmt" 10 "testing" 11 "time" 12) 13 14// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed 15// from Jetstream events and stored in the AppView database 16func TestSubscriptionIndexing_ContentVisibility(t *testing.T) { 17 if testing.Short() { 18 t.Skip("Skipping integration test in short mode") 19 } 20 21 ctx := context.Background() 22 db := setupTestDB(t) 23 defer cleanupTestDB(t, db) 24 25 repo := createTestCommunityRepo(t, db) 26 // Skip verification in tests 27 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 28 29 // Create a test community first (with unique DID) 30 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 31 community := createTestCommunity(t, repo, "test-community-visibility", testDID) 32 33 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) { 34 userDID := "did:plc:test-user-123" 35 rkey := "test-sub-1" 36 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey 37 38 // Simulate Jetstream CREATE event for subscription 39 event := &jetstream.JetstreamEvent{ 40 Did: userDID, 41 Kind: "commit", 42 TimeUS: time.Now().UnixMicro(), 43 Commit: &jetstream.CommitEvent{ 44 Rev: "test-rev-1", 45 Operation: "create", 46 Collection: "social.coves.community.subscription", // CORRECT collection name 47 RKey: rkey, 48 CID: "bafytest123", 49 Record: map[string]interface{}{ 50 "$type": "social.coves.community.subscription", 51 "subject": community.DID, 52 "createdAt": time.Now().Format(time.RFC3339), 53 "contentVisibility": float64(5), // JSON numbers decode as float64 54 }, 55 }, 56 } 57 58 // Process event through consumer 59 err := consumer.HandleEvent(ctx, event) 60 if err != nil { 61 t.Fatalf("Failed to handle subscription event: %v", err) 62 } 63 64 // Verify subscription was indexed with correct contentVisibility 65 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 66 if err != nil { 67 t.Fatalf("Failed to get subscription: %v", err) 68 } 69 70 if subscription.ContentVisibility != 5 { 71 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility) 72 } 73 74 if subscription.UserDID != userDID { 75 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID) 76 } 77 78 if subscription.CommunityDID != community.DID { 79 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID) 80 } 81 82 if subscription.RecordURI != uri { 83 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI) 84 } 85 86 t.Logf("✓ Subscription indexed with contentVisibility=5") 87 }) 88 89 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) { 90 userDID := "did:plc:test-user-default" 91 rkey := "test-sub-default" 92 93 // Simulate Jetstream CREATE event WITHOUT contentVisibility field 94 event := &jetstream.JetstreamEvent{ 95 Did: userDID, 96 Kind: "commit", 97 TimeUS: time.Now().UnixMicro(), 98 Commit: &jetstream.CommitEvent{ 99 Rev: "test-rev-default", 100 Operation: "create", 101 Collection: "social.coves.community.subscription", 102 RKey: rkey, 103 CID: "bafydefault", 104 Record: map[string]interface{}{ 105 "$type": "social.coves.community.subscription", 106 "subject": community.DID, 107 "createdAt": time.Now().Format(time.RFC3339), 108 // contentVisibility NOT provided 109 }, 110 }, 111 } 112 113 // Process event 114 err := consumer.HandleEvent(ctx, event) 115 if err != nil { 116 t.Fatalf("Failed to handle subscription event: %v", err) 117 } 118 119 // Verify defaults to 3 120 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 121 if err != nil { 122 t.Fatalf("Failed to get subscription: %v", err) 123 } 124 125 if subscription.ContentVisibility != 3 { 126 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility) 127 } 128 129 t.Logf("✓ Subscription defaulted to contentVisibility=3") 130 }) 131 132 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) { 133 testCases := []struct { 134 name string 135 input float64 136 expected int 137 }{ 138 {input: 0, expected: 1, name: "zero clamped to 1"}, 139 {input: -5, expected: 1, name: "negative clamped to 1"}, 140 {input: 10, expected: 5, name: "10 clamped to 5"}, 141 {input: 100, expected: 5, name: "100 clamped to 5"}, 142 {input: 1, expected: 1, name: "1 stays 1"}, 143 {input: 3, expected: 3, name: "3 stays 3"}, 144 {input: 5, expected: 5, name: "5 stays 5"}, 145 } 146 147 for i, tc := range testCases { 148 t.Run(tc.name, func(t *testing.T) { 149 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i) 150 rkey := fmt.Sprintf("test-sub-clamp-%d", i) 151 152 event := &jetstream.JetstreamEvent{ 153 Did: userDID, 154 Kind: "commit", 155 TimeUS: time.Now().UnixMicro(), 156 Commit: &jetstream.CommitEvent{ 157 Rev: "test-rev-clamp", 158 Operation: "create", 159 Collection: "social.coves.community.subscription", 160 RKey: rkey, 161 CID: "bafyclamp", 162 Record: map[string]interface{}{ 163 "$type": "social.coves.community.subscription", 164 "subject": community.DID, 165 "createdAt": time.Now().Format(time.RFC3339), 166 "contentVisibility": tc.input, 167 }, 168 }, 169 } 170 171 err := consumer.HandleEvent(ctx, event) 172 if err != nil { 173 t.Fatalf("Failed to handle subscription event: %v", err) 174 } 175 176 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 177 if err != nil { 178 t.Fatalf("Failed to get subscription: %v", err) 179 } 180 181 if subscription.ContentVisibility != tc.expected { 182 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility) 183 } 184 185 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility) 186 }) 187 } 188 }) 189 190 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) { 191 userDID := "did:plc:test-idempotent" 192 rkey := "test-sub-idempotent" 193 194 event := &jetstream.JetstreamEvent{ 195 Did: userDID, 196 Kind: "commit", 197 TimeUS: time.Now().UnixMicro(), 198 Commit: &jetstream.CommitEvent{ 199 Rev: "test-rev-idempotent", 200 Operation: "create", 201 Collection: "social.coves.community.subscription", 202 RKey: rkey, 203 CID: "bafyidempotent", 204 Record: map[string]interface{}{ 205 "$type": "social.coves.community.subscription", 206 "subject": community.DID, 207 "createdAt": time.Now().Format(time.RFC3339), 208 "contentVisibility": float64(4), 209 }, 210 }, 211 } 212 213 // Process first time 214 err := consumer.HandleEvent(ctx, event) 215 if err != nil { 216 t.Fatalf("Failed to handle first subscription event: %v", err) 217 } 218 219 // Process again (Jetstream replay scenario) 220 err = consumer.HandleEvent(ctx, event) 221 if err != nil { 222 t.Errorf("Idempotency failed: second event should not error, got: %v", err) 223 } 224 225 // Verify only one subscription exists 226 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 227 if err != nil { 228 t.Fatalf("Failed to get subscription: %v", err) 229 } 230 231 if subscription.ContentVisibility != 4 { 232 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility) 233 } 234 235 t.Logf("✓ Duplicate events handled idempotently") 236 }) 237} 238 239// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling 240func TestSubscriptionIndexing_DeleteOperations(t *testing.T) { 241 if testing.Short() { 242 t.Skip("Skipping integration test in short mode") 243 } 244 245 ctx := context.Background() 246 db := setupTestDB(t) 247 defer cleanupTestDB(t, db) 248 249 repo := createTestCommunityRepo(t, db) 250 // Skip verification in tests 251 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 252 253 // Create test community (with unique DID) 254 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano()) 255 community := createTestCommunity(t, repo, "test-unsubscribe", testDID) 256 257 t.Run("deletes subscription when DELETE event received", func(t *testing.T) { 258 userDID := "did:plc:test-user-delete" 259 rkey := "test-sub-delete" 260 261 // First, create a subscription 262 createEvent := &jetstream.JetstreamEvent{ 263 Did: userDID, 264 Kind: "commit", 265 TimeUS: time.Now().UnixMicro(), 266 Commit: &jetstream.CommitEvent{ 267 Rev: "test-rev-create", 268 Operation: "create", 269 Collection: "social.coves.community.subscription", 270 RKey: rkey, 271 CID: "bafycreate", 272 Record: map[string]interface{}{ 273 "$type": "social.coves.community.subscription", 274 "subject": community.DID, 275 "createdAt": time.Now().Format(time.RFC3339), 276 "contentVisibility": float64(3), 277 }, 278 }, 279 } 280 281 err := consumer.HandleEvent(ctx, createEvent) 282 if err != nil { 283 t.Fatalf("Failed to create subscription: %v", err) 284 } 285 286 // Verify subscription exists 287 _, err = repo.GetSubscription(ctx, userDID, community.DID) 288 if err != nil { 289 t.Fatalf("Subscription should exist: %v", err) 290 } 291 292 // Now send DELETE event (unsubscribe) 293 // IMPORTANT: DELETE operations don't include record data in Jetstream 294 deleteEvent := &jetstream.JetstreamEvent{ 295 Did: userDID, 296 Kind: "commit", 297 TimeUS: time.Now().UnixMicro(), 298 Commit: &jetstream.CommitEvent{ 299 Rev: "test-rev-delete", 300 Operation: "delete", 301 Collection: "social.coves.community.subscription", 302 RKey: rkey, 303 CID: "", // No CID on deletes 304 Record: nil, // No record data on deletes 305 }, 306 } 307 308 err = consumer.HandleEvent(ctx, deleteEvent) 309 if err != nil { 310 t.Fatalf("Failed to handle delete event: %v", err) 311 } 312 313 // Verify subscription was deleted 314 _, err = repo.GetSubscription(ctx, userDID, community.DID) 315 if err == nil { 316 t.Errorf("Subscription should have been deleted") 317 } 318 if !communities.IsNotFound(err) { 319 t.Errorf("Expected NotFound error, got: %v", err) 320 } 321 322 t.Logf("✓ Subscription deleted successfully") 323 }) 324 325 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) { 326 userDID := "did:plc:test-user-noexist" 327 rkey := "test-sub-noexist" 328 329 // Try to delete a subscription that doesn't exist 330 deleteEvent := &jetstream.JetstreamEvent{ 331 Did: userDID, 332 Kind: "commit", 333 TimeUS: time.Now().UnixMicro(), 334 Commit: &jetstream.CommitEvent{ 335 Rev: "test-rev-noexist", 336 Operation: "delete", 337 Collection: "social.coves.community.subscription", 338 RKey: rkey, 339 CID: "", 340 Record: nil, 341 }, 342 } 343 344 // Should not error (idempotent) 345 err := consumer.HandleEvent(ctx, deleteEvent) 346 if err != nil { 347 t.Errorf("Deleting non-existent subscription should not error, got: %v", err) 348 } 349 350 t.Logf("✓ Idempotent delete handled gracefully") 351 }) 352} 353 354// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically 355func TestSubscriptionIndexing_SubscriberCount(t *testing.T) { 356 if testing.Short() { 357 t.Skip("Skipping integration test in short mode") 358 } 359 360 ctx := context.Background() 361 db := setupTestDB(t) 362 defer cleanupTestDB(t, db) 363 364 repo := createTestCommunityRepo(t, db) 365 // Skip verification in tests 366 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 367 368 // Create test community (with unique DID) 369 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano()) 370 community := createTestCommunity(t, repo, "test-subscriber-count", testDID) 371 372 // Verify initial subscriber count is 0 373 comm, err := repo.GetByDID(ctx, community.DID) 374 if err != nil { 375 t.Fatalf("Failed to get community: %v", err) 376 } 377 if comm.SubscriberCount != 0 { 378 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount) 379 } 380 381 t.Run("increments subscriber count on subscribe", func(t *testing.T) { 382 userDID := "did:plc:test-user-count1" 383 rkey := "test-sub-count1" 384 385 event := &jetstream.JetstreamEvent{ 386 Did: userDID, 387 Kind: "commit", 388 TimeUS: time.Now().UnixMicro(), 389 Commit: &jetstream.CommitEvent{ 390 Rev: "test-rev-count", 391 Operation: "create", 392 Collection: "social.coves.community.subscription", 393 RKey: rkey, 394 CID: "bafycount", 395 Record: map[string]interface{}{ 396 "$type": "social.coves.community.subscription", 397 "subject": community.DID, 398 "createdAt": time.Now().Format(time.RFC3339), 399 "contentVisibility": float64(3), 400 }, 401 }, 402 } 403 404 err := consumer.HandleEvent(ctx, event) 405 if err != nil { 406 t.Fatalf("Failed to handle subscription: %v", err) 407 } 408 409 // Check subscriber count incremented 410 comm, err := repo.GetByDID(ctx, community.DID) 411 if err != nil { 412 t.Fatalf("Failed to get community: %v", err) 413 } 414 415 if comm.SubscriberCount != 1 { 416 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount) 417 } 418 419 t.Logf("✓ Subscriber count incremented to 1") 420 }) 421 422 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) { 423 userDID := "did:plc:test-user-count1" // Same user from above 424 rkey := "test-sub-count1" 425 426 // Send DELETE event 427 deleteEvent := &jetstream.JetstreamEvent{ 428 Did: userDID, 429 Kind: "commit", 430 TimeUS: time.Now().UnixMicro(), 431 Commit: &jetstream.CommitEvent{ 432 Rev: "test-rev-unsub", 433 Operation: "delete", 434 Collection: "social.coves.community.subscription", 435 RKey: rkey, 436 CID: "", 437 Record: nil, 438 }, 439 } 440 441 err := consumer.HandleEvent(ctx, deleteEvent) 442 if err != nil { 443 t.Fatalf("Failed to handle unsubscribe: %v", err) 444 } 445 446 // Check subscriber count decremented back to 0 447 comm, err := repo.GetByDID(ctx, community.DID) 448 if err != nil { 449 t.Fatalf("Failed to get community: %v", err) 450 } 451 452 if comm.SubscriberCount != 0 { 453 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount) 454 } 455 456 t.Logf("✓ Subscriber count decremented to 0") 457 }) 458} 459 460// Helper functions 461 462func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 463 t.Helper() 464 465 // Add timestamp to make handles unique across test runs 466 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano()) 467 468 community := &communities.Community{ 469 DID: did, 470 Handle: uniqueHandle, 471 Name: name, 472 DisplayName: "Test Community " + name, 473 Description: "Test community for subscription indexing", 474 OwnerDID: did, 475 CreatedByDID: "did:plc:test-creator", 476 HostedByDID: "did:plc:test-instance", 477 Visibility: "public", 478 CreatedAt: time.Now(), 479 UpdatedAt: time.Now(), 480 } 481 482 created, err := repo.Create(context.Background(), community) 483 if err != nil { 484 t.Fatalf("Failed to create test community: %v", err) 485 } 486 487 return created 488} 489 490func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository { 491 t.Helper() 492 // Import the postgres package to create a repo 493 return postgresRepo.NewCommunityRepository(db.(*sql.DB)) 494} 495 496func cleanupTestDB(t *testing.T, db interface{}) { 497 t.Helper() 498 sqlDB := db.(*sql.DB) 499 if err := sqlDB.Close(); err != nil { 500 t.Logf("Failed to close database: %v", err) 501 } 502}