A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 "context"
7 "database/sql"
8 "fmt"
9 "testing"
10 "time"
11
12 postgresRepo "Coves/internal/db/postgres"
13)
14
15// TestCommunityBlocking_Indexing tests Jetstream indexing of block events
16func TestCommunityBlocking_Indexing(t *testing.T) {
17 if testing.Short() {
18 t.Skip("Skipping integration test in short mode")
19 }
20
21 ctx := context.Background()
22 db := setupTestDB(t)
23 defer cleanupBlockingTestDB(t, db)
24
25 repo := createBlockingTestCommunityRepo(t, db)
26 // Skip verification in tests
27 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs
28 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
29
30 // Create test community
31 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
32 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID)
33
34 t.Run("indexes block CREATE event", func(t *testing.T) {
35 userDID := "did:plc:test-user-blocker"
36 rkey := "test-block-1"
37
38 // Simulate Jetstream CREATE event
39 event := &jetstream.JetstreamEvent{
40 Did: userDID,
41 Kind: "commit",
42 TimeUS: time.Now().UnixMicro(),
43 Commit: &jetstream.CommitEvent{
44 Rev: "test-rev-1",
45 Operation: "create",
46 Collection: "social.coves.community.block",
47 RKey: rkey,
48 CID: "bafyblock123",
49 Record: map[string]interface{}{
50 "$type": "social.coves.community.block",
51 "subject": community.DID,
52 "createdAt": time.Now().Format(time.RFC3339),
53 },
54 },
55 }
56
57 // Process event
58 err := consumer.HandleEvent(ctx, event)
59 if err != nil {
60 t.Fatalf("Failed to handle block event: %v", err)
61 }
62
63 // Verify block indexed
64 block, err := repo.GetBlock(ctx, userDID, community.DID)
65 if err != nil {
66 t.Fatalf("Failed to get block: %v", err)
67 }
68
69 if block.UserDID != userDID {
70 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
71 }
72 if block.CommunityDID != community.DID {
73 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
74 }
75
76 // Verify IsBlocked works
77 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
78 if err != nil {
79 t.Fatalf("IsBlocked failed: %v", err)
80 }
81 if !isBlocked {
82 t.Error("Expected IsBlocked=true, got false")
83 }
84 })
85
86 t.Run("indexes block DELETE event", func(t *testing.T) {
87 userDID := "did:plc:test-user-unblocker"
88 rkey := "test-block-2"
89 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey)
90
91 // First create a block
92 block := &communities.CommunityBlock{
93 UserDID: userDID,
94 CommunityDID: community.DID,
95 BlockedAt: time.Now(),
96 RecordURI: uri,
97 RecordCID: "bafyblock456",
98 }
99 _, err := repo.BlockCommunity(ctx, block)
100 if err != nil {
101 t.Fatalf("Failed to create block: %v", err)
102 }
103
104 // Simulate DELETE event
105 event := &jetstream.JetstreamEvent{
106 Did: userDID,
107 Kind: "commit",
108 TimeUS: time.Now().UnixMicro(),
109 Commit: &jetstream.CommitEvent{
110 Rev: "test-rev-2",
111 Operation: "delete",
112 Collection: "social.coves.community.block",
113 RKey: rkey,
114 },
115 }
116
117 // Process delete
118 err = consumer.HandleEvent(ctx, event)
119 if err != nil {
120 t.Fatalf("Failed to handle delete event: %v", err)
121 }
122
123 // Verify block removed
124 _, err = repo.GetBlock(ctx, userDID, community.DID)
125 if !communities.IsNotFound(err) {
126 t.Error("Expected block to be deleted")
127 }
128
129 // Verify IsBlocked returns false
130 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
131 if err != nil {
132 t.Fatalf("IsBlocked failed: %v", err)
133 }
134 if isBlocked {
135 t.Error("Expected IsBlocked=false, got true")
136 }
137 })
138
139 t.Run("block is idempotent", func(t *testing.T) {
140 userDID := "did:plc:test-user-idempotent"
141 rkey := "test-block-3"
142
143 event := &jetstream.JetstreamEvent{
144 Did: userDID,
145 Kind: "commit",
146 TimeUS: time.Now().UnixMicro(),
147 Commit: &jetstream.CommitEvent{
148 Rev: "test-rev-3",
149 Operation: "create",
150 Collection: "social.coves.community.block",
151 RKey: rkey,
152 CID: "bafyblock789",
153 Record: map[string]interface{}{
154 "$type": "social.coves.community.block",
155 "subject": community.DID,
156 "createdAt": time.Now().Format(time.RFC3339),
157 },
158 },
159 }
160
161 // Process event twice
162 err := consumer.HandleEvent(ctx, event)
163 if err != nil {
164 t.Fatalf("First block failed: %v", err)
165 }
166
167 err = consumer.HandleEvent(ctx, event)
168 if err != nil {
169 t.Fatalf("Second block (idempotent) failed: %v", err)
170 }
171
172 // Should still exist only once
173 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
174 if err != nil {
175 t.Fatalf("ListBlockedCommunities failed: %v", err)
176 }
177 if len(blocks) != 1 {
178 t.Errorf("Expected 1 block, got %d", len(blocks))
179 }
180 })
181
182 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) {
183 userDID := "did:plc:test-user-nonexistent"
184 rkey := "test-block-nonexistent"
185
186 // Simulate DELETE event for block that doesn't exist
187 event := &jetstream.JetstreamEvent{
188 Did: userDID,
189 Kind: "commit",
190 TimeUS: time.Now().UnixMicro(),
191 Commit: &jetstream.CommitEvent{
192 Rev: "test-rev-99",
193 Operation: "delete",
194 Collection: "social.coves.community.block",
195 RKey: rkey,
196 },
197 }
198
199 // Should not error (idempotent)
200 err := consumer.HandleEvent(ctx, event)
201 if err != nil {
202 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err)
203 }
204 })
205}
206
207// TestCommunityBlocking_ListBlocked tests listing blocked communities
208func TestCommunityBlocking_ListBlocked(t *testing.T) {
209 if testing.Short() {
210 t.Skip("Skipping integration test in short mode")
211 }
212
213 ctx := context.Background()
214 db := setupTestDB(t)
215 defer cleanupBlockingTestDB(t, db)
216
217 repo := createBlockingTestCommunityRepo(t, db)
218 userDID := "did:plc:test-user-list"
219
220 // Create and block 3 communities
221 testCommunities := make([]*communities.Community, 3)
222 for i := 0; i < 3; i++ {
223 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i)
224 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID)
225
226 block := &communities.CommunityBlock{
227 UserDID: userDID,
228 CommunityDID: testCommunities[i].DID,
229 BlockedAt: time.Now(),
230 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i),
231 RecordCID: fmt.Sprintf("bafyblock%d", i),
232 }
233 _, err := repo.BlockCommunity(ctx, block)
234 if err != nil {
235 t.Fatalf("Failed to block community %d: %v", i, err)
236 }
237 }
238
239 t.Run("lists all blocked communities", func(t *testing.T) {
240 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
241 if err != nil {
242 t.Fatalf("ListBlockedCommunities failed: %v", err)
243 }
244
245 if len(blocks) != 3 {
246 t.Errorf("Expected 3 blocks, got %d", len(blocks))
247 }
248
249 // Verify all blocks belong to correct user
250 for _, block := range blocks {
251 if block.UserDID != userDID {
252 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
253 }
254 }
255 })
256
257 t.Run("pagination works correctly", func(t *testing.T) {
258 // Get first 2
259 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0)
260 if err != nil {
261 t.Fatalf("ListBlockedCommunities with limit failed: %v", err)
262 }
263 if len(blocks) != 2 {
264 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks))
265 }
266
267 // Get next 2 (should only get 1)
268 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2)
269 if err != nil {
270 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err)
271 }
272 if len(blocksPage2) != 1 {
273 t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2))
274 }
275 })
276
277 t.Run("returns empty list for user with no blocks", func(t *testing.T) {
278 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0)
279 if err != nil {
280 t.Fatalf("ListBlockedCommunities failed: %v", err)
281 }
282 if len(blocks) != 0 {
283 t.Errorf("Expected 0 blocks, got %d", len(blocks))
284 }
285 })
286}
287
288// TestCommunityBlocking_IsBlocked tests the fast block check
289func TestCommunityBlocking_IsBlocked(t *testing.T) {
290 if testing.Short() {
291 t.Skip("Skipping integration test in short mode")
292 }
293
294 ctx := context.Background()
295 db := setupTestDB(t)
296 defer cleanupBlockingTestDB(t, db)
297
298 repo := createBlockingTestCommunityRepo(t, db)
299
300 userDID := "did:plc:test-user-isblocked"
301 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
302 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID)
303
304 t.Run("returns false when not blocked", func(t *testing.T) {
305 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
306 if err != nil {
307 t.Fatalf("IsBlocked failed: %v", err)
308 }
309 if isBlocked {
310 t.Error("Expected IsBlocked=false, got true")
311 }
312 })
313
314 t.Run("returns true when blocked", func(t *testing.T) {
315 // Create block
316 block := &communities.CommunityBlock{
317 UserDID: userDID,
318 CommunityDID: community.DID,
319 BlockedAt: time.Now(),
320 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID),
321 RecordCID: "bafyblocktest",
322 }
323 _, err := repo.BlockCommunity(ctx, block)
324 if err != nil {
325 t.Fatalf("Failed to create block: %v", err)
326 }
327
328 // Check IsBlocked
329 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
330 if err != nil {
331 t.Fatalf("IsBlocked failed: %v", err)
332 }
333 if !isBlocked {
334 t.Error("Expected IsBlocked=true, got false")
335 }
336 })
337
338 t.Run("returns false after unblock", func(t *testing.T) {
339 // Unblock
340 err := repo.UnblockCommunity(ctx, userDID, community.DID)
341 if err != nil {
342 t.Fatalf("UnblockCommunity failed: %v", err)
343 }
344
345 // Check IsBlocked
346 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
347 if err != nil {
348 t.Fatalf("IsBlocked failed: %v", err)
349 }
350 if isBlocked {
351 t.Error("Expected IsBlocked=false after unblock, got true")
352 }
353 })
354}
355
356// TestCommunityBlocking_GetBlock tests block retrieval
357func TestCommunityBlocking_GetBlock(t *testing.T) {
358 if testing.Short() {
359 t.Skip("Skipping integration test in short mode")
360 }
361
362 ctx := context.Background()
363 db := setupTestDB(t)
364 defer cleanupBlockingTestDB(t, db)
365
366 repo := createBlockingTestCommunityRepo(t, db)
367
368 userDID := "did:plc:test-user-getblock"
369 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
370 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID)
371
372 t.Run("returns error when block doesn't exist", func(t *testing.T) {
373 _, err := repo.GetBlock(ctx, userDID, community.DID)
374 if !communities.IsNotFound(err) {
375 t.Errorf("Expected ErrBlockNotFound, got: %v", err)
376 }
377 })
378
379 t.Run("retrieves block by user and community DID", func(t *testing.T) {
380 // Create block
381 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
382 originalBlock := &communities.CommunityBlock{
383 UserDID: userDID,
384 CommunityDID: community.DID,
385 BlockedAt: time.Now(),
386 RecordURI: recordURI,
387 RecordCID: "bafyblockgettest",
388 }
389 _, err := repo.BlockCommunity(ctx, originalBlock)
390 if err != nil {
391 t.Fatalf("Failed to create block: %v", err)
392 }
393
394 // Retrieve by user+community
395 block, err := repo.GetBlock(ctx, userDID, community.DID)
396 if err != nil {
397 t.Fatalf("GetBlock failed: %v", err)
398 }
399
400 if block.UserDID != userDID {
401 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
402 }
403 if block.CommunityDID != community.DID {
404 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
405 }
406 if block.RecordURI != recordURI {
407 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
408 }
409 })
410
411 t.Run("retrieves block by URI", func(t *testing.T) {
412 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
413
414 // Retrieve by URI
415 block, err := repo.GetBlockByURI(ctx, recordURI)
416 if err != nil {
417 t.Fatalf("GetBlockByURI failed: %v", err)
418 }
419
420 if block.RecordURI != recordURI {
421 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
422 }
423 if block.CommunityDID != community.DID {
424 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
425 }
426 })
427}
428
429// Helper functions for blocking tests
430
431func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository {
432 return postgresRepo.NewCommunityRepository(db)
433}
434
435func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
436 community := &communities.Community{
437 DID: did,
438 Handle: fmt.Sprintf("!%s@coves.test", name),
439 Name: name,
440 DisplayName: fmt.Sprintf("Test Community %s", name),
441 Description: "Test community for blocking tests",
442 OwnerDID: did,
443 CreatedByDID: "did:plc:test-creator",
444 HostedByDID: "did:plc:test-instance",
445 Visibility: "public",
446 CreatedAt: time.Now(),
447 UpdatedAt: time.Now(),
448 }
449
450 created, err := repo.Create(context.Background(), community)
451 if err != nil {
452 t.Fatalf("Failed to create test community: %v", err)
453 }
454
455 return created
456}
457
458func cleanupBlockingTestDB(t *testing.T, db *sql.DB) {
459 // Clean up test data
460 _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'")
461 if err != nil {
462 t.Logf("Warning: Failed to clean up blocks: %v", err)
463 }
464
465 _, err = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'")
466 if err != nil {
467 t.Logf("Warning: Failed to clean up communities: %v", err)
468 }
469
470 if closeErr := db.Close(); closeErr != nil {
471 t.Logf("Failed to close database: %v", closeErr)
472 }
473}