A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 "context"
7 "database/sql"
8 "fmt"
9 "testing"
10 "time"
11
12 postgresRepo "Coves/internal/db/postgres"
13)
14
15// TestCommunityBlocking_Indexing tests Jetstream indexing of block events
16func TestCommunityBlocking_Indexing(t *testing.T) {
17 if testing.Short() {
18 t.Skip("Skipping integration test in short mode")
19 }
20
21 ctx := context.Background()
22 db := setupTestDB(t)
23 defer cleanupBlockingTestDB(t, db)
24
25 repo := createBlockingTestCommunityRepo(t, db)
26 // Skip verification in tests
27 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
28
29 // Create test community
30 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
31 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID)
32
33 t.Run("indexes block CREATE event", func(t *testing.T) {
34 userDID := "did:plc:test-user-blocker"
35 rkey := "test-block-1"
36
37 // Simulate Jetstream CREATE event
38 event := &jetstream.JetstreamEvent{
39 Did: userDID,
40 Kind: "commit",
41 TimeUS: time.Now().UnixMicro(),
42 Commit: &jetstream.CommitEvent{
43 Rev: "test-rev-1",
44 Operation: "create",
45 Collection: "social.coves.community.block",
46 RKey: rkey,
47 CID: "bafyblock123",
48 Record: map[string]interface{}{
49 "$type": "social.coves.community.block",
50 "subject": community.DID,
51 "createdAt": time.Now().Format(time.RFC3339),
52 },
53 },
54 }
55
56 // Process event
57 err := consumer.HandleEvent(ctx, event)
58 if err != nil {
59 t.Fatalf("Failed to handle block event: %v", err)
60 }
61
62 // Verify block indexed
63 block, err := repo.GetBlock(ctx, userDID, community.DID)
64 if err != nil {
65 t.Fatalf("Failed to get block: %v", err)
66 }
67
68 if block.UserDID != userDID {
69 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
70 }
71 if block.CommunityDID != community.DID {
72 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
73 }
74
75 // Verify IsBlocked works
76 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
77 if err != nil {
78 t.Fatalf("IsBlocked failed: %v", err)
79 }
80 if !isBlocked {
81 t.Error("Expected IsBlocked=true, got false")
82 }
83 })
84
85 t.Run("indexes block DELETE event", func(t *testing.T) {
86 userDID := "did:plc:test-user-unblocker"
87 rkey := "test-block-2"
88 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey)
89
90 // First create a block
91 block := &communities.CommunityBlock{
92 UserDID: userDID,
93 CommunityDID: community.DID,
94 BlockedAt: time.Now(),
95 RecordURI: uri,
96 RecordCID: "bafyblock456",
97 }
98 _, err := repo.BlockCommunity(ctx, block)
99 if err != nil {
100 t.Fatalf("Failed to create block: %v", err)
101 }
102
103 // Simulate DELETE event
104 event := &jetstream.JetstreamEvent{
105 Did: userDID,
106 Kind: "commit",
107 TimeUS: time.Now().UnixMicro(),
108 Commit: &jetstream.CommitEvent{
109 Rev: "test-rev-2",
110 Operation: "delete",
111 Collection: "social.coves.community.block",
112 RKey: rkey,
113 },
114 }
115
116 // Process delete
117 err = consumer.HandleEvent(ctx, event)
118 if err != nil {
119 t.Fatalf("Failed to handle delete event: %v", err)
120 }
121
122 // Verify block removed
123 _, err = repo.GetBlock(ctx, userDID, community.DID)
124 if !communities.IsNotFound(err) {
125 t.Error("Expected block to be deleted")
126 }
127
128 // Verify IsBlocked returns false
129 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
130 if err != nil {
131 t.Fatalf("IsBlocked failed: %v", err)
132 }
133 if isBlocked {
134 t.Error("Expected IsBlocked=false, got true")
135 }
136 })
137
138 t.Run("block is idempotent", func(t *testing.T) {
139 userDID := "did:plc:test-user-idempotent"
140 rkey := "test-block-3"
141
142 event := &jetstream.JetstreamEvent{
143 Did: userDID,
144 Kind: "commit",
145 TimeUS: time.Now().UnixMicro(),
146 Commit: &jetstream.CommitEvent{
147 Rev: "test-rev-3",
148 Operation: "create",
149 Collection: "social.coves.community.block",
150 RKey: rkey,
151 CID: "bafyblock789",
152 Record: map[string]interface{}{
153 "$type": "social.coves.community.block",
154 "subject": community.DID,
155 "createdAt": time.Now().Format(time.RFC3339),
156 },
157 },
158 }
159
160 // Process event twice
161 err := consumer.HandleEvent(ctx, event)
162 if err != nil {
163 t.Fatalf("First block failed: %v", err)
164 }
165
166 err = consumer.HandleEvent(ctx, event)
167 if err != nil {
168 t.Fatalf("Second block (idempotent) failed: %v", err)
169 }
170
171 // Should still exist only once
172 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
173 if err != nil {
174 t.Fatalf("ListBlockedCommunities failed: %v", err)
175 }
176 if len(blocks) != 1 {
177 t.Errorf("Expected 1 block, got %d", len(blocks))
178 }
179 })
180
181 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) {
182 userDID := "did:plc:test-user-nonexistent"
183 rkey := "test-block-nonexistent"
184
185 // Simulate DELETE event for block that doesn't exist
186 event := &jetstream.JetstreamEvent{
187 Did: userDID,
188 Kind: "commit",
189 TimeUS: time.Now().UnixMicro(),
190 Commit: &jetstream.CommitEvent{
191 Rev: "test-rev-99",
192 Operation: "delete",
193 Collection: "social.coves.community.block",
194 RKey: rkey,
195 },
196 }
197
198 // Should not error (idempotent)
199 err := consumer.HandleEvent(ctx, event)
200 if err != nil {
201 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err)
202 }
203 })
204}
205
206// TestCommunityBlocking_ListBlocked tests listing blocked communities
207func TestCommunityBlocking_ListBlocked(t *testing.T) {
208 if testing.Short() {
209 t.Skip("Skipping integration test in short mode")
210 }
211
212 ctx := context.Background()
213 db := setupTestDB(t)
214 defer cleanupBlockingTestDB(t, db)
215
216 repo := createBlockingTestCommunityRepo(t, db)
217 userDID := "did:plc:test-user-list"
218
219 // Create and block 3 communities
220 testCommunities := make([]*communities.Community, 3)
221 for i := 0; i < 3; i++ {
222 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i)
223 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID)
224
225 block := &communities.CommunityBlock{
226 UserDID: userDID,
227 CommunityDID: testCommunities[i].DID,
228 BlockedAt: time.Now(),
229 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i),
230 RecordCID: fmt.Sprintf("bafyblock%d", i),
231 }
232 _, err := repo.BlockCommunity(ctx, block)
233 if err != nil {
234 t.Fatalf("Failed to block community %d: %v", i, err)
235 }
236 }
237
238 t.Run("lists all blocked communities", func(t *testing.T) {
239 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
240 if err != nil {
241 t.Fatalf("ListBlockedCommunities failed: %v", err)
242 }
243
244 if len(blocks) != 3 {
245 t.Errorf("Expected 3 blocks, got %d", len(blocks))
246 }
247
248 // Verify all blocks belong to correct user
249 for _, block := range blocks {
250 if block.UserDID != userDID {
251 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
252 }
253 }
254 })
255
256 t.Run("pagination works correctly", func(t *testing.T) {
257 // Get first 2
258 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0)
259 if err != nil {
260 t.Fatalf("ListBlockedCommunities with limit failed: %v", err)
261 }
262 if len(blocks) != 2 {
263 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks))
264 }
265
266 // Get next 2 (should only get 1)
267 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2)
268 if err != nil {
269 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err)
270 }
271 if len(blocksPage2) != 1 {
272 t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2))
273 }
274 })
275
276 t.Run("returns empty list for user with no blocks", func(t *testing.T) {
277 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0)
278 if err != nil {
279 t.Fatalf("ListBlockedCommunities failed: %v", err)
280 }
281 if len(blocks) != 0 {
282 t.Errorf("Expected 0 blocks, got %d", len(blocks))
283 }
284 })
285}
286
287// TestCommunityBlocking_IsBlocked tests the fast block check
288func TestCommunityBlocking_IsBlocked(t *testing.T) {
289 if testing.Short() {
290 t.Skip("Skipping integration test in short mode")
291 }
292
293 ctx := context.Background()
294 db := setupTestDB(t)
295 defer cleanupBlockingTestDB(t, db)
296
297 repo := createBlockingTestCommunityRepo(t, db)
298
299 userDID := "did:plc:test-user-isblocked"
300 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
301 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID)
302
303 t.Run("returns false when not blocked", func(t *testing.T) {
304 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
305 if err != nil {
306 t.Fatalf("IsBlocked failed: %v", err)
307 }
308 if isBlocked {
309 t.Error("Expected IsBlocked=false, got true")
310 }
311 })
312
313 t.Run("returns true when blocked", func(t *testing.T) {
314 // Create block
315 block := &communities.CommunityBlock{
316 UserDID: userDID,
317 CommunityDID: community.DID,
318 BlockedAt: time.Now(),
319 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID),
320 RecordCID: "bafyblocktest",
321 }
322 _, err := repo.BlockCommunity(ctx, block)
323 if err != nil {
324 t.Fatalf("Failed to create block: %v", err)
325 }
326
327 // Check IsBlocked
328 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
329 if err != nil {
330 t.Fatalf("IsBlocked failed: %v", err)
331 }
332 if !isBlocked {
333 t.Error("Expected IsBlocked=true, got false")
334 }
335 })
336
337 t.Run("returns false after unblock", func(t *testing.T) {
338 // Unblock
339 err := repo.UnblockCommunity(ctx, userDID, community.DID)
340 if err != nil {
341 t.Fatalf("UnblockCommunity failed: %v", err)
342 }
343
344 // Check IsBlocked
345 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
346 if err != nil {
347 t.Fatalf("IsBlocked failed: %v", err)
348 }
349 if isBlocked {
350 t.Error("Expected IsBlocked=false after unblock, got true")
351 }
352 })
353}
354
355// TestCommunityBlocking_GetBlock tests block retrieval
356func TestCommunityBlocking_GetBlock(t *testing.T) {
357 if testing.Short() {
358 t.Skip("Skipping integration test in short mode")
359 }
360
361 ctx := context.Background()
362 db := setupTestDB(t)
363 defer cleanupBlockingTestDB(t, db)
364
365 repo := createBlockingTestCommunityRepo(t, db)
366
367 userDID := "did:plc:test-user-getblock"
368 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
369 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID)
370
371 t.Run("returns error when block doesn't exist", func(t *testing.T) {
372 _, err := repo.GetBlock(ctx, userDID, community.DID)
373 if !communities.IsNotFound(err) {
374 t.Errorf("Expected ErrBlockNotFound, got: %v", err)
375 }
376 })
377
378 t.Run("retrieves block by user and community DID", func(t *testing.T) {
379 // Create block
380 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
381 originalBlock := &communities.CommunityBlock{
382 UserDID: userDID,
383 CommunityDID: community.DID,
384 BlockedAt: time.Now(),
385 RecordURI: recordURI,
386 RecordCID: "bafyblockgettest",
387 }
388 _, err := repo.BlockCommunity(ctx, originalBlock)
389 if err != nil {
390 t.Fatalf("Failed to create block: %v", err)
391 }
392
393 // Retrieve by user+community
394 block, err := repo.GetBlock(ctx, userDID, community.DID)
395 if err != nil {
396 t.Fatalf("GetBlock failed: %v", err)
397 }
398
399 if block.UserDID != userDID {
400 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
401 }
402 if block.CommunityDID != community.DID {
403 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
404 }
405 if block.RecordURI != recordURI {
406 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
407 }
408 })
409
410 t.Run("retrieves block by URI", func(t *testing.T) {
411 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
412
413 // Retrieve by URI
414 block, err := repo.GetBlockByURI(ctx, recordURI)
415 if err != nil {
416 t.Fatalf("GetBlockByURI failed: %v", err)
417 }
418
419 if block.RecordURI != recordURI {
420 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
421 }
422 if block.CommunityDID != community.DID {
423 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
424 }
425 })
426}
427
428// Helper functions for blocking tests
429
430func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository {
431 return postgresRepo.NewCommunityRepository(db)
432}
433
434func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
435 community := &communities.Community{
436 DID: did,
437 Handle: fmt.Sprintf("!%s@coves.test", name),
438 Name: name,
439 DisplayName: fmt.Sprintf("Test Community %s", name),
440 Description: "Test community for blocking tests",
441 OwnerDID: did,
442 CreatedByDID: "did:plc:test-creator",
443 HostedByDID: "did:plc:test-instance",
444 Visibility: "public",
445 CreatedAt: time.Now(),
446 UpdatedAt: time.Now(),
447 }
448
449 created, err := repo.Create(context.Background(), community)
450 if err != nil {
451 t.Fatalf("Failed to create test community: %v", err)
452 }
453
454 return created
455}
456
// cleanupBlockingTestDB deletes the rows created by the blocking tests and then
// closes db.
//
// Cleanup is best-effort: every failure is logged through t and never fails the
// test. Rows are matched by the DID prefixes the blocking tests use
// ("did:plc:test-" for users, "did:plc:test-community-" for communities).
// Blocks are removed before communities.
// NOTE(review): presumably community_blocks references communities — confirm
// against the schema before reordering.
func cleanupBlockingTestDB(t *testing.T, db *sql.DB) {
	// Attribute the log lines below to the calling test, not to this helper.
	t.Helper()

	if _, err := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'"); err != nil {
		t.Logf("Warning: Failed to clean up blocks: %v", err)
	}

	if _, err := db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'"); err != nil {
		t.Logf("Warning: Failed to clean up communities: %v", err)
	}

	if closeErr := db.Close(); closeErr != nil {
		t.Logf("Failed to close database: %v", closeErr)
	}
}