A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "Coves/internal/core/users"
9 "Coves/internal/db/postgres"
10 "Coves/internal/jetstream"
11)
12
13func TestUserIndexingFromJetstream(t *testing.T) {
14 db := setupTestDB(t)
15 defer db.Close()
16
17 // Wire up dependencies
18 userRepo := postgres.NewUserRepository(db)
19 userService := users.NewUserService(userRepo, "http://localhost:3001")
20
21 ctx := context.Background()
22
23 t.Run("Index new user from identity event", func(t *testing.T) {
24 // Simulate an identity event from Jetstream
25 event := jetstream.JetstreamEvent{
26 Did: "did:plc:jetstream123",
27 Kind: "identity",
28 Identity: &jetstream.IdentityEvent{
29 Did: "did:plc:jetstream123",
30 Handle: "alice.jetstream.test",
31 Seq: 12345,
32 Time: time.Now().Format(time.RFC3339),
33 },
34 }
35
36 consumer := jetstream.NewUserEventConsumer(userService, "", "")
37
38 // Handle the event
39 err := consumer.HandleIdentityEventPublic(ctx, &event)
40 if err != nil {
41 t.Fatalf("failed to handle identity event: %v", err)
42 }
43
44 // Verify user was indexed
45 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123")
46 if err != nil {
47 t.Fatalf("failed to get indexed user: %v", err)
48 }
49
50 if user.DID != "did:plc:jetstream123" {
51 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID)
52 }
53
54 if user.Handle != "alice.jetstream.test" {
55 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle)
56 }
57 })
58
59 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) {
60 // Create a user first
61 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
62 DID: "did:plc:duplicate123",
63 Handle: "duplicate.test",
64 PDSURL: "https://bsky.social",
65 })
66 if err != nil {
67 t.Fatalf("failed to create initial user: %v", err)
68 }
69
70 // Simulate duplicate identity event
71 event := jetstream.JetstreamEvent{
72 Did: "did:plc:duplicate123",
73 Kind: "identity",
74 Identity: &jetstream.IdentityEvent{
75 Did: "did:plc:duplicate123",
76 Handle: "duplicate.test",
77 Seq: 12346,
78 Time: time.Now().Format(time.RFC3339),
79 },
80 }
81
82 consumer := jetstream.NewUserEventConsumer(userService, "", "")
83
84 // Handle duplicate event - should not error
85 err = consumer.HandleIdentityEventPublic(ctx, &event)
86 if err != nil {
87 t.Fatalf("duplicate event should be handled gracefully: %v", err)
88 }
89
90 // Verify still only one user
91 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123")
92 if err != nil {
93 t.Fatalf("failed to get user: %v", err)
94 }
95
96 if user.Handle != "duplicate.test" {
97 t.Errorf("expected handle duplicate.test, got %s", user.Handle)
98 }
99 })
100
101 t.Run("Index multiple users", func(t *testing.T) {
102 consumer := jetstream.NewUserEventConsumer(userService, "", "")
103
104 users := []struct {
105 did string
106 handle string
107 }{
108 {"did:plc:multi1", "user1.test"},
109 {"did:plc:multi2", "user2.test"},
110 {"did:plc:multi3", "user3.test"},
111 }
112
113 for _, u := range users {
114 event := jetstream.JetstreamEvent{
115 Did: u.did,
116 Kind: "identity",
117 Identity: &jetstream.IdentityEvent{
118 Did: u.did,
119 Handle: u.handle,
120 Seq: 12345,
121 Time: time.Now().Format(time.RFC3339),
122 },
123 }
124
125 err := consumer.HandleIdentityEventPublic(ctx, &event)
126 if err != nil {
127 t.Fatalf("failed to index user %s: %v", u.handle, err)
128 }
129 }
130
131 // Verify all users indexed
132 for _, u := range users {
133 user, err := userService.GetUserByDID(ctx, u.did)
134 if err != nil {
135 t.Fatalf("user %s not found: %v", u.did, err)
136 }
137
138 if user.Handle != u.handle {
139 t.Errorf("expected handle %s, got %s", u.handle, user.Handle)
140 }
141 }
142 })
143
144 t.Run("Skip invalid events", func(t *testing.T) {
145 consumer := jetstream.NewUserEventConsumer(userService, "", "")
146
147 // Missing DID
148 invalidEvent1 := jetstream.JetstreamEvent{
149 Did: "",
150 Kind: "identity",
151 Identity: &jetstream.IdentityEvent{
152 Did: "",
153 Handle: "invalid.test",
154 Seq: 12345,
155 Time: time.Now().Format(time.RFC3339),
156 },
157 }
158
159 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1)
160 if err == nil {
161 t.Error("expected error for missing DID, got nil")
162 }
163
164 // Missing handle
165 invalidEvent2 := jetstream.JetstreamEvent{
166 Did: "did:plc:invalid",
167 Kind: "identity",
168 Identity: &jetstream.IdentityEvent{
169 Did: "did:plc:invalid",
170 Handle: "",
171 Seq: 12345,
172 Time: time.Now().Format(time.RFC3339),
173 },
174 }
175
176 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2)
177 if err == nil {
178 t.Error("expected error for missing handle, got nil")
179 }
180
181 // Missing identity data
182 invalidEvent3 := jetstream.JetstreamEvent{
183 Did: "did:plc:invalid2",
184 Kind: "identity",
185 Identity: nil,
186 }
187
188 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3)
189 if err == nil {
190 t.Error("expected error for nil identity data, got nil")
191 }
192 })
193}
194
195func TestUserServiceIdempotency(t *testing.T) {
196 db := setupTestDB(t)
197 defer db.Close()
198
199 userRepo := postgres.NewUserRepository(db)
200 userService := users.NewUserService(userRepo, "http://localhost:3001")
201 ctx := context.Background()
202
203 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) {
204 req := users.CreateUserRequest{
205 DID: "did:plc:idempotent123",
206 Handle: "idempotent.test",
207 PDSURL: "https://bsky.social",
208 }
209
210 // First creation
211 user1, err := userService.CreateUser(ctx, req)
212 if err != nil {
213 t.Fatalf("first creation failed: %v", err)
214 }
215
216 // Second creation with same DID - should return existing user, not error
217 user2, err := userService.CreateUser(ctx, req)
218 if err != nil {
219 t.Fatalf("second creation should be idempotent: %v", err)
220 }
221
222 if user1.DID != user2.DID {
223 t.Errorf("expected same DID, got %s and %s", user1.DID, user2.DID)
224 }
225
226 if user1.CreatedAt != user2.CreatedAt {
227 t.Errorf("expected same user (same created_at), got different timestamps")
228 }
229 })
230
231 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) {
232 // Create first user
233 _, err := userService.CreateUser(ctx, users.CreateUserRequest{
234 DID: "did:plc:handleconflict1",
235 Handle: "conflicting.handle",
236 PDSURL: "https://bsky.social",
237 })
238 if err != nil {
239 t.Fatalf("first creation failed: %v", err)
240 }
241
242 // Try to create different user with same handle
243 _, err = userService.CreateUser(ctx, users.CreateUserRequest{
244 DID: "did:plc:handleconflict2",
245 Handle: "conflicting.handle", // Same handle, different DID
246 PDSURL: "https://bsky.social",
247 })
248
249 if err == nil {
250 t.Fatal("expected error for duplicate handle, got nil")
251 }
252
253 if !contains(err.Error(), "handle already taken") {
254 t.Errorf("expected 'handle already taken' error, got: %v", err)
255 }
256 })
257}
258
259// Helper function
260func contains(s, substr string) bool {
261 return len(s) >= len(substr) && anySubstring(s, substr)
262}
263
264func anySubstring(s, substr string) bool {
265 for i := 0; i <= len(s)-len(substr); i++ {
266 if s[i:i+len(substr)] == substr {
267 return true
268 }
269 }
270 return false
271}