A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 "context"
7 "database/sql"
8 "fmt"
9 "testing"
10 "time"
11
12 postgresRepo "Coves/internal/db/postgres"
13)
14
15// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed
16// from Jetstream events and stored in the AppView database
17func TestSubscriptionIndexing_ContentVisibility(t *testing.T) {
18 if testing.Short() {
19 t.Skip("Skipping integration test in short mode")
20 }
21
22 ctx := context.Background()
23 db := setupTestDB(t)
24 defer cleanupTestDB(t, db)
25
26 repo := createTestCommunityRepo(t, db)
27 // Skip verification in tests
28 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
29
30 // Create a test community first (with unique DID)
31 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
32 community := createTestCommunity(t, repo, "test-community-visibility", testDID)
33
34 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) {
35 userDID := "did:plc:test-user-123"
36 rkey := "test-sub-1"
37 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey
38
39 // Simulate Jetstream CREATE event for subscription
40 event := &jetstream.JetstreamEvent{
41 Did: userDID,
42 Kind: "commit",
43 TimeUS: time.Now().UnixMicro(),
44 Commit: &jetstream.CommitEvent{
45 Rev: "test-rev-1",
46 Operation: "create",
47 Collection: "social.coves.community.subscription", // CORRECT collection name
48 RKey: rkey,
49 CID: "bafytest123",
50 Record: map[string]interface{}{
51 "$type": "social.coves.community.subscription",
52 "subject": community.DID,
53 "createdAt": time.Now().Format(time.RFC3339),
54 "contentVisibility": float64(5), // JSON numbers decode as float64
55 },
56 },
57 }
58
59 // Process event through consumer
60 err := consumer.HandleEvent(ctx, event)
61 if err != nil {
62 t.Fatalf("Failed to handle subscription event: %v", err)
63 }
64
65 // Verify subscription was indexed with correct contentVisibility
66 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
67 if err != nil {
68 t.Fatalf("Failed to get subscription: %v", err)
69 }
70
71 if subscription.ContentVisibility != 5 {
72 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility)
73 }
74
75 if subscription.UserDID != userDID {
76 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID)
77 }
78
79 if subscription.CommunityDID != community.DID {
80 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID)
81 }
82
83 if subscription.RecordURI != uri {
84 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI)
85 }
86
87 t.Logf("✓ Subscription indexed with contentVisibility=5")
88 })
89
90 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) {
91 userDID := "did:plc:test-user-default"
92 rkey := "test-sub-default"
93
94 // Simulate Jetstream CREATE event WITHOUT contentVisibility field
95 event := &jetstream.JetstreamEvent{
96 Did: userDID,
97 Kind: "commit",
98 TimeUS: time.Now().UnixMicro(),
99 Commit: &jetstream.CommitEvent{
100 Rev: "test-rev-default",
101 Operation: "create",
102 Collection: "social.coves.community.subscription",
103 RKey: rkey,
104 CID: "bafydefault",
105 Record: map[string]interface{}{
106 "$type": "social.coves.community.subscription",
107 "subject": community.DID,
108 "createdAt": time.Now().Format(time.RFC3339),
109 // contentVisibility NOT provided
110 },
111 },
112 }
113
114 // Process event
115 err := consumer.HandleEvent(ctx, event)
116 if err != nil {
117 t.Fatalf("Failed to handle subscription event: %v", err)
118 }
119
120 // Verify defaults to 3
121 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
122 if err != nil {
123 t.Fatalf("Failed to get subscription: %v", err)
124 }
125
126 if subscription.ContentVisibility != 3 {
127 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility)
128 }
129
130 t.Logf("✓ Subscription defaulted to contentVisibility=3")
131 })
132
133 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) {
134 testCases := []struct {
135 name string
136 input float64
137 expected int
138 }{
139 {input: 0, expected: 1, name: "zero clamped to 1"},
140 {input: -5, expected: 1, name: "negative clamped to 1"},
141 {input: 10, expected: 5, name: "10 clamped to 5"},
142 {input: 100, expected: 5, name: "100 clamped to 5"},
143 {input: 1, expected: 1, name: "1 stays 1"},
144 {input: 3, expected: 3, name: "3 stays 3"},
145 {input: 5, expected: 5, name: "5 stays 5"},
146 }
147
148 for i, tc := range testCases {
149 t.Run(tc.name, func(t *testing.T) {
150 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i)
151 rkey := fmt.Sprintf("test-sub-clamp-%d", i)
152
153 event := &jetstream.JetstreamEvent{
154 Did: userDID,
155 Kind: "commit",
156 TimeUS: time.Now().UnixMicro(),
157 Commit: &jetstream.CommitEvent{
158 Rev: "test-rev-clamp",
159 Operation: "create",
160 Collection: "social.coves.community.subscription",
161 RKey: rkey,
162 CID: "bafyclamp",
163 Record: map[string]interface{}{
164 "$type": "social.coves.community.subscription",
165 "subject": community.DID,
166 "createdAt": time.Now().Format(time.RFC3339),
167 "contentVisibility": tc.input,
168 },
169 },
170 }
171
172 err := consumer.HandleEvent(ctx, event)
173 if err != nil {
174 t.Fatalf("Failed to handle subscription event: %v", err)
175 }
176
177 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
178 if err != nil {
179 t.Fatalf("Failed to get subscription: %v", err)
180 }
181
182 if subscription.ContentVisibility != tc.expected {
183 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility)
184 }
185
186 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility)
187 })
188 }
189 })
190
191 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) {
192 userDID := "did:plc:test-idempotent"
193 rkey := "test-sub-idempotent"
194
195 event := &jetstream.JetstreamEvent{
196 Did: userDID,
197 Kind: "commit",
198 TimeUS: time.Now().UnixMicro(),
199 Commit: &jetstream.CommitEvent{
200 Rev: "test-rev-idempotent",
201 Operation: "create",
202 Collection: "social.coves.community.subscription",
203 RKey: rkey,
204 CID: "bafyidempotent",
205 Record: map[string]interface{}{
206 "$type": "social.coves.community.subscription",
207 "subject": community.DID,
208 "createdAt": time.Now().Format(time.RFC3339),
209 "contentVisibility": float64(4),
210 },
211 },
212 }
213
214 // Process first time
215 err := consumer.HandleEvent(ctx, event)
216 if err != nil {
217 t.Fatalf("Failed to handle first subscription event: %v", err)
218 }
219
220 // Process again (Jetstream replay scenario)
221 err = consumer.HandleEvent(ctx, event)
222 if err != nil {
223 t.Errorf("Idempotency failed: second event should not error, got: %v", err)
224 }
225
226 // Verify only one subscription exists
227 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
228 if err != nil {
229 t.Fatalf("Failed to get subscription: %v", err)
230 }
231
232 if subscription.ContentVisibility != 4 {
233 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility)
234 }
235
236 t.Logf("✓ Duplicate events handled idempotently")
237 })
238}
239
240// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling
241func TestSubscriptionIndexing_DeleteOperations(t *testing.T) {
242 if testing.Short() {
243 t.Skip("Skipping integration test in short mode")
244 }
245
246 ctx := context.Background()
247 db := setupTestDB(t)
248 defer cleanupTestDB(t, db)
249
250 repo := createTestCommunityRepo(t, db)
251 // Skip verification in tests
252 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
253
254 // Create test community (with unique DID)
255 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano())
256 community := createTestCommunity(t, repo, "test-unsubscribe", testDID)
257
258 t.Run("deletes subscription when DELETE event received", func(t *testing.T) {
259 userDID := "did:plc:test-user-delete"
260 rkey := "test-sub-delete"
261
262 // First, create a subscription
263 createEvent := &jetstream.JetstreamEvent{
264 Did: userDID,
265 Kind: "commit",
266 TimeUS: time.Now().UnixMicro(),
267 Commit: &jetstream.CommitEvent{
268 Rev: "test-rev-create",
269 Operation: "create",
270 Collection: "social.coves.community.subscription",
271 RKey: rkey,
272 CID: "bafycreate",
273 Record: map[string]interface{}{
274 "$type": "social.coves.community.subscription",
275 "subject": community.DID,
276 "createdAt": time.Now().Format(time.RFC3339),
277 "contentVisibility": float64(3),
278 },
279 },
280 }
281
282 err := consumer.HandleEvent(ctx, createEvent)
283 if err != nil {
284 t.Fatalf("Failed to create subscription: %v", err)
285 }
286
287 // Verify subscription exists
288 _, err = repo.GetSubscription(ctx, userDID, community.DID)
289 if err != nil {
290 t.Fatalf("Subscription should exist: %v", err)
291 }
292
293 // Now send DELETE event (unsubscribe)
294 // IMPORTANT: DELETE operations don't include record data in Jetstream
295 deleteEvent := &jetstream.JetstreamEvent{
296 Did: userDID,
297 Kind: "commit",
298 TimeUS: time.Now().UnixMicro(),
299 Commit: &jetstream.CommitEvent{
300 Rev: "test-rev-delete",
301 Operation: "delete",
302 Collection: "social.coves.community.subscription",
303 RKey: rkey,
304 CID: "", // No CID on deletes
305 Record: nil, // No record data on deletes
306 },
307 }
308
309 err = consumer.HandleEvent(ctx, deleteEvent)
310 if err != nil {
311 t.Fatalf("Failed to handle delete event: %v", err)
312 }
313
314 // Verify subscription was deleted
315 _, err = repo.GetSubscription(ctx, userDID, community.DID)
316 if err == nil {
317 t.Errorf("Subscription should have been deleted")
318 }
319 if !communities.IsNotFound(err) {
320 t.Errorf("Expected NotFound error, got: %v", err)
321 }
322
323 t.Logf("✓ Subscription deleted successfully")
324 })
325
326 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) {
327 userDID := "did:plc:test-user-noexist"
328 rkey := "test-sub-noexist"
329
330 // Try to delete a subscription that doesn't exist
331 deleteEvent := &jetstream.JetstreamEvent{
332 Did: userDID,
333 Kind: "commit",
334 TimeUS: time.Now().UnixMicro(),
335 Commit: &jetstream.CommitEvent{
336 Rev: "test-rev-noexist",
337 Operation: "delete",
338 Collection: "social.coves.community.subscription",
339 RKey: rkey,
340 CID: "",
341 Record: nil,
342 },
343 }
344
345 // Should not error (idempotent)
346 err := consumer.HandleEvent(ctx, deleteEvent)
347 if err != nil {
348 t.Errorf("Deleting non-existent subscription should not error, got: %v", err)
349 }
350
351 t.Logf("✓ Idempotent delete handled gracefully")
352 })
353}
354
355// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically
356func TestSubscriptionIndexing_SubscriberCount(t *testing.T) {
357 if testing.Short() {
358 t.Skip("Skipping integration test in short mode")
359 }
360
361 ctx := context.Background()
362 db := setupTestDB(t)
363 defer cleanupTestDB(t, db)
364
365 repo := createTestCommunityRepo(t, db)
366 // Skip verification in tests
367 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
368
369 // Create test community (with unique DID)
370 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano())
371 community := createTestCommunity(t, repo, "test-subscriber-count", testDID)
372
373 // Verify initial subscriber count is 0
374 comm, err := repo.GetByDID(ctx, community.DID)
375 if err != nil {
376 t.Fatalf("Failed to get community: %v", err)
377 }
378 if comm.SubscriberCount != 0 {
379 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount)
380 }
381
382 t.Run("increments subscriber count on subscribe", func(t *testing.T) {
383 userDID := "did:plc:test-user-count1"
384 rkey := "test-sub-count1"
385
386 event := &jetstream.JetstreamEvent{
387 Did: userDID,
388 Kind: "commit",
389 TimeUS: time.Now().UnixMicro(),
390 Commit: &jetstream.CommitEvent{
391 Rev: "test-rev-count",
392 Operation: "create",
393 Collection: "social.coves.community.subscription",
394 RKey: rkey,
395 CID: "bafycount",
396 Record: map[string]interface{}{
397 "$type": "social.coves.community.subscription",
398 "subject": community.DID,
399 "createdAt": time.Now().Format(time.RFC3339),
400 "contentVisibility": float64(3),
401 },
402 },
403 }
404
405 err := consumer.HandleEvent(ctx, event)
406 if err != nil {
407 t.Fatalf("Failed to handle subscription: %v", err)
408 }
409
410 // Check subscriber count incremented
411 comm, err := repo.GetByDID(ctx, community.DID)
412 if err != nil {
413 t.Fatalf("Failed to get community: %v", err)
414 }
415
416 if comm.SubscriberCount != 1 {
417 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount)
418 }
419
420 t.Logf("✓ Subscriber count incremented to 1")
421 })
422
423 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) {
424 userDID := "did:plc:test-user-count1" // Same user from above
425 rkey := "test-sub-count1"
426
427 // Send DELETE event
428 deleteEvent := &jetstream.JetstreamEvent{
429 Did: userDID,
430 Kind: "commit",
431 TimeUS: time.Now().UnixMicro(),
432 Commit: &jetstream.CommitEvent{
433 Rev: "test-rev-unsub",
434 Operation: "delete",
435 Collection: "social.coves.community.subscription",
436 RKey: rkey,
437 CID: "",
438 Record: nil,
439 },
440 }
441
442 err := consumer.HandleEvent(ctx, deleteEvent)
443 if err != nil {
444 t.Fatalf("Failed to handle unsubscribe: %v", err)
445 }
446
447 // Check subscriber count decremented back to 0
448 comm, err := repo.GetByDID(ctx, community.DID)
449 if err != nil {
450 t.Fatalf("Failed to get community: %v", err)
451 }
452
453 if comm.SubscriberCount != 0 {
454 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount)
455 }
456
457 t.Logf("✓ Subscriber count decremented to 0")
458 })
459}
460
461// Helper functions
462
463func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
464 t.Helper()
465
466 // Add timestamp to make handles unique across test runs
467 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano())
468
469 community := &communities.Community{
470 DID: did,
471 Handle: uniqueHandle,
472 Name: name,
473 DisplayName: "Test Community " + name,
474 Description: "Test community for subscription indexing",
475 OwnerDID: did,
476 CreatedByDID: "did:plc:test-creator",
477 HostedByDID: "did:plc:test-instance",
478 Visibility: "public",
479 CreatedAt: time.Now(),
480 UpdatedAt: time.Now(),
481 }
482
483 created, err := repo.Create(context.Background(), community)
484 if err != nil {
485 t.Fatalf("Failed to create test community: %v", err)
486 }
487
488 return created
489}
490
491func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository {
492 t.Helper()
493 // Import the postgres package to create a repo
494 return postgresRepo.NewCommunityRepository(db.(*sql.DB))
495}
496
// cleanupTestDB closes the database handle opened for a test. Cleanup is
// best-effort: problems are logged through t and never fail the test.
//
// db is expected to hold a *sql.DB. Any other value, including nil or a nil
// *sql.DB, is logged and ignored rather than panicking, because this helper is
// normally deferred and a panic there would abort the whole test binary.
func cleanupTestDB(t *testing.T, db interface{}) {
	t.Helper()

	sqlDB, ok := db.(*sql.DB)
	if !ok || sqlDB == nil {
		t.Logf("cleanupTestDB: nothing to close, got %T", db)
		return
	}
	if err := sqlDB.Close(); err != nil {
		t.Logf("Failed to close database: %v", err)
	}
}