A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 postgresRepo "Coves/internal/db/postgres"
7 "context"
8 "database/sql"
9 "fmt"
10 "testing"
11 "time"
12)
13
14// TestCommunityBlocking_Indexing tests Jetstream indexing of block events
15func TestCommunityBlocking_Indexing(t *testing.T) {
16 if testing.Short() {
17 t.Skip("Skipping integration test in short mode")
18 }
19
20 ctx := context.Background()
21 db := setupTestDB(t)
22 defer cleanupBlockingTestDB(t, db)
23
24 repo := createBlockingTestCommunityRepo(t, db)
25 // Skip verification in tests
26 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
27
28 // Create test community
29 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
30 community := createBlockingTestCommunity(t, repo, "test-community-blocking", testDID)
31
32 t.Run("indexes block CREATE event", func(t *testing.T) {
33 userDID := "did:plc:test-user-blocker"
34 rkey := "test-block-1"
35
36 // Simulate Jetstream CREATE event
37 event := &jetstream.JetstreamEvent{
38 Did: userDID,
39 Kind: "commit",
40 TimeUS: time.Now().UnixMicro(),
41 Commit: &jetstream.CommitEvent{
42 Rev: "test-rev-1",
43 Operation: "create",
44 Collection: "social.coves.community.block",
45 RKey: rkey,
46 CID: "bafyblock123",
47 Record: map[string]interface{}{
48 "$type": "social.coves.community.block",
49 "subject": community.DID,
50 "createdAt": time.Now().Format(time.RFC3339),
51 },
52 },
53 }
54
55 // Process event
56 err := consumer.HandleEvent(ctx, event)
57 if err != nil {
58 t.Fatalf("Failed to handle block event: %v", err)
59 }
60
61 // Verify block indexed
62 block, err := repo.GetBlock(ctx, userDID, community.DID)
63 if err != nil {
64 t.Fatalf("Failed to get block: %v", err)
65 }
66
67 if block.UserDID != userDID {
68 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
69 }
70 if block.CommunityDID != community.DID {
71 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
72 }
73
74 // Verify IsBlocked works
75 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
76 if err != nil {
77 t.Fatalf("IsBlocked failed: %v", err)
78 }
79 if !isBlocked {
80 t.Error("Expected IsBlocked=true, got false")
81 }
82 })
83
84 t.Run("indexes block DELETE event", func(t *testing.T) {
85 userDID := "did:plc:test-user-unblocker"
86 rkey := "test-block-2"
87 uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, rkey)
88
89 // First create a block
90 block := &communities.CommunityBlock{
91 UserDID: userDID,
92 CommunityDID: community.DID,
93 BlockedAt: time.Now(),
94 RecordURI: uri,
95 RecordCID: "bafyblock456",
96 }
97 _, err := repo.BlockCommunity(ctx, block)
98 if err != nil {
99 t.Fatalf("Failed to create block: %v", err)
100 }
101
102 // Simulate DELETE event
103 event := &jetstream.JetstreamEvent{
104 Did: userDID,
105 Kind: "commit",
106 TimeUS: time.Now().UnixMicro(),
107 Commit: &jetstream.CommitEvent{
108 Rev: "test-rev-2",
109 Operation: "delete",
110 Collection: "social.coves.community.block",
111 RKey: rkey,
112 },
113 }
114
115 // Process delete
116 err = consumer.HandleEvent(ctx, event)
117 if err != nil {
118 t.Fatalf("Failed to handle delete event: %v", err)
119 }
120
121 // Verify block removed
122 _, err = repo.GetBlock(ctx, userDID, community.DID)
123 if !communities.IsNotFound(err) {
124 t.Error("Expected block to be deleted")
125 }
126
127 // Verify IsBlocked returns false
128 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
129 if err != nil {
130 t.Fatalf("IsBlocked failed: %v", err)
131 }
132 if isBlocked {
133 t.Error("Expected IsBlocked=false, got true")
134 }
135 })
136
137 t.Run("block is idempotent", func(t *testing.T) {
138 userDID := "did:plc:test-user-idempotent"
139 rkey := "test-block-3"
140
141 event := &jetstream.JetstreamEvent{
142 Did: userDID,
143 Kind: "commit",
144 TimeUS: time.Now().UnixMicro(),
145 Commit: &jetstream.CommitEvent{
146 Rev: "test-rev-3",
147 Operation: "create",
148 Collection: "social.coves.community.block",
149 RKey: rkey,
150 CID: "bafyblock789",
151 Record: map[string]interface{}{
152 "$type": "social.coves.community.block",
153 "subject": community.DID,
154 "createdAt": time.Now().Format(time.RFC3339),
155 },
156 },
157 }
158
159 // Process event twice
160 err := consumer.HandleEvent(ctx, event)
161 if err != nil {
162 t.Fatalf("First block failed: %v", err)
163 }
164
165 err = consumer.HandleEvent(ctx, event)
166 if err != nil {
167 t.Fatalf("Second block (idempotent) failed: %v", err)
168 }
169
170 // Should still exist only once
171 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
172 if err != nil {
173 t.Fatalf("ListBlockedCommunities failed: %v", err)
174 }
175 if len(blocks) != 1 {
176 t.Errorf("Expected 1 block, got %d", len(blocks))
177 }
178 })
179
180 t.Run("handles DELETE of non-existent block gracefully", func(t *testing.T) {
181 userDID := "did:plc:test-user-nonexistent"
182 rkey := "test-block-nonexistent"
183
184 // Simulate DELETE event for block that doesn't exist
185 event := &jetstream.JetstreamEvent{
186 Did: userDID,
187 Kind: "commit",
188 TimeUS: time.Now().UnixMicro(),
189 Commit: &jetstream.CommitEvent{
190 Rev: "test-rev-99",
191 Operation: "delete",
192 Collection: "social.coves.community.block",
193 RKey: rkey,
194 },
195 }
196
197 // Should not error (idempotent)
198 err := consumer.HandleEvent(ctx, event)
199 if err != nil {
200 t.Errorf("DELETE of non-existent block should be idempotent, got error: %v", err)
201 }
202 })
203}
204
205// TestCommunityBlocking_ListBlocked tests listing blocked communities
206func TestCommunityBlocking_ListBlocked(t *testing.T) {
207 if testing.Short() {
208 t.Skip("Skipping integration test in short mode")
209 }
210
211 ctx := context.Background()
212 db := setupTestDB(t)
213 defer cleanupBlockingTestDB(t, db)
214
215 repo := createBlockingTestCommunityRepo(t, db)
216 userDID := "did:plc:test-user-list"
217
218 // Create and block 3 communities
219 testCommunities := make([]*communities.Community, 3)
220 for i := 0; i < 3; i++ {
221 communityDID := fmt.Sprintf("did:plc:test-community-list-%d", i)
222 testCommunities[i] = createBlockingTestCommunity(t, repo, fmt.Sprintf("community-list-%d", i), communityDID)
223
224 block := &communities.CommunityBlock{
225 UserDID: userDID,
226 CommunityDID: testCommunities[i].DID,
227 BlockedAt: time.Now(),
228 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/%d", userDID, i),
229 RecordCID: fmt.Sprintf("bafyblock%d", i),
230 }
231 _, err := repo.BlockCommunity(ctx, block)
232 if err != nil {
233 t.Fatalf("Failed to block community %d: %v", i, err)
234 }
235 }
236
237 t.Run("lists all blocked communities", func(t *testing.T) {
238 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 10, 0)
239 if err != nil {
240 t.Fatalf("ListBlockedCommunities failed: %v", err)
241 }
242
243 if len(blocks) != 3 {
244 t.Errorf("Expected 3 blocks, got %d", len(blocks))
245 }
246
247 // Verify all blocks belong to correct user
248 for _, block := range blocks {
249 if block.UserDID != userDID {
250 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
251 }
252 }
253 })
254
255 t.Run("pagination works correctly", func(t *testing.T) {
256 // Get first 2
257 blocks, err := repo.ListBlockedCommunities(ctx, userDID, 2, 0)
258 if err != nil {
259 t.Fatalf("ListBlockedCommunities with limit failed: %v", err)
260 }
261 if len(blocks) != 2 {
262 t.Errorf("Expected 2 blocks (paginated), got %d", len(blocks))
263 }
264
265 // Get next 2 (should only get 1)
266 blocksPage2, err := repo.ListBlockedCommunities(ctx, userDID, 2, 2)
267 if err != nil {
268 t.Fatalf("ListBlockedCommunities page 2 failed: %v", err)
269 }
270 if len(blocksPage2) != 1 {
271 t.Errorf("Expected 1 block on page 2, got %d", len(blocksPage2))
272 }
273 })
274
275 t.Run("returns empty list for user with no blocks", func(t *testing.T) {
276 blocks, err := repo.ListBlockedCommunities(ctx, "did:plc:user-no-blocks", 10, 0)
277 if err != nil {
278 t.Fatalf("ListBlockedCommunities failed: %v", err)
279 }
280 if len(blocks) != 0 {
281 t.Errorf("Expected 0 blocks, got %d", len(blocks))
282 }
283 })
284}
285
286// TestCommunityBlocking_IsBlocked tests the fast block check
287func TestCommunityBlocking_IsBlocked(t *testing.T) {
288 if testing.Short() {
289 t.Skip("Skipping integration test in short mode")
290 }
291
292 ctx := context.Background()
293 db := setupTestDB(t)
294 defer cleanupBlockingTestDB(t, db)
295
296 repo := createBlockingTestCommunityRepo(t, db)
297
298 userDID := "did:plc:test-user-isblocked"
299 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
300 community := createBlockingTestCommunity(t, repo, "test-community-isblocked", communityDID)
301
302 t.Run("returns false when not blocked", func(t *testing.T) {
303 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
304 if err != nil {
305 t.Fatalf("IsBlocked failed: %v", err)
306 }
307 if isBlocked {
308 t.Error("Expected IsBlocked=false, got true")
309 }
310 })
311
312 t.Run("returns true when blocked", func(t *testing.T) {
313 // Create block
314 block := &communities.CommunityBlock{
315 UserDID: userDID,
316 CommunityDID: community.DID,
317 BlockedAt: time.Now(),
318 RecordURI: fmt.Sprintf("at://%s/social.coves.community.block/test", userDID),
319 RecordCID: "bafyblocktest",
320 }
321 _, err := repo.BlockCommunity(ctx, block)
322 if err != nil {
323 t.Fatalf("Failed to create block: %v", err)
324 }
325
326 // Check IsBlocked
327 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
328 if err != nil {
329 t.Fatalf("IsBlocked failed: %v", err)
330 }
331 if !isBlocked {
332 t.Error("Expected IsBlocked=true, got false")
333 }
334 })
335
336 t.Run("returns false after unblock", func(t *testing.T) {
337 // Unblock
338 err := repo.UnblockCommunity(ctx, userDID, community.DID)
339 if err != nil {
340 t.Fatalf("UnblockCommunity failed: %v", err)
341 }
342
343 // Check IsBlocked
344 isBlocked, err := repo.IsBlocked(ctx, userDID, community.DID)
345 if err != nil {
346 t.Fatalf("IsBlocked failed: %v", err)
347 }
348 if isBlocked {
349 t.Error("Expected IsBlocked=false after unblock, got true")
350 }
351 })
352}
353
354// TestCommunityBlocking_GetBlock tests block retrieval
355func TestCommunityBlocking_GetBlock(t *testing.T) {
356 if testing.Short() {
357 t.Skip("Skipping integration test in short mode")
358 }
359
360 ctx := context.Background()
361 db := setupTestDB(t)
362 defer cleanupBlockingTestDB(t, db)
363
364 repo := createBlockingTestCommunityRepo(t, db)
365
366 userDID := "did:plc:test-user-getblock"
367 communityDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano())
368 community := createBlockingTestCommunity(t, repo, "test-community-getblock", communityDID)
369
370 t.Run("returns error when block doesn't exist", func(t *testing.T) {
371 _, err := repo.GetBlock(ctx, userDID, community.DID)
372 if !communities.IsNotFound(err) {
373 t.Errorf("Expected ErrBlockNotFound, got: %v", err)
374 }
375 })
376
377 t.Run("retrieves block by user and community DID", func(t *testing.T) {
378 // Create block
379 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
380 originalBlock := &communities.CommunityBlock{
381 UserDID: userDID,
382 CommunityDID: community.DID,
383 BlockedAt: time.Now(),
384 RecordURI: recordURI,
385 RecordCID: "bafyblockgettest",
386 }
387 _, err := repo.BlockCommunity(ctx, originalBlock)
388 if err != nil {
389 t.Fatalf("Failed to create block: %v", err)
390 }
391
392 // Retrieve by user+community
393 block, err := repo.GetBlock(ctx, userDID, community.DID)
394 if err != nil {
395 t.Fatalf("GetBlock failed: %v", err)
396 }
397
398 if block.UserDID != userDID {
399 t.Errorf("Expected userDID=%s, got %s", userDID, block.UserDID)
400 }
401 if block.CommunityDID != community.DID {
402 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
403 }
404 if block.RecordURI != recordURI {
405 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
406 }
407 })
408
409 t.Run("retrieves block by URI", func(t *testing.T) {
410 recordURI := fmt.Sprintf("at://%s/social.coves.community.block/test-getblock", userDID)
411
412 // Retrieve by URI
413 block, err := repo.GetBlockByURI(ctx, recordURI)
414 if err != nil {
415 t.Fatalf("GetBlockByURI failed: %v", err)
416 }
417
418 if block.RecordURI != recordURI {
419 t.Errorf("Expected recordURI=%s, got %s", recordURI, block.RecordURI)
420 }
421 if block.CommunityDID != community.DID {
422 t.Errorf("Expected communityDID=%s, got %s", community.DID, block.CommunityDID)
423 }
424 })
425}
426
427// Helper functions for blocking tests
428
429func createBlockingTestCommunityRepo(t *testing.T, db *sql.DB) communities.Repository {
430 return postgresRepo.NewCommunityRepository(db)
431}
432
433func createBlockingTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community {
434 community := &communities.Community{
435 DID: did,
436 Handle: fmt.Sprintf("!%s@coves.test", name),
437 Name: name,
438 DisplayName: fmt.Sprintf("Test Community %s", name),
439 Description: "Test community for blocking tests",
440 OwnerDID: did,
441 CreatedByDID: "did:plc:test-creator",
442 HostedByDID: "did:plc:test-instance",
443 Visibility: "public",
444 CreatedAt: time.Now(),
445 UpdatedAt: time.Now(),
446 }
447
448 created, err := repo.Create(context.Background(), community)
449 if err != nil {
450 t.Fatalf("Failed to create test community: %v", err)
451 }
452
453 return created
454}
455
// cleanupBlockingTestDB removes the rows these tests create and then closes
// db. It deletes community_blocks rows whose user_did starts with
// "did:plc:test-" first, then communities rows whose did starts with
// "did:plc:test-community-". Cleanup is best-effort: a failed statement or a
// failed Close is only logged via t.Logf and never fails the test.
func cleanupBlockingTestDB(t *testing.T, db *sql.DB) {
	// Statements run in slice order: blocks before communities.
	steps := []struct {
		query   string
		warning string
	}{
		{
			query:   "DELETE FROM community_blocks WHERE user_did LIKE 'did:plc:test-%'",
			warning: "Warning: Failed to clean up blocks: %v",
		},
		{
			query:   "DELETE FROM communities WHERE did LIKE 'did:plc:test-community-%'",
			warning: "Warning: Failed to clean up communities: %v",
		},
	}

	for _, step := range steps {
		if _, err := db.Exec(step.query); err != nil {
			t.Logf(step.warning, err)
		}
	}

	if err := db.Close(); err != nil {
		t.Logf("Failed to close database: %v", err)
	}
}
471}