A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/api/handlers/post" 5 "Coves/internal/api/middleware" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/communities" 10 "Coves/internal/core/posts" 11 "Coves/internal/core/users" 12 "Coves/internal/db/postgres" 13 "bytes" 14 "context" 15 "database/sql" 16 "encoding/base64" 17 "encoding/json" 18 "fmt" 19 "net" 20 "net/http" 21 "net/http/httptest" 22 "os" 23 "strings" 24 "testing" 25 "time" 26 27 "github.com/golang-jwt/jwt/v5" 28 "github.com/gorilla/websocket" 29 _ "github.com/lib/pq" 30 "github.com/pressly/goose/v3" 31 "github.com/stretchr/testify/assert" 32 "github.com/stretchr/testify/require" 33) 34 35// TestPostCreation_E2E_WithJetstream tests the full post creation flow: 36// XRPC endpoint → AppView Service → PDS write → Jetstream consumer → DB indexing 37// 38// This is a TRUE E2E test that simulates what happens in production: 39// 1. Client calls POST /xrpc/social.coves.post.create with auth token 40// 2. Handler validates and calls PostService.CreatePost() 41// 3. Service writes post to community's PDS repository 42// 4. PDS broadcasts event to firehose/Jetstream 43// 5. Jetstream consumer receives event and indexes post in AppView DB 44// 6. Post is now queryable from AppView 45// 46// NOTE: This test simulates the Jetstream event (step 4-5) since we don't have 47// a live PDS/Jetstream in test environment. For true live testing, use TestPostCreation_E2E_LivePDS. 48func TestPostCreation_E2E_WithJetstream(t *testing.T) { 49 db := setupTestDB(t) 50 defer func() { 51 if err := db.Close(); err != nil { 52 t.Logf("Failed to close database: %v", err) 53 } 54 }() 55 56 // Cleanup old test data first 57 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:gaming123'") 58 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:gaming123'") 59 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:alice123'") 60 61 // Setup repositories 62 userRepo := postgres.NewUserRepository(db) 63 communityRepo := postgres.NewCommunityRepository(db) 64 postRepo := postgres.NewPostRepository(db) 65 66 // Setup user service for post consumer 67 identityConfig := identity.DefaultConfig() 68 identityResolver := identity.NewResolver(db, identityConfig) 69 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 70 71 // Create test user (author) 72 author := createTestUser(t, db, "alice.test", "did:plc:alice123") 73 74 // Create test community with fake PDS credentials 75 // In real E2E, this would be a real community provisioned on PDS 76 community := &communities.Community{ 77 DID: "did:plc:gaming123", 78 Handle: "gaming.community.test.coves.social", 79 Name: "gaming", 80 DisplayName: "Gaming Community", 81 OwnerDID: "did:plc:gaming123", 82 CreatedByDID: author.DID, 83 HostedByDID: "did:web:coves.test", 84 Visibility: "public", 85 ModerationType: "moderator", 86 RecordURI: "at://did:plc:gaming123/social.coves.community.profile/self", 87 RecordCID: "fakecid123", 88 PDSAccessToken: "fake_token_for_testing", 89 PDSRefreshToken: "fake_refresh_token", 90 } 91 _, err := communityRepo.Create(context.Background(), community) 92 if err != nil { 93 t.Fatalf("Failed to create test community: %v", err) 94 } 95 96 t.Run("Full E2E flow - XRPC to DB via Jetstream", func(t *testing.T) { 97 ctx := context.Background() 98 99 // STEP 1: Simulate what the XRPC handler would receive 100 // In real flow, this comes from client with OAuth bearer token 101 title := "My First Post" 102 content := "This is a test post!" 103 postReq := posts.CreatePostRequest{ 104 Title: &title, 105 Content: &content, 106 // Community and AuthorDID set by handler from request context 107 } 108 109 // STEP 2: Simulate Jetstream consumer receiving the post CREATE event 110 // In real production, this event comes from PDS via Jetstream WebSocket 111 // For this test, we simulate the event that would be broadcast after PDS write 112 113 // Generate a realistic rkey (TID - timestamp identifier) 114 rkey := generateTID() 115 116 // Build the post record as it would appear in Jetstream 117 jetstreamEvent := jetstream.JetstreamEvent{ 118 Did: community.DID, // Repo owner (community) 119 Kind: "commit", 120 Commit: &jetstream.CommitEvent{ 121 Operation: "create", 122 Collection: "social.coves.post.record", 123 RKey: rkey, 124 CID: "bafy2bzaceabc123def456", // Fake CID 125 Record: map[string]interface{}{ 126 "$type": "social.coves.post.record", 127 "community": community.DID, 128 "author": author.DID, 129 "title": *postReq.Title, 130 "content": *postReq.Content, 131 "createdAt": time.Now().Format(time.RFC3339), 132 }, 133 }, 134 } 135 136 // STEP 3: Process event through Jetstream consumer 137 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 138 err := consumer.HandleEvent(ctx, &jetstreamEvent) 139 if err != nil { 140 t.Fatalf("Jetstream consumer failed to process event: %v", err) 141 } 142 143 // STEP 4: Verify post was indexed in AppView database 144 expectedURI := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, rkey) 145 indexedPost, err := postRepo.GetByURI(ctx, expectedURI) 146 if err != nil { 147 t.Fatalf("Post not indexed in AppView: %v", err) 148 } 149 150 // STEP 5: Verify all fields are correct 151 if indexedPost.URI != expectedURI { 152 t.Errorf("Expected URI %s, got %s", expectedURI, indexedPost.URI) 153 } 154 if indexedPost.AuthorDID != author.DID { 155 t.Errorf("Expected author %s, got %s", author.DID, indexedPost.AuthorDID) 156 } 157 if indexedPost.CommunityDID != community.DID { 158 t.Errorf("Expected community %s, got %s", community.DID, indexedPost.CommunityDID) 159 } 160 if indexedPost.Title == nil || *indexedPost.Title != title { 161 t.Errorf("Expected title '%s', got %v", title, indexedPost.Title) 162 } 163 if indexedPost.Content == nil || *indexedPost.Content != content { 164 t.Errorf("Expected content '%s', got %v", content, indexedPost.Content) 165 } 166 167 // Verify stats initialized correctly 168 if indexedPost.UpvoteCount != 0 { 169 t.Errorf("Expected upvote_count 0, got %d", indexedPost.UpvoteCount) 170 } 171 if indexedPost.DownvoteCount != 0 { 172 t.Errorf("Expected downvote_count 0, got %d", indexedPost.DownvoteCount) 173 } 174 if indexedPost.Score != 0 { 175 t.Errorf("Expected score 0, got %d", indexedPost.Score) 176 } 177 178 t.Logf("✓ E2E test passed! Post indexed with URI: %s", indexedPost.URI) 179 }) 180 181 t.Run("Consumer validates repository ownership (security)", func(t *testing.T) { 182 ctx := context.Background() 183 184 // SECURITY TEST: Try to create a post that claims to be from the community 185 // but actually comes from a user's repository 186 // This should be REJECTED by the consumer 187 188 maliciousEvent := jetstream.JetstreamEvent{ 189 Did: author.DID, // Event from user's repo (NOT community repo) 190 Kind: "commit", 191 Commit: &jetstream.CommitEvent{ 192 Operation: "create", 193 Collection: "social.coves.post.record", 194 RKey: generateTID(), 195 CID: "bafy2bzacefake", 196 Record: map[string]interface{}{ 197 "$type": "social.coves.post.record", 198 "community": community.DID, // Claims to be for this community 199 "author": author.DID, 200 "title": "Fake Post", 201 "content": "This is a malicious post attempt", 202 "createdAt": time.Now().Format(time.RFC3339), 203 }, 204 }, 205 } 206 207 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 208 err := consumer.HandleEvent(ctx, &maliciousEvent) 209 210 // Should get security error 211 if err == nil { 212 t.Fatal("Expected security error for post from wrong repository, got nil") 213 } 214 215 if !contains(err.Error(), "repository DID") || !contains(err.Error(), "doesn't match") { 216 t.Errorf("Expected repository mismatch error, got: %v", err) 217 } 218 219 t.Logf("✓ Security validation passed: %v", err) 220 }) 221 222 t.Run("Idempotent indexing - duplicate events", func(t *testing.T) { 223 ctx := context.Background() 224 225 // Simulate the same Jetstream event arriving twice 226 // This can happen during Jetstream replays or network retries 227 rkey := generateTID() 228 event := jetstream.JetstreamEvent{ 229 Did: community.DID, 230 Kind: "commit", 231 Commit: &jetstream.CommitEvent{ 232 Operation: "create", 233 Collection: "social.coves.post.record", 234 RKey: rkey, 235 CID: "bafy2bzaceidempotent", 236 Record: map[string]interface{}{ 237 "$type": "social.coves.post.record", 238 "community": community.DID, 239 "author": author.DID, 240 "title": "Duplicate Test", 241 "content": "Testing idempotency", 242 "createdAt": time.Now().Format(time.RFC3339), 243 }, 244 }, 245 } 246 247 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 248 249 // First event - should succeed 250 err := consumer.HandleEvent(ctx, &event) 251 if err != nil { 252 t.Fatalf("First event failed: %v", err) 253 } 254 255 // Second event (duplicate) - should be handled gracefully 256 err = consumer.HandleEvent(ctx, &event) 257 if err != nil { 258 t.Fatalf("Duplicate event should be handled gracefully, got error: %v", err) 259 } 260 261 // Verify only one post in database 262 uri := fmt.Sprintf("at://%s/social.coves.post.record/%s", community.DID, rkey) 263 post, err := postRepo.GetByURI(ctx, uri) 264 if err != nil { 265 t.Fatalf("Post not found: %v", err) 266 } 267 268 if post.URI != uri { 269 t.Error("Post URI mismatch - possible duplicate indexing") 270 } 271 272 t.Logf("✓ Idempotency test passed") 273 }) 274 275 t.Run("Handles orphaned posts (unknown community)", func(t *testing.T) { 276 ctx := context.Background() 277 278 // Post references a community that doesn't exist in AppView yet 279 // This can happen if Jetstream delivers post event before community profile event 280 unknownCommunityDID := "did:plc:unknown999" 281 282 event := jetstream.JetstreamEvent{ 283 Did: unknownCommunityDID, 284 Kind: "commit", 285 Commit: &jetstream.CommitEvent{ 286 Operation: "create", 287 Collection: "social.coves.post.record", 288 RKey: generateTID(), 289 CID: "bafy2bzaceorphaned", 290 Record: map[string]interface{}{ 291 "$type": "social.coves.post.record", 292 "community": unknownCommunityDID, 293 "author": author.DID, 294 "title": "Orphaned Post", 295 "content": "Community not indexed yet", 296 "createdAt": time.Now().Format(time.RFC3339), 297 }, 298 }, 299 } 300 301 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 302 303 // Should log warning but NOT fail (eventual consistency) 304 // Note: This will fail due to foreign key constraint in current schema 305 // In production, you might want to handle this differently (defer indexing, etc.) 306 err := consumer.HandleEvent(ctx, &event) 307 308 // For now, we expect this to fail due to FK constraint 309 // In future, we might make FK constraint DEFERRABLE or handle orphaned posts differently 310 if err == nil { 311 t.Log("⚠️ Orphaned post was indexed (FK constraint not enforced)") 312 } else { 313 t.Logf("✓ Orphaned post rejected by FK constraint (expected): %v", err) 314 } 315 }) 316} 317 318// TestPostCreation_E2E_LivePDS tests the COMPLETE end-to-end flow with a live PDS: 319// 1. HTTP POST to /xrpc/social.coves.post.create (with auth) 320// 2. Handler → Service → Write to community's PDS repository 321// 3. PDS → Jetstream firehose event 322// 4. Jetstream consumer → Index in AppView database 323// 5. Verify post appears in database with correct data 324// 325// This is a TRUE E2E test that requires: 326// - Live PDS running at PDS_URL (default: http://localhost:3001) 327// - Live Jetstream running at JETSTREAM_URL (default: ws://localhost:6008/subscribe) 328// - Test database running 329func TestPostCreation_E2E_LivePDS(t *testing.T) { 330 if testing.Short() { 331 t.Skip("Skipping live PDS E2E test in short mode") 332 } 333 334 // Setup test database 335 dbURL := os.Getenv("TEST_DATABASE_URL") 336 if dbURL == "" { 337 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 338 } 339 340 db, err := sql.Open("postgres", dbURL) 341 require.NoError(t, err, "Failed to connect to test database") 342 defer func() { 343 if closeErr := db.Close(); closeErr != nil { 344 t.Logf("Failed to close database: %v", closeErr) 345 } 346 }() 347 348 // Run migrations 349 require.NoError(t, goose.SetDialect("postgres")) 350 require.NoError(t, goose.Up(db, "../../internal/db/migrations")) 351 352 // Check if PDS is running 353 pdsURL := os.Getenv("PDS_URL") 354 if pdsURL == "" { 355 pdsURL = "http://localhost:3001" 356 } 357 358 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 359 if err != nil { 360 t.Skipf("PDS not running at %s: %v", pdsURL, err) 361 } 362 _ = healthResp.Body.Close() 363 364 // Get instance credentials for authentication 365 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 366 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 367 if instanceHandle == "" { 368 instanceHandle = "testuser123.local.coves.dev" 369 } 370 if instancePassword == "" { 371 instancePassword = "test-password-123" 372 } 373 374 t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle) 375 376 // Authenticate to get instance DID (needed for provisioner domain) 377 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 378 if err != nil { 379 t.Skipf("Failed to authenticate with PDS (may not be configured): %v", err) 380 } 381 382 t.Logf("✅ Authenticated - Instance DID: %s", instanceDID) 383 384 // Extract instance domain from DID for community provisioning 385 var instanceDomain string 386 if strings.HasPrefix(instanceDID, "did:web:") { 387 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 388 } else { 389 // Fallback for did:plc 390 instanceDomain = "coves.social" 391 } 392 393 // Setup repositories and services 394 communityRepo := postgres.NewCommunityRepository(db) 395 postRepo := postgres.NewPostRepository(db) 396 397 // Setup PDS account provisioner for community creation 398 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 399 400 // Setup community service with real PDS provisioner 401 communityService := communities.NewCommunityService( 402 communityRepo, 403 pdsURL, 404 instanceDID, 405 instanceDomain, 406 provisioner, // ✅ Real provisioner for creating communities on PDS 407 ) 408 409 postService := posts.NewPostService(postRepo, communityService, pdsURL) 410 411 // Setup auth middleware (skip JWT verification for testing) 412 authMiddleware := middleware.NewAtProtoAuthMiddleware(nil, true) 413 414 // Setup HTTP handler 415 createHandler := post.NewCreateHandler(postService) 416 417 ctx := context.Background() 418 419 // Cleanup old test data 420 _, _ = db.Exec("DELETE FROM posts WHERE community_did LIKE 'did:plc:e2etest%'") 421 _, _ = db.Exec("DELETE FROM communities WHERE did LIKE 'did:plc:e2etest%'") 422 _, _ = db.Exec("DELETE FROM users WHERE did LIKE 'did:plc:e2etest%'") 423 424 // Create test user (author) 425 author := createTestUser(t, db, "e2etestauthor.bsky.social", "did:plc:e2etestauthor123") 426 427 // ==================================================================================== 428 // Part 1: Write-Forward to PDS 429 // ==================================================================================== 430 t.Run("1. Write-Forward to PDS", func(t *testing.T) { 431 // TRUE E2E: Actually provision a real community on PDS 432 // This tests the full flow: 433 // 1. Call com.atproto.server.createAccount on PDS 434 // 2. PDS generates DID, keys, tokens 435 // 3. Write community profile to PDS repository 436 // 4. Store credentials in AppView DB 437 // 5. Use those credentials to create a post 438 439 // Use timestamp to ensure unique community name for each test run 440 communityName := fmt.Sprintf("e2epost%d", time.Now().Unix()) 441 442 t.Logf("\n📝 Provisioning test community on live PDS (name: %s)...", communityName) 443 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 444 Name: communityName, 445 DisplayName: "E2E Test Community", 446 Description: "Test community for E2E post creation testing", 447 CreatedByDID: author.DID, 448 Visibility: "public", 449 AllowExternalDiscovery: true, 450 }) 451 require.NoError(t, err, "Failed to provision community on PDS") 452 require.NotEmpty(t, community.DID, "Community should have DID from PDS") 453 require.NotEmpty(t, community.PDSAccessToken, "Community should have access token") 454 require.NotEmpty(t, community.PDSRefreshToken, "Community should have refresh token") 455 456 t.Logf("✓ Community provisioned: DID=%s, Handle=%s", community.DID, community.Handle) 457 458 // NOTE: Cleanup disabled to allow post-test inspection of indexed data 459 // Uncomment to enable cleanup after test 460 // defer func() { 461 // if err := communityRepo.Delete(ctx, community.DID); err != nil { 462 // t.Logf("Warning: Failed to cleanup test community: %v", err) 463 // } 464 // }() 465 466 // Build HTTP request for post creation 467 title := "E2E Test Post" 468 content := "This post was created via full E2E test with live PDS!" 469 reqBody := map[string]interface{}{ 470 "community": community.DID, 471 "title": title, 472 "content": content, 473 } 474 reqJSON, err := json.Marshal(reqBody) 475 require.NoError(t, err) 476 477 // Create HTTP request 478 req := httptest.NewRequest("POST", "/xrpc/social.coves.post.create", bytes.NewReader(reqJSON)) 479 req.Header.Set("Content-Type", "application/json") 480 481 // Create a simple JWT for testing (Phase 1: no signature verification) 482 // In production, this would be a real OAuth token from PDS 483 testJWT := createSimpleTestJWT(author.DID) 484 req.Header.Set("Authorization", "Bearer "+testJWT) 485 486 // Execute request through auth middleware + handler 487 rr := httptest.NewRecorder() 488 handler := authMiddleware.RequireAuth(http.HandlerFunc(createHandler.HandleCreate)) 489 handler.ServeHTTP(rr, req) 490 491 // Check response 492 require.Equal(t, http.StatusOK, rr.Code, "Handler should return 200 OK, body: %s", rr.Body.String()) 493 494 // Parse response 495 var response posts.CreatePostResponse 496 err = json.NewDecoder(rr.Body).Decode(&response) 497 require.NoError(t, err, "Failed to parse response") 498 499 t.Logf("✅ Post created on PDS:") 500 t.Logf(" URI: %s", response.URI) 501 t.Logf(" CID: %s", response.CID) 502 503 // ==================================================================================== 504 // Part 2: TRUE E2E - Real Jetstream Firehose Consumer 505 // ==================================================================================== 506 // This part tests the ACTUAL production code path in main.go 507 // including the WebSocket connection and consumer logic 508 t.Run("2. Real Jetstream Firehose Consumption", func(t *testing.T) { 509 t.Logf("\n🔄 TRUE E2E: Subscribing to real Jetstream firehose...") 510 511 // Get PDS hostname for Jetstream filtering 512 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 513 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 514 pdsHostname = strings.Split(pdsHostname, ":")[0] // Remove port 515 516 // Build Jetstream URL with filters for post records 517 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.post.record", 518 pdsHostname) 519 520 t.Logf(" Jetstream URL: %s", jetstreamURL) 521 t.Logf(" Looking for post URI: %s", response.URI) 522 t.Logf(" Community DID: %s", community.DID) 523 524 // Setup user service (required by post consumer) 525 userRepo := postgres.NewUserRepository(db) 526 identityConfig := identity.DefaultConfig() 527 plcURL := os.Getenv("PLC_DIRECTORY_URL") 528 if plcURL == "" { 529 plcURL = "http://localhost:3002" 530 } 531 identityConfig.PLCURL = plcURL 532 identityResolver := identity.NewResolver(db, identityConfig) 533 userService := users.NewUserService(userRepo, identityResolver, pdsURL) 534 535 // Create post consumer (same as main.go) 536 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 537 538 // Channels to receive the event 539 eventChan := make(chan *jetstream.JetstreamEvent, 10) 540 errorChan := make(chan error, 1) 541 done := make(chan bool) 542 543 // Start Jetstream WebSocket subscriber in background 544 // This creates its own WebSocket connection to Jetstream 545 go func() { 546 err := subscribeToJetstreamForPost(ctx, jetstreamURL, community.DID, postConsumer, eventChan, errorChan, done) 547 if err != nil { 548 errorChan <- err 549 } 550 }() 551 552 // Wait for event or timeout 553 t.Logf("⏳ Waiting for Jetstream event (max 30 seconds)...") 554 555 select { 556 case event := <-eventChan: 557 t.Logf("✅ Received real Jetstream event!") 558 t.Logf(" Event DID: %s", event.Did) 559 t.Logf(" Collection: %s", event.Commit.Collection) 560 t.Logf(" Operation: %s", event.Commit.Operation) 561 t.Logf(" RKey: %s", event.Commit.RKey) 562 563 // Verify it's for our community 564 assert.Equal(t, community.DID, event.Did, "Event should be from community repo") 565 566 // Verify post was indexed in AppView database 567 t.Logf("\n🔍 Querying AppView database for indexed post...") 568 569 indexedPost, err := postRepo.GetByURI(ctx, response.URI) 570 require.NoError(t, err, "Post should be indexed in AppView") 571 572 t.Logf("✅ Post indexed in AppView:") 573 t.Logf(" URI: %s", indexedPost.URI) 574 t.Logf(" CID: %s", indexedPost.CID) 575 t.Logf(" Author DID: %s", indexedPost.AuthorDID) 576 t.Logf(" Community: %s", indexedPost.CommunityDID) 577 t.Logf(" Title: %v", indexedPost.Title) 578 t.Logf(" Content: %v", indexedPost.Content) 579 580 // Verify all fields match what we sent 581 assert.Equal(t, response.URI, indexedPost.URI, "URI should match") 582 assert.Equal(t, response.CID, indexedPost.CID, "CID should match") 583 assert.Equal(t, author.DID, indexedPost.AuthorDID, "Author DID should match") 584 assert.Equal(t, community.DID, indexedPost.CommunityDID, "Community DID should match") 585 assert.Equal(t, title, *indexedPost.Title, "Title should match") 586 assert.Equal(t, content, *indexedPost.Content, "Content should match") 587 588 // Verify stats initialized correctly 589 assert.Equal(t, 0, indexedPost.UpvoteCount, "Upvote count should be 0") 590 assert.Equal(t, 0, indexedPost.DownvoteCount, "Downvote count should be 0") 591 assert.Equal(t, 0, indexedPost.Score, "Score should be 0") 592 assert.Equal(t, 0, indexedPost.CommentCount, "Comment count should be 0") 593 594 // Verify timestamps 595 assert.False(t, indexedPost.CreatedAt.IsZero(), "CreatedAt should be set") 596 assert.False(t, indexedPost.IndexedAt.IsZero(), "IndexedAt should be set") 597 598 // Signal to stop Jetstream consumer 599 close(done) 600 601 t.Log("\n✅ Part 2 Complete: TRUE E2E - PDS → Jetstream → Consumer → AppView ✓") 602 603 case err := <-errorChan: 604 t.Fatalf("❌ Jetstream error: %v", err) 605 606 case <-time.After(30 * time.Second): 607 t.Fatalf("❌ Timeout: No Jetstream event received within 30 seconds") 608 } 609 }) 610 }) 611} 612 613// createSimpleTestJWT creates a minimal JWT for testing (Phase 1 - no signature) 614// In production, this would be a real OAuth token from PDS with proper signatures 615func createSimpleTestJWT(userDID string) string { 616 // Create minimal JWT claims using RegisteredClaims 617 // Use userDID as issuer since we don't have a proper PDS DID for testing 618 claims := auth.Claims{ 619 RegisteredClaims: jwt.RegisteredClaims{ 620 Subject: userDID, 621 Issuer: userDID, // Use DID as issuer for testing (valid per atProto) 622 Audience: jwt.ClaimStrings{"did:web:test.coves.social"}, 623 IssuedAt: jwt.NewNumericDate(time.Now()), 624 ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)), 625 }, 626 Scope: "com.atproto.access", 627 } 628 629 // For Phase 1 testing, we create an unsigned JWT 630 // The middleware is configured with skipVerify=true for testing 631 header := map[string]interface{}{ 632 "alg": "none", 633 "typ": "JWT", 634 } 635 636 headerJSON, _ := json.Marshal(header) 637 claimsJSON, _ := json.Marshal(claims) 638 639 // Base64url encode (without padding) 640 headerB64 := base64.RawURLEncoding.EncodeToString(headerJSON) 641 claimsB64 := base64.RawURLEncoding.EncodeToString(claimsJSON) 642 643 // For "alg: none", signature is empty 644 return headerB64 + "." + claimsB64 + "." 645} 646 647// generateTID generates a simple timestamp-based identifier for testing 648// In production, PDS generates proper TIDs 649func generateTID() string { 650 return fmt.Sprintf("3k%d", time.Now().UnixNano()/1000) 651} 652 653// subscribeToJetstreamForPost subscribes to real Jetstream firehose and processes post events 654// This helper creates a WebSocket connection to Jetstream and waits for post events 655func subscribeToJetstreamForPost( 656 ctx context.Context, 657 jetstreamURL string, 658 targetDID string, 659 consumer *jetstream.PostEventConsumer, 660 eventChan chan<- *jetstream.JetstreamEvent, 661 errorChan chan<- error, 662 done <-chan bool, 663) error { 664 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 665 if err != nil { 666 return fmt.Errorf("failed to connect to Jetstream: %w", err) 667 } 668 defer func() { _ = conn.Close() }() 669 670 // Read messages until we find our event or receive done signal 671 for { 672 select { 673 case <-done: 674 return nil 675 case <-ctx.Done(): 676 return ctx.Err() 677 default: 678 // Set read deadline to avoid blocking forever 679 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 680 return fmt.Errorf("failed to set read deadline: %w", err) 681 } 682 683 var event jetstream.JetstreamEvent 684 err := conn.ReadJSON(&event) 685 if err != nil { 686 // Check if it's a timeout (expected) 687 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 688 return nil 689 } 690 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 691 continue // Timeout is expected, keep listening 692 } 693 // For other errors, don't retry reading from a broken connection 694 return fmt.Errorf("failed to read Jetstream message: %w", err) 695 } 696 697 // Check if this is a post event for the target DID 698 if event.Did == targetDID && event.Kind == "commit" && 699 event.Commit != nil && event.Commit.Collection == "social.coves.post.record" { 700 // Process the event through the consumer 701 if err := consumer.HandleEvent(ctx, &event); err != nil { 702 return fmt.Errorf("failed to process event: %w", err) 703 } 704 705 // Send to channel so test can verify 706 select { 707 case eventChan <- &event: 708 return nil 709 case <-time.After(1 * time.Second): 710 return fmt.Errorf("timeout sending event to channel") 711 } 712 } 713 } 714 } 715}