A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "testing" 6 "time" 7 8 "Coves/internal/atproto/identity" 9 "Coves/internal/atproto/jetstream" 10 "Coves/internal/core/users" 11 "Coves/internal/db/postgres" 12) 13 14func TestUserIndexingFromJetstream(t *testing.T) { 15 db := setupTestDB(t) 16 defer func() { 17 if err := db.Close(); err != nil { 18 t.Logf("Failed to close database: %v", err) 19 } 20 }() 21 22 // Wire up dependencies 23 userRepo := postgres.NewUserRepository(db) 24 resolver := identity.NewResolver(db, identity.DefaultConfig()) 25 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 26 27 ctx := context.Background() 28 29 t.Run("Index new user from identity event", func(t *testing.T) { 30 // Simulate an identity event from Jetstream 31 event := jetstream.JetstreamEvent{ 32 Did: "did:plc:jetstream123", 33 Kind: "identity", 34 Identity: &jetstream.IdentityEvent{ 35 Did: "did:plc:jetstream123", 36 Handle: "alice.jetstream.test", 37 Seq: 12345, 38 Time: time.Now().Format(time.RFC3339), 39 }, 40 } 41 42 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 43 44 // Handle the event 45 err := consumer.HandleIdentityEventPublic(ctx, &event) 46 if err != nil { 47 t.Fatalf("failed to handle identity event: %v", err) 48 } 49 50 // Verify user was indexed 51 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123") 52 if err != nil { 53 t.Fatalf("failed to get indexed user: %v", err) 54 } 55 56 if user.DID != "did:plc:jetstream123" { 57 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID) 58 } 59 60 if user.Handle != "alice.jetstream.test" { 61 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle) 62 } 63 }) 64 65 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) { 66 // Create a user first 67 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 68 DID: "did:plc:duplicate123", 69 Handle: "duplicate.test", 70 PDSURL: "https://bsky.social", 71 }) 72 if err != 
nil { 73 t.Fatalf("failed to create initial user: %v", err) 74 } 75 76 // Simulate duplicate identity event 77 event := jetstream.JetstreamEvent{ 78 Did: "did:plc:duplicate123", 79 Kind: "identity", 80 Identity: &jetstream.IdentityEvent{ 81 Did: "did:plc:duplicate123", 82 Handle: "duplicate.test", 83 Seq: 12346, 84 Time: time.Now().Format(time.RFC3339), 85 }, 86 } 87 88 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 89 90 // Handle duplicate event - should not error 91 err = consumer.HandleIdentityEventPublic(ctx, &event) 92 if err != nil { 93 t.Fatalf("duplicate event should be handled gracefully: %v", err) 94 } 95 96 // Verify still only one user 97 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123") 98 if err != nil { 99 t.Fatalf("failed to get user: %v", err) 100 } 101 102 if user.Handle != "duplicate.test" { 103 t.Errorf("expected handle duplicate.test, got %s", user.Handle) 104 } 105 }) 106 107 t.Run("Index multiple users", func(t *testing.T) { 108 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 109 110 users := []struct { 111 did string 112 handle string 113 }{ 114 {"did:plc:multi1", "user1.test"}, 115 {"did:plc:multi2", "user2.test"}, 116 {"did:plc:multi3", "user3.test"}, 117 } 118 119 for _, u := range users { 120 event := jetstream.JetstreamEvent{ 121 Did: u.did, 122 Kind: "identity", 123 Identity: &jetstream.IdentityEvent{ 124 Did: u.did, 125 Handle: u.handle, 126 Seq: 12345, 127 Time: time.Now().Format(time.RFC3339), 128 }, 129 } 130 131 err := consumer.HandleIdentityEventPublic(ctx, &event) 132 if err != nil { 133 t.Fatalf("failed to index user %s: %v", u.handle, err) 134 } 135 } 136 137 // Verify all users indexed 138 for _, u := range users { 139 user, err := userService.GetUserByDID(ctx, u.did) 140 if err != nil { 141 t.Fatalf("user %s not found: %v", u.did, err) 142 } 143 144 if user.Handle != u.handle { 145 t.Errorf("expected handle %s, got %s", u.handle, user.Handle) 146 } 147 } 
148 }) 149 150 t.Run("Skip invalid events", func(t *testing.T) { 151 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 152 153 // Missing DID 154 invalidEvent1 := jetstream.JetstreamEvent{ 155 Did: "", 156 Kind: "identity", 157 Identity: &jetstream.IdentityEvent{ 158 Did: "", 159 Handle: "invalid.test", 160 Seq: 12345, 161 Time: time.Now().Format(time.RFC3339), 162 }, 163 } 164 165 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1) 166 if err == nil { 167 t.Error("expected error for missing DID, got nil") 168 } 169 170 // Missing handle 171 invalidEvent2 := jetstream.JetstreamEvent{ 172 Did: "did:plc:invalid", 173 Kind: "identity", 174 Identity: &jetstream.IdentityEvent{ 175 Did: "did:plc:invalid", 176 Handle: "", 177 Seq: 12345, 178 Time: time.Now().Format(time.RFC3339), 179 }, 180 } 181 182 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2) 183 if err == nil { 184 t.Error("expected error for missing handle, got nil") 185 } 186 187 // Missing identity data 188 invalidEvent3 := jetstream.JetstreamEvent{ 189 Did: "did:plc:invalid2", 190 Kind: "identity", 191 Identity: nil, 192 } 193 194 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3) 195 if err == nil { 196 t.Error("expected error for nil identity data, got nil") 197 } 198 }) 199 200 t.Run("Handle change updates database and purges cache", func(t *testing.T) { 201 testID := "handlechange" 202 oldHandle := "old." + testID + ".test" 203 newHandle := "new." + testID + ".test" 204 did := "did:plc:" + testID 205 206 // 1. Create user with old handle 207 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 208 DID: did, 209 Handle: oldHandle, 210 PDSURL: "https://bsky.social", 211 }) 212 if err != nil { 213 t.Fatalf("failed to create initial user: %v", err) 214 } 215 216 // 2. 
Manually cache the identity (simulate a previous resolution) 217 cache := identity.NewPostgresCache(db, 24*time.Hour) 218 err = cache.Set(ctx, &identity.Identity{ 219 DID: did, 220 Handle: oldHandle, 221 PDSURL: "https://bsky.social", 222 Method: identity.MethodDNS, 223 ResolvedAt: time.Now().UTC(), 224 }) 225 if err != nil { 226 t.Fatalf("failed to cache identity: %v", err) 227 } 228 229 // 3. Verify cached (both handle and DID should be cached) 230 cachedByHandle, err := cache.Get(ctx, oldHandle) 231 if err != nil { 232 t.Fatalf("expected old handle to be cached, got error: %v", err) 233 } 234 if cachedByHandle.DID != did { 235 t.Errorf("expected cached DID %s, got %s", did, cachedByHandle.DID) 236 } 237 238 cachedByDID, err := cache.Get(ctx, did) 239 if err != nil { 240 t.Fatalf("expected DID to be cached, got error: %v", err) 241 } 242 if cachedByDID.Handle != oldHandle { 243 t.Errorf("expected cached handle %s, got %s", oldHandle, cachedByDID.Handle) 244 } 245 246 // 4. Send identity event with NEW handle 247 event := jetstream.JetstreamEvent{ 248 Did: did, 249 Kind: "identity", 250 Identity: &jetstream.IdentityEvent{ 251 Did: did, 252 Handle: newHandle, 253 Seq: 99999, 254 Time: time.Now().Format(time.RFC3339), 255 }, 256 } 257 258 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 259 err = consumer.HandleIdentityEventPublic(ctx, &event) 260 if err != nil { 261 t.Fatalf("failed to handle handle change event: %v", err) 262 } 263 264 // 5. Verify database updated 265 user, err := userService.GetUserByDID(ctx, did) 266 if err != nil { 267 t.Fatalf("failed to get user by DID: %v", err) 268 } 269 if user.Handle != newHandle { 270 t.Errorf("expected database to have new handle %s, got %s", newHandle, user.Handle) 271 } 272 273 // 6. 
Verify old handle purged from cache 274 _, err = cache.Get(ctx, oldHandle) 275 if err == nil { 276 t.Error("expected old handle to be purged from cache, but it's still cached") 277 } 278 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss { 279 t.Errorf("expected ErrCacheMiss for old handle, got: %v", err) 280 } 281 282 // 7. Verify DID cache purged 283 _, err = cache.Get(ctx, did) 284 if err == nil { 285 t.Error("expected DID to be purged from cache, but it's still cached") 286 } 287 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss { 288 t.Errorf("expected ErrCacheMiss for DID, got: %v", err) 289 } 290 291 // 8. Verify user can be found by new handle 292 userByHandle, err := userService.GetUserByHandle(ctx, newHandle) 293 if err != nil { 294 t.Fatalf("failed to get user by new handle: %v", err) 295 } 296 if userByHandle.DID != did { 297 t.Errorf("expected DID %s when looking up by new handle, got %s", did, userByHandle.DID) 298 } 299 }) 300} 301 302func TestUserServiceIdempotency(t *testing.T) { 303 db := setupTestDB(t) 304 defer func() { 305 if err := db.Close(); err != nil { 306 t.Logf("Failed to close database: %v", err) 307 } 308 }() 309 310 userRepo := postgres.NewUserRepository(db) 311 resolver := identity.NewResolver(db, identity.DefaultConfig()) 312 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 313 ctx := context.Background() 314 315 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) { 316 req := users.CreateUserRequest{ 317 DID: "did:plc:idempotent123", 318 Handle: "idempotent.test", 319 PDSURL: "https://bsky.social", 320 } 321 322 // First creation 323 user1, err := userService.CreateUser(ctx, req) 324 if err != nil { 325 t.Fatalf("first creation failed: %v", err) 326 } 327 328 // Second creation with same DID - should return existing user, not error 329 user2, err := userService.CreateUser(ctx, req) 330 if err != nil { 331 t.Fatalf("second creation should be idempotent: 
%v", err) 332 } 333 334 if user1.DID != user2.DID { 335 t.Errorf("expected same DID, got %s and %s", user1.DID, user2.DID) 336 } 337 338 if user1.CreatedAt != user2.CreatedAt { 339 t.Errorf("expected same user (same created_at), got different timestamps") 340 } 341 }) 342 343 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) { 344 // Create first user 345 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 346 DID: "did:plc:handleconflict1", 347 Handle: "conflicting.handle", 348 PDSURL: "https://bsky.social", 349 }) 350 if err != nil { 351 t.Fatalf("first creation failed: %v", err) 352 } 353 354 // Try to create different user with same handle 355 _, err = userService.CreateUser(ctx, users.CreateUserRequest{ 356 DID: "did:plc:handleconflict2", 357 Handle: "conflicting.handle", // Same handle, different DID 358 PDSURL: "https://bsky.social", 359 }) 360 361 if err == nil { 362 t.Fatal("expected error for duplicate handle, got nil") 363 } 364 365 if !contains(err.Error(), "handle already taken") { 366 t.Errorf("expected 'handle already taken' error, got: %v", err) 367 } 368 }) 369} 370 371// Helper functions moved to helpers.go