A community-based topic aggregation platform built on atproto
1package integration
2
import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"Coves/internal/atproto/jetstream"
	"Coves/internal/core/communities"
	"Coves/internal/db/postgres"
)
12
13func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) {
14 db := setupTestDB(t)
15 defer func() {
16 if err := db.Close(); err != nil {
17 t.Logf("Failed to close database: %v", err)
18 }
19 }()
20
21 repo := postgres.NewCommunityRepository(db)
22 consumer := jetstream.NewCommunityEventConsumer(repo)
23 ctx := context.Background()
24
25 t.Run("creates community from firehose event", func(t *testing.T) {
26 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
27 communityDID := generateTestDID(uniqueSuffix)
28
29 // Simulate a Jetstream commit event
30 event := &jetstream.JetstreamEvent{
31 Did: communityDID,
32 TimeUS: time.Now().UnixMicro(),
33 Kind: "commit",
34 Commit: &jetstream.CommitEvent{
35 Rev: "rev123",
36 Operation: "create",
37 Collection: "social.coves.community.profile",
38 RKey: "self",
39 CID: "bafy123abc",
40 Record: map[string]interface{}{
41 "did": communityDID, // Community's unique DID
42 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix),
43 "name": "test-community",
44 "displayName": "Test Community",
45 "description": "A test community",
46 "owner": "did:web:coves.local",
47 "createdBy": "did:plc:user123",
48 "hostedBy": "did:web:coves.local",
49 "visibility": "public",
50 "federation": map[string]interface{}{
51 "allowExternalDiscovery": true,
52 },
53 "memberCount": 0,
54 "subscriberCount": 0,
55 "createdAt": time.Now().Format(time.RFC3339),
56 },
57 },
58 }
59
60 // Handle the event
61 if err := consumer.HandleEvent(ctx, event); err != nil {
62 t.Fatalf("Failed to handle event: %v", err)
63 }
64
65 // Verify community was indexed
66 community, err := repo.GetByDID(ctx, communityDID)
67 if err != nil {
68 t.Fatalf("Failed to get indexed community: %v", err)
69 }
70
71 if community.DID != communityDID {
72 t.Errorf("Expected DID %s, got %s", communityDID, community.DID)
73 }
74 if community.DisplayName != "Test Community" {
75 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName)
76 }
77 if community.Visibility != "public" {
78 t.Errorf("Expected Visibility 'public', got %s", community.Visibility)
79 }
80 })
81
82 t.Run("updates existing community", func(t *testing.T) {
83 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
84 communityDID := generateTestDID(uniqueSuffix)
85 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix)
86
87 // Create initial community
88 initialCommunity := &communities.Community{
89 DID: communityDID,
90 Handle: handle,
91 Name: "update-test",
92 DisplayName: "Original Name",
93 Description: "Original description",
94 OwnerDID: "did:web:coves.local",
95 CreatedByDID: "did:plc:user123",
96 HostedByDID: "did:web:coves.local",
97 Visibility: "public",
98 AllowExternalDiscovery: true,
99 CreatedAt: time.Now(),
100 UpdatedAt: time.Now(),
101 }
102
103 if _, err := repo.Create(ctx, initialCommunity); err != nil {
104 t.Fatalf("Failed to create initial community: %v", err)
105 }
106
107 // Simulate update event
108 updateEvent := &jetstream.JetstreamEvent{
109 Did: communityDID,
110 TimeUS: time.Now().UnixMicro(),
111 Kind: "commit",
112 Commit: &jetstream.CommitEvent{
113 Rev: "rev124",
114 Operation: "update",
115 Collection: "social.coves.community.profile",
116 RKey: "self",
117 CID: "bafy456def",
118 Record: map[string]interface{}{
119 "did": communityDID, // Community's unique DID
120 "handle": handle,
121 "name": "update-test",
122 "displayName": "Updated Name",
123 "description": "Updated description",
124 "owner": "did:web:coves.local",
125 "createdBy": "did:plc:user123",
126 "hostedBy": "did:web:coves.local",
127 "visibility": "unlisted",
128 "federation": map[string]interface{}{
129 "allowExternalDiscovery": false,
130 },
131 "memberCount": 5,
132 "subscriberCount": 10,
133 "createdAt": time.Now().Format(time.RFC3339),
134 },
135 },
136 }
137
138 // Handle the update
139 if err := consumer.HandleEvent(ctx, updateEvent); err != nil {
140 t.Fatalf("Failed to handle update event: %v", err)
141 }
142
143 // Verify community was updated
144 updated, err := repo.GetByDID(ctx, communityDID)
145 if err != nil {
146 t.Fatalf("Failed to get updated community: %v", err)
147 }
148
149 if updated.DisplayName != "Updated Name" {
150 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName)
151 }
152 if updated.Description != "Updated description" {
153 t.Errorf("Expected Description 'Updated description', got %s", updated.Description)
154 }
155 if updated.Visibility != "unlisted" {
156 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility)
157 }
158 if updated.AllowExternalDiscovery {
159 t.Error("Expected AllowExternalDiscovery to be false")
160 }
161 })
162
163 t.Run("deletes community", func(t *testing.T) {
164 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
165 communityDID := generateTestDID(uniqueSuffix)
166
167 // Create community to delete
168 community := &communities.Community{
169 DID: communityDID,
170 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix),
171 Name: "delete-test",
172 OwnerDID: "did:web:coves.local",
173 CreatedByDID: "did:plc:user123",
174 HostedByDID: "did:web:coves.local",
175 Visibility: "public",
176 CreatedAt: time.Now(),
177 UpdatedAt: time.Now(),
178 }
179
180 if _, err := repo.Create(ctx, community); err != nil {
181 t.Fatalf("Failed to create community: %v", err)
182 }
183
184 // Simulate delete event
185 deleteEvent := &jetstream.JetstreamEvent{
186 Did: communityDID,
187 TimeUS: time.Now().UnixMicro(),
188 Kind: "commit",
189 Commit: &jetstream.CommitEvent{
190 Rev: "rev125",
191 Operation: "delete",
192 Collection: "social.coves.community.profile",
193 RKey: "self",
194 },
195 }
196
197 // Handle the delete
198 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil {
199 t.Fatalf("Failed to handle delete event: %v", err)
200 }
201
202 // Verify community was deleted
203 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound {
204 t.Errorf("Expected ErrCommunityNotFound, got: %v", err)
205 }
206 })
207}
208
209func TestCommunityConsumer_HandleSubscription(t *testing.T) {
210 db := setupTestDB(t)
211 defer func() {
212 if err := db.Close(); err != nil {
213 t.Logf("Failed to close database: %v", err)
214 }
215 }()
216
217 repo := postgres.NewCommunityRepository(db)
218 consumer := jetstream.NewCommunityEventConsumer(repo)
219 ctx := context.Background()
220
221 t.Run("creates subscription from event", func(t *testing.T) {
222 // Create a community first
223 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
224 communityDID := generateTestDID(uniqueSuffix)
225
226 community := &communities.Community{
227 DID: communityDID,
228 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix),
229 Name: "sub-test",
230 OwnerDID: "did:web:coves.local",
231 CreatedByDID: "did:plc:user123",
232 HostedByDID: "did:web:coves.local",
233 Visibility: "public",
234 CreatedAt: time.Now(),
235 UpdatedAt: time.Now(),
236 }
237
238 if _, err := repo.Create(ctx, community); err != nil {
239 t.Fatalf("Failed to create community: %v", err)
240 }
241
242 // Simulate subscription event
243 userDID := "did:plc:subscriber123"
244 subEvent := &jetstream.JetstreamEvent{
245 Did: userDID,
246 TimeUS: time.Now().UnixMicro(),
247 Kind: "commit",
248 Commit: &jetstream.CommitEvent{
249 Rev: "rev200",
250 Operation: "create",
251 Collection: "social.coves.community.subscribe",
252 RKey: "sub123",
253 CID: "bafy789ghi",
254 Record: map[string]interface{}{
255 "community": communityDID,
256 },
257 },
258 }
259
260 // Handle the subscription
261 if err := consumer.HandleEvent(ctx, subEvent); err != nil {
262 t.Fatalf("Failed to handle subscription event: %v", err)
263 }
264
265 // Verify subscription was created
266 subscription, err := repo.GetSubscription(ctx, userDID, communityDID)
267 if err != nil {
268 t.Fatalf("Failed to get subscription: %v", err)
269 }
270
271 if subscription.UserDID != userDID {
272 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID)
273 }
274 if subscription.CommunityDID != communityDID {
275 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID)
276 }
277
278 // Verify subscriber count was incremented
279 updated, err := repo.GetByDID(ctx, communityDID)
280 if err != nil {
281 t.Fatalf("Failed to get community: %v", err)
282 }
283
284 if updated.SubscriberCount != 1 {
285 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount)
286 }
287 })
288}
289
290func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) {
291 db := setupTestDB(t)
292 defer func() {
293 if err := db.Close(); err != nil {
294 t.Logf("Failed to close database: %v", err)
295 }
296 }()
297
298 repo := postgres.NewCommunityRepository(db)
299 consumer := jetstream.NewCommunityEventConsumer(repo)
300 ctx := context.Background()
301
302 t.Run("ignores identity events", func(t *testing.T) {
303 event := &jetstream.JetstreamEvent{
304 Did: "did:plc:user123",
305 TimeUS: time.Now().UnixMicro(),
306 Kind: "identity",
307 Identity: &jetstream.IdentityEvent{
308 Did: "did:plc:user123",
309 Handle: "alice.bsky.social",
310 },
311 }
312
313 err := consumer.HandleEvent(ctx, event)
314 if err != nil {
315 t.Errorf("Expected no error for identity event, got: %v", err)
316 }
317 })
318
319 t.Run("ignores non-community collections", func(t *testing.T) {
320 event := &jetstream.JetstreamEvent{
321 Did: "did:plc:user123",
322 TimeUS: time.Now().UnixMicro(),
323 Kind: "commit",
324 Commit: &jetstream.CommitEvent{
325 Rev: "rev300",
326 Operation: "create",
327 Collection: "app.bsky.feed.post",
328 RKey: "post123",
329 Record: map[string]interface{}{
330 "text": "Hello world",
331 },
332 },
333 }
334
335 err := consumer.HandleEvent(ctx, event)
336 if err != nil {
337 t.Errorf("Expected no error for non-community event, got: %v", err)
338 }
339 })
340}