A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 "context" 7 "database/sql" 8 "fmt" 9 "testing" 10 "time" 11 12 postgresRepo "Coves/internal/db/postgres" 13) 14 15// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed 16// from Jetstream events and stored in the AppView database 17func TestSubscriptionIndexing_ContentVisibility(t *testing.T) { 18 if testing.Short() { 19 t.Skip("Skipping integration test in short mode") 20 } 21 22 ctx := context.Background() 23 db := setupTestDB(t) 24 defer cleanupTestDB(t, db) 25 26 repo := createTestCommunityRepo(t, db) 27 // Skip verification in tests 28 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 29 30 // Create a test community first (with unique DID) 31 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 32 community := createTestCommunity(t, repo, "test-community-visibility", testDID) 33 34 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) { 35 userDID := "did:plc:test-user-123" 36 rkey := "test-sub-1" 37 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey 38 39 // Simulate Jetstream CREATE event for subscription 40 event := &jetstream.JetstreamEvent{ 41 Did: userDID, 42 Kind: "commit", 43 TimeUS: time.Now().UnixMicro(), 44 Commit: &jetstream.CommitEvent{ 45 Rev: "test-rev-1", 46 Operation: "create", 47 Collection: "social.coves.community.subscription", // CORRECT collection name 48 RKey: rkey, 49 CID: "bafytest123", 50 Record: map[string]interface{}{ 51 "$type": "social.coves.community.subscription", 52 "subject": community.DID, 53 "createdAt": time.Now().Format(time.RFC3339), 54 "contentVisibility": float64(5), // JSON numbers decode as float64 55 }, 56 }, 57 } 58 59 // Process event through consumer 60 err := consumer.HandleEvent(ctx, event) 61 if err != nil { 62 t.Fatalf("Failed to handle subscription event: %v", err) 63 } 64 65 // Verify subscription was indexed with correct contentVisibility 66 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 67 if err != nil { 68 t.Fatalf("Failed to get subscription: %v", err) 69 } 70 71 if subscription.ContentVisibility != 5 { 72 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility) 73 } 74 75 if subscription.UserDID != userDID { 76 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID) 77 } 78 79 if subscription.CommunityDID != community.DID { 80 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID) 81 } 82 83 if subscription.RecordURI != uri { 84 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI) 85 } 86 87 t.Logf("✓ Subscription indexed with contentVisibility=5") 88 }) 89 90 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) { 91 userDID := "did:plc:test-user-default" 92 rkey := "test-sub-default" 93 94 // Simulate Jetstream CREATE event WITHOUT contentVisibility field 95 event := &jetstream.JetstreamEvent{ 96 Did: userDID, 97 Kind: "commit", 98 TimeUS: time.Now().UnixMicro(), 99 Commit: &jetstream.CommitEvent{ 100 Rev: "test-rev-default", 101 Operation: "create", 102 Collection: "social.coves.community.subscription", 103 RKey: rkey, 104 CID: "bafydefault", 105 Record: map[string]interface{}{ 106 "$type": "social.coves.community.subscription", 107 "subject": community.DID, 108 "createdAt": time.Now().Format(time.RFC3339), 109 // contentVisibility NOT provided 110 }, 111 }, 112 } 113 114 // Process event 115 err := consumer.HandleEvent(ctx, event) 116 if err != nil { 117 t.Fatalf("Failed to handle subscription event: %v", err) 118 } 119 120 // Verify defaults to 3 121 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 122 if err != nil { 123 t.Fatalf("Failed to get subscription: %v", err) 124 } 125 126 if subscription.ContentVisibility != 3 { 127 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility) 128 } 129 130 t.Logf("✓ Subscription defaulted to contentVisibility=3") 131 }) 132 133 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) { 134 testCases := []struct { 135 name string 136 input float64 137 expected int 138 }{ 139 {input: 0, expected: 1, name: "zero clamped to 1"}, 140 {input: -5, expected: 1, name: "negative clamped to 1"}, 141 {input: 10, expected: 5, name: "10 clamped to 5"}, 142 {input: 100, expected: 5, name: "100 clamped to 5"}, 143 {input: 1, expected: 1, name: "1 stays 1"}, 144 {input: 3, expected: 3, name: "3 stays 3"}, 145 {input: 5, expected: 5, name: "5 stays 5"}, 146 } 147 148 for i, tc := range testCases { 149 t.Run(tc.name, func(t *testing.T) { 150 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i) 151 rkey := fmt.Sprintf("test-sub-clamp-%d", i) 152 153 event := &jetstream.JetstreamEvent{ 154 Did: userDID, 155 Kind: "commit", 156 TimeUS: time.Now().UnixMicro(), 157 Commit: &jetstream.CommitEvent{ 158 Rev: "test-rev-clamp", 159 Operation: "create", 160 Collection: "social.coves.community.subscription", 161 RKey: rkey, 162 CID: "bafyclamp", 163 Record: map[string]interface{}{ 164 "$type": "social.coves.community.subscription", 165 "subject": community.DID, 166 "createdAt": time.Now().Format(time.RFC3339), 167 "contentVisibility": tc.input, 168 }, 169 }, 170 } 171 172 err := consumer.HandleEvent(ctx, event) 173 if err != nil { 174 t.Fatalf("Failed to handle subscription event: %v", err) 175 } 176 177 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 178 if err != nil { 179 t.Fatalf("Failed to get subscription: %v", err) 180 } 181 182 if subscription.ContentVisibility != tc.expected { 183 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility) 184 } 185 186 t.Logf("✓ Input %.0f clamped to %d", tc.input, subscription.ContentVisibility) 187 }) 188 } 189 }) 190 191 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) { 192 userDID := "did:plc:test-idempotent" 193 rkey := "test-sub-idempotent" 194 195 event := &jetstream.JetstreamEvent{ 196 Did: userDID, 197 Kind: "commit", 198 TimeUS: time.Now().UnixMicro(), 199 Commit: &jetstream.CommitEvent{ 200 Rev: "test-rev-idempotent", 201 Operation: "create", 202 Collection: "social.coves.community.subscription", 203 RKey: rkey, 204 CID: "bafyidempotent", 205 Record: map[string]interface{}{ 206 "$type": "social.coves.community.subscription", 207 "subject": community.DID, 208 "createdAt": time.Now().Format(time.RFC3339), 209 "contentVisibility": float64(4), 210 }, 211 }, 212 } 213 214 // Process first time 215 err := consumer.HandleEvent(ctx, event) 216 if err != nil { 217 t.Fatalf("Failed to handle first subscription event: %v", err) 218 } 219 220 // Process again (Jetstream replay scenario) 221 err = consumer.HandleEvent(ctx, event) 222 if err != nil { 223 t.Errorf("Idempotency failed: second event should not error, got: %v", err) 224 } 225 226 // Verify only one subscription exists 227 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 228 if err != nil { 229 t.Fatalf("Failed to get subscription: %v", err) 230 } 231 232 if subscription.ContentVisibility != 4 { 233 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility) 234 } 235 236 t.Logf("✓ Duplicate events handled idempotently") 237 }) 238} 239 240// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling 241func TestSubscriptionIndexing_DeleteOperations(t *testing.T) { 242 if testing.Short() { 243 t.Skip("Skipping integration test in short mode") 244 } 245 246 ctx := context.Background() 247 db := setupTestDB(t) 248 defer cleanupTestDB(t, db) 249 250 repo := createTestCommunityRepo(t, db) 251 // Skip verification in tests 252 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 253 254 // Create test community (with unique DID) 255 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano()) 256 community := createTestCommunity(t, repo, "test-unsubscribe", testDID) 257 258 t.Run("deletes subscription when DELETE event received", func(t *testing.T) { 259 userDID := "did:plc:test-user-delete" 260 rkey := "test-sub-delete" 261 262 // First, create a subscription 263 createEvent := &jetstream.JetstreamEvent{ 264 Did: userDID, 265 Kind: "commit", 266 TimeUS: time.Now().UnixMicro(), 267 Commit: &jetstream.CommitEvent{ 268 Rev: "test-rev-create", 269 Operation: "create", 270 Collection: "social.coves.community.subscription", 271 RKey: rkey, 272 CID: "bafycreate", 273 Record: map[string]interface{}{ 274 "$type": "social.coves.community.subscription", 275 "subject": community.DID, 276 "createdAt": time.Now().Format(time.RFC3339), 277 "contentVisibility": float64(3), 278 }, 279 }, 280 } 281 282 err := consumer.HandleEvent(ctx, createEvent) 283 if err != nil { 284 t.Fatalf("Failed to create subscription: %v", err) 285 } 286 287 // Verify subscription exists 288 _, err = repo.GetSubscription(ctx, userDID, community.DID) 289 if err != nil { 290 t.Fatalf("Subscription should exist: %v", err) 291 } 292 293 // Now send DELETE event (unsubscribe) 294 // IMPORTANT: DELETE operations don't include record data in Jetstream 295 deleteEvent := &jetstream.JetstreamEvent{ 296 Did: userDID, 297 Kind: "commit", 298 TimeUS: time.Now().UnixMicro(), 299 Commit: &jetstream.CommitEvent{ 300 Rev: "test-rev-delete", 301 Operation: "delete", 302 Collection: "social.coves.community.subscription", 303 RKey: rkey, 304 CID: "", // No CID on deletes 305 Record: nil, // No record data on deletes 306 }, 307 } 308 309 err = consumer.HandleEvent(ctx, deleteEvent) 310 if err != nil { 311 t.Fatalf("Failed to handle delete event: %v", err) 312 } 313 314 // Verify subscription was deleted 315 _, err = repo.GetSubscription(ctx, userDID, community.DID) 316 if err == nil { 317 t.Errorf("Subscription should have been deleted") 318 } 319 if !communities.IsNotFound(err) { 320 t.Errorf("Expected NotFound error, got: %v", err) 321 } 322 323 t.Logf("✓ Subscription deleted successfully") 324 }) 325 326 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) { 327 userDID := "did:plc:test-user-noexist" 328 rkey := "test-sub-noexist" 329 330 // Try to delete a subscription that doesn't exist 331 deleteEvent := &jetstream.JetstreamEvent{ 332 Did: userDID, 333 Kind: "commit", 334 TimeUS: time.Now().UnixMicro(), 335 Commit: &jetstream.CommitEvent{ 336 Rev: "test-rev-noexist", 337 Operation: "delete", 338 Collection: "social.coves.community.subscription", 339 RKey: rkey, 340 CID: "", 341 Record: nil, 342 }, 343 } 344 345 // Should not error (idempotent) 346 err := consumer.HandleEvent(ctx, deleteEvent) 347 if err != nil { 348 t.Errorf("Deleting non-existent subscription should not error, got: %v", err) 349 } 350 351 t.Logf("✓ Idempotent delete handled gracefully") 352 }) 353} 354 355// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically 356func TestSubscriptionIndexing_SubscriberCount(t *testing.T) { 357 if testing.Short() { 358 t.Skip("Skipping integration test in short mode") 359 } 360 361 ctx := context.Background() 362 db := setupTestDB(t) 363 defer cleanupTestDB(t, db) 364 365 repo := createTestCommunityRepo(t, db) 366 // Skip verification in tests 367 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 368 369 // Create test community (with unique DID) 370 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano()) 371 community := createTestCommunity(t, repo, "test-subscriber-count", testDID) 372 373 // Verify initial subscriber count is 0 374 comm, err := repo.GetByDID(ctx, community.DID) 375 if err != nil { 376 t.Fatalf("Failed to get community: %v", err) 377 } 378 if comm.SubscriberCount != 0 { 379 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount) 380 } 381 382 t.Run("increments subscriber count on subscribe", func(t *testing.T) { 383 userDID := "did:plc:test-user-count1" 384 rkey := "test-sub-count1" 385 386 event := &jetstream.JetstreamEvent{ 387 Did: userDID, 388 Kind: "commit", 389 TimeUS: time.Now().UnixMicro(), 390 Commit: &jetstream.CommitEvent{ 391 Rev: "test-rev-count", 392 Operation: "create", 393 Collection: "social.coves.community.subscription", 394 RKey: rkey, 395 CID: "bafycount", 396 Record: map[string]interface{}{ 397 "$type": "social.coves.community.subscription", 398 "subject": community.DID, 399 "createdAt": time.Now().Format(time.RFC3339), 400 "contentVisibility": float64(3), 401 }, 402 }, 403 } 404 405 err := consumer.HandleEvent(ctx, event) 406 if err != nil { 407 t.Fatalf("Failed to handle subscription: %v", err) 408 } 409 410 // Check subscriber count incremented 411 comm, err := repo.GetByDID(ctx, community.DID) 412 if err != nil { 413 t.Fatalf("Failed to get community: %v", err) 414 } 415 416 if comm.SubscriberCount != 1 { 417 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount) 418 } 419 420 t.Logf("✓ Subscriber count incremented to 1") 421 }) 422 423 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) { 424 userDID := "did:plc:test-user-count1" // Same user from above 425 rkey := "test-sub-count1" 426 427 // Send DELETE event 428 deleteEvent := &jetstream.JetstreamEvent{ 429 Did: userDID, 430 Kind: "commit", 431 TimeUS: time.Now().UnixMicro(), 432 Commit: &jetstream.CommitEvent{ 433 Rev: "test-rev-unsub", 434 Operation: "delete", 435 Collection: "social.coves.community.subscription", 436 RKey: rkey, 437 CID: "", 438 Record: nil, 439 }, 440 } 441 442 err := consumer.HandleEvent(ctx, deleteEvent) 443 if err != nil { 444 t.Fatalf("Failed to handle unsubscribe: %v", err) 445 } 446 447 // Check subscriber count decremented back to 0 448 comm, err := repo.GetByDID(ctx, community.DID) 449 if err != nil { 450 t.Fatalf("Failed to get community: %v", err) 451 } 452 453 if comm.SubscriberCount != 0 { 454 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount) 455 } 456 457 t.Logf("✓ Subscriber count decremented to 0") 458 }) 459} 460 461// Helper functions 462 463func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 464 t.Helper() 465 466 // Add timestamp to make handles unique across test runs 467 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano()) 468 469 community := &communities.Community{ 470 DID: did, 471 Handle: uniqueHandle, 472 Name: name, 473 DisplayName: "Test Community " + name, 474 Description: "Test community for subscription indexing", 475 OwnerDID: did, 476 CreatedByDID: "did:plc:test-creator", 477 HostedByDID: "did:plc:test-instance", 478 Visibility: "public", 479 CreatedAt: time.Now(), 480 UpdatedAt: time.Now(), 481 } 482 483 created, err := repo.Create(context.Background(), community) 484 if err != nil { 485 t.Fatalf("Failed to create test community: %v", err) 486 } 487 488 return created 489} 490 491func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository { 492 t.Helper() 493 // Import the postgres package to create a repo 494 return postgresRepo.NewCommunityRepository(db.(*sql.DB)) 495} 496 497func cleanupTestDB(t *testing.T, db interface{}) { 498 t.Helper() 499 sqlDB := db.(*sql.DB) 500 if err := sqlDB.Close(); err != nil { 501 t.Logf("Failed to close database: %v", err) 502 } 503}