A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 postgresRepo "Coves/internal/db/postgres"
7 "context"
8 "database/sql"
9 "fmt"
10 "testing"
11 "time"
12)
13
14// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed
15// from Jetstream events and stored in the AppView database
16func TestSubscriptionIndexing_ContentVisibility(t *testing.T) {
17 if testing.Short() {
18 t.Skip("Skipping integration test in short mode")
19 }
20
21 ctx := context.Background()
22 db := setupTestDB(t)
23 defer cleanupTestDB(t, db)
24
25 repo := createTestCommunityRepo(t, db)
26 // Skip verification in tests
27 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
28
29 // Create a test community first (with unique DID)
30 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
31 community := createTestCommunity(t, repo, "test-community-visibility", testDID)
32
33 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) {
34 userDID := "did:plc:test-user-123"
35 rkey := "test-sub-1"
36 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey
37
38 // Simulate Jetstream CREATE event for subscription
39 event := &jetstream.JetstreamEvent{
40 Did: userDID,
41 Kind: "commit",
42 TimeUS: time.Now().UnixMicro(),
43 Commit: &jetstream.CommitEvent{
44 Rev: "test-rev-1",
45 Operation: "create",
46 Collection: "social.coves.community.subscription", // CORRECT collection name
47 RKey: rkey,
48 CID: "bafytest123",
49 Record: map[string]interface{}{
50 "$type": "social.coves.community.subscription",
51 "subject": community.DID,
52 "createdAt": time.Now().Format(time.RFC3339),
53 "contentVisibility": float64(5), // JSON numbers decode as float64
54 },
55 },
56 }
57
58 // Process event through consumer
59 err := consumer.HandleEvent(ctx, event)
60 if err != nil {
61 t.Fatalf("Failed to handle subscription event: %v", err)
62 }
63
64 // Verify subscription was indexed with correct contentVisibility
65 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
66 if err != nil {
67 t.Fatalf("Failed to get subscription: %v", err)
68 }
69
70 if subscription.ContentVisibility != 5 {
71 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility)
72 }
73
74 if subscription.UserDID != userDID {
75 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID)
76 }
77
78 if subscription.CommunityDID != community.DID {
79 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID)
80 }
81
82 if subscription.RecordURI != uri {
83 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI)
84 }
85
86 t.Logf("✓ Subscription indexed with contentVisibility=5")
87 })
88
89 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) {
90 userDID := "did:plc:test-user-default"
91 rkey := "test-sub-default"
92
93 // Simulate Jetstream CREATE event WITHOUT contentVisibility field
94 event := &jetstream.JetstreamEvent{
95 Did: userDID,
96 Kind: "commit",
97 TimeUS: time.Now().UnixMicro(),
98 Commit: &jetstream.CommitEvent{
99 Rev: "test-rev-default",
100 Operation: "create",
101 Collection: "social.coves.community.subscription",
102 RKey: rkey,
103 CID: "bafydefault",
104 Record: map[string]interface{}{
105 "$type": "social.coves.community.subscription",
106 "subject": community.DID,
107 "createdAt": time.Now().Format(time.RFC3339),
108 // contentVisibility NOT provided
109 },
110 },
111 }
112
113 // Process event
114 err := consumer.HandleEvent(ctx, event)
115 if err != nil {
116 t.Fatalf("Failed to handle subscription event: %v", err)
117 }
118
119 // Verify defaults to 3
120 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
121 if err != nil {
122 t.Fatalf("Failed to get subscription: %v", err)
123 }
124
125 if subscription.ContentVisibility != 3 {
126 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility)
127 }
128
129 t.Logf("✓ Subscription defaulted to contentVisibility=3")
130 })
131
132 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) {
133 testCases := []struct {
134 name string
135 input float64
136 expected int
137 }{
138 {input: 0, expected: 1, name: "zero clamped to 1"},
139 {input: -5, expected: 1, name: "negative clamped to 1"},
140 {input: 10, expected: 5, name: "10 clamped to 5"},
141 {input: 100, expected: 5, name: "100 clamped to 5"},
142 {input: 1, expected: 1, name: "1 stays 1"},
143 {input: 3, expected: 3, name: "3 stays 3"},
144 {input: 5, expected: 5, name: "5 stays 5"},
145 }
146
147 for i, tc := range testCases {
148 t.Run(tc.name, func(t *testing.T) {
149 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i)
150 rkey := fmt.Sprintf("test-sub-clamp-%d", i)
151
152 event := &jetstream.JetstreamEvent{
153 Did: userDID,
154 Kind: "commit",
155 TimeUS: time.Now().UnixMicro(),
156 Commit: &jetstream.CommitEvent{
157 Rev: "test-rev-clamp",
158 Operation: "create",
159 Collection: "social.coves.community.subscription",
160 RKey: rkey,
161 CID: "bafyclamp",
162 Record: map[string]interface{}{
163 "$type": "social.coves.community.subscription",
164 "subject": community.DID,
165 "createdAt": time.Now().Format(time.RFC3339),
166 "contentVisibility": tc.input,
167 },
168 },
169 }
170
171 err := consumer.HandleEvent(ctx, event)
172 if err != nil {
173 t.Fatalf("Failed to handle subscription event: %v", err)
174 }
175
176 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
177 if err != nil {
178 t.Fatalf("Failed to get subscription: %v", err)
179 }
180
181 if subscription.ContentVisibility != tc.expected {
182 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility)
183 }
184
185 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility)
186 })
187 }
188 })
189
190 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) {
191 userDID := "did:plc:test-idempotent"
192 rkey := "test-sub-idempotent"
193
194 event := &jetstream.JetstreamEvent{
195 Did: userDID,
196 Kind: "commit",
197 TimeUS: time.Now().UnixMicro(),
198 Commit: &jetstream.CommitEvent{
199 Rev: "test-rev-idempotent",
200 Operation: "create",
201 Collection: "social.coves.community.subscription",
202 RKey: rkey,
203 CID: "bafyidempotent",
204 Record: map[string]interface{}{
205 "$type": "social.coves.community.subscription",
206 "subject": community.DID,
207 "createdAt": time.Now().Format(time.RFC3339),
208 "contentVisibility": float64(4),
209 },
210 },
211 }
212
213 // Process first time
214 err := consumer.HandleEvent(ctx, event)
215 if err != nil {
216 t.Fatalf("Failed to handle first subscription event: %v", err)
217 }
218
219 // Process again (Jetstream replay scenario)
220 err = consumer.HandleEvent(ctx, event)
221 if err != nil {
222 t.Errorf("Idempotency failed: second event should not error, got: %v", err)
223 }
224
225 // Verify only one subscription exists
226 subscription, err := repo.GetSubscription(ctx, userDID, community.DID)
227 if err != nil {
228 t.Fatalf("Failed to get subscription: %v", err)
229 }
230
231 if subscription.ContentVisibility != 4 {
232 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility)
233 }
234
235 t.Logf("✓ Duplicate events handled idempotently")
236 })
237}
238
239// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling
240func TestSubscriptionIndexing_DeleteOperations(t *testing.T) {
241 if testing.Short() {
242 t.Skip("Skipping integration test in short mode")
243 }
244
245 ctx := context.Background()
246 db := setupTestDB(t)
247 defer cleanupTestDB(t, db)
248
249 repo := createTestCommunityRepo(t, db)
250 // Skip verification in tests
251 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
252
253 // Create test community (with unique DID)
254 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano())
255 community := createTestCommunity(t, repo, "test-unsubscribe", testDID)
256
257 t.Run("deletes subscription when DELETE event received", func(t *testing.T) {
258 userDID := "did:plc:test-user-delete"
259 rkey := "test-sub-delete"
260
261 // First, create a subscription
262 createEvent := &jetstream.JetstreamEvent{
263 Did: userDID,
264 Kind: "commit",
265 TimeUS: time.Now().UnixMicro(),
266 Commit: &jetstream.CommitEvent{
267 Rev: "test-rev-create",
268 Operation: "create",
269 Collection: "social.coves.community.subscription",
270 RKey: rkey,
271 CID: "bafycreate",
272 Record: map[string]interface{}{
273 "$type": "social.coves.community.subscription",
274 "subject": community.DID,
275 "createdAt": time.Now().Format(time.RFC3339),
276 "contentVisibility": float64(3),
277 },
278 },
279 }
280
281 err := consumer.HandleEvent(ctx, createEvent)
282 if err != nil {
283 t.Fatalf("Failed to create subscription: %v", err)
284 }
285
286 // Verify subscription exists
287 _, err = repo.GetSubscription(ctx, userDID, community.DID)
288 if err != nil {
289 t.Fatalf("Subscription should exist: %v", err)
290 }
291
292 // Now send DELETE event (unsubscribe)
293 // IMPORTANT: DELETE operations don't include record data in Jetstream
294 deleteEvent := &jetstream.JetstreamEvent{
295 Did: userDID,
296 Kind: "commit",
297 TimeUS: time.Now().UnixMicro(),
298 Commit: &jetstream.CommitEvent{
299 Rev: "test-rev-delete",
300 Operation: "delete",
301 Collection: "social.coves.community.subscription",
302 RKey: rkey,
303 CID: "", // No CID on deletes
304 Record: nil, // No record data on deletes
305 },
306 }
307
308 err = consumer.HandleEvent(ctx, deleteEvent)
309 if err != nil {
310 t.Fatalf("Failed to handle delete event: %v", err)
311 }
312
313 // Verify subscription was deleted
314 _, err = repo.GetSubscription(ctx, userDID, community.DID)
315 if err == nil {
316 t.Errorf("Subscription should have been deleted")
317 }
318 if !communities.IsNotFound(err) {
319 t.Errorf("Expected NotFound error, got: %v", err)
320 }
321
322 t.Logf("✓ Subscription deleted successfully")
323 })
324
325 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) {
326 userDID := "did:plc:test-user-noexist"
327 rkey := "test-sub-noexist"
328
329 // Try to delete a subscription that doesn't exist
330 deleteEvent := &jetstream.JetstreamEvent{
331 Did: userDID,
332 Kind: "commit",
333 TimeUS: time.Now().UnixMicro(),
334 Commit: &jetstream.CommitEvent{
335 Rev: "test-rev-noexist",
336 Operation: "delete",
337 Collection: "social.coves.community.subscription",
338 RKey: rkey,
339 CID: "",
340 Record: nil,
341 },
342 }
343
344 // Should not error (idempotent)
345 err := consumer.HandleEvent(ctx, deleteEvent)
346 if err != nil {
347 t.Errorf("Deleting non-existent subscription should not error, got: %v", err)
348 }
349
350 t.Logf("✓ Idempotent delete handled gracefully")
351 })
352}
353
354// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically
355func TestSubscriptionIndexing_SubscriberCount(t *testing.T) {
356 if testing.Short() {
357 t.Skip("Skipping integration test in short mode")
358 }
359
360 ctx := context.Background()
361 db := setupTestDB(t)
362 defer cleanupTestDB(t, db)
363
364 repo := createTestCommunityRepo(t, db)
365 // Skip verification in tests
366 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
367
368 // Create test community (with unique DID)
369 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano())
370 community := createTestCommunity(t, repo, "test-subscriber-count", testDID)
371
372 // Verify initial subscriber count is 0
373 comm, err := repo.GetByDID(ctx, community.DID)
374 if err != nil {
375 t.Fatalf("Failed to get community: %v", err)
376 }
377 if comm.SubscriberCount != 0 {
378 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount)
379 }
380
381 t.Run("increments subscriber count on subscribe", func(t *testing.T) {
382 userDID := "did:plc:test-user-count1"
383 rkey := "test-sub-count1"
384
385 event := &jetstream.JetstreamEvent{
386 Did: userDID,
387 Kind: "commit",
388 TimeUS: time.Now().UnixMicro(),
389 Commit: &jetstream.CommitEvent{
390 Rev: "test-rev-count",
391 Operation: "create",
392 Collection: "social.coves.community.subscription",
393 RKey: rkey,
394 CID: "bafycount",
395 Record: map[string]interface{}{
396 "$type": "social.coves.community.subscription",
397 "subject": community.DID,
398 "createdAt": time.Now().Format(time.RFC3339),
399 "contentVisibility": float64(3),
400 },
401 },
402 }
403
404 err := consumer.HandleEvent(ctx, event)
405 if err != nil {
406 t.Fatalf("Failed to handle subscription: %v", err)
407 }
408
409 // Check subscriber count incremented
410 comm, err := repo.GetByDID(ctx, community.DID)
411 if err != nil {
412 t.Fatalf("Failed to get community: %v", err)
413 }
414
415 if comm.SubscriberCount != 1 {
416 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount)
417 }
418
419 t.Logf("✓ Subscriber count incremented to 1")
420 })
421
422 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) {
423 userDID := "did:plc:test-user-count1" // Same user from above
424 rkey := "test-sub-count1"
425
426 // Send DELETE event
427 deleteEvent := &jetstream.JetstreamEvent{
428 Did: userDID,
429 Kind: "commit",
430 TimeUS: time.Now().UnixMicro(),
431 Commit: &jetstream.CommitEvent{
432 Rev: "test-rev-unsub",
433 Operation: "delete",
434 Collection: "social.coves.community.subscription",
435 RKey: rkey,
436 CID: "",
437 Record: nil,
438 },
439 }
440
441 err := consumer.HandleEvent(ctx, deleteEvent)
442 if err != nil {
443 t.Fatalf("Failed to handle unsubscribe: %v", err)
444 }
445
446 // Check subscriber count decremented back to 0
447 comm, err := repo.GetByDID(ctx, community.DID)
448 if err != nil {
449 t.Fatalf("Failed to get community: %v", err)
450 }
451
452 if comm.SubscriberCount != 0 {
453 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount)
454 }
455
456 t.Logf("✓ Subscriber count decremented to 0")
457 })
458}
459
460// Helper functions
461
462func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
463 t.Helper()
464
465 // Add timestamp to make handles unique across test runs
466 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano())
467
468 community := &communities.Community{
469 DID: did,
470 Handle: uniqueHandle,
471 Name: name,
472 DisplayName: "Test Community " + name,
473 Description: "Test community for subscription indexing",
474 OwnerDID: did,
475 CreatedByDID: "did:plc:test-creator",
476 HostedByDID: "did:plc:test-instance",
477 Visibility: "public",
478 CreatedAt: time.Now(),
479 UpdatedAt: time.Now(),
480 }
481
482 created, err := repo.Create(context.Background(), community)
483 if err != nil {
484 t.Fatalf("Failed to create test community: %v", err)
485 }
486
487 return created
488}
489
490func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository {
491 t.Helper()
492 // Import the postgres package to create a repo
493 return postgresRepo.NewCommunityRepository(db.(*sql.DB))
494}
495
496func cleanupTestDB(t *testing.T, db interface{}) {
497 t.Helper()
498 sqlDB := db.(*sql.DB)
499 if err := sqlDB.Close(); err != nil {
500 t.Logf("Failed to close database: %v", err)
501 }
502}