A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "testing" 6 "time" 7 8 "Coves/internal/atproto/identity" 9 "Coves/internal/atproto/jetstream" 10 "Coves/internal/core/users" 11 "Coves/internal/db/postgres" 12) 13 14func TestUserIndexingFromJetstream(t *testing.T) { 15 db := setupTestDB(t) 16 defer db.Close() 17 18 // Wire up dependencies 19 userRepo := postgres.NewUserRepository(db) 20 resolver := identity.NewResolver(db, identity.DefaultConfig()) 21 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 22 23 ctx := context.Background() 24 25 t.Run("Index new user from identity event", func(t *testing.T) { 26 // Simulate an identity event from Jetstream 27 event := jetstream.JetstreamEvent{ 28 Did: "did:plc:jetstream123", 29 Kind: "identity", 30 Identity: &jetstream.IdentityEvent{ 31 Did: "did:plc:jetstream123", 32 Handle: "alice.jetstream.test", 33 Seq: 12345, 34 Time: time.Now().Format(time.RFC3339), 35 }, 36 } 37 38 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 39 40 // Handle the event 41 err := consumer.HandleIdentityEventPublic(ctx, &event) 42 if err != nil { 43 t.Fatalf("failed to handle identity event: %v", err) 44 } 45 46 // Verify user was indexed 47 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123") 48 if err != nil { 49 t.Fatalf("failed to get indexed user: %v", err) 50 } 51 52 if user.DID != "did:plc:jetstream123" { 53 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID) 54 } 55 56 if user.Handle != "alice.jetstream.test" { 57 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle) 58 } 59 }) 60 61 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) { 62 // Create a user first 63 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 64 DID: "did:plc:duplicate123", 65 Handle: "duplicate.test", 66 PDSURL: "https://bsky.social", 67 }) 68 if err != nil { 69 t.Fatalf("failed to create initial user: %v", err) 70 } 71 72 // Simulate duplicate 
identity event 73 event := jetstream.JetstreamEvent{ 74 Did: "did:plc:duplicate123", 75 Kind: "identity", 76 Identity: &jetstream.IdentityEvent{ 77 Did: "did:plc:duplicate123", 78 Handle: "duplicate.test", 79 Seq: 12346, 80 Time: time.Now().Format(time.RFC3339), 81 }, 82 } 83 84 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 85 86 // Handle duplicate event - should not error 87 err = consumer.HandleIdentityEventPublic(ctx, &event) 88 if err != nil { 89 t.Fatalf("duplicate event should be handled gracefully: %v", err) 90 } 91 92 // Verify still only one user 93 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123") 94 if err != nil { 95 t.Fatalf("failed to get user: %v", err) 96 } 97 98 if user.Handle != "duplicate.test" { 99 t.Errorf("expected handle duplicate.test, got %s", user.Handle) 100 } 101 }) 102 103 t.Run("Index multiple users", func(t *testing.T) { 104 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 105 106 users := []struct { 107 did string 108 handle string 109 }{ 110 {"did:plc:multi1", "user1.test"}, 111 {"did:plc:multi2", "user2.test"}, 112 {"did:plc:multi3", "user3.test"}, 113 } 114 115 for _, u := range users { 116 event := jetstream.JetstreamEvent{ 117 Did: u.did, 118 Kind: "identity", 119 Identity: &jetstream.IdentityEvent{ 120 Did: u.did, 121 Handle: u.handle, 122 Seq: 12345, 123 Time: time.Now().Format(time.RFC3339), 124 }, 125 } 126 127 err := consumer.HandleIdentityEventPublic(ctx, &event) 128 if err != nil { 129 t.Fatalf("failed to index user %s: %v", u.handle, err) 130 } 131 } 132 133 // Verify all users indexed 134 for _, u := range users { 135 user, err := userService.GetUserByDID(ctx, u.did) 136 if err != nil { 137 t.Fatalf("user %s not found: %v", u.did, err) 138 } 139 140 if user.Handle != u.handle { 141 t.Errorf("expected handle %s, got %s", u.handle, user.Handle) 142 } 143 } 144 }) 145 146 t.Run("Skip invalid events", func(t *testing.T) { 147 consumer := 
jetstream.NewUserEventConsumer(userService, resolver, "", "") 148 149 // Missing DID 150 invalidEvent1 := jetstream.JetstreamEvent{ 151 Did: "", 152 Kind: "identity", 153 Identity: &jetstream.IdentityEvent{ 154 Did: "", 155 Handle: "invalid.test", 156 Seq: 12345, 157 Time: time.Now().Format(time.RFC3339), 158 }, 159 } 160 161 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1) 162 if err == nil { 163 t.Error("expected error for missing DID, got nil") 164 } 165 166 // Missing handle 167 invalidEvent2 := jetstream.JetstreamEvent{ 168 Did: "did:plc:invalid", 169 Kind: "identity", 170 Identity: &jetstream.IdentityEvent{ 171 Did: "did:plc:invalid", 172 Handle: "", 173 Seq: 12345, 174 Time: time.Now().Format(time.RFC3339), 175 }, 176 } 177 178 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2) 179 if err == nil { 180 t.Error("expected error for missing handle, got nil") 181 } 182 183 // Missing identity data 184 invalidEvent3 := jetstream.JetstreamEvent{ 185 Did: "did:plc:invalid2", 186 Kind: "identity", 187 Identity: nil, 188 } 189 190 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3) 191 if err == nil { 192 t.Error("expected error for nil identity data, got nil") 193 } 194 }) 195 196 t.Run("Handle change updates database and purges cache", func(t *testing.T) { 197 testID := "handlechange" 198 oldHandle := "old." + testID + ".test" 199 newHandle := "new." + testID + ".test" 200 did := "did:plc:" + testID 201 202 // 1. Create user with old handle 203 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 204 DID: did, 205 Handle: oldHandle, 206 PDSURL: "https://bsky.social", 207 }) 208 if err != nil { 209 t.Fatalf("failed to create initial user: %v", err) 210 } 211 212 // 2. 
Manually cache the identity (simulate a previous resolution) 213 cache := identity.NewPostgresCache(db, 24*time.Hour) 214 err = cache.Set(ctx, &identity.Identity{ 215 DID: did, 216 Handle: oldHandle, 217 PDSURL: "https://bsky.social", 218 Method: identity.MethodDNS, 219 ResolvedAt: time.Now().UTC(), 220 }) 221 if err != nil { 222 t.Fatalf("failed to cache identity: %v", err) 223 } 224 225 // 3. Verify cached (both handle and DID should be cached) 226 cachedByHandle, err := cache.Get(ctx, oldHandle) 227 if err != nil { 228 t.Fatalf("expected old handle to be cached, got error: %v", err) 229 } 230 if cachedByHandle.DID != did { 231 t.Errorf("expected cached DID %s, got %s", did, cachedByHandle.DID) 232 } 233 234 cachedByDID, err := cache.Get(ctx, did) 235 if err != nil { 236 t.Fatalf("expected DID to be cached, got error: %v", err) 237 } 238 if cachedByDID.Handle != oldHandle { 239 t.Errorf("expected cached handle %s, got %s", oldHandle, cachedByDID.Handle) 240 } 241 242 // 4. Send identity event with NEW handle 243 event := jetstream.JetstreamEvent{ 244 Did: did, 245 Kind: "identity", 246 Identity: &jetstream.IdentityEvent{ 247 Did: did, 248 Handle: newHandle, 249 Seq: 99999, 250 Time: time.Now().Format(time.RFC3339), 251 }, 252 } 253 254 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 255 err = consumer.HandleIdentityEventPublic(ctx, &event) 256 if err != nil { 257 t.Fatalf("failed to handle handle change event: %v", err) 258 } 259 260 // 5. Verify database updated 261 user, err := userService.GetUserByDID(ctx, did) 262 if err != nil { 263 t.Fatalf("failed to get user by DID: %v", err) 264 } 265 if user.Handle != newHandle { 266 t.Errorf("expected database to have new handle %s, got %s", newHandle, user.Handle) 267 } 268 269 // 6. 
Verify old handle purged from cache 270 _, err = cache.Get(ctx, oldHandle) 271 if err == nil { 272 t.Error("expected old handle to be purged from cache, but it's still cached") 273 } 274 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss { 275 t.Errorf("expected ErrCacheMiss for old handle, got: %v", err) 276 } 277 278 // 7. Verify DID cache purged 279 _, err = cache.Get(ctx, did) 280 if err == nil { 281 t.Error("expected DID to be purged from cache, but it's still cached") 282 } 283 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss { 284 t.Errorf("expected ErrCacheMiss for DID, got: %v", err) 285 } 286 287 // 8. Verify user can be found by new handle 288 userByHandle, err := userService.GetUserByHandle(ctx, newHandle) 289 if err != nil { 290 t.Fatalf("failed to get user by new handle: %v", err) 291 } 292 if userByHandle.DID != did { 293 t.Errorf("expected DID %s when looking up by new handle, got %s", did, userByHandle.DID) 294 } 295 }) 296} 297 298func TestUserServiceIdempotency(t *testing.T) { 299 db := setupTestDB(t) 300 defer db.Close() 301 302 userRepo := postgres.NewUserRepository(db) 303 resolver := identity.NewResolver(db, identity.DefaultConfig()) 304 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 305 ctx := context.Background() 306 307 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) { 308 req := users.CreateUserRequest{ 309 DID: "did:plc:idempotent123", 310 Handle: "idempotent.test", 311 PDSURL: "https://bsky.social", 312 } 313 314 // First creation 315 user1, err := userService.CreateUser(ctx, req) 316 if err != nil { 317 t.Fatalf("first creation failed: %v", err) 318 } 319 320 // Second creation with same DID - should return existing user, not error 321 user2, err := userService.CreateUser(ctx, req) 322 if err != nil { 323 t.Fatalf("second creation should be idempotent: %v", err) 324 } 325 326 if user1.DID != user2.DID { 327 t.Errorf("expected same DID, got %s and %s", 
user1.DID, user2.DID) 328 } 329 330 if user1.CreatedAt != user2.CreatedAt { 331 t.Errorf("expected same user (same created_at), got different timestamps") 332 } 333 }) 334 335 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) { 336 // Create first user 337 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 338 DID: "did:plc:handleconflict1", 339 Handle: "conflicting.handle", 340 PDSURL: "https://bsky.social", 341 }) 342 if err != nil { 343 t.Fatalf("first creation failed: %v", err) 344 } 345 346 // Try to create different user with same handle 347 _, err = userService.CreateUser(ctx, users.CreateUserRequest{ 348 DID: "did:plc:handleconflict2", 349 Handle: "conflicting.handle", // Same handle, different DID 350 PDSURL: "https://bsky.social", 351 }) 352 353 if err == nil { 354 t.Fatal("expected error for duplicate handle, got nil") 355 } 356 357 if !contains(err.Error(), "handle already taken") { 358 t.Errorf("expected 'handle already taken' error, got: %v", err) 359 } 360 }) 361} 362 363// Helper function 364func contains(s, substr string) bool { 365 return len(s) >= len(substr) && anySubstring(s, substr) 366} 367 368func anySubstring(s, substr string) bool { 369 for i := 0; i <= len(s)-len(substr); i++ { 370 if s[i:i+len(substr)] == substr { 371 return true 372 } 373 } 374 return false 375}