A community based topic aggregation platform built on atproto
at main 13 kB view raw
1""" 2End-to-End Integration Tests. 3 4Tests the complete aggregator workflow against live infrastructure: 5- Real HTTP mocking (Kagi RSS) 6- Real PDS (Coves test PDS via Docker) 7- Real community posting 8- Real state management 9 10Requires: 11- Coves test PDS running on localhost:3001 12- Test database with community: e2e-95206.community.coves.social 13- Aggregator account: kagi-news.local.coves.dev 14""" 15import os 16import pytest 17import responses 18from pathlib import Path 19from datetime import datetime 20 21from src.main import Aggregator 22from src.coves_client import CovesClient 23from src.config import ConfigLoader 24 25 26# Skip E2E tests by default (require live infrastructure) 27pytestmark = pytest.mark.skipif( 28 os.getenv('RUN_E2E_TESTS') != '1', 29 reason="E2E tests require RUN_E2E_TESTS=1 and live PDS" 30) 31 32 33@pytest.fixture 34def test_community(aggregator_credentials): 35 """Create a test community for E2E testing.""" 36 import time 37 import requests 38 39 handle, password = aggregator_credentials 40 41 # Authenticate 42 auth_response = requests.post( 43 "http://localhost:3001/xrpc/com.atproto.server.createSession", 44 json={"identifier": handle, "password": password} 45 ) 46 token = auth_response.json()["accessJwt"] 47 48 # Create community (use short name to avoid handle length limits) 49 community_name = f"e2e-{int(time.time()) % 10000}" # Last 4 digits only 50 create_response = requests.post( 51 "http://localhost:8081/xrpc/social.coves.community.create", 52 headers={"Authorization": f"Bearer {token}"}, 53 json={ 54 "name": community_name, 55 "displayName": "E2E Test Community", 56 "description": "Temporary community for aggregator E2E testing", 57 "visibility": "public" 58 } 59 ) 60 61 if create_response.ok: 62 community = create_response.json() 63 community_handle = f"{community_name}.community.coves.social" 64 print(f"\n✅ Created test community: {community_handle}") 65 return community_handle 66 else: 67 raise Exception(f"Failed to create community: {create_response.text}") 68 69 70@pytest.fixture 71def test_config_file(tmp_path, test_community): 72 """Create test configuration file with dynamic community.""" 73 config_content = f""" 74coves_api_url: http://localhost:8081 75 76feeds: 77 - name: "Kagi World News" 78 url: "https://news.kagi.com/world.xml" 79 community_handle: "{test_community}" 80 enabled: true 81 82log_level: debug 83""" 84 config_file = tmp_path / "config.yaml" 85 config_file.write_text(config_content) 86 return config_file 87 88 89@pytest.fixture 90def test_state_file(tmp_path): 91 """Create temporary state file.""" 92 return tmp_path / "state.json" 93 94 95@pytest.fixture 96def mock_kagi_feed(): 97 """Load real Kagi RSS feed fixture.""" 98 # Load from data directory (where actual feed is stored) 99 fixture_path = Path(__file__).parent.parent / "data" / "world.xml" 100 if not fixture_path.exists(): 101 # Fallback to tests/fixtures if moved 102 fixture_path = Path(__file__).parent / "fixtures" / "world.xml" 103 return fixture_path.read_text() 104 105 106@pytest.fixture 107def aggregator_credentials(): 108 """Get aggregator credentials from environment.""" 109 handle = os.getenv('AGGREGATOR_HANDLE', 'kagi-news.local.coves.dev') 110 password = os.getenv('AGGREGATOR_PASSWORD', 'kagi-aggregator-2024-secure-pass') 111 return handle, password 112 113 114class TestEndToEnd: 115 """Full end-to-end integration tests.""" 116 117 @responses.activate 118 def test_full_aggregator_workflow( 119 self, 120 test_config_file, 121 test_state_file, 122 mock_kagi_feed, 123 aggregator_credentials 124 ): 125 """ 126 Test complete workflow: fetch → parse → format → post → verify. 127 128 This test: 129 1. Mocks Kagi RSS HTTP request 130 2. Authenticates with real PDS 131 3. Parses real Kagi HTML content 132 4. Formats with rich text facets 133 5. Posts to real community 134 6. Verifies post was created 135 7. Tests deduplication (no repost) 136 """ 137 # Mock Kagi RSS feed 138 responses.add( 139 responses.GET, 140 "https://news.kagi.com/world.xml", 141 body=mock_kagi_feed, 142 status=200, 143 content_type="application/xml" 144 ) 145 146 # Allow passthrough for localhost (PDS) 147 responses.add_passthru("http://localhost") 148 149 # Set up environment 150 handle, password = aggregator_credentials 151 os.environ['AGGREGATOR_HANDLE'] = handle 152 os.environ['AGGREGATOR_PASSWORD'] = password 153 os.environ['PDS_URL'] = 'http://localhost:3001' # Auth through PDS 154 155 # Create aggregator 156 aggregator = Aggregator( 157 config_path=test_config_file, 158 state_file=test_state_file 159 ) 160 161 # Run first time: should post stories 162 print("\n" + "="*60) 163 print("🚀 Running first aggregator pass (should post stories)") 164 print("="*60) 165 aggregator.run() 166 167 # Verify state was updated (stories marked as posted) 168 posted_count = aggregator.state_manager.get_posted_count( 169 "https://news.kagi.com/world.xml" 170 ) 171 print(f"\n✅ First pass: {posted_count} stories posted and tracked") 172 assert posted_count > 0, "Should have posted at least one story" 173 174 # Create new aggregator instance (simulates CRON re-run) 175 aggregator2 = Aggregator( 176 config_path=test_config_file, 177 state_file=test_state_file 178 ) 179 180 # Run second time: should skip duplicates 181 print("\n" + "="*60) 182 print("🔄 Running second aggregator pass (should skip duplicates)") 183 print("="*60) 184 aggregator2.run() 185 186 # Verify count didn't change (deduplication worked) 187 posted_count2 = aggregator2.state_manager.get_posted_count( 188 "https://news.kagi.com/world.xml" 189 ) 190 print(f"\n✅ Second pass: Still {posted_count2} stories (duplicates skipped)") 191 assert posted_count2 == posted_count, "Should not post duplicates" 192 193 @responses.activate 194 def test_post_with_external_embed( 195 self, 196 test_config_file, 197 test_state_file, 198 mock_kagi_feed, 199 aggregator_credentials 200 ): 201 """ 202 Test that posts include external embeds with images. 203 204 Verifies: 205 - External embed is created 206 - Thumbnail URL is included 207 - Title and description are set 208 """ 209 # Mock Kagi RSS feed 210 responses.add( 211 responses.GET, 212 "https://news.kagi.com/world.xml", 213 body=mock_kagi_feed, 214 status=200 215 ) 216 217 # Allow passthrough for localhost (PDS) 218 responses.add_passthru("http://localhost") 219 220 # Set up environment 221 handle, password = aggregator_credentials 222 os.environ['AGGREGATOR_HANDLE'] = handle 223 os.environ['AGGREGATOR_PASSWORD'] = password 224 os.environ['PDS_URL'] = 'http://localhost:3001' # Auth through PDS 225 226 # Run aggregator 227 aggregator = Aggregator( 228 config_path=test_config_file, 229 state_file=test_state_file 230 ) 231 232 print("\n" + "="*60) 233 print("🖼️ Testing external embed creation") 234 print("="*60) 235 aggregator.run() 236 237 # Verify posts were created 238 posted_count = aggregator.state_manager.get_posted_count( 239 "https://news.kagi.com/world.xml" 240 ) 241 print(f"\n✅ Posted {posted_count} stories with external embeds") 242 assert posted_count > 0 243 244 def test_authentication_with_live_pds(self, aggregator_credentials): 245 """ 246 Test authentication against live PDS. 247 248 Verifies: 249 - Can authenticate with aggregator account 250 - Receives valid JWT tokens 251 - DID matches expected format 252 """ 253 handle, password = aggregator_credentials 254 255 print("\n" + "="*60) 256 print(f"🔐 Testing authentication: {handle}") 257 print("="*60) 258 259 # Create client and authenticate 260 client = CovesClient( 261 api_url="http://localhost:8081", # AppView for posting 262 handle=handle, 263 password=password, 264 pds_url="http://localhost:3001" # PDS for auth 265 ) 266 267 client.authenticate() 268 269 print(f"\n✅ Authentication successful") 270 print(f" Handle: {client.handle}") 271 print(f" Authenticated: {client._authenticated}") 272 273 assert client._authenticated is True 274 assert hasattr(client, 'did') 275 assert client.did.startswith("did:plc:") 276 277 def test_state_persistence_across_runs( 278 self, 279 test_config_file, 280 test_state_file, 281 aggregator_credentials 282 ): 283 """ 284 Test that state persists correctly across multiple runs. 285 286 Verifies: 287 - State file is created 288 - Posted GUIDs are tracked 289 - Last run timestamp is updated 290 - State survives aggregator restart 291 """ 292 # Mock empty feed (to avoid posting) 293 import responses as resp 294 resp.start() 295 resp.add( 296 resp.GET, 297 "https://news.kagi.com/world.xml", 298 body='<?xml version="1.0"?><rss version="2.0"><channel></channel></rss>', 299 status=200 300 ) 301 302 handle, password = aggregator_credentials 303 os.environ['AGGREGATOR_HANDLE'] = handle 304 os.environ['AGGREGATOR_PASSWORD'] = password 305 306 print("\n" + "="*60) 307 print("💾 Testing state persistence") 308 print("="*60) 309 310 # First run 311 aggregator1 = Aggregator( 312 config_path=test_config_file, 313 state_file=test_state_file 314 ) 315 aggregator1.run() 316 317 # Verify state file was created 318 assert test_state_file.exists(), "State file should be created" 319 print(f"\n✅ State file created: {test_state_file}") 320 321 # Verify last run was recorded 322 last_run1 = aggregator1.state_manager.get_last_run( 323 "https://news.kagi.com/world.xml" 324 ) 325 assert last_run1 is not None, "Last run should be recorded" 326 print(f" Last run: {last_run1}") 327 328 # Second run (new instance) 329 aggregator2 = Aggregator( 330 config_path=test_config_file, 331 state_file=test_state_file 332 ) 333 aggregator2.run() 334 335 # Verify state persisted 336 last_run2 = aggregator2.state_manager.get_last_run( 337 "https://news.kagi.com/world.xml" 338 ) 339 assert last_run2 >= last_run1, "Last run should be updated" 340 print(f" Last run (after restart): {last_run2}") 341 print(f"\n✅ State persisted across aggregator restarts") 342 343 resp.stop() 344 resp.reset() 345 346 def test_error_recovery( 347 self, 348 test_config_file, 349 test_state_file, 350 aggregator_credentials 351 ): 352 """ 353 Test that aggregator handles errors gracefully. 354 355 Verifies: 356 - Continues processing on feed errors 357 - Doesn't crash on network failures 358 - Logs errors appropriately 359 """ 360 # Mock feed failure 361 import responses as resp 362 resp.start() 363 resp.add( 364 resp.GET, 365 "https://news.kagi.com/world.xml", 366 body="Internal Server Error", 367 status=500 368 ) 369 370 handle, password = aggregator_credentials 371 os.environ['AGGREGATOR_HANDLE'] = handle 372 os.environ['AGGREGATOR_PASSWORD'] = password 373 374 print("\n" + "="*60) 375 print("🛡️ Testing error recovery") 376 print("="*60) 377 378 # Should not crash 379 aggregator = Aggregator( 380 config_path=test_config_file, 381 state_file=test_state_file 382 ) 383 384 try: 385 aggregator.run() 386 print(f"\n✅ Aggregator handled feed error gracefully") 387 except Exception as e: 388 pytest.fail(f"Aggregator should handle errors gracefully: {e}") 389 390 resp.stop() 391 resp.reset() 392 393 394def test_coves_client_external_embed_format(aggregator_credentials): 395 """ 396 Test external embed formatting. 397 398 Verifies: 399 - Embed structure matches social.coves.embed.external 400 - All required fields are present 401 - Thumbnails are handled by server's unfurl service (not included in client) 402 """ 403 handle, password = aggregator_credentials 404 405 client = CovesClient( 406 api_url="http://localhost:8081", 407 handle=handle, 408 password=password 409 ) 410 411 # Create external embed (server will handle thumbnail extraction) 412 embed = client.create_external_embed( 413 uri="https://example.com/story", 414 title="Test Story", 415 description="Test description" 416 ) 417 418 assert embed["$type"] == "social.coves.embed.external" 419 assert embed["external"]["uri"] == "https://example.com/story" 420 assert embed["external"]["title"] == "Test Story" 421 assert embed["external"]["description"] == "Test description" 422 # Thumbnail is not included - server's unfurl service handles it 423 assert "thumb" not in embed["external"] 424 print("\n✅ External embed format correct")