# A community based topic aggregation platform built on atproto
"""
End-to-End Integration Tests.

Tests the complete aggregator workflow against live infrastructure:
- HTTP mocking for the Kagi RSS feed only (requests to localhost pass through)
- Real PDS (Coves test PDS via Docker)
- Real community posting
- Real state management

Requires:
- RUN_E2E_TESTS=1 in the environment (the whole module is skipped otherwise)
- Coves test PDS running on localhost:3001
- Coves AppView running on localhost:8081
- Test database; each test creates its own community with a handle of the
  form e2e-<suffix>.community.coves.social
- Aggregator account: kagi-news.local.coves.dev
"""
15import os
16import pytest
17import responses
18from pathlib import Path
19from datetime import datetime
20
21from src.main import Aggregator
22from src.coves_client import CovesClient
23from src.config import ConfigLoader
24
25
# E2E tests are opt-in: they need live infrastructure, so every test in this
# module is skipped unless RUN_E2E_TESTS is set to exactly "1".
_E2E_DISABLED = os.environ.get('RUN_E2E_TESTS') != '1'

pytestmark = pytest.mark.skipif(
    _E2E_DISABLED,
    reason="E2E tests require RUN_E2E_TESTS=1 and live PDS",
)
31
32
@pytest.fixture
def test_community(aggregator_credentials):
    """Create a throwaway community for E2E testing and return its handle.

    Logs in with the aggregator credentials via ``createSession`` on
    localhost:3001, then calls ``social.coves.community.create`` on
    localhost:8081 with the resulting access token.

    Returns:
        The community handle, ``e2e-<suffix>.community.coves.social``.

    Raises:
        RuntimeError: If authentication or community creation is rejected.
    """
    import secrets
    import requests

    handle, password = aggregator_credentials

    # Authenticate. Fail loudly here rather than with a KeyError on
    # "accessJwt" when the login is rejected.
    auth_response = requests.post(
        "http://localhost:3001/xrpc/com.atproto.server.createSession",
        json={"identifier": handle, "password": password},
        timeout=30,
    )
    if not auth_response.ok:
        raise RuntimeError(
            f"Failed to authenticate as {handle}: {auth_response.text}"
        )
    token = auth_response.json()["accessJwt"]

    # Create community (use short name to avoid handle length limits).
    # A random 4-character suffix keeps the name as short as the previous
    # clock-based one, but does not collide when two tests request the
    # fixture within the same second.
    community_name = f"e2e-{secrets.token_hex(2)}"
    create_response = requests.post(
        "http://localhost:8081/xrpc/social.coves.community.create",
        headers={"Authorization": f"Bearer {token}"},
        json={
            "name": community_name,
            "displayName": "E2E Test Community",
            "description": "Temporary community for aggregator E2E testing",
            "visibility": "public",
        },
        timeout=30,
    )

    if not create_response.ok:
        raise RuntimeError(f"Failed to create community: {create_response.text}")

    community_handle = f"{community_name}.community.coves.social"
    print(f"\n✅ Created test community: {community_handle}")
    return community_handle
68
69
@pytest.fixture
def test_config_file(tmp_path, test_community):
    """Write an aggregator config pointing at the per-test community.

    The file is ``config.yaml`` inside pytest's ``tmp_path``; it declares a
    single enabled feed (Kagi World News) whose ``community_handle`` is the
    handle produced by the ``test_community`` fixture.

    Returns:
        Path to the written config file.
    """
    yaml_text = f"""
coves_api_url: http://localhost:8081

feeds:
  - name: "Kagi World News"
    url: "https://news.kagi.com/world.xml"
    community_handle: "{test_community}"
    enabled: true

log_level: debug
"""
    destination = tmp_path / "config.yaml"
    destination.write_text(yaml_text)
    return destination
87
88
@pytest.fixture
def test_state_file(tmp_path):
    """Return the path of a per-test state file (the file is not created here)."""
    state_path = tmp_path.joinpath("state.json")
    return state_path
93
94
@pytest.fixture
def mock_kagi_feed():
    """Load the stored Kagi RSS feed fixture and return it as text.

    Looks for ``world.xml`` in ``../data`` relative to this file first and
    falls back to ``./fixtures`` next to this file.

    Returns:
        The feed XML as a string.

    Raises:
        FileNotFoundError: If the fixture exists in neither location.
    """
    here = Path(__file__).parent

    # Load from data directory (where actual feed is stored)
    fixture_path = here.parent / "data" / "world.xml"
    if not fixture_path.exists():
        # Fallback to tests/fixtures if moved
        fixture_path = here / "fixtures" / "world.xml"

    # Decode explicitly: without an encoding, read_text() uses the locale's
    # default, which can mangle or reject non-ASCII feed content.
    # NOTE(review): assumes the stored feed is UTF-8 (the XML default) — confirm.
    return fixture_path.read_text(encoding="utf-8")
