A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/core/communities"
7 "Coves/internal/db/postgres"
8 "context"
9 "errors"
10 "fmt"
11 "testing"
12 "time"
13)
14
15func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) {
16 db := setupTestDB(t)
17 defer func() {
18 if err := db.Close(); err != nil {
19 t.Logf("Failed to close database: %v", err)
20 }
21 }()
22
23 repo := postgres.NewCommunityRepository(db)
24 ctx := context.Background()
25
26 t.Run("creates community from firehose event", func(t *testing.T) {
27 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
28 communityDID := generateTestDID(uniqueSuffix)
29 communityName := fmt.Sprintf("test-community-%s", uniqueSuffix)
30 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
31
32 // Set up mock resolver for this test DID
33 mockResolver := newMockIdentityResolver()
34 mockResolver.resolutions[communityDID] = expectedHandle
35 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
36
37 // Simulate a Jetstream commit event
38 event := &jetstream.JetstreamEvent{
39 Did: communityDID,
40 TimeUS: time.Now().UnixMicro(),
41 Kind: "commit",
42 Commit: &jetstream.CommitEvent{
43 Rev: "rev123",
44 Operation: "create",
45 Collection: "social.coves.community.profile",
46 RKey: "self",
47 CID: "bafy123abc",
48 Record: map[string]interface{}{
49 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record
50 // These are resolved/computed by AppView, not stored in immutable records
51 "name": communityName,
52 "displayName": "Test Community",
53 "description": "A test community",
54 "owner": "did:web:coves.local",
55 "createdBy": "did:plc:user123",
56 "hostedBy": "did:web:coves.local",
57 "visibility": "public",
58 "federation": map[string]interface{}{
59 "allowExternalDiscovery": true,
60 },
61 "createdAt": time.Now().Format(time.RFC3339),
62 },
63 },
64 }
65
66 // Handle the event
67 if err := consumer.HandleEvent(ctx, event); err != nil {
68 t.Fatalf("Failed to handle event: %v", err)
69 }
70
71 // Verify community was indexed
72 community, err := repo.GetByDID(ctx, communityDID)
73 if err != nil {
74 t.Fatalf("Failed to get indexed community: %v", err)
75 }
76
77 if community.DID != communityDID {
78 t.Errorf("Expected DID %s, got %s", communityDID, community.DID)
79 }
80 if community.DisplayName != "Test Community" {
81 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName)
82 }
83 if community.Visibility != "public" {
84 t.Errorf("Expected Visibility 'public', got %s", community.Visibility)
85 }
86 })
87
88 t.Run("updates existing community", func(t *testing.T) {
89 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
90 communityDID := generateTestDID(uniqueSuffix)
91 communityName := fmt.Sprintf("update-test-%s", uniqueSuffix)
92 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
93
94 // Set up mock resolver for this test DID
95 mockResolver := newMockIdentityResolver()
96 mockResolver.resolutions[communityDID] = expectedHandle
97 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
98
99 // Create initial community
100 initialCommunity := &communities.Community{
101 DID: communityDID,
102 Handle: expectedHandle,
103 Name: communityName,
104 DisplayName: "Original Name",
105 Description: "Original description",
106 OwnerDID: "did:web:coves.local",
107 CreatedByDID: "did:plc:user123",
108 HostedByDID: "did:web:coves.local",
109 Visibility: "public",
110 AllowExternalDiscovery: true,
111 CreatedAt: time.Now(),
112 UpdatedAt: time.Now(),
113 }
114
115 if _, err := repo.Create(ctx, initialCommunity); err != nil {
116 t.Fatalf("Failed to create initial community: %v", err)
117 }
118
119 // Simulate update event
120 updateEvent := &jetstream.JetstreamEvent{
121 Did: communityDID,
122 TimeUS: time.Now().UnixMicro(),
123 Kind: "commit",
124 Commit: &jetstream.CommitEvent{
125 Rev: "rev124",
126 Operation: "update",
127 Collection: "social.coves.community.profile",
128 RKey: "self",
129 CID: "bafy456def",
130 Record: map[string]interface{}{
131 // Note: No 'did', 'handle', 'memberCount', or 'subscriberCount' in record
132 // These are resolved/computed by AppView, not stored in immutable records
133 "name": "update-test",
134 "displayName": "Updated Name",
135 "description": "Updated description",
136 "owner": "did:web:coves.local",
137 "createdBy": "did:plc:user123",
138 "hostedBy": "did:web:coves.local",
139 "visibility": "unlisted",
140 "federation": map[string]interface{}{
141 "allowExternalDiscovery": false,
142 },
143 "createdAt": time.Now().Format(time.RFC3339),
144 },
145 },
146 }
147
148 // Handle the update
149 if err := consumer.HandleEvent(ctx, updateEvent); err != nil {
150 t.Fatalf("Failed to handle update event: %v", err)
151 }
152
153 // Verify community was updated
154 updated, err := repo.GetByDID(ctx, communityDID)
155 if err != nil {
156 t.Fatalf("Failed to get updated community: %v", err)
157 }
158
159 if updated.DisplayName != "Updated Name" {
160 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName)
161 }
162 if updated.Description != "Updated description" {
163 t.Errorf("Expected Description 'Updated description', got %s", updated.Description)
164 }
165 if updated.Visibility != "unlisted" {
166 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility)
167 }
168 if updated.AllowExternalDiscovery {
169 t.Error("Expected AllowExternalDiscovery to be false")
170 }
171 })
172
173 t.Run("deletes community", func(t *testing.T) {
174 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
175 communityDID := generateTestDID(uniqueSuffix)
176 communityName := fmt.Sprintf("delete-test-%s", uniqueSuffix)
177 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
178
179 // Set up mock resolver for this test DID
180 mockResolver := newMockIdentityResolver()
181 mockResolver.resolutions[communityDID] = expectedHandle
182 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
183
184 // Create community to delete
185 community := &communities.Community{
186 DID: communityDID,
187 Handle: expectedHandle,
188 Name: communityName,
189 OwnerDID: "did:web:coves.local",
190 CreatedByDID: "did:plc:user123",
191 HostedByDID: "did:web:coves.local",
192 Visibility: "public",
193 CreatedAt: time.Now(),
194 UpdatedAt: time.Now(),
195 }
196
197 if _, err := repo.Create(ctx, community); err != nil {
198 t.Fatalf("Failed to create community: %v", err)
199 }
200
201 // Simulate delete event
202 deleteEvent := &jetstream.JetstreamEvent{
203 Did: communityDID,
204 TimeUS: time.Now().UnixMicro(),
205 Kind: "commit",
206 Commit: &jetstream.CommitEvent{
207 Rev: "rev125",
208 Operation: "delete",
209 Collection: "social.coves.community.profile",
210 RKey: "self",
211 },
212 }
213
214 // Handle the delete
215 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil {
216 t.Fatalf("Failed to handle delete event: %v", err)
217 }
218
219 // Verify community was deleted
220 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound {
221 t.Errorf("Expected ErrCommunityNotFound, got: %v", err)
222 }
223 })
224}
225
226func TestCommunityConsumer_HandleSubscription(t *testing.T) {
227 db := setupTestDB(t)
228 defer func() {
229 if err := db.Close(); err != nil {
230 t.Logf("Failed to close database: %v", err)
231 }
232 }()
233
234 repo := postgres.NewCommunityRepository(db)
235 ctx := context.Background()
236
237 t.Run("creates subscription from event", func(t *testing.T) {
238 // Create a community first
239 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
240 communityDID := generateTestDID(uniqueSuffix)
241 communityName := fmt.Sprintf("sub-test-%s", uniqueSuffix)
242 expectedHandle := fmt.Sprintf("%s.community.coves.local", communityName)
243
244 // Set up mock resolver for this test DID
245 mockResolver := newMockIdentityResolver()
246 mockResolver.resolutions[communityDID] = expectedHandle
247 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
248
249 community := &communities.Community{
250 DID: communityDID,
251 Handle: expectedHandle,
252 Name: communityName,
253 OwnerDID: "did:web:coves.local",
254 CreatedByDID: "did:plc:user123",
255 HostedByDID: "did:web:coves.local",
256 Visibility: "public",
257 CreatedAt: time.Now(),
258 UpdatedAt: time.Now(),
259 }
260
261 if _, err := repo.Create(ctx, community); err != nil {
262 t.Fatalf("Failed to create community: %v", err)
263 }
264
265 // Simulate subscription event
266 // IMPORTANT: Use correct collection name (record type, not XRPC procedure)
267 userDID := "did:plc:subscriber123"
268 subEvent := &jetstream.JetstreamEvent{
269 Did: userDID,
270 TimeUS: time.Now().UnixMicro(),
271 Kind: "commit",
272 Commit: &jetstream.CommitEvent{
273 Rev: "rev200",
274 Operation: "create",
275 Collection: "social.coves.community.subscription", // Updated to communities namespace
276 RKey: "sub123",
277 CID: "bafy789ghi",
278 Record: map[string]interface{}{
279 "subject": communityDID, // Using 'subject' per atProto conventions
280 "contentVisibility": 3,
281 "createdAt": time.Now().Format(time.RFC3339),
282 },
283 },
284 }
285
286 // Handle the subscription
287 if err := consumer.HandleEvent(ctx, subEvent); err != nil {
288 t.Fatalf("Failed to handle subscription event: %v", err)
289 }
290
291 // Verify subscription was created
292 subscription, err := repo.GetSubscription(ctx, userDID, communityDID)
293 if err != nil {
294 t.Fatalf("Failed to get subscription: %v", err)
295 }
296
297 if subscription.UserDID != userDID {
298 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID)
299 }
300 if subscription.CommunityDID != communityDID {
301 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID)
302 }
303
304 // Verify subscriber count was incremented
305 updated, err := repo.GetByDID(ctx, communityDID)
306 if err != nil {
307 t.Fatalf("Failed to get community: %v", err)
308 }
309
310 if updated.SubscriberCount != 1 {
311 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount)
312 }
313 })
314}
315
316func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) {
317 db := setupTestDB(t)
318 defer func() {
319 if err := db.Close(); err != nil {
320 t.Logf("Failed to close database: %v", err)
321 }
322 }()
323
324 repo := postgres.NewCommunityRepository(db)
325 // Use mock resolver (though these tests don't create communities, so it won't be called)
326 mockResolver := newMockIdentityResolver()
327 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
328 ctx := context.Background()
329
330 t.Run("ignores identity events", func(t *testing.T) {
331 event := &jetstream.JetstreamEvent{
332 Did: "did:plc:user123",
333 TimeUS: time.Now().UnixMicro(),
334 Kind: "identity",
335 Identity: &jetstream.IdentityEvent{
336 Did: "did:plc:user123",
337 Handle: "alice.bsky.social",
338 },
339 }
340
341 err := consumer.HandleEvent(ctx, event)
342 if err != nil {
343 t.Errorf("Expected no error for identity event, got: %v", err)
344 }
345 })
346
347 t.Run("ignores non-community collections", func(t *testing.T) {
348 event := &jetstream.JetstreamEvent{
349 Did: "did:plc:user123",
350 TimeUS: time.Now().UnixMicro(),
351 Kind: "commit",
352 Commit: &jetstream.CommitEvent{
353 Rev: "rev300",
354 Operation: "create",
355 Collection: "app.bsky.communityFeed.post",
356 RKey: "post123",
357 Record: map[string]interface{}{
358 "text": "Hello world",
359 },
360 },
361 }
362
363 err := consumer.HandleEvent(ctx, event)
364 if err != nil {
365 t.Errorf("Expected no error for non-community event, got: %v", err)
366 }
367 })
368}
369
// mockIdentityResolver is a test double for identity resolution. It answers
// from a fixed DID-to-handle table and records how it was called so tests can
// assert on resolver usage.
type mockIdentityResolver struct {
	// resolutions maps a DID to the handle Resolve reports for it.
	resolutions map[string]string
	// lastDID is the DID passed to the most recent Resolve call.
	lastDID string
	// callCount is the number of Resolve calls made so far.
	callCount int
	// shouldFail, when true, makes every Resolve call return an error.
	shouldFail bool
}
377
378func newMockIdentityResolver() *mockIdentityResolver {
379 return &mockIdentityResolver{
380 resolutions: make(map[string]string),
381 }
382}
383
384func (m *mockIdentityResolver) Resolve(ctx context.Context, did string) (*identity.Identity, error) {
385 m.callCount++
386 m.lastDID = did
387
388 if m.shouldFail {
389 return nil, errors.New("mock PLC resolution failure")
390 }
391
392 handle, ok := m.resolutions[did]
393 if !ok {
394 return nil, fmt.Errorf("no resolution configured for DID: %s", did)
395 }
396
397 return &identity.Identity{
398 DID: did,
399 Handle: handle,
400 PDSURL: "https://pds.example.com",
401 ResolvedAt: time.Now(),
402 Method: identity.MethodHTTPS,
403 }, nil
404}
405
406func TestCommunityConsumer_PLCHandleResolution(t *testing.T) {
407 db := setupTestDB(t)
408 defer func() {
409 if err := db.Close(); err != nil {
410 t.Logf("Failed to close database: %v", err)
411 }
412 }()
413
414 repo := postgres.NewCommunityRepository(db)
415 ctx := context.Background()
416
417 t.Run("resolves handle from PLC successfully", func(t *testing.T) {
418 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
419 communityDID := generateTestDID(uniqueSuffix)
420 communityName := fmt.Sprintf("test-plc-%s", uniqueSuffix)
421 expectedHandle := fmt.Sprintf("%s.community.coves.social", communityName)
422
423 // Create mock resolver
424 mockResolver := newMockIdentityResolver()
425 mockResolver.resolutions[communityDID] = expectedHandle
426
427 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
428
429 // Simulate Jetstream event without handle in record
430 event := &jetstream.JetstreamEvent{
431 Did: communityDID,
432 TimeUS: time.Now().UnixMicro(),
433 Kind: "commit",
434 Commit: &jetstream.CommitEvent{
435 Rev: "rev123",
436 Operation: "create",
437 Collection: "social.coves.community.profile",
438 RKey: "self",
439 CID: "bafy123abc",
440 Record: map[string]interface{}{
441 // No handle field - should trigger PLC resolution
442 "name": communityName,
443 "displayName": "Test PLC Community",
444 "description": "Testing PLC resolution",
445 "owner": "did:web:coves.local",
446 "createdBy": "did:plc:user123",
447 "hostedBy": "did:web:coves.local",
448 "visibility": "public",
449 "federation": map[string]interface{}{
450 "allowExternalDiscovery": true,
451 },
452 "createdAt": time.Now().Format(time.RFC3339),
453 },
454 },
455 }
456
457 // Handle the event
458 if err := consumer.HandleEvent(ctx, event); err != nil {
459 t.Fatalf("Failed to handle event: %v", err)
460 }
461
462 // Verify mock was called
463 if mockResolver.callCount != 1 {
464 t.Errorf("Expected 1 PLC resolution call, got %d", mockResolver.callCount)
465 }
466 if mockResolver.lastDID != communityDID {
467 t.Errorf("Expected PLC resolution for DID %s, got %s", communityDID, mockResolver.lastDID)
468 }
469
470 // Verify community was indexed with PLC-resolved handle
471 community, err := repo.GetByDID(ctx, communityDID)
472 if err != nil {
473 t.Fatalf("Failed to get indexed community: %v", err)
474 }
475
476 if community.Handle != expectedHandle {
477 t.Errorf("Expected handle %s from PLC, got %s", expectedHandle, community.Handle)
478 }
479 })
480
481 t.Run("fails when PLC resolution fails (no fallback)", func(t *testing.T) {
482 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
483 communityDID := generateTestDID(uniqueSuffix)
484 communityName := fmt.Sprintf("test-plc-fail-%s", uniqueSuffix)
485
486 // Create mock resolver that fails
487 mockResolver := newMockIdentityResolver()
488 mockResolver.shouldFail = true
489
490 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, mockResolver)
491
492 // Simulate Jetstream event without handle in record
493 event := &jetstream.JetstreamEvent{
494 Did: communityDID,
495 TimeUS: time.Now().UnixMicro(),
496 Kind: "commit",
497 Commit: &jetstream.CommitEvent{
498 Rev: "rev456",
499 Operation: "create",
500 Collection: "social.coves.community.profile",
501 RKey: "self",
502 CID: "bafy456def",
503 Record: map[string]interface{}{
504 "name": communityName,
505 "displayName": "Test PLC Failure",
506 "description": "Testing PLC failure",
507 "owner": "did:web:coves.local",
508 "createdBy": "did:plc:user123",
509 "hostedBy": "did:web:coves.local",
510 "visibility": "public",
511 "federation": map[string]interface{}{
512 "allowExternalDiscovery": true,
513 },
514 "createdAt": time.Now().Format(time.RFC3339),
515 },
516 },
517 }
518
519 // Handle the event - should fail
520 err := consumer.HandleEvent(ctx, event)
521 if err == nil {
522 t.Fatal("Expected error when PLC resolution fails, got nil")
523 }
524
525 // Verify error message indicates PLC failure
526 expectedErrSubstring := "failed to resolve handle from PLC"
527 if !contains(err.Error(), expectedErrSubstring) {
528 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err)
529 }
530
531 // Verify community was NOT indexed
532 _, err = repo.GetByDID(ctx, communityDID)
533 if !communities.IsNotFound(err) {
534 t.Errorf("Expected community NOT to be indexed when PLC fails, but got: %v", err)
535 }
536
537 // Verify mock was called (failure happened during resolution, not before)
538 if mockResolver.callCount != 1 {
539 t.Errorf("Expected 1 PLC resolution attempt, got %d", mockResolver.callCount)
540 }
541 })
542
543 t.Run("test mode rejects invalid hostedBy format", func(t *testing.T) {
544 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
545 communityDID := generateTestDID(uniqueSuffix)
546 communityName := fmt.Sprintf("test-invalid-hosted-%s", uniqueSuffix)
547
548 // No identity resolver (test mode)
549 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil)
550
551 // Event with invalid hostedBy format (not did:web)
552 event := &jetstream.JetstreamEvent{
553 Did: communityDID,
554 TimeUS: time.Now().UnixMicro(),
555 Kind: "commit",
556 Commit: &jetstream.CommitEvent{
557 Rev: "rev789",
558 Operation: "create",
559 Collection: "social.coves.community.profile",
560 RKey: "self",
561 CID: "bafy789ghi",
562 Record: map[string]interface{}{
563 "name": communityName,
564 "displayName": "Test Invalid HostedBy",
565 "description": "Testing validation",
566 "owner": "did:web:coves.local",
567 "createdBy": "did:plc:user123",
568 "hostedBy": "did:plc:invalid", // Invalid format - not did:web
569 "visibility": "public",
570 "federation": map[string]interface{}{
571 "allowExternalDiscovery": true,
572 },
573 "createdAt": time.Now().Format(time.RFC3339),
574 },
575 },
576 }
577
578 // Handle the event - should fail due to empty handle
579 err := consumer.HandleEvent(ctx, event)
580 if err == nil {
581 t.Fatal("Expected error for invalid hostedBy format in test mode, got nil")
582 }
583
584 // Verify error is about handle being required
585 expectedErrSubstring := "handle is required"
586 if !contains(err.Error(), expectedErrSubstring) {
587 t.Errorf("Expected error containing '%s', got: %v", expectedErrSubstring, err)
588 }
589 })
590}