A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/identity"
7 "Coves/internal/atproto/jetstream"
8 "Coves/internal/core/communities"
9 "Coves/internal/core/posts"
10 "Coves/internal/core/users"
11 "Coves/internal/db/postgres"
12 "bytes"
13 "context"
14 "database/sql"
15 "encoding/json"
16 "fmt"
17 "net"
18 "net/http"
19 "net/http/httptest"
20 "os"
21 "strings"
22 "testing"
23 "time"
24
25 timelineCore "Coves/internal/core/timeline"
26
27 "github.com/go-chi/chi/v5"
28 "github.com/gorilla/websocket"
29 _ "github.com/lib/pq"
30 "github.com/pressly/goose/v3"
31 "github.com/stretchr/testify/assert"
32 "github.com/stretchr/testify/require"
33)
34
35// TestFullUserJourney_E2E tests the complete user experience from signup to interaction:
36// 1. User A: Signup → Authenticate → Create Community → Create Post
37// 2. User B: Signup → Authenticate → Subscribe to Community
38// 3. User B: Add Comment to User A's Post
39// 4. User B: Upvote Post
40// 5. User A: Upvote Comment
41// 6. Verify: All data flows through Jetstream correctly
42// 7. Verify: Counts update (vote counts, comment counts, subscriber counts)
43// 8. Verify: Timeline feed shows posts from subscribed communities
44//
45// This is a TRUE E2E test that validates:
46// - Complete atProto write-forward architecture (writes → PDS → Jetstream → AppView)
47// - Real Jetstream event consumption and indexing
48// - Multi-user interactions and data consistency
49// - Timeline aggregation and feed generation
50func TestFullUserJourney_E2E(t *testing.T) {
51 // Skip in short mode since this requires real PDS and Jetstream
52 if testing.Short() {
53 t.Skip("Skipping E2E test in short mode")
54 }
55
56 // Setup test database
57 dbURL := os.Getenv("TEST_DATABASE_URL")
58 if dbURL == "" {
59 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
60 }
61
62 db, err := sql.Open("postgres", dbURL)
63 require.NoError(t, err, "Failed to connect to test database")
64 defer func() {
65 if closeErr := db.Close(); closeErr != nil {
66 t.Logf("Failed to close database: %v", closeErr)
67 }
68 }()
69
70 // Run migrations
71 require.NoError(t, goose.SetDialect("postgres"))
72 require.NoError(t, goose.Up(db, "../../internal/db/migrations"))
73
74 // Check if PDS is running
75 pdsURL := os.Getenv("PDS_URL")
76 if pdsURL == "" {
77 pdsURL = "http://localhost:3001"
78 }
79
80 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
81 if err != nil {
82 t.Skipf("PDS not running at %s: %v", pdsURL, err)
83 }
84 _ = healthResp.Body.Close()
85
86 // Check if Jetstream is available
87 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
88 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
89 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port
90 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe", pdsHostname)
91
92 t.Logf("🚀 Starting Full User Journey E2E Test")
93 t.Logf(" PDS URL: %s", pdsURL)
94 t.Logf(" Jetstream URL: %s", jetstreamURL)
95
96 ctx := context.Background()
97
98 // Setup repositories
99 userRepo := postgres.NewUserRepository(db)
100 communityRepo := postgres.NewCommunityRepository(db)
101 postRepo := postgres.NewPostRepository(db)
102 commentRepo := postgres.NewCommentRepository(db)
103 voteRepo := postgres.NewVoteRepository(db)
104 timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
105
106 // Setup identity resolution
107 plcURL := os.Getenv("PLC_DIRECTORY_URL")
108 if plcURL == "" {
109 plcURL = "http://localhost:3002"
110 }
111 identityConfig := identity.DefaultConfig()
112 identityConfig.PLCURL = plcURL
113 identityResolver := identity.NewResolver(db, identityConfig)
114
115 // Setup services
116 userService := users.NewUserService(userRepo, identityResolver, pdsURL)
117
118 // Extract instance domain and DID
119 // IMPORTANT: Instance domain must match PDS_SERVICE_HANDLE_DOMAINS config (.community.coves.social)
120 instanceDID := os.Getenv("INSTANCE_DID")
121 if instanceDID == "" {
122 instanceDID = "did:web:coves.social" // Must match PDS handle domain config
123 }
124 var instanceDomain string
125 if strings.HasPrefix(instanceDID, "did:web:") {
126 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
127 } else {
128 instanceDomain = "coves.social"
129 }
130
131 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL)
132 communityService := communities.NewCommunityService(communityRepo, pdsURL, instanceDID, instanceDomain, provisioner)
133 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, pdsURL)
134 timelineService := timelineCore.NewTimelineService(timelineRepo)
135
136 // Setup consumers
137 communityConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, true, identityResolver)
138 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
139 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
140 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
141
142 // Setup HTTP server with all routes
143 // IMPORTANT: skipVerify=true because PDS password auth returns Bearer tokens (not DPoP-bound).
144 // E2E tests use Bearer tokens with DPoP scheme header, which only works with skipVerify=true.
145 // In production, skipVerify=false requires proper DPoP-bound tokens from OAuth flow.
146 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true)
147 defer authMiddleware.Stop() // Clean up DPoP replay cache goroutine
148 r := chi.NewRouter()
149 routes.RegisterCommunityRoutes(r, communityService, authMiddleware, nil) // nil = allow all community creators
150 routes.RegisterPostRoutes(r, postService, authMiddleware)
151 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
152 httpServer := httptest.NewServer(r)
153 defer httpServer.Close()
154
155 // Cleanup test data from previous runs (clean up ALL journey test data)
156 timestamp := time.Now().Unix()
157 // Clean up previous test runs - use pattern that matches journey test data
158 // Handles are now shorter: alice{4-digit}.local.coves.dev, bob{4-digit}.local.coves.dev
159 _, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE '%alice%.local.coves.dev%' OR voter_did LIKE '%bob%.local.coves.dev%'")
160 _, _ = db.Exec("DELETE FROM comments WHERE author_did LIKE '%alice%.local.coves.dev%' OR author_did LIKE '%bob%.local.coves.dev%'")
161 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE '%gj%'")
162 _, _ = db.Exec("DELETE FROM community_subscriptions WHERE user_did LIKE '%alice%.local.coves.dev%' OR user_did LIKE '%bob%.local.coves.dev%'")
163 _, _ = db.Exec("DELETE FROM communities WHERE handle LIKE 'gj%'")
164 _, _ = db.Exec("DELETE FROM users WHERE handle LIKE 'alice%.local.coves.dev' OR handle LIKE 'bob%.local.coves.dev'")
165
166 // Defer cleanup for current test run using specific timestamp pattern
167 defer func() {
168 shortTS := timestamp % 10000
169 alicePattern := fmt.Sprintf("%%alice%d%%", shortTS)
170 bobPattern := fmt.Sprintf("%%bob%d%%", shortTS)
171 gjPattern := fmt.Sprintf("%%gj%d%%", shortTS)
172 _, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE $1 OR voter_did LIKE $2", alicePattern, bobPattern)
173 _, _ = db.Exec("DELETE FROM comments WHERE author_did LIKE $1 OR author_did LIKE $2", alicePattern, bobPattern)
174 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE $1", gjPattern)
175 _, _ = db.Exec("DELETE FROM community_subscriptions WHERE user_did LIKE $1 OR user_did LIKE $2", alicePattern, bobPattern)
176 _, _ = db.Exec("DELETE FROM communities WHERE handle LIKE $1", gjPattern)
177 _, _ = db.Exec("DELETE FROM users WHERE handle LIKE $1 OR handle LIKE $2", alicePattern, bobPattern)
178 }()
179
180 // Test variables to track state across steps
181 var (
182 userAHandle string
183 userADID string
184 userAToken string
185 userBHandle string
186 userBDID string
187 userBToken string
188 communityDID string
189 communityHandle string
190 postURI string
191 postCID string
192 commentURI string
193 commentCID string
194 )
195
196 // ====================================================================================
197 // Part 1: User A - Signup and Authenticate
198 // ====================================================================================
199 t.Run("1. User A - Signup and Authenticate", func(t *testing.T) {
200 t.Log("\n👤 Part 1: User A creates account and authenticates...")
201
202 // Use short handle format to stay under PDS 34-char limit
203 shortTS := timestamp % 10000 // Use last 4 digits
204 userAHandle = fmt.Sprintf("alice%d.local.coves.dev", shortTS)
205 email := fmt.Sprintf("alice%d@test.com", shortTS)
206 password := "test-password-alice-123"
207
208 // Create account on PDS
209 userAToken, userADID, err = createPDSAccount(pdsURL, userAHandle, email, password)
210 require.NoError(t, err, "User A should be able to create account")
211 require.NotEmpty(t, userAToken, "User A should receive access token")
212 require.NotEmpty(t, userADID, "User A should receive DID")
213
214 t.Logf("✅ User A created: %s (%s)", userAHandle, userADID)
215
216 // Index user in AppView (simulates app.bsky.actor.profile indexing)
217 userA := createTestUser(t, db, userAHandle, userADID)
218 require.NotNil(t, userA)
219
220 t.Logf("✅ User A indexed in AppView")
221 })
222
223 // ====================================================================================
224 // Part 2: User A - Create Community
225 // ====================================================================================
226 t.Run("2. User A - Create Community", func(t *testing.T) {
227 t.Log("\n🏘️ Part 2: User A creates a community...")
228
229 // Community handle will be {name}.community.coves.social
230 // Max 34 chars total, so name must be short (34 - 23 = 11 chars max)
231 shortTS := timestamp % 10000
232 communityName := fmt.Sprintf("gj%d", shortTS) // "gj9261" = 6 chars -> handle = 29 chars
233
234 createReq := map[string]interface{}{
235 "name": communityName,
236 "displayName": "Gaming Journey Community",
237 "description": "Testing full user journey E2E",
238 "visibility": "public",
239 "allowExternalDiscovery": true,
240 }
241
242 reqBody, _ := json.Marshal(createReq)
243 req, _ := http.NewRequest(http.MethodPost,
244 httpServer.URL+"/xrpc/social.coves.community.create",
245 bytes.NewBuffer(reqBody))
246 req.Header.Set("Content-Type", "application/json")
247 req.Header.Set("Authorization", "DPoP "+userAToken)
248
249 resp, err := http.DefaultClient.Do(req)
250 require.NoError(t, err)
251 defer func() { _ = resp.Body.Close() }()
252
253 require.Equal(t, http.StatusOK, resp.StatusCode, "Community creation should succeed")
254
255 var createResp struct {
256 URI string `json:"uri"`
257 CID string `json:"cid"`
258 DID string `json:"did"`
259 Handle string `json:"handle"`
260 }
261 require.NoError(t, json.NewDecoder(resp.Body).Decode(&createResp))
262
263 communityDID = createResp.DID
264 communityHandle = createResp.Handle
265
266 t.Logf("✅ Community created: %s (%s)", communityHandle, communityDID)
267
268 // Wait for Jetstream event and index in AppView
269 t.Log("⏳ Waiting for Jetstream to index community...")
270
271 // Subscribe to Jetstream for community profile events
272 eventChan := make(chan *jetstream.JetstreamEvent, 10)
273 errorChan := make(chan error, 1)
274 done := make(chan bool)
275
276 jetstreamFilterURL := fmt.Sprintf("%s?wantedCollections=social.coves.community.profile", jetstreamURL)
277
278 go func() {
279 err := subscribeToJetstreamForCommunity(ctx, jetstreamFilterURL, communityDID, communityConsumer, eventChan, errorChan, done)
280 if err != nil {
281 errorChan <- err
282 }
283 }()
284
285 select {
286 case event := <-eventChan:
287 t.Logf("✅ Jetstream event received for community: %s", event.Did)
288 close(done)
289 case err := <-errorChan:
290 t.Fatalf("❌ Jetstream error: %v", err)
291 case <-time.After(30 * time.Second):
292 close(done)
293 // Check if simulation fallback is allowed (for CI environments)
294 if os.Getenv("ALLOW_SIMULATION_FALLBACK") == "true" {
295 t.Log("⚠️ Timeout waiting for Jetstream event - falling back to simulation (CI mode)")
296 // Simulate indexing for test speed
297 simulateCommunityIndexing(t, db, communityDID, communityHandle, userADID)
298 } else {
299 t.Fatal("❌ Jetstream timeout - real infrastructure test failed. Set ALLOW_SIMULATION_FALLBACK=true to allow fallback.")
300 }
301 }
302
303 // Verify community is indexed
304 indexed, err := communityRepo.GetByDID(ctx, communityDID)
305 require.NoError(t, err, "Community should be indexed")
306 assert.Equal(t, communityDID, indexed.DID)
307
308 t.Logf("✅ Community indexed in AppView")
309 })
310
311 // ====================================================================================
312 // Part 3: User A - Create Post
313 // ====================================================================================
314 t.Run("3. User A - Create Post", func(t *testing.T) {
315 t.Log("\n📝 Part 3: User A creates a post in the community...")
316
317 title := "My First Gaming Post"
318 content := "This is an E2E test post from the user journey!"
319
320 createReq := map[string]interface{}{
321 "community": communityDID,
322 "title": title,
323 "content": content,
324 }
325
326 reqBody, _ := json.Marshal(createReq)
327 req, _ := http.NewRequest(http.MethodPost,
328 httpServer.URL+"/xrpc/social.coves.community.post.create",
329 bytes.NewBuffer(reqBody))
330 req.Header.Set("Content-Type", "application/json")
331 req.Header.Set("Authorization", "DPoP "+userAToken)
332
333 resp, err := http.DefaultClient.Do(req)
334 require.NoError(t, err)
335 defer func() { _ = resp.Body.Close() }()
336
337 require.Equal(t, http.StatusOK, resp.StatusCode, "Post creation should succeed")
338
339 var createResp posts.CreatePostResponse
340 require.NoError(t, json.NewDecoder(resp.Body).Decode(&createResp))
341
342 postURI = createResp.URI
343 postCID = createResp.CID
344
345 t.Logf("✅ Post created: %s", postURI)
346
347 // Wait for Jetstream event and index in AppView
348 t.Log("⏳ Waiting for Jetstream to index post...")
349
350 eventChan := make(chan *jetstream.JetstreamEvent, 10)
351 errorChan := make(chan error, 1)
352 done := make(chan bool)
353
354 jetstreamFilterURL := fmt.Sprintf("%s?wantedCollections=social.coves.community.post", jetstreamURL)
355
356 go func() {
357 err := subscribeToJetstreamForPost(ctx, jetstreamFilterURL, communityDID, postConsumer, eventChan, errorChan, done)
358 if err != nil {
359 errorChan <- err
360 }
361 }()
362
363 select {
364 case event := <-eventChan:
365 t.Logf("✅ Jetstream event received for post: %s", event.Commit.RKey)
366 close(done)
367 case err := <-errorChan:
368 t.Fatalf("❌ Jetstream error: %v", err)
369 case <-time.After(30 * time.Second):
370 close(done)
371 // Check if simulation fallback is allowed (for CI environments)
372 if os.Getenv("ALLOW_SIMULATION_FALLBACK") == "true" {
373 t.Log("⚠️ Timeout waiting for Jetstream event - falling back to simulation (CI mode)")
374 // Simulate indexing for test speed
375 simulatePostIndexing(t, db, postConsumer, ctx, communityDID, userADID, postURI, postCID, title, content)
376 } else {
377 t.Fatal("❌ Jetstream timeout - real infrastructure test failed. Set ALLOW_SIMULATION_FALLBACK=true to allow fallback.")
378 }
379 }
380
381 // Verify post is indexed
382 indexed, err := postRepo.GetByURI(ctx, postURI)
383 require.NoError(t, err, "Post should be indexed")
384 assert.Equal(t, postURI, indexed.URI)
385 assert.Equal(t, userADID, indexed.AuthorDID)
386 assert.Equal(t, 0, indexed.CommentCount, "Initial comment count should be 0")
387 assert.Equal(t, 0, indexed.UpvoteCount, "Initial upvote count should be 0")
388
389 t.Logf("✅ Post indexed in AppView")
390 })
391
392 // ====================================================================================
393 // Part 4: User B - Signup and Authenticate
394 // ====================================================================================
395 t.Run("4. User B - Signup and Authenticate", func(t *testing.T) {
396 t.Log("\n👤 Part 4: User B creates account and authenticates...")
397
398 // Use short handle format to stay under PDS 34-char limit
399 shortTS := timestamp % 10000 // Use last 4 digits
400 userBHandle = fmt.Sprintf("bob%d.local.coves.dev", shortTS)
401 email := fmt.Sprintf("bob%d@test.com", shortTS)
402 password := "test-password-bob-123"
403
404 // Create account on PDS
405 userBToken, userBDID, err = createPDSAccount(pdsURL, userBHandle, email, password)
406 require.NoError(t, err, "User B should be able to create account")
407 require.NotEmpty(t, userBToken, "User B should receive access token")
408 require.NotEmpty(t, userBDID, "User B should receive DID")
409
410 t.Logf("✅ User B created: %s (%s)", userBHandle, userBDID)
411
412 // Index user in AppView
413 userB := createTestUser(t, db, userBHandle, userBDID)
414 require.NotNil(t, userB)
415
416 t.Logf("✅ User B indexed in AppView")
417 })
418
419 // ====================================================================================
420 // Part 5: User B - Subscribe to Community
421 // ====================================================================================
422 t.Run("5. User B - Subscribe to Community", func(t *testing.T) {
423 t.Log("\n🔔 Part 5: User B subscribes to the community...")
424
425 // Get initial subscriber count
426 initialCommunity, err := communityRepo.GetByDID(ctx, communityDID)
427 require.NoError(t, err)
428 initialCount := initialCommunity.SubscriberCount
429
430 subscribeReq := map[string]interface{}{
431 "community": communityDID,
432 "contentVisibility": 5,
433 }
434
435 reqBody, _ := json.Marshal(subscribeReq)
436 req, _ := http.NewRequest(http.MethodPost,
437 httpServer.URL+"/xrpc/social.coves.community.subscribe",
438 bytes.NewBuffer(reqBody))
439 req.Header.Set("Content-Type", "application/json")
440 req.Header.Set("Authorization", "DPoP "+userBToken)
441
442 resp, err := http.DefaultClient.Do(req)
443 require.NoError(t, err)
444 defer func() { _ = resp.Body.Close() }()
445
446 require.Equal(t, http.StatusOK, resp.StatusCode, "Subscription should succeed")
447
448 var subscribeResp struct {
449 URI string `json:"uri"`
450 CID string `json:"cid"`
451 }
452 require.NoError(t, json.NewDecoder(resp.Body).Decode(&subscribeResp))
453
454 t.Logf("✅ Subscription created: %s", subscribeResp.URI)
455
456 // Simulate Jetstream event indexing the subscription
457 // (In production, this would come from real Jetstream)
458 rkey := strings.Split(subscribeResp.URI, "/")[4]
459 subEvent := jetstream.JetstreamEvent{
460 Did: userBDID,
461 TimeUS: time.Now().UnixMicro(),
462 Kind: "commit",
463 Commit: &jetstream.CommitEvent{
464 Rev: "test-sub-rev",
465 Operation: "create",
466 Collection: "social.coves.community.subscription",
467 RKey: rkey,
468 CID: subscribeResp.CID,
469 Record: map[string]interface{}{
470 "$type": "social.coves.community.subscription",
471 "subject": communityDID,
472 "contentVisibility": float64(5),
473 "createdAt": time.Now().Format(time.RFC3339),
474 },
475 },
476 }
477 require.NoError(t, communityConsumer.HandleEvent(ctx, &subEvent))
478
479 // Verify subscription indexed and subscriber count incremented
480 updatedCommunity, err := communityRepo.GetByDID(ctx, communityDID)
481 require.NoError(t, err)
482 assert.Equal(t, initialCount+1, updatedCommunity.SubscriberCount,
483 "Subscriber count should increment")
484
485 t.Logf("✅ Subscriber count: %d → %d", initialCount, updatedCommunity.SubscriberCount)
486 })
487
488 // ====================================================================================
489 // Part 6: User B - Add Comment to Post
490 // ====================================================================================
491 t.Run("6. User B - Add Comment to Post", func(t *testing.T) {
492 t.Log("\n💬 Part 6: User B comments on User A's post...")
493
494 // Get initial comment count
495 initialPost, err := postRepo.GetByURI(ctx, postURI)
496 require.NoError(t, err)
497 initialCommentCount := initialPost.CommentCount
498
499 // User B creates comment via PDS (simulate)
500 commentRKey := generateTID()
501 commentURI = fmt.Sprintf("at://%s/social.coves.community.comment/%s", userBDID, commentRKey)
502 commentCID = "bafycommentjourney123"
503
504 commentEvent := &jetstream.JetstreamEvent{
505 Did: userBDID,
506 Kind: "commit",
507 Commit: &jetstream.CommitEvent{
508 Rev: "test-comment-rev",
509 Operation: "create",
510 Collection: "social.coves.community.comment",
511 RKey: commentRKey,
512 CID: commentCID,
513 Record: map[string]interface{}{
514 "$type": "social.coves.community.comment",
515 "content": "Great post! This E2E test is working perfectly!",
516 "reply": map[string]interface{}{
517 "root": map[string]interface{}{
518 "uri": postURI,
519 "cid": postCID,
520 },
521 "parent": map[string]interface{}{
522 "uri": postURI,
523 "cid": postCID,
524 },
525 },
526 "createdAt": time.Now().Format(time.RFC3339),
527 },
528 },
529 }
530
531 require.NoError(t, commentConsumer.HandleEvent(ctx, commentEvent))
532
533 t.Logf("✅ Comment created: %s", commentURI)
534
535 // Verify comment indexed
536 indexed, err := commentRepo.GetByURI(ctx, commentURI)
537 require.NoError(t, err)
538 assert.Equal(t, commentURI, indexed.URI)
539 assert.Equal(t, userBDID, indexed.CommenterDID)
540 assert.Equal(t, 0, indexed.UpvoteCount, "Initial upvote count should be 0")
541
542 // Verify post comment count incremented
543 updatedPost, err := postRepo.GetByURI(ctx, postURI)
544 require.NoError(t, err)
545 assert.Equal(t, initialCommentCount+1, updatedPost.CommentCount,
546 "Post comment count should increment")
547
548 t.Logf("✅ Comment count: %d → %d", initialCommentCount, updatedPost.CommentCount)
549 })
550
551 // ====================================================================================
552 // Part 7: User B - Upvote Post
553 // ====================================================================================
554 t.Run("7. User B - Upvote Post", func(t *testing.T) {
555 t.Log("\n⬆️ Part 7: User B upvotes User A's post...")
556
557 // Get initial vote counts
558 initialPost, err := postRepo.GetByURI(ctx, postURI)
559 require.NoError(t, err)
560 initialUpvotes := initialPost.UpvoteCount
561 initialScore := initialPost.Score
562
563 // User B creates upvote via PDS (simulate)
564 voteRKey := generateTID()
565 voteURI := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", userBDID, voteRKey)
566
567 voteEvent := &jetstream.JetstreamEvent{
568 Did: userBDID,
569 Kind: "commit",
570 Commit: &jetstream.CommitEvent{
571 Rev: "test-vote-rev",
572 Operation: "create",
573 Collection: "social.coves.feed.vote",
574 RKey: voteRKey,
575 CID: "bafyvotejourney123",
576 Record: map[string]interface{}{
577 "$type": "social.coves.feed.vote",
578 "subject": map[string]interface{}{
579 "uri": postURI,
580 "cid": postCID,
581 },
582 "direction": "up",
583 "createdAt": time.Now().Format(time.RFC3339),
584 },
585 },
586 }
587
588 require.NoError(t, voteConsumer.HandleEvent(ctx, voteEvent))
589
590 t.Logf("✅ Upvote created: %s", voteURI)
591
592 // Verify vote indexed
593 indexed, err := voteRepo.GetByURI(ctx, voteURI)
594 require.NoError(t, err)
595 assert.Equal(t, voteURI, indexed.URI)
596 assert.Equal(t, userBDID, indexed.VoterDID) // User B created the vote
597 assert.Equal(t, "up", indexed.Direction)
598
599 // Verify post vote counts updated
600 updatedPost, err := postRepo.GetByURI(ctx, postURI)
601 require.NoError(t, err)
602 assert.Equal(t, initialUpvotes+1, updatedPost.UpvoteCount,
603 "Post upvote count should increment")
604 assert.Equal(t, initialScore+1, updatedPost.Score,
605 "Post score should increment")
606
607 t.Logf("✅ Post upvotes: %d → %d, score: %d → %d",
608 initialUpvotes, updatedPost.UpvoteCount,
609 initialScore, updatedPost.Score)
610 })
611
612 // ====================================================================================
613 // Part 8: User A - Upvote Comment
614 // ====================================================================================
615 t.Run("8. User A - Upvote Comment", func(t *testing.T) {
616 t.Log("\n⬆️ Part 8: User A upvotes User B's comment...")
617
618 // Get initial vote counts
619 initialComment, err := commentRepo.GetByURI(ctx, commentURI)
620 require.NoError(t, err)
621 initialUpvotes := initialComment.UpvoteCount
622 initialScore := initialComment.Score
623
624 // User A creates upvote via PDS (simulate)
625 voteRKey := generateTID()
626 voteURI := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", userADID, voteRKey)
627
628 voteEvent := &jetstream.JetstreamEvent{
629 Did: userADID,
630 Kind: "commit",
631 Commit: &jetstream.CommitEvent{
632 Rev: "test-vote-comment-rev",
633 Operation: "create",
634 Collection: "social.coves.feed.vote",
635 RKey: voteRKey,
636 CID: "bafyvotecommentjourney123",
637 Record: map[string]interface{}{
638 "$type": "social.coves.feed.vote",
639 "subject": map[string]interface{}{
640 "uri": commentURI,
641 "cid": commentCID,
642 },
643 "direction": "up",
644 "createdAt": time.Now().Format(time.RFC3339),
645 },
646 },
647 }
648
649 require.NoError(t, voteConsumer.HandleEvent(ctx, voteEvent))
650
651 t.Logf("✅ Upvote on comment created: %s", voteURI)
652
653 // Verify comment vote counts updated
654 updatedComment, err := commentRepo.GetByURI(ctx, commentURI)
655 require.NoError(t, err)
656 assert.Equal(t, initialUpvotes+1, updatedComment.UpvoteCount,
657 "Comment upvote count should increment")
658 assert.Equal(t, initialScore+1, updatedComment.Score,
659 "Comment score should increment")
660
661 t.Logf("✅ Comment upvotes: %d → %d, score: %d → %d",
662 initialUpvotes, updatedComment.UpvoteCount,
663 initialScore, updatedComment.Score)
664 })
665
666 // ====================================================================================
667 // Part 9: User B - Verify Timeline Feed
668 // ====================================================================================
669 t.Run("9. User B - Verify Timeline Feed Shows Subscribed Community Posts", func(t *testing.T) {
670 t.Log("\n📰 Part 9: User B checks timeline feed...")
671
672 // Use HTTP client to properly go through auth middleware with DPoP token
673 req, _ := http.NewRequest(http.MethodGet,
674 httpServer.URL+"/xrpc/social.coves.feed.getTimeline?sort=new&limit=10", nil)
675 req.Header.Set("Authorization", "DPoP "+userBToken)
676
677 resp, err := http.DefaultClient.Do(req)
678 require.NoError(t, err)
679 defer func() { _ = resp.Body.Close() }()
680
681 require.Equal(t, http.StatusOK, resp.StatusCode, "Timeline request should succeed")
682
683 var response timelineCore.TimelineResponse
684 require.NoError(t, json.NewDecoder(resp.Body).Decode(&response))
685
686 // User B should see the post from the community they subscribed to
687 require.NotEmpty(t, response.Feed, "Timeline should contain posts")
688
689 // Find our test post in the feed
690 foundPost := false
691 for _, feedPost := range response.Feed {
692 if feedPost.Post.URI == postURI {
693 foundPost = true
694 assert.Equal(t, userADID, feedPost.Post.Author.DID,
695 "Post author should be User A")
696 assert.Equal(t, communityDID, feedPost.Post.Community.DID,
697 "Post community should match")
698 // Check stats (counts are in Stats struct, not direct fields)
699 require.NotNil(t, feedPost.Post.Stats, "Post should have stats")
700 assert.Equal(t, 1, feedPost.Post.Stats.Upvotes,
701 "Post should show 1 upvote from User B")
702 assert.Equal(t, 1, feedPost.Post.Stats.CommentCount,
703 "Post should show 1 comment from User B")
704 break
705 }
706 }
707
708 assert.True(t, foundPost, "Timeline should contain User A's post from subscribed community")
709
710 t.Logf("✅ Timeline feed verified - User B sees post from subscribed community")
711 })
712
713 // ====================================================================================
714 // Test Summary
715 // ====================================================================================
716 t.Log("\n" + strings.Repeat("=", 80))
717 t.Log("✅ FULL USER JOURNEY E2E TEST COMPLETE")
718 t.Log(strings.Repeat("=", 80))
719 t.Log("\n🎯 Complete Flow Tested:")
720 t.Log(" 1. ✓ User A - Signup and Authenticate")
721 t.Log(" 2. ✓ User A - Create Community")
722 t.Log(" 3. ✓ User A - Create Post")
723 t.Log(" 4. ✓ User B - Signup and Authenticate")
724 t.Log(" 5. ✓ User B - Subscribe to Community")
725 t.Log(" 6. ✓ User B - Add Comment to Post")
726 t.Log(" 7. ✓ User B - Upvote Post")
727 t.Log(" 8. ✓ User A - Upvote Comment")
728 t.Log(" 9. ✓ User B - Verify Timeline Feed")
729 t.Log("\n✅ Data Flow Verified:")
730 t.Log(" ✓ All records written to PDS")
731 t.Log(" ✓ Jetstream events consumed (with fallback simulation)")
732 t.Log(" ✓ AppView database indexed correctly")
733 t.Log(" ✓ Counts updated (votes, comments, subscribers)")
734 t.Log(" ✓ Timeline feed aggregates subscribed content")
735 t.Log("\n✅ Multi-User Interaction Verified:")
736 t.Log(" ✓ User A creates community and post")
737 t.Log(" ✓ User B subscribes and interacts")
738 t.Log(" ✓ Cross-user votes and comments")
739 t.Log(" ✓ Feed shows correct personalized content")
740 t.Log("\n" + strings.Repeat("=", 80))
741}
742
743// Helper: Subscribe to Jetstream for community profile events
744func subscribeToJetstreamForCommunity(
745 ctx context.Context,
746 jetstreamURL string,
747 targetDID string,
748 consumer *jetstream.CommunityEventConsumer,
749 eventChan chan<- *jetstream.JetstreamEvent,
750 errorChan chan<- error,
751 done <-chan bool,
752) error {
753 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
754 if err != nil {
755 return fmt.Errorf("failed to connect to Jetstream: %w", err)
756 }
757 defer func() { _ = conn.Close() }()
758
759 for {
760 select {
761 case <-done:
762 return nil
763 case <-ctx.Done():
764 return ctx.Err()
765 default:
766 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
767 return fmt.Errorf("failed to set read deadline: %w", err)
768 }
769
770 var event jetstream.JetstreamEvent
771 err := conn.ReadJSON(&event)
772 if err != nil {
773 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
774 return nil
775 }
776 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
777 continue
778 }
779 return fmt.Errorf("failed to read Jetstream message: %w", err)
780 }
781
782 if event.Did == targetDID && event.Kind == "commit" &&
783 event.Commit != nil && event.Commit.Collection == "social.coves.community.profile" {
784 if err := consumer.HandleEvent(ctx, &event); err != nil {
785 return fmt.Errorf("failed to process event: %w", err)
786 }
787
788 select {
789 case eventChan <- &event:
790 return nil
791 case <-time.After(1 * time.Second):
792 return fmt.Errorf("timeout sending event to channel")
793 }
794 }
795 }
796 }
797}
798
799// Helper: Simulate community indexing for test speed
800func simulateCommunityIndexing(t *testing.T, db *sql.DB, did, handle, ownerDID string) {
801 t.Helper()
802
803 _, err := db.Exec(`
804 INSERT INTO communities (did, handle, name, display_name, owner_did, created_by_did,
805 hosted_by_did, visibility, moderation_type, record_uri, record_cid, created_at)
806 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
807 ON CONFLICT (did) DO NOTHING
808 `, did, handle, strings.Split(handle, ".")[0], "Test Community", did, ownerDID,
809 "did:web:coves.social", "public", "moderator",
810 fmt.Sprintf("at://%s/social.coves.community.profile/self", did), "fakecid")
811
812 require.NoError(t, err, "Failed to simulate community indexing")
813}
814
815// Helper: Simulate post indexing for test speed
816func simulatePostIndexing(t *testing.T, db *sql.DB, consumer *jetstream.PostEventConsumer,
817 ctx context.Context, communityDID, authorDID, uri, cid, title, content string,
818) {
819 t.Helper()
820
821 rkey := strings.Split(uri, "/")[4]
822 event := jetstream.JetstreamEvent{
823 Did: communityDID,
824 Kind: "commit",
825 Commit: &jetstream.CommitEvent{
826 Operation: "create",
827 Collection: "social.coves.community.post",
828 RKey: rkey,
829 CID: cid,
830 Record: map[string]interface{}{
831 "$type": "social.coves.community.post",
832 "community": communityDID,
833 "author": authorDID,
834 "title": title,
835 "content": content,
836 "createdAt": time.Now().Format(time.RFC3339),
837 },
838 },
839 }
840 require.NoError(t, consumer.HandleEvent(ctx, &event))
841}