A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 "context"
7 "database/sql"
8 "fmt"
9 "testing"
10 "time"
11
12 postgresRepo "Coves/internal/db/postgres"
13)
14
15// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed
16// from Jetstream events and stored in the AppView database
17func TestSubscriptionIndexing_ContentVisibility(t *testing.T) {
18 if testing.Short() {
19 t.Skip("Skipping integration test in short mode")
20 }
21
22 ctx := context.Background()
23 db := setupTestDB(t)
24 defer cleanupTestDB(t, db)
25
26 repo := createTestCommunityRepo(t, db)
27 // Skip verification in tests
28 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs
29 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
30
31 // Create a test community first (with unique DID)
32 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
33 community := createTestCommunity(t, repo, "test-community-visibility", testDID)
34
35 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) {
36 userDID := "did:plc:test-user-123"
37 rkey := "test-sub-1"
38 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey
39
40 // Simulate Jetstream CREATE event for subscription
41 event := &jetstream.JetstreamEvent{
42 Did: userDID,
43 Kind: "commit",
44 TimeUS: time.Now().UnixMicro(),
45 Commit: &jetstream.CommitEvent{
46 Rev: "test-rev-1",
47 Operation: "create",
48 Collection: "social.coves.community.subscription", // CORRECT collection name
49 RKey: rkey,
50 CID: "bafytest123",
51 Record: map[string]interface{}{
52 "$type": "social.coves.community.subscription",
53 "subject": community.DID,
54 "createdAt": time.Now().Format(time.RFC3339),
55 "contentVisibility": float64(5), // JSON numbers decode as float64
56 },
57 },
58 }
59
60 // Process event through consumer
61 err := consumer.HandleEvent(ctx, event)
62 if err != nil {
63 t.Fatalf("Failed to handle subscription event: %v", err)
64 }
65
66 // Verify subscription was indexed with correct contentVisibility
67 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
68 if err != nil {
69 t.Fatalf("Failed to get subscription: %v", err)
70 }
71
72 if subscription.ContentVisibility != 5 {
73 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility)
74 }
75
76 if subscription.UserDID != userDID {
77 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID)
78 }
79
80 if subscription.CommunityDID != community.DID {
81 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID)
82 }
83
84 if subscription.RecordURI != uri {
85 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI)
86 }
87
88 t.Logf("✓ Subscription indexed with contentVisibility=5")
89 })
90
91 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) {
92 userDID := "did:plc:test-user-default"
93 rkey := "test-sub-default"
94
95 // Simulate Jetstream CREATE event WITHOUT contentVisibility field
96 event := &jetstream.JetstreamEvent{
97 Did: userDID,
98 Kind: "commit",
99 TimeUS: time.Now().UnixMicro(),
100 Commit: &jetstream.CommitEvent{
101 Rev: "test-rev-default",
102 Operation: "create",
103 Collection: "social.coves.community.subscription",
104 RKey: rkey,
105 CID: "bafydefault",
106 Record: map[string]interface{}{
107 "$type": "social.coves.community.subscription",
108 "subject": community.DID,
109 "createdAt": time.Now().Format(time.RFC3339),
110 // contentVisibility NOT provided
111 },
112 },
113 }
114
115 // Process event
116 err := consumer.HandleEvent(ctx, event)
117 if err != nil {
118 t.Fatalf("Failed to handle subscription event: %v", err)
119 }
120
121 // Verify defaults to 3
122 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
123 if err != nil {
124 t.Fatalf("Failed to get subscription: %v", err)
125 }
126
127 if subscription.ContentVisibility != 3 {
128 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility)
129 }
130
131 t.Logf("✓ Subscription defaulted to contentVisibility=3")
132 })
133
134 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) {
135 testCases := []struct {
136 name string
137 input float64
138 expected int
139 }{
140 {input: 0, expected: 1, name: "zero clamped to 1"},
141 {input: -5, expected: 1, name: "negative clamped to 1"},
142 {input: 10, expected: 5, name: "10 clamped to 5"},
143 {input: 100, expected: 5, name: "100 clamped to 5"},
144 {input: 1, expected: 1, name: "1 stays 1"},
145 {input: 3, expected: 3, name: "3 stays 3"},
146 {input: 5, expected: 5, name: "5 stays 5"},
147 }
148
149 for i, tc := range testCases {
150 t.Run(tc.name, func(t *testing.T) {
151 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i)
152 rkey := fmt.Sprintf("test-sub-clamp-%d", i)
153
154 event := &jetstream.JetstreamEvent{
155 Did: userDID,
156 Kind: "commit",
157 TimeUS: time.Now().UnixMicro(),
158 Commit: &jetstream.CommitEvent{
159 Rev: "test-rev-clamp",
160 Operation: "create",
161 Collection: "social.coves.community.subscription",
162 RKey: rkey,
163 CID: "bafyclamp",
164 Record: map[string]interface{}{
165 "$type": "social.coves.community.subscription",
166 "subject": community.DID,
167 "createdAt": time.Now().Format(time.RFC3339),
168 "contentVisibility": tc.input,
169 },
170 },
171 }
172
173 err := consumer.HandleEvent(ctx, event)
174 if err != nil {
175 t.Fatalf("Failed to handle subscription event: %v", err)
176 }
177
178 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
179 if err != nil {
180 t.Fatalf("Failed to get subscription: %v", err)
181 }
182
183 if subscription.ContentVisibility != tc.expected {
184 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility)
185 }
186
187 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility)
188 })
189 }
190 })
191
192 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) {
193 userDID := "did:plc:test-idempotent"
194 rkey := "test-sub-idempotent"
195
196 event := &jetstream.JetstreamEvent{
197 Did: userDID,
198 Kind: "commit",
199 TimeUS: time.Now().UnixMicro(),
200 Commit: &jetstream.CommitEvent{
201 Rev: "test-rev-idempotent",
202 Operation: "create",
203 Collection: "social.coves.community.subscription",
204 RKey: rkey,
205 CID: "bafyidempotent",
206 Record: map[string]interface{}{
207 "$type": "social.coves.community.subscription",
208 "subject": community.DID,
209 "createdAt": time.Now().Format(time.RFC3339),
210 "contentVisibility": float64(4),
211 },
212 },
213 }
214
215 // Process first time
216 err := consumer.HandleEvent(ctx, event)
217 if err != nil {
218 t.Fatalf("Failed to handle first subscription event: %v", err)
219 }
220
221 // Process again (Jetstream replay scenario)
222 err = consumer.HandleEvent(ctx, event)
223 if err != nil {
224 t.Errorf("Idempotency failed: second event should not error, got: %v", err)
225 }
226
227 // Verify only one subscription exists
228 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
229 if err != nil {
230 t.Fatalf("Failed to get subscription: %v", err)
231 }
232
233 if subscription.ContentVisibility != 4 {
234 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility)
235 }
236
237 t.Logf("✓ Duplicate events handled idempotently")
238 })
239}
240
241// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling
242func TestSubscriptionIndexing_DeleteOperations(t *testing.T) {
243 if testing.Short() {
244 t.Skip("Skipping integration test in short mode")
245 }
246
247 ctx := context.Background()
248 db := setupTestDB(t)
249 defer cleanupTestDB(t, db)
250
251 repo := createTestCommunityRepo(t, db)
252 // Skip verification in tests
253 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs
254 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
255
256 // Create test community (with unique DID)
257 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano())
258 community := createTestCommunity(t, repo, "test-unsubscribe", testDID)
259
260 t.Run("deletes subscription when DELETE event received", func(t *testing.T) {
261 userDID := "did:plc:test-user-delete"
262 rkey := "test-sub-delete"
263
264 // First, create a subscription
265 createEvent := &jetstream.JetstreamEvent{
266 Did: userDID,
267 Kind: "commit",
268 TimeUS: time.Now().UnixMicro(),
269 Commit: &jetstream.CommitEvent{
270 Rev: "test-rev-create",
271 Operation: "create",
272 Collection: "social.coves.community.subscription",
273 RKey: rkey,
274 CID: "bafycreate",
275 Record: map[string]interface{}{
276 "$type": "social.coves.community.subscription",
277 "subject": community.DID,
278 "createdAt": time.Now().Format(time.RFC3339),
279 "contentVisibility": float64(3),
280 },
281 },
282 }
283
284 err := consumer.HandleEvent(ctx, createEvent)
285 if err != nil {
286 t.Fatalf("Failed to create subscription: %v", err)
287 }
288
289 // Verify subscription exists
290 _, err = repo.GetSubscription(ctx, userDID, community.DID)
291 if err != nil {
292 t.Fatalf("Subscription should exist: %v", err)
293 }
294
295 // Now send DELETE event (unsubscribe)
296 // IMPORTANT: DELETE operations don't include record data in Jetstream
297 deleteEvent := &jetstream.JetstreamEvent{
298 Did: userDID,
299 Kind: "commit",
300 TimeUS: time.Now().UnixMicro(),
301 Commit: &jetstream.CommitEvent{
302 Rev: "test-rev-delete",
303 Operation: "delete",
304 Collection: "social.coves.community.subscription",
305 RKey: rkey,
306 CID: "", // No CID on deletes
307 Record: nil, // No record data on deletes
308 },
309 }
310
311 err = consumer.HandleEvent(ctx, deleteEvent)
312 if err != nil {
313 t.Fatalf("Failed to handle delete event: %v", err)
314 }
315
316 // Verify subscription was deleted
317 _, err = repo.GetSubscription(ctx, userDID, community.DID)
318 if err == nil {
319 t.Errorf("Subscription should have been deleted")
320 }
321 if !communities.IsNotFound(err) {
322 t.Errorf("Expected NotFound error, got: %v", err)
323 }
324
325 t.Logf("✓ Subscription deleted successfully")
326 })
327
328 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) {
329 userDID := "did:plc:test-user-noexist"
330 rkey := "test-sub-noexist"
331
332 // Try to delete a subscription that doesn't exist
333 deleteEvent := &jetstream.JetstreamEvent{
334 Did: userDID,
335 Kind: "commit",
336 TimeUS: time.Now().UnixMicro(),
337 Commit: &jetstream.CommitEvent{
338 Rev: "test-rev-noexist",
339 Operation: "delete",
340 Collection: "social.coves.community.subscription",
341 RKey: rkey,
342 CID: "",
343 Record: nil,
344 },
345 }
346
347 // Should not error (idempotent)
348 err := consumer.HandleEvent(ctx, deleteEvent)
349 if err != nil {
350 t.Errorf("Deleting non-existent subscription should not error, got: %v", err)
351 }
352
353 t.Logf("✓ Idempotent delete handled gracefully")
354 })
355}
356
357// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically
358func TestSubscriptionIndexing_SubscriberCount(t *testing.T) {
359 if testing.Short() {
360 t.Skip("Skipping integration test in short mode")
361 }
362
363 ctx := context.Background()
364 db := setupTestDB(t)
365 defer cleanupTestDB(t, db)
366
367 repo := createTestCommunityRepo(t, db)
368 // Skip verification in tests
369 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs
370 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
371
372 // Create test community (with unique DID)
373 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano())
374 community := createTestCommunity(t, repo, "test-subscriber-count", testDID)
375
376 // Verify initial subscriber count is 0
377 comm, err := repo.GetByDID(ctx, community.DID)
378 if err != nil {
379 t.Fatalf("Failed to get community: %v", err)
380 }
381 if comm.SubscriberCount != 0 {
382 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount)
383 }
384
385 t.Run("increments subscriber count on subscribe", func(t *testing.T) {
386 userDID := "did:plc:test-user-count1"
387 rkey := "test-sub-count1"
388
389 event := &jetstream.JetstreamEvent{
390 Did: userDID,
391 Kind: "commit",
392 TimeUS: time.Now().UnixMicro(),
393 Commit: &jetstream.CommitEvent{
394 Rev: "test-rev-count",
395 Operation: "create",
396 Collection: "social.coves.community.subscription",
397 RKey: rkey,
398 CID: "bafycount",
399 Record: map[string]interface{}{
400 "$type": "social.coves.community.subscription",
401 "subject": community.DID,
402 "createdAt": time.Now().Format(time.RFC3339),
403 "contentVisibility": float64(3),
404 },
405 },
406 }
407
408 err := consumer.HandleEvent(ctx, event)
409 if err != nil {
410 t.Fatalf("Failed to handle subscription: %v", err)
411 }
412
413 // Check subscriber count incremented
414 comm, err := repo.GetByDID(ctx, community.DID)
415 if err != nil {
416 t.Fatalf("Failed to get community: %v", err)
417 }
418
419 if comm.SubscriberCount != 1 {
420 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount)
421 }
422
423 t.Logf("✓ Subscriber count incremented to 1")
424 })
425
426 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) {
427 userDID := "did:plc:test-user-count1" // Same user from above
428 rkey := "test-sub-count1"
429
430 // Send DELETE event
431 deleteEvent := &jetstream.JetstreamEvent{
432 Did: userDID,
433 Kind: "commit",
434 TimeUS: time.Now().UnixMicro(),
435 Commit: &jetstream.CommitEvent{
436 Rev: "test-rev-unsub",
437 Operation: "delete",
438 Collection: "social.coves.community.subscription",
439 RKey: rkey,
440 CID: "",
441 Record: nil,
442 },
443 }
444
445 err := consumer.HandleEvent(ctx, deleteEvent)
446 if err != nil {
447 t.Fatalf("Failed to handle unsubscribe: %v", err)
448 }
449
450 // Check subscriber count decremented back to 0
451 comm, err := repo.GetByDID(ctx, community.DID)
452 if err != nil {
453 t.Fatalf("Failed to get community: %v", err)
454 }
455
456 if comm.SubscriberCount != 0 {
457 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount)
458 }
459
460 t.Logf("✓ Subscriber count decremented to 0")
461 })
462}
463
464// Helper functions
465
466func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
467 t.Helper()
468
469 // Add timestamp to make handles unique across test runs
470 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano())
471
472 community := &communities.Community{
473 DID: did,
474 Handle: uniqueHandle,
475 Name: name,
476 DisplayName: "Test Community " + name,
477 Description: "Test community for subscription indexing",
478 OwnerDID: did,
479 CreatedByDID: "did:plc:test-creator",
480 HostedByDID: "did:plc:test-instance",
481 Visibility: "public",
482 CreatedAt: time.Now(),
483 UpdatedAt: time.Now(),
484 }
485
486 created, err := repo.Create(context.Background(), community)
487 if err != nil {
488 t.Fatalf("Failed to create test community: %v", err)
489 }
490
491 return created
492}
493
494func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository {
495 t.Helper()
496 // Import the postgres package to create a repo
497 return postgresRepo.NewCommunityRepository(db.(*sql.DB))
498}
499
500func cleanupTestDB(t *testing.T, db interface{}) {
501 t.Helper()
502 sqlDB := db.(*sql.DB)
503 if err := sqlDB.Close(); err != nil {
504 t.Logf("Failed to close database: %v", err)
505 }
506}