A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/handlers/aggregator" 5 "Coves/internal/api/handlers/post" 6 "Coves/internal/atproto/identity" 7 "Coves/internal/atproto/jetstream" 8 "Coves/internal/core/aggregators" 9 "Coves/internal/core/communities" 10 "Coves/internal/core/posts" 11 "Coves/internal/core/users" 12 "Coves/internal/db/postgres" 13 "bytes" 14 "context" 15 "database/sql" 16 "encoding/json" 17 "fmt" 18 "net/http" 19 "net/http/httptest" 20 "os" 21 "strings" 22 "testing" 23 "time" 24 25 _ "github.com/lib/pq" 26 "github.com/pressly/goose/v3" 27 "github.com/stretchr/testify/assert" 28 "github.com/stretchr/testify/require" 29) 30 31// TestAggregator_E2E_WithJetstream tests the complete aggregator flow with real PDS: 32// 1. Service Declaration: Create aggregator account → Write service record → Jetstream → AppView DB 33// 2. Authorization: Create community account → Write authorization record → Jetstream → AppView DB 34// 3. Post Creation: Aggregator creates post → Validates authorization + rate limits → PDS → Jetstream → AppView 35// 4. Query Endpoints: Verify XRPC handlers return correct data from AppView 36// 37// This tests the REAL atProto flow: 38// - Real accounts created on PDS 39// - Real records written via XRPC 40// - Simulated Jetstream events (for test speed - testing AppView indexing, not Jetstream itself) 41// - AppView indexes and serves data via XRPC 42// 43// NOTE: Requires PDS running at http://localhost:3001 44func TestAggregator_E2E_WithJetstream(t *testing.T) { 45 // Check if PDS is available 46 pdsURL := "http://localhost:3001" 47 resp, err := http.Get(pdsURL + "/xrpc/_health") 48 if err != nil || resp.StatusCode != http.StatusOK { 49 t.Skipf("PDS not available at %s - run 'make dev-up' to start it", pdsURL) 50 } 51 if resp != nil { 52 _ = resp.Body.Close() 53 } 54 db := setupTestDB(t) 55 defer func() { 56 if err := db.Close(); err != nil { 57 t.Logf("Failed to close database: %v", err) 58 } 59 }() 60 61 // Setup repositories 62 aggregatorRepo := postgres.NewAggregatorRepository(db) 63 communityRepo := postgres.NewCommunityRepository(db) 64 postRepo := postgres.NewPostRepository(db) 65 userRepo := postgres.NewUserRepository(db) 66 67 // Setup services 68 identityConfig := identity.DefaultConfig() 69 identityResolver := identity.NewResolver(db, identityConfig) 70 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 71 communityService := communities.NewCommunityService(communityRepo, "http://localhost:3001", "did:web:test.coves.social", "coves.social", nil) 72 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 73 postService := posts.NewPostService(postRepo, communityService, aggregatorService, nil, nil, "http://localhost:3001") 74 75 // Setup consumers 76 aggregatorConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 77 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 78 79 // Setup HTTP handlers 80 getServicesHandler := aggregator.NewGetServicesHandler(aggregatorService) 81 getAuthorizationsHandler := aggregator.NewGetAuthorizationsHandler(aggregatorService) 82 listForCommunityHandler := aggregator.NewListForCommunityHandler(aggregatorService) 83 createPostHandler := post.NewCreateHandler(postService) 84 e2eAuth := NewE2EOAuthMiddleware() 85 86 ctx := context.Background() 87 88 // Cleanup test data (aggregators and communities will be created via real PDS in test parts) 89 _, _ = db.Exec("DELETE FROM aggregator_posts WHERE aggregator_did LIKE 'did:plc:%'") 90 _, _ = db.Exec("DELETE FROM aggregator_authorizations WHERE aggregator_did LIKE 'did:plc:%'") 91 _, _ = db.Exec("DELETE FROM aggregators WHERE did LIKE 'did:plc:%'") 92 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:%'") 93 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:%'") 94 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:%'") 95 96 // ==================================================================================== 97 // Part 1: Service Declaration via Real PDS 98 // ==================================================================================== 99 // Store DIDs, tokens, and URIs for use across all test parts 100 var aggregatorDID, aggregatorToken, aggregatorAPIToken, aggregatorHandle, communityDID, communityToken, authorizationRkey string 101 102 t.Run("1. Service Declaration - PDS Account → Write Record → Jetstream → AppView DB", func(t *testing.T) { 103 t.Log("\n📝 Part 1: Create aggregator account and publish service declaration to PDS...") 104 105 // STEP 1: Create aggregator account on real PDS 106 // Use PDS configured domain (.local.coves.dev for users/services) 107 timestamp := time.Now().Unix() // Use Unix seconds instead of nanoseconds for shorter handle 108 aggregatorHandle = fmt.Sprintf("rss-agg-%d.local.coves.dev", timestamp) 109 email := fmt.Sprintf("agg-%d@test.com", timestamp) 110 password := "test-password-123" 111 112 var err error 113 aggregatorToken, aggregatorDID, err = createPDSAccount(pdsURL, aggregatorHandle, email, password) 114 require.NoError(t, err, "Failed to create aggregator account on PDS") 115 require.NotEmpty(t, aggregatorToken, "Should receive access token") 116 require.NotEmpty(t, aggregatorDID, "Should receive DID") 117 118 t.Logf("✓ Created aggregator account: %s (%s)", aggregatorHandle, aggregatorDID) 119 120 // Register aggregator user with OAuth middleware for API requests 121 aggregatorAPIToken = e2eAuth.AddUser(aggregatorDID) 122 123 // STEP 2: Write service declaration to aggregator's repository on PDS 124 configSchema := map[string]interface{}{ 125 "type": "object", 126 "properties": map[string]interface{}{ 127 "feedUrl": map[string]interface{}{ 128 "type": "string", 129 "description": "RSS feed URL to aggregate", 130 }, 131 "updateInterval": map[string]interface{}{ 132 "type": "number", 133 "minimum": 5, 134 "maximum": 60, 135 "description": "Minutes between feed checks", 136 }, 137 }, 138 "required": []string{"feedUrl"}, 139 } 140 141 serviceRecord := map[string]interface{}{ 142 "$type": "social.coves.aggregator.service", 143 "did": aggregatorDID, 144 "displayName": "RSS Feed Aggregator", 145 "description": "Aggregates content from RSS feeds", 146 "configSchema": configSchema, 147 "maintainer": aggregatorDID, // Aggregator maintains itself 148 "sourceUrl": "https://github.com/example/rss-aggregator", 149 "createdAt": time.Now().Format(time.RFC3339), 150 } 151 152 // Write to at://{aggregatorDID}/social.coves.aggregator.service/self 153 uri, cid, err := writePDSRecord(pdsURL, aggregatorToken, aggregatorDID, "social.coves.aggregator.service", "self", serviceRecord) 154 require.NoError(t, err, "Failed to write service declaration to PDS") 155 require.NotEmpty(t, uri, "Should receive record URI") 156 require.NotEmpty(t, cid, "Should receive record CID") 157 158 t.Logf("✓ Wrote service declaration to PDS: %s (CID: %s)", uri, cid) 159 160 // STEP 3: Simulate Jetstream event (in production, this comes from real Jetstream) 161 // We simulate it here for test speed - we're testing AppView indexing, not Jetstream itself 162 serviceEvent := jetstream.JetstreamEvent{ 163 Did: aggregatorDID, 164 Kind: "commit", 165 Commit: &jetstream.CommitEvent{ 166 Operation: "create", 167 Collection: "social.coves.aggregator.service", 168 RKey: "self", 169 CID: cid, 170 Record: serviceRecord, 171 }, 172 } 173 174 // STEP 4: Process through Jetstream consumer (simulates what happens when Jetstream broadcasts) 175 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 176 require.NoError(t, err, "Consumer should index service declaration") 177 178 // STEP 2: Verify indexed in AppView database 179 indexedAgg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 180 require.NoError(t, err, "Aggregator should be indexed in AppView") 181 182 assert.Equal(t, aggregatorDID, indexedAgg.DID) 183 assert.Equal(t, "RSS Feed Aggregator", indexedAgg.DisplayName) 184 assert.Equal(t, "Aggregates content from RSS feeds", indexedAgg.Description) 185 assert.Empty(t, indexedAgg.AvatarURL, "Avatar not uploaded in this test") 186 assert.Equal(t, aggregatorDID, indexedAgg.MaintainerDID, "Aggregator maintains itself") 187 assert.Equal(t, "https://github.com/example/rss-aggregator", indexedAgg.SourceURL) 188 assert.NotEmpty(t, indexedAgg.ConfigSchema, "Config schema should be stored") 189 assert.Equal(t, fmt.Sprintf("at://%s/social.coves.aggregator.service/self", aggregatorDID), indexedAgg.RecordURI) 190 assert.False(t, indexedAgg.CreatedAt.IsZero(), "CreatedAt should be parsed from record") 191 assert.False(t, indexedAgg.IndexedAt.IsZero(), "IndexedAt should be set") 192 193 // Verify stats initialized to zero 194 assert.Equal(t, 0, indexedAgg.CommunitiesUsing) 195 assert.Equal(t, 0, indexedAgg.PostsCreated) 196 197 // STEP 6: Index aggregator as a user in AppView (required for post authorship) 198 // In production, this would come from Jetstream indexing app.bsky.actor.profile 199 // For this E2E test, we create it directly 200 testUser := createTestUser(t, db, aggregatorHandle, aggregatorDID) 201 require.NotNil(t, testUser, "Should create aggregator user") 202 203 t.Logf("✓ Indexed aggregator as user: %s", aggregatorHandle) 204 t.Log("✅ Service declaration indexed and aggregator registered as user") 205 }) 206 207 // ==================================================================================== 208 // Part 2: Authorization via Real PDS 209 // ==================================================================================== 210 t.Run("2. Authorization - Community Account → PDS → Jetstream → AppView DB", func(t *testing.T) { 211 t.Log("\n🔐 Part 2: Create community account and authorize aggregator...") 212 213 // STEP 1: Create community account on real PDS 214 // Use PDS configured domain (.community.coves.social for communities) 215 // Keep handle short to avoid PDS "handle too long" error 216 timestamp := time.Now().Unix() % 100000 // Last 5 digits 217 communityHandle := fmt.Sprintf("e2e-%d.community.coves.social", timestamp) 218 communityEmail := fmt.Sprintf("comm-%d@test.com", timestamp) 219 communityPassword := "community-test-password-123" 220 221 var err error 222 communityToken, communityDID, err = createPDSAccount(pdsURL, communityHandle, communityEmail, communityPassword) 223 require.NoError(t, err, "Failed to create community account on PDS") 224 require.NotEmpty(t, communityToken, "Should receive community access token") 225 require.NotEmpty(t, communityDID, "Should receive community DID") 226 227 t.Logf("✓ Created community account: %s (%s)", communityHandle, communityDID) 228 229 // STEP 2: Index community in AppView database (required for foreign key) 230 // In production, this would come from Jetstream indexing community.profile records 231 // For this E2E test, we create it directly 232 testCommunity := &communities.Community{ 233 DID: communityDID, 234 Handle: communityHandle, 235 Name: fmt.Sprintf("e2e-%d", timestamp), 236 DisplayName: "E2E Test Community", 237 OwnerDID: communityDID, 238 CreatedByDID: communityDID, 239 HostedByDID: "did:web:test.coves.social", 240 Visibility: "public", 241 ModerationType: "moderator", 242 RecordURI: fmt.Sprintf("at://%s/social.coves.community.profile/self", communityDID), 243 RecordCID: "fakecid123", 244 PDSAccessToken: communityToken, 245 PDSRefreshToken: communityToken, 246 } 247 _, err = communityRepo.Create(ctx, testCommunity) 248 require.NoError(t, err, "Failed to index community in AppView") 249 250 t.Logf("✓ Indexed community in AppView database") 251 252 // STEP 3: Build aggregator config (matches the schema from Part 1) 253 aggregatorConfig := map[string]interface{}{ 254 "feedUrl": "https://example.com/feed.xml", 255 "updateInterval": 15, 256 } 257 258 // STEP 4: Write authorization record to community's repository on PDS 259 // This record grants permission for the aggregator to post to this community 260 authRecord := map[string]interface{}{ 261 "$type": "social.coves.aggregator.authorization", 262 "aggregatorDid": aggregatorDID, 263 "communityDid": communityDID, 264 "enabled": true, 265 "config": aggregatorConfig, 266 "createdBy": communityDID, // Community authorizes itself 267 "createdAt": time.Now().Format(time.RFC3339), 268 } 269 270 // Write to at://{communityDID}/social.coves.aggregator.authorization/{rkey} 271 authURI, authCID, err := writePDSRecord(pdsURL, communityToken, communityDID, "social.coves.aggregator.authorization", "", authRecord) 272 require.NoError(t, err, "Failed to write authorization to PDS") 273 require.NotEmpty(t, authURI, "Should receive authorization URI") 274 require.NotEmpty(t, authCID, "Should receive authorization CID") 275 276 t.Logf("✓ Wrote authorization to PDS: %s (CID: %s)", authURI, authCID) 277 278 // STEP 5: Simulate Jetstream event (in production, this comes from real Jetstream) 279 authorizationRkey = strings.Split(authURI, "/")[4] // Extract rkey from URI and store for later 280 authEvent := jetstream.JetstreamEvent{ 281 Did: communityDID, // Repository owner (community) 282 Kind: "commit", 283 Commit: &jetstream.CommitEvent{ 284 Operation: "create", 285 Collection: "social.coves.aggregator.authorization", 286 RKey: authorizationRkey, 287 CID: authCID, 288 Record: authRecord, 289 }, 290 } 291 292 // STEP 6: Process through Jetstream consumer 293 err = aggregatorConsumer.HandleEvent(ctx, &authEvent) 294 require.NoError(t, err, "Consumer should index authorization") 295 296 // STEP 7: Verify indexed in AppView database 297 indexedAuth, err := aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID) 298 require.NoError(t, err, "Authorization should be indexed in AppView") 299 300 assert.Equal(t, aggregatorDID, indexedAuth.AggregatorDID) 301 assert.Equal(t, communityDID, indexedAuth.CommunityDID) 302 assert.True(t, indexedAuth.Enabled) 303 assert.Equal(t, communityDID, indexedAuth.CreatedBy) 304 assert.NotEmpty(t, indexedAuth.Config, "Config should be stored") 305 assert.False(t, indexedAuth.CreatedAt.IsZero()) 306 307 // STEP 8: Verify aggregator stats updated via trigger 308 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 309 require.NoError(t, err) 310 assert.Equal(t, 1, agg.CommunitiesUsing, "Trigger should increment communities_using") 311 312 // STEP 9: Verify fast authorization check 313 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID) 314 require.NoError(t, err) 315 assert.True(t, isAuthorized, "IsAuthorized should return true") 316 317 t.Log("✅ Community created and authorization indexed successfully") 318 }) 319 320 // ==================================================================================== 321 // Part 3: Post Creation by Aggregator 322 // ==================================================================================== 323 t.Run("3. Post Creation - Aggregator → Validation → PDS → Jetstream → AppView", func(t *testing.T) { 324 t.Log("\n📮 Part 3: Aggregator creates post in authorized community...") 325 326 // STEP 1: Aggregator calls XRPC endpoint to create post 327 title := "Breaking News from RSS Feed" 328 content := "This post was created by an authorized aggregator!" 329 reqBody := map[string]interface{}{ 330 "community": communityDID, 331 "title": title, 332 "content": content, 333 } 334 reqJSON, err := json.Marshal(reqBody) 335 require.NoError(t, err) 336 337 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 338 req.Header.Set("Content-Type", "application/json") 339 req.Header.Set("Authorization", "Bearer "+aggregatorAPIToken) 340 341 // Execute request through auth middleware + handler 342 rr := httptest.NewRecorder() 343 handler := e2eAuth.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 344 handler.ServeHTTP(rr, req) 345 346 // STEP 2: Verify post creation succeeded 347 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String()) 348 349 var response posts.CreatePostResponse 350 err = json.NewDecoder(rr.Body).Decode(&response) 351 require.NoError(t, err, "Failed to parse response") 352 353 t.Logf("✓ Post created on PDS: URI=%s, CID=%s", response.URI, response.CID) 354 355 // STEP 3: Simulate Jetstream event (post written to PDS → firehose) 356 rkey := strings.Split(response.URI, "/")[4] // Extract rkey from URI 357 postEvent := jetstream.JetstreamEvent{ 358 Did: communityDID, 359 Kind: "commit", 360 Commit: &jetstream.CommitEvent{ 361 Operation: "create", 362 Collection: "social.coves.community.post", 363 RKey: rkey, 364 CID: response.CID, 365 Record: map[string]interface{}{ 366 "$type": "social.coves.community.post", 367 "community": communityDID, 368 "author": aggregatorDID, // Aggregator is the author 369 "title": title, 370 "content": content, 371 "createdAt": time.Now().Format(time.RFC3339), 372 }, 373 }, 374 } 375 376 // STEP 4: Process through Jetstream post consumer 377 err = postConsumer.HandleEvent(ctx, &postEvent) 378 require.NoError(t, err, "Post consumer should index post") 379 380 // STEP 5: Verify post indexed in AppView 381 indexedPost, err := postRepo.GetByURI(ctx, response.URI) 382 require.NoError(t, err, "Post should be indexed in AppView") 383 384 assert.Equal(t, response.URI, indexedPost.URI) 385 assert.Equal(t, response.CID, indexedPost.CID) 386 assert.Equal(t, aggregatorDID, indexedPost.AuthorDID, "Author should be aggregator") 387 assert.Equal(t, communityDID, indexedPost.CommunityDID) 388 assert.Equal(t, title, *indexedPost.Title) 389 assert.Equal(t, content, *indexedPost.Content) 390 391 // STEP 6: Verify aggregator stats updated 392 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 393 require.NoError(t, err) 394 assert.Equal(t, 1, agg.PostsCreated, "Trigger should increment posts_created") 395 396 // STEP 7: Verify post tracking for rate limiting 397 since := time.Now().Add(-1 * time.Hour) 398 postCount, err := aggregatorRepo.CountRecentPosts(ctx, aggregatorDID, communityDID, since) 399 require.NoError(t, err) 400 assert.Equal(t, 1, postCount, "Should track 1 post for rate limiting") 401 402 t.Log("✅ Post created, indexed, and stats updated") 403 }) 404 405 // ==================================================================================== 406 // Part 4: Rate Limiting 407 // ==================================================================================== 408 t.Run("4. Rate Limiting - Enforces 10 posts/hour limit", func(t *testing.T) { 409 t.Log("\n⏱️ Part 4: Testing rate limit enforcement...") 410 411 // Create 8 more posts (we already have 1 from Part 3, need 9 total to be under limit) 412 for i := 2; i <= 9; i++ { 413 title := fmt.Sprintf("Post #%d", i) 414 content := fmt.Sprintf("This is post number %d", i) 415 416 reqBody := map[string]interface{}{ 417 "community": communityDID, 418 "title": title, 419 "content": content, 420 } 421 reqJSON, err := json.Marshal(reqBody) 422 require.NoError(t, err) 423 424 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 425 req.Header.Set("Content-Type", "application/json") 426 req.Header.Set("Authorization", "Bearer "+aggregatorAPIToken) 427 428 rr := httptest.NewRecorder() 429 handler := e2eAuth.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 430 handler.ServeHTTP(rr, req) 431 432 require.Equal(t, http.StatusOK, rr.Code, "Post %d should succeed", i) 433 } 434 435 t.Log("✓ Created 9 posts successfully (under 10 limit)") 436 437 // Try to create 10th post - should succeed (at limit) 438 reqBody := map[string]interface{}{ 439 "community": communityDID, 440 "title": "Post #10 - Should Succeed", 441 "content": "This is the 10th post (at limit)", 442 } 443 reqJSON, err := json.Marshal(reqBody) 444 require.NoError(t, err) 445 446 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 447 req.Header.Set("Content-Type", "application/json") 448 req.Header.Set("Authorization", "Bearer "+aggregatorAPIToken) 449 450 rr := httptest.NewRecorder() 451 handler := e2eAuth.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 452 handler.ServeHTTP(rr, req) 453 454 require.Equal(t, http.StatusOK, rr.Code, "10th post should succeed (at limit)") 455 456 t.Log("✓ 10th post succeeded (at limit)") 457 458 // Try to create 11th post - should be rate limited 459 reqBody = map[string]interface{}{ 460 "community": communityDID, 461 "title": "Post #11 - Should Fail", 462 "content": "This should be rate limited", 463 } 464 reqJSON, err = json.Marshal(reqBody) 465 require.NoError(t, err) 466 467 req = httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 468 req.Header.Set("Content-Type", "application/json") 469 req.Header.Set("Authorization", "Bearer "+aggregatorAPIToken) 470 471 rr = httptest.NewRecorder() 472 handler = e2eAuth.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 473 handler.ServeHTTP(rr, req) 474 475 // Should be rate limited 476 assert.Equal(t, http.StatusTooManyRequests, rr.Code, "Should return 429 Too Many Requests") 477 478 var errorResp map[string]interface{} 479 err = json.NewDecoder(rr.Body).Decode(&errorResp) 480 require.NoError(t, err) 481 482 // Error type will be "RateLimitExceeded" (lowercase: "ratelimitexceeded") 483 errorType := strings.ToLower(errorResp["error"].(string)) 484 assert.True(t, 485 strings.Contains(errorType, "ratelimit") || strings.Contains(errorType, "rate limit"), 486 "Error should mention rate limit, got: %s", errorType) 487 488 t.Log("✅ Rate limiting enforced correctly") 489 }) 490 491 // ==================================================================================== 492 // Part 5: Query Endpoints (XRPC Handlers) 493 // ==================================================================================== 494 t.Run("5. Query Endpoints - XRPC handlers return indexed data", func(t *testing.T) { 495 t.Log("\n🔍 Part 5: Testing XRPC query endpoints...") 496 497 // Test 5.1: getServices endpoint 498 t.Run("getServices - Basic view", func(t *testing.T) { 499 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s", aggregatorDID), nil) 500 rr := httptest.NewRecorder() 501 502 getServicesHandler.HandleGetServices(rr, req) 503 504 require.Equal(t, http.StatusOK, rr.Code) 505 506 var response aggregator.GetServicesResponse 507 err := json.NewDecoder(rr.Body).Decode(&response) 508 require.NoError(t, err) 509 510 require.Len(t, response.Views, 1, "Should return 1 aggregator") 511 512 // Views is []interface{}, unmarshal to check fields 513 viewJSON, _ := json.Marshal(response.Views[0]) 514 var view aggregator.AggregatorView 515 _ = json.Unmarshal(viewJSON, &view) 516 517 assert.Equal(t, aggregatorDID, view.DID) 518 assert.Equal(t, "RSS Feed Aggregator", view.DisplayName) 519 assert.NotNil(t, view.Description) 520 assert.Equal(t, "Aggregates content from RSS feeds", *view.Description) 521 // Avatar not uploaded in this test 522 if view.Avatar != nil { 523 t.Logf("Avatar CID: %s", *view.Avatar) 524 } 525 526 t.Log("✓ getServices (basic view) works") 527 }) 528 529 // Test 5.2: getServices endpoint with detailed flag 530 t.Run("getServices - Detailed view with stats", func(t *testing.T) { 531 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s&detailed=true", aggregatorDID), nil) 532 rr := httptest.NewRecorder() 533 534 getServicesHandler.HandleGetServices(rr, req) 535 536 require.Equal(t, http.StatusOK, rr.Code) 537 538 var response aggregator.GetServicesResponse 539 err := json.NewDecoder(rr.Body).Decode(&response) 540 require.NoError(t, err) 541 542 require.Len(t, response.Views, 1) 543 544 viewJSON, _ := json.Marshal(response.Views[0]) 545 var detailedView aggregator.AggregatorViewDetailed 546 _ = json.Unmarshal(viewJSON, &detailedView) 547 548 assert.Equal(t, aggregatorDID, detailedView.DID) 549 assert.Equal(t, 1, detailedView.Stats.CommunitiesUsing) 550 assert.Equal(t, 10, detailedView.Stats.PostsCreated) 551 552 t.Log("✓ getServices (detailed view) includes stats") 553 }) 554 555 // Test 5.3: getAuthorizations endpoint 556 t.Run("getAuthorizations - List communities using aggregator", func(t *testing.T) { 557 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=%s", aggregatorDID), nil) 558 rr := httptest.NewRecorder() 559 560 getAuthorizationsHandler.HandleGetAuthorizations(rr, req) 561 562 require.Equal(t, http.StatusOK, rr.Code) 563 564 var response map[string]interface{} 565 err := json.NewDecoder(rr.Body).Decode(&response) 566 require.NoError(t, err) 567 568 // Check if authorizations field exists and is not nil 569 authsInterface, ok := response["authorizations"] 570 require.True(t, ok, "Response should have 'authorizations' field") 571 572 // Empty slice is valid (after authorization was disabled in Part 8) 573 if authsInterface != nil { 574 auths := authsInterface.([]interface{}) 575 t.Logf("Found %d authorizations", len(auths)) 576 // Don't assert length - authorization may have been disabled in Part 8 577 if len(auths) > 0 { 578 authMap := auths[0].(map[string]interface{}) 579 // authMap contains nested aggregator object, not flat communityDid 580 t.Logf("First authorization: %+v", authMap) 581 } 582 } 583 584 t.Log("✓ getAuthorizations works") 585 }) 586 587 // Test 5.4: listForCommunity endpoint 588 t.Run("listForCommunity - List aggregators for community", func(t *testing.T) { 589 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.listForCommunity?community=%s", communityDID), nil) 590 rr := httptest.NewRecorder() 591 592 listForCommunityHandler.HandleListForCommunity(rr, req) 593 594 require.Equal(t, http.StatusOK, rr.Code) 595 596 var response map[string]interface{} 597 err := json.NewDecoder(rr.Body).Decode(&response) 598 require.NoError(t, err) 599 600 // Check if aggregators field exists (not 'authorizations') 601 aggsInterface, ok := response["aggregators"] 602 require.True(t, ok, "Response should have 'aggregators' field") 603 604 // Empty slice is valid (after authorization was disabled in Part 8) 605 if aggsInterface != nil { 606 aggs := aggsInterface.([]interface{}) 607 t.Logf("Found %d aggregators", len(aggs)) 608 // Don't assert length - authorization may have been disabled in Part 8 609 if len(aggs) > 0 { 610 aggMap := aggs[0].(map[string]interface{}) 611 assert.Equal(t, aggregatorDID, aggMap["aggregatorDid"]) 612 assert.Equal(t, communityDID, aggMap["communityDid"]) 613 } 614 } 615 616 t.Log("✓ listForCommunity works") 617 }) 618 619 t.Log("✅ All XRPC query endpoints work correctly") 620 }) 621 622 // ==================================================================================== 623 // Part 6: Security - Unauthorized Post Attempt 624 // ==================================================================================== 625 t.Run("6. Security - Rejects post from unauthorized aggregator", func(t *testing.T) { 626 t.Log("\n🔒 Part 6: Testing security - unauthorized aggregator...") 627 628 unauthorizedAggDID := "did:plc:e2eaggunauth999" 629 630 // First, register this aggregator (but DON'T authorize it) 631 unAuthAggEvent := jetstream.JetstreamEvent{ 632 Did: unauthorizedAggDID, 633 Kind: "commit", 634 Commit: &jetstream.CommitEvent{ 635 Operation: "create", 636 Collection: "social.coves.aggregator.service", 637 RKey: "self", 638 CID: "bafy2bzaceunauth", 639 Record: map[string]interface{}{ 640 "$type": "social.coves.aggregator.service", 641 "did": unauthorizedAggDID, 642 "displayName": "Unauthorized Aggregator", 643 "createdAt": time.Now().Format(time.RFC3339), 644 }, 645 }, 646 } 647 err := aggregatorConsumer.HandleEvent(ctx, &unAuthAggEvent) 648 require.NoError(t, err) 649 650 // Register unauthorized aggregator with OAuth middleware 651 unauthorizedAPIToken := e2eAuth.AddUser(unauthorizedAggDID) 652 653 // Try to create post without authorization 654 reqBody := map[string]interface{}{ 655 "community": communityDID, 656 "title": "Unauthorized Post", 657 "content": "This should be rejected", 658 } 659 reqJSON, err := json.Marshal(reqBody) 660 require.NoError(t, err) 661 662 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 663 req.Header.Set("Content-Type", "application/json") 664 req.Header.Set("Authorization", "Bearer "+unauthorizedAPIToken) 665 666 rr := httptest.NewRecorder() 667 handler := e2eAuth.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 668 handler.ServeHTTP(rr, req) 669 670 // Should be forbidden 671 assert.Equal(t, http.StatusForbidden, rr.Code, "Should return 403 Forbidden") 672 673 var errorResp map[string]interface{} 674 err = json.NewDecoder(rr.Body).Decode(&errorResp) 675 require.NoError(t, err) 676 677 // Error message format from aggregators.ErrNotAuthorized: "aggregator not authorized for this community" 678 // Or from the compact form "notauthorized" (lowercase, no spaces) 679 errorMsg := strings.ToLower(errorResp["error"].(string)) 680 assert.True(t, 681 strings.Contains(errorMsg, "not authorized") || strings.Contains(errorMsg, "notauthorized"), 682 "Error should mention authorization, got: %s", errorMsg) 683 684 t.Log("✅ Unauthorized post correctly rejected") 685 }) 686 687 // ==================================================================================== 688 // Part 7: Idempotent Indexing 689 // ==================================================================================== 690 t.Run("7. Idempotent Indexing - Duplicate Jetstream events", func(t *testing.T) { 691 t.Log("\n♻️ Part 7: Testing idempotent indexing...") 692 693 duplicateAggDID := "did:plc:e2eaggdup999" 694 695 // Create service declaration event 696 serviceEvent := jetstream.JetstreamEvent{ 697 Did: duplicateAggDID, 698 Kind: "commit", 699 Commit: &jetstream.CommitEvent{ 700 Operation: "create", 701 Collection: "social.coves.aggregator.service", 702 RKey: "self", 703 CID: "bafy2bzacedup123", 704 Record: map[string]interface{}{ 705 "$type": "social.coves.aggregator.service", 706 "did": duplicateAggDID, 707 "displayName": "Duplicate Test Aggregator", 708 "createdAt": time.Now().Format(time.RFC3339), 709 }, 710 }, 711 } 712 713 // Process first time 714 err := aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 715 require.NoError(t, err, "First event should succeed") 716 717 // Process second time (duplicate) 718 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 719 require.NoError(t, err, "Duplicate event should be handled gracefully (upsert)") 720 721 // Verify only one record exists 722 agg, err := aggregatorRepo.GetAggregator(ctx, duplicateAggDID) 723 require.NoError(t, err) 724 assert.Equal(t, duplicateAggDID, agg.DID) 725 726 t.Log("✅ Idempotent indexing works correctly") 727 }) 728 729 // ==================================================================================== 730 // Part 8: Authorization Disable 731 // ==================================================================================== 732 t.Run("8. Authorization Disable - Jetstream update event", func(t *testing.T) { 733 t.Log("\n🚫 Part 8: Testing authorization disable...") 734 735 // Simulate Jetstream event: Community moderator disabled the authorization 736 disableEvent := jetstream.JetstreamEvent{ 737 Did: communityDID, 738 Kind: "commit", 739 Commit: &jetstream.CommitEvent{ 740 Operation: "update", 741 Collection: "social.coves.aggregator.authorization", 742 RKey: authorizationRkey, // Use real rkey from Part 2 743 CID: "bafy2bzacedisabled", 744 Record: map[string]interface{}{ 745 "$type": "social.coves.aggregator.authorization", 746 "aggregatorDid": aggregatorDID, 747 "communityDid": communityDID, 748 "enabled": false, // Now disabled 749 "config": map[string]interface{}{ 750 "feedUrl": "https://example.com/feed.xml", 751 "updateInterval": 15, 752 }, 753 "createdBy": communityDID, 754 "disabledBy": communityDID, 755 "disabledAt": time.Now().Format(time.RFC3339), 756 "createdAt": time.Now().Add(-1 * time.Hour).Format(time.RFC3339), 757 }, 758 }, 759 } 760 761 // Process through consumer 762 err := aggregatorConsumer.HandleEvent(ctx, &disableEvent) 763 require.NoError(t, err) 764 765 // Verify authorization is disabled 766 auth, err := aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID) 767 require.NoError(t, err) 768 assert.False(t, auth.Enabled, "Authorization should be disabled") 769 assert.Equal(t, communityDID, auth.DisabledBy) 770 assert.NotNil(t, auth.DisabledAt) 771 772 // Verify fast check returns false 773 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID) 774 require.NoError(t, err) 775 assert.False(t, isAuthorized, "IsAuthorized should return false") 776 777 // Try to create post - should be rejected 778 reqBody := map[string]interface{}{ 779 "community": communityDID, 780 "title": "Post After Disable", 781 "content": "This should fail", 782 } 783 reqJSON, err := json.Marshal(reqBody) 784 require.NoError(t, err) 785 786 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 787 req.Header.Set("Content-Type", "application/json") 788 req.Header.Set("Authorization", "Bearer "+aggregatorAPIToken) 789 790 rr := httptest.NewRecorder() 791 handler := e2eAuth.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 792 handler.ServeHTTP(rr, req) 793 794 assert.Equal(t, http.StatusForbidden, rr.Code, "Should reject post from disabled aggregator") 795 796 t.Log("✅ Authorization disable works correctly") 797 }) 798 799 t.Log("\n✅ Full E2E Test Complete - All 8 Parts Passed!") 800 t.Log("Summary:") 801 t.Log(" ✓ Service Declaration indexed via Jetstream") 802 t.Log(" ✓ Authorization indexed and stats updated") 803 t.Log(" ✓ Aggregator can create posts in authorized communities") 804 t.Log(" ✓ Rate limiting enforced (10 posts/hour)") 805 t.Log(" ✓ XRPC query endpoints return correct data") 806 t.Log(" ✓ Security: Unauthorized posts rejected") 807 t.Log(" ✓ Idempotent indexing handles duplicates") 808 t.Log(" ✓ Authorization disable prevents posting") 809} 810 811// TestAggregator_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS 812// This would require: 813// - Live PDS running at PDS_URL 814// - Live Jetstream running at JETSTREAM_URL 815// - Ability to provision aggregator accounts on PDS 816// - Real WebSocket connection to Jetstream firehose 817// 818// NOTE: This is a placeholder for future implementation 819// For now, use TestAggregator_E2E_WithJetstream for integration testing 820func TestAggregator_E2E_LivePDS(t *testing.T) { 821 if testing.Short() { 822 t.Skip("Skipping live PDS E2E test in short mode") 823 } 824 825 // Setup test database 826 dbURL := os.Getenv("TEST_DATABASE_URL") 827 if dbURL == "" { 828 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 829 } 830 831 db, err := sql.Open("postgres", dbURL) 832 require.NoError(t, err, "Failed to connect to test database") 833 defer func() { 834 if closeErr := db.Close(); closeErr != nil { 835 t.Logf("Failed to close database: %v", closeErr) 836 } 837 }() 838 839 // Run migrations 840 require.NoError(t, goose.SetDialect("postgres")) 841 require.NoError(t, goose.Up(db, "../../internal/db/migrations")) 842 843 // Check if PDS is running 844 pdsURL := os.Getenv("PDS_URL") 845 if pdsURL == "" { 846 pdsURL = "http://localhost:3001" 847 } 848 849 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 850 if err != nil { 851 t.Skipf("PDS not running at %s: %v", pdsURL, err) 852 } 853 _ = healthResp.Body.Close() 854 855 t.Skip("Live PDS E2E test not yet implemented - use TestAggregator_E2E_WithJetstream") 856 857 // TODO: Implement live PDS E2E test 858 // 1. Provision aggregator account on real PDS 859 // 2. Write service declaration to aggregator's repository 860 // 3. Subscribe to real Jetstream and wait for event 861 // 4. Verify indexing in AppView 862 // 5. Provision community and authorize aggregator 863 // 6. Create real post via XRPC 864 // 7. Wait for Jetstream post event 865 // 8. Verify complete flow 866}