A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/routes" 5 "Coves/internal/atproto/identity" 6 "Coves/internal/atproto/jetstream" 7 "Coves/internal/core/communities" 8 "Coves/internal/core/posts" 9 "Coves/internal/core/users" 10 "Coves/internal/db/postgres" 11 "bytes" 12 "context" 13 "database/sql" 14 "encoding/json" 15 "fmt" 16 "net" 17 "net/http" 18 "net/http/httptest" 19 "os" 20 "strings" 21 "testing" 22 "time" 23 24 "github.com/go-chi/chi/v5" 25 "github.com/gorilla/websocket" 26 _ "github.com/lib/pq" 27 "github.com/pressly/goose/v3" 28 "github.com/stretchr/testify/assert" 29 "github.com/stretchr/testify/require" 30 31 timelineCore "Coves/internal/core/timeline" 32) 33 34// TestFullUserJourney_E2E tests the complete user experience from signup to interaction: 35// 1. User A: Signup → Authenticate → Create Community → Create Post 36// 2. User B: Signup → Authenticate → Subscribe to Community 37// 3. User B: Add Comment to User A's Post 38// 4. User B: Upvote Post 39// 5. User A: Upvote Comment 40// 6. Verify: All data flows through Jetstream correctly 41// 7. Verify: Counts update (vote counts, comment counts, subscriber counts) 42// 8. 
Verify: Timeline feed shows posts from subscribed communities 43// 44// This is a TRUE E2E test that validates: 45// - Complete atProto write-forward architecture (writes → PDS → Jetstream → AppView) 46// - Real Jetstream event consumption and indexing 47// - Multi-user interactions and data consistency 48// - Timeline aggregation and feed generation 49func TestFullUserJourney_E2E(t *testing.T) { 50 // Skip in short mode since this requires real PDS and Jetstream 51 if testing.Short() { 52 t.Skip("Skipping E2E test in short mode") 53 } 54 55 // Setup test database 56 dbURL := os.Getenv("TEST_DATABASE_URL") 57 if dbURL == "" { 58 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 59 } 60 61 db, err := sql.Open("postgres", dbURL) 62 require.NoError(t, err, "Failed to connect to test database") 63 defer func() { 64 if closeErr := db.Close(); closeErr != nil { 65 t.Logf("Failed to close database: %v", closeErr) 66 } 67 }() 68 69 // Run migrations 70 require.NoError(t, goose.SetDialect("postgres")) 71 require.NoError(t, goose.Up(db, "../../internal/db/migrations")) 72 73 // Check if PDS is running 74 pdsURL := os.Getenv("PDS_URL") 75 if pdsURL == "" { 76 pdsURL = "http://localhost:3001" 77 } 78 79 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 80 if err != nil { 81 t.Skipf("PDS not running at %s: %v", pdsURL, err) 82 } 83 _ = healthResp.Body.Close() 84 85 // Check if Jetstream is available 86 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 87 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 88 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port 89 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe", pdsHostname) 90 91 t.Logf("🚀 Starting Full User Journey E2E Test") 92 t.Logf(" PDS URL: %s", pdsURL) 93 t.Logf(" Jetstream URL: %s", jetstreamURL) 94 95 ctx := context.Background() 96 97 // Setup repositories 98 userRepo := postgres.NewUserRepository(db) 99 communityRepo := 
postgres.NewCommunityRepository(db) 100 postRepo := postgres.NewPostRepository(db) 101 commentRepo := postgres.NewCommentRepository(db) 102 voteRepo := postgres.NewVoteRepository(db) 103 timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret") 104 105 // Setup identity resolution 106 plcURL := os.Getenv("PLC_DIRECTORY_URL") 107 if plcURL == "" { 108 plcURL = "http://localhost:3002" 109 } 110 identityConfig := identity.DefaultConfig() 111 identityConfig.PLCURL = plcURL 112 identityResolver := identity.NewResolver(db, identityConfig) 113 114 // Setup services 115 userService := users.NewUserService(userRepo, identityResolver, pdsURL) 116 117 // Extract instance domain and DID 118 // IMPORTANT: Instance domain must match PDS_SERVICE_HANDLE_DOMAINS config (.community.coves.social) 119 instanceDID := os.Getenv("INSTANCE_DID") 120 if instanceDID == "" { 121 instanceDID = "did:web:coves.social" // Must match PDS handle domain config 122 } 123 var instanceDomain string 124 if strings.HasPrefix(instanceDID, "did:web:") { 125 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 126 } else { 127 instanceDomain = "coves.social" 128 } 129 130 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 131 communityService := communities.NewCommunityService(communityRepo, pdsURL, instanceDID, instanceDomain, provisioner) 132 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, pdsURL) 133 timelineService := timelineCore.NewTimelineService(timelineRepo) 134 135 // Setup consumers 136 communityConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, true, identityResolver) 137 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 138 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 139 voteConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 140 141 // Setup HTTP server with all routes using OAuth middleware 142 e2eAuth := 
NewE2EOAuthMiddleware() 143 r := chi.NewRouter() 144 routes.RegisterCommunityRoutes(r, communityService, e2eAuth.OAuthAuthMiddleware, nil) // nil = allow all community creators 145 routes.RegisterPostRoutes(r, postService, e2eAuth.OAuthAuthMiddleware) 146 routes.RegisterTimelineRoutes(r, timelineService, e2eAuth.OAuthAuthMiddleware) 147 httpServer := httptest.NewServer(r) 148 defer httpServer.Close() 149 150 // Cleanup test data from previous runs (clean up ALL journey test data) 151 timestamp := time.Now().Unix() 152 // Clean up previous test runs - use pattern that matches journey test data 153 // Handles are now shorter: alice{4-digit}.local.coves.dev, bob{4-digit}.local.coves.dev 154 _, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE '%alice%.local.coves.dev%' OR voter_did LIKE '%bob%.local.coves.dev%'") 155 _, _ = db.Exec("DELETE FROM comments WHERE author_did LIKE '%alice%.local.coves.dev%' OR author_did LIKE '%bob%.local.coves.dev%'") 156 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE '%gj%'") 157 _, _ = db.Exec("DELETE FROM community_subscriptions WHERE user_did LIKE '%alice%.local.coves.dev%' OR user_did LIKE '%bob%.local.coves.dev%'") 158 _, _ = db.Exec("DELETE FROM communities WHERE handle LIKE 'gj%'") 159 _, _ = db.Exec("DELETE FROM users WHERE handle LIKE 'alice%.local.coves.dev' OR handle LIKE 'bob%.local.coves.dev'") 160 161 // Defer cleanup for current test run using specific timestamp pattern 162 defer func() { 163 shortTS := timestamp % 10000 164 alicePattern := fmt.Sprintf("%%alice%d%%", shortTS) 165 bobPattern := fmt.Sprintf("%%bob%d%%", shortTS) 166 gjPattern := fmt.Sprintf("%%gj%d%%", shortTS) 167 _, _ = db.Exec("DELETE FROM votes WHERE voter_did LIKE $1 OR voter_did LIKE $2", alicePattern, bobPattern) 168 _, _ = db.Exec("DELETE FROM comments WHERE author_did LIKE $1 OR author_did LIKE $2", alicePattern, bobPattern) 169 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE $1", gjPattern) 170 _, _ = db.Exec("DELETE FROM 
community_subscriptions WHERE user_did LIKE $1 OR user_did LIKE $2", alicePattern, bobPattern) 171 _, _ = db.Exec("DELETE FROM communities WHERE handle LIKE $1", gjPattern) 172 _, _ = db.Exec("DELETE FROM users WHERE handle LIKE $1 OR handle LIKE $2", alicePattern, bobPattern) 173 }() 174 175 // Test variables to track state across steps 176 var ( 177 userAHandle string 178 userADID string 179 userAToken string // PDS access token for direct PDS requests 180 userAAPIToken string // Coves API token for Coves API requests 181 userBHandle string 182 userBDID string 183 userBToken string // PDS access token for direct PDS requests 184 userBAPIToken string // Coves API token for Coves API requests 185 communityDID string 186 communityHandle string 187 postURI string 188 postCID string 189 commentURI string 190 commentCID string 191 ) 192 193 // ==================================================================================== 194 // Part 1: User A - Signup and Authenticate 195 // ==================================================================================== 196 t.Run("1. 
User A - Signup and Authenticate", func(t *testing.T) { 197 t.Log("\n👤 Part 1: User A creates account and authenticates...") 198 199 // Use short handle format to stay under PDS 34-char limit 200 shortTS := timestamp % 10000 // Use last 4 digits 201 userAHandle = fmt.Sprintf("alice%d.local.coves.dev", shortTS) 202 email := fmt.Sprintf("alice%d@test.com", shortTS) 203 password := "test-password-alice-123" 204 205 // Create account on PDS 206 userAToken, userADID, err = createPDSAccount(pdsURL, userAHandle, email, password) 207 require.NoError(t, err, "User A should be able to create account") 208 require.NotEmpty(t, userAToken, "User A should receive access token") 209 require.NotEmpty(t, userADID, "User A should receive DID") 210 211 t.Logf("✅ User A created: %s (%s)", userAHandle, userADID) 212 213 // Index user in AppView (simulates app.bsky.actor.profile indexing) 214 userA := createTestUser(t, db, userAHandle, userADID) 215 require.NotNil(t, userA) 216 217 // Register user with OAuth middleware for Coves API requests 218 userAAPIToken = e2eAuth.AddUser(userADID) 219 220 t.Logf("✅ User A indexed in AppView") 221 }) 222 223 // ==================================================================================== 224 // Part 2: User A - Create Community 225 // ==================================================================================== 226 t.Run("2. 
User A - Create Community", func(t *testing.T) { 227 t.Log("\n🏘️ Part 2: User A creates a community...") 228 229 // Community handle will be {name}.community.coves.social 230 // Max 34 chars total, so name must be short (34 - 23 = 11 chars max) 231 shortTS := timestamp % 10000 232 communityName := fmt.Sprintf("gj%d", shortTS) // "gj9261" = 6 chars -> handle = 29 chars 233 234 createReq := map[string]interface{}{ 235 "name": communityName, 236 "displayName": "Gaming Journey Community", 237 "description": "Testing full user journey E2E", 238 "visibility": "public", 239 "allowExternalDiscovery": true, 240 } 241 242 reqBody, _ := json.Marshal(createReq) 243 req, _ := http.NewRequest(http.MethodPost, 244 httpServer.URL+"/xrpc/social.coves.community.create", 245 bytes.NewBuffer(reqBody)) 246 req.Header.Set("Content-Type", "application/json") 247 req.Header.Set("Authorization", "Bearer "+userAAPIToken) 248 249 resp, err := http.DefaultClient.Do(req) 250 require.NoError(t, err) 251 defer func() { _ = resp.Body.Close() }() 252 253 require.Equal(t, http.StatusOK, resp.StatusCode, "Community creation should succeed") 254 255 var createResp struct { 256 URI string `json:"uri"` 257 CID string `json:"cid"` 258 DID string `json:"did"` 259 Handle string `json:"handle"` 260 } 261 require.NoError(t, json.NewDecoder(resp.Body).Decode(&createResp)) 262 263 communityDID = createResp.DID 264 communityHandle = createResp.Handle 265 266 t.Logf("✅ Community created: %s (%s)", communityHandle, communityDID) 267 268 // Wait for Jetstream event and index in AppView 269 t.Log("⏳ Waiting for Jetstream to index community...") 270 271 // Subscribe to Jetstream for community profile events 272 eventChan := make(chan *jetstream.JetstreamEvent, 10) 273 errorChan := make(chan error, 1) 274 done := make(chan bool) 275 276 jetstreamFilterURL := fmt.Sprintf("%s?wantedCollections=social.coves.community.profile", jetstreamURL) 277 278 go func() { 279 err := subscribeToJetstreamForCommunity(ctx, 
jetstreamFilterURL, communityDID, communityConsumer, eventChan, errorChan, done) 280 if err != nil { 281 errorChan <- err 282 } 283 }() 284 285 select { 286 case event := <-eventChan: 287 t.Logf("✅ Jetstream event received for community: %s", event.Did) 288 close(done) 289 case err := <-errorChan: 290 t.Fatalf("❌ Jetstream error: %v", err) 291 case <-time.After(30 * time.Second): 292 close(done) 293 // Check if simulation fallback is allowed (for CI environments) 294 if os.Getenv("ALLOW_SIMULATION_FALLBACK") == "true" { 295 t.Log("⚠️ Timeout waiting for Jetstream event - falling back to simulation (CI mode)") 296 // Simulate indexing for test speed 297 simulateCommunityIndexing(t, db, communityDID, communityHandle, userADID) 298 } else { 299 t.Fatal("❌ Jetstream timeout - real infrastructure test failed. Set ALLOW_SIMULATION_FALLBACK=true to allow fallback.") 300 } 301 } 302 303 // Verify community is indexed 304 indexed, err := communityRepo.GetByDID(ctx, communityDID) 305 require.NoError(t, err, "Community should be indexed") 306 assert.Equal(t, communityDID, indexed.DID) 307 308 t.Logf("✅ Community indexed in AppView") 309 }) 310 311 // ==================================================================================== 312 // Part 3: User A - Create Post 313 // ==================================================================================== 314 t.Run("3. User A - Create Post", func(t *testing.T) { 315 t.Log("\n📝 Part 3: User A creates a post in the community...") 316 317 title := "My First Gaming Post" 318 content := "This is an E2E test post from the user journey!" 
319 320 createReq := map[string]interface{}{ 321 "community": communityDID, 322 "title": title, 323 "content": content, 324 } 325 326 reqBody, _ := json.Marshal(createReq) 327 req, _ := http.NewRequest(http.MethodPost, 328 httpServer.URL+"/xrpc/social.coves.community.post.create", 329 bytes.NewBuffer(reqBody)) 330 req.Header.Set("Content-Type", "application/json") 331 req.Header.Set("Authorization", "Bearer "+userAAPIToken) 332 333 resp, err := http.DefaultClient.Do(req) 334 require.NoError(t, err) 335 defer func() { _ = resp.Body.Close() }() 336 337 require.Equal(t, http.StatusOK, resp.StatusCode, "Post creation should succeed") 338 339 var createResp posts.CreatePostResponse 340 require.NoError(t, json.NewDecoder(resp.Body).Decode(&createResp)) 341 342 postURI = createResp.URI 343 postCID = createResp.CID 344 345 t.Logf("✅ Post created: %s", postURI) 346 347 // Wait for Jetstream event and index in AppView 348 t.Log("⏳ Waiting for Jetstream to index post...") 349 350 eventChan := make(chan *jetstream.JetstreamEvent, 10) 351 errorChan := make(chan error, 1) 352 done := make(chan bool) 353 354 jetstreamFilterURL := fmt.Sprintf("%s?wantedCollections=social.coves.community.post", jetstreamURL) 355 356 go func() { 357 err := subscribeToJetstreamForPost(ctx, jetstreamFilterURL, communityDID, postConsumer, eventChan, errorChan, done) 358 if err != nil { 359 errorChan <- err 360 } 361 }() 362 363 select { 364 case event := <-eventChan: 365 t.Logf("✅ Jetstream event received for post: %s", event.Commit.RKey) 366 close(done) 367 case err := <-errorChan: 368 t.Fatalf("❌ Jetstream error: %v", err) 369 case <-time.After(30 * time.Second): 370 close(done) 371 // Check if simulation fallback is allowed (for CI environments) 372 if os.Getenv("ALLOW_SIMULATION_FALLBACK") == "true" { 373 t.Log("⚠️ Timeout waiting for Jetstream event - falling back to simulation (CI mode)") 374 // Simulate indexing for test speed 375 simulatePostIndexing(t, db, postConsumer, ctx, communityDID, 
userADID, postURI, postCID, title, content) 376 } else { 377 t.Fatal("❌ Jetstream timeout - real infrastructure test failed. Set ALLOW_SIMULATION_FALLBACK=true to allow fallback.") 378 } 379 } 380 381 // Verify post is indexed 382 indexed, err := postRepo.GetByURI(ctx, postURI) 383 require.NoError(t, err, "Post should be indexed") 384 assert.Equal(t, postURI, indexed.URI) 385 assert.Equal(t, userADID, indexed.AuthorDID) 386 assert.Equal(t, 0, indexed.CommentCount, "Initial comment count should be 0") 387 assert.Equal(t, 0, indexed.UpvoteCount, "Initial upvote count should be 0") 388 389 t.Logf("✅ Post indexed in AppView") 390 }) 391 392 // ==================================================================================== 393 // Part 4: User B - Signup and Authenticate 394 // ==================================================================================== 395 t.Run("4. User B - Signup and Authenticate", func(t *testing.T) { 396 t.Log("\n👤 Part 4: User B creates account and authenticates...") 397 398 // Use short handle format to stay under PDS 34-char limit 399 shortTS := timestamp % 10000 // Use last 4 digits 400 userBHandle = fmt.Sprintf("bob%d.local.coves.dev", shortTS) 401 email := fmt.Sprintf("bob%d@test.com", shortTS) 402 password := "test-password-bob-123" 403 404 // Create account on PDS 405 userBToken, userBDID, err = createPDSAccount(pdsURL, userBHandle, email, password) 406 require.NoError(t, err, "User B should be able to create account") 407 require.NotEmpty(t, userBToken, "User B should receive access token") 408 require.NotEmpty(t, userBDID, "User B should receive DID") 409 410 t.Logf("✅ User B created: %s (%s)", userBHandle, userBDID) 411 412 // Index user in AppView 413 userB := createTestUser(t, db, userBHandle, userBDID) 414 require.NotNil(t, userB) 415 416 // Register user with OAuth middleware for Coves API requests 417 userBAPIToken = e2eAuth.AddUser(userBDID) 418 419 t.Logf("✅ User B indexed in AppView") 420 }) 421 422 // 
==================================================================================== 423 // Part 5: User B - Subscribe to Community 424 // ==================================================================================== 425 t.Run("5. User B - Subscribe to Community", func(t *testing.T) { 426 t.Log("\n🔔 Part 5: User B subscribes to the community...") 427 428 // Get initial subscriber count 429 initialCommunity, err := communityRepo.GetByDID(ctx, communityDID) 430 require.NoError(t, err) 431 initialCount := initialCommunity.SubscriberCount 432 433 subscribeReq := map[string]interface{}{ 434 "community": communityDID, 435 "contentVisibility": 5, 436 } 437 438 reqBody, _ := json.Marshal(subscribeReq) 439 req, _ := http.NewRequest(http.MethodPost, 440 httpServer.URL+"/xrpc/social.coves.community.subscribe", 441 bytes.NewBuffer(reqBody)) 442 req.Header.Set("Content-Type", "application/json") 443 req.Header.Set("Authorization", "Bearer "+userBAPIToken) 444 445 resp, err := http.DefaultClient.Do(req) 446 require.NoError(t, err) 447 defer func() { _ = resp.Body.Close() }() 448 449 require.Equal(t, http.StatusOK, resp.StatusCode, "Subscription should succeed") 450 451 var subscribeResp struct { 452 URI string `json:"uri"` 453 CID string `json:"cid"` 454 } 455 require.NoError(t, json.NewDecoder(resp.Body).Decode(&subscribeResp)) 456 457 t.Logf("✅ Subscription created: %s", subscribeResp.URI) 458 459 // Simulate Jetstream event indexing the subscription 460 // (In production, this would come from real Jetstream) 461 rkey := strings.Split(subscribeResp.URI, "/")[4] 462 subEvent := jetstream.JetstreamEvent{ 463 Did: userBDID, 464 TimeUS: time.Now().UnixMicro(), 465 Kind: "commit", 466 Commit: &jetstream.CommitEvent{ 467 Rev: "test-sub-rev", 468 Operation: "create", 469 Collection: "social.coves.community.subscription", 470 RKey: rkey, 471 CID: subscribeResp.CID, 472 Record: map[string]interface{}{ 473 "$type": "social.coves.community.subscription", 474 "subject": 
communityDID, 475 "contentVisibility": float64(5), 476 "createdAt": time.Now().Format(time.RFC3339), 477 }, 478 }, 479 } 480 require.NoError(t, communityConsumer.HandleEvent(ctx, &subEvent)) 481 482 // Verify subscription indexed and subscriber count incremented 483 updatedCommunity, err := communityRepo.GetByDID(ctx, communityDID) 484 require.NoError(t, err) 485 assert.Equal(t, initialCount+1, updatedCommunity.SubscriberCount, 486 "Subscriber count should increment") 487 488 t.Logf("✅ Subscriber count: %d → %d", initialCount, updatedCommunity.SubscriberCount) 489 }) 490 491 // ==================================================================================== 492 // Part 6: User B - Add Comment to Post 493 // ==================================================================================== 494 t.Run("6. User B - Add Comment to Post", func(t *testing.T) { 495 t.Log("\n💬 Part 6: User B comments on User A's post...") 496 497 // Get initial comment count 498 initialPost, err := postRepo.GetByURI(ctx, postURI) 499 require.NoError(t, err) 500 initialCommentCount := initialPost.CommentCount 501 502 // User B creates comment via PDS (simulate) 503 commentRKey := generateTID() 504 commentURI = fmt.Sprintf("at://%s/social.coves.community.comment/%s", userBDID, commentRKey) 505 commentCID = "bafycommentjourney123" 506 507 commentEvent := &jetstream.JetstreamEvent{ 508 Did: userBDID, 509 Kind: "commit", 510 Commit: &jetstream.CommitEvent{ 511 Rev: "test-comment-rev", 512 Operation: "create", 513 Collection: "social.coves.community.comment", 514 RKey: commentRKey, 515 CID: commentCID, 516 Record: map[string]interface{}{ 517 "$type": "social.coves.community.comment", 518 "content": "Great post! 
This E2E test is working perfectly!", 519 "reply": map[string]interface{}{ 520 "root": map[string]interface{}{ 521 "uri": postURI, 522 "cid": postCID, 523 }, 524 "parent": map[string]interface{}{ 525 "uri": postURI, 526 "cid": postCID, 527 }, 528 }, 529 "createdAt": time.Now().Format(time.RFC3339), 530 }, 531 }, 532 } 533 534 require.NoError(t, commentConsumer.HandleEvent(ctx, commentEvent)) 535 536 t.Logf("✅ Comment created: %s", commentURI) 537 538 // Verify comment indexed 539 indexed, err := commentRepo.GetByURI(ctx, commentURI) 540 require.NoError(t, err) 541 assert.Equal(t, commentURI, indexed.URI) 542 assert.Equal(t, userBDID, indexed.CommenterDID) 543 assert.Equal(t, 0, indexed.UpvoteCount, "Initial upvote count should be 0") 544 545 // Verify post comment count incremented 546 updatedPost, err := postRepo.GetByURI(ctx, postURI) 547 require.NoError(t, err) 548 assert.Equal(t, initialCommentCount+1, updatedPost.CommentCount, 549 "Post comment count should increment") 550 551 t.Logf("✅ Comment count: %d → %d", initialCommentCount, updatedPost.CommentCount) 552 }) 553 554 // ==================================================================================== 555 // Part 7: User B - Upvote Post 556 // ==================================================================================== 557 t.Run("7. 
User B - Upvote Post", func(t *testing.T) { 558 t.Log("\n⬆️ Part 7: User B upvotes User A's post...") 559 560 // Get initial vote counts 561 initialPost, err := postRepo.GetByURI(ctx, postURI) 562 require.NoError(t, err) 563 initialUpvotes := initialPost.UpvoteCount 564 initialScore := initialPost.Score 565 566 // User B creates upvote via PDS (simulate) 567 voteRKey := generateTID() 568 voteURI := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", userBDID, voteRKey) 569 570 voteEvent := &jetstream.JetstreamEvent{ 571 Did: userBDID, 572 Kind: "commit", 573 Commit: &jetstream.CommitEvent{ 574 Rev: "test-vote-rev", 575 Operation: "create", 576 Collection: "social.coves.feed.vote", 577 RKey: voteRKey, 578 CID: "bafyvotejourney123", 579 Record: map[string]interface{}{ 580 "$type": "social.coves.feed.vote", 581 "subject": map[string]interface{}{ 582 "uri": postURI, 583 "cid": postCID, 584 }, 585 "direction": "up", 586 "createdAt": time.Now().Format(time.RFC3339), 587 }, 588 }, 589 } 590 591 require.NoError(t, voteConsumer.HandleEvent(ctx, voteEvent)) 592 593 t.Logf("✅ Upvote created: %s", voteURI) 594 595 // Verify vote indexed 596 indexed, err := voteRepo.GetByURI(ctx, voteURI) 597 require.NoError(t, err) 598 assert.Equal(t, voteURI, indexed.URI) 599 assert.Equal(t, userBDID, indexed.VoterDID) // User B created the vote 600 assert.Equal(t, "up", indexed.Direction) 601 602 // Verify post vote counts updated 603 updatedPost, err := postRepo.GetByURI(ctx, postURI) 604 require.NoError(t, err) 605 assert.Equal(t, initialUpvotes+1, updatedPost.UpvoteCount, 606 "Post upvote count should increment") 607 assert.Equal(t, initialScore+1, updatedPost.Score, 608 "Post score should increment") 609 610 t.Logf("✅ Post upvotes: %d → %d, score: %d → %d", 611 initialUpvotes, updatedPost.UpvoteCount, 612 initialScore, updatedPost.Score) 613 }) 614 615 // ==================================================================================== 616 // Part 8: User A - Upvote Comment 617 // 
==================================================================================== 618 t.Run("8. User A - Upvote Comment", func(t *testing.T) { 619 t.Log("\n⬆️ Part 8: User A upvotes User B's comment...") 620 621 // Get initial vote counts 622 initialComment, err := commentRepo.GetByURI(ctx, commentURI) 623 require.NoError(t, err) 624 initialUpvotes := initialComment.UpvoteCount 625 initialScore := initialComment.Score 626 627 // User A creates upvote via PDS (simulate) 628 voteRKey := generateTID() 629 voteURI := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", userADID, voteRKey) 630 631 voteEvent := &jetstream.JetstreamEvent{ 632 Did: userADID, 633 Kind: "commit", 634 Commit: &jetstream.CommitEvent{ 635 Rev: "test-vote-comment-rev", 636 Operation: "create", 637 Collection: "social.coves.feed.vote", 638 RKey: voteRKey, 639 CID: "bafyvotecommentjourney123", 640 Record: map[string]interface{}{ 641 "$type": "social.coves.feed.vote", 642 "subject": map[string]interface{}{ 643 "uri": commentURI, 644 "cid": commentCID, 645 }, 646 "direction": "up", 647 "createdAt": time.Now().Format(time.RFC3339), 648 }, 649 }, 650 } 651 652 require.NoError(t, voteConsumer.HandleEvent(ctx, voteEvent)) 653 654 t.Logf("✅ Upvote on comment created: %s", voteURI) 655 656 // Verify comment vote counts updated 657 updatedComment, err := commentRepo.GetByURI(ctx, commentURI) 658 require.NoError(t, err) 659 assert.Equal(t, initialUpvotes+1, updatedComment.UpvoteCount, 660 "Comment upvote count should increment") 661 assert.Equal(t, initialScore+1, updatedComment.Score, 662 "Comment score should increment") 663 664 t.Logf("✅ Comment upvotes: %d → %d, score: %d → %d", 665 initialUpvotes, updatedComment.UpvoteCount, 666 initialScore, updatedComment.Score) 667 }) 668 669 // ==================================================================================== 670 // Part 9: User B - Verify Timeline Feed 671 // ==================================================================================== 672 
t.Run("9. User B - Verify Timeline Feed Shows Subscribed Community Posts", func(t *testing.T) { 673 t.Log("\n📰 Part 9: User B checks timeline feed...") 674 675 // Use HTTP client to properly go through auth middleware with Bearer token 676 req, _ := http.NewRequest(http.MethodGet, 677 httpServer.URL+"/xrpc/social.coves.feed.getTimeline?sort=new&limit=10", nil) 678 req.Header.Set("Authorization", "Bearer "+userBAPIToken) 679 680 resp, err := http.DefaultClient.Do(req) 681 require.NoError(t, err) 682 defer func() { _ = resp.Body.Close() }() 683 684 require.Equal(t, http.StatusOK, resp.StatusCode, "Timeline request should succeed") 685 686 var response timelineCore.TimelineResponse 687 require.NoError(t, json.NewDecoder(resp.Body).Decode(&response)) 688 689 // User B should see the post from the community they subscribed to 690 require.NotEmpty(t, response.Feed, "Timeline should contain posts") 691 692 // Find our test post in the feed 693 foundPost := false 694 for _, feedPost := range response.Feed { 695 if feedPost.Post.URI == postURI { 696 foundPost = true 697 assert.Equal(t, userADID, feedPost.Post.Author.DID, 698 "Post author should be User A") 699 assert.Equal(t, communityDID, feedPost.Post.Community.DID, 700 "Post community should match") 701 // Check stats (counts are in Stats struct, not direct fields) 702 require.NotNil(t, feedPost.Post.Stats, "Post should have stats") 703 assert.Equal(t, 1, feedPost.Post.Stats.Upvotes, 704 "Post should show 1 upvote from User B") 705 assert.Equal(t, 1, feedPost.Post.Stats.CommentCount, 706 "Post should show 1 comment from User B") 707 break 708 } 709 } 710 711 assert.True(t, foundPost, "Timeline should contain User A's post from subscribed community") 712 713 t.Logf("✅ Timeline feed verified - User B sees post from subscribed community") 714 }) 715 716 // ==================================================================================== 717 // Test Summary 718 // 
==================================================================================== 719 t.Log("\n" + strings.Repeat("=", 80)) 720 t.Log("✅ FULL USER JOURNEY E2E TEST COMPLETE") 721 t.Log(strings.Repeat("=", 80)) 722 t.Log("\n🎯 Complete Flow Tested:") 723 t.Log(" 1. ✓ User A - Signup and Authenticate") 724 t.Log(" 2. ✓ User A - Create Community") 725 t.Log(" 3. ✓ User A - Create Post") 726 t.Log(" 4. ✓ User B - Signup and Authenticate") 727 t.Log(" 5. ✓ User B - Subscribe to Community") 728 t.Log(" 6. ✓ User B - Add Comment to Post") 729 t.Log(" 7. ✓ User B - Upvote Post") 730 t.Log(" 8. ✓ User A - Upvote Comment") 731 t.Log(" 9. ✓ User B - Verify Timeline Feed") 732 t.Log("\n✅ Data Flow Verified:") 733 t.Log(" ✓ All records written to PDS") 734 t.Log(" ✓ Jetstream events consumed (with fallback simulation)") 735 t.Log(" ✓ AppView database indexed correctly") 736 t.Log(" ✓ Counts updated (votes, comments, subscribers)") 737 t.Log(" ✓ Timeline feed aggregates subscribed content") 738 t.Log("\n✅ Multi-User Interaction Verified:") 739 t.Log(" ✓ User A creates community and post") 740 t.Log(" ✓ User B subscribes and interacts") 741 t.Log(" ✓ Cross-user votes and comments") 742 t.Log(" ✓ Feed shows correct personalized content") 743 t.Log("\n" + strings.Repeat("=", 80)) 744} 745 746// Helper: Subscribe to Jetstream for community profile events 747func subscribeToJetstreamForCommunity( 748 ctx context.Context, 749 jetstreamURL string, 750 targetDID string, 751 consumer *jetstream.CommunityEventConsumer, 752 eventChan chan<- *jetstream.JetstreamEvent, 753 errorChan chan<- error, 754 done <-chan bool, 755) error { 756 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 757 if err != nil { 758 return fmt.Errorf("failed to connect to Jetstream: %w", err) 759 } 760 defer func() { _ = conn.Close() }() 761 762 for { 763 select { 764 case <-done: 765 return nil 766 case <-ctx.Done(): 767 return ctx.Err() 768 default: 769 if err := 
conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 770 return fmt.Errorf("failed to set read deadline: %w", err) 771 } 772 773 var event jetstream.JetstreamEvent 774 err := conn.ReadJSON(&event) 775 if err != nil { 776 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 777 return nil 778 } 779 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 780 continue 781 } 782 return fmt.Errorf("failed to read Jetstream message: %w", err) 783 } 784 785 if event.Did == targetDID && event.Kind == "commit" && 786 event.Commit != nil && event.Commit.Collection == "social.coves.community.profile" { 787 if err := consumer.HandleEvent(ctx, &event); err != nil { 788 return fmt.Errorf("failed to process event: %w", err) 789 } 790 791 select { 792 case eventChan <- &event: 793 return nil 794 case <-time.After(1 * time.Second): 795 return fmt.Errorf("timeout sending event to channel") 796 } 797 } 798 } 799 } 800} 801 802// Helper: Simulate community indexing for test speed 803func simulateCommunityIndexing(t *testing.T, db *sql.DB, did, handle, ownerDID string) { 804 t.Helper() 805 806 _, err := db.Exec(` 807 INSERT INTO communities (did, handle, name, display_name, owner_did, created_by_did, 808 hosted_by_did, visibility, moderation_type, record_uri, record_cid, created_at) 809 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) 810 ON CONFLICT (did) DO NOTHING 811 `, did, handle, strings.Split(handle, ".")[0], "Test Community", did, ownerDID, 812 "did:web:coves.social", "public", "moderator", 813 fmt.Sprintf("at://%s/social.coves.community.profile/self", did), "fakecid") 814 815 require.NoError(t, err, "Failed to simulate community indexing") 816} 817 818// Helper: Simulate post indexing for test speed 819func simulatePostIndexing(t *testing.T, db *sql.DB, consumer *jetstream.PostEventConsumer, 820 ctx context.Context, communityDID, authorDID, uri, cid, title, content string, 821) { 822 t.Helper() 823 824 rkey := strings.Split(uri, "/")[4] 
825 event := jetstream.JetstreamEvent{ 826 Did: communityDID, 827 Kind: "commit", 828 Commit: &jetstream.CommitEvent{ 829 Operation: "create", 830 Collection: "social.coves.community.post", 831 RKey: rkey, 832 CID: cid, 833 Record: map[string]interface{}{ 834 "$type": "social.coves.community.post", 835 "community": communityDID, 836 "author": authorDID, 837 "title": title, 838 "content": content, 839 "createdAt": time.Now().Format(time.RFC3339), 840 }, 841 }, 842 } 843 require.NoError(t, consumer.HandleEvent(ctx, &event)) 844}