A community-based topic aggregation platform built on atproto.
1package integration
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "Coves/internal/atproto/identity"
9 "Coves/internal/atproto/jetstream"
10 "Coves/internal/core/users"
11 "Coves/internal/db/postgres"
12)
13
14func TestUserIndexingFromJetstream(t *testing.T) {
15 db := setupTestDB(t)
16 defer func() {
17 if err := db.Close(); err != nil {
18 t.Logf("Failed to close database: %v", err)
19 }
20 }()
21
22 // Wire up dependencies
23 userRepo := postgres.NewUserRepository(db)
24 resolver := identity.NewResolver(db, identity.DefaultConfig())
25 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
26
27 ctx := context.Background()
28
29 t.Run("Index new user from identity event", func(t *testing.T) {
30 // Simulate an identity event from Jetstream
31 event := jetstream.JetstreamEvent{
32 Did: "did:plc:jetstream123",
33 Kind: "identity",
34 Identity: &jetstream.IdentityEvent{
35 Did: "did:plc:jetstream123",
36 Handle: "alice.jetstream.test",
37 Seq: 12345,
38 Time: time.Now().Format(time.RFC3339),
39 },
40 }
41
42 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
43
44 // Handle the event
45 err := consumer.HandleIdentityEventPublic(ctx, &event)
46 if err != nil {
47 t.Fatalf("failed to handle identity event: %v", err)
48 }
49
50 // Verify user was indexed
51 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123")
52 if err != nil {
53 t.Fatalf("failed to get indexed user: %v", err)
54 }
55
56 if user.DID != "did:plc:jetstream123" {
57 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID)
58 }
59
60 if user.Handle != "alice.jetstream.test" {
61 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle)
62 }
63 })
64
65 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) {
66 // Create a user first
67 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
68 DID: "did:plc:duplicate123",
69 Handle: "duplicate.test",
70 PDSURL: "https://bsky.social",
71 })
72 if err != nil {
73 t.Fatalf("failed to create initial user: %v", err)
74 }
75
76 // Simulate duplicate identity event
77 event := jetstream.JetstreamEvent{
78 Did: "did:plc:duplicate123",
79 Kind: "identity",
80 Identity: &jetstream.IdentityEvent{
81 Did: "did:plc:duplicate123",
82 Handle: "duplicate.test",
83 Seq: 12346,
84 Time: time.Now().Format(time.RFC3339),
85 },
86 }
87
88 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
89
90 // Handle duplicate event - should not error
91 err = consumer.HandleIdentityEventPublic(ctx, &event)
92 if err != nil {
93 t.Fatalf("duplicate event should be handled gracefully: %v", err)
94 }
95
96 // Verify still only one user
97 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123")
98 if err != nil {
99 t.Fatalf("failed to get user: %v", err)
100 }
101
102 if user.Handle != "duplicate.test" {
103 t.Errorf("expected handle duplicate.test, got %s", user.Handle)
104 }
105 })
106
107 t.Run("Index multiple users", func(t *testing.T) {
108 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
109
110 users := []struct {
111 did string
112 handle string
113 }{
114 {"did:plc:multi1", "user1.test"},
115 {"did:plc:multi2", "user2.test"},
116 {"did:plc:multi3", "user3.test"},
117 }
118
119 for _, u := range users {
120 event := jetstream.JetstreamEvent{
121 Did: u.did,
122 Kind: "identity",
123 Identity: &jetstream.IdentityEvent{
124 Did: u.did,
125 Handle: u.handle,
126 Seq: 12345,
127 Time: time.Now().Format(time.RFC3339),
128 },
129 }
130
131 err := consumer.HandleIdentityEventPublic(ctx, &event)
132 if err != nil {
133 t.Fatalf("failed to index user %s: %v", u.handle, err)
134 }
135 }
136
137 // Verify all users indexed
138 for _, u := range users {
139 user, err := userService.GetUserByDID(ctx, u.did)
140 if err != nil {
141 t.Fatalf("user %s not found: %v", u.did, err)
142 }
143
144 if user.Handle != u.handle {
145 t.Errorf("expected handle %s, got %s", u.handle, user.Handle)
146 }
147 }
148 })
149
150 t.Run("Skip invalid events", func(t *testing.T) {
151 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
152
153 // Missing DID
154 invalidEvent1 := jetstream.JetstreamEvent{
155 Did: "",
156 Kind: "identity",
157 Identity: &jetstream.IdentityEvent{
158 Did: "",
159 Handle: "invalid.test",
160 Seq: 12345,
161 Time: time.Now().Format(time.RFC3339),
162 },
163 }
164
165 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1)
166 if err == nil {
167 t.Error("expected error for missing DID, got nil")
168 }
169
170 // Missing handle
171 invalidEvent2 := jetstream.JetstreamEvent{
172 Did: "did:plc:invalid",
173 Kind: "identity",
174 Identity: &jetstream.IdentityEvent{
175 Did: "did:plc:invalid",
176 Handle: "",
177 Seq: 12345,
178 Time: time.Now().Format(time.RFC3339),
179 },
180 }
181
182 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2)
183 if err == nil {
184 t.Error("expected error for missing handle, got nil")
185 }
186
187 // Missing identity data
188 invalidEvent3 := jetstream.JetstreamEvent{
189 Did: "did:plc:invalid2",
190 Kind: "identity",
191 Identity: nil,
192 }
193
194 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3)
195 if err == nil {
196 t.Error("expected error for nil identity data, got nil")
197 }
198 })
199
200 t.Run("Handle change updates database and purges cache", func(t *testing.T) {
201 testID := "handlechange"
202 oldHandle := "old." + testID + ".test"
203 newHandle := "new." + testID + ".test"
204 did := "did:plc:" + testID
205
206 // 1. Create user with old handle
207 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
208 DID: did,
209 Handle: oldHandle,
210 PDSURL: "https://bsky.social",
211 })
212 if err != nil {
213 t.Fatalf("failed to create initial user: %v", err)
214 }
215
216 // 2. Manually cache the identity (simulate a previous resolution)
217 cache := identity.NewPostgresCache(db, 24*time.Hour)
218 err = cache.Set(ctx, &identity.Identity{
219 DID: did,
220 Handle: oldHandle,
221 PDSURL: "https://bsky.social",
222 Method: identity.MethodDNS,
223 ResolvedAt: time.Now().UTC(),
224 })
225 if err != nil {
226 t.Fatalf("failed to cache identity: %v", err)
227 }
228
229 // 3. Verify cached (both handle and DID should be cached)
230 cachedByHandle, err := cache.Get(ctx, oldHandle)
231 if err != nil {
232 t.Fatalf("expected old handle to be cached, got error: %v", err)
233 }
234 if cachedByHandle.DID != did {
235 t.Errorf("expected cached DID %s, got %s", did, cachedByHandle.DID)
236 }
237
238 cachedByDID, err := cache.Get(ctx, did)
239 if err != nil {
240 t.Fatalf("expected DID to be cached, got error: %v", err)
241 }
242 if cachedByDID.Handle != oldHandle {
243 t.Errorf("expected cached handle %s, got %s", oldHandle, cachedByDID.Handle)
244 }
245
246 // 4. Send identity event with NEW handle
247 event := jetstream.JetstreamEvent{
248 Did: did,
249 Kind: "identity",
250 Identity: &jetstream.IdentityEvent{
251 Did: did,
252 Handle: newHandle,
253 Seq: 99999,
254 Time: time.Now().Format(time.RFC3339),
255 },
256 }
257
258 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
259 err = consumer.HandleIdentityEventPublic(ctx, &event)
260 if err != nil {
261 t.Fatalf("failed to handle handle change event: %v", err)
262 }
263
264 // 5. Verify database updated
265 user, err := userService.GetUserByDID(ctx, did)
266 if err != nil {
267 t.Fatalf("failed to get user by DID: %v", err)
268 }
269 if user.Handle != newHandle {
270 t.Errorf("expected database to have new handle %s, got %s", newHandle, user.Handle)
271 }
272
273 // 6. Verify old handle purged from cache
274 _, err = cache.Get(ctx, oldHandle)
275 if err == nil {
276 t.Error("expected old handle to be purged from cache, but it's still cached")
277 }
278 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
279 t.Errorf("expected ErrCacheMiss for old handle, got: %v", err)
280 }
281
282 // 7. Verify DID cache purged
283 _, err = cache.Get(ctx, did)
284 if err == nil {
285 t.Error("expected DID to be purged from cache, but it's still cached")
286 }
287 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
288 t.Errorf("expected ErrCacheMiss for DID, got: %v", err)
289 }
290
291 // 8. Verify user can be found by new handle
292 userByHandle, err := userService.GetUserByHandle(ctx, newHandle)
293 if err != nil {
294 t.Fatalf("failed to get user by new handle: %v", err)
295 }
296 if userByHandle.DID != did {
297 t.Errorf("expected DID %s when looking up by new handle, got %s", did, userByHandle.DID)
298 }
299 })
300}
301
302func TestUserServiceIdempotency(t *testing.T) {
303 db := setupTestDB(t)
304 defer func() {
305 if err := db.Close(); err != nil {
306 t.Logf("Failed to close database: %v", err)
307 }
308 }()
309
310 userRepo := postgres.NewUserRepository(db)
311 resolver := identity.NewResolver(db, identity.DefaultConfig())
312 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
313 ctx := context.Background()
314
315 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) {
316 req := users.CreateUserRequest{
317 DID: "did:plc:idempotent123",
318 Handle: "idempotent.test",
319 PDSURL: "https://bsky.social",
320 }
321
322 // First creation
323 user1, err := userService.CreateUser(ctx, req)
324 if err != nil {
325 t.Fatalf("first creation failed: %v", err)
326 }
327
328 // Second creation with same DID - should return existing user, not error
329 user2, err := userService.CreateUser(ctx, req)
330 if err != nil {
331 t.Fatalf("second creation should be idempotent: %v", err)
332 }
333
334 if user1.DID != user2.DID {
335 t.Errorf("expected same DID, got %s and %s", user1.DID, user2.DID)
336 }
337
338 if user1.CreatedAt != user2.CreatedAt {
339 t.Errorf("expected same user (same created_at), got different timestamps")
340 }
341 })
342
343 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) {
344 // Create first user
345 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
346 DID: "did:plc:handleconflict1",
347 Handle: "conflicting.handle",
348 PDSURL: "https://bsky.social",
349 })
350 if err != nil {
351 t.Fatalf("first creation failed: %v", err)
352 }
353
354 // Try to create different user with same handle
355 _, err = userService.CreateUser(ctx, users.CreateUserRequest{
356 DID: "did:plc:handleconflict2",
357 Handle: "conflicting.handle", // Same handle, different DID
358 PDSURL: "https://bsky.social",
359 })
360
361 if err == nil {
362 t.Fatal("expected error for duplicate handle, got nil")
363 }
364
365 if !contains(err.Error(), "handle already taken") {
366 t.Errorf("expected 'handle already taken' error, got: %v", err)
367 }
368 })
369}
370
371// Helper functions moved to helpers.go