A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "Coves/internal/atproto/identity"
9 "Coves/internal/atproto/jetstream"
10 "Coves/internal/core/users"
11 "Coves/internal/db/postgres"
12)
13
14func TestUserIndexingFromJetstream(t *testing.T) {
15 db := setupTestDB(t)
16 defer db.Close()
17
18 // Wire up dependencies
19 userRepo := postgres.NewUserRepository(db)
20 resolver := identity.NewResolver(db, identity.DefaultConfig())
21 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
22
23 ctx := context.Background()
24
25 t.Run("Index new user from identity event", func(t *testing.T) {
26 // Simulate an identity event from Jetstream
27 event := jetstream.JetstreamEvent{
28 Did: "did:plc:jetstream123",
29 Kind: "identity",
30 Identity: &jetstream.IdentityEvent{
31 Did: "did:plc:jetstream123",
32 Handle: "alice.jetstream.test",
33 Seq: 12345,
34 Time: time.Now().Format(time.RFC3339),
35 },
36 }
37
38 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
39
40 // Handle the event
41 err := consumer.HandleIdentityEventPublic(ctx, &event)
42 if err != nil {
43 t.Fatalf("failed to handle identity event: %v", err)
44 }
45
46 // Verify user was indexed
47 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123")
48 if err != nil {
49 t.Fatalf("failed to get indexed user: %v", err)
50 }
51
52 if user.DID != "did:plc:jetstream123" {
53 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID)
54 }
55
56 if user.Handle != "alice.jetstream.test" {
57 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle)
58 }
59 })
60
61 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) {
62 // Create a user first
63 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
64 DID: "did:plc:duplicate123",
65 Handle: "duplicate.test",
66 PDSURL: "https://bsky.social",
67 })
68 if err != nil {
69 t.Fatalf("failed to create initial user: %v", err)
70 }
71
72 // Simulate duplicate identity event
73 event := jetstream.JetstreamEvent{
74 Did: "did:plc:duplicate123",
75 Kind: "identity",
76 Identity: &jetstream.IdentityEvent{
77 Did: "did:plc:duplicate123",
78 Handle: "duplicate.test",
79 Seq: 12346,
80 Time: time.Now().Format(time.RFC3339),
81 },
82 }
83
84 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
85
86 // Handle duplicate event - should not error
87 err = consumer.HandleIdentityEventPublic(ctx, &event)
88 if err != nil {
89 t.Fatalf("duplicate event should be handled gracefully: %v", err)
90 }
91
92 // Verify still only one user
93 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123")
94 if err != nil {
95 t.Fatalf("failed to get user: %v", err)
96 }
97
98 if user.Handle != "duplicate.test" {
99 t.Errorf("expected handle duplicate.test, got %s", user.Handle)
100 }
101 })
102
103 t.Run("Index multiple users", func(t *testing.T) {
104 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
105
106 users := []struct {
107 did string
108 handle string
109 }{
110 {"did:plc:multi1", "user1.test"},
111 {"did:plc:multi2", "user2.test"},
112 {"did:plc:multi3", "user3.test"},
113 }
114
115 for _, u := range users {
116 event := jetstream.JetstreamEvent{
117 Did: u.did,
118 Kind: "identity",
119 Identity: &jetstream.IdentityEvent{
120 Did: u.did,
121 Handle: u.handle,
122 Seq: 12345,
123 Time: time.Now().Format(time.RFC3339),
124 },
125 }
126
127 err := consumer.HandleIdentityEventPublic(ctx, &event)
128 if err != nil {
129 t.Fatalf("failed to index user %s: %v", u.handle, err)
130 }
131 }
132
133 // Verify all users indexed
134 for _, u := range users {
135 user, err := userService.GetUserByDID(ctx, u.did)
136 if err != nil {
137 t.Fatalf("user %s not found: %v", u.did, err)
138 }
139
140 if user.Handle != u.handle {
141 t.Errorf("expected handle %s, got %s", u.handle, user.Handle)
142 }
143 }
144 })
145
146 t.Run("Skip invalid events", func(t *testing.T) {
147 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
148
149 // Missing DID
150 invalidEvent1 := jetstream.JetstreamEvent{
151 Did: "",
152 Kind: "identity",
153 Identity: &jetstream.IdentityEvent{
154 Did: "",
155 Handle: "invalid.test",
156 Seq: 12345,
157 Time: time.Now().Format(time.RFC3339),
158 },
159 }
160
161 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1)
162 if err == nil {
163 t.Error("expected error for missing DID, got nil")
164 }
165
166 // Missing handle
167 invalidEvent2 := jetstream.JetstreamEvent{
168 Did: "did:plc:invalid",
169 Kind: "identity",
170 Identity: &jetstream.IdentityEvent{
171 Did: "did:plc:invalid",
172 Handle: "",
173 Seq: 12345,
174 Time: time.Now().Format(time.RFC3339),
175 },
176 }
177
178 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2)
179 if err == nil {
180 t.Error("expected error for missing handle, got nil")
181 }
182
183 // Missing identity data
184 invalidEvent3 := jetstream.JetstreamEvent{
185 Did: "did:plc:invalid2",
186 Kind: "identity",
187 Identity: nil,
188 }
189
190 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3)
191 if err == nil {
192 t.Error("expected error for nil identity data, got nil")
193 }
194 })
195
196 t.Run("Handle change updates database and purges cache", func(t *testing.T) {
197 testID := "handlechange"
198 oldHandle := "old." + testID + ".test"
199 newHandle := "new." + testID + ".test"
200 did := "did:plc:" + testID
201
202 // 1. Create user with old handle
203 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
204 DID: did,
205 Handle: oldHandle,
206 PDSURL: "https://bsky.social",
207 })
208 if err != nil {
209 t.Fatalf("failed to create initial user: %v", err)
210 }
211
212 // 2. Manually cache the identity (simulate a previous resolution)
213 cache := identity.NewPostgresCache(db, 24*time.Hour)
214 err = cache.Set(ctx, &identity.Identity{
215 DID: did,
216 Handle: oldHandle,
217 PDSURL: "https://bsky.social",
218 Method: identity.MethodDNS,
219 ResolvedAt: time.Now().UTC(),
220 })
221 if err != nil {
222 t.Fatalf("failed to cache identity: %v", err)
223 }
224
225 // 3. Verify cached (both handle and DID should be cached)
226 cachedByHandle, err := cache.Get(ctx, oldHandle)
227 if err != nil {
228 t.Fatalf("expected old handle to be cached, got error: %v", err)
229 }
230 if cachedByHandle.DID != did {
231 t.Errorf("expected cached DID %s, got %s", did, cachedByHandle.DID)
232 }
233
234 cachedByDID, err := cache.Get(ctx, did)
235 if err != nil {
236 t.Fatalf("expected DID to be cached, got error: %v", err)
237 }
238 if cachedByDID.Handle != oldHandle {
239 t.Errorf("expected cached handle %s, got %s", oldHandle, cachedByDID.Handle)
240 }
241
242 // 4. Send identity event with NEW handle
243 event := jetstream.JetstreamEvent{
244 Did: did,
245 Kind: "identity",
246 Identity: &jetstream.IdentityEvent{
247 Did: did,
248 Handle: newHandle,
249 Seq: 99999,
250 Time: time.Now().Format(time.RFC3339),
251 },
252 }
253
254 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
255 err = consumer.HandleIdentityEventPublic(ctx, &event)
256 if err != nil {
257 t.Fatalf("failed to handle handle change event: %v", err)
258 }
259
260 // 5. Verify database updated
261 user, err := userService.GetUserByDID(ctx, did)
262 if err != nil {
263 t.Fatalf("failed to get user by DID: %v", err)
264 }
265 if user.Handle != newHandle {
266 t.Errorf("expected database to have new handle %s, got %s", newHandle, user.Handle)
267 }
268
269 // 6. Verify old handle purged from cache
270 _, err = cache.Get(ctx, oldHandle)
271 if err == nil {
272 t.Error("expected old handle to be purged from cache, but it's still cached")
273 }
274 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
275 t.Errorf("expected ErrCacheMiss for old handle, got: %v", err)
276 }
277
278 // 7. Verify DID cache purged
279 _, err = cache.Get(ctx, did)
280 if err == nil {
281 t.Error("expected DID to be purged from cache, but it's still cached")
282 }
283 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
284 t.Errorf("expected ErrCacheMiss for DID, got: %v", err)
285 }
286
287 // 8. Verify user can be found by new handle
288 userByHandle, err := userService.GetUserByHandle(ctx, newHandle)
289 if err != nil {
290 t.Fatalf("failed to get user by new handle: %v", err)
291 }
292 if userByHandle.DID != did {
293 t.Errorf("expected DID %s when looking up by new handle, got %s", did, userByHandle.DID)
294 }
295 })
296}
297
298func TestUserServiceIdempotency(t *testing.T) {
299 db := setupTestDB(t)
300 defer db.Close()
301
302 userRepo := postgres.NewUserRepository(db)
303 resolver := identity.NewResolver(db, identity.DefaultConfig())
304 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
305 ctx := context.Background()
306
307 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) {
308 req := users.CreateUserRequest{
309 DID: "did:plc:idempotent123",
310 Handle: "idempotent.test",
311 PDSURL: "https://bsky.social",
312 }
313
314 // First creation
315 user1, err := userService.CreateUser(ctx, req)
316 if err != nil {
317 t.Fatalf("first creation failed: %v", err)
318 }
319
320 // Second creation with same DID - should return existing user, not error
321 user2, err := userService.CreateUser(ctx, req)
322 if err != nil {
323 t.Fatalf("second creation should be idempotent: %v", err)
324 }
325
326 if user1.DID != user2.DID {
327 t.Errorf("expected same DID, got %s and %s", user1.DID, user2.DID)
328 }
329
330 if user1.CreatedAt != user2.CreatedAt {
331 t.Errorf("expected same user (same created_at), got different timestamps")
332 }
333 })
334
335 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) {
336 // Create first user
337 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
338 DID: "did:plc:handleconflict1",
339 Handle: "conflicting.handle",
340 PDSURL: "https://bsky.social",
341 })
342 if err != nil {
343 t.Fatalf("first creation failed: %v", err)
344 }
345
346 // Try to create different user with same handle
347 _, err = userService.CreateUser(ctx, users.CreateUserRequest{
348 DID: "did:plc:handleconflict2",
349 Handle: "conflicting.handle", // Same handle, different DID
350 PDSURL: "https://bsky.social",
351 })
352
353 if err == nil {
354 t.Fatal("expected error for duplicate handle, got nil")
355 }
356
357 if !contains(err.Error(), "handle already taken") {
358 t.Errorf("expected 'handle already taken' error, got: %v", err)
359 }
360 })
361}
362
363// Helper function
364func contains(s, substr string) bool {
365 return len(s) >= len(substr) && anySubstring(s, substr)
366}
367
368func anySubstring(s, substr string) bool {
369 for i := 0; i <= len(s)-len(substr); i++ {
370 if s[i:i+len(substr)] == substr {
371 return true
372 }
373 }
374 return false
375}