A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/handlers/aggregator" 5 "Coves/internal/api/handlers/post" 6 "Coves/internal/api/middleware" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/communities" 11 "Coves/internal/core/posts" 12 "Coves/internal/core/users" 13 "Coves/internal/db/postgres" 14 "bytes" 15 "context" 16 "database/sql" 17 "encoding/json" 18 "fmt" 19 "net/http" 20 "net/http/httptest" 21 "os" 22 "strings" 23 "testing" 24 "time" 25 26 _ "github.com/lib/pq" 27 "github.com/pressly/goose/v3" 28 "github.com/stretchr/testify/assert" 29 "github.com/stretchr/testify/require" 30) 31 32// TestAggregator_E2E_WithJetstream tests the complete aggregator flow with real PDS: 33// 1. Service Declaration: Create aggregator account → Write service record → Jetstream → AppView DB 34// 2. Authorization: Create community account → Write authorization record → Jetstream → AppView DB 35// 3. Post Creation: Aggregator creates post → Validates authorization + rate limits → PDS → Jetstream → AppView 36// 4. 
Query Endpoints: Verify XRPC handlers return correct data from AppView 37// 38// This tests the REAL atProto flow: 39// - Real accounts created on PDS 40// - Real records written via XRPC 41// - Simulated Jetstream events (for test speed - testing AppView indexing, not Jetstream itself) 42// - AppView indexes and serves data via XRPC 43// 44// NOTE: Requires PDS running at http://localhost:3001 45func TestAggregator_E2E_WithJetstream(t *testing.T) { 46 // Check if PDS is available 47 pdsURL := "http://localhost:3001" 48 resp, err := http.Get(pdsURL + "/xrpc/_health") 49 if err != nil || resp.StatusCode != http.StatusOK { 50 t.Skipf("PDS not available at %s - run 'make dev-up' to start it", pdsURL) 51 } 52 if resp != nil { 53 _ = resp.Body.Close() 54 } 55 db := setupTestDB(t) 56 defer func() { 57 if err := db.Close(); err != nil { 58 t.Logf("Failed to close database: %v", err) 59 } 60 }() 61 62 // Setup repositories 63 aggregatorRepo := postgres.NewAggregatorRepository(db) 64 communityRepo := postgres.NewCommunityRepository(db) 65 postRepo := postgres.NewPostRepository(db) 66 userRepo := postgres.NewUserRepository(db) 67 68 // Setup services 69 identityConfig := identity.DefaultConfig() 70 identityResolver := identity.NewResolver(db, identityConfig) 71 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 72 communityService := communities.NewCommunityService(communityRepo, "http://localhost:3001", "did:web:test.coves.social", "coves.social", nil) 73 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 74 postService := posts.NewPostService(postRepo, communityService, aggregatorService, nil, nil, "http://localhost:3001") 75 76 // Setup consumers 77 aggregatorConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 78 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 79 80 // Setup HTTP handlers 81 getServicesHandler := 
aggregator.NewGetServicesHandler(aggregatorService) 82 getAuthorizationsHandler := aggregator.NewGetAuthorizationsHandler(aggregatorService) 83 listForCommunityHandler := aggregator.NewListForCommunityHandler(aggregatorService) 84 createPostHandler := post.NewCreateHandler(postService) 85 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) // Skip JWT verification for testing 86 defer authMiddleware.Stop() // Clean up DPoP replay cache goroutine 87 88 ctx := context.Background() 89 90 // Cleanup test data (aggregators and communities will be created via real PDS in test parts) 91 _, _ = db.Exec("DELETE FROM aggregator_posts WHERE aggregator_did LIKE 'did:plc:%'") 92 _, _ = db.Exec("DELETE FROM aggregator_authorizations WHERE aggregator_did LIKE 'did:plc:%'") 93 _, _ = db.Exec("DELETE FROM aggregators WHERE did LIKE 'did:plc:%'") 94 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:%'") 95 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:%'") 96 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:%'") 97 98 // ==================================================================================== 99 // Part 1: Service Declaration via Real PDS 100 // ==================================================================================== 101 // Store DIDs, tokens, and URIs for use across all test parts 102 var aggregatorDID, aggregatorToken, aggregatorHandle, communityDID, communityToken, authorizationRkey string 103 104 t.Run("1. 
Service Declaration - PDS Account → Write Record → Jetstream → AppView DB", func(t *testing.T) { 105 t.Log("\n📝 Part 1: Create aggregator account and publish service declaration to PDS...") 106 107 // STEP 1: Create aggregator account on real PDS 108 // Use PDS configured domain (.local.coves.dev for users/services) 109 timestamp := time.Now().Unix() // Use Unix seconds instead of nanoseconds for shorter handle 110 aggregatorHandle = fmt.Sprintf("rss-agg-%d.local.coves.dev", timestamp) 111 email := fmt.Sprintf("agg-%d@test.com", timestamp) 112 password := "test-password-123" 113 114 var err error 115 aggregatorToken, aggregatorDID, err = createPDSAccount(pdsURL, aggregatorHandle, email, password) 116 require.NoError(t, err, "Failed to create aggregator account on PDS") 117 require.NotEmpty(t, aggregatorToken, "Should receive access token") 118 require.NotEmpty(t, aggregatorDID, "Should receive DID") 119 120 t.Logf("✓ Created aggregator account: %s (%s)", aggregatorHandle, aggregatorDID) 121 122 // STEP 2: Write service declaration to aggregator's repository on PDS 123 configSchema := map[string]interface{}{ 124 "type": "object", 125 "properties": map[string]interface{}{ 126 "feedUrl": map[string]interface{}{ 127 "type": "string", 128 "description": "RSS feed URL to aggregate", 129 }, 130 "updateInterval": map[string]interface{}{ 131 "type": "number", 132 "minimum": 5, 133 "maximum": 60, 134 "description": "Minutes between feed checks", 135 }, 136 }, 137 "required": []string{"feedUrl"}, 138 } 139 140 serviceRecord := map[string]interface{}{ 141 "$type": "social.coves.aggregator.service", 142 "did": aggregatorDID, 143 "displayName": "RSS Feed Aggregator", 144 "description": "Aggregates content from RSS feeds", 145 "configSchema": configSchema, 146 "maintainer": aggregatorDID, // Aggregator maintains itself 147 "sourceUrl": "https://github.com/example/rss-aggregator", 148 "createdAt": time.Now().Format(time.RFC3339), 149 } 150 151 // Write to 
at://{aggregatorDID}/social.coves.aggregator.service/self 152 uri, cid, err := writePDSRecord(pdsURL, aggregatorToken, aggregatorDID, "social.coves.aggregator.service", "self", serviceRecord) 153 require.NoError(t, err, "Failed to write service declaration to PDS") 154 require.NotEmpty(t, uri, "Should receive record URI") 155 require.NotEmpty(t, cid, "Should receive record CID") 156 157 t.Logf("✓ Wrote service declaration to PDS: %s (CID: %s)", uri, cid) 158 159 // STEP 3: Simulate Jetstream event (in production, this comes from real Jetstream) 160 // We simulate it here for test speed - we're testing AppView indexing, not Jetstream itself 161 serviceEvent := jetstream.JetstreamEvent{ 162 Did: aggregatorDID, 163 Kind: "commit", 164 Commit: &jetstream.CommitEvent{ 165 Operation: "create", 166 Collection: "social.coves.aggregator.service", 167 RKey: "self", 168 CID: cid, 169 Record: serviceRecord, 170 }, 171 } 172 173 // STEP 4: Process through Jetstream consumer (simulates what happens when Jetstream broadcasts) 174 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 175 require.NoError(t, err, "Consumer should index service declaration") 176 177 // STEP 2: Verify indexed in AppView database 178 indexedAgg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 179 require.NoError(t, err, "Aggregator should be indexed in AppView") 180 181 assert.Equal(t, aggregatorDID, indexedAgg.DID) 182 assert.Equal(t, "RSS Feed Aggregator", indexedAgg.DisplayName) 183 assert.Equal(t, "Aggregates content from RSS feeds", indexedAgg.Description) 184 assert.Empty(t, indexedAgg.AvatarURL, "Avatar not uploaded in this test") 185 assert.Equal(t, aggregatorDID, indexedAgg.MaintainerDID, "Aggregator maintains itself") 186 assert.Equal(t, "https://github.com/example/rss-aggregator", indexedAgg.SourceURL) 187 assert.NotEmpty(t, indexedAgg.ConfigSchema, "Config schema should be stored") 188 assert.Equal(t, fmt.Sprintf("at://%s/social.coves.aggregator.service/self", aggregatorDID), 
indexedAgg.RecordURI) 189 assert.False(t, indexedAgg.CreatedAt.IsZero(), "CreatedAt should be parsed from record") 190 assert.False(t, indexedAgg.IndexedAt.IsZero(), "IndexedAt should be set") 191 192 // Verify stats initialized to zero 193 assert.Equal(t, 0, indexedAgg.CommunitiesUsing) 194 assert.Equal(t, 0, indexedAgg.PostsCreated) 195 196 // STEP 6: Index aggregator as a user in AppView (required for post authorship) 197 // In production, this would come from Jetstream indexing app.bsky.actor.profile 198 // For this E2E test, we create it directly 199 testUser := createTestUser(t, db, aggregatorHandle, aggregatorDID) 200 require.NotNil(t, testUser, "Should create aggregator user") 201 202 t.Logf("✓ Indexed aggregator as user: %s", aggregatorHandle) 203 t.Log("✅ Service declaration indexed and aggregator registered as user") 204 }) 205 206 // ==================================================================================== 207 // Part 2: Authorization via Real PDS 208 // ==================================================================================== 209 t.Run("2. 
Authorization - Community Account → PDS → Jetstream → AppView DB", func(t *testing.T) { 210 t.Log("\n🔐 Part 2: Create community account and authorize aggregator...") 211 212 // STEP 1: Create community account on real PDS 213 // Use PDS configured domain (.community.coves.social for communities) 214 // Keep handle short to avoid PDS "handle too long" error 215 timestamp := time.Now().Unix() % 100000 // Last 5 digits 216 communityHandle := fmt.Sprintf("e2e-%d.community.coves.social", timestamp) 217 communityEmail := fmt.Sprintf("comm-%d@test.com", timestamp) 218 communityPassword := "community-test-password-123" 219 220 var err error 221 communityToken, communityDID, err = createPDSAccount(pdsURL, communityHandle, communityEmail, communityPassword) 222 require.NoError(t, err, "Failed to create community account on PDS") 223 require.NotEmpty(t, communityToken, "Should receive community access token") 224 require.NotEmpty(t, communityDID, "Should receive community DID") 225 226 t.Logf("✓ Created community account: %s (%s)", communityHandle, communityDID) 227 228 // STEP 2: Index community in AppView database (required for foreign key) 229 // In production, this would come from Jetstream indexing community.profile records 230 // For this E2E test, we create it directly 231 testCommunity := &communities.Community{ 232 DID: communityDID, 233 Handle: communityHandle, 234 Name: fmt.Sprintf("e2e-%d", timestamp), 235 DisplayName: "E2E Test Community", 236 OwnerDID: communityDID, 237 CreatedByDID: communityDID, 238 HostedByDID: "did:web:test.coves.social", 239 Visibility: "public", 240 ModerationType: "moderator", 241 RecordURI: fmt.Sprintf("at://%s/social.coves.community.profile/self", communityDID), 242 RecordCID: "fakecid123", 243 PDSAccessToken: communityToken, 244 PDSRefreshToken: communityToken, 245 } 246 _, err = communityRepo.Create(ctx, testCommunity) 247 require.NoError(t, err, "Failed to index community in AppView") 248 249 t.Logf("✓ Indexed community in AppView 
database") 250 251 // STEP 3: Build aggregator config (matches the schema from Part 1) 252 aggregatorConfig := map[string]interface{}{ 253 "feedUrl": "https://example.com/feed.xml", 254 "updateInterval": 15, 255 } 256 257 // STEP 4: Write authorization record to community's repository on PDS 258 // This record grants permission for the aggregator to post to this community 259 authRecord := map[string]interface{}{ 260 "$type": "social.coves.aggregator.authorization", 261 "aggregatorDid": aggregatorDID, 262 "communityDid": communityDID, 263 "enabled": true, 264 "config": aggregatorConfig, 265 "createdBy": communityDID, // Community authorizes itself 266 "createdAt": time.Now().Format(time.RFC3339), 267 } 268 269 // Write to at://{communityDID}/social.coves.aggregator.authorization/{rkey} 270 authURI, authCID, err := writePDSRecord(pdsURL, communityToken, communityDID, "social.coves.aggregator.authorization", "", authRecord) 271 require.NoError(t, err, "Failed to write authorization to PDS") 272 require.NotEmpty(t, authURI, "Should receive authorization URI") 273 require.NotEmpty(t, authCID, "Should receive authorization CID") 274 275 t.Logf("✓ Wrote authorization to PDS: %s (CID: %s)", authURI, authCID) 276 277 // STEP 5: Simulate Jetstream event (in production, this comes from real Jetstream) 278 authorizationRkey = strings.Split(authURI, "/")[4] // Extract rkey from URI and store for later 279 authEvent := jetstream.JetstreamEvent{ 280 Did: communityDID, // Repository owner (community) 281 Kind: "commit", 282 Commit: &jetstream.CommitEvent{ 283 Operation: "create", 284 Collection: "social.coves.aggregator.authorization", 285 RKey: authorizationRkey, 286 CID: authCID, 287 Record: authRecord, 288 }, 289 } 290 291 // STEP 6: Process through Jetstream consumer 292 err = aggregatorConsumer.HandleEvent(ctx, &authEvent) 293 require.NoError(t, err, "Consumer should index authorization") 294 295 // STEP 7: Verify indexed in AppView database 296 indexedAuth, err := 
aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID) 297 require.NoError(t, err, "Authorization should be indexed in AppView") 298 299 assert.Equal(t, aggregatorDID, indexedAuth.AggregatorDID) 300 assert.Equal(t, communityDID, indexedAuth.CommunityDID) 301 assert.True(t, indexedAuth.Enabled) 302 assert.Equal(t, communityDID, indexedAuth.CreatedBy) 303 assert.NotEmpty(t, indexedAuth.Config, "Config should be stored") 304 assert.False(t, indexedAuth.CreatedAt.IsZero()) 305 306 // STEP 8: Verify aggregator stats updated via trigger 307 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 308 require.NoError(t, err) 309 assert.Equal(t, 1, agg.CommunitiesUsing, "Trigger should increment communities_using") 310 311 // STEP 9: Verify fast authorization check 312 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID) 313 require.NoError(t, err) 314 assert.True(t, isAuthorized, "IsAuthorized should return true") 315 316 t.Log("✅ Community created and authorization indexed successfully") 317 }) 318 319 // ==================================================================================== 320 // Part 3: Post Creation by Aggregator 321 // ==================================================================================== 322 t.Run("3. Post Creation - Aggregator → Validation → PDS → Jetstream → AppView", func(t *testing.T) { 323 t.Log("\n📮 Part 3: Aggregator creates post in authorized community...") 324 325 // STEP 1: Aggregator calls XRPC endpoint to create post 326 title := "Breaking News from RSS Feed" 327 content := "This post was created by an authorized aggregator!" 
328 reqBody := map[string]interface{}{ 329 "community": communityDID, 330 "title": title, 331 "content": content, 332 } 333 reqJSON, err := json.Marshal(reqBody) 334 require.NoError(t, err) 335 336 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 337 req.Header.Set("Content-Type", "application/json") 338 339 // Create JWT for aggregator (not a user) 340 aggregatorJWT := createSimpleTestJWT(aggregatorDID) 341 req.Header.Set("Authorization", "DPoP "+aggregatorJWT) 342 343 // Execute request through auth middleware + handler 344 rr := httptest.NewRecorder() 345 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 346 handler.ServeHTTP(rr, req) 347 348 // STEP 2: Verify post creation succeeded 349 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String()) 350 351 var response posts.CreatePostResponse 352 err = json.NewDecoder(rr.Body).Decode(&response) 353 require.NoError(t, err, "Failed to parse response") 354 355 t.Logf("✓ Post created on PDS: URI=%s, CID=%s", response.URI, response.CID) 356 357 // STEP 3: Simulate Jetstream event (post written to PDS → firehose) 358 rkey := strings.Split(response.URI, "/")[4] // Extract rkey from URI 359 postEvent := jetstream.JetstreamEvent{ 360 Did: communityDID, 361 Kind: "commit", 362 Commit: &jetstream.CommitEvent{ 363 Operation: "create", 364 Collection: "social.coves.community.post", 365 RKey: rkey, 366 CID: response.CID, 367 Record: map[string]interface{}{ 368 "$type": "social.coves.community.post", 369 "community": communityDID, 370 "author": aggregatorDID, // Aggregator is the author 371 "title": title, 372 "content": content, 373 "createdAt": time.Now().Format(time.RFC3339), 374 }, 375 }, 376 } 377 378 // STEP 4: Process through Jetstream post consumer 379 err = postConsumer.HandleEvent(ctx, &postEvent) 380 require.NoError(t, err, "Post consumer should index post") 381 382 // STEP 5: Verify 
post indexed in AppView 383 indexedPost, err := postRepo.GetByURI(ctx, response.URI) 384 require.NoError(t, err, "Post should be indexed in AppView") 385 386 assert.Equal(t, response.URI, indexedPost.URI) 387 assert.Equal(t, response.CID, indexedPost.CID) 388 assert.Equal(t, aggregatorDID, indexedPost.AuthorDID, "Author should be aggregator") 389 assert.Equal(t, communityDID, indexedPost.CommunityDID) 390 assert.Equal(t, title, *indexedPost.Title) 391 assert.Equal(t, content, *indexedPost.Content) 392 393 // STEP 6: Verify aggregator stats updated 394 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID) 395 require.NoError(t, err) 396 assert.Equal(t, 1, agg.PostsCreated, "Trigger should increment posts_created") 397 398 // STEP 7: Verify post tracking for rate limiting 399 since := time.Now().Add(-1 * time.Hour) 400 postCount, err := aggregatorRepo.CountRecentPosts(ctx, aggregatorDID, communityDID, since) 401 require.NoError(t, err) 402 assert.Equal(t, 1, postCount, "Should track 1 post for rate limiting") 403 404 t.Log("✅ Post created, indexed, and stats updated") 405 }) 406 407 // ==================================================================================== 408 // Part 4: Rate Limiting 409 // ==================================================================================== 410 t.Run("4. 
Rate Limiting - Enforces 10 posts/hour limit", func(t *testing.T) { 411 t.Log("\n⏱️ Part 4: Testing rate limit enforcement...") 412 413 // Create 8 more posts (we already have 1 from Part 3, need 9 total to be under limit) 414 for i := 2; i <= 9; i++ { 415 title := fmt.Sprintf("Post #%d", i) 416 content := fmt.Sprintf("This is post number %d", i) 417 418 reqBody := map[string]interface{}{ 419 "community": communityDID, 420 "title": title, 421 "content": content, 422 } 423 reqJSON, err := json.Marshal(reqBody) 424 require.NoError(t, err) 425 426 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 427 req.Header.Set("Content-Type", "application/json") 428 req.Header.Set("Authorization", "DPoP "+createSimpleTestJWT(aggregatorDID)) 429 430 rr := httptest.NewRecorder() 431 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 432 handler.ServeHTTP(rr, req) 433 434 require.Equal(t, http.StatusOK, rr.Code, "Post %d should succeed", i) 435 } 436 437 t.Log("✓ Created 9 posts successfully (under 10 limit)") 438 439 // Try to create 10th post - should succeed (at limit) 440 reqBody := map[string]interface{}{ 441 "community": communityDID, 442 "title": "Post #10 - Should Succeed", 443 "content": "This is the 10th post (at limit)", 444 } 445 reqJSON, err := json.Marshal(reqBody) 446 require.NoError(t, err) 447 448 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 449 req.Header.Set("Content-Type", "application/json") 450 req.Header.Set("Authorization", "DPoP "+createSimpleTestJWT(aggregatorDID)) 451 452 rr := httptest.NewRecorder() 453 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 454 handler.ServeHTTP(rr, req) 455 456 require.Equal(t, http.StatusOK, rr.Code, "10th post should succeed (at limit)") 457 458 t.Log("✓ 10th post succeeded (at limit)") 459 460 // Try to create 11th post - should be rate 
limited 461 reqBody = map[string]interface{}{ 462 "community": communityDID, 463 "title": "Post #11 - Should Fail", 464 "content": "This should be rate limited", 465 } 466 reqJSON, err = json.Marshal(reqBody) 467 require.NoError(t, err) 468 469 req = httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 470 req.Header.Set("Content-Type", "application/json") 471 req.Header.Set("Authorization", "DPoP "+createSimpleTestJWT(aggregatorDID)) 472 473 rr = httptest.NewRecorder() 474 handler = authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 475 handler.ServeHTTP(rr, req) 476 477 // Should be rate limited 478 assert.Equal(t, http.StatusTooManyRequests, rr.Code, "Should return 429 Too Many Requests") 479 480 var errorResp map[string]interface{} 481 err = json.NewDecoder(rr.Body).Decode(&errorResp) 482 require.NoError(t, err) 483 484 // Error type will be "RateLimitExceeded" (lowercase: "ratelimitexceeded") 485 errorType := strings.ToLower(errorResp["error"].(string)) 486 assert.True(t, 487 strings.Contains(errorType, "ratelimit") || strings.Contains(errorType, "rate limit"), 488 "Error should mention rate limit, got: %s", errorType) 489 490 t.Log("✅ Rate limiting enforced correctly") 491 }) 492 493 // ==================================================================================== 494 // Part 5: Query Endpoints (XRPC Handlers) 495 // ==================================================================================== 496 t.Run("5. 
Query Endpoints - XRPC handlers return indexed data", func(t *testing.T) { 497 t.Log("\n🔍 Part 5: Testing XRPC query endpoints...") 498 499 // Test 5.1: getServices endpoint 500 t.Run("getServices - Basic view", func(t *testing.T) { 501 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s", aggregatorDID), nil) 502 rr := httptest.NewRecorder() 503 504 getServicesHandler.HandleGetServices(rr, req) 505 506 require.Equal(t, http.StatusOK, rr.Code) 507 508 var response aggregator.GetServicesResponse 509 err := json.NewDecoder(rr.Body).Decode(&response) 510 require.NoError(t, err) 511 512 require.Len(t, response.Views, 1, "Should return 1 aggregator") 513 514 // Views is []interface{}, unmarshal to check fields 515 viewJSON, _ := json.Marshal(response.Views[0]) 516 var view aggregator.AggregatorView 517 _ = json.Unmarshal(viewJSON, &view) 518 519 assert.Equal(t, aggregatorDID, view.DID) 520 assert.Equal(t, "RSS Feed Aggregator", view.DisplayName) 521 assert.NotNil(t, view.Description) 522 assert.Equal(t, "Aggregates content from RSS feeds", *view.Description) 523 // Avatar not uploaded in this test 524 if view.Avatar != nil { 525 t.Logf("Avatar CID: %s", *view.Avatar) 526 } 527 528 t.Log("✓ getServices (basic view) works") 529 }) 530 531 // Test 5.2: getServices endpoint with detailed flag 532 t.Run("getServices - Detailed view with stats", func(t *testing.T) { 533 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s&detailed=true", aggregatorDID), nil) 534 rr := httptest.NewRecorder() 535 536 getServicesHandler.HandleGetServices(rr, req) 537 538 require.Equal(t, http.StatusOK, rr.Code) 539 540 var response aggregator.GetServicesResponse 541 err := json.NewDecoder(rr.Body).Decode(&response) 542 require.NoError(t, err) 543 544 require.Len(t, response.Views, 1) 545 546 viewJSON, _ := json.Marshal(response.Views[0]) 547 var detailedView aggregator.AggregatorViewDetailed 548 _ = 
json.Unmarshal(viewJSON, &detailedView) 549 550 assert.Equal(t, aggregatorDID, detailedView.DID) 551 assert.Equal(t, 1, detailedView.Stats.CommunitiesUsing) 552 assert.Equal(t, 10, detailedView.Stats.PostsCreated) 553 554 t.Log("✓ getServices (detailed view) includes stats") 555 }) 556 557 // Test 5.3: getAuthorizations endpoint 558 t.Run("getAuthorizations - List communities using aggregator", func(t *testing.T) { 559 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=%s", aggregatorDID), nil) 560 rr := httptest.NewRecorder() 561 562 getAuthorizationsHandler.HandleGetAuthorizations(rr, req) 563 564 require.Equal(t, http.StatusOK, rr.Code) 565 566 var response map[string]interface{} 567 err := json.NewDecoder(rr.Body).Decode(&response) 568 require.NoError(t, err) 569 570 // Check if authorizations field exists and is not nil 571 authsInterface, ok := response["authorizations"] 572 require.True(t, ok, "Response should have 'authorizations' field") 573 574 // Empty slice is valid (after authorization was disabled in Part 8) 575 if authsInterface != nil { 576 auths := authsInterface.([]interface{}) 577 t.Logf("Found %d authorizations", len(auths)) 578 // Don't assert length - authorization may have been disabled in Part 8 579 if len(auths) > 0 { 580 authMap := auths[0].(map[string]interface{}) 581 // authMap contains nested aggregator object, not flat communityDid 582 t.Logf("First authorization: %+v", authMap) 583 } 584 } 585 586 t.Log("✓ getAuthorizations works") 587 }) 588 589 // Test 5.4: listForCommunity endpoint 590 t.Run("listForCommunity - List aggregators for community", func(t *testing.T) { 591 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.listForCommunity?community=%s", communityDID), nil) 592 rr := httptest.NewRecorder() 593 594 listForCommunityHandler.HandleListForCommunity(rr, req) 595 596 require.Equal(t, http.StatusOK, rr.Code) 597 598 var response 
map[string]interface{} 599 err := json.NewDecoder(rr.Body).Decode(&response) 600 require.NoError(t, err) 601 602 // Check if aggregators field exists (not 'authorizations') 603 aggsInterface, ok := response["aggregators"] 604 require.True(t, ok, "Response should have 'aggregators' field") 605 606 // Empty slice is valid (after authorization was disabled in Part 8) 607 if aggsInterface != nil { 608 aggs := aggsInterface.([]interface{}) 609 t.Logf("Found %d aggregators", len(aggs)) 610 // Don't assert length - authorization may have been disabled in Part 8 611 if len(aggs) > 0 { 612 aggMap := aggs[0].(map[string]interface{}) 613 assert.Equal(t, aggregatorDID, aggMap["aggregatorDid"]) 614 assert.Equal(t, communityDID, aggMap["communityDid"]) 615 } 616 } 617 618 t.Log("✓ listForCommunity works") 619 }) 620 621 t.Log("✅ All XRPC query endpoints work correctly") 622 }) 623 624 // ==================================================================================== 625 // Part 6: Security - Unauthorized Post Attempt 626 // ==================================================================================== 627 t.Run("6. 
Security - Rejects post from unauthorized aggregator", func(t *testing.T) { 628 t.Log("\n🔒 Part 6: Testing security - unauthorized aggregator...") 629 630 unauthorizedAggDID := "did:plc:e2eaggunauth999" 631 632 // First, register this aggregator (but DON'T authorize it) 633 unAuthAggEvent := jetstream.JetstreamEvent{ 634 Did: unauthorizedAggDID, 635 Kind: "commit", 636 Commit: &jetstream.CommitEvent{ 637 Operation: "create", 638 Collection: "social.coves.aggregator.service", 639 RKey: "self", 640 CID: "bafy2bzaceunauth", 641 Record: map[string]interface{}{ 642 "$type": "social.coves.aggregator.service", 643 "did": unauthorizedAggDID, 644 "displayName": "Unauthorized Aggregator", 645 "createdAt": time.Now().Format(time.RFC3339), 646 }, 647 }, 648 } 649 err := aggregatorConsumer.HandleEvent(ctx, &unAuthAggEvent) 650 require.NoError(t, err) 651 652 // Try to create post without authorization 653 reqBody := map[string]interface{}{ 654 "community": communityDID, 655 "title": "Unauthorized Post", 656 "content": "This should be rejected", 657 } 658 reqJSON, err := json.Marshal(reqBody) 659 require.NoError(t, err) 660 661 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON)) 662 req.Header.Set("Content-Type", "application/json") 663 req.Header.Set("Authorization", "DPoP "+createSimpleTestJWT(unauthorizedAggDID)) 664 665 rr := httptest.NewRecorder() 666 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate)) 667 handler.ServeHTTP(rr, req) 668 669 // Should be forbidden 670 assert.Equal(t, http.StatusForbidden, rr.Code, "Should return 403 Forbidden") 671 672 var errorResp map[string]interface{} 673 err = json.NewDecoder(rr.Body).Decode(&errorResp) 674 require.NoError(t, err) 675 676 // Error message format from aggregators.ErrNotAuthorized: "aggregator not authorized for this community" 677 // Or from the compact form "notauthorized" (lowercase, no spaces) 678 errorMsg := 
strings.ToLower(errorResp["error"].(string)) 679 assert.True(t, 680 strings.Contains(errorMsg, "not authorized") || strings.Contains(errorMsg, "notauthorized"), 681 "Error should mention authorization, got: %s", errorMsg) 682 683 t.Log("✅ Unauthorized post correctly rejected") 684 }) 685 686 // ==================================================================================== 687 // Part 7: Idempotent Indexing 688 // ==================================================================================== 689 t.Run("7. Idempotent Indexing - Duplicate Jetstream events", func(t *testing.T) { 690 t.Log("\n♻️ Part 7: Testing idempotent indexing...") 691 692 duplicateAggDID := "did:plc:e2eaggdup999" 693 694 // Create service declaration event 695 serviceEvent := jetstream.JetstreamEvent{ 696 Did: duplicateAggDID, 697 Kind: "commit", 698 Commit: &jetstream.CommitEvent{ 699 Operation: "create", 700 Collection: "social.coves.aggregator.service", 701 RKey: "self", 702 CID: "bafy2bzacedup123", 703 Record: map[string]interface{}{ 704 "$type": "social.coves.aggregator.service", 705 "did": duplicateAggDID, 706 "displayName": "Duplicate Test Aggregator", 707 "createdAt": time.Now().Format(time.RFC3339), 708 }, 709 }, 710 } 711 712 // Process first time 713 err := aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 714 require.NoError(t, err, "First event should succeed") 715 716 // Process second time (duplicate) 717 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent) 718 require.NoError(t, err, "Duplicate event should be handled gracefully (upsert)") 719 720 // Verify only one record exists 721 agg, err := aggregatorRepo.GetAggregator(ctx, duplicateAggDID) 722 require.NoError(t, err) 723 assert.Equal(t, duplicateAggDID, agg.DID) 724 725 t.Log("✅ Idempotent indexing works correctly") 726 }) 727 728 // ==================================================================================== 729 // Part 8: Authorization Disable 730 // 
==================================================================================== 731 t.Run("8. Authorization Disable - Jetstream update event", func(t *testing.T) { 732 t.Log("\n🚫 Part 8: Testing authorization disable...") 733 734 // Simulate Jetstream event: Community moderator disabled the authorization 735 disableEvent := jetstream.JetstreamEvent{ 736 Did: communityDID, 737 Kind: "commit", 738 Commit: &jetstream.CommitEvent{ 739 Operation: "update", 740 Collection: "social.coves.aggregator.authorization", 741 RKey: authorizationRkey, // Use real rkey from Part 2 742 CID: "bafy2bzacedisabled", 743 Record: map[string]interface{}{ 744 "$type": "social.coves.aggregator.authorization", 745 "aggregatorDid": aggregatorDID, 746 "communityDid": communityDID, 747 "enabled": false, // Now disabled 748 "config": map[string]interface{}{ 749 "feedUrl": "https://example.com/feed.xml", 750 "updateInterval": 15, 751 }, 752 "createdBy": communityDID, 753 "disabledBy": communityDID, 754 "disabledAt": time.Now().Format(time.RFC3339), 755 "createdAt": time.Now().Add(-1 * time.Hour).Format(time.RFC3339), 756 }, 757 }, 758 } 759 760 // Process through consumer 761 err := aggregatorConsumer.HandleEvent(ctx, &disableEvent) 762 require.NoError(t, err) 763 764 // Verify authorization is disabled 765 auth, err := aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID) 766 require.NoError(t, err) 767 assert.False(t, auth.Enabled, "Authorization should be disabled") 768 assert.Equal(t, communityDID, auth.DisabledBy) 769 assert.NotNil(t, auth.DisabledAt) 770 771 // Verify fast check returns false 772 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID) 773 require.NoError(t, err) 774 assert.False(t, isAuthorized, "IsAuthorized should return false") 775 776 // Try to create post - should be rejected 777 reqBody := map[string]interface{}{ 778 "community": communityDID, 779 "title": "Post After Disable", 780 "content": "This should fail", 781 } 782 
reqJSON, err := json.Marshal(reqBody)
		require.NoError(t, err)

		req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", "DPoP "+createSimpleTestJWT(aggregatorDID))

		rr := httptest.NewRecorder()
		handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate))
		handler.ServeHTTP(rr, req)

		assert.Equal(t, http.StatusForbidden, rr.Code, "Should reject post from disabled aggregator")

		t.Log("✅ Authorization disable works correctly")
	})

	t.Log("\n✅ Full E2E Test Complete - All 8 Parts Passed!")
	t.Log("Summary:")
	t.Log(" ✓ Service Declaration indexed via Jetstream")
	t.Log(" ✓ Authorization indexed and stats updated")
	t.Log(" ✓ Aggregator can create posts in authorized communities")
	t.Log(" ✓ Rate limiting enforced (10 posts/hour)")
	t.Log(" ✓ XRPC query endpoints return correct data")
	t.Log(" ✓ Security: Unauthorized posts rejected")
	t.Log(" ✓ Idempotent indexing handles duplicates")
	t.Log(" ✓ Authorization disable prevents posting")
}

