A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/core/users"
7 "Coves/internal/db/postgres"
8 "context"
9 "testing"
10 "time"
11)
12
13func TestUserIndexingFromJetstream(t *testing.T) {
14 db := setupTestDB(t)
15 defer func() {
16 if err := db.Close(); err != nil {
17 t.Logf("Failed to close database: %v", err)
18 }
19 }()
20
21 // Wire up dependencies
22 userRepo := postgres.NewUserRepository(db)
23 resolver := identity.NewResolver(db, identity.DefaultConfig())
24 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
25
26 ctx := context.Background()
27
28 t.Run("Index new user from identity event", func(t *testing.T) {
29 // Simulate an identity event from Jetstream
30 event := jetstream.JetstreamEvent{
31 Did: "did:plc:jetstream123",
32 Kind: "identity",
33 Identity: &jetstream.IdentityEvent{
34 Did: "did:plc:jetstream123",
35 Handle: "alice.jetstream.test",
36 Seq: 12345,
37 Time: time.Now().Format(time.RFC3339),
38 },
39 }
40
41 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
42
43 // Handle the event
44 err := consumer.HandleIdentityEventPublic(ctx, &event)
45 if err != nil {
46 t.Fatalf("failed to handle identity event: %v", err)
47 }
48
49 // Verify user was indexed
50 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123")
51 if err != nil {
52 t.Fatalf("failed to get indexed user: %v", err)
53 }
54
55 if user.DID != "did:plc:jetstream123" {
56 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID)
57 }
58
59 if user.Handle != "alice.jetstream.test" {
60 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle)
61 }
62 })
63
64 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) {
65 // Create a user first
66 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
67 DID: "did:plc:duplicate123",
68 Handle: "duplicate.test",
69 PDSURL: "https://bsky.social",
70 })
71 if err != nil {
72 t.Fatalf("failed to create initial user: %v", err)
73 }
74
75 // Simulate duplicate identity event
76 event := jetstream.JetstreamEvent{
77 Did: "did:plc:duplicate123",
78 Kind: "identity",
79 Identity: &jetstream.IdentityEvent{
80 Did: "did:plc:duplicate123",
81 Handle: "duplicate.test",
82 Seq: 12346,
83 Time: time.Now().Format(time.RFC3339),
84 },
85 }
86
87 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
88
89 // Handle duplicate event - should not error
90 err = consumer.HandleIdentityEventPublic(ctx, &event)
91 if err != nil {
92 t.Fatalf("duplicate event should be handled gracefully: %v", err)
93 }
94
95 // Verify still only one user
96 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123")
97 if err != nil {
98 t.Fatalf("failed to get user: %v", err)
99 }
100
101 if user.Handle != "duplicate.test" {
102 t.Errorf("expected handle duplicate.test, got %s", user.Handle)
103 }
104 })
105
106 t.Run("Index multiple users", func(t *testing.T) {
107 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
108
109 users := []struct {
110 did string
111 handle string
112 }{
113 {"did:plc:multi1", "user1.test"},
114 {"did:plc:multi2", "user2.test"},
115 {"did:plc:multi3", "user3.test"},
116 }
117
118 for _, u := range users {
119 event := jetstream.JetstreamEvent{
120 Did: u.did,
121 Kind: "identity",
122 Identity: &jetstream.IdentityEvent{
123 Did: u.did,
124 Handle: u.handle,
125 Seq: 12345,
126 Time: time.Now().Format(time.RFC3339),
127 },
128 }
129
130 err := consumer.HandleIdentityEventPublic(ctx, &event)
131 if err != nil {
132 t.Fatalf("failed to index user %s: %v", u.handle, err)
133 }
134 }
135
136 // Verify all users indexed
137 for _, u := range users {
138 user, err := userService.GetUserByDID(ctx, u.did)
139 if err != nil {
140 t.Fatalf("user %s not found: %v", u.did, err)
141 }
142
143 if user.Handle != u.handle {
144 t.Errorf("expected handle %s, got %s", u.handle, user.Handle)
145 }
146 }
147 })
148
149 t.Run("Skip invalid events", func(t *testing.T) {
150 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
151
152 // Missing DID
153 invalidEvent1 := jetstream.JetstreamEvent{
154 Did: "",
155 Kind: "identity",
156 Identity: &jetstream.IdentityEvent{
157 Did: "",
158 Handle: "invalid.test",
159 Seq: 12345,
160 Time: time.Now().Format(time.RFC3339),
161 },
162 }
163
164 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1)
165 if err == nil {
166 t.Error("expected error for missing DID, got nil")
167 }
168
169 // Missing handle
170 invalidEvent2 := jetstream.JetstreamEvent{
171 Did: "did:plc:invalid",
172 Kind: "identity",
173 Identity: &jetstream.IdentityEvent{
174 Did: "did:plc:invalid",
175 Handle: "",
176 Seq: 12345,
177 Time: time.Now().Format(time.RFC3339),
178 },
179 }
180
181 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2)
182 if err == nil {
183 t.Error("expected error for missing handle, got nil")
184 }
185
186 // Missing identity data
187 invalidEvent3 := jetstream.JetstreamEvent{
188 Did: "did:plc:invalid2",
189 Kind: "identity",
190 Identity: nil,
191 }
192
193 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3)
194 if err == nil {
195 t.Error("expected error for nil identity data, got nil")
196 }
197 })
198
199 t.Run("Handle change updates database and purges cache", func(t *testing.T) {
200 testID := "handlechange"
201 oldHandle := "old." + testID + ".test"
202 newHandle := "new." + testID + ".test"
203 did := "did:plc:" + testID
204
205 // 1. Create user with old handle
206 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
207 DID: did,
208 Handle: oldHandle,
209 PDSURL: "https://bsky.social",
210 })
211 if err != nil {
212 t.Fatalf("failed to create initial user: %v", err)
213 }
214
215 // 2. Manually cache the identity (simulate a previous resolution)
216 cache := identity.NewPostgresCache(db, 24*time.Hour)
217 err = cache.Set(ctx, &identity.Identity{
218 DID: did,
219 Handle: oldHandle,
220 PDSURL: "https://bsky.social",
221 Method: identity.MethodDNS,
222 ResolvedAt: time.Now().UTC(),
223 })
224 if err != nil {
225 t.Fatalf("failed to cache identity: %v", err)
226 }
227
228 // 3. Verify cached (both handle and DID should be cached)
229 cachedByHandle, err := cache.Get(ctx, oldHandle)
230 if err != nil {
231 t.Fatalf("expected old handle to be cached, got error: %v", err)
232 }
233 if cachedByHandle.DID != did {
234 t.Errorf("expected cached DID %s, got %s", did, cachedByHandle.DID)
235 }
236
237 cachedByDID, err := cache.Get(ctx, did)
238 if err != nil {
239 t.Fatalf("expected DID to be cached, got error: %v", err)
240 }
241 if cachedByDID.Handle != oldHandle {
242 t.Errorf("expected cached handle %s, got %s", oldHandle, cachedByDID.Handle)
243 }
244
245 // 4. Send identity event with NEW handle
246 event := jetstream.JetstreamEvent{
247 Did: did,
248 Kind: "identity",
249 Identity: &jetstream.IdentityEvent{
250 Did: did,
251 Handle: newHandle,
252 Seq: 99999,
253 Time: time.Now().Format(time.RFC3339),
254 },
255 }
256
257 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
258 err = consumer.HandleIdentityEventPublic(ctx, &event)
259 if err != nil {
260 t.Fatalf("failed to handle handle change event: %v", err)
261 }
262
263 // 5. Verify database updated
264 user, err := userService.GetUserByDID(ctx, did)
265 if err != nil {
266 t.Fatalf("failed to get user by DID: %v", err)
267 }
268 if user.Handle != newHandle {
269 t.Errorf("expected database to have new handle %s, got %s", newHandle, user.Handle)
270 }
271
272 // 6. Verify old handle purged from cache
273 _, err = cache.Get(ctx, oldHandle)
274 if err == nil {
275 t.Error("expected old handle to be purged from cache, but it's still cached")
276 }
277 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
278 t.Errorf("expected ErrCacheMiss for old handle, got: %v", err)
279 }
280
281 // 7. Verify DID cache purged
282 _, err = cache.Get(ctx, did)
283 if err == nil {
284 t.Error("expected DID to be purged from cache, but it's still cached")
285 }
286 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
287 t.Errorf("expected ErrCacheMiss for DID, got: %v", err)
288 }
289
290 // 8. Verify user can be found by new handle
291 userByHandle, err := userService.GetUserByHandle(ctx, newHandle)
292 if err != nil {
293 t.Fatalf("failed to get user by new handle: %v", err)
294 }
295 if userByHandle.DID != did {
296 t.Errorf("expected DID %s when looking up by new handle, got %s", did, userByHandle.DID)
297 }
298 })
299}
300
301func TestUserServiceIdempotency(t *testing.T) {
302 db := setupTestDB(t)
303 defer func() {
304 if err := db.Close(); err != nil {
305 t.Logf("Failed to close database: %v", err)
306 }
307 }()
308
309 userRepo := postgres.NewUserRepository(db)
310 resolver := identity.NewResolver(db, identity.DefaultConfig())
311 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
312 ctx := context.Background()
313
314 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) {
315 req := users.CreateUserRequest{
316 DID: "did:plc:idempotent123",
317 Handle: "idempotent.test",
318 PDSURL: "https://bsky.social",
319 }
320
321 // First creation
322 user1, err := userService.CreateUser(ctx, req)
323 if err != nil {
324 t.Fatalf("first creation failed: %v", err)
325 }
326
327 // Second creation with same DID - should return existing user, not error
328 user2, err := userService.CreateUser(ctx, req)
329 if err != nil {
330 t.Fatalf("second creation should be idempotent: %v", err)
331 }
332
333 if user1.DID != user2.DID {
334 t.Errorf("expected same DID, got %s and %s", user1.DID, user2.DID)
335 }
336
337 if user1.CreatedAt != user2.CreatedAt {
338 t.Errorf("expected same user (same created_at), got different timestamps")
339 }
340 })
341
342 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) {
343 // Create first user
344 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
345 DID: "did:plc:handleconflict1",
346 Handle: "conflicting.handle",
347 PDSURL: "https://bsky.social",
348 })
349 if err != nil {
350 t.Fatalf("first creation failed: %v", err)
351 }
352
353 // Try to create different user with same handle
354 _, err = userService.CreateUser(ctx, users.CreateUserRequest{
355 DID: "did:plc:handleconflict2",
356 Handle: "conflicting.handle", // Same handle, different DID
357 PDSURL: "https://bsky.social",
358 })
359
360 if err == nil {
361 t.Fatal("expected error for duplicate handle, got nil")
362 }
363
364 if !contains(err.Error(), "handle already taken") {
365 t.Errorf("expected 'handle already taken' error, got: %v", err)
366 }
367 })
368}
369
370// Helper functions moved to helpers.go