A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/handlers/aggregator" 5 "Coves/internal/api/handlers/post" 6 "Coves/internal/api/middleware" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/communities" 11 "Coves/internal/core/posts" 12 "Coves/internal/core/users" 13 "Coves/internal/db/postgres" 14 "bytes" 15 "context" 16 "database/sql" 17 "encoding/json" 18 "fmt" 19 "net/http" 20 "net/http/httptest" 21 "os" 22 "strings" 23 "testing" 24 "time" 25 26 _ "github.com/lib/pq" 27 "github.com/pressly/goose/v3" 28 "github.com/stretchr/testify/assert" 29 "github.com/stretchr/testify/require" 30) 31 32// TestAggregator_E2E_WithJetstream tests the complete aggregator flow with real PDS: 33// 1. Service Declaration: Create aggregator account → Write service record → Jetstream → AppView DB 34// 2. Authorization: Create community account → Write authorization record → Jetstream → AppView DB 35// 3. Post Creation: Aggregator creates post → Validates authorization + rate limits → PDS → Jetstream → AppView 36// 4. Query Endpoints: Verify XRPC handlers return correct data from AppView 37// 38// This tests the REAL atProto flow: 39// - Real accounts created on PDS 40// - Real records written via XRPC 41// - Simulated Jetstream events (for test speed - testing AppView indexing, not Jetstream itself) 42// - AppView indexes and serves data via XRPC 43// 44// NOTE: Requires PDS running at http://localhost:3001 45func TestAggregator_E2E_WithJetstream(t *testing.T) { 46 // Check if PDS is available 47 pdsURL := "http://localhost:3001" 48 resp, err := http.Get(pdsURL + "/xrpc/_health") 49 if err != nil || resp.StatusCode != http.StatusOK { 50 t.Skipf("PDS not available at %s - run 'make dev-up' to start it", pdsURL) 51 } 52 if resp != nil { 53 _ = resp.Body.Close() 54 } 55 db := setupTestDB(t) 56 defer func() { 57 if err := db.Close(); err != nil { 58 t.Logf("Failed to close database: %v", err) 59 } 60 }() 61 62 // Setup repositories 63 aggregatorRepo := postgres.NewAggregatorRepository(db) 64 communityRepo := postgres.NewCommunityRepository(db) 65 postRepo := postgres.NewPostRepository(db) 66 userRepo := postgres.NewUserRepository(db) 67 68 // Setup services 69 identityConfig := identity.DefaultConfig() 70 identityResolver := identity.NewResolver(db, identityConfig) 71 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 72 communityService := communities.NewCommunityService(communityRepo, "http://localhost:3001", "did:web:test.coves.social", "coves.social", nil) 73 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 74 postService := posts.NewPostService(postRepo, communityService, aggregatorService, "http://localhost:3001") 75 76 // Setup consumers 77 aggregatorConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 78 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 79 80 // Setup HTTP handlers 81 getServicesHandler := aggregator.NewGetServicesHandler(aggregatorService) 82 getAuthorizationsHandler := aggregator.NewGetAuthorizationsHandler(aggregatorService) 83 listForCommunityHandler := aggregator.NewListForCommunityHandler(aggregatorService) 84 createPostHandler := post.NewCreateHandler(postService) 85 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) // Skip JWT verification for testing 86 87 ctx := context.Background() 88 89 // Cleanup test data (aggregators and communities will be created via real PDS in test parts) 90 _, _ = db.Exec("DELETE FROM aggregator_posts WHERE aggregator_did LIKE 'did:plc:%'") 91 _, _ = db.Exec("DELETE FROM aggregator_authorizations WHERE aggregator_did LIKE 'did:plc:%'") 92 _, _ = db.Exec("DELETE FROM aggregators WHERE did LIKE 'did:plc:%'") 93 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:%'") 94 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:%'") 95 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:%'") 96 97 // ==================================================================================== 98 // Part 1: Service Declaration via Real PDS 99 // ==================================================================================== 100 // Store DIDs, tokens, and URIs for use across all test parts 101 var aggregatorDID, aggregatorToken, aggregatorHandle, communityDID, communityToken, authorizationRkey string 102 103 t.Run("1. Service Declaration - PDS Account → Write Record → Jetstream → AppView DB", func(t *testing.T) { 104 t.Log("\n📝 Part 1: Create aggregator account and publish service declaration to PDS...") 105 106 // STEP 1: Create aggregator account on real PDS 107 // Use PDS configured domain (.local.coves.dev for users/services) 108 timestamp := time.Now().Unix() // Use Unix seconds instead of nanoseconds for shorter handle 109 aggregatorHandle = fmt.Sprintf("rss-agg-%d.local.coves.dev", timestamp) 110 email := fmt.Sprintf("agg-%d@test.com", timestamp) 111 password := "test-password-123" 112 113 var err error 114 aggregatorToken, aggregatorDID, err = createPDSAccount(pdsURL, aggregatorHandle, email, password) 115 require.NoError(t, err, "Failed to create aggregator account on PDS") 116 require.NotEmpty(t, aggregatorToken, "Should receive access token") 117 require.NotEmpty(t, aggregatorDID, "Should receive DID") 118 119 t.Logf("✓ Created aggregator account: %s (%s)", aggregatorHandle, aggregatorDID) 120 121 // STEP 2: Write service declaration to aggregator's repository on PDS 122 configSchema := map[string]interface{}{ 123 "type": "object", 124 "properties": map[string]interface{}{ 125 "feedUrl": map[string]interface{}{ 126 "type": "string", 127 "description": "RSS feed URL to aggregate", 128 }, 129 "updateInterval": map[string]interface{}{ 130 "type": "number", 131 "minimum": 5, 132 "maximum": 60, 133 "description": "Minutes between feed checks", 134 }, 135 }, 136 "required": []string{"feedUrl"}, 137 } 138 139 serviceRecord := map[string]interface{}{ 140 "$type": "social.coves.aggregator.service", 141 "did": aggregatorDID, 142 "displayName": "RSS Feed Aggregator", 143 "description": "Aggregates content from RSS feeds", 144 "configSchema": configSchema, 145 "maintainer": aggregatorDID, // Aggregator maintains itself 146 "sourceUrl": "https://github.com/example/rss-aggregator", 147 "createdAt": time.Now().Format(time.RFC3339), 148 } 149 150 // Write to at://{aggregatorDID}/social.coves.aggregator.service/self 151 uri, cid, err := writePDSRecord(pdsURL, aggregatorToken, aggregatorDID, "social.coves.aggregator.service", "self", serviceRecord) 152 require.NoError(t, err, "Failed to write service declaration to PDS") 153 require.NotEmpty(t, uri, "Should receive record URI") 154 require.NotEmpty(t, cid, "Should receive record CID") 155 156 t.Logf("✓ Wrote service declaration to PDS: %s (CID: %s)", uri, cid) 157 158 // STEP 3: Simulate Jetstream event (in production, this comes from real Jetstream) 159 // We simulate it here for test speed - we're testing AppView indexing, not Jetstream itself 160 serviceEvent := jetstream.JetstreamEvent{ 161 Did: aggregatorDID, 162 Kind: "commit", 163 Commit: &jetstream.CommitEvent{ 164 Operation: "create", 165 Collection: "social.coves.aggregator.service", 166 RKey: "self", 167 CID: cid, 168 Record: serviceRecord, 169 }, 170 } 171 172 // STEP 4: Process through Jetstream consumer (simulates what happens when Jetstream broadcasts) 173 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 174 require.NoError(t, err, "Consumer should index service declaration") 175 176 // STEP 2: Verify indexed in AppView database 177 indexedAgg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 178 require.NoError(t, err, "Aggregator should be indexed in AppView") 179 180 assert.Equal(t, aggregatorDID, indexedAgg.DID) 181 assert.Equal(t, "RSS Feed Aggregator", indexedAgg.DisplayName) 182 assert.Equal(t, "Aggregates content from RSS feeds", indexedAgg.Description) 183 assert.Empty(t, indexedAgg.AvatarURL, "Avatar not uploaded in this test") 184 assert.Equal(t, aggregatorDID, indexedAgg.MaintainerDID, "Aggregator maintains itself") 185 assert.Equal(t, "https://github.com/example/rss-aggregator", indexedAgg.SourceURL) 186 assert.NotEmpty(t, indexedAgg.ConfigSchema, "Config schema should be stored") 187 assert.Equal(t, fmt.Sprintf("at://%s/social.coves.aggregator.service/self", aggregatorDID), indexedAgg.RecordURI) 188 assert.False(t, indexedAgg.CreatedAt.IsZero(), "CreatedAt should be parsed from record") 189 assert.False(t, indexedAgg.IndexedAt.IsZero(), "IndexedAt should be set") 190 191 // Verify stats initialized to zero 192 assert.Equal(t, 0, indexedAgg.CommunitiesUsing) 193 assert.Equal(t, 0, indexedAgg.PostsCreated) 194 195 // STEP 6: Index aggregator as a user in AppView (required for post authorship) 196 // In production, this would come from Jetstream indexing app.bsky.actor.profile 197 // For this E2E test, we create it directly 198 testUser := createTestUser(t, db, aggregatorHandle, aggregatorDID) 199 require.NotNil(t, testUser, "Should create aggregator user") 200 201 t.Logf("✓ Indexed aggregator as user: %s", aggregatorHandle) 202 t.Log("✅ Service declaration indexed and aggregator registered as user") 203 }) 204 205 // ==================================================================================== 206 // Part 2: Authorization via Real PDS 207 // ==================================================================================== 208 t.Run("2. Authorization - Community Account → PDS → Jetstream → AppView DB", func(t *testing.T) { 209 t.Log("\n🔐 Part 2: Create community account and authorize aggregator...") 210 211 // STEP 1: Create community account on real PDS 212 // Use PDS configured domain (.community.coves.social for communities) 213 // Keep handle short to avoid PDS "handle too long" error 214 timestamp := time.Now().Unix() % 100000 // Last 5 digits 215 communityHandle := fmt.Sprintf("e2e-%d.community.coves.social", timestamp) 216 communityEmail := fmt.Sprintf("comm-%d@test.com", timestamp) 217 communityPassword := "community-test-password-123" 218 219 var err error 220 communityToken, communityDID, err = createPDSAccount(pdsURL, communityHandle, communityEmail, communityPassword) 221 require.NoError(t, err, "Failed to create community account on PDS") 222 require.NotEmpty(t, communityToken, "Should receive community access token") 223 require.NotEmpty(t, communityDID, "Should receive community DID") 224 225 t.Logf("✓ Created community account: %s (%s)", communityHandle, communityDID) 226 227 // STEP 2: Index community in AppView database (required for foreign key) 228 // In production, this would come from Jetstream indexing community.profile records 229 // For this E2E test, we create it directly 230 testCommunity := &communities.Community{ 231 DID: communityDID, 232 Handle: communityHandle, 233 Name: fmt.Sprintf("e2e-%d", timestamp), 234 DisplayName: "E2E Test Community", 235 OwnerDID: communityDID, 236 CreatedByDID: communityDID, 237 HostedByDID: "did:web:test.coves.social", 238 Visibility: "public", 239 ModerationType: "moderator", 240 RecordURI: fmt.Sprintf("at://%s/social.coves.community.profile/self", communityDID), 241 RecordCID: "fakecid123", 242 PDSAccessToken: communityToken, 243 PDSRefreshToken: communityToken, 244 } 245 _, err = communityRepo.Create(ctx, testCommunity) 246 require.NoError(t, err, "Failed to index community in AppView") 247 248 t.Logf("✓ Indexed community in AppView database") 249 250 // STEP 3: Build aggregator config (matches the schema from Part 1) 251 aggregatorConfig := map[string]interface{}{ 252 "feedUrl": "https://example.com/feed.xml", 253 "updateInterval": 15, 254 } 255 256 // STEP 4: Write authorization record to community's repository on PDS 257 // This record grants permission for the aggregator to post to this community 258 authRecord := map[string]interface{}{ 259 "$type": "social.coves.aggregator.authorization", 260 "aggregatorDid": aggregatorDID, 261 "communityDid": communityDID, 262 "enabled": true, 263 "config": aggregatorConfig, 264 "createdBy": communityDID, // Community authorizes itself 265 "createdAt": time.Now().Format(time.RFC3339), 266 } 267 268 // Write to at://{communityDID}/social.coves.aggregator.authorization/{rkey} 269 authURI, authCID, err := writePDSRecord(pdsURL, communityToken, communityDID, "social.coves.aggregator.authorization", "", authRecord) 270 require.NoError(t, err, "Failed to write authorization to PDS") 271 require.NotEmpty(t, authURI, "Should receive authorization URI") 272 require.NotEmpty(t, authCID, "Should receive authorization CID") 273 274 t.Logf("✓ Wrote authorization to PDS: %s (CID: %s)", authURI, authCID) 275 276 // STEP 5: Simulate Jetstream event (in production, this comes from real Jetstream) 277 authorizationRkey = strings.Split(authURI, "/")[4] // Extract rkey from URI and store for later 278 authEvent := jetstream.JetstreamEvent{ 279 Did: communityDID, // Repository owner (community) 280 Kind: "commit", 281 Commit: &jetstream.CommitEvent{ 282 Operation: "create", 283 Collection: "social.coves.aggregator.authorization", 284 RKey: authorizationRkey, 285 CID: authCID, 286 Record: authRecord, 287 }, 288 } 289 290 // STEP 6: Process through Jetstream consumer 291 err = aggregatorConsumer.HandleEvent(ctx, &authEvent) 292 require.NoError(t, err, "Consumer should index authorization") 293 294 // STEP 7: Verify indexed in AppView database 295 indexedAuth, err := aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID) 296 require.NoError(t, err, "Authorization should be indexed in AppView") 297 298 assert.Equal(t, aggregatorDID, indexedAuth.AggregatorDID) 299 assert.Equal(t, communityDID, indexedAuth.CommunityDID) 300 assert.True(t, indexedAuth.Enabled) 301 assert.Equal(t, communityDID, indexedAuth.CreatedBy) 302 assert.NotEmpty(t, indexedAuth.Config, "Config should be stored") 303 assert.False(t, indexedAuth.CreatedAt.IsZero()) 304 305 // STEP 8: Verify aggregator stats updated via trigger 306 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 307 require.NoError(t, err) 308 assert.Equal(t, 1, agg.CommunitiesUsing, "Trigger should increment communities_using") 309 310 // STEP 9: Verify fast authorization check 311 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID) 312 require.NoError(t, err) 313 assert.True(t, isAuthorized, "IsAuthorized should return true") 314 315 t.Log("✅ Community created and authorization indexed successfully") 316 }) 317 318 // ==================================================================================== 319 // Part 3: Post Creation by Aggregator 320 // ==================================================================================== 321 t.Run("3. Post Creation - Aggregator → Validation → PDS → Jetstream → AppView", func(t *testing.T) { 322 t.Log("\n📮 Part 3: Aggregator creates post in authorized community...") 323 324 // STEP 1: Aggregator calls XRPC endpoint to create post 325 title := "Breaking News from RSS Feed" 326 content := "This post was created by an authorized aggregator!" 327 reqBody := map[string]interface{}{ 328 "community": communityDID, 329 "title": title, 330 "content": content, 331 } 332 reqJSON, err := json.Marshal(reqBody) 333 require.NoError(t, err) 334 335 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 336 req.Header.Set("Content-Type", "application/json") 337 338 // Create JWT for aggregator (not a user) 339 aggregatorJWT := createSimpleTestJWT(aggregatorDID) 340 req.Header.Set("Authorization", "Bearer "+aggregatorJWT) 341 342 // Execute request through auth middleware + handler 343 rr := httptest.NewRecorder() 344 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 345 handler.ServeHTTP(rr, req) 346 347 // STEP 2: Verify post creation succeeded 348 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String()) 349 350 var response posts.CreatePostResponse 351 err = json.NewDecoder(rr.Body).Decode(&response) 352 require.NoError(t, err, "Failed to parse response") 353 354 t.Logf("✓ Post created on PDS: URI=%s, CID=%s", response.URI, response.CID) 355 356 // STEP 3: Simulate Jetstream event (post written to PDS → firehose) 357 rkey := strings.Split(response.URI, "/")[4] // Extract rkey from URI 358 postEvent := jetstream.JetstreamEvent{ 359 Did: communityDID, 360 Kind: "commit", 361 Commit: &jetstream.CommitEvent{ 362 Operation: "create", 363 Collection: "social.coves.community.post", 364 RKey: rkey, 365 CID: response.CID, 366 Record: map[string]interface{}{ 367 "$type": "social.coves.community.post", 368 "community": communityDID, 369 "author": aggregatorDID, // Aggregator is the author 370 "title": title, 371 "content": content, 372 "createdAt": time.Now().Format(time.RFC3339), 373 }, 374 }, 375 } 376 377 // STEP 4: Process through Jetstream post consumer 378 err = postConsumer.HandleEvent(ctx, &postEvent) 379 require.NoError(t, err, "Post consumer should index post") 380 381 // STEP 5: Verify post indexed in AppView 382 indexedPost, err := postRepo.GetByURI(ctx, response.URI) 383 require.NoError(t, err, "Post should be indexed in AppView") 384 385 assert.Equal(t, response.URI, indexedPost.URI) 386 assert.Equal(t, response.CID, indexedPost.CID) 387 assert.Equal(t, aggregatorDID, indexedPost.AuthorDID, "Author should be aggregator") 388 assert.Equal(t, communityDID, indexedPost.CommunityDID) 389 assert.Equal(t, title, *indexedPost.Title) 390 assert.Equal(t, content, *indexedPost.Content) 391 392 // STEP 6: Verify aggregator stats updated 393 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 394 require.NoError(t, err) 395 assert.Equal(t, 1, agg.PostsCreated, "Trigger should increment posts_created") 396 397 // STEP 7: Verify post tracking for rate limiting 398 since := time.Now().Add(-1 * time.Hour) 399 postCount, err := aggregatorRepo.CountRecentPosts(ctx, aggregatorDID, communityDID, since) 400 require.NoError(t, err) 401 assert.Equal(t, 1, postCount, "Should track 1 post for rate limiting") 402 403 t.Log("✅ Post created, indexed, and stats updated") 404 }) 405 406 // ==================================================================================== 407 // Part 4: Rate Limiting 408 // ==================================================================================== 409 t.Run("4. Rate Limiting - Enforces 10 posts/hour limit", func(t *testing.T) { 410 t.Log("\n⏱️ Part 4: Testing rate limit enforcement...") 411 412 // Create 8 more posts (we already have 1 from Part 3, need 9 total to be under limit) 413 for i := 2; i <= 9; i++ { 414 title := fmt.Sprintf("Post #%d", i) 415 content := fmt.Sprintf("This is post number %d", i) 416 417 reqBody := map[string]interface{}{ 418 "community": communityDID, 419 "title": title, 420 "content": content, 421 } 422 reqJSON, err := json.Marshal(reqBody) 423 require.NoError(t, err) 424 425 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 426 req.Header.Set("Content-Type", "application/json") 427 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID)) 428 429 rr := httptest.NewRecorder() 430 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 431 handler.ServeHTTP(rr, req) 432 433 require.Equal(t, http.StatusOK, rr.Code, "Post %d should succeed", i) 434 } 435 436 t.Log("✓ Created 9 posts successfully (under 10 limit)") 437 438 // Try to create 10th post - should succeed (at limit) 439 reqBody := map[string]interface{}{ 440 "community": communityDID, 441 "title": "Post #10 - Should Succeed", 442 "content": "This is the 10th post (at limit)", 443 } 444 reqJSON, err := json.Marshal(reqBody) 445 require.NoError(t, err) 446 447 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 448 req.Header.Set("Content-Type", "application/json") 449 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID)) 450 451 rr := httptest.NewRecorder() 452 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 453 handler.ServeHTTP(rr, req) 454 455 require.Equal(t, http.StatusOK, rr.Code, "10th post should succeed (at limit)") 456 457 t.Log("✓ 10th post succeeded (at limit)") 458 459 // Try to create 11th post - should be rate limited 460 reqBody = map[string]interface{}{ 461 "community": communityDID, 462 "title": "Post #11 - Should Fail", 463 "content": "This should be rate limited", 464 } 465 reqJSON, err = json.Marshal(reqBody) 466 require.NoError(t, err) 467 468 req = httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 469 req.Header.Set("Content-Type", "application/json") 470 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID)) 471 472 rr = httptest.NewRecorder() 473 handler = authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 474 handler.ServeHTTP(rr, req) 475 476 // Should be rate limited 477 assert.Equal(t, http.StatusTooManyRequests, rr.Code, "Should return 429 Too Many Requests") 478 479 var errorResp map[string]interface{} 480 err = json.NewDecoder(rr.Body).Decode(&errorResp) 481 require.NoError(t, err) 482 483 // Error type will be "RateLimitExceeded" (lowercase: "ratelimitexceeded") 484 errorType := strings.ToLower(errorResp["error"].(string)) 485 assert.True(t, 486 strings.Contains(errorType, "ratelimit") || strings.Contains(errorType, "rate limit"), 487 "Error should mention rate limit, got: %s", errorType) 488 489 t.Log("✅ Rate limiting enforced correctly") 490 }) 491 492 // ==================================================================================== 493 // Part 5: Query Endpoints (XRPC Handlers) 494 // ==================================================================================== 495 t.Run("5. Query Endpoints - XRPC handlers return indexed data", func(t *testing.T) { 496 t.Log("\n🔍 Part 5: Testing XRPC query endpoints...") 497 498 // Test 5.1: getServices endpoint 499 t.Run("getServices - Basic view", func(t *testing.T) { 500 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s", aggregatorDID), nil) 501 rr := httptest.NewRecorder() 502 503 getServicesHandler.HandleGetServices(rr, req) 504 505 require.Equal(t, http.StatusOK, rr.Code) 506 507 var response aggregator.GetServicesResponse 508 err := json.NewDecoder(rr.Body).Decode(&response) 509 require.NoError(t, err) 510 511 require.Len(t, response.Views, 1, "Should return 1 aggregator") 512 513 // Views is []interface{}, unmarshal to check fields 514 viewJSON, _ := json.Marshal(response.Views[0]) 515 var view aggregator.AggregatorView 516 _ = json.Unmarshal(viewJSON, &view) 517 518 assert.Equal(t, aggregatorDID, view.DID) 519 assert.Equal(t, "RSS Feed Aggregator", view.DisplayName) 520 assert.NotNil(t, view.Description) 521 assert.Equal(t, "Aggregates content from RSS feeds", *view.Description) 522 // Avatar not uploaded in this test 523 if view.Avatar != nil { 524 t.Logf("Avatar CID: %s", *view.Avatar) 525 } 526 527 t.Log("✓ getServices (basic view) works") 528 }) 529 530 // Test 5.2: getServices endpoint with detailed flag 531 t.Run("getServices - Detailed view with stats", func(t *testing.T) { 532 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s&detailed=true", aggregatorDID), nil) 533 rr := httptest.NewRecorder() 534 535 getServicesHandler.HandleGetServices(rr, req) 536 537 require.Equal(t, http.StatusOK, rr.Code) 538 539 var response aggregator.GetServicesResponse 540 err := json.NewDecoder(rr.Body).Decode(&response) 541 require.NoError(t, err) 542 543 require.Len(t, response.Views, 1) 544 545 viewJSON, _ := json.Marshal(response.Views[0]) 546 var detailedView aggregator.AggregatorViewDetailed 547 _ = json.Unmarshal(viewJSON, &detailedView) 548 549 assert.Equal(t, aggregatorDID, detailedView.DID) 550 assert.Equal(t, 1, detailedView.Stats.CommunitiesUsing) 551 assert.Equal(t, 10, detailedView.Stats.PostsCreated) 552 553 t.Log("✓ getServices (detailed view) includes stats") 554 }) 555 556 // Test 5.3: getAuthorizations endpoint 557 t.Run("getAuthorizations - List communities using aggregator", func(t *testing.T) { 558 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=%s", aggregatorDID), nil) 559 rr := httptest.NewRecorder() 560 561 getAuthorizationsHandler.HandleGetAuthorizations(rr, req) 562 563 require.Equal(t, http.StatusOK, rr.Code) 564 565 var response map[string]interface{} 566 err := json.NewDecoder(rr.Body).Decode(&response) 567 require.NoError(t, err) 568 569 // Check if authorizations field exists and is not nil 570 authsInterface, ok := response["authorizations"] 571 require.True(t, ok, "Response should have 'authorizations' field") 572 573 // Empty slice is valid (after authorization was disabled in Part 8) 574 if authsInterface != nil { 575 auths := authsInterface.([]interface{}) 576 t.Logf("Found %d authorizations", len(auths)) 577 // Don't assert length - authorization may have been disabled in Part 8 578 if len(auths) > 0 { 579 authMap := auths[0].(map[string]interface{}) 580 // authMap contains nested aggregator object, not flat communityDid 581 t.Logf("First authorization: %+v", authMap) 582 } 583 } 584 585 t.Log("✓ getAuthorizations works") 586 }) 587 588 // Test 5.4: listForCommunity endpoint 589 t.Run("listForCommunity - List aggregators for community", func(t *testing.T) { 590 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.listForCommunity?community=%s", communityDID), nil) 591 rr := httptest.NewRecorder() 592 593 listForCommunityHandler.HandleListForCommunity(rr, req) 594 595 require.Equal(t, http.StatusOK, rr.Code) 596 597 var response map[string]interface{} 598 err := json.NewDecoder(rr.Body).Decode(&response) 599 require.NoError(t, err) 600 601 // Check if aggregators field exists (not 'authorizations') 602 aggsInterface, ok := response["aggregators"] 603 require.True(t, ok, "Response should have 'aggregators' field") 604 605 // Empty slice is valid (after authorization was disabled in Part 8) 606 if aggsInterface != nil { 607 aggs := aggsInterface.([]interface{}) 608 t.Logf("Found %d aggregators", len(aggs)) 609 // Don't assert length - authorization may have been disabled in Part 8 610 if len(aggs) > 0 { 611 aggMap := aggs[0].(map[string]interface{}) 612 assert.Equal(t, aggregatorDID, aggMap["aggregatorDid"]) 613 assert.Equal(t, communityDID, aggMap["communityDid"]) 614 } 615 } 616 617 t.Log("✓ listForCommunity works") 618 }) 619 620 t.Log("✅ All XRPC query endpoints work correctly") 621 }) 622 623 // ==================================================================================== 624 // Part 6: Security - Unauthorized Post Attempt 625 // ==================================================================================== 626 t.Run("6. Security - Rejects post from unauthorized aggregator", func(t *testing.T) { 627 t.Log("\n🔒 Part 6: Testing security - unauthorized aggregator...") 628 629 unauthorizedAggDID := "did:plc:e2eaggunauth999" 630 631 // First, register this aggregator (but DON'T authorize it) 632 unAuthAggEvent := jetstream.JetstreamEvent{ 633 Did: unauthorizedAggDID, 634 Kind: "commit", 635 Commit: &jetstream.CommitEvent{ 636 Operation: "create", 637 Collection: "social.coves.aggregator.service", 638 RKey: "self", 639 CID: "bafy2bzaceunauth", 640 Record: map[string]interface{}{ 641 "$type": "social.coves.aggregator.service", 642 "did": unauthorizedAggDID, 643 "displayName": "Unauthorized Aggregator", 644 "createdAt": time.Now().Format(time.RFC3339), 645 }, 646 }, 647 } 648 err := aggregatorConsumer.HandleEvent(ctx, &unAuthAggEvent) 649 require.NoError(t, err) 650 651 // Try to create post without authorization 652 reqBody := map[string]interface{}{ 653 "community": communityDID, 654 "title": "Unauthorized Post", 655 "content": "This should be rejected", 656 } 657 reqJSON, err := json.Marshal(reqBody) 658 require.NoError(t, err) 659 660 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 661 req.Header.Set("Content-Type", "application/json") 662 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(unauthorizedAggDID)) 663 664 rr := httptest.NewRecorder() 665 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 666 handler.ServeHTTP(rr, req) 667 668 // Should be forbidden 669 assert.Equal(t, http.StatusForbidden, rr.Code, "Should return 403 Forbidden") 670 671 var errorResp map[string]interface{} 672 err = json.NewDecoder(rr.Body).Decode(&errorResp) 673 require.NoError(t, err) 674 675 // Error message format from aggregators.ErrNotAuthorized: "aggregator not authorized for this community" 676 // Or from the compact form "notauthorized" (lowercase, no spaces) 677 errorMsg := strings.ToLower(errorResp["error"].(string)) 678 assert.True(t, 679 strings.Contains(errorMsg, "not authorized") || strings.Contains(errorMsg, "notauthorized"), 680 "Error should mention authorization, got: %s", errorMsg) 681 682 t.Log("✅ Unauthorized post correctly rejected") 683 }) 684 685 // ==================================================================================== 686 // Part 7: Idempotent Indexing 687 // ==================================================================================== 688 t.Run("7. Idempotent Indexing - Duplicate Jetstream events", func(t *testing.T) { 689 t.Log("\n♻️ Part 7: Testing idempotent indexing...") 690 691 duplicateAggDID := "did:plc:e2eaggdup999" 692 693 // Create service declaration event 694 serviceEvent := jetstream.JetstreamEvent{ 695 Did: duplicateAggDID, 696 Kind: "commit", 697 Commit: &jetstream.CommitEvent{ 698 Operation: "create", 699 Collection: "social.coves.aggregator.service", 700 RKey: "self", 701 CID: "bafy2bzacedup123", 702 Record: map[string]interface{}{ 703 "$type": "social.coves.aggregator.service", 704 "did": duplicateAggDID, 705 "displayName": "Duplicate Test Aggregator", 706 "createdAt": time.Now().Format(time.RFC3339), 707 }, 708 }, 709 } 710 711 // Process first time 712 err := aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 713 require.NoError(t, err, "First event should succeed") 714 715 // Process second time (duplicate) 716 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 717 require.NoError(t, err, "Duplicate event should be handled gracefully (upsert)") 718 719 // Verify only one record exists 720 agg, err := aggregatorRepo.GetAggregator(ctx, duplicateAggDID) 721 require.NoError(t, err) 722 assert.Equal(t, duplicateAggDID, agg.DID) 723 724 t.Log("✅ Idempotent indexing works correctly") 725 }) 726 727 // ==================================================================================== 728 // Part 8: Authorization Disable 729 // ==================================================================================== 730 t.Run("8. Authorization Disable - Jetstream update event", func(t *testing.T) { 731 t.Log("\n🚫 Part 8: Testing authorization disable...") 732 733 // Simulate Jetstream event: Community moderator disabled the authorization 734 disableEvent := jetstream.JetstreamEvent{ 735 Did: communityDID, 736 Kind: "commit", 737 Commit: &jetstream.CommitEvent{ 738 Operation: "update", 739 Collection: "social.coves.aggregator.authorization", 740 RKey: authorizationRkey, // Use real rkey from Part 2 741 CID: "bafy2bzacedisabled", 742 Record: map[string]interface{}{ 743 "$type": "social.coves.aggregator.authorization", 744 "aggregatorDid": aggregatorDID, 745 "communityDid": communityDID, 746 "enabled": false, // Now disabled 747 "config": map[string]interface{}{ 748 "feedUrl": "https://example.com/feed.xml", 749 "updateInterval": 15, 750 }, 751 "createdBy": communityDID, 752 "disabledBy": communityDID, 753 "disabledAt": time.Now().Format(time.RFC3339), 754 "createdAt": time.Now().Add(-1 * time.Hour).Format(time.RFC3339), 755 }, 756 }, 757 } 758 759 // Process through consumer 760 err := aggregatorConsumer.HandleEvent(ctx, &disableEvent) 761 require.NoError(t, err) 762 763 // Verify authorization is disabled 764 auth, err := aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID) 765 require.NoError(t, err) 766 assert.False(t, auth.Enabled, "Authorization should be disabled") 767 assert.Equal(t, communityDID, auth.DisabledBy) 768 assert.NotNil(t, auth.DisabledAt) 769 770 // Verify fast check returns false 771 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID) 772 require.NoError(t, err) 773 assert.False(t, isAuthorized, "IsAuthorized should return false") 774 775 // Try to create post - should be rejected 776 reqBody := map[string]interface{}{ 777 "community": communityDID, 778 "title": "Post After Disable", 779 "content": "This should fail", 780 } 781 reqJSON, err := json.Marshal(reqBody) 782 require.NoError(t, err) 783 784 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 785 req.Header.Set("Content-Type", "application/json") 786 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID)) 787 788 rr := httptest.NewRecorder() 789 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 790 handler.ServeHTTP(rr, req) 791 792 assert.Equal(t, http.StatusForbidden, rr.Code, "Should reject post from disabled aggregator") 793 794 t.Log("✅ Authorization disable works correctly") 795 }) 796 797 t.Log("\n✅ Full E2E Test Complete - All 8 Parts Passed!") 798 t.Log("Summary:") 799 t.Log(" ✓ Service Declaration indexed via Jetstream") 800 t.Log(" ✓ Authorization indexed and stats updated") 801 t.Log(" ✓ Aggregator can create posts in authorized communities") 802 t.Log(" ✓ Rate limiting enforced (10 posts/hour)") 803 t.Log(" ✓ XRPC query endpoints return correct data") 804 t.Log(" ✓ Security: Unauthorized posts rejected") 805 t.Log(" ✓ Idempotent indexing handles duplicates") 806 t.Log(" ✓ Authorization disable prevents posting") 807} 808 809// TestAggregator_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS 810// This would require: 811// - Live PDS running at PDS_URL 812// - Live Jetstream running at JETSTREAM_URL 813// - Ability to provision aggregator accounts on PDS 814// - Real WebSocket connection to Jetstream firehose 815// 816// NOTE: This is a placeholder for future implementation 817// For now, use TestAggregator_E2E_WithJetstream for integration testing 818func TestAggregator_E2E_LivePDS(t *testing.T) { 819 if testing.Short() { 820 t.Skip("Skipping live PDS E2E test in short mode") 821 } 822 823 // Setup test database 824 dbURL := os.Getenv("TEST_DATABASE_URL") 825 if dbURL == "" { 826 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 827 } 828 829 db, err := sql.Open("postgres", dbURL) 830 require.NoError(t, err, "Failed to connect to test database") 831 defer func() { 832 if closeErr := db.Close(); closeErr != nil { 833 t.Logf("Failed to close database: %v", closeErr) 834 } 835 }() 836 837 // Run migrations 838 require.NoError(t, goose.SetDialect("postgres")) 839 require.NoError(t, goose.Up(db, "../../internal/db/migrations")) 840 841 // Check if PDS is running 842 pdsURL := os.Getenv("PDS_URL") 843 if pdsURL == "" { 844 pdsURL = "http://localhost:3001" 845 } 846 847 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 848 if err != nil { 849 t.Skipf("PDS not running at %s: %v", pdsURL, err) 850 } 851 _ = healthResp.Body.Close() 852 853 t.Skip("Live PDS E2E test not yet implemented - use TestAggregator_E2E_WithJetstream") 854 855 // TODO: Implement live PDS E2E test 856 // 1. Provision aggregator account on real PDS 857 // 2. Write service declaration to aggregator's repository 858 // 3. Subscribe to real Jetstream and wait for event 859 // 4. Verify indexing in AppView 860 // 5. Provision community and authorize aggregator 861 // 6. Create real post via XRPC 862 // 7. Wait for Jetstream post event 863 // 8. Verify complete flow 864}