A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/did"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/core/communities"
7 "Coves/internal/db/postgres"
8 "context"
9 "fmt"
10 "testing"
11 "time"
12)
13
14func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) {
15 db := setupTestDB(t)
16 defer func() {
17 if err := db.Close(); err != nil {
18 t.Logf("Failed to close database: %v", err)
19 }
20 }()
21
22 repo := postgres.NewCommunityRepository(db)
23 consumer := jetstream.NewCommunityEventConsumer(repo)
24 didGen := did.NewGenerator(true, "https://plc.directory")
25 ctx := context.Background()
26
27 t.Run("creates community from firehose event", func(t *testing.T) {
28 communityDID, err := didGen.GenerateCommunityDID()
29 if err != nil {
30 t.Fatalf("Failed to generate community DID: %v", err)
31 }
32 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
33
34 // Simulate a Jetstream commit event
35 event := &jetstream.JetstreamEvent{
36 Did: communityDID,
37 TimeUS: time.Now().UnixMicro(),
38 Kind: "commit",
39 Commit: &jetstream.CommitEvent{
40 Rev: "rev123",
41 Operation: "create",
42 Collection: "social.coves.community.profile",
43 RKey: "self",
44 CID: "bafy123abc",
45 Record: map[string]interface{}{
46 "did": communityDID, // Community's unique DID
47 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix),
48 "name": "test-community",
49 "displayName": "Test Community",
50 "description": "A test community",
51 "owner": "did:web:coves.local",
52 "createdBy": "did:plc:user123",
53 "hostedBy": "did:web:coves.local",
54 "visibility": "public",
55 "federation": map[string]interface{}{
56 "allowExternalDiscovery": true,
57 },
58 "memberCount": 0,
59 "subscriberCount": 0,
60 "createdAt": time.Now().Format(time.RFC3339),
61 },
62 },
63 }
64
65 // Handle the event
66 if err := consumer.HandleEvent(ctx, event); err != nil {
67 t.Fatalf("Failed to handle event: %v", err)
68 }
69
70 // Verify community was indexed
71 community, err := repo.GetByDID(ctx, communityDID)
72 if err != nil {
73 t.Fatalf("Failed to get indexed community: %v", err)
74 }
75
76 if community.DID != communityDID {
77 t.Errorf("Expected DID %s, got %s", communityDID, community.DID)
78 }
79 if community.DisplayName != "Test Community" {
80 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName)
81 }
82 if community.Visibility != "public" {
83 t.Errorf("Expected Visibility 'public', got %s", community.Visibility)
84 }
85 })
86
87 t.Run("updates existing community", func(t *testing.T) {
88 communityDID, err := didGen.GenerateCommunityDID()
89 if err != nil {
90 t.Fatalf("Failed to generate community DID: %v", err)
91 }
92 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
93 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix)
94
95 // Create initial community
96 initialCommunity := &communities.Community{
97 DID: communityDID,
98 Handle: handle,
99 Name: "update-test",
100 DisplayName: "Original Name",
101 Description: "Original description",
102 OwnerDID: "did:web:coves.local",
103 CreatedByDID: "did:plc:user123",
104 HostedByDID: "did:web:coves.local",
105 Visibility: "public",
106 AllowExternalDiscovery: true,
107 CreatedAt: time.Now(),
108 UpdatedAt: time.Now(),
109 }
110
111 if _, err := repo.Create(ctx, initialCommunity); err != nil {
112 t.Fatalf("Failed to create initial community: %v", err)
113 }
114
115 // Simulate update event
116 updateEvent := &jetstream.JetstreamEvent{
117 Did: communityDID,
118 TimeUS: time.Now().UnixMicro(),
119 Kind: "commit",
120 Commit: &jetstream.CommitEvent{
121 Rev: "rev124",
122 Operation: "update",
123 Collection: "social.coves.community.profile",
124 RKey: "self",
125 CID: "bafy456def",
126 Record: map[string]interface{}{
127 "did": communityDID, // Community's unique DID
128 "handle": handle,
129 "name": "update-test",
130 "displayName": "Updated Name",
131 "description": "Updated description",
132 "owner": "did:web:coves.local",
133 "createdBy": "did:plc:user123",
134 "hostedBy": "did:web:coves.local",
135 "visibility": "unlisted",
136 "federation": map[string]interface{}{
137 "allowExternalDiscovery": false,
138 },
139 "memberCount": 5,
140 "subscriberCount": 10,
141 "createdAt": time.Now().Format(time.RFC3339),
142 },
143 },
144 }
145
146 // Handle the update
147 if err := consumer.HandleEvent(ctx, updateEvent); err != nil {
148 t.Fatalf("Failed to handle update event: %v", err)
149 }
150
151 // Verify community was updated
152 updated, err := repo.GetByDID(ctx, communityDID)
153 if err != nil {
154 t.Fatalf("Failed to get updated community: %v", err)
155 }
156
157 if updated.DisplayName != "Updated Name" {
158 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName)
159 }
160 if updated.Description != "Updated description" {
161 t.Errorf("Expected Description 'Updated description', got %s", updated.Description)
162 }
163 if updated.Visibility != "unlisted" {
164 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility)
165 }
166 if updated.AllowExternalDiscovery {
167 t.Error("Expected AllowExternalDiscovery to be false")
168 }
169 })
170
171 t.Run("deletes community", func(t *testing.T) {
172 communityDID, err := didGen.GenerateCommunityDID()
173 if err != nil {
174 t.Fatalf("Failed to generate community DID: %v", err)
175 }
176 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
177
178 // Create community to delete
179 community := &communities.Community{
180 DID: communityDID,
181 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix),
182 Name: "delete-test",
183 OwnerDID: "did:web:coves.local",
184 CreatedByDID: "did:plc:user123",
185 HostedByDID: "did:web:coves.local",
186 Visibility: "public",
187 CreatedAt: time.Now(),
188 UpdatedAt: time.Now(),
189 }
190
191 if _, err := repo.Create(ctx, community); err != nil {
192 t.Fatalf("Failed to create community: %v", err)
193 }
194
195 // Simulate delete event
196 deleteEvent := &jetstream.JetstreamEvent{
197 Did: communityDID,
198 TimeUS: time.Now().UnixMicro(),
199 Kind: "commit",
200 Commit: &jetstream.CommitEvent{
201 Rev: "rev125",
202 Operation: "delete",
203 Collection: "social.coves.community.profile",
204 RKey: "self",
205 },
206 }
207
208 // Handle the delete
209 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil {
210 t.Fatalf("Failed to handle delete event: %v", err)
211 }
212
213 // Verify community was deleted
214 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound {
215 t.Errorf("Expected ErrCommunityNotFound, got: %v", err)
216 }
217 })
218}
219
220func TestCommunityConsumer_HandleSubscription(t *testing.T) {
221 db := setupTestDB(t)
222 defer func() {
223 if err := db.Close(); err != nil {
224 t.Logf("Failed to close database: %v", err)
225 }
226 }()
227
228 repo := postgres.NewCommunityRepository(db)
229 consumer := jetstream.NewCommunityEventConsumer(repo)
230 didGen := did.NewGenerator(true, "https://plc.directory")
231 ctx := context.Background()
232
233 t.Run("creates subscription from event", func(t *testing.T) {
234 // Create a community first
235 communityDID, err := didGen.GenerateCommunityDID()
236 if err != nil {
237 t.Fatalf("Failed to generate community DID: %v", err)
238 }
239 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
240
241 community := &communities.Community{
242 DID: communityDID,
243 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix),
244 Name: "sub-test",
245 OwnerDID: "did:web:coves.local",
246 CreatedByDID: "did:plc:user123",
247 HostedByDID: "did:web:coves.local",
248 Visibility: "public",
249 CreatedAt: time.Now(),
250 UpdatedAt: time.Now(),
251 }
252
253 if _, err := repo.Create(ctx, community); err != nil {
254 t.Fatalf("Failed to create community: %v", err)
255 }
256
257 // Simulate subscription event
258 userDID := "did:plc:subscriber123"
259 subEvent := &jetstream.JetstreamEvent{
260 Did: userDID,
261 TimeUS: time.Now().UnixMicro(),
262 Kind: "commit",
263 Commit: &jetstream.CommitEvent{
264 Rev: "rev200",
265 Operation: "create",
266 Collection: "social.coves.community.subscribe",
267 RKey: "sub123",
268 CID: "bafy789ghi",
269 Record: map[string]interface{}{
270 "community": communityDID,
271 },
272 },
273 }
274
275 // Handle the subscription
276 if err := consumer.HandleEvent(ctx, subEvent); err != nil {
277 t.Fatalf("Failed to handle subscription event: %v", err)
278 }
279
280 // Verify subscription was created
281 subscription, err := repo.GetSubscription(ctx, userDID, communityDID)
282 if err != nil {
283 t.Fatalf("Failed to get subscription: %v", err)
284 }
285
286 if subscription.UserDID != userDID {
287 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID)
288 }
289 if subscription.CommunityDID != communityDID {
290 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID)
291 }
292
293 // Verify subscriber count was incremented
294 updated, err := repo.GetByDID(ctx, communityDID)
295 if err != nil {
296 t.Fatalf("Failed to get community: %v", err)
297 }
298
299 if updated.SubscriberCount != 1 {
300 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount)
301 }
302 })
303}
304
305func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) {
306 db := setupTestDB(t)
307 defer func() {
308 if err := db.Close(); err != nil {
309 t.Logf("Failed to close database: %v", err)
310 }
311 }()
312
313 repo := postgres.NewCommunityRepository(db)
314 consumer := jetstream.NewCommunityEventConsumer(repo)
315 ctx := context.Background()
316
317 t.Run("ignores identity events", func(t *testing.T) {
318 event := &jetstream.JetstreamEvent{
319 Did: "did:plc:user123",
320 TimeUS: time.Now().UnixMicro(),
321 Kind: "identity",
322 Identity: &jetstream.IdentityEvent{
323 Did: "did:plc:user123",
324 Handle: "alice.bsky.social",
325 },
326 }
327
328 err := consumer.HandleEvent(ctx, event)
329 if err != nil {
330 t.Errorf("Expected no error for identity event, got: %v", err)
331 }
332 })
333
334 t.Run("ignores non-community collections", func(t *testing.T) {
335 event := &jetstream.JetstreamEvent{
336 Did: "did:plc:user123",
337 TimeUS: time.Now().UnixMicro(),
338 Kind: "commit",
339 Commit: &jetstream.CommitEvent{
340 Rev: "rev300",
341 Operation: "create",
342 Collection: "app.bsky.feed.post",
343 RKey: "post123",
344 Record: map[string]interface{}{
345 "text": "Hello world",
346 },
347 },
348 }
349
350 err := consumer.HandleEvent(ctx, event)
351 if err != nil {
352 t.Errorf("Expected no error for non-community event, got: %v", err)
353 }
354 })
355}