A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "context"
5 "fmt"
6 "testing"
7 "time"
8
9 "Coves/internal/atproto/did"
10 "Coves/internal/atproto/jetstream"
11 "Coves/internal/core/communities"
12 "Coves/internal/db/postgres"
13)
14
15func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) {
16 db := setupTestDB(t)
17 defer db.Close()
18
19 repo := postgres.NewCommunityRepository(db)
20 consumer := jetstream.NewCommunityEventConsumer(repo)
21 didGen := did.NewGenerator(true, "https://plc.directory")
22 ctx := context.Background()
23
24 t.Run("creates community from firehose event", func(t *testing.T) {
25 communityDID, _ := didGen.GenerateCommunityDID()
26 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
27
28 // Simulate a Jetstream commit event
29 event := &jetstream.JetstreamEvent{
30 Did: communityDID,
31 TimeUS: time.Now().UnixMicro(),
32 Kind: "commit",
33 Commit: &jetstream.CommitEvent{
34 Rev: "rev123",
35 Operation: "create",
36 Collection: "social.coves.community.profile",
37 RKey: "self",
38 CID: "bafy123abc",
39 Record: map[string]interface{}{
40 "did": communityDID, // Community's unique DID
41 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix),
42 "name": "test-community",
43 "displayName": "Test Community",
44 "description": "A test community",
45 "owner": "did:web:coves.local",
46 "createdBy": "did:plc:user123",
47 "hostedBy": "did:web:coves.local",
48 "visibility": "public",
49 "federation": map[string]interface{}{
50 "allowExternalDiscovery": true,
51 },
52 "memberCount": 0,
53 "subscriberCount": 0,
54 "createdAt": time.Now().Format(time.RFC3339),
55 },
56 },
57 }
58
59 // Handle the event
60 err := consumer.HandleEvent(ctx, event)
61 if err != nil {
62 t.Fatalf("Failed to handle event: %v", err)
63 }
64
65 // Verify community was indexed
66 community, err := repo.GetByDID(ctx, communityDID)
67 if err != nil {
68 t.Fatalf("Failed to get indexed community: %v", err)
69 }
70
71 if community.DID != communityDID {
72 t.Errorf("Expected DID %s, got %s", communityDID, community.DID)
73 }
74 if community.DisplayName != "Test Community" {
75 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName)
76 }
77 if community.Visibility != "public" {
78 t.Errorf("Expected Visibility 'public', got %s", community.Visibility)
79 }
80 })
81
82 t.Run("updates existing community", func(t *testing.T) {
83 communityDID, _ := didGen.GenerateCommunityDID()
84 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
85 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix)
86
87 // Create initial community
88 initialCommunity := &communities.Community{
89 DID: communityDID,
90 Handle: handle,
91 Name: "update-test",
92 DisplayName: "Original Name",
93 Description: "Original description",
94 OwnerDID: "did:web:coves.local",
95 CreatedByDID: "did:plc:user123",
96 HostedByDID: "did:web:coves.local",
97 Visibility: "public",
98 AllowExternalDiscovery: true,
99 CreatedAt: time.Now(),
100 UpdatedAt: time.Now(),
101 }
102
103 _, err := repo.Create(ctx, initialCommunity)
104 if err != nil {
105 t.Fatalf("Failed to create initial community: %v", err)
106 }
107
108 // Simulate update event
109 updateEvent := &jetstream.JetstreamEvent{
110 Did: communityDID,
111 TimeUS: time.Now().UnixMicro(),
112 Kind: "commit",
113 Commit: &jetstream.CommitEvent{
114 Rev: "rev124",
115 Operation: "update",
116 Collection: "social.coves.community.profile",
117 RKey: "self",
118 CID: "bafy456def",
119 Record: map[string]interface{}{
120 "did": communityDID, // Community's unique DID
121 "handle": handle,
122 "name": "update-test",
123 "displayName": "Updated Name",
124 "description": "Updated description",
125 "owner": "did:web:coves.local",
126 "createdBy": "did:plc:user123",
127 "hostedBy": "did:web:coves.local",
128 "visibility": "unlisted",
129 "federation": map[string]interface{}{
130 "allowExternalDiscovery": false,
131 },
132 "memberCount": 5,
133 "subscriberCount": 10,
134 "createdAt": time.Now().Format(time.RFC3339),
135 },
136 },
137 }
138
139 // Handle the update
140 err = consumer.HandleEvent(ctx, updateEvent)
141 if err != nil {
142 t.Fatalf("Failed to handle update event: %v", err)
143 }
144
145 // Verify community was updated
146 updated, err := repo.GetByDID(ctx, communityDID)
147 if err != nil {
148 t.Fatalf("Failed to get updated community: %v", err)
149 }
150
151 if updated.DisplayName != "Updated Name" {
152 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName)
153 }
154 if updated.Description != "Updated description" {
155 t.Errorf("Expected Description 'Updated description', got %s", updated.Description)
156 }
157 if updated.Visibility != "unlisted" {
158 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility)
159 }
160 if updated.AllowExternalDiscovery {
161 t.Error("Expected AllowExternalDiscovery to be false")
162 }
163 })
164
165 t.Run("deletes community", func(t *testing.T) {
166 communityDID, _ := didGen.GenerateCommunityDID()
167 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
168
169 // Create community to delete
170 community := &communities.Community{
171 DID: communityDID,
172 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix),
173 Name: "delete-test",
174 OwnerDID: "did:web:coves.local",
175 CreatedByDID: "did:plc:user123",
176 HostedByDID: "did:web:coves.local",
177 Visibility: "public",
178 CreatedAt: time.Now(),
179 UpdatedAt: time.Now(),
180 }
181
182 _, err := repo.Create(ctx, community)
183 if err != nil {
184 t.Fatalf("Failed to create community: %v", err)
185 }
186
187 // Simulate delete event
188 deleteEvent := &jetstream.JetstreamEvent{
189 Did: communityDID,
190 TimeUS: time.Now().UnixMicro(),
191 Kind: "commit",
192 Commit: &jetstream.CommitEvent{
193 Rev: "rev125",
194 Operation: "delete",
195 Collection: "social.coves.community.profile",
196 RKey: "self",
197 },
198 }
199
200 // Handle the delete
201 err = consumer.HandleEvent(ctx, deleteEvent)
202 if err != nil {
203 t.Fatalf("Failed to handle delete event: %v", err)
204 }
205
206 // Verify community was deleted
207 _, err = repo.GetByDID(ctx, communityDID)
208 if err != communities.ErrCommunityNotFound {
209 t.Errorf("Expected ErrCommunityNotFound, got: %v", err)
210 }
211 })
212}
213
214func TestCommunityConsumer_HandleSubscription(t *testing.T) {
215 db := setupTestDB(t)
216 defer db.Close()
217
218 repo := postgres.NewCommunityRepository(db)
219 consumer := jetstream.NewCommunityEventConsumer(repo)
220 didGen := did.NewGenerator(true, "https://plc.directory")
221 ctx := context.Background()
222
223 t.Run("creates subscription from event", func(t *testing.T) {
224 // Create a community first
225 communityDID, _ := didGen.GenerateCommunityDID()
226 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
227
228 community := &communities.Community{
229 DID: communityDID,
230 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix),
231 Name: "sub-test",
232 OwnerDID: "did:web:coves.local",
233 CreatedByDID: "did:plc:user123",
234 HostedByDID: "did:web:coves.local",
235 Visibility: "public",
236 CreatedAt: time.Now(),
237 UpdatedAt: time.Now(),
238 }
239
240 _, err := repo.Create(ctx, community)
241 if err != nil {
242 t.Fatalf("Failed to create community: %v", err)
243 }
244
245 // Simulate subscription event
246 userDID := "did:plc:subscriber123"
247 subEvent := &jetstream.JetstreamEvent{
248 Did: userDID,
249 TimeUS: time.Now().UnixMicro(),
250 Kind: "commit",
251 Commit: &jetstream.CommitEvent{
252 Rev: "rev200",
253 Operation: "create",
254 Collection: "social.coves.community.subscribe",
255 RKey: "sub123",
256 CID: "bafy789ghi",
257 Record: map[string]interface{}{
258 "community": communityDID,
259 },
260 },
261 }
262
263 // Handle the subscription
264 err = consumer.HandleEvent(ctx, subEvent)
265 if err != nil {
266 t.Fatalf("Failed to handle subscription event: %v", err)
267 }
268
269 // Verify subscription was created
270 subscription, err := repo.GetSubscription(ctx, userDID, communityDID)
271 if err != nil {
272 t.Fatalf("Failed to get subscription: %v", err)
273 }
274
275 if subscription.UserDID != userDID {
276 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID)
277 }
278 if subscription.CommunityDID != communityDID {
279 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID)
280 }
281
282 // Verify subscriber count was incremented
283 updated, err := repo.GetByDID(ctx, communityDID)
284 if err != nil {
285 t.Fatalf("Failed to get community: %v", err)
286 }
287
288 if updated.SubscriberCount != 1 {
289 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount)
290 }
291 })
292}
293
294func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) {
295 db := setupTestDB(t)
296 defer db.Close()
297
298 repo := postgres.NewCommunityRepository(db)
299 consumer := jetstream.NewCommunityEventConsumer(repo)
300 ctx := context.Background()
301
302 t.Run("ignores identity events", func(t *testing.T) {
303 event := &jetstream.JetstreamEvent{
304 Did: "did:plc:user123",
305 TimeUS: time.Now().UnixMicro(),
306 Kind: "identity",
307 Identity: &jetstream.IdentityEvent{
308 Did: "did:plc:user123",
309 Handle: "alice.bsky.social",
310 },
311 }
312
313 err := consumer.HandleEvent(ctx, event)
314 if err != nil {
315 t.Errorf("Expected no error for identity event, got: %v", err)
316 }
317 })
318
319 t.Run("ignores non-community collections", func(t *testing.T) {
320 event := &jetstream.JetstreamEvent{
321 Did: "did:plc:user123",
322 TimeUS: time.Now().UnixMicro(),
323 Kind: "commit",
324 Commit: &jetstream.CommitEvent{
325 Rev: "rev300",
326 Operation: "create",
327 Collection: "app.bsky.feed.post",
328 RKey: "post123",
329 Record: map[string]interface{}{
330 "text": "Hello world",
331 },
332 },
333 }
334
335 err := consumer.HandleEvent(ctx, event)
336 if err != nil {
337 t.Errorf("Expected no error for non-community event, got: %v", err)
338 }
339 })
340}