A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "testing" 6 "time" 7 8 "Coves/internal/core/users" 9 "Coves/internal/db/postgres" 10 "Coves/internal/jetstream" 11) 12 13func TestUserIndexingFromJetstream(t *testing.T) { 14 db := setupTestDB(t) 15 defer db.Close() 16 17 // Wire up dependencies 18 userRepo := postgres.NewUserRepository(db) 19 userService := users.NewUserService(userRepo, "http://localhost:3001") 20 21 ctx := context.Background() 22 23 t.Run("Index new user from identity event", func(t *testing.T) { 24 // Simulate an identity event from Jetstream 25 event := jetstream.JetstreamEvent{ 26 Did: "did:plc:jetstream123", 27 Kind: "identity", 28 Identity: &jetstream.IdentityEvent{ 29 Did: "did:plc:jetstream123", 30 Handle: "alice.jetstream.test", 31 Seq: 12345, 32 Time: time.Now().Format(time.RFC3339), 33 }, 34 } 35 36 consumer := jetstream.NewUserEventConsumer(userService, "", "") 37 38 // Handle the event 39 err := consumer.HandleIdentityEventPublic(ctx, &event) 40 if err != nil { 41 t.Fatalf("failed to handle identity event: %v", err) 42 } 43 44 // Verify user was indexed 45 user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123") 46 if err != nil { 47 t.Fatalf("failed to get indexed user: %v", err) 48 } 49 50 if user.DID != "did:plc:jetstream123" { 51 t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID) 52 } 53 54 if user.Handle != "alice.jetstream.test" { 55 t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle) 56 } 57 }) 58 59 t.Run("Idempotent indexing - duplicate event", func(t *testing.T) { 60 // Create a user first 61 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 62 DID: "did:plc:duplicate123", 63 Handle: "duplicate.test", 64 PDSURL: "https://bsky.social", 65 }) 66 if err != nil { 67 t.Fatalf("failed to create initial user: %v", err) 68 } 69 70 // Simulate duplicate identity event 71 event := jetstream.JetstreamEvent{ 72 Did: "did:plc:duplicate123", 73 Kind: "identity", 74 Identity: 
&jetstream.IdentityEvent{ 75 Did: "did:plc:duplicate123", 76 Handle: "duplicate.test", 77 Seq: 12346, 78 Time: time.Now().Format(time.RFC3339), 79 }, 80 } 81 82 consumer := jetstream.NewUserEventConsumer(userService, "", "") 83 84 // Handle duplicate event - should not error 85 err = consumer.HandleIdentityEventPublic(ctx, &event) 86 if err != nil { 87 t.Fatalf("duplicate event should be handled gracefully: %v", err) 88 } 89 90 // Verify still only one user 91 user, err := userService.GetUserByDID(ctx, "did:plc:duplicate123") 92 if err != nil { 93 t.Fatalf("failed to get user: %v", err) 94 } 95 96 if user.Handle != "duplicate.test" { 97 t.Errorf("expected handle duplicate.test, got %s", user.Handle) 98 } 99 }) 100 101 t.Run("Index multiple users", func(t *testing.T) { 102 consumer := jetstream.NewUserEventConsumer(userService, "", "") 103 104 users := []struct { 105 did string 106 handle string 107 }{ 108 {"did:plc:multi1", "user1.test"}, 109 {"did:plc:multi2", "user2.test"}, 110 {"did:plc:multi3", "user3.test"}, 111 } 112 113 for _, u := range users { 114 event := jetstream.JetstreamEvent{ 115 Did: u.did, 116 Kind: "identity", 117 Identity: &jetstream.IdentityEvent{ 118 Did: u.did, 119 Handle: u.handle, 120 Seq: 12345, 121 Time: time.Now().Format(time.RFC3339), 122 }, 123 } 124 125 err := consumer.HandleIdentityEventPublic(ctx, &event) 126 if err != nil { 127 t.Fatalf("failed to index user %s: %v", u.handle, err) 128 } 129 } 130 131 // Verify all users indexed 132 for _, u := range users { 133 user, err := userService.GetUserByDID(ctx, u.did) 134 if err != nil { 135 t.Fatalf("user %s not found: %v", u.did, err) 136 } 137 138 if user.Handle != u.handle { 139 t.Errorf("expected handle %s, got %s", u.handle, user.Handle) 140 } 141 } 142 }) 143 144 t.Run("Skip invalid events", func(t *testing.T) { 145 consumer := jetstream.NewUserEventConsumer(userService, "", "") 146 147 // Missing DID 148 invalidEvent1 := jetstream.JetstreamEvent{ 149 Did: "", 150 Kind: "identity", 
151 Identity: &jetstream.IdentityEvent{ 152 Did: "", 153 Handle: "invalid.test", 154 Seq: 12345, 155 Time: time.Now().Format(time.RFC3339), 156 }, 157 } 158 159 err := consumer.HandleIdentityEventPublic(ctx, &invalidEvent1) 160 if err == nil { 161 t.Error("expected error for missing DID, got nil") 162 } 163 164 // Missing handle 165 invalidEvent2 := jetstream.JetstreamEvent{ 166 Did: "did:plc:invalid", 167 Kind: "identity", 168 Identity: &jetstream.IdentityEvent{ 169 Did: "did:plc:invalid", 170 Handle: "", 171 Seq: 12345, 172 Time: time.Now().Format(time.RFC3339), 173 }, 174 } 175 176 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent2) 177 if err == nil { 178 t.Error("expected error for missing handle, got nil") 179 } 180 181 // Missing identity data 182 invalidEvent3 := jetstream.JetstreamEvent{ 183 Did: "did:plc:invalid2", 184 Kind: "identity", 185 Identity: nil, 186 } 187 188 err = consumer.HandleIdentityEventPublic(ctx, &invalidEvent3) 189 if err == nil { 190 t.Error("expected error for nil identity data, got nil") 191 } 192 }) 193} 194 195func TestUserServiceIdempotency(t *testing.T) { 196 db := setupTestDB(t) 197 defer db.Close() 198 199 userRepo := postgres.NewUserRepository(db) 200 userService := users.NewUserService(userRepo, "http://localhost:3001") 201 ctx := context.Background() 202 203 t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) { 204 req := users.CreateUserRequest{ 205 DID: "did:plc:idempotent123", 206 Handle: "idempotent.test", 207 PDSURL: "https://bsky.social", 208 } 209 210 // First creation 211 user1, err := userService.CreateUser(ctx, req) 212 if err != nil { 213 t.Fatalf("first creation failed: %v", err) 214 } 215 216 // Second creation with same DID - should return existing user, not error 217 user2, err := userService.CreateUser(ctx, req) 218 if err != nil { 219 t.Fatalf("second creation should be idempotent: %v", err) 220 } 221 222 if user1.DID != user2.DID { 223 t.Errorf("expected same DID, got %s and 
%s", user1.DID, user2.DID) 224 } 225 226 if user1.CreatedAt != user2.CreatedAt { 227 t.Errorf("expected same user (same created_at), got different timestamps") 228 } 229 }) 230 231 t.Run("CreateUser fails for duplicate handle with different DID", func(t *testing.T) { 232 // Create first user 233 _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 234 DID: "did:plc:handleconflict1", 235 Handle: "conflicting.handle", 236 PDSURL: "https://bsky.social", 237 }) 238 if err != nil { 239 t.Fatalf("first creation failed: %v", err) 240 } 241 242 // Try to create different user with same handle 243 _, err = userService.CreateUser(ctx, users.CreateUserRequest{ 244 DID: "did:plc:handleconflict2", 245 Handle: "conflicting.handle", // Same handle, different DID 246 PDSURL: "https://bsky.social", 247 }) 248 249 if err == nil { 250 t.Fatal("expected error for duplicate handle, got nil") 251 } 252 253 if !contains(err.Error(), "handle already taken") { 254 t.Errorf("expected 'handle already taken' error, got: %v", err) 255 } 256 }) 257} 258 259// Helper function 260func contains(s, substr string) bool { 261 return len(s) >= len(substr) && anySubstring(s, substr) 262} 263 264func anySubstring(s, substr string) bool { 265 for i := 0; i <= len(s)-len(substr); i++ { 266 if s[i:i+len(substr)] == substr { 267 return true 268 } 269 } 270 return false 271}