// TestAggregator_E2E_LivePDS is a placeholder for the COMPLETE end-to-end flow
// against live infrastructure. A real implementation would require:
//   - Live PDS running at PDS_URL
//   - Live Jetstream running at JETSTREAM_URL
//   - Ability to provision aggregator accounts on PDS
//   - Real WebSocket connection to Jetstream firehose
//
// Today the test only checks its prerequisites and then skips unconditionally:
//  1. skips in -short mode;
//  2. probes the PDS health endpoint (PDS_URL, default http://localhost:3001)
//     and skips when it is unreachable or does not answer 200 OK;
//  3. opens the test database (TEST_DATABASE_URL, with a local default) and
//     applies the goose migrations;
//  4. skips with a "not yet implemented" message.
//
// For now, use TestAggregator_E2E_WithJetstream for integration testing.
func TestAggregator_E2E_LivePDS(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping live PDS E2E test in short mode")
	}

	// Check if PDS is running. This happens before any database work so that a
	// missing PDS yields a skip instead of a failure from unrelated setup.
	pdsURL := os.Getenv("PDS_URL")
	if pdsURL == "" {
		pdsURL = "http://localhost:3001"
	}

	// Bound the probe: http.DefaultClient has no timeout, so a stalled PDS
	// could otherwise hang the whole test run.
	healthClient := &http.Client{Timeout: 5 * time.Second}
	healthResp, err := healthClient.Get(pdsURL + "/xrpc/_health")
	if err != nil {
		t.Skipf("PDS not running at %s: %v", pdsURL, err)
	}
	// Capture the status, then release the body before deciding whether to skip.
	healthStatus := healthResp.StatusCode
	if closeErr := healthResp.Body.Close(); closeErr != nil {
		t.Logf("Failed to close PDS health response body: %v", closeErr)
	}
	// A reachable but unhealthy PDS is treated the same as an absent one,
	// matching the health check in TestAggregator_E2E_WithJetstream.
	if healthStatus != http.StatusOK {
		t.Skipf("PDS at %s is not healthy (status %d)", pdsURL, healthStatus)
	}

	// Setup test database
	dbURL := os.Getenv("TEST_DATABASE_URL")
	if dbURL == "" {
		dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
	}

	db, err := sql.Open("postgres", dbURL)
	require.NoError(t, err, "Failed to connect to test database")
	defer func() {
		if closeErr := db.Close(); closeErr != nil {
			t.Logf("Failed to close database: %v", closeErr)
		}
	}()

	// Run migrations
	require.NoError(t, goose.SetDialect("postgres"))
	require.NoError(t, goose.Up(db, "../../internal/db/migrations"))

	t.Skip("Live PDS E2E test not yet implemented - use TestAggregator_E2E_WithJetstream")

	// TODO: Implement live PDS E2E test
	// 1. Provision aggregator account on real PDS
	// 2. Write service declaration to aggregator's repository
	// 3. Subscribe to real Jetstream and wait for event
	// 4. Verify indexing in AppView
	// 5. Provision community and authorize aggregator
	// 6. Create real post via XRPC
	// 7. Wait for Jetstream post event
	// 8. Verify complete flow
}