A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 "Coves/internal/db/postgres"
7 "context"
8 "fmt"
9 "testing"
10 "time"
11)
12
13func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) {
14 db := setupTestDB(t)
15 defer func() {
16 if err := db.Close(); err != nil {
17 t.Logf("Failed to close database: %v", err)
18 }
19 }()
20
21 repo := postgres.NewCommunityRepository(db)
22 consumer := jetstream.NewCommunityEventConsumer(repo)
23 ctx := context.Background()
24
25 t.Run("creates community from firehose event", func(t *testing.T) {
26 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
27 communityDID := generateTestDID(uniqueSuffix)
28
29 // Simulate a Jetstream commit event
30 event := &jetstream.JetstreamEvent{
31 Did: communityDID,
32 TimeUS: time.Now().UnixMicro(),
33 Kind: "commit",
34 Commit: &jetstream.CommitEvent{
35 Rev: "rev123",
36 Operation: "create",
37 Collection: "social.coves.community.profile",
38 RKey: "self",
39 CID: "bafy123abc",
40 Record: map[string]interface{}{
41 "did": communityDID, // Community's unique DID
42 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix),
43 "name": "test-community",
44 "displayName": "Test Community",
45 "description": "A test community",
46 "owner": "did:web:coves.local",
47 "createdBy": "did:plc:user123",
48 "hostedBy": "did:web:coves.local",
49 "visibility": "public",
50 "federation": map[string]interface{}{
51 "allowExternalDiscovery": true,
52 },
53 "memberCount": 0,
54 "subscriberCount": 0,
55 "createdAt": time.Now().Format(time.RFC3339),
56 },
57 },
58 }
59
60 // Handle the event
61 if err := consumer.HandleEvent(ctx, event); err != nil {
62 t.Fatalf("Failed to handle event: %v", err)
63 }
64
65 // Verify community was indexed
66 community, err := repo.GetByDID(ctx, communityDID)
67 if err != nil {
68 t.Fatalf("Failed to get indexed community: %v", err)
69 }
70
71 if community.DID != communityDID {
72 t.Errorf("Expected DID %s, got %s", communityDID, community.DID)
73 }
74 if community.DisplayName != "Test Community" {
75 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName)
76 }
77 if community.Visibility != "public" {
78 t.Errorf("Expected Visibility 'public', got %s", community.Visibility)
79 }
80 })
81
82 t.Run("updates existing community", func(t *testing.T) {
83 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
84 communityDID := generateTestDID(uniqueSuffix)
85 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix)
86
87 // Create initial community
88 initialCommunity := &communities.Community{
89 DID: communityDID,
90 Handle: handle,
91 Name: "update-test",
92 DisplayName: "Original Name",
93 Description: "Original description",
94 OwnerDID: "did:web:coves.local",
95 CreatedByDID: "did:plc:user123",
96 HostedByDID: "did:web:coves.local",
97 Visibility: "public",
98 AllowExternalDiscovery: true,
99 CreatedAt: time.Now(),
100 UpdatedAt: time.Now(),
101 }
102
103 if _, err := repo.Create(ctx, initialCommunity); err != nil {
104 t.Fatalf("Failed to create initial community: %v", err)
105 }
106
107 // Simulate update event
108 updateEvent := &jetstream.JetstreamEvent{
109 Did: communityDID,
110 TimeUS: time.Now().UnixMicro(),
111 Kind: "commit",
112 Commit: &jetstream.CommitEvent{
113 Rev: "rev124",
114 Operation: "update",
115 Collection: "social.coves.community.profile",
116 RKey: "self",
117 CID: "bafy456def",
118 Record: map[string]interface{}{
119 "did": communityDID, // Community's unique DID
120 "handle": handle,
121 "name": "update-test",
122 "displayName": "Updated Name",
123 "description": "Updated description",
124 "owner": "did:web:coves.local",
125 "createdBy": "did:plc:user123",
126 "hostedBy": "did:web:coves.local",
127 "visibility": "unlisted",
128 "federation": map[string]interface{}{
129 "allowExternalDiscovery": false,
130 },
131 "memberCount": 5,
132 "subscriberCount": 10,
133 "createdAt": time.Now().Format(time.RFC3339),
134 },
135 },
136 }
137
138 // Handle the update
139 if err := consumer.HandleEvent(ctx, updateEvent); err != nil {
140 t.Fatalf("Failed to handle update event: %v", err)
141 }
142
143 // Verify community was updated
144 updated, err := repo.GetByDID(ctx, communityDID)
145 if err != nil {
146 t.Fatalf("Failed to get updated community: %v", err)
147 }
148
149 if updated.DisplayName != "Updated Name" {
150 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName)
151 }
152 if updated.Description != "Updated description" {
153 t.Errorf("Expected Description 'Updated description', got %s", updated.Description)
154 }
155 if updated.Visibility != "unlisted" {
156 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility)
157 }
158 if updated.AllowExternalDiscovery {
159 t.Error("Expected AllowExternalDiscovery to be false")
160 }
161 })
162
163 t.Run("deletes community", func(t *testing.T) {
164 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
165 communityDID := generateTestDID(uniqueSuffix)
166
167 // Create community to delete
168 community := &communities.Community{
169 DID: communityDID,
170 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix),
171 Name: "delete-test",
172 OwnerDID: "did:web:coves.local",
173 CreatedByDID: "did:plc:user123",
174 HostedByDID: "did:web:coves.local",
175 Visibility: "public",
176 CreatedAt: time.Now(),
177 UpdatedAt: time.Now(),
178 }
179
180 if _, err := repo.Create(ctx, community); err != nil {
181 t.Fatalf("Failed to create community: %v", err)
182 }
183
184 // Simulate delete event
185 deleteEvent := &jetstream.JetstreamEvent{
186 Did: communityDID,
187 TimeUS: time.Now().UnixMicro(),
188 Kind: "commit",
189 Commit: &jetstream.CommitEvent{
190 Rev: "rev125",
191 Operation: "delete",
192 Collection: "social.coves.community.profile",
193 RKey: "self",
194 },
195 }
196
197 // Handle the delete
198 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil {
199 t.Fatalf("Failed to handle delete event: %v", err)
200 }
201
202 // Verify community was deleted
203 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound {
204 t.Errorf("Expected ErrCommunityNotFound, got: %v", err)
205 }
206 })
207}
208
209func TestCommunityConsumer_HandleSubscription(t *testing.T) {
210 db := setupTestDB(t)
211 defer func() {
212 if err := db.Close(); err != nil {
213 t.Logf("Failed to close database: %v", err)
214 }
215 }()
216
217 repo := postgres.NewCommunityRepository(db)
218 consumer := jetstream.NewCommunityEventConsumer(repo)
219 ctx := context.Background()
220
221 t.Run("creates subscription from event", func(t *testing.T) {
222 // Create a community first
223 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
224 communityDID := generateTestDID(uniqueSuffix)
225
226 community := &communities.Community{
227 DID: communityDID,
228 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix),
229 Name: "sub-test",
230 OwnerDID: "did:web:coves.local",
231 CreatedByDID: "did:plc:user123",
232 HostedByDID: "did:web:coves.local",
233 Visibility: "public",
234 CreatedAt: time.Now(),
235 UpdatedAt: time.Now(),
236 }
237
238 if _, err := repo.Create(ctx, community); err != nil {
239 t.Fatalf("Failed to create community: %v", err)
240 }
241
242 // Simulate subscription event
243 // IMPORTANT: Use correct collection name (record type, not XRPC procedure)
244 userDID := "did:plc:subscriber123"
245 subEvent := &jetstream.JetstreamEvent{
246 Did: userDID,
247 TimeUS: time.Now().UnixMicro(),
248 Kind: "commit",
249 Commit: &jetstream.CommitEvent{
250 Rev: "rev200",
251 Operation: "create",
252 Collection: "social.coves.community.subscription", // Updated to communities namespace
253 RKey: "sub123",
254 CID: "bafy789ghi",
255 Record: map[string]interface{}{
256 "subject": communityDID, // Using 'subject' per atProto conventions
257 "contentVisibility": 3,
258 "createdAt": time.Now().Format(time.RFC3339),
259 },
260 },
261 }
262
263 // Handle the subscription
264 if err := consumer.HandleEvent(ctx, subEvent); err != nil {
265 t.Fatalf("Failed to handle subscription event: %v", err)
266 }
267
268 // Verify subscription was created
269 subscription, err := repo.GetSubscription(ctx, userDID, communityDID)
270 if err != nil {
271 t.Fatalf("Failed to get subscription: %v", err)
272 }
273
274 if subscription.UserDID != userDID {
275 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID)
276 }
277 if subscription.CommunityDID != communityDID {
278 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID)
279 }
280
281 // Verify subscriber count was incremented
282 updated, err := repo.GetByDID(ctx, communityDID)
283 if err != nil {
284 t.Fatalf("Failed to get community: %v", err)
285 }
286
287 if updated.SubscriberCount != 1 {
288 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount)
289 }
290 })
291}
292
293func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) {
294 db := setupTestDB(t)
295 defer func() {
296 if err := db.Close(); err != nil {
297 t.Logf("Failed to close database: %v", err)
298 }
299 }()
300
301 repo := postgres.NewCommunityRepository(db)
302 consumer := jetstream.NewCommunityEventConsumer(repo)
303 ctx := context.Background()
304
305 t.Run("ignores identity events", func(t *testing.T) {
306 event := &jetstream.JetstreamEvent{
307 Did: "did:plc:user123",
308 TimeUS: time.Now().UnixMicro(),
309 Kind: "identity",
310 Identity: &jetstream.IdentityEvent{
311 Did: "did:plc:user123",
312 Handle: "alice.bsky.social",
313 },
314 }
315
316 err := consumer.HandleEvent(ctx, event)
317 if err != nil {
318 t.Errorf("Expected no error for identity event, got: %v", err)
319 }
320 })
321
322 t.Run("ignores non-community collections", func(t *testing.T) {
323 event := &jetstream.JetstreamEvent{
324 Did: "did:plc:user123",
325 TimeUS: time.Now().UnixMicro(),
326 Kind: "commit",
327 Commit: &jetstream.CommitEvent{
328 Rev: "rev300",
329 Operation: "create",
330 Collection: "app.bsky.feed.post",
331 RKey: "post123",
332 Record: map[string]interface{}{
333 "text": "Hello world",
334 },
335 },
336 }
337
338 err := consumer.HandleEvent(ctx, event)
339 if err != nil {
340 t.Errorf("Expected no error for non-community event, got: %v", err)
341 }
342 })
343}