A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/core/users" 7 "Coves/internal/db/postgres" 8 "context" 9 "testing" 10 "time" 11) 12 13func TestUserIndexingFromJetstream(t *testing.T) { 14 db := setupTestDB(t) 15 defer func() { 16 if err := db.Close(); err != nil { 17 t.Logf("Failed to close database: %v", err) 18 } 19 }() 20 21 // Wire up dependencies 22 userRepo := postgres.NewUserRepository(db) 23 resolver := identity.NewResolver(db, identity.DefaultConfig()) 24 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 25 26 ctx := context.Background() 27 28 t.Run("Index new user from identity event", func(t *testing.T) { 29 // Simulate an identity event from Jetstream 30 event := jetstream.JetstreamEvent{ 31 Did: "did:plc:jetstream123", 32 Kind: "identity", 33 Identity: &jetstream.IdentityEvent{ 34 Did: "did:plc:jetstream123", 35 Handle: "alice.jetstream.test", 36 Seq: 12345, 37 Time: time.Now().Format(time.RFC3339), 38 }, 39 } 40 41 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 42 43 // Handle the event 44 err := consumer.HandleIdentityEventPublic(ctx, &event) 45 if err != nil { 46 t.Fatalf("failed to handle identity event: %v", err) 47 } 48 49 // Verify user was indexed 50 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123") 51 if err != nil { 52 t.Fatalf("failed to get indexed user: %v", err) 53 } 54 55 if user.DID != "did:plc:jetstream123" { 56 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID) 57 } 58 59 if user.Handle != "alice.jetstream.test" { 60 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle) 61 } 62 }) 63 64 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) { 65 // Create a user first 66 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 67 DID: "did:plc:duplicate123", 68 Handle: "duplicate.test", 69 PDSURL: "https://bsky.social", 70 }) 71 if err != 
nil { 72 t.Fatalf("failed to create initial user: %v", err) 73 } 74 75 // Simulate duplicate identity event 76 event := jetstream.JetstreamEvent{ 77 Did: "did:plc:duplicate123", 78 Kind: "identity", 79 Identity: &jetstream.IdentityEvent{ 80 Did: "did:plc:duplicate123", 81 Handle: "duplicate.test", 82 Seq: 12346, 83 Time: time.Now().Format(time.RFC3339), 84 }, 85 } 86 87 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 88 89 // Handle duplicate event - should not error 90 err = consumer.HandleIdentityEventPublic(ctx, &event) 91 if err != nil { 92 t.Fatalf("duplicate event should be handled gracefully: %v", err) 93 } 94 95 // Verify still only one user 96 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123") 97 if err != nil { 98 t.Fatalf("failed to get user: %v", err) 99 } 100 101 if user.Handle != "duplicate.test" { 102 t.Errorf("expected handle duplicate.test, got %s", user.Handle) 103 } 104 }) 105 106 t.Run("Index multiple users", func(t *testing.T) { 107 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 108 109 users := []struct { 110 did string 111 handle string 112 }{ 113 {"did:plc:multi1", "user1.test"}, 114 {"did:plc:multi2", "user2.test"}, 115 {"did:plc:multi3", "user3.test"}, 116 } 117 118 for _, u := range users { 119 event := jetstream.JetstreamEvent{ 120 Did: u.did, 121 Kind: "identity", 122 Identity: &jetstream.IdentityEvent{ 123 Did: u.did, 124 Handle: u.handle, 125 Seq: 12345, 126 Time: time.Now().Format(time.RFC3339), 127 }, 128 } 129 130 err := consumer.HandleIdentityEventPublic(ctx, &event) 131 if err != nil { 132 t.Fatalf("failed to index user %s: %v", u.handle, err) 133 } 134 } 135 136 // Verify all users indexed 137 for _, u := range users { 138 user, err := userService.GetUserByDID(ctx, u.did) 139 if err != nil { 140 t.Fatalf("user %s not found: %v", u.did, err) 141 } 142 143 if user.Handle != u.handle { 144 t.Errorf("expected handle %s, got %s", u.handle, user.Handle) 145 } 146 } 
147 }) 148 149 t.Run("Skip invalid events", func(t *testing.T) { 150 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 151 152 // Missing DID 153 invalidEvent1 := jetstream.JetstreamEvent{ 154 Did: "", 155 Kind: "identity", 156 Identity: &jetstream.IdentityEvent{ 157 Did: "", 158 Handle: "invalid.test", 159 Seq: 12345, 160 Time: time.Now().Format(time.RFC3339), 161 }, 162 } 163 164 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1) 165 if err == nil { 166 t.Error("expected error for missing DID, got nil") 167 } 168 169 // Missing handle 170 invalidEvent2 := jetstream.JetstreamEvent{ 171 Did: "did:plc:invalid", 172 Kind: "identity", 173 Identity: &jetstream.IdentityEvent{ 174 Did: "did:plc:invalid", 175 Handle: "", 176 Seq: 12345, 177 Time: time.Now().Format(time.RFC3339), 178 }, 179 } 180 181 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2) 182 if err == nil { 183 t.Error("expected error for missing handle, got nil") 184 } 185 186 // Missing identity data 187 invalidEvent3 := jetstream.JetstreamEvent{ 188 Did: "did:plc:invalid2", 189 Kind: "identity", 190 Identity: nil, 191 } 192 193 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3) 194 if err == nil { 195 t.Error("expected error for nil identity data, got nil") 196 } 197 }) 198 199 t.Run("Handle change updates database and purges cache", func(t *testing.T) { 200 testID := "handlechange" 201 oldHandle := "old." + testID + ".test" 202 newHandle := "new." + testID + ".test" 203 did := "did:plc:" + testID 204 205 // 1. Create user with old handle 206 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 207 DID: did, 208 Handle: oldHandle, 209 PDSURL: "https://bsky.social", 210 }) 211 if err != nil { 212 t.Fatalf("failed to create initial user: %v", err) 213 } 214 215 // 2. 
Manually cache the identity (simulate a previous resolution) 216 cache := identity.NewPostgresCache(db, 24*time.Hour) 217 err = cache.Set(ctx, &identity.Identity{ 218 DID: did, 219 Handle: oldHandle, 220 PDSURL: "https://bsky.social", 221 Method: identity.MethodDNS, 222 ResolvedAt: time.Now().UTC(), 223 }) 224 if err != nil { 225 t.Fatalf("failed to cache identity: %v", err) 226 } 227 228 // 3. Verify cached (both handle and DID should be cached) 229 cachedByHandle, err := cache.Get(ctx, oldHandle) 230 if err != nil { 231 t.Fatalf("expected old handle to be cached, got error: %v", err) 232 } 233 if cachedByHandle.DID != did { 234 t.Errorf("expected cached DID %s, got %s", did, cachedByHandle.DID) 235 } 236 237 cachedByDID, err := cache.Get(ctx, did) 238 if err != nil { 239 t.Fatalf("expected DID to be cached, got error: %v", err) 240 } 241 if cachedByDID.Handle != oldHandle { 242 t.Errorf("expected cached handle %s, got %s", oldHandle, cachedByDID.Handle) 243 } 244 245 // 4. Send identity event with NEW handle 246 event := jetstream.JetstreamEvent{ 247 Did: did, 248 Kind: "identity", 249 Identity: &jetstream.IdentityEvent{ 250 Did: did, 251 Handle: newHandle, 252 Seq: 99999, 253 Time: time.Now().Format(time.RFC3339), 254 }, 255 } 256 257 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 258 err = consumer.HandleIdentityEventPublic(ctx, &event) 259 if err != nil { 260 t.Fatalf("failed to handle handle change event: %v", err) 261 } 262 263 // 5. Verify database updated 264 user, err := userService.GetUserByDID(ctx, did) 265 if err != nil { 266 t.Fatalf("failed to get user by DID: %v", err) 267 } 268 if user.Handle != newHandle { 269 t.Errorf("expected database to have new handle %s, got %s", newHandle, user.Handle) 270 } 271 272 // 6. 
Verify old handle purged from cache 273 _, err = cache.Get(ctx, oldHandle) 274 if err == nil { 275 t.Error("expected old handle to be purged from cache, but it's still cached") 276 } 277 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss { 278 t.Errorf("expected ErrCacheMiss for old handle, got: %v", err) 279 } 280 281 // 7. Verify DID cache purged 282 _, err = cache.Get(ctx, did) 283 if err == nil { 284 t.Error("expected DID to be purged from cache, but it's still cached") 285 } 286 if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss { 287 t.Errorf("expected ErrCacheMiss for DID, got: %v", err) 288 } 289 290 // 8. Verify user can be found by new handle 291 userByHandle, err := userService.GetUserByHandle(ctx, newHandle) 292 if err != nil { 293 t.Fatalf("failed to get user by new handle: %v", err) 294 } 295 if userByHandle.DID != did { 296 t.Errorf("expected DID %s when looking up by new handle, got %s", did, userByHandle.DID) 297 } 298 }) 299} 300 301func TestUserServiceIdempotency(t *testing.T) { 302 db := setupTestDB(t) 303 defer func() { 304 if err := db.Close(); err != nil { 305 t.Logf("Failed to close database: %v", err) 306 } 307 }() 308 309 userRepo := postgres.NewUserRepository(db) 310 resolver := identity.NewResolver(db, identity.DefaultConfig()) 311 userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 312 ctx := context.Background() 313 314 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) { 315 req := users.CreateUserRequest{ 316 DID: "did:plc:idempotent123", 317 Handle: "idempotent.test", 318 PDSURL: "https://bsky.social", 319 } 320 321 // First creation 322 user1, err := userService.CreateUser(ctx, req) 323 if err != nil { 324 t.Fatalf("first creation failed: %v", err) 325 } 326 327 // Second creation with same DID - should return existing user, not error 328 user2, err := userService.CreateUser(ctx, req) 329 if err != nil { 330 t.Fatalf("second creation should be idempotent: 
%v", err) 331 } 332 333 if user1.DID != user2.DID { 334 t.Errorf("expected same DID, got %s and %s", user1.DID, user2.DID) 335 } 336 337 if user1.CreatedAt != user2.CreatedAt { 338 t.Errorf("expected same user (same created_at), got different timestamps") 339 } 340 }) 341 342 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) { 343 // Create first user 344 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 345 DID: "did:plc:handleconflict1", 346 Handle: "conflicting.handle", 347 PDSURL: "https://bsky.social", 348 }) 349 if err != nil { 350 t.Fatalf("first creation failed: %v", err) 351 } 352 353 // Try to create different user with same handle 354 _, err = userService.CreateUser(ctx, users.CreateUserRequest{ 355 DID: "did:plc:handleconflict2", 356 Handle: "conflicting.handle", // Same handle, different DID 357 PDSURL: "https://bsky.social", 358 }) 359 360 if err == nil { 361 t.Fatal("expected error for duplicate handle, got nil") 362 } 363 364 if !contains(err.Error(), "handle already taken") { 365 t.Errorf("expected 'handle already taken' error, got: %v", err) 366 } 367 }) 368} 369 370// Helper functions moved to helpers.go