A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "testing"
8 "time"
9
10 "Coves/internal/atproto/identity"
11 "Coves/internal/atproto/jetstream"
12 "Coves/internal/core/communities"
13 "Coves/internal/db/postgres"
14)
15
16func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) {
17 db := setupTestDB(t)
18 defer func() {
19 if err := db.Close(); err != nil {
20 t.Logf("Failed to close database: %v", err)
21 }
22 }()
23
24 repo := postgres.NewCommunityRepository(db)
25 ctx := context.Background()
26
27 t.Run("creates community from firehose event", func(t *testing.T) {
28 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
29 communityDID := generateTestDID(uniqueSuffix)
30 communityName := fmt.Sprintf("test-community-%s", uniqueSuffix)
31 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
32
33 // Set up mock resolver for this test DID
34 mockResolver := newMockIdentityResolver()
35 mockResolver.resolutions[communityDID] = expectedHandle
36 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
37
38 // Simulate a Jetstream commit event
39 event := &jetstream.JetstreamEvent{
40 Did: communityDID,
41 TimeUS: time.Now().UnixMicro(),
42 Kind: "commit",
43 Commit: &jetstream.CommitEvent{
44 Rev: "rev123",
45 Operation: "create",
46 Collection: "social.coves.community.profile",
47 RKey: "self",
48 CID: "bafy123abc",
49 Record: map[string]interface{}{
50 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record
51 // These are resolved/computed by AppView, not stored in immutable records
52 "name": communityName,
53 "displayName": "Test Community",
54 "description": "A test community",
55 "owner": "did:web:coves.local",
56 "createdBy": "did:plc:user123",
57 "hostedBy": "did:web:coves.local",
58 "visibility": "public",
59 "federation": map[string]interface{}{
60 "allowExternalDiscovery": true,
61 },
62 "createdAt": time.Now().Format(time.RFC3339),
63 },
64 },
65 }
66
67 // Handle the event
68 if err := consumer.HandleEvent(ctx, event); err != nil {
69 t.Fatalf("Failed to handle event: %v", err)
70 }
71
72 // Verify community was indexed
73 community, err := repo.GetByDID(ctx, communityDID)
74 if err != nil {
75 t.Fatalf("Failed to get indexed community: %v", err)
76 }
77
78 if community.DID != communityDID {
79 t.Errorf("Expected DID %s, got %s", communityDID, community.DID)
80 }
81 if community.DisplayName != "Test Community" {
82 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName)
83 }
84 if community.Visibility != "public" {
85 t.Errorf("Expected Visibility 'public', got %s", community.Visibility)
86 }
87 })
88
89 t.Run("updates existing community", func(t *testing.T) {
90 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
91 communityDID := generateTestDID(uniqueSuffix)
92 communityName := fmt.Sprintf("update-test-%s", uniqueSuffix)
93 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
94
95 // Set up mock resolver for this test DID
96 mockResolver := newMockIdentityResolver()
97 mockResolver.resolutions[communityDID] = expectedHandle
98 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
99
100 // Create initial community
101 initialCommunity := &communities.Community{
102 DID: communityDID,
103 Handle: expectedHandle,
104 Name: communityName,
105 DisplayName: "Original Name",
106 Description: "Original description",
107 OwnerDID: "did:web:coves.local",
108 CreatedByDID: "did:plc:user123",
109 HostedByDID: "did:web:coves.local",
110 Visibility: "public",
111 AllowExternalDiscovery: true,
112 CreatedAt: time.Now(),
113 UpdatedAt: time.Now(),
114 }
115
116 if _, err := repo.Create(ctx, initialCommunity); err != nil {
117 t.Fatalf("Failed to create initial community: %v", err)
118 }
119
120 // Simulate update event
121 updateEvent := &jetstream.JetstreamEvent{
122 Did: communityDID,
123 TimeUS: time.Now().UnixMicro(),
124 Kind: "commit",
125 Commit: &jetstream.CommitEvent{
126 Rev: "rev124",
127 Operation: "update",
128 Collection: "social.coves.community.profile",
129 RKey: "self",
130 CID: "bafy456def",
131 Record: map[string]interface{}{
132 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record
133 // These are resolved/computed by AppView, not stored in immutable records
134 "name": "update-test",
135 "displayName": "Updated Name",
136 "description": "Updated description",
137 "owner": "did:web:coves.local",
138 "createdBy": "did:plc:user123",
139 "hostedBy": "did:web:coves.local",
140 "visibility": "unlisted",
141 "federation": map[string]interface{}{
142 "allowExternalDiscovery": false,
143 },
144 "createdAt": time.Now().Format(time.RFC3339),
145 },
146 },
147 }
148
149 // Handle the update
150 if err := consumer.HandleEvent(ctx, updateEvent); err != nil {
151 t.Fatalf("Failed to handle update event: %v", err)
152 }
153
154 // Verify community was updated
155 updated, err := repo.GetByDID(ctx, communityDID)
156 if err != nil {
157 t.Fatalf("Failed to get updated community: %v", err)
158 }
159
160 if updated.DisplayName != "Updated Name" {
161 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName)
162 }
163 if updated.Description != "Updated description" {
164 t.Errorf("Expected Description 'Updated description', got %s", updated.Description)
165 }
166 if updated.Visibility != "unlisted" {
167 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility)
168 }
169 if updated.AllowExternalDiscovery {
170 t.Error("Expected AllowExternalDiscovery to be false")
171 }
172 })
173
174 t.Run("deletes community", func(t *testing.T) {
175 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
176 communityDID := generateTestDID(uniqueSuffix)
177 communityName := fmt.Sprintf("delete-test-%s", uniqueSuffix)
178 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
179
180 // Set up mock resolver for this test DID
181 mockResolver := newMockIdentityResolver()
182 mockResolver.resolutions[communityDID] = expectedHandle
183 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
184
185 // Create community to delete
186 community := &communities.Community{
187 DID: communityDID,
188 Handle: expectedHandle,
189 Name: communityName,
190 OwnerDID: "did:web:coves.local",
191 CreatedByDID: "did:plc:user123",
192 HostedByDID: "did:web:coves.local",
193 Visibility: "public",
194 CreatedAt: time.Now(),
195 UpdatedAt: time.Now(),
196 }
197
198 if _, err := repo.Create(ctx, community); err != nil {
199 t.Fatalf("Failed to create community: %v", err)
200 }
201
202 // Simulate delete event
203 deleteEvent := &jetstream.JetstreamEvent{
204 Did: communityDID,
205 TimeUS: time.Now().UnixMicro(),
206 Kind: "commit",
207 Commit: &jetstream.CommitEvent{
208 Rev: "rev125",
209 Operation: "delete",
210 Collection: "social.coves.community.profile",
211 RKey: "self",
212 },
213 }
214
215 // Handle the delete
216 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil {
217 t.Fatalf("Failed to handle delete event: %v", err)
218 }
219
220 // Verify community was deleted
221 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound {
222 t.Errorf("Expected ErrCommunityNotFound, got: %v", err)
223 }
224 })
225}
226
227func TestCommunityConsumer_HandleSubscription(t *testing.T) {
228 db := setupTestDB(t)
229 defer func() {
230 if err := db.Close(); err != nil {
231 t.Logf("Failed to close database: %v", err)
232 }
233 }()
234
235 repo := postgres.NewCommunityRepository(db)
236 ctx := context.Background()
237
238 t.Run("creates subscription from event", func(t *testing.T) {
239 // Create a community first
240 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
241 communityDID := generateTestDID(uniqueSuffix)
242 communityName := fmt.Sprintf("sub-test-%s", uniqueSuffix)
243 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
244
245 // Set up mock resolver for this test DID
246 mockResolver := newMockIdentityResolver()
247 mockResolver.resolutions[communityDID] = expectedHandle
248 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
249
250 community := &communities.Community{
251 DID: communityDID,
252 Handle: expectedHandle,
253 Name: communityName,
254 OwnerDID: "did:web:coves.local",
255 CreatedByDID: "did:plc:user123",
256 HostedByDID: "did:web:coves.local",
257 Visibility: "public",
258 CreatedAt: time.Now(),
259 UpdatedAt: time.Now(),
260 }
261
262 if _, err := repo.Create(ctx, community); err != nil {
263 t.Fatalf("Failed to create community: %v", err)
264 }
265
266 // Simulate subscription event
267 // IMPORTANT: Use correct collection name (record type, not XRPC procedure)
268 userDID := "did:plc:subscriber123"
269 subEvent := &jetstream.JetstreamEvent{
270 Did: userDID,
271 TimeUS: time.Now().UnixMicro(),
272 Kind: "commit",
273 Commit: &jetstream.CommitEvent{
274 Rev: "rev200",
275 Operation: "create",
276 Collection: "social.coves.community.subscription", // Updated to communities namespace
277 RKey: "sub123",
278 CID: "bafy789ghi",
279 Record: map[string]interface{}{
280 "subject": communityDID, // Using 'subject' per atProto conventions
281 "contentVisibility": 3,
282 "createdAt": time.Now().Format(time.RFC3339),
283 },
284 },
285 }
286
287 // Handle the subscription
288 if err := consumer.HandleEvent(ctx, subEvent); err != nil {
289 t.Fatalf("Failed to handle subscription event: %v", err)
290 }
291
292 // Verify subscription was created
293 subscription, err := repo.GetSubscription(ctx, userDID, communityDID)
294 if err != nil {
295 t.Fatalf("Failed to get subscription: %v", err)
296 }
297
298 if subscription.UserDID != userDID {
299 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID)
300 }
301 if subscription.CommunityDID != communityDID {
302 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID)
303 }
304
305 // Verify subscriber count was incremented
306 updated, err := repo.GetByDID(ctx, communityDID)
307 if err != nil {
308 t.Fatalf("Failed to get community: %v", err)
309 }
310
311 if updated.SubscriberCount != 1 {
312 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount)
313 }
314 })
315}
316
317func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) {
318 db := setupTestDB(t)
319 defer func() {
320 if err := db.Close(); err != nil {
321 t.Logf("Failed to close database: %v", err)
322 }
323 }()
324
325 repo := postgres.NewCommunityRepository(db)
326 // Use mock resolver (though these tests don't create communities, so it won't be called)
327 mockResolver := newMockIdentityResolver()
328 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
329 ctx := context.Background()
330
331 t.Run("ignores identity events", func(t *testing.T) {
332 event := &jetstream.JetstreamEvent{
333 Did: "did:plc:user123",
334 TimeUS: time.Now().UnixMicro(),
335 Kind: "identity",
336 Identity: &jetstream.IdentityEvent{
337 Did: "did:plc:user123",
338 Handle: "alice.bsky.social",
339 },
340 }
341
342 err := consumer.HandleEvent(ctx, event)
343 if err != nil {
344 t.Errorf("Expected no error for identity event, got: %v", err)
345 }
346 })
347
348 t.Run("ignores non-community collections", func(t *testing.T) {
349 event := &jetstream.JetstreamEvent{
350 Did: "did:plc:user123",
351 TimeUS: time.Now().UnixMicro(),
352 Kind: "commit",
353 Commit: &jetstream.CommitEvent{
354 Rev: "rev300",
355 Operation: "create",
356 Collection: "app.bsky.communityFeed.post",
357 RKey: "post123",
358 Record: map[string]interface{}{
359 "text": "Hello world",
360 },
361 },
362 }
363
364 err := consumer.HandleEvent(ctx, event)
365 if err != nil {
366 t.Errorf("Expected no error for non-community event, got: %v", err)
367 }
368 })
369}
370
371// mockIdentityResolver is a test double for identity resolution
372type mockIdentityResolver struct {
373 resolutions map[string]string
374 lastDID string
375 callCount int
376 shouldFail bool
377}
378
379func newMockIdentityResolver() *mockIdentityResolver {
380 return &mockIdentityResolver{
381 resolutions: make(map[string]string),
382 }
383}
384
385func (m *mockIdentityResolver) Resolve(ctx context.Context, did string) (*identity.Identity, error) {
386 m.callCount++
387 m.lastDID = did
388
389 if m.shouldFail {
390 return nil, errors.New("mock PLC resolution failure")
391 }
392
393 handle, ok := m.resolutions[did]
394 if !ok {
395 return nil, fmt.Errorf("no resolution configured for DID: %s", did)
396 }
397
398 return &identity.Identity{
399 DID: did,
400 Handle: handle,
401 PDSURL: "https://pds.example.com",
402 ResolvedAt: time.Now(),
403 Method: identity.MethodHTTPS,
404 }, nil
405}
406
407func TestCommunityConsumer_PLCHandleResolution(t *testing.T) {
408 db := setupTestDB(t)
409 defer func() {
410 if err := db.Close(); err != nil {
411 t.Logf("Failed to close database: %v", err)
412 }
413 }()
414
415 repo := postgres.NewCommunityRepository(db)
416 ctx := context.Background()
417
418 t.Run("resolves handle from PLC successfully", func(t *testing.T) {
419 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
420 communityDID := generateTestDID(uniqueSuffix)
421 communityName := fmt.Sprintf("test-plc-%s", uniqueSuffix)
422 expectedHandle := fmt.Sprintf("%s.community.coves.social", communityName)
423
424 // Create mock resolver
425 mockResolver := newMockIdentityResolver()
426 mockResolver.resolutions[communityDID] = expectedHandle
427
428 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
429
430 // Simulate Jetstream event without handle in record
431 event := &jetstream.JetstreamEvent{
432 Did: communityDID,
433 TimeUS: time.Now().UnixMicro(),
434 Kind: "commit",
435 Commit: &jetstream.CommitEvent{
436 Rev: "rev123",
437 Operation: "create",
438 Collection: "social.coves.community.profile",
439 RKey: "self",
440 CID: "bafy123abc",
441 Record: map[string]interface{}{
442 // No handle field - should trigger PLC resolution
443 "name": communityName,
444 "displayName": "Test PLC Community",
445 "description": "Testing PLC resolution",
446 "owner": "did:web:coves.local",
447 "createdBy": "did:plc:user123",
448 "hostedBy": "did:web:coves.local",
449 "visibility": "public",
450 "federation": map[string]interface{}{
451 "allowExternalDiscovery": true,
452 },
453 "createdAt": time.Now().Format(time.RFC3339),
454 },
455 },
456 }
457
458 // Handle the event
459 if err := consumer.HandleEvent(ctx, event); err != nil {
460 t.Fatalf("Failed to handle event: %v", err)
461 }
462
463 // Verify mock was called
464 if mockResolver.callCount != 1 {
465 t.Errorf("Expected 1 PLC resolution call, got %d", mockResolver.callCount)
466 }
467 if mockResolver.lastDID != communityDID {
468 t.Errorf("Expected PLC resolution for DID %s, got %s", communityDID, mockResolver.lastDID)
469 }
470
471 // Verify community was indexed with PLC-resolved handle
472 community, err := repo.GetByDID(ctx, communityDID)
473 if err != nil {
474 t.Fatalf("Failed to get indexed community: %v", err)
475 }
476
477 if community.Handle != expectedHandle {
478 t.Errorf("Expected handle %s from PLC, got %s", expectedHandle, community.Handle)
479 }
480 })
481
482 t.Run("fails when PLC resolution fails (no fallback)", func(t *testing.T) {
483 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
484 communityDID := generateTestDID(uniqueSuffix)
485 communityName := fmt.Sprintf("test-plc-fail-%s", uniqueSuffix)
486
487 // Create mock resolver that fails
488 mockResolver := newMockIdentityResolver()
489 mockResolver.shouldFail = true
490
491 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
492
493 // Simulate Jetstream event without handle in record
494 event := &jetstream.JetstreamEvent{
495 Did: communityDID,
496 TimeUS: time.Now().UnixMicro(),
497 Kind: "commit",
498 Commit: &jetstream.CommitEvent{
499 Rev: "rev456",
500 Operation: "create",
501 Collection: "social.coves.community.profile",
502 RKey: "self",
503 CID: "bafy456def",
504 Record: map[string]interface{}{
505 "name": communityName,
506 "displayName": "Test PLC Failure",
507 "description": "Testing PLC failure",
508 "owner": "did:web:coves.local",
509 "createdBy": "did:plc:user123",
510 "hostedBy": "did:web:coves.local",
511 "visibility": "public",
512 "federation": map[string]interface{}{
513 "allowExternalDiscovery": true,
514 },
515 "createdAt": time.Now().Format(time.RFC3339),
516 },
517 },
518 }
519
520 // Handle the event - should fail
521 err := consumer.HandleEvent(ctx, event)
522 if err == nil {
523 t.Fatal("Expected error when PLC resolution fails, got nil")
524 }
525
526 // Verify error message indicates PLC failure
527 expectedErrSubstring := "failed to resolve handle from PLC"
528 if !contains(err.Error(), expectedErrSubstring) {
529 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err)
530 }
531
532 // Verify community was NOT indexed
533 _, err = repo.GetByDID(ctx, communityDID)
534 if !communities.IsNotFound(err) {
535 t.Errorf("Expected community NOT to be indexed when PLC fails, but got: %v", err)
536 }
537
538 // Verify mock was called (failure happened during resolution, not before)
539 if mockResolver.callCount != 1 {
540 t.Errorf("Expected 1 PLC resolution attempt, got %d", mockResolver.callCount)
541 }
542 })
543
544 t.Run("test mode rejects invalid hostedBy format", func(t *testing.T) {
545 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
546 communityDID := generateTestDID(uniqueSuffix)
547 communityName := fmt.Sprintf("test-invalid-hosted-%s", uniqueSuffix)
548
549 // No identity resolver (test mode)
550 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
551
552 // Event with invalid hostedBy format (not did:web)
553 event := &jetstream.JetstreamEvent{
554 Did: communityDID,
555 TimeUS: time.Now().UnixMicro(),
556 Kind: "commit",
557 Commit: &jetstream.CommitEvent{
558 Rev: "rev789",
559 Operation: "create",
560 Collection: "social.coves.community.profile",
561 RKey: "self",
562 CID: "bafy789ghi",
563 Record: map[string]interface{}{
564 "name": communityName,
565 "displayName": "Test Invalid HostedBy",
566 "description": "Testing validation",
567 "owner": "did:web:coves.local",
568 "createdBy": "did:plc:user123",
569 "hostedBy": "did:plc:invalid", // Invalid format - not did:web
570 "visibility": "public",
571 "federation": map[string]interface{}{
572 "allowExternalDiscovery": true,
573 },
574 "createdAt": time.Now().Format(time.RFC3339),
575 },
576 },
577 }
578
579 // Handle the event - should fail due to empty handle
580 err := consumer.HandleEvent(ctx, event)
581 if err == nil {
582 t.Fatal("Expected error for invalid hostedBy format in test mode, got nil")
583 }
584
585 // Verify error is about handle being required
586 expectedErrSubstring := "handle is required"
587 if !contains(err.Error(), expectedErrSubstring) {
588 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err)
589 }
590 })
591}