A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 postgresRepo "Coves/internal/db/postgres"
7 "context"
8 "database/sql"
9 "fmt"
10 "testing"
11 "time"
12)
13
14// TestCommunityBlocking_Indexing tests Jetstream indexing of block events
15func TestCommunityBlocking_Indexing(t *testing.T) {
16 if testing.Short() {
17 t.Skip("Skipping integration test in short mode")
18 }
19
20 ctx := context.Background()
21 db := setupTestDB(t)
22 defer cleanupBlockingTestDB(t, db)
23
24 repo := createBlockingTestCommunityRepo(t, db)
25 consumer := jetstream.NewCommunityEventConsumer(repo)
26
27 // Create test community
28 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
29 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID)
30
31 t.Run("indexes block CREATE event", func(t *testing.T) {
32 userDID := "did:plc:test-user-blocker"
33 rkey := "test-block-1"
34
35 // Simulate Jetstream CREATE event
36 event := &jetstream.JetstreamEvent{
37 Did: userDID,
38 Kind: "commit",
39 TimeUS: time.Now().UnixMicro(),
40 Commit: &jetstream.CommitEvent{
41 Rev: "test-rev-1",
42 Operation: "create",
43 Collection: "social.coves.community.block",
44 RKey: rkey,
45 CID: "bafyblock123",
46 Record: map[string]interface{}{
47 "$type": "social.coves.community.block",
48 "subject": community.DID,
49 "createdAt": time.Now().Format(time.RFC3339),
50 },
51 },
52 }
53
54 // Process event
55 err := consumer.HandleEvent(ctx, event)
56 if err != nil {
57 t.Fatalf("Failed to handle block event: %v", err)
58 }
59
60 // Verify block indexed
61 block, err := repo.GetBlock(ctx, userDID, community.DID)
62 if err != nil {
63 t.Fatalf("Failed to get block: %v", err)
64 }
65
66 if block.UserDID != userDID {
67 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
68 }
69 if block.CommunityDID != community.DID {
70 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
71 }
72
73 // Verify IsBlocked works
74 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
75 if err != nil {
76 t.Fatalf("IsBlocked failed: %v", err)
77 }
78 if !isBlocked {
79 t.Error("Expected IsBlocked=true, got false")
80 }
81 })
82
83 t.Run("indexes block DELETE event", func(t *testing.T) {
84 userDID := "did:plc:test-user-unblocker"
85 rkey := "test-block-2"
86 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey)
87
88 // First create a block
89 block := &communities.CommunityBlock{
90 UserDID: userDID,
91 CommunityDID: community.DID,
92 BlockedAt: time.Now(),
93 RecordURI: uri,
94 RecordCID: "bafyblock456",
95 }
96 _, err := repo.BlockCommunity(ctx, block)
97 if err != nil {
98 t.Fatalf("Failed to create block: %v", err)
99 }
100
101 // Simulate DELETE event
102 event := &jetstream.JetstreamEvent{
103 Did: userDID,
104 Kind: "commit",
105 TimeUS: time.Now().UnixMicro(),
106 Commit: &jetstream.CommitEvent{
107 Rev: "test-rev-2",
108 Operation: "delete",
109 Collection: "social.coves.community.block",
110 RKey: rkey,
111 },
112 }
113
114 // Process delete
115 err = consumer.HandleEvent(ctx, event)
116 if err != nil {
117 t.Fatalf("Failed to handle delete event: %v", err)
118 }
119
120 // Verify block removed
121 _, err = repo.GetBlock(ctx, userDID, community.DID)
122 if !communities.IsNotFound(err) {
123 t.Error("Expected block to be deleted")
124 }
125
126 // Verify IsBlocked returns false
127 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
128 if err != nil {
129 t.Fatalf("IsBlocked failed: %v", err)
130 }
131 if isBlocked {
132 t.Error("Expected IsBlocked=false, got true")
133 }
134 })
135
136 t.Run("block is idempotent", func(t *testing.T) {
137 userDID := "did:plc:test-user-idempotent"
138 rkey := "test-block-3"
139
140 event := &jetstream.JetstreamEvent{
141 Did: userDID,
142 Kind: "commit",
143 TimeUS: time.Now().UnixMicro(),
144 Commit: &jetstream.CommitEvent{
145 Rev: "test-rev-3",
146 Operation: "create",
147 Collection: "social.coves.community.block",
148 RKey: rkey,
149 CID: "bafyblock789",
150 Record: map[string]interface{}{
151 "$type": "social.coves.community.block",
152 "subject": community.DID,
153 "createdAt": time.Now().Format(time.RFC3339),
154 },
155 },
156 }
157
158 // Process event twice
159 err := consumer.HandleEvent(ctx, event)
160 if err != nil {
161 t.Fatalf("First block failed: %v", err)
162 }
163
164 err = consumer.HandleEvent(ctx, event)
165 if err != nil {
166 t.Fatalf("Second block (idempotent) failed: %v", err)
167 }
168
169 // Should still exist only once
170 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
171 if err != nil {
172 t.Fatalf("ListBlockedCommunities failed: %v", err)
173 }
174 if len(blocks) != 1 {
175 t.Errorf("Expected 1 block, got %d", len(blocks))
176 }
177 })
178
179 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) {
180 userDID := "did:plc:test-user-nonexistent"
181 rkey := "test-block-nonexistent"
182
183 // Simulate DELETE event for block that doesn't exist
184 event := &jetstream.JetstreamEvent{
185 Did: userDID,
186 Kind: "commit",
187 TimeUS: time.Now().UnixMicro(),
188 Commit: &jetstream.CommitEvent{
189 Rev: "test-rev-99",
190 Operation: "delete",
191 Collection: "social.coves.community.block",
192 RKey: rkey,
193 },
194 }
195
196 // Should not error (idempotent)
197 err := consumer.HandleEvent(ctx, event)
198 if err != nil {
199 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err)
200 }
201 })
202}
203
204// TestCommunityBlocking_ListBlocked tests listing blocked communities
205func TestCommunityBlocking_ListBlocked(t *testing.T) {
206 if testing.Short() {
207 t.Skip("Skipping integration test in short mode")
208 }
209
210 ctx := context.Background()
211 db := setupTestDB(t)
212 defer cleanupBlockingTestDB(t, db)
213
214 repo := createBlockingTestCommunityRepo(t, db)
215 userDID := "did:plc:test-user-list"
216
217 // Create and block 3 communities
218 testCommunities := make([]*communities.Community, 3)
219 for i := 0; i < 3; i++ {
220 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i)
221 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID)
222
223 block := &communities.CommunityBlock{
224 UserDID: userDID,
225 CommunityDID: testCommunities[i].DID,
226 BlockedAt: time.Now(),
227 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i),
228 RecordCID: fmt.Sprintf("bafyblock%d", i),
229 }
230 _, err := repo.BlockCommunity(ctx, block)
231 if err != nil {
232 t.Fatalf("Failed to block community %d: %v", i, err)
233 }
234 }
235
236 t.Run("lists all blocked communities", func(t *testing.T) {
237 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
238 if err != nil {
239 t.Fatalf("ListBlockedCommunities failed: %v", err)
240 }
241
242 if len(blocks) != 3 {
243 t.Errorf("Expected 3 blocks, got %d", len(blocks))
244 }
245
246 // Verify all blocks belong to correct user
247 for _, block := range blocks {
248 if block.UserDID != userDID {
249 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
250 }
251 }
252 })
253
254 t.Run("pagination works correctly", func(t *testing.T) {
255 // Get first 2
256 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0)
257 if err != nil {
258 t.Fatalf("ListBlockedCommunities with limit failed: %v", err)
259 }
260 if len(blocks) != 2 {
261 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks))
262 }
263
264 // Get next 2 (should only get 1)
265 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2)
266 if err != nil {
267 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err)
268 }
269 if len(blocksPage2) != 1 {
270 t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2))
271 }
272 })
273
274 t.Run("returns empty list for user with no blocks", func(t *testing.T) {
275 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0)
276 if err != nil {
277 t.Fatalf("ListBlockedCommunities failed: %v", err)
278 }
279 if len(blocks) != 0 {
280 t.Errorf("Expected 0 blocks, got %d", len(blocks))
281 }
282 })
283}
284
285// TestCommunityBlocking_IsBlocked tests the fast block check
286func TestCommunityBlocking_IsBlocked(t *testing.T) {
287 if testing.Short() {
288 t.Skip("Skipping integration test in short mode")
289 }
290
291 ctx := context.Background()
292 db := setupTestDB(t)
293 defer cleanupBlockingTestDB(t, db)
294
295 repo := createBlockingTestCommunityRepo(t, db)
296
297 userDID := "did:plc:test-user-isblocked"
298 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
299 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID)
300
301 t.Run("returns false when not blocked", func(t *testing.T) {
302 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
303 if err != nil {
304 t.Fatalf("IsBlocked failed: %v", err)
305 }
306 if isBlocked {
307 t.Error("Expected IsBlocked=false, got true")
308 }
309 })
310
311 t.Run("returns true when blocked", func(t *testing.T) {
312 // Create block
313 block := &communities.CommunityBlock{
314 UserDID: userDID,
315 CommunityDID: community.DID,
316 BlockedAt: time.Now(),
317 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID),
318 RecordCID: "bafyblocktest",
319 }
320 _, err := repo.BlockCommunity(ctx, block)
321 if err != nil {
322 t.Fatalf("Failed to create block: %v", err)
323 }
324
325 // Check IsBlocked
326 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
327 if err != nil {
328 t.Fatalf("IsBlocked failed: %v", err)
329 }
330 if !isBlocked {
331 t.Error("Expected IsBlocked=true, got false")
332 }
333 })
334
335 t.Run("returns false after unblock", func(t *testing.T) {
336 // Unblock
337 err := repo.UnblockCommunity(ctx, userDID, community.DID)
338 if err != nil {
339 t.Fatalf("UnblockCommunity failed: %v", err)
340 }
341
342 // Check IsBlocked
343 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
344 if err != nil {
345 t.Fatalf("IsBlocked failed: %v", err)
346 }
347 if isBlocked {
348 t.Error("Expected IsBlocked=false after unblock, got true")
349 }
350 })
351}
352
353// TestCommunityBlocking_GetBlock tests block retrieval
354func TestCommunityBlocking_GetBlock(t *testing.T) {
355 if testing.Short() {
356 t.Skip("Skipping integration test in short mode")
357 }
358
359 ctx := context.Background()
360 db := setupTestDB(t)
361 defer cleanupBlockingTestDB(t, db)
362
363 repo := createBlockingTestCommunityRepo(t, db)
364
365 userDID := "did:plc:test-user-getblock"
366 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
367 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID)
368
369 t.Run("returns error when block doesn't exist", func(t *testing.T) {
370 _, err := repo.GetBlock(ctx, userDID, community.DID)
371 if !communities.IsNotFound(err) {
372 t.Errorf("Expected ErrBlockNotFound, got: %v", err)
373 }
374 })
375
376 t.Run("retrieves block by user and community DID", func(t *testing.T) {
377 // Create block
378 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
379 originalBlock := &communities.CommunityBlock{
380 UserDID: userDID,
381 CommunityDID: community.DID,
382 BlockedAt: time.Now(),
383 RecordURI: recordURI,
384 RecordCID: "bafyblockgettest",
385 }
386 _, err := repo.BlockCommunity(ctx, originalBlock)
387 if err != nil {
388 t.Fatalf("Failed to create block: %v", err)
389 }
390
391 // Retrieve by user+community
392 block, err := repo.GetBlock(ctx, userDID, community.DID)
393 if err != nil {
394 t.Fatalf("GetBlock failed: %v", err)
395 }
396
397 if block.UserDID != userDID {
398 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
399 }
400 if block.CommunityDID != community.DID {
401 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
402 }
403 if block.RecordURI != recordURI {
404 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
405 }
406 })
407
408 t.Run("retrieves block by URI", func(t *testing.T) {
409 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
410
411 // Retrieve by URI
412 block, err := repo.GetBlockByURI(ctx, recordURI)
413 if err != nil {
414 t.Fatalf("GetBlockByURI failed: %v", err)
415 }
416
417 if block.RecordURI != recordURI {
418 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
419 }
420 if block.CommunityDID != community.DID {
421 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
422 }
423 })
424}
425
426// Helper functions for blocking tests
427
428func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository {
429 return postgresRepo.NewCommunityRepository(db)
430}
431
432func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
433 community := &communities.Community{
434 DID: did,
435 Handle: fmt.Sprintf("!%s@coves.test", name),
436 Name: name,
437 DisplayName: fmt.Sprintf("Test Community %s", name),
438 Description: "Test community for blocking tests",
439 OwnerDID: did,
440 CreatedByDID: "did:plc:test-creator",
441 HostedByDID: "did:plc:test-instance",
442 Visibility: "public",
443 CreatedAt: time.Now(),
444 UpdatedAt: time.Now(),
445 }
446
447 created, err := repo.Create(context.Background(), community)
448 if err != nil {
449 t.Fatalf("Failed to create test community: %v", err)
450 }
451
452 return created
453}
454
// cleanupBlockingTestDB deletes the rows written by the blocking tests and
// then closes db. Every failure is only logged, so a cleanup problem never
// changes the result of the test that deferred this call.
func cleanupBlockingTestDB(t *testing.T, db *sql.DB) {
	// Blocks are deleted first, then the test communities.
	if _, blocksErr := db.Exec("DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'"); blocksErr != nil {
		t.Logf("Warning: Failed to clean up blocks: %v", blocksErr)
	}

	if _, communitiesErr := db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'"); communitiesErr != nil {
		t.Logf("Warning: Failed to clean up communities: %v", communitiesErr)
	}

	// The handle is closed last, after both statements have been attempted.
	closeErr := db.Close()
	if closeErr == nil {
		return
	}
	t.Logf("Failed to close database: %v", closeErr)
}