104
105
@pytest.fixture
def aggregator_credentials():
    """Return ``(handle, password)`` for the aggregator account.

    Each value is read from the environment (``AGGREGATOR_HANDLE``,
    ``AGGREGATOR_PASSWORD``) and falls back to a built-in default when the
    variable is unset.
    """
    env = os.environ
    return (
        env.get('AGGREGATOR_HANDLE', 'kagi-news.local.coves.dev'),
        env.get('AGGREGATOR_PASSWORD', 'kagi-aggregator-2024-secure-pass'),
    )
112
113
class TestEndToEnd:
    """Full end-to-end integration tests."""

    # The feed URL written into the config by the test_config_file fixture;
    # it is both the URL that gets mocked and the key used for state lookups.
    FEED_URL = "https://news.kagi.com/world.xml"

    @staticmethod
    def _export_credentials(credentials, pds_url=None):
        """Publish aggregator credentials through the process environment.

        Args:
            credentials: ``(handle, password)`` tuple.
            pds_url: When given, also exported as ``PDS_URL``; when ``None``
                the variable is left untouched.
        """
        handle, password = credentials
        os.environ['AGGREGATOR_HANDLE'] = handle
        os.environ['AGGREGATOR_PASSWORD'] = password
        if pds_url is not None:
            os.environ['PDS_URL'] = pds_url

    @staticmethod
    def _banner(title):
        """Print *title* framed by two 60-character rules."""
        print("\n" + "=" * 60)
        print(title)
        print("=" * 60)

    @responses.activate
    def test_full_aggregator_workflow(
        self,
        test_config_file,
        test_state_file,
        mock_kagi_feed,
        aggregator_credentials
    ):
        """
        Test complete workflow: fetch → parse → format → post → verify.

        This test:
        1. Mocks the Kagi RSS HTTP request
        2. Lets requests to localhost reach the real services
        3. Runs the aggregator and checks that at least one story is
           recorded as posted in the state
        4. Runs a second aggregator instance on the same state file and
           checks that the posted count is unchanged (deduplication)
        """
        # Mock Kagi RSS feed
        responses.add(
            responses.GET,
            self.FEED_URL,
            body=mock_kagi_feed,
            status=200,
            content_type="application/xml"
        )

        # Allow passthrough for localhost (PDS)
        responses.add_passthru("http://localhost")

        # Set up environment (auth goes through the PDS)
        self._export_credentials(
            aggregator_credentials, pds_url='http://localhost:3001'
        )

        # Create aggregator
        aggregator = Aggregator(
            config_path=test_config_file,
            state_file=test_state_file
        )

        # Run first time: should post stories
        self._banner("🚀 Running first aggregator pass (should post stories)")
        aggregator.run()

        # Verify state was updated (stories marked as posted)
        posted_count = aggregator.state_manager.get_posted_count(self.FEED_URL)
        print(f"\n✅ First pass: {posted_count} stories posted and tracked")
        assert posted_count > 0, "Should have posted at least one story"

        # Create new aggregator instance (simulates CRON re-run)
        aggregator2 = Aggregator(
            config_path=test_config_file,
            state_file=test_state_file
        )

        # Run second time: should skip duplicates
        self._banner("🔄 Running second aggregator pass (should skip duplicates)")
        aggregator2.run()

        # Verify count didn't change (deduplication worked)
        posted_count2 = aggregator2.state_manager.get_posted_count(self.FEED_URL)
        print(f"\n✅ Second pass: Still {posted_count2} stories (duplicates skipped)")
        assert posted_count2 == posted_count, "Should not post duplicates"

    @responses.activate
    def test_post_with_external_embed(
        self,
        test_config_file,
        test_state_file,
        mock_kagi_feed,
        aggregator_credentials
    ):
        """
        Test that a run against the mocked feed posts stories.

        NOTE(review): the intent is to cover external embeds (embed created,
        thumbnail URL, title and description), but the only assertion here is
        that at least one story was recorded as posted — the embed contents
        themselves are not inspected.
        """
        # Mock Kagi RSS feed
        responses.add(
            responses.GET,
            self.FEED_URL,
            body=mock_kagi_feed,
            status=200
        )

        # Allow passthrough for localhost (PDS)
        responses.add_passthru("http://localhost")

        # Set up environment (auth goes through the PDS)
        self._export_credentials(
            aggregator_credentials, pds_url='http://localhost:3001'
        )

        # Run aggregator
        aggregator = Aggregator(
            config_path=test_config_file,
            state_file=test_state_file
        )

        self._banner("🖼️ Testing external embed creation")
        aggregator.run()

        # Verify posts were created
        posted_count = aggregator.state_manager.get_posted_count(self.FEED_URL)
        print(f"\n✅ Posted {posted_count} stories with external embeds")
        assert posted_count > 0

    def test_authentication_with_live_pds(self, aggregator_credentials):
        """
        Test authentication against live PDS.

        Verifies:
        - ``authenticate()`` completes and the client reports itself
          authenticated
        - The client exposes a ``did`` starting with ``did:plc:``
        """
        handle, password = aggregator_credentials

        self._banner(f"🔐 Testing authentication: {handle}")

        # Create client and authenticate
        client = CovesClient(
            api_url="http://localhost:8081",  # AppView for posting
            handle=handle,
            password=password,
            pds_url="http://localhost:3001"  # PDS for auth
        )

        client.authenticate()

        print("\n✅ Authentication successful")
        print(f" Handle: {client.handle}")
        print(f" Authenticated: {client._authenticated}")

        assert client._authenticated is True
        assert hasattr(client, 'did')
        assert client.did.startswith("did:plc:")

    @responses.activate
    def test_state_persistence_across_runs(
        self,
        test_config_file,
        test_state_file,
        aggregator_credentials
    ):
        """
        Test that state persists correctly across multiple runs.

        Verifies:
        - State file is created
        - Last run timestamp is recorded
        - A second aggregator instance on the same state file reports a
          last run that is not earlier than the first
        """
        # Mock empty feed (to avoid posting). The mock is installed via the
        # @responses.activate decorator so it is torn down even when an
        # assertion below fails.
        # NOTE(review): unlike the first two tests, no localhost passthrough
        # is registered and PDS_URL is not set here, so any request to the
        # local services is intercepted by `responses` — confirm intended.
        responses.add(
            responses.GET,
            self.FEED_URL,
            body='<?xml version="1.0"?><rss version="2.0"><channel></channel></rss>',
            status=200
        )

        self._export_credentials(aggregator_credentials)

        self._banner("💾 Testing state persistence")

        # First run
        aggregator1 = Aggregator(
            config_path=test_config_file,
            state_file=test_state_file
        )
        aggregator1.run()

        # Verify state file was created
        assert test_state_file.exists(), "State file should be created"
        print(f"\n✅ State file created: {test_state_file}")

        # Verify last run was recorded
        last_run1 = aggregator1.state_manager.get_last_run(self.FEED_URL)
        assert last_run1 is not None, "Last run should be recorded"
        print(f" Last run: {last_run1}")

        # Second run (new instance)
        aggregator2 = Aggregator(
            config_path=test_config_file,
            state_file=test_state_file
        )
        aggregator2.run()

        # Verify state persisted. Guard against None first so a missing
        # value is reported as a test failure rather than a TypeError from
        # the comparison.
        last_run2 = aggregator2.state_manager.get_last_run(self.FEED_URL)
        assert last_run2 is not None, "Last run should be recorded"
        assert last_run2 >= last_run1, "Last run should be updated"
        print(f" Last run (after restart): {last_run2}")
        print("\n✅ State persisted across aggregator restarts")

    @responses.activate
    def test_error_recovery(
        self,
        test_config_file,
        test_state_file,
        aggregator_credentials
    ):
        """
        Test that aggregator handles errors gracefully.

        Verifies that ``run()`` does not raise when the feed URL answers
        with HTTP 500. Logging output is not inspected.
        """
        # Mock feed failure. The mock is installed via the
        # @responses.activate decorator so it is torn down even when the
        # test fails.
        # NOTE(review): no localhost passthrough is registered and PDS_URL is
        # not set here, so requests to the local services are intercepted by
        # `responses` as well — confirm intended.
        responses.add(
            responses.GET,
            self.FEED_URL,
            body="Internal Server Error",
            status=500
        )

        self._export_credentials(aggregator_credentials)

        self._banner("🛡️ Testing error recovery")

        # Should not crash
        aggregator = Aggregator(
            config_path=test_config_file,
            state_file=test_state_file
        )

        try:
            aggregator.run()
            print("\n✅ Aggregator handled feed error gracefully")
        except Exception as e:
            pytest.fail(f"Aggregator should handle errors gracefully: {e}")
392
393
def test_coves_client_external_embed_format(aggregator_credentials):
    """
    Test external embed formatting.

    Builds an embed through ``CovesClient.create_external_embed`` and checks:
    - ``$type`` is ``social.coves.embed.external``
    - ``uri``, ``title`` and ``description`` are echoed under ``external``
    - No ``thumb`` key is present (thumbnails are left to the server's
      unfurl service, not supplied by the client)
    """
    handle, password = aggregator_credentials
    client = CovesClient(
        api_url="http://localhost:8081",
        handle=handle,
        password=password
    )

    # Inputs double as the expected values of the resulting embed fields.
    story = {
        "uri": "https://example.com/story",
        "title": "Test Story",
        "description": "Test description",
    }

    # Create external embed (server will handle thumbnail extraction)
    embed = client.create_external_embed(**story)

    assert embed["$type"] == "social.coves.embed.external"
    for field, expected in story.items():
        assert embed["external"][field] == expected
    # Thumbnail is not included - server's unfurl service handles it
    assert "thumb" not in embed["external"]
    print("\n✅ External embed format correct")