···
4
+
"Coves/internal/atproto/jetstream"
5
+
"Coves/internal/core/communities"
6
+
postgresRepo "Coves/internal/db/postgres"
14
+
// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed
15
+
// from Jetstream events and stored in the AppView database
16
+
func TestSubscriptionIndexing_ContentVisibility(t *testing.T) {
17
+
if testing.Short() {
18
+
t.Skip("Skipping integration test in short mode")
21
+
ctx := context.Background()
22
+
db := setupTestDB(t)
23
+
defer cleanupTestDB(t, db)
25
+
repo := createTestCommunityRepo(t, db)
26
+
consumer := jetstream.NewCommunityEventConsumer(repo)
28
+
// Create a test community first (with unique DID)
29
+
testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
30
+
community := createTestCommunity(t, repo, "test-community-visibility", testDID)
32
+
t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) {
33
+
userDID := "did:plc:test-user-123"
34
+
rkey := "test-sub-1"
35
+
uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey
37
+
// Simulate Jetstream CREATE event for subscription
38
+
event := &jetstream.JetstreamEvent{
41
+
TimeUS: time.Now().UnixMicro(),
42
+
Commit: &jetstream.CommitEvent{
44
+
Operation: "create",
45
+
Collection: "social.coves.community.subscription", // CORRECT collection name
48
+
Record: map[string]interface{}{
49
+
"$type": "social.coves.community.subscription",
50
+
"subject": community.DID,
51
+
"createdAt": time.Now().Format(time.RFC3339),
52
+
"contentVisibility": float64(5), // JSON numbers decode as float64
57
+
// Process event through consumer
58
+
err := consumer.HandleEvent(ctx, event)
60
+
t.Fatalf("Failed to handle subscription event: %v", err)
63
+
// Verify subscription was indexed with correct contentVisibility
64
+
subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
66
+
t.Fatalf("Failed to get subscription: %v", err)
69
+
if subscription.ContentVisibility != 5 {
70
+
t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility)
73
+
if subscription.UserDID != userDID {
74
+
t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID)
77
+
if subscription.CommunityDID != community.DID {
78
+
t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID)
81
+
if subscription.RecordURI != uri {
82
+
t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI)
85
+
t.Logf("✓ Subscription indexed with contentVisibility=5")
88
+
t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) {
89
+
userDID := "did:plc:test-user-default"
90
+
rkey := "test-sub-default"
92
+
// Simulate Jetstream CREATE event WITHOUT contentVisibility field
93
+
event := &jetstream.JetstreamEvent{
96
+
TimeUS: time.Now().UnixMicro(),
97
+
Commit: &jetstream.CommitEvent{
98
+
Rev: "test-rev-default",
99
+
Operation: "create",
100
+
Collection: "social.coves.community.subscription",
102
+
CID: "bafydefault",
103
+
Record: map[string]interface{}{
104
+
"$type": "social.coves.community.subscription",
105
+
"subject": community.DID,
106
+
"createdAt": time.Now().Format(time.RFC3339),
107
+
// contentVisibility NOT provided
113
+
err := consumer.HandleEvent(ctx, event)
115
+
t.Fatalf("Failed to handle subscription event: %v", err)
118
+
// Verify defaults to 3
119
+
subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
121
+
t.Fatalf("Failed to get subscription: %v", err)
124
+
if subscription.ContentVisibility != 3 {
125
+
t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility)
128
+
t.Logf("✓ Subscription defaulted to contentVisibility=3")
131
+
t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) {
132
+
testCases := []struct {
137
+
{input: 0, expected: 1, name: "zero clamped to 1"},
138
+
{input: -5, expected: 1, name: "negative clamped to 1"},
139
+
{input: 10, expected: 5, name: "10 clamped to 5"},
140
+
{input: 100, expected: 5, name: "100 clamped to 5"},
141
+
{input: 1, expected: 1, name: "1 stays 1"},
142
+
{input: 3, expected: 3, name: "3 stays 3"},
143
+
{input: 5, expected: 5, name: "5 stays 5"},
146
+
for i, tc := range testCases {
147
+
t.Run(tc.name, func(t *testing.T) {
148
+
userDID := fmt.Sprintf("did:plc:test-clamp-%d", i)
149
+
rkey := fmt.Sprintf("test-sub-clamp-%d", i)
151
+
event := &jetstream.JetstreamEvent{
154
+
TimeUS: time.Now().UnixMicro(),
155
+
Commit: &jetstream.CommitEvent{
156
+
Rev: "test-rev-clamp",
157
+
Operation: "create",
158
+
Collection: "social.coves.community.subscription",
161
+
Record: map[string]interface{}{
162
+
"$type": "social.coves.community.subscription",
163
+
"subject": community.DID,
164
+
"createdAt": time.Now().Format(time.RFC3339),
165
+
"contentVisibility": tc.input,
170
+
err := consumer.HandleEvent(ctx, event)
172
+
t.Fatalf("Failed to handle subscription event: %v", err)
175
+
subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
177
+
t.Fatalf("Failed to get subscription: %v", err)
180
+
if subscription.ContentVisibility != tc.expected {
181
+
t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility)
184
+
t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility)
189
+
t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) {
190
+
userDID := "did:plc:test-idempotent"
191
+
rkey := "test-sub-idempotent"
193
+
event := &jetstream.JetstreamEvent{
196
+
TimeUS: time.Now().UnixMicro(),
197
+
Commit: &jetstream.CommitEvent{
198
+
Rev: "test-rev-idempotent",
199
+
Operation: "create",
200
+
Collection: "social.coves.community.subscription",
202
+
CID: "bafyidempotent",
203
+
Record: map[string]interface{}{
204
+
"$type": "social.coves.community.subscription",
205
+
"subject": community.DID,
206
+
"createdAt": time.Now().Format(time.RFC3339),
207
+
"contentVisibility": float64(4),
212
+
// Process first time
213
+
err := consumer.HandleEvent(ctx, event)
215
+
t.Fatalf("Failed to handle first subscription event: %v", err)
218
+
// Process again (Jetstream replay scenario)
219
+
err = consumer.HandleEvent(ctx, event)
221
+
t.Errorf("Idempotency failed: second event should not error, got: %v", err)
224
+
// Verify only one subscription exists
225
+
subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
227
+
t.Fatalf("Failed to get subscription: %v", err)
230
+
if subscription.ContentVisibility != 4 {
231
+
t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility)
234
+
t.Logf("✓ Duplicate events handled idempotently")
238
+
// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling
239
+
func TestSubscriptionIndexing_DeleteOperations(t *testing.T) {
240
+
if testing.Short() {
241
+
t.Skip("Skipping integration test in short mode")
244
+
ctx := context.Background()
245
+
db := setupTestDB(t)
246
+
defer cleanupTestDB(t, db)
248
+
repo := createTestCommunityRepo(t, db)
249
+
consumer := jetstream.NewCommunityEventConsumer(repo)
251
+
// Create test community (with unique DID)
252
+
testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano())
253
+
community := createTestCommunity(t, repo, "test-unsubscribe", testDID)
255
+
t.Run("deletes subscription when DELETE event received", func(t *testing.T) {
256
+
userDID := "did:plc:test-user-delete"
257
+
rkey := "test-sub-delete"
259
+
// First, create a subscription
260
+
createEvent := &jetstream.JetstreamEvent{
263
+
TimeUS: time.Now().UnixMicro(),
264
+
Commit: &jetstream.CommitEvent{
265
+
Rev: "test-rev-create",
266
+
Operation: "create",
267
+
Collection: "social.coves.community.subscription",
270
+
Record: map[string]interface{}{
271
+
"$type": "social.coves.community.subscription",
272
+
"subject": community.DID,
273
+
"createdAt": time.Now().Format(time.RFC3339),
274
+
"contentVisibility": float64(3),
279
+
err := consumer.HandleEvent(ctx, createEvent)
281
+
t.Fatalf("Failed to create subscription: %v", err)
284
+
// Verify subscription exists
285
+
_, err = repo.GetSubscription(ctx, userDID, community.DID)
287
+
t.Fatalf("Subscription should exist: %v", err)
290
+
// Now send DELETE event (unsubscribe)
291
+
// IMPORTANT: DELETE operations don't include record data in Jetstream
292
+
deleteEvent := &jetstream.JetstreamEvent{
295
+
TimeUS: time.Now().UnixMicro(),
296
+
Commit: &jetstream.CommitEvent{
297
+
Rev: "test-rev-delete",
298
+
Operation: "delete",
299
+
Collection: "social.coves.community.subscription",
301
+
CID: "", // No CID on deletes
302
+
Record: nil, // No record data on deletes
306
+
err = consumer.HandleEvent(ctx, deleteEvent)
308
+
t.Fatalf("Failed to handle delete event: %v", err)
311
+
// Verify subscription was deleted
312
+
_, err = repo.GetSubscription(ctx, userDID, community.DID)
314
+
t.Errorf("Subscription should have been deleted")
316
+
if !communities.IsNotFound(err) {
317
+
t.Errorf("Expected NotFound error, got: %v", err)
320
+
t.Logf("✓ Subscription deleted successfully")
323
+
t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) {
324
+
userDID := "did:plc:test-user-noexist"
325
+
rkey := "test-sub-noexist"
327
+
// Try to delete a subscription that doesn't exist
328
+
deleteEvent := &jetstream.JetstreamEvent{
331
+
TimeUS: time.Now().UnixMicro(),
332
+
Commit: &jetstream.CommitEvent{
333
+
Rev: "test-rev-noexist",
334
+
Operation: "delete",
335
+
Collection: "social.coves.community.subscription",
342
+
// Should not error (idempotent)
343
+
err := consumer.HandleEvent(ctx, deleteEvent)
345
+
t.Errorf("Deleting non-existent subscription should not error, got: %v", err)
348
+
t.Logf("✓ Idempotent delete handled gracefully")
352
+
// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically
353
+
func TestSubscriptionIndexing_SubscriberCount(t *testing.T) {
354
+
if testing.Short() {
355
+
t.Skip("Skipping integration test in short mode")
358
+
ctx := context.Background()
359
+
db := setupTestDB(t)
360
+
defer cleanupTestDB(t, db)
362
+
repo := createTestCommunityRepo(t, db)
363
+
consumer := jetstream.NewCommunityEventConsumer(repo)
365
+
// Create test community (with unique DID)
366
+
testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano())
367
+
community := createTestCommunity(t, repo, "test-subscriber-count", testDID)
369
+
// Verify initial subscriber count is 0
370
+
comm, err := repo.GetByDID(ctx, community.DID)
372
+
t.Fatalf("Failed to get community: %v", err)
374
+
if comm.SubscriberCount != 0 {
375
+
t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount)
378
+
t.Run("increments subscriber count on subscribe", func(t *testing.T) {
379
+
userDID := "did:plc:test-user-count1"
380
+
rkey := "test-sub-count1"
382
+
event := &jetstream.JetstreamEvent{
385
+
TimeUS: time.Now().UnixMicro(),
386
+
Commit: &jetstream.CommitEvent{
387
+
Rev: "test-rev-count",
388
+
Operation: "create",
389
+
Collection: "social.coves.community.subscription",
392
+
Record: map[string]interface{}{
393
+
"$type": "social.coves.community.subscription",
394
+
"subject": community.DID,
395
+
"createdAt": time.Now().Format(time.RFC3339),
396
+
"contentVisibility": float64(3),
401
+
err := consumer.HandleEvent(ctx, event)
403
+
t.Fatalf("Failed to handle subscription: %v", err)
406
+
// Check subscriber count incremented
407
+
comm, err := repo.GetByDID(ctx, community.DID)
409
+
t.Fatalf("Failed to get community: %v", err)
412
+
if comm.SubscriberCount != 1 {
413
+
t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount)
416
+
t.Logf("✓ Subscriber count incremented to 1")
419
+
t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) {
420
+
userDID := "did:plc:test-user-count1" // Same user from above
421
+
rkey := "test-sub-count1"
423
+
// Send DELETE event
424
+
deleteEvent := &jetstream.JetstreamEvent{
427
+
TimeUS: time.Now().UnixMicro(),
428
+
Commit: &jetstream.CommitEvent{
429
+
Rev: "test-rev-unsub",
430
+
Operation: "delete",
431
+
Collection: "social.coves.community.subscription",
438
+
err := consumer.HandleEvent(ctx, deleteEvent)
440
+
t.Fatalf("Failed to handle unsubscribe: %v", err)
443
+
// Check subscriber count decremented back to 0
444
+
comm, err := repo.GetByDID(ctx, community.DID)
446
+
t.Fatalf("Failed to get community: %v", err)
449
+
if comm.SubscriberCount != 0 {
450
+
t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount)
453
+
t.Logf("✓ Subscriber count decremented to 0")
457
+
// Helper functions
459
+
func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
462
+
// Add timestamp to make handles unique across test runs
463
+
uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano())
465
+
community := &communities.Community{
467
+
Handle: uniqueHandle,
469
+
DisplayName: "Test Community " + name,
470
+
Description: "Test community for subscription indexing",
472
+
CreatedByDID: "did:plc:test-creator",
473
+
HostedByDID: "did:plc:test-instance",
474
+
Visibility: "public",
475
+
CreatedAt: time.Now(),
476
+
UpdatedAt: time.Now(),
479
+
created, err := repo.Create(context.Background(), community)
481
+
t.Fatalf("Failed to create test community: %v", err)
487
+
func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository {
489
+
// Import the postgres package to create a repo
490
+
return postgresRepo.NewCommunityRepository(db.(*sql.DB))
493
+
func cleanupTestDB(t *testing.T, db interface{}) {
495
+
sqlDB := db.(*sql.DB)
496
+
if err := sqlDB.Close(); err != nil {
497
+
t.Logf("Failed to close database: %v", err)