A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 postgresRepo "Coves/internal/db/postgres"
7 "context"
8 "database/sql"
9 "fmt"
10 "testing"
11 "time"
12)
13
14// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed
15// from Jetstream events and stored in the AppView database
16func TestSubscriptionIndexing_ContentVisibility(t *testing.T) {
17 if testing.Short() {
18 t.Skip("Skipping integration test in short mode")
19 }
20
21 ctx := context.Background()
22 db := setupTestDB(t)
23 defer cleanupTestDB(t, db)
24
25 repo := createTestCommunityRepo(t, db)
26 consumer := jetstream.NewCommunityEventConsumer(repo)
27
28 // Create a test community first (with unique DID)
29 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
30 community := createTestCommunity(t, repo, "test-community-visibility", testDID)
31
32 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) {
33 userDID := "did:plc:test-user-123"
34 rkey := "test-sub-1"
35 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey
36
37 // Simulate Jetstream CREATE event for subscription
38 event := &jetstream.JetstreamEvent{
39 Did: userDID,
40 Kind: "commit",
41 TimeUS: time.Now().UnixMicro(),
42 Commit: &jetstream.CommitEvent{
43 Rev: "test-rev-1",
44 Operation: "create",
45 Collection: "social.coves.community.subscription", // CORRECT collection name
46 RKey: rkey,
47 CID: "bafytest123",
48 Record: map[string]interface{}{
49 "$type": "social.coves.community.subscription",
50 "subject": community.DID,
51 "createdAt": time.Now().Format(time.RFC3339),
52 "contentVisibility": float64(5), // JSON numbers decode as float64
53 },
54 },
55 }
56
57 // Process event through consumer
58 err := consumer.HandleEvent(ctx, event)
59 if err != nil {
60 t.Fatalf("Failed to handle subscription event: %v", err)
61 }
62
63 // Verify subscription was indexed with correct contentVisibility
64 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
65 if err != nil {
66 t.Fatalf("Failed to get subscription: %v", err)
67 }
68
69 if subscription.ContentVisibility != 5 {
70 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility)
71 }
72
73 if subscription.UserDID != userDID {
74 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID)
75 }
76
77 if subscription.CommunityDID != community.DID {
78 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID)
79 }
80
81 if subscription.RecordURI != uri {
82 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI)
83 }
84
85 t.Logf("✓ Subscription indexed with contentVisibility=5")
86 })
87
88 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) {
89 userDID := "did:plc:test-user-default"
90 rkey := "test-sub-default"
91
92 // Simulate Jetstream CREATE event WITHOUT contentVisibility field
93 event := &jetstream.JetstreamEvent{
94 Did: userDID,
95 Kind: "commit",
96 TimeUS: time.Now().UnixMicro(),
97 Commit: &jetstream.CommitEvent{
98 Rev: "test-rev-default",
99 Operation: "create",
100 Collection: "social.coves.community.subscription",
101 RKey: rkey,
102 CID: "bafydefault",
103 Record: map[string]interface{}{
104 "$type": "social.coves.community.subscription",
105 "subject": community.DID,
106 "createdAt": time.Now().Format(time.RFC3339),
107 // contentVisibility NOT provided
108 },
109 },
110 }
111
112 // Process event
113 err := consumer.HandleEvent(ctx, event)
114 if err != nil {
115 t.Fatalf("Failed to handle subscription event: %v", err)
116 }
117
118 // Verify defaults to 3
119 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
120 if err != nil {
121 t.Fatalf("Failed to get subscription: %v", err)
122 }
123
124 if subscription.ContentVisibility != 3 {
125 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility)
126 }
127
128 t.Logf("✓ Subscription defaulted to contentVisibility=3")
129 })
130
131 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) {
132 testCases := []struct {
133 input float64
134 expected int
135 name string
136 }{
137 {input: 0, expected: 1, name: "zero clamped to 1"},
138 {input: -5, expected: 1, name: "negative clamped to 1"},
139 {input: 10, expected: 5, name: "10 clamped to 5"},
140 {input: 100, expected: 5, name: "100 clamped to 5"},
141 {input: 1, expected: 1, name: "1 stays 1"},
142 {input: 3, expected: 3, name: "3 stays 3"},
143 {input: 5, expected: 5, name: "5 stays 5"},
144 }
145
146 for i, tc := range testCases {
147 t.Run(tc.name, func(t *testing.T) {
148 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i)
149 rkey := fmt.Sprintf("test-sub-clamp-%d", i)
150
151 event := &jetstream.JetstreamEvent{
152 Did: userDID,
153 Kind: "commit",
154 TimeUS: time.Now().UnixMicro(),
155 Commit: &jetstream.CommitEvent{
156 Rev: "test-rev-clamp",
157 Operation: "create",
158 Collection: "social.coves.community.subscription",
159 RKey: rkey,
160 CID: "bafyclamp",
161 Record: map[string]interface{}{
162 "$type": "social.coves.community.subscription",
163 "subject": community.DID,
164 "createdAt": time.Now().Format(time.RFC3339),
165 "contentVisibility": tc.input,
166 },
167 },
168 }
169
170 err := consumer.HandleEvent(ctx, event)
171 if err != nil {
172 t.Fatalf("Failed to handle subscription event: %v", err)
173 }
174
175 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
176 if err != nil {
177 t.Fatalf("Failed to get subscription: %v", err)
178 }
179
180 if subscription.ContentVisibility != tc.expected {
181 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility)
182 }
183
184 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility)
185 })
186 }
187 })
188
189 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) {
190 userDID := "did:plc:test-idempotent"
191 rkey := "test-sub-idempotent"
192
193 event := &jetstream.JetstreamEvent{
194 Did: userDID,
195 Kind: "commit",
196 TimeUS: time.Now().UnixMicro(),
197 Commit: &jetstream.CommitEvent{
198 Rev: "test-rev-idempotent",
199 Operation: "create",
200 Collection: "social.coves.community.subscription",
201 RKey: rkey,
202 CID: "bafyidempotent",
203 Record: map[string]interface{}{
204 "$type": "social.coves.community.subscription",
205 "subject": community.DID,
206 "createdAt": time.Now().Format(time.RFC3339),
207 "contentVisibility": float64(4),
208 },
209 },
210 }
211
212 // Process first time
213 err := consumer.HandleEvent(ctx, event)
214 if err != nil {
215 t.Fatalf("Failed to handle first subscription event: %v", err)
216 }
217
218 // Process again (Jetstream replay scenario)
219 err = consumer.HandleEvent(ctx, event)
220 if err != nil {
221 t.Errorf("Idempotency failed: second event should not error, got: %v", err)
222 }
223
224 // Verify only one subscription exists
225 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
226 if err != nil {
227 t.Fatalf("Failed to get subscription: %v", err)
228 }
229
230 if subscription.ContentVisibility != 4 {
231 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility)
232 }
233
234 t.Logf("✓ Duplicate events handled idempotently")
235 })
236}
237
238// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling
239func TestSubscriptionIndexing_DeleteOperations(t *testing.T) {
240 if testing.Short() {
241 t.Skip("Skipping integration test in short mode")
242 }
243
244 ctx := context.Background()
245 db := setupTestDB(t)
246 defer cleanupTestDB(t, db)
247
248 repo := createTestCommunityRepo(t, db)
249 consumer := jetstream.NewCommunityEventConsumer(repo)
250
251 // Create test community (with unique DID)
252 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano())
253 community := createTestCommunity(t, repo, "test-unsubscribe", testDID)
254
255 t.Run("deletes subscription when DELETE event received", func(t *testing.T) {
256 userDID := "did:plc:test-user-delete"
257 rkey := "test-sub-delete"
258
259 // First, create a subscription
260 createEvent := &jetstream.JetstreamEvent{
261 Did: userDID,
262 Kind: "commit",
263 TimeUS: time.Now().UnixMicro(),
264 Commit: &jetstream.CommitEvent{
265 Rev: "test-rev-create",
266 Operation: "create",
267 Collection: "social.coves.community.subscription",
268 RKey: rkey,
269 CID: "bafycreate",
270 Record: map[string]interface{}{
271 "$type": "social.coves.community.subscription",
272 "subject": community.DID,
273 "createdAt": time.Now().Format(time.RFC3339),
274 "contentVisibility": float64(3),
275 },
276 },
277 }
278
279 err := consumer.HandleEvent(ctx, createEvent)
280 if err != nil {
281 t.Fatalf("Failed to create subscription: %v", err)
282 }
283
284 // Verify subscription exists
285 _, err = repo.GetSubscription(ctx, userDID, community.DID)
286 if err != nil {
287 t.Fatalf("Subscription should exist: %v", err)
288 }
289
290 // Now send DELETE event (unsubscribe)
291 // IMPORTANT: DELETE operations don't include record data in Jetstream
292 deleteEvent := &jetstream.JetstreamEvent{
293 Did: userDID,
294 Kind: "commit",
295 TimeUS: time.Now().UnixMicro(),
296 Commit: &jetstream.CommitEvent{
297 Rev: "test-rev-delete",
298 Operation: "delete",
299 Collection: "social.coves.community.subscription",
300 RKey: rkey,
301 CID: "", // No CID on deletes
302 Record: nil, // No record data on deletes
303 },
304 }
305
306 err = consumer.HandleEvent(ctx, deleteEvent)
307 if err != nil {
308 t.Fatalf("Failed to handle delete event: %v", err)
309 }
310
311 // Verify subscription was deleted
312 _, err = repo.GetSubscription(ctx, userDID, community.DID)
313 if err == nil {
314 t.Errorf("Subscription should have been deleted")
315 }
316 if !communities.IsNotFound(err) {
317 t.Errorf("Expected NotFound error, got: %v", err)
318 }
319
320 t.Logf("✓ Subscription deleted successfully")
321 })
322
323 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) {
324 userDID := "did:plc:test-user-noexist"
325 rkey := "test-sub-noexist"
326
327 // Try to delete a subscription that doesn't exist
328 deleteEvent := &jetstream.JetstreamEvent{
329 Did: userDID,
330 Kind: "commit",
331 TimeUS: time.Now().UnixMicro(),
332 Commit: &jetstream.CommitEvent{
333 Rev: "test-rev-noexist",
334 Operation: "delete",
335 Collection: "social.coves.community.subscription",
336 RKey: rkey,
337 CID: "",
338 Record: nil,
339 },
340 }
341
342 // Should not error (idempotent)
343 err := consumer.HandleEvent(ctx, deleteEvent)
344 if err != nil {
345 t.Errorf("Deleting non-existent subscription should not error, got: %v", err)
346 }
347
348 t.Logf("✓ Idempotent delete handled gracefully")
349 })
350}
351
352// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically
353func TestSubscriptionIndexing_SubscriberCount(t *testing.T) {
354 if testing.Short() {
355 t.Skip("Skipping integration test in short mode")
356 }
357
358 ctx := context.Background()
359 db := setupTestDB(t)
360 defer cleanupTestDB(t, db)
361
362 repo := createTestCommunityRepo(t, db)
363 consumer := jetstream.NewCommunityEventConsumer(repo)
364
365 // Create test community (with unique DID)
366 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano())
367 community := createTestCommunity(t, repo, "test-subscriber-count", testDID)
368
369 // Verify initial subscriber count is 0
370 comm, err := repo.GetByDID(ctx, community.DID)
371 if err != nil {
372 t.Fatalf("Failed to get community: %v", err)
373 }
374 if comm.SubscriberCount != 0 {
375 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount)
376 }
377
378 t.Run("increments subscriber count on subscribe", func(t *testing.T) {
379 userDID := "did:plc:test-user-count1"
380 rkey := "test-sub-count1"
381
382 event := &jetstream.JetstreamEvent{
383 Did: userDID,
384 Kind: "commit",
385 TimeUS: time.Now().UnixMicro(),
386 Commit: &jetstream.CommitEvent{
387 Rev: "test-rev-count",
388 Operation: "create",
389 Collection: "social.coves.community.subscription",
390 RKey: rkey,
391 CID: "bafycount",
392 Record: map[string]interface{}{
393 "$type": "social.coves.community.subscription",
394 "subject": community.DID,
395 "createdAt": time.Now().Format(time.RFC3339),
396 "contentVisibility": float64(3),
397 },
398 },
399 }
400
401 err := consumer.HandleEvent(ctx, event)
402 if err != nil {
403 t.Fatalf("Failed to handle subscription: %v", err)
404 }
405
406 // Check subscriber count incremented
407 comm, err := repo.GetByDID(ctx, community.DID)
408 if err != nil {
409 t.Fatalf("Failed to get community: %v", err)
410 }
411
412 if comm.SubscriberCount != 1 {
413 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount)
414 }
415
416 t.Logf("✓ Subscriber count incremented to 1")
417 })
418
419 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) {
420 userDID := "did:plc:test-user-count1" // Same user from above
421 rkey := "test-sub-count1"
422
423 // Send DELETE event
424 deleteEvent := &jetstream.JetstreamEvent{
425 Did: userDID,
426 Kind: "commit",
427 TimeUS: time.Now().UnixMicro(),
428 Commit: &jetstream.CommitEvent{
429 Rev: "test-rev-unsub",
430 Operation: "delete",
431 Collection: "social.coves.community.subscription",
432 RKey: rkey,
433 CID: "",
434 Record: nil,
435 },
436 }
437
438 err := consumer.HandleEvent(ctx, deleteEvent)
439 if err != nil {
440 t.Fatalf("Failed to handle unsubscribe: %v", err)
441 }
442
443 // Check subscriber count decremented back to 0
444 comm, err := repo.GetByDID(ctx, community.DID)
445 if err != nil {
446 t.Fatalf("Failed to get community: %v", err)
447 }
448
449 if comm.SubscriberCount != 0 {
450 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount)
451 }
452
453 t.Logf("✓ Subscriber count decremented to 0")
454 })
455}
456
457// Helper functions
458
459func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
460 t.Helper()
461
462 // Add timestamp to make handles unique across test runs
463 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano())
464
465 community := &communities.Community{
466 DID: did,
467 Handle: uniqueHandle,
468 Name: name,
469 DisplayName: "Test Community " + name,
470 Description: "Test community for subscription indexing",
471 OwnerDID: did,
472 CreatedByDID: "did:plc:test-creator",
473 HostedByDID: "did:plc:test-instance",
474 Visibility: "public",
475 CreatedAt: time.Now(),
476 UpdatedAt: time.Now(),
477 }
478
479 created, err := repo.Create(context.Background(), community)
480 if err != nil {
481 t.Fatalf("Failed to create test community: %v", err)
482 }
483
484 return created
485}
486
487func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository {
488 t.Helper()
489 // Import the postgres package to create a repo
490 return postgresRepo.NewCommunityRepository(db.(*sql.DB))
491}
492
// cleanupTestDB closes the database handle held in db. db is declared as
// interface{} but must hold a non-nil *sql.DB; anything else is reported as a
// test error rather than causing a panic. A failure from Close is only logged,
// so it does not fail the test.
func cleanupTestDB(t *testing.T, db interface{}) {
	t.Helper()

	sqlDB, ok := db.(*sql.DB)
	if !ok || sqlDB == nil {
		t.Errorf("cleanupTestDB: expected non-nil *sql.DB, got %T", db)
		return
	}

	if err := sqlDB.Close(); err != nil {
		t.Logf("Failed to close database: %v", err)
	}
}