A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/core/communities"
6 "Coves/internal/db/postgres"
7 "context"
8 "fmt"
9 "testing"
10 "time"
11)
12
13func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) {
14 db := setupTestDB(t)
15 defer func() {
16 if err := db.Close(); err != nil {
17 t.Logf("Failed to close database: %v", err)
18 }
19 }()
20
21 repo := postgres.NewCommunityRepository(db)
22 // Skip verification in tests
23 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
24 ctx := context.Background()
25
26 t.Run("creates community from firehose event", func(t *testing.T) {
27 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
28 communityDID := generateTestDID(uniqueSuffix)
29
30 // Simulate a Jetstream commit event
31 event := &jetstream.JetstreamEvent{
32 Did: communityDID,
33 TimeUS: time.Now().UnixMicro(),
34 Kind: "commit",
35 Commit: &jetstream.CommitEvent{
36 Rev: "rev123",
37 Operation: "create",
38 Collection: "social.coves.community.profile",
39 RKey: "self",
40 CID: "bafy123abc",
41 Record: map[string]interface{}{
42 "did": communityDID, // Community's unique DID
43 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix),
44 "name": "test-community",
45 "displayName": "Test Community",
46 "description": "A test community",
47 "owner": "did:web:coves.local",
48 "createdBy": "did:plc:user123",
49 "hostedBy": "did:web:coves.local",
50 "visibility": "public",
51 "federation": map[string]interface{}{
52 "allowExternalDiscovery": true,
53 },
54 "memberCount": 0,
55 "subscriberCount": 0,
56 "createdAt": time.Now().Format(time.RFC3339),
57 },
58 },
59 }
60
61 // Handle the event
62 if err := consumer.HandleEvent(ctx, event); err != nil {
63 t.Fatalf("Failed to handle event: %v", err)
64 }
65
66 // Verify community was indexed
67 community, err := repo.GetByDID(ctx, communityDID)
68 if err != nil {
69 t.Fatalf("Failed to get indexed community: %v", err)
70 }
71
72 if community.DID != communityDID {
73 t.Errorf("Expected DID %s, got %s", communityDID, community.DID)
74 }
75 if community.DisplayName != "Test Community" {
76 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName)
77 }
78 if community.Visibility != "public" {
79 t.Errorf("Expected Visibility 'public', got %s", community.Visibility)
80 }
81 })
82
83 t.Run("updates existing community", func(t *testing.T) {
84 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
85 communityDID := generateTestDID(uniqueSuffix)
86 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix)
87
88 // Create initial community
89 initialCommunity := &communities.Community{
90 DID: communityDID,
91 Handle: handle,
92 Name: "update-test",
93 DisplayName: "Original Name",
94 Description: "Original description",
95 OwnerDID: "did:web:coves.local",
96 CreatedByDID: "did:plc:user123",
97 HostedByDID: "did:web:coves.local",
98 Visibility: "public",
99 AllowExternalDiscovery: true,
100 CreatedAt: time.Now(),
101 UpdatedAt: time.Now(),
102 }
103
104 if _, err := repo.Create(ctx, initialCommunity); err != nil {
105 t.Fatalf("Failed to create initial community: %v", err)
106 }
107
108 // Simulate update event
109 updateEvent := &jetstream.JetstreamEvent{
110 Did: communityDID,
111 TimeUS: time.Now().UnixMicro(),
112 Kind: "commit",
113 Commit: &jetstream.CommitEvent{
114 Rev: "rev124",
115 Operation: "update",
116 Collection: "social.coves.community.profile",
117 RKey: "self",
118 CID: "bafy456def",
119 Record: map[string]interface{}{
120 "did": communityDID, // Community's unique DID
121 "handle": handle,
122 "name": "update-test",
123 "displayName": "Updated Name",
124 "description": "Updated description",
125 "owner": "did:web:coves.local",
126 "createdBy": "did:plc:user123",
127 "hostedBy": "did:web:coves.local",
128 "visibility": "unlisted",
129 "federation": map[string]interface{}{
130 "allowExternalDiscovery": false,
131 },
132 "memberCount": 5,
133 "subscriberCount": 10,
134 "createdAt": time.Now().Format(time.RFC3339),
135 },
136 },
137 }
138
139 // Handle the update
140 if err := consumer.HandleEvent(ctx, updateEvent); err != nil {
141 t.Fatalf("Failed to handle update event: %v", err)
142 }
143
144 // Verify community was updated
145 updated, err := repo.GetByDID(ctx, communityDID)
146 if err != nil {
147 t.Fatalf("Failed to get updated community: %v", err)
148 }
149
150 if updated.DisplayName != "Updated Name" {
151 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName)
152 }
153 if updated.Description != "Updated description" {
154 t.Errorf("Expected Description 'Updated description', got %s", updated.Description)
155 }
156 if updated.Visibility != "unlisted" {
157 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility)
158 }
159 if updated.AllowExternalDiscovery {
160 t.Error("Expected AllowExternalDiscovery to be false")
161 }
162 })
163
164 t.Run("deletes community", func(t *testing.T) {
165 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
166 communityDID := generateTestDID(uniqueSuffix)
167
168 // Create community to delete
169 community := &communities.Community{
170 DID: communityDID,
171 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix),
172 Name: "delete-test",
173 OwnerDID: "did:web:coves.local",
174 CreatedByDID: "did:plc:user123",
175 HostedByDID: "did:web:coves.local",
176 Visibility: "public",
177 CreatedAt: time.Now(),
178 UpdatedAt: time.Now(),
179 }
180
181 if _, err := repo.Create(ctx, community); err != nil {
182 t.Fatalf("Failed to create community: %v", err)
183 }
184
185 // Simulate delete event
186 deleteEvent := &jetstream.JetstreamEvent{
187 Did: communityDID,
188 TimeUS: time.Now().UnixMicro(),
189 Kind: "commit",
190 Commit: &jetstream.CommitEvent{
191 Rev: "rev125",
192 Operation: "delete",
193 Collection: "social.coves.community.profile",
194 RKey: "self",
195 },
196 }
197
198 // Handle the delete
199 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil {
200 t.Fatalf("Failed to handle delete event: %v", err)
201 }
202
203 // Verify community was deleted
204 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound {
205 t.Errorf("Expected ErrCommunityNotFound, got: %v", err)
206 }
207 })
208}
209
210func TestCommunityConsumer_HandleSubscription(t *testing.T) {
211 db := setupTestDB(t)
212 defer func() {
213 if err := db.Close(); err != nil {
214 t.Logf("Failed to close database: %v", err)
215 }
216 }()
217
218 repo := postgres.NewCommunityRepository(db)
219 // Skip verification in tests
220 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
221 ctx := context.Background()
222
223 t.Run("creates subscription from event", func(t *testing.T) {
224 // Create a community first
225 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
226 communityDID := generateTestDID(uniqueSuffix)
227
228 community := &communities.Community{
229 DID: communityDID,
230 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix),
231 Name: "sub-test",
232 OwnerDID: "did:web:coves.local",
233 CreatedByDID: "did:plc:user123",
234 HostedByDID: "did:web:coves.local",
235 Visibility: "public",
236 CreatedAt: time.Now(),
237 UpdatedAt: time.Now(),
238 }
239
240 if _, err := repo.Create(ctx, community); err != nil {
241 t.Fatalf("Failed to create community: %v", err)
242 }
243
244 // Simulate subscription event
245 // IMPORTANT: Use correct collection name (record type, not XRPC procedure)
246 userDID := "did:plc:subscriber123"
247 subEvent := &jetstream.JetstreamEvent{
248 Did: userDID,
249 TimeUS: time.Now().UnixMicro(),
250 Kind: "commit",
251 Commit: &jetstream.CommitEvent{
252 Rev: "rev200",
253 Operation: "create",
254 Collection: "social.coves.community.subscription", // Updated to communities namespace
255 RKey: "sub123",
256 CID: "bafy789ghi",
257 Record: map[string]interface{}{
258 "subject": communityDID, // Using 'subject' per atProto conventions
259 "contentVisibility": 3,
260 "createdAt": time.Now().Format(time.RFC3339),
261 },
262 },
263 }
264
265 // Handle the subscription
266 if err := consumer.HandleEvent(ctx, subEvent); err != nil {
267 t.Fatalf("Failed to handle subscription event: %v", err)
268 }
269
270 // Verify subscription was created
271 subscription, err := repo.GetSubscription(ctx, userDID, communityDID)
272 if err != nil {
273 t.Fatalf("Failed to get subscription: %v", err)
274 }
275
276 if subscription.UserDID != userDID {
277 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID)
278 }
279 if subscription.CommunityDID != communityDID {
280 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID)
281 }
282
283 // Verify subscriber count was incremented
284 updated, err := repo.GetByDID(ctx, communityDID)
285 if err != nil {
286 t.Fatalf("Failed to get community: %v", err)
287 }
288
289 if updated.SubscriberCount != 1 {
290 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount)
291 }
292 })
293}
294
295func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) {
296 db := setupTestDB(t)
297 defer func() {
298 if err := db.Close(); err != nil {
299 t.Logf("Failed to close database: %v", err)
300 }
301 }()
302
303 repo := postgres.NewCommunityRepository(db)
304 // Skip verification in tests
305 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true)
306 ctx := context.Background()
307
308 t.Run("ignores identity events", func(t *testing.T) {
309 event := &jetstream.JetstreamEvent{
310 Did: "did:plc:user123",
311 TimeUS: time.Now().UnixMicro(),
312 Kind: "identity",
313 Identity: &jetstream.IdentityEvent{
314 Did: "did:plc:user123",
315 Handle: "alice.bsky.social",
316 },
317 }
318
319 err := consumer.HandleEvent(ctx, event)
320 if err != nil {
321 t.Errorf("Expected no error for identity event, got: %v", err)
322 }
323 })
324
325 t.Run("ignores non-community collections", func(t *testing.T) {
326 event := &jetstream.JetstreamEvent{
327 Did: "did:plc:user123",
328 TimeUS: time.Now().UnixMicro(),
329 Kind: "commit",
330 Commit: &jetstream.CommitEvent{
331 Rev: "rev300",
332 Operation: "create",
333 Collection: "app.bsky.feed.post",
334 RKey: "post123",
335 Record: map[string]interface{}{
336 "text": "Hello world",
337 },
338 },
339 }
340
341 err := consumer.HandleEvent(ctx, event)
342 if err != nil {
343 t.Errorf("Expected no error for non-community event, got: %v", err)
344 }
345 })
346}