// A community-based topic aggregation platform built on atproto.
1package integration
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "testing"
8 "time"
9
10 "Coves/internal/atproto/jetstream"
11 "Coves/internal/core/communities"
12
13 postgresRepo "Coves/internal/db/postgres"
14)
15
16// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed
17// from Jetstream events and stored in the AppView database
18func TestSubscriptionIndexing_ContentVisibility(t *testing.T) {
19 if testing.Short() {
20 t.Skip("Skipping integration test in short mode")
21 }
22
23 ctx := context.Background()
24 db := setupTestDB(t)
25 defer cleanupTestDB(t, db)
26
27 repo := createTestCommunityRepo(t, db)
28 // Skip verification in tests
29 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs
30 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
31
32 // Create a test community first (with unique DID)
33 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
34 community := createTestCommunity(t, repo, "test-community-visibility", testDID)
35
36 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) {
37 userDID := "did:plc:test-user-123"
38 rkey := "test-sub-1"
39 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey
40
41 // Simulate Jetstream CREATE event for subscription
42 event := &jetstream.JetstreamEvent{
43 Did: userDID,
44 Kind: "commit",
45 TimeUS: time.Now().UnixMicro(),
46 Commit: &jetstream.CommitEvent{
47 Rev: "test-rev-1",
48 Operation: "create",
49 Collection: "social.coves.community.subscription", // CORRECT collection name
50 RKey: rkey,
51 CID: "bafytest123",
52 Record: map[string]interface{}{
53 "$type": "social.coves.community.subscription",
54 "subject": community.DID,
55 "createdAt": time.Now().Format(time.RFC3339),
56 "contentVisibility": float64(5), // JSON numbers decode as float64
57 },
58 },
59 }
60
61 // Process event through consumer
62 err := consumer.HandleEvent(ctx, event)
63 if err != nil {
64 t.Fatalf("Failed to handle subscription event: %v", err)
65 }
66
67 // Verify subscription was indexed with correct contentVisibility
68 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
69 if err != nil {
70 t.Fatalf("Failed to get subscription: %v", err)
71 }
72
73 if subscription.ContentVisibility != 5 {
74 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility)
75 }
76
77 if subscription.UserDID != userDID {
78 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID)
79 }
80
81 if subscription.CommunityDID != community.DID {
82 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID)
83 }
84
85 if subscription.RecordURI != uri {
86 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI)
87 }
88
89 t.Logf("✓ Subscription indexed with contentVisibility=5")
90 })
91
92 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) {
93 userDID := "did:plc:test-user-default"
94 rkey := "test-sub-default"
95
96 // Simulate Jetstream CREATE event WITHOUT contentVisibility field
97 event := &jetstream.JetstreamEvent{
98 Did: userDID,
99 Kind: "commit",
100 TimeUS: time.Now().UnixMicro(),
101 Commit: &jetstream.CommitEvent{
102 Rev: "test-rev-default",
103 Operation: "create",
104 Collection: "social.coves.community.subscription",
105 RKey: rkey,
106 CID: "bafydefault",
107 Record: map[string]interface{}{
108 "$type": "social.coves.community.subscription",
109 "subject": community.DID,
110 "createdAt": time.Now().Format(time.RFC3339),
111 // contentVisibility NOT provided
112 },
113 },
114 }
115
116 // Process event
117 err := consumer.HandleEvent(ctx, event)
118 if err != nil {
119 t.Fatalf("Failed to handle subscription event: %v", err)
120 }
121
122 // Verify defaults to 3
123 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
124 if err != nil {
125 t.Fatalf("Failed to get subscription: %v", err)
126 }
127
128 if subscription.ContentVisibility != 3 {
129 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility)
130 }
131
132 t.Logf("✓ Subscription defaulted to contentVisibility=3")
133 })
134
135 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) {
136 testCases := []struct {
137 name string
138 input float64
139 expected int
140 }{
141 {input: 0, expected: 1, name: "zero clamped to 1"},
142 {input: -5, expected: 1, name: "negative clamped to 1"},
143 {input: 10, expected: 5, name: "10 clamped to 5"},
144 {input: 100, expected: 5, name: "100 clamped to 5"},
145 {input: 1, expected: 1, name: "1 stays 1"},
146 {input: 3, expected: 3, name: "3 stays 3"},
147 {input: 5, expected: 5, name: "5 stays 5"},
148 }
149
150 for i, tc := range testCases {
151 t.Run(tc.name, func(t *testing.T) {
152 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i)
153 rkey := fmt.Sprintf("test-sub-clamp-%d", i)
154
155 event := &jetstream.JetstreamEvent{
156 Did: userDID,
157 Kind: "commit",
158 TimeUS: time.Now().UnixMicro(),
159 Commit: &jetstream.CommitEvent{
160 Rev: "test-rev-clamp",
161 Operation: "create",
162 Collection: "social.coves.community.subscription",
163 RKey: rkey,
164 CID: "bafyclamp",
165 Record: map[string]interface{}{
166 "$type": "social.coves.community.subscription",
167 "subject": community.DID,
168 "createdAt": time.Now().Format(time.RFC3339),
169 "contentVisibility": tc.input,
170 },
171 },
172 }
173
174 err := consumer.HandleEvent(ctx, event)
175 if err != nil {
176 t.Fatalf("Failed to handle subscription event: %v", err)
177 }
178
179 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
180 if err != nil {
181 t.Fatalf("Failed to get subscription: %v", err)
182 }
183
184 if subscription.ContentVisibility != tc.expected {
185 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility)
186 }
187
188 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility)
189 })
190 }
191 })
192
193 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) {
194 userDID := "did:plc:test-idempotent"
195 rkey := "test-sub-idempotent"
196
197 event := &jetstream.JetstreamEvent{
198 Did: userDID,
199 Kind: "commit",
200 TimeUS: time.Now().UnixMicro(),
201 Commit: &jetstream.CommitEvent{
202 Rev: "test-rev-idempotent",
203 Operation: "create",
204 Collection: "social.coves.community.subscription",
205 RKey: rkey,
206 CID: "bafyidempotent",
207 Record: map[string]interface{}{
208 "$type": "social.coves.community.subscription",
209 "subject": community.DID,
210 "createdAt": time.Now().Format(time.RFC3339),
211 "contentVisibility": float64(4),
212 },
213 },
214 }
215
216 // Process first time
217 err := consumer.HandleEvent(ctx, event)
218 if err != nil {
219 t.Fatalf("Failed to handle first subscription event: %v", err)
220 }
221
222 // Process again (Jetstream replay scenario)
223 err = consumer.HandleEvent(ctx, event)
224 if err != nil {
225 t.Errorf("Idempotency failed: second event should not error, got: %v", err)
226 }
227
228 // Verify only one subscription exists
229 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
230 if err != nil {
231 t.Fatalf("Failed to get subscription: %v", err)
232 }
233
234 if subscription.ContentVisibility != 4 {
235 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility)
236 }
237
238 t.Logf("✓ Duplicate events handled idempotently")
239 })
240}
241
242// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling
243func TestSubscriptionIndexing_DeleteOperations(t *testing.T) {
244 if testing.Short() {
245 t.Skip("Skipping integration test in short mode")
246 }
247
248 ctx := context.Background()
249 db := setupTestDB(t)
250 defer cleanupTestDB(t, db)
251
252 repo := createTestCommunityRepo(t, db)
253 // Skip verification in tests
254 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs
255 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
256
257 // Create test community (with unique DID)
258 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano())
259 community := createTestCommunity(t, repo, "test-unsubscribe", testDID)
260
261 t.Run("deletes subscription when DELETE event received", func(t *testing.T) {
262 userDID := "did:plc:test-user-delete"
263 rkey := "test-sub-delete"
264
265 // First, create a subscription
266 createEvent := &jetstream.JetstreamEvent{
267 Did: userDID,
268 Kind: "commit",
269 TimeUS: time.Now().UnixMicro(),
270 Commit: &jetstream.CommitEvent{
271 Rev: "test-rev-create",
272 Operation: "create",
273 Collection: "social.coves.community.subscription",
274 RKey: rkey,
275 CID: "bafycreate",
276 Record: map[string]interface{}{
277 "$type": "social.coves.community.subscription",
278 "subject": community.DID,
279 "createdAt": time.Now().Format(time.RFC3339),
280 "contentVisibility": float64(3),
281 },
282 },
283 }
284
285 err := consumer.HandleEvent(ctx, createEvent)
286 if err != nil {
287 t.Fatalf("Failed to create subscription: %v", err)
288 }
289
290 // Verify subscription exists
291 _, err = repo.GetSubscription(ctx, userDID, community.DID)
292 if err != nil {
293 t.Fatalf("Subscription should exist: %v", err)
294 }
295
296 // Now send DELETE event (unsubscribe)
297 // IMPORTANT: DELETE operations don't include record data in Jetstream
298 deleteEvent := &jetstream.JetstreamEvent{
299 Did: userDID,
300 Kind: "commit",
301 TimeUS: time.Now().UnixMicro(),
302 Commit: &jetstream.CommitEvent{
303 Rev: "test-rev-delete",
304 Operation: "delete",
305 Collection: "social.coves.community.subscription",
306 RKey: rkey,
307 CID: "", // No CID on deletes
308 Record: nil, // No record data on deletes
309 },
310 }
311
312 err = consumer.HandleEvent(ctx, deleteEvent)
313 if err != nil {
314 t.Fatalf("Failed to handle delete event: %v", err)
315 }
316
317 // Verify subscription was deleted
318 _, err = repo.GetSubscription(ctx, userDID, community.DID)
319 if err == nil {
320 t.Errorf("Subscription should have been deleted")
321 }
322 if !communities.IsNotFound(err) {
323 t.Errorf("Expected NotFound error, got: %v", err)
324 }
325
326 t.Logf("✓ Subscription deleted successfully")
327 })
328
329 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) {
330 userDID := "did:plc:test-user-noexist"
331 rkey := "test-sub-noexist"
332
333 // Try to delete a subscription that doesn't exist
334 deleteEvent := &jetstream.JetstreamEvent{
335 Did: userDID,
336 Kind: "commit",
337 TimeUS: time.Now().UnixMicro(),
338 Commit: &jetstream.CommitEvent{
339 Rev: "test-rev-noexist",
340 Operation: "delete",
341 Collection: "social.coves.community.subscription",
342 RKey: rkey,
343 CID: "",
344 Record: nil,
345 },
346 }
347
348 // Should not error (idempotent)
349 err := consumer.HandleEvent(ctx, deleteEvent)
350 if err != nil {
351 t.Errorf("Deleting non-existent subscription should not error, got: %v", err)
352 }
353
354 t.Logf("✓ Idempotent delete handled gracefully")
355 })
356}
357
358// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically
359func TestSubscriptionIndexing_SubscriberCount(t *testing.T) {
360 if testing.Short() {
361 t.Skip("Skipping integration test in short mode")
362 }
363
364 ctx := context.Background()
365 db := setupTestDB(t)
366 defer cleanupTestDB(t, db)
367
368 repo := createTestCommunityRepo(t, db)
369 // Skip verification in tests
370 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs
371 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
372
373 // Create test community (with unique DID)
374 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano())
375 community := createTestCommunity(t, repo, "test-subscriber-count", testDID)
376
377 // Verify initial subscriber count is 0
378 comm, err := repo.GetByDID(ctx, community.DID)
379 if err != nil {
380 t.Fatalf("Failed to get community: %v", err)
381 }
382 if comm.SubscriberCount != 0 {
383 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount)
384 }
385
386 t.Run("increments subscriber count on subscribe", func(t *testing.T) {
387 userDID := "did:plc:test-user-count1"
388 rkey := "test-sub-count1"
389
390 event := &jetstream.JetstreamEvent{
391 Did: userDID,
392 Kind: "commit",
393 TimeUS: time.Now().UnixMicro(),
394 Commit: &jetstream.CommitEvent{
395 Rev: "test-rev-count",
396 Operation: "create",
397 Collection: "social.coves.community.subscription",
398 RKey: rkey,
399 CID: "bafycount",
400 Record: map[string]interface{}{
401 "$type": "social.coves.community.subscription",
402 "subject": community.DID,
403 "createdAt": time.Now().Format(time.RFC3339),
404 "contentVisibility": float64(3),
405 },
406 },
407 }
408
409 err := consumer.HandleEvent(ctx, event)
410 if err != nil {
411 t.Fatalf("Failed to handle subscription: %v", err)
412 }
413
414 // Check subscriber count incremented
415 comm, err := repo.GetByDID(ctx, community.DID)
416 if err != nil {
417 t.Fatalf("Failed to get community: %v", err)
418 }
419
420 if comm.SubscriberCount != 1 {
421 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount)
422 }
423
424 t.Logf("✓ Subscriber count incremented to 1")
425 })
426
427 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) {
428 userDID := "did:plc:test-user-count1" // Same user from above
429 rkey := "test-sub-count1"
430
431 // Send DELETE event
432 deleteEvent := &jetstream.JetstreamEvent{
433 Did: userDID,
434 Kind: "commit",
435 TimeUS: time.Now().UnixMicro(),
436 Commit: &jetstream.CommitEvent{
437 Rev: "test-rev-unsub",
438 Operation: "delete",
439 Collection: "social.coves.community.subscription",
440 RKey: rkey,
441 CID: "",
442 Record: nil,
443 },
444 }
445
446 err := consumer.HandleEvent(ctx, deleteEvent)
447 if err != nil {
448 t.Fatalf("Failed to handle unsubscribe: %v", err)
449 }
450
451 // Check subscriber count decremented back to 0
452 comm, err := repo.GetByDID(ctx, community.DID)
453 if err != nil {
454 t.Fatalf("Failed to get community: %v", err)
455 }
456
457 if comm.SubscriberCount != 0 {
458 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount)
459 }
460
461 t.Logf("✓ Subscriber count decremented to 0")
462 })
463}
464
465// Helper functions
466
467func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
468 t.Helper()
469
470 // Add timestamp to make handles unique across test runs
471 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano())
472
473 community := &communities.Community{
474 DID: did,
475 Handle: uniqueHandle,
476 Name: name,
477 DisplayName: "Test Community " + name,
478 Description: "Test community for subscription indexing",
479 OwnerDID: did,
480 CreatedByDID: "did:plc:test-creator",
481 HostedByDID: "did:plc:test-instance",
482 Visibility: "public",
483 CreatedAt: time.Now(),
484 UpdatedAt: time.Now(),
485 }
486
487 created, err := repo.Create(context.Background(), community)
488 if err != nil {
489 t.Fatalf("Failed to create test community: %v", err)
490 }
491
492 return created
493}
494
495func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository {
496 t.Helper()
497 // Import the postgres package to create a repo
498 return postgresRepo.NewCommunityRepository(db.(*sql.DB))
499}
500
// cleanupTestDB closes the database handle that was opened for a test.
//
// Cleanup is best-effort: problems are logged rather than failing the test. A
// db value that is not a non-nil *sql.DB is logged and skipped, because
// panicking here (on the type assertion or on a nil-pointer Close) would hide
// the test's own failure when this runs from a defer.
func cleanupTestDB(t *testing.T, db interface{}) {
	t.Helper()

	sqlDB, ok := db.(*sql.DB)
	if !ok || sqlDB == nil {
		t.Logf("cleanupTestDB: nothing to close, got %T", db)
		return
	}

	if err := sqlDB.Close(); err != nil {
		t.Logf("Failed to close database: %v", err)
	}
}