A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/handlers/aggregator"
5 "Coves/internal/api/handlers/post"
6 "Coves/internal/api/middleware"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/communities"
11 "Coves/internal/core/posts"
12 "Coves/internal/core/users"
13 "Coves/internal/db/postgres"
14 "bytes"
15 "context"
16 "database/sql"
17 "encoding/json"
18 "fmt"
19 "net/http"
20 "net/http/httptest"
21 "os"
22 "strings"
23 "testing"
24 "time"
25
26 _ "github.com/lib/pq"
27 "github.com/pressly/goose/v3"
28 "github.com/stretchr/testify/assert"
29 "github.com/stretchr/testify/require"
30)
31
32// TestAggregator_E2E_WithJetstream tests the complete aggregator flow with real PDS:
33// 1. Service Declaration: Create aggregator account → Write service record → Jetstream → AppView DB
34// 2. Authorization: Create community account → Write authorization record → Jetstream → AppView DB
35// 3. Post Creation: Aggregator creates post → Validates authorization + rate limits → PDS → Jetstream → AppView
36// 4. Query Endpoints: Verify XRPC handlers return correct data from AppView
37//
38// This tests the REAL atProto flow:
39// - Real accounts created on PDS
40// - Real records written via XRPC
41// - Simulated Jetstream events (for test speed - testing AppView indexing, not Jetstream itself)
42// - AppView indexes and serves data via XRPC
43//
44// NOTE: Requires PDS running at http://localhost:3001
45func TestAggregator_E2E_WithJetstream(t *testing.T) {
46 // Check if PDS is available
47 pdsURL := "http://localhost:3001"
48 resp, err := http.Get(pdsURL + "/xrpc/_health")
49 if err != nil || resp.StatusCode != http.StatusOK {
50 t.Skipf("PDS not available at %s - run 'make dev-up' to start it", pdsURL)
51 }
52 if resp != nil {
53 _ = resp.Body.Close()
54 }
55 db := setupTestDB(t)
56 defer func() {
57 if err := db.Close(); err != nil {
58 t.Logf("Failed to close database: %v", err)
59 }
60 }()
61
62 // Setup repositories
63 aggregatorRepo := postgres.NewAggregatorRepository(db)
64 communityRepo := postgres.NewCommunityRepository(db)
65 postRepo := postgres.NewPostRepository(db)
66 userRepo := postgres.NewUserRepository(db)
67
68 // Setup services
69 identityConfig := identity.DefaultConfig()
70 identityResolver := identity.NewResolver(db, identityConfig)
71 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
72 communityService := communities.NewCommunityService(communityRepo, "http://localhost:3001", "did:web:test.coves.social", "coves.social", nil)
73 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
74 postService := posts.NewPostService(postRepo, communityService, aggregatorService, "http://localhost:3001")
75
76 // Setup consumers
77 aggregatorConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
78 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
79
80 // Setup HTTP handlers
81 getServicesHandler := aggregator.NewGetServicesHandler(aggregatorService)
82 getAuthorizationsHandler := aggregator.NewGetAuthorizationsHandler(aggregatorService)
83 listForCommunityHandler := aggregator.NewListForCommunityHandler(aggregatorService)
84 createPostHandler := post.NewCreateHandler(postService)
85 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) // Skip JWT verification for testing
86
87 ctx := context.Background()
88
89 // Cleanup test data (aggregators and communities will be created via real PDS in test parts)
90 _, _ = db.Exec("DELETE FROM aggregator_posts WHERE aggregator_did LIKE 'did:plc:%'")
91 _, _ = db.Exec("DELETE FROM aggregator_authorizations WHERE aggregator_did LIKE 'did:plc:%'")
92 _, _ = db.Exec("DELETE FROM aggregators WHERE did LIKE 'did:plc:%'")
93 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:%'")
94 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:%'")
95 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:%'")
96
97 // ====================================================================================
98 // Part 1: Service Declaration via Real PDS
99 // ====================================================================================
100 // Store DIDs, tokens, and URIs for use across all test parts
101 var aggregatorDID, aggregatorToken, aggregatorHandle, communityDID, communityToken, authorizationRkey string
102
103 t.Run("1. Service Declaration - PDS Account → Write Record → Jetstream → AppView DB", func(t *testing.T) {
104 t.Log("\n📝 Part 1: Create aggregator account and publish service declaration to PDS...")
105
106 // STEP 1: Create aggregator account on real PDS
107 // Use PDS configured domain (.local.coves.dev for users/services)
108 timestamp := time.Now().Unix() // Use Unix seconds instead of nanoseconds for shorter handle
109 aggregatorHandle = fmt.Sprintf("rss-agg-%d.local.coves.dev", timestamp)
110 email := fmt.Sprintf("agg-%d@test.com", timestamp)
111 password := "test-password-123"
112
113 var err error
114 aggregatorToken, aggregatorDID, err = createPDSAccount(pdsURL, aggregatorHandle, email, password)
115 require.NoError(t, err, "Failed to create aggregator account on PDS")
116 require.NotEmpty(t, aggregatorToken, "Should receive access token")
117 require.NotEmpty(t, aggregatorDID, "Should receive DID")
118
119 t.Logf("✓ Created aggregator account: %s (%s)", aggregatorHandle, aggregatorDID)
120
121 // STEP 2: Write service declaration to aggregator's repository on PDS
122 configSchema := map[string]interface{}{
123 "type": "object",
124 "properties": map[string]interface{}{
125 "feedUrl": map[string]interface{}{
126 "type": "string",
127 "description": "RSS feed URL to aggregate",
128 },
129 "updateInterval": map[string]interface{}{
130 "type": "number",
131 "minimum": 5,
132 "maximum": 60,
133 "description": "Minutes between feed checks",
134 },
135 },
136 "required": []string{"feedUrl"},
137 }
138
139 serviceRecord := map[string]interface{}{
140 "$type": "social.coves.aggregator.service",
141 "did": aggregatorDID,
142 "displayName": "RSS Feed Aggregator",
143 "description": "Aggregates content from RSS feeds",
144 "configSchema": configSchema,
145 "maintainer": aggregatorDID, // Aggregator maintains itself
146 "sourceUrl": "https://github.com/example/rss-aggregator",
147 "createdAt": time.Now().Format(time.RFC3339),
148 }
149
150 // Write to at://{aggregatorDID}/social.coves.aggregator.service/self
151 uri, cid, err := writePDSRecord(pdsURL, aggregatorToken, aggregatorDID, "social.coves.aggregator.service", "self", serviceRecord)
152 require.NoError(t, err, "Failed to write service declaration to PDS")
153 require.NotEmpty(t, uri, "Should receive record URI")
154 require.NotEmpty(t, cid, "Should receive record CID")
155
156 t.Logf("✓ Wrote service declaration to PDS: %s (CID: %s)", uri, cid)
157
158 // STEP 3: Simulate Jetstream event (in production, this comes from real Jetstream)
159 // We simulate it here for test speed - we're testing AppView indexing, not Jetstream itself
160 serviceEvent := jetstream.JetstreamEvent{
161 Did: aggregatorDID,
162 Kind: "commit",
163 Commit: &jetstream.CommitEvent{
164 Operation: "create",
165 Collection: "social.coves.aggregator.service",
166 RKey: "self",
167 CID: cid,
168 Record: serviceRecord,
169 },
170 }
171
172 // STEP 4: Process through Jetstream consumer (simulates what happens when Jetstream broadcasts)
173 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent)
174 require.NoError(t, err, "Consumer should index service declaration")
175
176 // STEP 2: Verify indexed in AppView database
177 indexedAgg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID)
178 require.NoError(t, err, "Aggregator should be indexed in AppView")
179
180 assert.Equal(t, aggregatorDID, indexedAgg.DID)
181 assert.Equal(t, "RSS Feed Aggregator", indexedAgg.DisplayName)
182 assert.Equal(t, "Aggregates content from RSS feeds", indexedAgg.Description)
183 assert.Empty(t, indexedAgg.AvatarURL, "Avatar not uploaded in this test")
184 assert.Equal(t, aggregatorDID, indexedAgg.MaintainerDID, "Aggregator maintains itself")
185 assert.Equal(t, "https://github.com/example/rss-aggregator", indexedAgg.SourceURL)
186 assert.NotEmpty(t, indexedAgg.ConfigSchema, "Config schema should be stored")
187 assert.Equal(t, fmt.Sprintf("at://%s/social.coves.aggregator.service/self", aggregatorDID), indexedAgg.RecordURI)
188 assert.False(t, indexedAgg.CreatedAt.IsZero(), "CreatedAt should be parsed from record")
189 assert.False(t, indexedAgg.IndexedAt.IsZero(), "IndexedAt should be set")
190
191 // Verify stats initialized to zero
192 assert.Equal(t, 0, indexedAgg.CommunitiesUsing)
193 assert.Equal(t, 0, indexedAgg.PostsCreated)
194
195 // STEP 6: Index aggregator as a user in AppView (required for post authorship)
196 // In production, this would come from Jetstream indexing app.bsky.actor.profile
197 // For this E2E test, we create it directly
198 testUser := createTestUser(t, db, aggregatorHandle, aggregatorDID)
199 require.NotNil(t, testUser, "Should create aggregator user")
200
201 t.Logf("✓ Indexed aggregator as user: %s", aggregatorHandle)
202 t.Log("✅ Service declaration indexed and aggregator registered as user")
203 })
204
205 // ====================================================================================
206 // Part 2: Authorization via Real PDS
207 // ====================================================================================
208 t.Run("2. Authorization - Community Account → PDS → Jetstream → AppView DB", func(t *testing.T) {
209 t.Log("\n🔐 Part 2: Create community account and authorize aggregator...")
210
211 // STEP 1: Create community account on real PDS
212 // Use PDS configured domain (.community.coves.social for communities)
213 // Keep handle short to avoid PDS "handle too long" error
214 timestamp := time.Now().Unix() % 100000 // Last 5 digits
215 communityHandle := fmt.Sprintf("e2e-%d.community.coves.social", timestamp)
216 communityEmail := fmt.Sprintf("comm-%d@test.com", timestamp)
217 communityPassword := "community-test-password-123"
218
219 var err error
220 communityToken, communityDID, err = createPDSAccount(pdsURL, communityHandle, communityEmail, communityPassword)
221 require.NoError(t, err, "Failed to create community account on PDS")
222 require.NotEmpty(t, communityToken, "Should receive community access token")
223 require.NotEmpty(t, communityDID, "Should receive community DID")
224
225 t.Logf("✓ Created community account: %s (%s)", communityHandle, communityDID)
226
227 // STEP 2: Index community in AppView database (required for foreign key)
228 // In production, this would come from Jetstream indexing community.profile records
229 // For this E2E test, we create it directly
230 testCommunity := &communities.Community{
231 DID: communityDID,
232 Handle: communityHandle,
233 Name: fmt.Sprintf("e2e-%d", timestamp),
234 DisplayName: "E2E Test Community",
235 OwnerDID: communityDID,
236 CreatedByDID: communityDID,
237 HostedByDID: "did:web:test.coves.social",
238 Visibility: "public",
239 ModerationType: "moderator",
240 RecordURI: fmt.Sprintf("at://%s/social.coves.community.profile/self", communityDID),
241 RecordCID: "fakecid123",
242 PDSAccessToken: communityToken,
243 PDSRefreshToken: communityToken,
244 }
245 _, err = communityRepo.Create(ctx, testCommunity)
246 require.NoError(t, err, "Failed to index community in AppView")
247
248 t.Logf("✓ Indexed community in AppView database")
249
250 // STEP 3: Build aggregator config (matches the schema from Part 1)
251 aggregatorConfig := map[string]interface{}{
252 "feedUrl": "https://example.com/feed.xml",
253 "updateInterval": 15,
254 }
255
256 // STEP 4: Write authorization record to community's repository on PDS
257 // This record grants permission for the aggregator to post to this community
258 authRecord := map[string]interface{}{
259 "$type": "social.coves.aggregator.authorization",
260 "aggregatorDid": aggregatorDID,
261 "communityDid": communityDID,
262 "enabled": true,
263 "config": aggregatorConfig,
264 "createdBy": communityDID, // Community authorizes itself
265 "createdAt": time.Now().Format(time.RFC3339),
266 }
267
268 // Write to at://{communityDID}/social.coves.aggregator.authorization/{rkey}
269 authURI, authCID, err := writePDSRecord(pdsURL, communityToken, communityDID, "social.coves.aggregator.authorization", "", authRecord)
270 require.NoError(t, err, "Failed to write authorization to PDS")
271 require.NotEmpty(t, authURI, "Should receive authorization URI")
272 require.NotEmpty(t, authCID, "Should receive authorization CID")
273
274 t.Logf("✓ Wrote authorization to PDS: %s (CID: %s)", authURI, authCID)
275
276 // STEP 5: Simulate Jetstream event (in production, this comes from real Jetstream)
277 authorizationRkey = strings.Split(authURI, "/")[4] // Extract rkey from URI and store for later
278 authEvent := jetstream.JetstreamEvent{
279 Did: communityDID, // Repository owner (community)
280 Kind: "commit",
281 Commit: &jetstream.CommitEvent{
282 Operation: "create",
283 Collection: "social.coves.aggregator.authorization",
284 RKey: authorizationRkey,
285 CID: authCID,
286 Record: authRecord,
287 },
288 }
289
290 // STEP 6: Process through Jetstream consumer
291 err = aggregatorConsumer.HandleEvent(ctx, &authEvent)
292 require.NoError(t, err, "Consumer should index authorization")
293
294 // STEP 7: Verify indexed in AppView database
295 indexedAuth, err := aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID)
296 require.NoError(t, err, "Authorization should be indexed in AppView")
297
298 assert.Equal(t, aggregatorDID, indexedAuth.AggregatorDID)
299 assert.Equal(t, communityDID, indexedAuth.CommunityDID)
300 assert.True(t, indexedAuth.Enabled)
301 assert.Equal(t, communityDID, indexedAuth.CreatedBy)
302 assert.NotEmpty(t, indexedAuth.Config, "Config should be stored")
303 assert.False(t, indexedAuth.CreatedAt.IsZero())
304
305 // STEP 8: Verify aggregator stats updated via trigger
306 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID)
307 require.NoError(t, err)
308 assert.Equal(t, 1, agg.CommunitiesUsing, "Trigger should increment communities_using")
309
310 // STEP 9: Verify fast authorization check
311 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID)
312 require.NoError(t, err)
313 assert.True(t, isAuthorized, "IsAuthorized should return true")
314
315 t.Log("✅ Community created and authorization indexed successfully")
316 })
317
318 // ====================================================================================
319 // Part 3: Post Creation by Aggregator
320 // ====================================================================================
321 t.Run("3. Post Creation - Aggregator → Validation → PDS → Jetstream → AppView", func(t *testing.T) {
322 t.Log("\n📮 Part 3: Aggregator creates post in authorized community...")
323
324 // STEP 1: Aggregator calls XRPC endpoint to create post
325 title := "Breaking News from RSS Feed"
326 content := "This post was created by an authorized aggregator!"
327 reqBody := map[string]interface{}{
328 "community": communityDID,
329 "title": title,
330 "content": content,
331 }
332 reqJSON, err := json.Marshal(reqBody)
333 require.NoError(t, err)
334
335 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
336 req.Header.Set("Content-Type", "application/json")
337
338 // Create JWT for aggregator (not a user)
339 aggregatorJWT := createSimpleTestJWT(aggregatorDID)
340 req.Header.Set("Authorization", "Bearer "+aggregatorJWT)
341
342 // Execute request through auth middleware + handler
343 rr := httptest.NewRecorder()
344 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate))
345 handler.ServeHTTP(rr, req)
346
347 // STEP 2: Verify post creation succeeded
348 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String())
349
350 var response posts.CreatePostResponse
351 err = json.NewDecoder(rr.Body).Decode(&response)
352 require.NoError(t, err, "Failed to parse response")
353
354 t.Logf("✓ Post created on PDS: URI=%s, CID=%s", response.URI, response.CID)
355
356 // STEP 3: Simulate Jetstream event (post written to PDS → firehose)
357 rkey := strings.Split(response.URI, "/")[4] // Extract rkey from URI
358 postEvent := jetstream.JetstreamEvent{
359 Did: communityDID,
360 Kind: "commit",
361 Commit: &jetstream.CommitEvent{
362 Operation: "create",
363 Collection: "social.coves.community.post",
364 RKey: rkey,
365 CID: response.CID,
366 Record: map[string]interface{}{
367 "$type": "social.coves.community.post",
368 "community": communityDID,
369 "author": aggregatorDID, // Aggregator is the author
370 "title": title,
371 "content": content,
372 "createdAt": time.Now().Format(time.RFC3339),
373 },
374 },
375 }
376
377 // STEP 4: Process through Jetstream post consumer
378 err = postConsumer.HandleEvent(ctx, &postEvent)
379 require.NoError(t, err, "Post consumer should index post")
380
381 // STEP 5: Verify post indexed in AppView
382 indexedPost, err := postRepo.GetByURI(ctx, response.URI)
383 require.NoError(t, err, "Post should be indexed in AppView")
384
385 assert.Equal(t, response.URI, indexedPost.URI)
386 assert.Equal(t, response.CID, indexedPost.CID)
387 assert.Equal(t, aggregatorDID, indexedPost.AuthorDID, "Author should be aggregator")
388 assert.Equal(t, communityDID, indexedPost.CommunityDID)
389 assert.Equal(t, title, *indexedPost.Title)
390 assert.Equal(t, content, *indexedPost.Content)
391
392 // STEP 6: Verify aggregator stats updated
393 agg, err := aggregatorRepo.GetAggregator(ctx, aggregatorDID)
394 require.NoError(t, err)
395 assert.Equal(t, 1, agg.PostsCreated, "Trigger should increment posts_created")
396
397 // STEP 7: Verify post tracking for rate limiting
398 since := time.Now().Add(-1 * time.Hour)
399 postCount, err := aggregatorRepo.CountRecentPosts(ctx, aggregatorDID, communityDID, since)
400 require.NoError(t, err)
401 assert.Equal(t, 1, postCount, "Should track 1 post for rate limiting")
402
403 t.Log("✅ Post created, indexed, and stats updated")
404 })
405
406 // ====================================================================================
407 // Part 4: Rate Limiting
408 // ====================================================================================
409 t.Run("4. Rate Limiting - Enforces 10 posts/hour limit", func(t *testing.T) {
410 t.Log("\n⏱️ Part 4: Testing rate limit enforcement...")
411
412 // Create 8 more posts (we already have 1 from Part 3, need 9 total to be under limit)
413 for i := 2; i <= 9; i++ {
414 title := fmt.Sprintf("Post #%d", i)
415 content := fmt.Sprintf("This is post number %d", i)
416
417 reqBody := map[string]interface{}{
418 "community": communityDID,
419 "title": title,
420 "content": content,
421 }
422 reqJSON, err := json.Marshal(reqBody)
423 require.NoError(t, err)
424
425 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
426 req.Header.Set("Content-Type", "application/json")
427 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID))
428
429 rr := httptest.NewRecorder()
430 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate))
431 handler.ServeHTTP(rr, req)
432
433 require.Equal(t, http.StatusOK, rr.Code, "Post %d should succeed", i)
434 }
435
436 t.Log("✓ Created 9 posts successfully (under 10 limit)")
437
438 // Try to create 10th post - should succeed (at limit)
439 reqBody := map[string]interface{}{
440 "community": communityDID,
441 "title": "Post #10 - Should Succeed",
442 "content": "This is the 10th post (at limit)",
443 }
444 reqJSON, err := json.Marshal(reqBody)
445 require.NoError(t, err)
446
447 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
448 req.Header.Set("Content-Type", "application/json")
449 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID))
450
451 rr := httptest.NewRecorder()
452 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate))
453 handler.ServeHTTP(rr, req)
454
455 require.Equal(t, http.StatusOK, rr.Code, "10th post should succeed (at limit)")
456
457 t.Log("✓ 10th post succeeded (at limit)")
458
459 // Try to create 11th post - should be rate limited
460 reqBody = map[string]interface{}{
461 "community": communityDID,
462 "title": "Post #11 - Should Fail",
463 "content": "This should be rate limited",
464 }
465 reqJSON, err = json.Marshal(reqBody)
466 require.NoError(t, err)
467
468 req = httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
469 req.Header.Set("Content-Type", "application/json")
470 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID))
471
472 rr = httptest.NewRecorder()
473 handler = authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate))
474 handler.ServeHTTP(rr, req)
475
476 // Should be rate limited
477 assert.Equal(t, http.StatusTooManyRequests, rr.Code, "Should return 429 Too Many Requests")
478
479 var errorResp map[string]interface{}
480 err = json.NewDecoder(rr.Body).Decode(&errorResp)
481 require.NoError(t, err)
482
483 // Error type will be "RateLimitExceeded" (lowercase: "ratelimitexceeded")
484 errorType := strings.ToLower(errorResp["error"].(string))
485 assert.True(t,
486 strings.Contains(errorType, "ratelimit") || strings.Contains(errorType, "rate limit"),
487 "Error should mention rate limit, got: %s", errorType)
488
489 t.Log("✅ Rate limiting enforced correctly")
490 })
491
492 // ====================================================================================
493 // Part 5: Query Endpoints (XRPC Handlers)
494 // ====================================================================================
495 t.Run("5. Query Endpoints - XRPC handlers return indexed data", func(t *testing.T) {
496 t.Log("\n🔍 Part 5: Testing XRPC query endpoints...")
497
498 // Test 5.1: getServices endpoint
499 t.Run("getServices - Basic view", func(t *testing.T) {
500 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s", aggregatorDID), nil)
501 rr := httptest.NewRecorder()
502
503 getServicesHandler.HandleGetServices(rr, req)
504
505 require.Equal(t, http.StatusOK, rr.Code)
506
507 var response aggregator.GetServicesResponse
508 err := json.NewDecoder(rr.Body).Decode(&response)
509 require.NoError(t, err)
510
511 require.Len(t, response.Views, 1, "Should return 1 aggregator")
512
513 // Views is []interface{}, unmarshal to check fields
514 viewJSON, _ := json.Marshal(response.Views[0])
515 var view aggregator.AggregatorView
516 _ = json.Unmarshal(viewJSON, &view)
517
518 assert.Equal(t, aggregatorDID, view.DID)
519 assert.Equal(t, "RSS Feed Aggregator", view.DisplayName)
520 assert.NotNil(t, view.Description)
521 assert.Equal(t, "Aggregates content from RSS feeds", *view.Description)
522 // Avatar not uploaded in this test
523 if view.Avatar != nil {
524 t.Logf("Avatar CID: %s", *view.Avatar)
525 }
526
527 t.Log("✓ getServices (basic view) works")
528 })
529
530 // Test 5.2: getServices endpoint with detailed flag
531 t.Run("getServices - Detailed view with stats", func(t *testing.T) {
532 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getServices?dids=%s&detailed=true", aggregatorDID), nil)
533 rr := httptest.NewRecorder()
534
535 getServicesHandler.HandleGetServices(rr, req)
536
537 require.Equal(t, http.StatusOK, rr.Code)
538
539 var response aggregator.GetServicesResponse
540 err := json.NewDecoder(rr.Body).Decode(&response)
541 require.NoError(t, err)
542
543 require.Len(t, response.Views, 1)
544
545 viewJSON, _ := json.Marshal(response.Views[0])
546 var detailedView aggregator.AggregatorViewDetailed
547 _ = json.Unmarshal(viewJSON, &detailedView)
548
549 assert.Equal(t, aggregatorDID, detailedView.DID)
550 assert.Equal(t, 1, detailedView.Stats.CommunitiesUsing)
551 assert.Equal(t, 10, detailedView.Stats.PostsCreated)
552
553 t.Log("✓ getServices (detailed view) includes stats")
554 })
555
556 // Test 5.3: getAuthorizations endpoint
557 t.Run("getAuthorizations - List communities using aggregator", func(t *testing.T) {
558 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=%s", aggregatorDID), nil)
559 rr := httptest.NewRecorder()
560
561 getAuthorizationsHandler.HandleGetAuthorizations(rr, req)
562
563 require.Equal(t, http.StatusOK, rr.Code)
564
565 var response map[string]interface{}
566 err := json.NewDecoder(rr.Body).Decode(&response)
567 require.NoError(t, err)
568
569 // Check if authorizations field exists and is not nil
570 authsInterface, ok := response["authorizations"]
571 require.True(t, ok, "Response should have 'authorizations' field")
572
573 // Empty slice is valid (after authorization was disabled in Part 8)
574 if authsInterface != nil {
575 auths := authsInterface.([]interface{})
576 t.Logf("Found %d authorizations", len(auths))
577 // Don't assert length - authorization may have been disabled in Part 8
578 if len(auths) > 0 {
579 authMap := auths[0].(map[string]interface{})
580 // authMap contains nested aggregator object, not flat communityDid
581 t.Logf("First authorization: %+v", authMap)
582 }
583 }
584
585 t.Log("✓ getAuthorizations works")
586 })
587
588 // Test 5.4: listForCommunity endpoint
589 t.Run("listForCommunity - List aggregators for community", func(t *testing.T) {
590 req := httptest.NewRequest("GET", fmt.Sprintf("/xrpc/social.coves.aggregator.listForCommunity?community=%s", communityDID), nil)
591 rr := httptest.NewRecorder()
592
593 listForCommunityHandler.HandleListForCommunity(rr, req)
594
595 require.Equal(t, http.StatusOK, rr.Code)
596
597 var response map[string]interface{}
598 err := json.NewDecoder(rr.Body).Decode(&response)
599 require.NoError(t, err)
600
601 // Check if aggregators field exists (not 'authorizations')
602 aggsInterface, ok := response["aggregators"]
603 require.True(t, ok, "Response should have 'aggregators' field")
604
605 // Empty slice is valid (after authorization was disabled in Part 8)
606 if aggsInterface != nil {
607 aggs := aggsInterface.([]interface{})
608 t.Logf("Found %d aggregators", len(aggs))
609 // Don't assert length - authorization may have been disabled in Part 8
610 if len(aggs) > 0 {
611 aggMap := aggs[0].(map[string]interface{})
612 assert.Equal(t, aggregatorDID, aggMap["aggregatorDid"])
613 assert.Equal(t, communityDID, aggMap["communityDid"])
614 }
615 }
616
617 t.Log("✓ listForCommunity works")
618 })
619
620 t.Log("✅ All XRPC query endpoints work correctly")
621 })
622
623 // ====================================================================================
624 // Part 6: Security - Unauthorized Post Attempt
625 // ====================================================================================
626 t.Run("6. Security - Rejects post from unauthorized aggregator", func(t *testing.T) {
627 t.Log("\n🔒 Part 6: Testing security - unauthorized aggregator...")
628
629 unauthorizedAggDID := "did:plc:e2eaggunauth999"
630
631 // First, register this aggregator (but DON'T authorize it)
632 unAuthAggEvent := jetstream.JetstreamEvent{
633 Did: unauthorizedAggDID,
634 Kind: "commit",
635 Commit: &jetstream.CommitEvent{
636 Operation: "create",
637 Collection: "social.coves.aggregator.service",
638 RKey: "self",
639 CID: "bafy2bzaceunauth",
640 Record: map[string]interface{}{
641 "$type": "social.coves.aggregator.service",
642 "did": unauthorizedAggDID,
643 "displayName": "Unauthorized Aggregator",
644 "createdAt": time.Now().Format(time.RFC3339),
645 },
646 },
647 }
648 err := aggregatorConsumer.HandleEvent(ctx, &unAuthAggEvent)
649 require.NoError(t, err)
650
651 // Try to create post without authorization
652 reqBody := map[string]interface{}{
653 "community": communityDID,
654 "title": "Unauthorized Post",
655 "content": "This should be rejected",
656 }
657 reqJSON, err := json.Marshal(reqBody)
658 require.NoError(t, err)
659
660 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
661 req.Header.Set("Content-Type", "application/json")
662 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(unauthorizedAggDID))
663
664 rr := httptest.NewRecorder()
665 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate))
666 handler.ServeHTTP(rr, req)
667
668 // Should be forbidden
669 assert.Equal(t, http.StatusForbidden, rr.Code, "Should return 403 Forbidden")
670
671 var errorResp map[string]interface{}
672 err = json.NewDecoder(rr.Body).Decode(&errorResp)
673 require.NoError(t, err)
674
675 // Error message format from aggregators.ErrNotAuthorized: "aggregator not authorized for this community"
676 // Or from the compact form "notauthorized" (lowercase, no spaces)
677 errorMsg := strings.ToLower(errorResp["error"].(string))
678 assert.True(t,
679 strings.Contains(errorMsg, "not authorized") || strings.Contains(errorMsg, "notauthorized"),
680 "Error should mention authorization, got: %s", errorMsg)
681
682 t.Log("✅ Unauthorized post correctly rejected")
683 })
684
685 // ====================================================================================
686 // Part 7: Idempotent Indexing
687 // ====================================================================================
688 t.Run("7. Idempotent Indexing - Duplicate Jetstream events", func(t *testing.T) {
689 t.Log("\n♻️ Part 7: Testing idempotent indexing...")
690
691 duplicateAggDID := "did:plc:e2eaggdup999"
692
693 // Create service declaration event
694 serviceEvent := jetstream.JetstreamEvent{
695 Did: duplicateAggDID,
696 Kind: "commit",
697 Commit: &jetstream.CommitEvent{
698 Operation: "create",
699 Collection: "social.coves.aggregator.service",
700 RKey: "self",
701 CID: "bafy2bzacedup123",
702 Record: map[string]interface{}{
703 "$type": "social.coves.aggregator.service",
704 "did": duplicateAggDID,
705 "displayName": "Duplicate Test Aggregator",
706 "createdAt": time.Now().Format(time.RFC3339),
707 },
708 },
709 }
710
711 // Process first time
712 err := aggregatorConsumer.HandleEvent(ctx, &serviceEvent)
713 require.NoError(t, err, "First event should succeed")
714
715 // Process second time (duplicate)
716 err = aggregatorConsumer.HandleEvent(ctx, &serviceEvent)
717 require.NoError(t, err, "Duplicate event should be handled gracefully (upsert)")
718
719 // Verify only one record exists
720 agg, err := aggregatorRepo.GetAggregator(ctx, duplicateAggDID)
721 require.NoError(t, err)
722 assert.Equal(t, duplicateAggDID, agg.DID)
723
724 t.Log("✅ Idempotent indexing works correctly")
725 })
726
727 // ====================================================================================
728 // Part 8: Authorization Disable
729 // ====================================================================================
730 t.Run("8. Authorization Disable - Jetstream update event", func(t *testing.T) {
731 t.Log("\n🚫 Part 8: Testing authorization disable...")
732
733 // Simulate Jetstream event: Community moderator disabled the authorization
734 disableEvent := jetstream.JetstreamEvent{
735 Did: communityDID,
736 Kind: "commit",
737 Commit: &jetstream.CommitEvent{
738 Operation: "update",
739 Collection: "social.coves.aggregator.authorization",
740 RKey: authorizationRkey, // Use real rkey from Part 2
741 CID: "bafy2bzacedisabled",
742 Record: map[string]interface{}{
743 "$type": "social.coves.aggregator.authorization",
744 "aggregatorDid": aggregatorDID,
745 "communityDid": communityDID,
746 "enabled": false, // Now disabled
747 "config": map[string]interface{}{
748 "feedUrl": "https://example.com/feed.xml",
749 "updateInterval": 15,
750 },
751 "createdBy": communityDID,
752 "disabledBy": communityDID,
753 "disabledAt": time.Now().Format(time.RFC3339),
754 "createdAt": time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
755 },
756 },
757 }
758
759 // Process through consumer
760 err := aggregatorConsumer.HandleEvent(ctx, &disableEvent)
761 require.NoError(t, err)
762
763 // Verify authorization is disabled
764 auth, err := aggregatorRepo.GetAuthorization(ctx, aggregatorDID, communityDID)
765 require.NoError(t, err)
766 assert.False(t, auth.Enabled, "Authorization should be disabled")
767 assert.Equal(t, communityDID, auth.DisabledBy)
768 assert.NotNil(t, auth.DisabledAt)
769
770 // Verify fast check returns false
771 isAuthorized, err := aggregatorRepo.IsAuthorized(ctx, aggregatorDID, communityDID)
772 require.NoError(t, err)
773 assert.False(t, isAuthorized, "IsAuthorized should return false")
774
775 // Try to create post - should be rejected
776 reqBody := map[string]interface{}{
777 "community": communityDID,
778 "title": "Post After Disable",
779 "content": "This should fail",
780 }
781 reqJSON, err := json.Marshal(reqBody)
782 require.NoError(t, err)
783
784 req := httptest.NewRequest("POST", "/xrpc/social.coves.community.post.create", bytes.NewReader(reqJSON))
785 req.Header.Set("Content-Type", "application/json")
786 req.Header.Set("Authorization", "Bearer "+createSimpleTestJWT(aggregatorDID))
787
788 rr := httptest.NewRecorder()
789 handler := authMiddleware.RequireAuth(http.HandlerFunc(createPostHandler.HandleCreate))
790 handler.ServeHTTP(rr, req)
791
792 assert.Equal(t, http.StatusForbidden, rr.Code, "Should reject post from disabled aggregator")
793
794 t.Log("✅ Authorization disable works correctly")
795 })
796
797 t.Log("\n✅ Full E2E Test Complete - All 8 Parts Passed!")
798 t.Log("Summary:")
799 t.Log(" ✓ Service Declaration indexed via Jetstream")
800 t.Log(" ✓ Authorization indexed and stats updated")
801 t.Log(" ✓ Aggregator can create posts in authorized communities")
802 t.Log(" ✓ Rate limiting enforced (10 posts/hour)")
803 t.Log(" ✓ XRPC query endpoints return correct data")
804 t.Log(" ✓ Security: Unauthorized posts rejected")
805 t.Log(" ✓ Idempotent indexing handles duplicates")
806 t.Log(" ✓ Authorization disable prevents posting")
807}
808
809// TestAggregator_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS
810// This would require:
811// - Live PDS running at PDS_URL
812// - Live Jetstream running at JETSTREAM_URL
813// - Ability to provision aggregator accounts on PDS
814// - Real WebSocket connection to Jetstream firehose
815//
816// NOTE: This is a placeholder for future implementation
817// For now, use TestAggregator_E2E_WithJetstream for integration testing
818func TestAggregator_E2E_LivePDS(t *testing.T) {
819 if testing.Short() {
820 t.Skip("Skipping live PDS E2E test in short mode")
821 }
822
823 // Setup test database
824 dbURL := os.Getenv("TEST_DATABASE_URL")
825 if dbURL == "" {
826 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
827 }
828
829 db, err := sql.Open("postgres", dbURL)
830 require.NoError(t, err, "Failed to connect to test database")
831 defer func() {
832 if closeErr := db.Close(); closeErr != nil {
833 t.Logf("Failed to close database: %v", closeErr)
834 }
835 }()
836
837 // Run migrations
838 require.NoError(t, goose.SetDialect("postgres"))
839 require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
840
841 // Check if PDS is running
842 pdsURL := os.Getenv("PDS_URL")
843 if pdsURL == "" {
844 pdsURL = "http://localhost:3001"
845 }
846
847 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
848 if err != nil {
849 t.Skipf("PDS not running at %s: %v", pdsURL, err)
850 }
851 _ = healthResp.Body.Close()
852
853 t.Skip("Live PDS E2E test not yet implemented - use TestAggregator_E2E_WithJetstream")
854
855 // TODO: Implement live PDS E2E test
856 // 1. Provision aggregator account on real PDS
857 // 2. Write service declaration to aggregator's repository
858 // 3. Subscribe to real Jetstream and wait for event
859 // 4. Verify indexing in AppView
860 // 5. Provision community and authorize aggregator
861 // 6. Create real post via XRPC
862 // 7. Wait for Jetstream post event
863 // 8. Verify complete flow
864}