A community-based topic aggregation platform built on atproto

test(aggregators): add comprehensive test suite for Kagi News aggregator

Adds 57 tests with 83% code coverage across all components.

Test coverage by component:
- RSS Fetcher (5 tests): fetch, retry, timeout, invalid XML
- HTML Parser (8 tests): all sections, missing sections, full story
- Rich Text Formatter (10 tests): facets, UTF-8, multi-byte chars
- State Manager (12 tests): deduplication, rolling window, persistence
- Config Manager (13 tests): YAML validation, env vars
- Main Orchestrator (9 tests): E2E flow, error isolation, dry-run
- E2E Tests (6 skipped): require live Coves API
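
The Rich Text Formatter tests single out UTF-8 and multi-byte characters because atproto rich text facets (as in `app.bsky.richtext.facet`) address text by UTF-8 byte offsets rather than character positions. Assuming the Coves facets follow the same convention, the offset calculation can be sketched as below. `byte_range` is a hypothetical helper for illustration, not a function from the aggregator.

```python
from typing import Tuple


def byte_range(text: str, substring: str) -> Tuple[int, int]:
    """Return (byte_start, byte_end) of the first occurrence of substring,
    measured in UTF-8 bytes. Hypothetical helper, for illustration only."""
    char_start = text.find(substring)
    if char_start == -1:
        raise ValueError(f"{substring!r} not found in text")
    # Encode only the prefix to learn how many bytes precede the match.
    byte_start = len(text[:char_start].encode("utf-8"))
    byte_end = byte_start + len(substring.encode("utf-8"))
    return byte_start, byte_end


# "Café 🚀 Highlights:" written with escapes so the code points are unambiguous.
text = "Caf\u00e9 \U0001F680 Highlights:"
start, end = byte_range(text, "Highlights:")

# The character offset and the byte offset differ because "é" takes 2 bytes
# and the rocket emoji takes 4 bytes in UTF-8.
char_offset = text.find("Highlights:")
```

Slicing the encoded text with the returned offsets gives back the original substring, which is the property a facet test would typically assert.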

Test results: 57 passed, 6 skipped, 1 warning in 8.76s

Fixtures:
- Real Kagi News RSS item with all sections (sample_rss_item.xml)
- Used to validate parser against actual feed structure
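
In the fixture, the story HTML sits XML-escaped inside `<description>`, so it must be unescaped before the section headings can be checked. A minimal sketch using only the standard library follows. The inline `ITEM_XML` is a shortened stand-in for `sample_rss_item.xml`, and `HeadingCollector` is a hypothetical helper, not the project's `KagiHTMLParser`.

```python
import xml.etree.ElementTree as ET
from html.parser import HTMLParser

# Shortened stand-in for the real fixture (assumption: same overall shape).
ITEM_XML = """<item>
  <title>Example story</title>
  <description>&lt;p&gt;Summary text.&lt;/p&gt;&lt;h3&gt;Highlights:&lt;/h3&gt;&lt;ul&gt;&lt;li&gt;First point&lt;/li&gt;&lt;/ul&gt;&lt;h3&gt;Sources:&lt;/h3&gt;</description>
</item>"""


class HeadingCollector(HTMLParser):
    """Collect the text of every <h3> element (the section headings)."""

    def __init__(self):
        super().__init__()
        self.headings = []
        self._in_h3 = False

    def handle_starttag(self, tag, attrs):
        if tag == "h3":
            self._in_h3 = True

    def handle_endtag(self, tag):
        if tag == "h3":
            self._in_h3 = False

    def handle_data(self, data):
        if self._in_h3:
            self.headings.append(data.strip())


item = ET.fromstring(ITEM_XML)
# ElementTree decodes the &lt; and &gt; entities, so this is plain HTML.
description_html = item.findtext("description")

collector = HeadingCollector()
collector.feed(description_html)
collector.close()
section_headings = collector.headings
```

A parser test built this way can assert on which sections are present without depending on the exact wording of a live feed.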

All tests use pytest with mocking for HTTP requests (responses library).
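
The suite itself mocks HTTP with the responses library. The sketch below swaps in the standard library's `unittest.mock` to show the same idea for the retry case: the HTTP call is replaced by a canned sequence of failures and a final success. `fetch_with_retry` and its parameters are hypothetical and are not the aggregator's fetcher.

```python
from unittest.mock import Mock


def fetch_with_retry(http_get, url, max_attempts=3):
    """Call http_get(url) until it succeeds or the attempts run out.

    Hypothetical stand-in for an RSS fetcher; http_get is injected so that a
    test can replace the real network call with a mock.
    """
    last_error = None
    for _ in range(max_attempts):
        try:
            return http_get(url)
        except ConnectionError as exc:
            last_error = exc
    raise last_error


# The first two calls raise, the third returns a canned feed body.
fake_get = Mock(
    side_effect=[ConnectionError("down"), ConnectionError("down"), "<rss/>"]
)
body = fetch_with_retry(fake_get, "https://news.kagi.com/world.xml")
```

Injecting the HTTP callable keeps the example free of third-party dependencies; with responses, the equivalent setup registers the URL once per expected reply.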

+1
aggregators/kagi-news/tests/__init__.py
+
"""Test suite for Kagi News aggregator."""
+12
aggregators/kagi-news/tests/fixtures/sample_rss_item.xml
+
<?xml version='1.0' encoding='UTF-8'?>
+
<!-- Sample RSS item from Kagi News - includes quote, highlights, perspectives, sources -->
+
<item>
+
<title>Trump to meet Xi in South Korea on Oct 30</title>
+
<link>https://kite.kagi.com/96cf948f-8a1b-4281-9ba4-8a9e1ad7b3c6/world/10</link>
+
<description>&lt;p&gt;The White House confirmed President Trump will hold a bilateral meeting with Chinese President Xi Jinping in South Korea on October 30, at the end of an Asia trip that includes Malaysia and Japan . The administration said the meeting will take place Thursday morning local time, and Mr Trump indicated his first question to Xi would concern fentanyl and other bilateral issues . The talks come amid heightened trade tensions after Beijing expanded export curbs on rare-earth minerals and following Mr Trump's recent threat of additional tariffs on Chinese goods, making the meeting a focal point for discussions on trade, technology supply chains and energy .&lt;/p&gt;&lt;img src='https://kagiproxy.com/img/Q2SRXQtwTYBIiQeI0FG-X6taF_wHSJaXDiFUzju2kbCWGuOYIFUX--8L0BqE4VKxpbOJY3ylFPJkDpfSnyQYZ1qdOLXbphHTnsOK4jb7gqC4KCn5nf3ANbWCuaFD5ZUSijiK0k7wOLP2fyX6tynu2mPtXlCbotLo2lTrEswZl4-No2AI4mI4lkResfnRdp-YjpoEfCOHkNfbN1-0cNcHt9T2dmgBSXrQ2w' alt='News image associated with coverage of President Trump&amp;#x27;s Asia trip and planned meeting with President Xi' /&gt;&lt;br /&gt;&lt;h3&gt;Highlights:&lt;/h3&gt;&lt;ul&gt;&lt;li&gt;Itinerary details: The Asia swing begins in Malaysia, continues to Japan and ends with the bilateral meeting in South Korea on Thursday morning local time, White House press secretary Karoline Leavitt said at a briefing .&lt;/li&gt;&lt;li&gt;APEC context: US officials indicated the leaders will meet on the sidelines of the Asia-Pacific Economic Cooperation gathering, shaping expectations for short, high-level talks rather than a lengthy summit .&lt;/li&gt;&lt;li&gt;Tariff escalation: President Trump recently threatened an additional 100% tariff on Chinese goods starting in November, a step he has described as unsustainable but that has heightened urgency for talks .&lt;/li&gt;&lt;li&gt;Rare-earth impact: Beijing's expanded curbs on rare-earth exports have exposed supply vulnerabilities because US high-tech firms rely heavily on those 
materials, raising strategic and economic stakes for the meeting .&lt;/li&gt;&lt;/ul&gt;&lt;blockquote&gt;Work out a lot of our doubts and questions - President Trump&lt;/blockquote&gt;&lt;h3&gt;Perspectives:&lt;/h3&gt;&lt;ul&gt;&lt;li&gt;President Trump: He said his first question to President Xi would be about fentanyl and indicated he hoped to resolve bilateral doubts and questions in the talks. (&lt;a href='https://www.straitstimes.com/world/united-states/trump-to-meet-xi-in-south-korea-on-oct-30-as-part-of-asia-swing'&gt;The Straits Times&lt;/a&gt;)&lt;/li&gt;&lt;li&gt;White House (press secretary): Karoline Leavitt confirmed the bilateral meeting will occur Thursday morning local time during a White House briefing. (&lt;a href='https://www.scmp.com/news/us/diplomacy/article/3330131/donald-trump-meet-chinas-xi-jinping-next-thursday-south-korea-crunch-talks'&gt;South China Morning Post&lt;/a&gt;)&lt;/li&gt;&lt;li&gt;Beijing/Chinese authorities: Officials have defended tighter export controls on rare-earths, a move described in reporting as not explicitly targeting the US though it has raised tensions. 
(&lt;a href='https://www.rt.com/news/626890-white-house-announces-trump-xi-meeting/'&gt;RT&lt;/a&gt;)&lt;/li&gt;&lt;/ul&gt;&lt;h3&gt;Sources:&lt;/h3&gt;&lt;ul&gt;&lt;li&gt;&lt;a href='https://www.straitstimes.com/world/united-states/trump-to-meet-xi-in-south-korea-on-oct-30-as-part-of-asia-swing'&gt;Trump to meet Xi in South Korea on Oct 30 as part of Asia swing&lt;/a&gt; - straitstimes.com&lt;/li&gt;&lt;li&gt;&lt;a href='https://www.scmp.com/news/us/diplomacy/article/3330131/donald-trump-meet-chinas-xi-jinping-next-thursday-south-korea-crunch-talks'&gt;Trump to meet Xi in South Korea next Thursday as part of key Asia trip&lt;/a&gt; - scmp.com&lt;/li&gt;&lt;li&gt;&lt;a href='https://www.rt.com/news/626890-white-house-announces-trump-xi-meeting/'&gt;White House announces Trump-Xi meeting&lt;/a&gt; - rt.com&lt;/li&gt;&lt;li&gt;&lt;a href='https://www.thehindu.com/news/international/trump-to-meet-xi-in-south-korea-as-part-of-asia-swing/article70195667.ece'&gt;Trump to meet Xi in South Korea as part of Asia swing&lt;/a&gt; - thehindu.com&lt;/li&gt;&lt;li&gt;&lt;a href='https://www.aljazeera.com/news/2025/10/24/white-house-confirms-trump-to-meet-xi-in-south-korea-as-part-of-asia-tour'&gt;White House confirms Trump to meet Xi in South Korea as part of Asia tour&lt;/a&gt; - aljazeera.com&lt;/li&gt;&lt;/ul&gt;</description>
+
<guid isPermaLink="true">https://kite.kagi.com/96cf948f-8a1b-4281-9ba4-8a9e1ad7b3c6/world/10</guid>
+
<category>World</category>
+
<category>World/Diplomacy</category>
+
<category>Diplomacy</category>
+
<pubDate>Thu, 23 Oct 2025 20:56:00 +0000</pubDate>
+
</item>
+246
aggregators/kagi-news/tests/test_config.py
+
"""
+
Tests for Configuration Loader.
+
+
Tests loading and validating aggregator configuration.
+
"""
+
import pytest
+
import tempfile
+
from pathlib import Path
+
+
from src.config import ConfigLoader, ConfigError
+
from src.models import AggregatorConfig, FeedConfig
+
+
+
@pytest.fixture
+
def valid_config_yaml():
+
"""Valid configuration YAML."""
+
return """
+
coves_api_url: "https://api.coves.social"
+
+
feeds:
+
- name: "World News"
+
url: "https://news.kagi.com/world.xml"
+
community_handle: "world-news.coves.social"
+
enabled: true
+
+
- name: "Tech News"
+
url: "https://news.kagi.com/tech.xml"
+
community_handle: "tech.coves.social"
+
enabled: true
+
+
- name: "Science News"
+
url: "https://news.kagi.com/science.xml"
+
community_handle: "science.coves.social"
+
enabled: false
+
+
log_level: "info"
+
"""
+
+
+
@pytest.fixture
+
def temp_config_file(valid_config_yaml):
+
"""Create a temporary config file."""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write(valid_config_yaml)
+
temp_path = Path(f.name)
+
yield temp_path
+
# Cleanup
+
if temp_path.exists():
+
temp_path.unlink()
+
+
+
class TestConfigLoader:
+
"""Test suite for ConfigLoader."""
+
+
def test_load_valid_config(self, temp_config_file):
+
"""Test loading valid configuration."""
+
loader = ConfigLoader(temp_config_file)
+
config = loader.load()
+
+
assert isinstance(config, AggregatorConfig)
+
assert config.coves_api_url == "https://api.coves.social"
+
assert config.log_level == "info"
+
assert len(config.feeds) == 3
+
+
def test_parse_feed_configs(self, temp_config_file):
+
"""Test parsing feed configurations."""
+
loader = ConfigLoader(temp_config_file)
+
config = loader.load()
+
+
# Check first feed
+
feed1 = config.feeds[0]
+
assert isinstance(feed1, FeedConfig)
+
assert feed1.name == "World News"
+
assert feed1.url == "https://news.kagi.com/world.xml"
+
assert feed1.community_handle == "world-news.coves.social"
+
assert feed1.enabled is True
+
+
# Check disabled feed
+
feed3 = config.feeds[2]
+
assert feed3.name == "Science News"
+
assert feed3.enabled is False
+
+
def test_get_enabled_feeds_only(self, temp_config_file):
+
"""Test getting only enabled feeds."""
+
loader = ConfigLoader(temp_config_file)
+
config = loader.load()
+
+
enabled_feeds = [f for f in config.feeds if f.enabled]
+
assert len(enabled_feeds) == 2
+
assert all(f.enabled for f in enabled_feeds)
+
+
def test_missing_config_file_raises_error(self):
+
"""Test that missing config file raises error."""
+
with pytest.raises(ConfigError, match="not found"):
+
loader = ConfigLoader(Path("nonexistent.yaml"))
+
loader.load()
+
+
def test_invalid_yaml_raises_error(self):
+
"""Test that invalid YAML raises error."""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write("invalid: yaml: content: [[[")
+
temp_path = Path(f.name)
+
+
try:
+
with pytest.raises(ConfigError, match="Failed to parse"):
+
loader = ConfigLoader(temp_path)
+
loader.load()
+
finally:
+
temp_path.unlink()
+
+
def test_missing_required_field_raises_error(self):
+
"""Test that missing required fields raise error."""
+
invalid_yaml = """
+
feeds:
+
- name: "Test"
+
url: "https://test.xml"
+
# Missing community_handle!
+
"""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write(invalid_yaml)
+
temp_path = Path(f.name)
+
+
try:
+
with pytest.raises(ConfigError, match="Missing required field"):
+
loader = ConfigLoader(temp_path)
+
loader.load()
+
finally:
+
temp_path.unlink()
+
+
def test_missing_coves_api_url_raises_error(self):
+
"""Test that missing coves_api_url raises error."""
+
invalid_yaml = """
+
feeds:
+
- name: "Test"
+
url: "https://test.xml"
+
community_handle: "test.coves.social"
+
"""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write(invalid_yaml)
+
temp_path = Path(f.name)
+
+
try:
+
with pytest.raises(ConfigError, match="coves_api_url"):
+
loader = ConfigLoader(temp_path)
+
loader.load()
+
finally:
+
temp_path.unlink()
+
+
def test_default_log_level(self):
+
"""Test that log_level defaults to 'info' if not specified."""
+
minimal_yaml = """
+
coves_api_url: "https://api.coves.social"
+
feeds:
+
- name: "Test"
+
url: "https://test.xml"
+
community_handle: "test.coves.social"
+
"""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write(minimal_yaml)
+
temp_path = Path(f.name)
+
+
try:
+
loader = ConfigLoader(temp_path)
+
config = loader.load()
+
assert config.log_level == "info"
+
finally:
+
temp_path.unlink()
+
+
def test_default_enabled_true(self):
+
"""Test that feed enabled defaults to True if not specified."""
+
yaml_content = """
+
coves_api_url: "https://api.coves.social"
+
feeds:
+
- name: "Test"
+
url: "https://test.xml"
+
community_handle: "test.coves.social"
+
# No 'enabled' field
+
"""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write(yaml_content)
+
temp_path = Path(f.name)
+
+
try:
+
loader = ConfigLoader(temp_path)
+
config = loader.load()
+
assert config.feeds[0].enabled is True
+
finally:
+
temp_path.unlink()
+
+
def test_invalid_url_format_raises_error(self):
+
"""Test that invalid URLs raise error."""
+
invalid_yaml = """
+
coves_api_url: "https://api.coves.social"
+
feeds:
+
- name: "Test"
+
url: "not-a-valid-url"
+
community_handle: "test.coves.social"
+
"""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write(invalid_yaml)
+
temp_path = Path(f.name)
+
+
try:
+
with pytest.raises(ConfigError, match="Invalid URL"):
+
loader = ConfigLoader(temp_path)
+
loader.load()
+
finally:
+
temp_path.unlink()
+
+
def test_empty_feeds_list_raises_error(self):
+
"""Test that empty feeds list raises error."""
+
invalid_yaml = """
+
coves_api_url: "https://api.coves.social"
+
feeds: []
+
"""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yaml') as f:
+
f.write(invalid_yaml)
+
temp_path = Path(f.name)
+
+
try:
+
with pytest.raises(ConfigError, match="at least one feed"):
+
loader = ConfigLoader(temp_path)
+
loader.load()
+
finally:
+
temp_path.unlink()
+
+
def test_load_from_env_override(self, temp_config_file, monkeypatch):
+
"""Test that environment variables can override config values."""
+
# Set environment variable
+
monkeypatch.setenv("COVES_API_URL", "https://test.coves.social")
+
+
loader = ConfigLoader(temp_config_file)
+
config = loader.load()
+
+
# Should use env var instead of config file
+
assert config.coves_api_url == "https://test.coves.social"
+
+
def test_get_feed_by_url(self, temp_config_file):
+
"""Test helper to get feed config by URL."""
+
loader = ConfigLoader(temp_config_file)
+
config = loader.load()
+
+
feed = next((f for f in config.feeds if f.url == "https://news.kagi.com/tech.xml"), None)
+
assert feed is not None
+
assert feed.name == "Tech News"
+
assert feed.community_handle == "tech.coves.social"
+433
aggregators/kagi-news/tests/test_e2e.py
+
"""
+
End-to-End Integration Tests.
+
+
Tests the complete aggregator workflow against live infrastructure:
+
- Mocked HTTP for the Kagi RSS feed
+
- Real PDS (Coves test PDS via Docker)
+
- Real community posting
+
- Real state management
+
+
Requires:
+
- Coves test PDS running on localhost:3001
+
- Test database with community: e2e-95206.community.coves.social
+
- Aggregator account: kagi-news.local.coves.dev
+
"""
+
import os
+
import pytest
+
import responses
+
from pathlib import Path
+
from datetime import datetime
+
+
from src.main import Aggregator
+
from src.coves_client import CovesClient
+
from src.config import ConfigLoader
+
+
+
# Skip E2E tests by default (require live infrastructure)
+
pytestmark = pytest.mark.skipif(
+
os.getenv('RUN_E2E_TESTS') != '1',
+
reason="E2E tests require RUN_E2E_TESTS=1 and live PDS"
+
)
+
+
+
@pytest.fixture
+
def test_community(aggregator_credentials):
+
"""Create a test community for E2E testing."""
+
import time
+
import requests
+
+
handle, password = aggregator_credentials
+
+
# Authenticate
+
auth_response = requests.post(
+
"http://localhost:3001/xrpc/com.atproto.server.createSession",
+
json={"identifier": handle, "password": password}
+
)
+
token = auth_response.json()["accessJwt"]
+
+
# Create community (use short name to avoid handle length limits)
+
community_name = f"e2e-{int(time.time()) % 10000}" # Last 4 digits only
+
create_response = requests.post(
+
"http://localhost:8081/xrpc/social.coves.community.create",
+
headers={"Authorization": f"Bearer {token}"},
+
json={
+
"name": community_name,
+
"displayName": "E2E Test Community",
+
"description": "Temporary community for aggregator E2E testing",
+
"visibility": "public"
+
}
+
)
+
+
if create_response.ok:
+
community = create_response.json()
+
community_handle = f"{community_name}.community.coves.social"
+
print(f"\n✅ Created test community: {community_handle}")
+
return community_handle
+
else:
+
raise Exception(f"Failed to create community: {create_response.text}")
+
+
+
@pytest.fixture
+
def test_config_file(tmp_path, test_community):
+
"""Create test configuration file with dynamic community."""
+
config_content = f"""
+
coves_api_url: http://localhost:8081
+
+
feeds:
+
- name: "Kagi World News"
+
url: "https://news.kagi.com/world.xml"
+
community_handle: "{test_community}"
+
enabled: true
+
+
log_level: debug
+
"""
+
config_file = tmp_path / "config.yaml"
+
config_file.write_text(config_content)
+
return config_file
+
+
+
@pytest.fixture
+
def test_state_file(tmp_path):
+
"""Create temporary state file."""
+
return tmp_path / "state.json"
+
+
+
@pytest.fixture
+
def mock_kagi_feed():
+
"""Load real Kagi RSS feed fixture."""
+
# Load from data directory (where actual feed is stored)
+
fixture_path = Path(__file__).parent.parent / "data" / "world.xml"
+
if not fixture_path.exists():
+
# Fallback to tests/fixtures if moved
+
fixture_path = Path(__file__).parent / "fixtures" / "world.xml"
+
return fixture_path.read_text()
+
+
+
@pytest.fixture
+
def aggregator_credentials():
+
"""Get aggregator credentials from environment."""
+
handle = os.getenv('AGGREGATOR_HANDLE', 'kagi-news.local.coves.dev')
+
password = os.getenv('AGGREGATOR_PASSWORD', 'kagi-aggregator-2024-secure-pass')
+
return handle, password
+
+
+
class TestEndToEnd:
+
"""Full end-to-end integration tests."""
+
+
@responses.activate
+
def test_full_aggregator_workflow(
+
self,
+
test_config_file,
+
test_state_file,
+
mock_kagi_feed,
+
aggregator_credentials
+
):
+
"""
+
Test complete workflow: fetch → parse → format → post → verify.
+
+
This test:
+
1. Mocks Kagi RSS HTTP request
+
2. Authenticates with real PDS
+
3. Parses real Kagi HTML content
+
4. Formats with rich text facets
+
5. Posts to real community
+
6. Verifies post was created
+
7. Tests deduplication (no repost)
+
"""
+
# Mock Kagi RSS feed
+
responses.add(
+
responses.GET,
+
"https://news.kagi.com/world.xml",
+
body=mock_kagi_feed,
+
status=200,
+
content_type="application/xml"
+
)
+
+
# Allow passthrough for localhost (PDS)
+
responses.add_passthru("http://localhost")
+
+
# Set up environment
+
handle, password = aggregator_credentials
+
os.environ['AGGREGATOR_HANDLE'] = handle
+
os.environ['AGGREGATOR_PASSWORD'] = password
+
os.environ['PDS_URL'] = 'http://localhost:3001' # Auth through PDS
+
+
# Create aggregator
+
aggregator = Aggregator(
+
config_path=test_config_file,
+
state_file=test_state_file
+
)
+
+
# Run first time: should post stories
+
print("\n" + "="*60)
+
print("🚀 Running first aggregator pass (should post stories)")
+
print("="*60)
+
aggregator.run()
+
+
# Verify state was updated (stories marked as posted)
+
posted_count = aggregator.state_manager.get_posted_count(
+
"https://news.kagi.com/world.xml"
+
)
+
print(f"\n✅ First pass: {posted_count} stories posted and tracked")
+
assert posted_count > 0, "Should have posted at least one story"
+
+
# Create new aggregator instance (simulates CRON re-run)
+
aggregator2 = Aggregator(
+
config_path=test_config_file,
+
state_file=test_state_file
+
)
+
+
# Run second time: should skip duplicates
+
print("\n" + "="*60)
+
print("🔄 Running second aggregator pass (should skip duplicates)")
+
print("="*60)
+
aggregator2.run()
+
+
# Verify count didn't change (deduplication worked)
+
posted_count2 = aggregator2.state_manager.get_posted_count(
+
"https://news.kagi.com/world.xml"
+
)
+
print(f"\n✅ Second pass: Still {posted_count2} stories (duplicates skipped)")
+
assert posted_count2 == posted_count, "Should not post duplicates"
+
+
@responses.activate
+
def test_post_with_external_embed(
+
self,
+
test_config_file,
+
test_state_file,
+
mock_kagi_feed,
+
aggregator_credentials
+
):
+
"""
+
Test that posts include external embeds with images.
+
+
Verifies:
+
- External embed is created
+
- Thumbnail URL is included
+
- Title and description are set
+
"""
+
# Mock Kagi RSS feed
+
responses.add(
+
responses.GET,
+
"https://news.kagi.com/world.xml",
+
body=mock_kagi_feed,
+
status=200
+
)
+
+
# Allow passthrough for localhost (PDS)
+
responses.add_passthru("http://localhost")
+
+
# Set up environment
+
handle, password = aggregator_credentials
+
os.environ['AGGREGATOR_HANDLE'] = handle
+
os.environ['AGGREGATOR_PASSWORD'] = password
+
os.environ['PDS_URL'] = 'http://localhost:3001' # Auth through PDS
+
+
# Run aggregator
+
aggregator = Aggregator(
+
config_path=test_config_file,
+
state_file=test_state_file
+
)
+
+
print("\n" + "="*60)
+
print("🖼️ Testing external embed creation")
+
print("="*60)
+
aggregator.run()
+
+
# Verify posts were created
+
posted_count = aggregator.state_manager.get_posted_count(
+
"https://news.kagi.com/world.xml"
+
)
+
print(f"\n✅ Posted {posted_count} stories with external embeds")
+
assert posted_count > 0
+
+
def test_authentication_with_live_pds(self, aggregator_credentials):
+
"""
+
Test authentication against live PDS.
+
+
Verifies:
+
- Can authenticate with aggregator account
+
- Receives valid JWT tokens
+
- DID matches expected format
+
"""
+
handle, password = aggregator_credentials
+
+
print("\n" + "="*60)
+
print(f"🔐 Testing authentication: {handle}")
+
print("="*60)
+
+
# Create client and authenticate
+
client = CovesClient(
+
api_url="http://localhost:8081", # AppView for posting
+
handle=handle,
+
password=password,
+
pds_url="http://localhost:3001" # PDS for auth
+
)
+
+
client.authenticate()
+
+
print("\n✅ Authentication successful")
+
print(f" Handle: {client.handle}")
+
print(f" Authenticated: {client._authenticated}")
+
+
assert client._authenticated is True
+
assert hasattr(client, 'did')
+
assert client.did.startswith("did:plc:")
+
+
def test_state_persistence_across_runs(
+
self,
+
test_config_file,
+
test_state_file,
+
aggregator_credentials
+
):
+
"""
+
Test that state persists correctly across multiple runs.
+
+
Verifies:
+
- State file is created
+
- Posted GUIDs are tracked
+
- Last run timestamp is updated
+
- State survives aggregator restart
+
"""
+
# Mock empty feed (to avoid posting)
+
import responses as resp
+
resp.start()
+
resp.add(
+
resp.GET,
+
"https://news.kagi.com/world.xml",
+
body='<?xml version="1.0"?><rss version="2.0"><channel></channel></rss>',
+
status=200
+
)
+
+
handle, password = aggregator_credentials
+
os.environ['AGGREGATOR_HANDLE'] = handle
+
os.environ['AGGREGATOR_PASSWORD'] = password
+
+
print("\n" + "="*60)
+
print("💾 Testing state persistence")
+
print("="*60)
+
+
# First run
+
aggregator1 = Aggregator(
+
config_path=test_config_file,
+
state_file=test_state_file
+
)
+
aggregator1.run()
+
+
# Verify state file was created
+
assert test_state_file.exists(), "State file should be created"
+
print(f"\n✅ State file created: {test_state_file}")
+
+
# Verify last run was recorded
+
last_run1 = aggregator1.state_manager.get_last_run(
+
"https://news.kagi.com/world.xml"
+
)
+
assert last_run1 is not None, "Last run should be recorded"
+
print(f" Last run: {last_run1}")
+
+
# Second run (new instance)
+
aggregator2 = Aggregator(
+
config_path=test_config_file,
+
state_file=test_state_file
+
)
+
aggregator2.run()
+
+
# Verify state persisted
+
last_run2 = aggregator2.state_manager.get_last_run(
+
"https://news.kagi.com/world.xml"
+
)
+
assert last_run2 >= last_run1, "Last run should be updated"
+
print(f" Last run (after restart): {last_run2}")
+
print("\n✅ State persisted across aggregator restarts")
+
+
resp.stop()
+
resp.reset()
+
+
def test_error_recovery(
+
self,
+
test_config_file,
+
test_state_file,
+
aggregator_credentials
+
):
+
"""
+
Test that aggregator handles errors gracefully.
+
+
Verifies:
+
- Continues processing on feed errors
+
- Doesn't crash on network failures
+
- Logs errors appropriately
+
"""
+
# Mock feed failure
+
import responses as resp
+
resp.start()
+
resp.add(
+
resp.GET,
+
"https://news.kagi.com/world.xml",
+
body="Internal Server Error",
+
status=500
+
)
+
+
handle, password = aggregator_credentials
+
os.environ['AGGREGATOR_HANDLE'] = handle
+
os.environ['AGGREGATOR_PASSWORD'] = password
+
+
print("\n" + "="*60)
+
print("🛡️ Testing error recovery")
+
print("="*60)
+
+
# Should not crash
+
aggregator = Aggregator(
+
config_path=test_config_file,
+
state_file=test_state_file
+
)
+
+
try:
+
aggregator.run()
+
print("\n✅ Aggregator handled feed error gracefully")
+
except Exception as e:
+
pytest.fail(f"Aggregator should handle errors gracefully: {e}")
+
+
resp.stop()
+
resp.reset()
+
+
+
def test_coves_client_external_embed_format(aggregator_credentials):
+
"""
+
Test external embed formatting.
+
+
Verifies:
+
- Embed structure matches social.coves.embed.external
+
- All required fields are present
+
- Optional thumbnail is included when provided
+
"""
+
handle, password = aggregator_credentials
+
+
client = CovesClient(
+
api_url="http://localhost:8081",
+
handle=handle,
+
password=password
+
)
+
+
# Test with thumbnail
+
embed = client.create_external_embed(
+
uri="https://example.com/story",
+
title="Test Story",
+
description="Test description",
+
thumb="https://example.com/image.jpg"
+
)
+
+
assert embed["$type"] == "social.coves.embed.external"
+
assert embed["external"]["uri"] == "https://example.com/story"
+
assert embed["external"]["title"] == "Test Story"
+
assert embed["external"]["description"] == "Test description"
+
assert embed["external"]["thumb"] == "https://example.com/image.jpg"
+
+
# Test without thumbnail
+
embed_no_thumb = client.create_external_embed(
+
uri="https://example.com/story2",
+
title="Test Story 2",
+
description="Test description 2"
+
)
+
+
assert "thumb" not in embed_no_thumb["external"]
+
print("\n✅ External embed format correct")
+122
aggregators/kagi-news/tests/test_html_parser.py
+
"""
+
Tests for Kagi HTML description parser.
+
"""
+
import pytest
+
from pathlib import Path
+
from datetime import datetime
+
import html
+
+
from src.html_parser import KagiHTMLParser
+
from src.models import KagiStory, Perspective, Quote, Source
+
+
+
@pytest.fixture
+
def sample_html_description():
+
"""Load sample HTML from RSS item fixture."""
+
# This is the HTML from the RSS description field after XML unescaping
+
html_content = """<p>The White House confirmed President Trump will hold a bilateral meeting with Chinese President Xi Jinping in South Korea on October 30, at the end of an Asia trip that includes Malaysia and Japan . The administration said the meeting will take place Thursday morning local time, and Mr Trump indicated his first question to Xi would concern fentanyl and other bilateral issues . The talks come amid heightened trade tensions after Beijing expanded export curbs on rare-earth minerals and following Mr Trump's recent threat of additional tariffs on Chinese goods, making the meeting a focal point for discussions on trade, technology supply chains and energy .</p><img src='https://kagiproxy.com/img/Q2SRXQtwTYBIiQeI0FG-X6taF_wHSJaXDiFUzju2kbCWGuOYIFUX--8L0BqE4VKxpbOJY3ylFPJkDpfSnyQYZ1qdOLXbphHTnsOK4jb7gqC4KCn5nf3ANbWCuaFD5ZUSijiK0k7wOLP2fyX6tynu2mPtXlCbotLo2lTrEswZl4-No2AI4mI4lkResfnRdp-YjpoEfCOHkNfbN1-0cNcHt9T2dmgBSXrQ2w' alt='News image associated with coverage of President Trump&#x27;s Asia trip and planned meeting with President Xi' /><br /><h3>Highlights:</h3><ul><li>Itinerary details: The Asia swing begins in Malaysia, continues to Japan and ends with the bilateral meeting in South Korea on Thursday morning local time, White House press secretary Karoline Leavitt said at a briefing .</li><li>APEC context: US officials indicated the leaders will meet on the sidelines of the Asia-Pacific Economic Cooperation gathering, shaping expectations for short, high-level talks rather than a lengthy summit .</li></ul><blockquote>Work out a lot of our doubts and questions - President Trump</blockquote><h3>Perspectives:</h3><ul><li>President Trump: He said his first question to President Xi would be about fentanyl and indicated he hoped to resolve bilateral doubts and questions in the talks. 
(<a href='https://www.straitstimes.com/world/united-states/trump-to-meet-xi-in-south-korea-on-oct-30-as-part-of-asia-swing'>The Straits Times</a>)</li><li>White House (press secretary): Karoline Leavitt confirmed the bilateral meeting will occur Thursday morning local time during a White House briefing. (<a href='https://www.scmp.com/news/us/diplomacy/article/3330131/donald-trump-meet-chinas-xi-jinping-next-thursday-south-korea-crunch-talks'>South China Morning Post</a>)</li></ul><h3>Sources:</h3><ul><li><a href='https://www.straitstimes.com/world/united-states/trump-to-meet-xi-in-south-korea-on-oct-30-as-part-of-asia-swing'>Trump to meet Xi in South Korea on Oct 30 as part of Asia swing</a> - straitstimes.com</li><li><a href='https://www.scmp.com/news/us/diplomacy/article/3330131/donald-trump-meet-chinas-xi-jinping-next-thursday-south-korea-crunch-talks'>Trump to meet Xi in South Korea next Thursday as part of key Asia trip</a> - scmp.com</li></ul>"""
+
return html_content
+
+
+
class TestKagiHTMLParser:
+
"""Test suite for Kagi HTML parser."""
+
+
def test_parse_summary(self, sample_html_description):
+
"""Test extracting summary paragraph."""
+
parser = KagiHTMLParser()
+
result = parser.parse(sample_html_description)
+
+
assert result['summary'].startswith("The White House confirmed President Trump")
+
assert "bilateral meeting with Chinese President Xi Jinping" in result['summary']
+
+
def test_parse_image_url(self, sample_html_description):
+
"""Test extracting image URL and alt text."""
+
parser = KagiHTMLParser()
+
result = parser.parse(sample_html_description)
+
+
assert result['image_url'] is not None
+
assert result['image_url'].startswith("https://kagiproxy.com/img/")
+
assert result['image_alt'] is not None
+
assert "Trump" in result['image_alt']
+
+
def test_parse_highlights(self, sample_html_description):
+
"""Test extracting highlights list."""
+
parser = KagiHTMLParser()
+
result = parser.parse(sample_html_description)
+
+
assert len(result['highlights']) == 2
+
assert "Itinerary details" in result['highlights'][0]
+
assert "APEC context" in result['highlights'][1]
+
+
def test_parse_quote(self, sample_html_description):
+
"""Test extracting blockquote."""
+
parser = KagiHTMLParser()
+
result = parser.parse(sample_html_description)
+
+
assert result['quote'] is not None
+
assert result['quote']['text'] == "Work out a lot of our doubts and questions"
+
assert result['quote']['attribution'] == "President Trump"
+
+
def test_parse_perspectives(self, sample_html_description):
+
"""Test extracting perspectives list."""
+
parser = KagiHTMLParser()
+
result = parser.parse(sample_html_description)
+
+
assert len(result['perspectives']) == 2
+
+
# First perspective
+
assert result['perspectives'][0]['actor'] == "President Trump"
+
assert "fentanyl" in result['perspectives'][0]['description']
+
assert result['perspectives'][0]['source_url'] == "https://www.straitstimes.com/world/united-states/trump-to-meet-xi-in-south-korea-on-oct-30-as-part-of-asia-swing"
+
+
# Second perspective
+
assert "White House" in result['perspectives'][1]['actor']
+
+
def test_parse_sources(self, sample_html_description):
+
"""Test extracting sources list."""
+
parser = KagiHTMLParser()
+
result = parser.parse(sample_html_description)
+
+
assert len(result['sources']) >= 2
+
+
# Check first source
+
assert result['sources'][0]['title'] == "Trump to meet Xi in South Korea on Oct 30 as part of Asia swing"
+
assert result['sources'][0]['url'].startswith("https://www.straitstimes.com")
+
assert result['sources'][0]['domain'] == "straitstimes.com"
+
+
def test_parse_missing_sections(self):
+
"""Test parsing HTML with missing sections."""
+
html_minimal = "<p>Just a summary, no other sections.</p>"
+
+
parser = KagiHTMLParser()
+
result = parser.parse(html_minimal)
+
+
assert result['summary'] == "Just a summary, no other sections."
+
assert result['highlights'] == []
+
assert result['perspectives'] == []
+
assert result['sources'] == []
+
assert result['quote'] is None
+
assert result['image_url'] is None
+
+
def test_parse_to_kagi_story(self, sample_html_description):
+
"""Test converting parsed HTML to KagiStory object."""
+
parser = KagiHTMLParser()
+
+
# Simulate full RSS item data
+
story = parser.parse_to_story(
+
title="Trump to meet Xi in South Korea on Oct 30",
+
link="https://kite.kagi.com/test/world/10",
+
guid="https://kite.kagi.com/test/world/10",
+
pub_date=datetime(2025, 10, 23, 20, 56, 0),
+
categories=["World", "World/Diplomacy"],
+
html_description=sample_html_description
+
)
+
+
assert isinstance(story, KagiStory)
+
assert story.title == "Trump to meet Xi in South Korea on Oct 30"
+
assert story.link == "https://kite.kagi.com/test/world/10"
+
assert len(story.highlights) == 2
+
assert len(story.perspectives) == 2
+
assert len(story.sources) >= 2
+
assert story.quote is not None
+
assert story.image_url is not None
+460
aggregators/kagi-news/tests/test_main.py
+
"""
+
Tests for Main Orchestration Script.
+
+
Tests the complete flow: fetch → parse → format → dedupe → post → update state.
+
"""
+
import pytest
+
from pathlib import Path
+
from datetime import datetime
+
from unittest.mock import Mock, MagicMock, patch
+
+
from src.main import Aggregator
+
from src.models import KagiStory, AggregatorConfig, FeedConfig, Perspective, Quote, Source
+
+
+
@pytest.fixture
+
def mock_config():
+
"""Mock aggregator configuration."""
+
return AggregatorConfig(
+
coves_api_url="https://api.coves.social",
+
feeds=[
+
FeedConfig(
+
name="World News",
+
url="https://news.kagi.com/world.xml",
+
community_handle="world-news.coves.social",
+
enabled=True
+
),
+
FeedConfig(
+
name="Tech News",
+
url="https://news.kagi.com/tech.xml",
+
community_handle="tech.coves.social",
+
enabled=True
+
),
+
FeedConfig(
+
name="Disabled Feed",
+
url="https://news.kagi.com/disabled.xml",
+
community_handle="disabled.coves.social",
+
enabled=False
+
)
+
],
+
log_level="info"
+
)
+
+
+
@pytest.fixture
+
def sample_story():
+
"""Sample KagiStory for testing."""
+
return KagiStory(
+
title="Test Story",
+
link="https://kite.kagi.com/test/world/1",
+
guid="https://kite.kagi.com/test/world/1",
+
pub_date=datetime(2024, 1, 15, 12, 0, 0),
+
categories=["World"],
+
summary="Test summary",
+
highlights=["Highlight 1", "Highlight 2"],
+
perspectives=[
+
Perspective(
+
actor="Test Actor",
+
description="Test description",
+
source_url="https://example.com/source"
+
)
+
],
+
quote=Quote(text="Test quote", attribution="Test Author"),
+
sources=[
+
Source(title="Source 1", url="https://example.com/1", domain="example.com")
+
],
+
image_url="https://example.com/image.jpg",
+
image_alt="Test image"
+
)
+
+
+
@pytest.fixture
+
def mock_rss_feed():
+
"""Mock RSS feed with sample entries."""
+
feed = MagicMock()
+
feed.bozo = 0
+
feed.entries = [
+
MagicMock(
+
title="Story 1",
+
link="https://kite.kagi.com/test/world/1",
+
guid="https://kite.kagi.com/test/world/1",
+
published_parsed=(2024, 1, 15, 12, 0, 0, 0, 15, 0),
+
tags=[MagicMock(term="World")],
+
description="<p>Story 1 description</p>"
+
),
+
MagicMock(
+
title="Story 2",
+
link="https://kite.kagi.com/test/world/2",
+
guid="https://kite.kagi.com/test/world/2",
+
published_parsed=(2024, 1, 15, 13, 0, 0, 0, 15, 0),
+
tags=[MagicMock(term="World")],
+
description="<p>Story 2 description</p>"
+
)
+
]
+
return feed
+
+
+
class TestAggregator:
+
"""Test suite for Aggregator orchestration."""
+
+
def test_initialize_aggregator(self, mock_config, tmp_path):
+
"""Test aggregator initialization."""
+
state_file = tmp_path / "state.json"
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader:
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=Mock()
+
)
+
+
assert aggregator.config == mock_config
+
assert aggregator.state_file == state_file
+
+
def test_process_enabled_feeds_only(self, mock_config, tmp_path):
+
"""Test that only enabled feeds are processed."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher:
+
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
MockRSSFetcher.return_value = mock_fetcher
+
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
+
# Mock empty feeds
+
mock_fetcher.fetch_feed.return_value = MagicMock(bozo=0, entries=[])
+
+
aggregator.run()
+
+
# Should only fetch enabled feeds (2)
+
assert mock_fetcher.fetch_feed.call_count == 2
+
+
def test_full_successful_flow(self, mock_config, mock_rss_feed, sample_story, tmp_path):
+
"""Test complete flow: fetch → parse → format → post → update state."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
mock_client.create_post.return_value = "at://did:plc:test/social.coves.post/abc123"
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher, \
+
patch('src.main.KagiHTMLParser') as MockHTMLParser, \
+
patch('src.main.RichTextFormatter') as MockFormatter:
+
+
# Setup mocks
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
mock_fetcher.fetch_feed.return_value = mock_rss_feed
+
MockRSSFetcher.return_value = mock_fetcher
+
+
mock_parser = Mock()
+
mock_parser.parse_to_story.return_value = sample_story
+
MockHTMLParser.return_value = mock_parser
+
+
mock_formatter = Mock()
+
mock_formatter.format_full.return_value = {
+
"content": "Test content",
+
"facets": []
+
}
+
MockFormatter.return_value = mock_formatter
+
+
# Run aggregator
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator.run()
+
+
# Verify RSS fetching
+
assert mock_fetcher.fetch_feed.call_count == 2
+
+
# Verify parsing (2 entries per feed * 2 feeds = 4 total)
+
assert mock_parser.parse_to_story.call_count == 4
+
+
# Verify formatting
+
assert mock_formatter.format_full.call_count == 4
+
+
# Verify posting (should call create_post for each story)
+
assert mock_client.create_post.call_count == 4
+
+
def test_deduplication_skips_posted_stories(self, mock_config, mock_rss_feed, sample_story, tmp_path):
+
"""Test that already-posted stories are skipped."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
mock_client.create_post.return_value = "at://did:plc:test/social.coves.post/abc123"
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher, \
+
patch('src.main.KagiHTMLParser') as MockHTMLParser, \
+
patch('src.main.RichTextFormatter') as MockFormatter:
+
+
# Setup mocks
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
mock_fetcher.fetch_feed.return_value = mock_rss_feed
+
MockRSSFetcher.return_value = mock_fetcher
+
+
mock_parser = Mock()
+
mock_parser.parse_to_story.return_value = sample_story
+
MockHTMLParser.return_value = mock_parser
+
+
mock_formatter = Mock()
+
mock_formatter.format_full.return_value = {
+
"content": "Test content",
+
"facets": []
+
}
+
MockFormatter.return_value = mock_formatter
+
+
# First run: posts all stories
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator.run()
+
+
# Verify first run posted stories
+
first_run_posts = mock_client.create_post.call_count
+
assert first_run_posts == 4
+
+
# Second run: should skip all (already posted)
+
mock_client.reset_mock()
+
aggregator2 = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator2.run()
+
+
# Should not post any (all duplicates)
+
assert mock_client.create_post.call_count == 0
+
+
def test_continue_on_feed_error(self, mock_config, tmp_path):
+
"""Test that processing continues if one feed fails."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher:
+
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
# First feed fails, second succeeds
+
mock_fetcher.fetch_feed.side_effect = [
+
Exception("Network error"),
+
MagicMock(bozo=0, entries=[])
+
]
+
MockRSSFetcher.return_value = mock_fetcher
+
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
+
# Should not raise exception
+
aggregator.run()
+
+
# Should have attempted both feeds
+
assert mock_fetcher.fetch_feed.call_count == 2
+
+
def test_handle_empty_feed(self, mock_config, tmp_path):
+
"""Test handling of empty RSS feeds."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher:
+
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
mock_fetcher.fetch_feed.return_value = MagicMock(bozo=0, entries=[])
+
MockRSSFetcher.return_value = mock_fetcher
+
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator.run()
+
+
# Should not post anything
+
assert mock_client.create_post.call_count == 0
+
+
def test_dont_update_state_on_failed_post(self, mock_config, mock_rss_feed, sample_story, tmp_path):
+
"""Test that state is not updated if posting fails."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
mock_client.create_post.side_effect = Exception("Post failed")
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher, \
+
patch('src.main.KagiHTMLParser') as MockHTMLParser, \
+
patch('src.main.RichTextFormatter') as MockFormatter:
+
+
# Setup mocks
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
mock_fetcher.fetch_feed.return_value = mock_rss_feed
+
MockRSSFetcher.return_value = mock_fetcher
+
+
mock_parser = Mock()
+
mock_parser.parse_to_story.return_value = sample_story
+
MockHTMLParser.return_value = mock_parser
+
+
mock_formatter = Mock()
+
mock_formatter.format_full.return_value = {
+
"content": "Test content",
+
"facets": []
+
}
+
MockFormatter.return_value = mock_formatter
+
+
# Run aggregator (posts will fail)
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator.run()
+
+
# Reset client to succeed
+
mock_client.reset_mock()
+
mock_client.create_post.return_value = "at://did:plc:test/social.coves.post/abc123"
+
+
# Second run: should try to post again (state wasn't updated)
+
aggregator2 = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator2.run()
+
+
# Should post stories (they weren't marked as posted)
+
assert mock_client.create_post.call_count == 4
+
+
def test_update_last_run_timestamp(self, mock_config, tmp_path):
+
"""Test that last_run timestamp is updated after successful processing."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher:
+
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
mock_fetcher.fetch_feed.return_value = MagicMock(bozo=0, entries=[])
+
MockRSSFetcher.return_value = mock_fetcher
+
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator.run()
+
+
# Verify last_run was updated for both feeds
+
feed1_last_run = aggregator.state_manager.get_last_run(
+
"https://news.kagi.com/world.xml"
+
)
+
feed2_last_run = aggregator.state_manager.get_last_run(
+
"https://news.kagi.com/tech.xml"
+
)
+
+
assert feed1_last_run is not None
+
assert feed2_last_run is not None
+
+
def test_create_post_with_image_embed(self, mock_config, mock_rss_feed, sample_story, tmp_path):
+
"""Test that posts include external image embeds."""
+
state_file = tmp_path / "state.json"
+
mock_client = Mock()
+
mock_client.create_post.return_value = "at://did:plc:test/social.coves.post/abc123"
+
+
# Mock create_external_embed to return proper embed structure
+
mock_client.create_external_embed.return_value = {
+
"$type": "social.coves.embed.external",
+
"external": {
+
"uri": sample_story.link,
+
"title": sample_story.title,
+
"description": sample_story.summary,
+
"thumb": sample_story.image_url
+
}
+
}
+
+
with patch('src.main.ConfigLoader') as MockConfigLoader, \
+
patch('src.main.RSSFetcher') as MockRSSFetcher, \
+
patch('src.main.KagiHTMLParser') as MockHTMLParser, \
+
patch('src.main.RichTextFormatter') as MockFormatter:
+
+
# Setup mocks
+
mock_loader = Mock()
+
mock_loader.load.return_value = mock_config
+
MockConfigLoader.return_value = mock_loader
+
+
mock_fetcher = Mock()
+
# Only one entry for simplicity
+
single_entry_feed = MagicMock(bozo=0, entries=[mock_rss_feed.entries[0]])
+
mock_fetcher.fetch_feed.return_value = single_entry_feed
+
MockRSSFetcher.return_value = mock_fetcher
+
+
mock_parser = Mock()
+
mock_parser.parse_to_story.return_value = sample_story
+
MockHTMLParser.return_value = mock_parser
+
+
mock_formatter = Mock()
+
mock_formatter.format_full.return_value = {
+
"content": "Test content",
+
"facets": []
+
}
+
MockFormatter.return_value = mock_formatter
+
+
# Run aggregator
+
aggregator = Aggregator(
+
config_path=Path("config.yaml"),
+
state_file=state_file,
+
coves_client=mock_client
+
)
+
aggregator.run()
+
+
# Verify create_post was called with embed
+
mock_client.create_post.assert_called()
+
call_kwargs = mock_client.create_post.call_args.kwargs
+
+
assert "embed" in call_kwargs
+
assert call_kwargs["embed"]["$type"] == "social.coves.embed.external"
+
assert call_kwargs["embed"]["external"]["uri"] == sample_story.link
+
assert call_kwargs["embed"]["external"]["title"] == sample_story.title
+
assert call_kwargs["embed"]["external"]["thumb"] == sample_story.image_url
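The deduplication and failed-post tests above rely on one ordering rule: a GUID is recorded only after its post succeeds. The following is a minimal sketch of that rule, not the actual `Aggregator` code. `post_new_stories` and its plain-dict state are hypothetical stand-ins for the orchestrator and `StateManager`.

```python
from typing import Callable, Dict, Iterable
from unittest.mock import Mock


def post_new_stories(
    guids: Iterable[str],
    posted: Dict[str, str],
    create_post: Callable[[str], str],
) -> Dict[str, str]:
    """Post each unseen GUID and record it only after the post succeeds."""
    for guid in guids:
        if guid in posted:
            continue  # deduplication: already posted in an earlier run
        try:
            post_uri = create_post(guid)
        except Exception:
            continue  # leave the GUID unrecorded so the next run retries it
        posted[guid] = post_uri
    return posted


guids = ["story-1", "story-2"]
posted: Dict[str, str] = {}

# First run: every post fails, so nothing is recorded.
failing = Mock(side_effect=Exception("Post failed"))
post_new_stories(guids, posted, failing)
posted_after_failure = dict(posted)

# Second run retries both stories; third run finds nothing left to post.
working = Mock(return_value="at://did:plc:test/social.coves.post/abc123")
post_new_stories(guids, posted, working)
post_new_stories(guids, posted, working)
```

Recording state after the post, rather than before, means a failed run can be repeated safely. The cost is that a crash between posting and recording could produce a duplicate post.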
+299
aggregators/kagi-news/tests/test_richtext_formatter.py
···
+
"""
+
Tests for Rich Text Formatter.
+
+
Tests conversion of KagiStory to Coves rich text format with facets.
+
"""
+
import pytest
+
from datetime import datetime
+
+
from src.richtext_formatter import RichTextFormatter
+
from src.models import KagiStory, Perspective, Quote, Source
+
+
+
@pytest.fixture
+
def sample_story():
+
"""Create a sample KagiStory for testing."""
+
return KagiStory(
+
title="Trump to meet Xi in South Korea",
+
link="https://kite.kagi.com/test/world/10",
+
guid="https://kite.kagi.com/test/world/10",
+
pub_date=datetime(2025, 10, 23, 20, 56, 0),
+
categories=["World", "World/Diplomacy"],
+
summary="The White House confirmed President Trump will hold a bilateral meeting with Chinese President Xi Jinping in South Korea on October 30.",
+
highlights=[
+
"Itinerary details: The Asia swing begins in Malaysia, continues to Japan.",
+
"APEC context: US officials indicated the leaders will meet on the sidelines."
+
],
+
perspectives=[
+
Perspective(
+
actor="President Trump",
+
description="He said his first question to President Xi would be about fentanyl.",
+
source_url="https://www.straitstimes.com/world/test"
+
),
+
Perspective(
+
actor="White House (press secretary)",
+
description="Karoline Leavitt confirmed the bilateral meeting.",
+
source_url="https://www.scmp.com/news/test"
+
)
+
],
+
quote=Quote(
+
text="Work out a lot of our doubts and questions",
+
attribution="President Trump"
+
),
+
sources=[
+
Source(
+
title="Trump to meet Xi in South Korea",
+
url="https://www.straitstimes.com/world/test",
+
domain="straitstimes.com"
+
),
+
Source(
+
title="Trump meeting Xi next Thursday",
+
url="https://www.scmp.com/news/test",
+
domain="scmp.com"
+
)
+
],
+
image_url="https://kagiproxy.com/img/test123",
+
image_alt="Test image"
+
)
+
+
+
class TestRichTextFormatter:
+
"""Test suite for RichTextFormatter."""
+
+
def test_format_full_returns_content_and_facets(self, sample_story):
+
"""Test that format_full returns content and facets."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
+
assert 'content' in result
+
assert 'facets' in result
+
assert isinstance(result['content'], str)
+
assert isinstance(result['facets'], list)
+
+
def test_content_structure(self, sample_story):
+
"""Test that content has correct structure."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
content = result['content']
+
+
# Check all sections are present
+
assert sample_story.summary in content
+
assert "Highlights:" in content
+
assert "Perspectives:" in content
+
assert "Sources:" in content
+
assert sample_story.quote.text in content
+
assert "📰 Story aggregated by Kagi News" in content
+
+
def test_facets_for_bold_headers(self, sample_story):
+
"""Test that section headers have bold facets."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
+
# Find bold facets
+
bold_facets = [
+
f for f in result['facets']
+
if any(feat.get('$type') == 'social.coves.richtext.facet#bold'
+
for feat in f['features'])
+
]
+
+
assert len(bold_facets) > 0
+
+
# Check that "Highlights:" is bolded
+
content = result['content']
+
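highlights_char_pos = content.find("Highlights:")
+
# Facet indices are UTF-8 byte offsets, so convert the character position
+
highlights_pos = len(content[:highlights_char_pos].encode('utf-8'))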
+
+
# Should have a bold facet covering "Highlights:"
+
has_highlights_bold = any(
+
f['index']['byteStart'] <= highlights_pos and
+
f['index']['byteEnd'] >= highlights_pos + len("Highlights:")
+
for f in bold_facets
+
)
+
assert has_highlights_bold
+
+
def test_facets_for_italic_quote(self, sample_story):
+
"""Test that quotes have italic facets."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
+
# Find italic facets
+
italic_facets = [
+
f for f in result['facets']
+
if any(feat.get('$type') == 'social.coves.richtext.facet#italic'
+
for feat in f['features'])
+
]
+
+
assert len(italic_facets) > 0
+
+
# The quote text is wrapped with quotes, so search for that
+
content = result['content']
+
quote_with_quotes = f'"{sample_story.quote.text}"'
+
quote_char_pos = content.find(quote_with_quotes)
+
+
# Convert character position to byte position
+
quote_byte_start = len(content[:quote_char_pos].encode('utf-8'))
+
quote_byte_end = len(content[:quote_char_pos + len(quote_with_quotes)].encode('utf-8'))
+
+
has_quote_italic = any(
+
f['index']['byteStart'] <= quote_byte_start and
+
f['index']['byteEnd'] >= quote_byte_end
+
for f in italic_facets
+
)
+
assert has_quote_italic
+
+
def test_facets_for_links(self, sample_story):
+
"""Test that URLs have link facets."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
+
# Find link facets
+
link_facets = [
+
f for f in result['facets']
+
if any(feat.get('$type') == 'social.coves.richtext.facet#link'
+
for feat in f['features'])
+
]
+
+
# Should have links for: 2 sources + 2 perspectives + 1 Kagi News link = 5 minimum
+
assert len(link_facets) >= 5
+
+
# Check that every source URL has a link facet
+
source_urls = [s.url for s in sample_story.sources]
+
for url in source_urls:
+
has_link = any(
+
any(feat.get('uri') == url for feat in f['features'])
+
for f in link_facets
+
)
+
assert has_link, f"Missing link facet for {url}"
+
+
def test_utf8_byte_positions(self):
+
"""Test UTF-8 byte position calculation with multi-byte characters."""
+
# Create story with emoji and non-ASCII characters
+
story = KagiStory(
+
title="Test 👋 Story",
+
link="https://test.com",
+
guid="https://test.com",
+
pub_date=datetime.now(),
+
categories=["Test"],
+
summary="Hello 世界 this is a test with emoji 🎉",
+
highlights=["Test highlight"],
+
perspectives=[],
+
quote=None,
+
sources=[],
+
)
+
+
formatter = RichTextFormatter()
+
result = formatter.format_full(story)
+
+
# Verify content contains the emoji
+
assert "👋" in result['content'] or "🎉" in result['content']
+
+
# Verify all facet byte positions are valid
+
content_bytes = result['content'].encode('utf-8')
+
for facet in result['facets']:
+
start = facet['index']['byteStart']
+
end = facet['index']['byteEnd']
+
+
# Positions should be within bounds
+
assert 0 <= start < len(content_bytes)
+
assert start < end <= len(content_bytes)
+
+
def test_format_story_without_optional_fields(self):
+
"""Test formatting story with missing optional fields."""
+
minimal_story = KagiStory(
+
title="Minimal Story",
+
link="https://test.com",
+
guid="https://test.com",
+
pub_date=datetime.now(),
+
categories=["Test"],
+
summary="Just a summary.",
+
highlights=[], # Empty
+
perspectives=[], # Empty
+
quote=None, # Missing
+
sources=[], # Empty
+
)
+
+
formatter = RichTextFormatter()
+
result = formatter.format_full(minimal_story)
+
+
# Should still have content and facets
+
assert result['content']
+
assert result['facets']
+
+
# Should have summary
+
assert "Just a summary." in result['content']
+
+
# Should NOT have empty sections
+
assert "Highlights:" not in result['content']
+
assert "Perspectives:" not in result['content']
+
+
def test_perspective_actor_is_bolded(self, sample_story):
+
"""Test that perspective actor names are bolded."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
+
content = result['content']
+
bold_facets = [
+
f for f in result['facets']
+
if any(feat.get('$type') == 'social.coves.richtext.facet#bold'
+
for feat in f['features'])
+
]
+
+
# Find "President Trump:" in perspectives section
+
actor = "President Trump:"
+
perspectives_start = content.find("Perspectives:")
+
actor_char_pos = content.find(actor, perspectives_start)
+
+
assert actor_char_pos != -1, "Actor not found in perspectives section"
+
# Convert character position to byte position
+
actor_byte_start = len(content[:actor_char_pos].encode('utf-8'))
+
actor_byte_end = len(content[:actor_char_pos + len(actor)].encode('utf-8'))
+
+
has_actor_bold = any(
+
f['index']['byteStart'] <= actor_byte_start and
+
f['index']['byteEnd'] >= actor_byte_end
+
for f in bold_facets
+
)
+
assert has_actor_bold
+
+
def test_kagi_attribution_link(self, sample_story):
+
"""Test that Kagi News attribution has a link to the story."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
+
# Should have link to Kagi story
+
link_facets = [
+
f for f in result['facets']
+
if any(feat.get('$type') == 'social.coves.richtext.facet#link'
+
for feat in f['features'])
+
]
+
+
# Find link to the Kagi story URL
+
kagi_link = any(
+
any(feat.get('uri') == sample_story.link for feat in f['features'])
+
for f in link_facets
+
)
+
assert kagi_link, "Missing link to Kagi story in attribution"
+
+
def test_facets_do_not_overlap(self, sample_story):
+
"""Test that facets with same feature type don't overlap."""
+
formatter = RichTextFormatter()
+
result = formatter.format_full(sample_story)
+
+
# Group facets by type
+
facets_by_type = {}
+
for facet in result['facets']:
+
for feature in facet['features']:
+
ftype = feature['$type']
+
if ftype not in facets_by_type:
+
facets_by_type[ftype] = []
+
facets_by_type[ftype].append(facet)
+
+
# Check for overlaps within each type
+
for ftype, facets in facets_by_type.items():
+
for i, f1 in enumerate(facets):
+
for f2 in facets[i+1:]:
+
start1, end1 = f1['index']['byteStart'], f1['index']['byteEnd']
+
start2, end2 = f2['index']['byteStart'], f2['index']['byteEnd']
+
+
# Check if they overlap
+
overlaps = (start1 < end2 and start2 < end1)
+
assert not overlaps, f"Overlapping facets of type {ftype}: {f1} and {f2}"
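Several tests above convert character positions to UTF-8 byte offsets before comparing them with facet indices. The following is a small sketch of that conversion. `char_span_to_byte_span` is a hypothetical helper written for illustration, not part of `RichTextFormatter`.

```python
from typing import Tuple


def char_span_to_byte_span(text: str, char_start: int, char_end: int) -> Tuple[int, int]:
    """Convert a [char_start, char_end) character span into UTF-8 byte offsets."""
    byte_start = len(text[:char_start].encode("utf-8"))
    byte_end = len(text[:char_end].encode("utf-8"))
    return byte_start, byte_end


# Multi-byte characters before the header shift its byte offset.
content = "Hello 世界 🎉 Highlights:"
header = "Highlights:"
char_start = content.find(header)
byte_start, byte_end = char_span_to_byte_span(
    content, char_start, char_start + len(header)
)

# Slicing the encoded bytes with the byte offsets recovers the header.
recovered = content.encode("utf-8")[byte_start:byte_end].decode("utf-8")
```

Here the header starts at character 11 but at byte 18, because the two CJK characters take three bytes each and the emoji takes four.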
+91
aggregators/kagi-news/tests/test_rss_fetcher.py
···
+
"""
+
Tests for RSS feed fetching functionality.
+
"""
+
import pytest
+
import responses
+
from pathlib import Path
+
+
from src.rss_fetcher import RSSFetcher
+
+
+
@pytest.fixture
+
def sample_rss_feed():
+
"""Minimal inline RSS feed used in place of a fixture file."""
+
return """<?xml version='1.0' encoding='UTF-8'?>
+
<rss version="2.0">
+
<channel>
+
<title>Kagi News - World</title>
+
<item>
+
<title>Test Story</title>
+
<link>https://kite.kagi.com/test/world/1</link>
+
<guid>https://kite.kagi.com/test/world/1</guid>
+
<pubDate>Fri, 24 Oct 2025 12:00:00 +0000</pubDate>
+
<category>World</category>
+
</item>
+
</channel>
+
</rss>"""
+
+
+
class TestRSSFetcher:
+
"""Test suite for RSSFetcher."""
+
+
@responses.activate
+
def test_fetch_feed_success(self, sample_rss_feed):
+
"""Test successful RSS feed fetch."""
+
url = "https://news.kagi.com/world.xml"
+
responses.add(responses.GET, url, body=sample_rss_feed, status=200)
+
+
fetcher = RSSFetcher()
+
feed = fetcher.fetch_feed(url)
+
+
assert feed is not None
+
assert feed.feed.title == "Kagi News - World"
+
assert len(feed.entries) == 1
+
assert feed.entries[0].title == "Test Story"
+
+
@responses.activate
+
def test_fetch_feed_timeout(self):
+
"""Test that an HTTP 408 (Request Timeout) response raises."""
+
url = "https://news.kagi.com/world.xml"
+
responses.add(responses.GET, url, body="timeout", status=408)
+
+
fetcher = RSSFetcher(timeout=5)
+
+
with pytest.raises(Exception): # Should raise on the 408 response
+
fetcher.fetch_feed(url)
+
+
@responses.activate
+
def test_fetch_feed_with_retry(self, sample_rss_feed):
+
"""Test fetch with retry on failure then success."""
+
url = "https://news.kagi.com/world.xml"
+
+
# First call fails, second succeeds
+
responses.add(responses.GET, url, body="error", status=500)
+
responses.add(responses.GET, url, body=sample_rss_feed, status=200)
+
+
fetcher = RSSFetcher(max_retries=2)
+
feed = fetcher.fetch_feed(url)
+
+
assert feed is not None
+
assert len(feed.entries) == 1
+
+
@responses.activate
+
def test_fetch_feed_invalid_xml(self):
+
"""Test handling of invalid XML."""
+
url = "https://news.kagi.com/world.xml"
+
responses.add(responses.GET, url, body="Not valid XML!", status=200)
+
+
fetcher = RSSFetcher()
+
feed = fetcher.fetch_feed(url)
+
+
# feedparser is lenient, but should have bozo flag set
+
assert feed.bozo == 1 # bozo is 1 or True depending on the feedparser version
+
+
def test_fetch_feed_requires_url(self):
+
"""Test that fetch_feed requires a URL."""
+
fetcher = RSSFetcher()
+
+
with pytest.raises((ValueError, TypeError)):
+
fetcher.fetch_feed("")
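The retry test above expects a failed first request followed by a successful second one. The following is a minimal sketch of that behaviour, assuming `max_retries=2` means two attempts in total. `fetch_with_retry` and its parameters are hypothetical and do not come from `RSSFetcher`.

```python
import time
from typing import Callable, TypeVar
from unittest.mock import Mock

T = TypeVar("T")


def fetch_with_retry(
    fetch: Callable[[], T],
    max_attempts: int = 2,
    backoff_seconds: float = 0.0,
) -> T:
    """Call fetch() up to max_attempts times, re-raising the final error."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last failure
            time.sleep(backoff_seconds)
    raise AssertionError("unreachable")


# First call fails, second succeeds.
flaky = Mock(side_effect=[RuntimeError("HTTP 500"), "feed body"])
result = fetch_with_retry(flaky, max_attempts=2)

# A source that never recovers exhausts its attempts and raises.
always_down = Mock(side_effect=RuntimeError("HTTP 500"))
try:
    fetch_with_retry(always_down, max_attempts=2)
    gave_up = False
except RuntimeError:
    gave_up = True
```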
+227
aggregators/kagi-news/tests/test_state_manager.py
···
+
"""
+
Tests for State Manager.
+
+
Tests deduplication state tracking and persistence.
+
"""
+
import pytest
+
import json
+
import tempfile
+
from pathlib import Path
+
from datetime import datetime, timedelta
+
+
from src.state_manager import StateManager
+
+
+
@pytest.fixture
+
def temp_state_file():
+
"""Create a temporary state file for testing."""
+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
+
temp_path = Path(f.name)
+
yield temp_path
+
# Cleanup
+
if temp_path.exists():
+
temp_path.unlink()
+
+
+
class TestStateManager:
+
"""Test suite for StateManager."""
+
+
def test_initialize_new_state_file(self, temp_state_file):
+
"""Test initializing a new state file."""
+
manager = StateManager(temp_state_file)
+
+
# Should create an empty state
+
assert temp_state_file.exists()
+
state = json.loads(temp_state_file.read_text())
+
assert 'feeds' in state
+
assert state['feeds'] == {}
+
+
def test_is_posted_returns_false_for_new_guid(self, temp_state_file):
+
"""Test that is_posted returns False for new GUIDs."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
guid = "https://kite.kagi.com/test/world/1"
+
+
assert not manager.is_posted(feed_url, guid)
+
+
def test_mark_posted_stores_guid(self, temp_state_file):
+
"""Test that mark_posted stores GUIDs."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
guid = "https://kite.kagi.com/test/world/1"
+
post_uri = "at://did:plc:test/social.coves.post/abc123"
+
+
manager.mark_posted(feed_url, guid, post_uri)
+
+
# Should now return True
+
assert manager.is_posted(feed_url, guid)
+
+
def test_state_persists_across_instances(self, temp_state_file):
+
"""Test that state persists when creating new instances."""
+
feed_url = "https://news.kagi.com/world.xml"
+
guid = "https://kite.kagi.com/test/world/1"
+
post_uri = "at://did:plc:test/social.coves.post/abc123"
+
+
# First instance marks as posted
+
manager1 = StateManager(temp_state_file)
+
manager1.mark_posted(feed_url, guid, post_uri)
+
+
# Second instance should see the same state
+
manager2 = StateManager(temp_state_file)
+
assert manager2.is_posted(feed_url, guid)
+
+
def test_track_last_run_timestamp(self, temp_state_file):
+
"""Test tracking last successful run timestamp."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
timestamp = datetime.now()
+
+
manager.update_last_run(feed_url, timestamp)
+
+
retrieved = manager.get_last_run(feed_url)
+
assert retrieved is not None
+
# Compare timestamps (allow small difference due to serialization)
+
assert abs((retrieved - timestamp).total_seconds()) < 1
+
+
def test_get_last_run_returns_none_for_new_feed(self, temp_state_file):
+
"""Test that get_last_run returns None for new feeds."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
+
assert manager.get_last_run(feed_url) is None
+
+
def test_cleanup_old_guids(self, temp_state_file):
+
"""Test cleanup of old GUIDs (> 30 days)."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
+
# Add recent GUID
+
recent_guid = "https://kite.kagi.com/test/world/1"
+
manager.mark_posted(feed_url, recent_guid, "at://test/1")
+
+
# Manually add old GUID (> 30 days)
+
old_timestamp = (datetime.now() - timedelta(days=31)).isoformat()
+
state_data = json.loads(temp_state_file.read_text())
+
state_data['feeds'][feed_url]['posted_guids'].append({
+
'guid': 'https://kite.kagi.com/test/world/old',
+
'post_uri': 'at://test/old',
+
'posted_at': old_timestamp
+
})
+
temp_state_file.write_text(json.dumps(state_data, indent=2))
+
+
# Reload and cleanup
+
manager = StateManager(temp_state_file)
+
manager.cleanup_old_entries(feed_url)
+
+
# Recent GUID should still be there
+
assert manager.is_posted(feed_url, recent_guid)
+
+
# Old GUID should be removed
+
assert not manager.is_posted(feed_url, 'https://kite.kagi.com/test/world/old')
+
+
def test_limit_guids_to_100_per_feed(self, temp_state_file):
+
"""Test that only last 100 GUIDs are kept per feed."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
+
# Add 150 GUIDs
+
for i in range(150):
+
guid = f"https://kite.kagi.com/test/world/{i}"
+
manager.mark_posted(feed_url, guid, f"at://test/{i}")
+
+
# Cleanup (should limit to 100)
+
manager.cleanup_old_entries(feed_url)
+
+
# Reload state
+
manager = StateManager(temp_state_file)
+
+
# Should have exactly 100 entries (most recent)
+
state_data = json.loads(temp_state_file.read_text())
+
assert len(state_data['feeds'][feed_url]['posted_guids']) == 100
+
+
# Oldest entries should be removed
+
assert not manager.is_posted(feed_url, "https://kite.kagi.com/test/world/0")
+
assert not manager.is_posted(feed_url, "https://kite.kagi.com/test/world/49")
+
+
# Recent entries should still be there
+
assert manager.is_posted(feed_url, "https://kite.kagi.com/test/world/149")
+
assert manager.is_posted(feed_url, "https://kite.kagi.com/test/world/100")
+
+
def test_multiple_feeds_tracked_separately(self, temp_state_file):
+
"""Test that multiple feeds are tracked independently."""
+
manager = StateManager(temp_state_file)
+
+
feed1 = "https://news.kagi.com/world.xml"
+
feed2 = "https://news.kagi.com/tech.xml"
+
guid1 = "https://kite.kagi.com/test/world/1"
+
guid2 = "https://kite.kagi.com/test/tech/1"
+
+
manager.mark_posted(feed1, guid1, "at://test/1")
+
manager.mark_posted(feed2, guid2, "at://test/2")
+
+
# Each feed should only know about its own GUIDs
+
assert manager.is_posted(feed1, guid1)
+
assert not manager.is_posted(feed1, guid2)
+
+
assert manager.is_posted(feed2, guid2)
+
assert not manager.is_posted(feed2, guid1)
+
+
def test_get_posted_count(self, temp_state_file):
+
"""Test getting count of posted items per feed."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
+
# Initially 0
+
assert manager.get_posted_count(feed_url) == 0
+
+
# Add 5 items
+
for i in range(5):
+
manager.mark_posted(feed_url, f"guid-{i}", f"post-{i}")
+
+
assert manager.get_posted_count(feed_url) == 5
+
+
def test_state_file_format_is_valid_json(self, temp_state_file):
+
"""Test that state file is always valid JSON."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
+
manager.mark_posted(feed_url, "test-guid", "test-post-uri")
+
manager.update_last_run(feed_url, datetime.now())
+
+
# Should be valid JSON
+
with open(temp_state_file) as f:
+
state = json.load(f)
+
+
assert 'feeds' in state
+
assert feed_url in state['feeds']
+
assert 'posted_guids' in state['feeds'][feed_url]
+
assert 'last_successful_run' in state['feeds'][feed_url]
+
+
def test_automatic_cleanup_on_mark_posted(self, temp_state_file):
+
"""Test that cleanup happens automatically when marking posted."""
+
manager = StateManager(temp_state_file)
+
feed_url = "https://news.kagi.com/world.xml"
+
+
# Add old entry manually
+
old_timestamp = (datetime.now() - timedelta(days=31)).isoformat()
+
state_data = {
+
'feeds': {
+
feed_url: {
+
'posted_guids': [{
+
'guid': 'old-guid',
+
'post_uri': 'old-uri',
+
'posted_at': old_timestamp
+
}],
+
'last_successful_run': None
+
}
+
}
+
}
+
temp_state_file.write_text(json.dumps(state_data, indent=2))
+
+
# Reload and add new entry (should trigger cleanup)
+
manager = StateManager(temp_state_file)
+
manager.mark_posted(feed_url, "new-guid", "new-uri")
+
+
# Old entry should be gone
+
assert not manager.is_posted(feed_url, "old-guid")
+
assert manager.is_posted(feed_url, "new-guid")
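The cleanup tests above describe a rolling window: entries older than 30 days are dropped and at most 100 GUIDs are kept per feed. The following is a sketch of one way to implement that pruning, assuming entries are stored in posting order. `prune_posted_guids` is a hypothetical function, not the `StateManager` implementation.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def prune_posted_guids(
    entries: List[Dict[str, str]],
    now: datetime,
    max_age_days: int = 30,
    max_entries: int = 100,
) -> List[Dict[str, str]]:
    """Drop entries older than max_age_days, then keep the newest max_entries."""
    cutoff = now - timedelta(days=max_age_days)
    fresh = [
        entry for entry in entries
        if datetime.fromisoformat(entry["posted_at"]) >= cutoff
    ]
    # Entries are assumed to be appended in posting order, newest last.
    return fresh[-max_entries:]


now = datetime(2025, 10, 24, 12, 0, 0)
entries = [{
    "guid": "old-guid",
    "post_uri": "at://test/old",
    "posted_at": (now - timedelta(days=31)).isoformat(),
}]
entries += [
    {"guid": f"guid-{i}", "post_uri": f"at://test/{i}", "posted_at": now.isoformat()}
    for i in range(150)
]

kept = prune_posted_guids(entries, now)
kept_guids = [entry["guid"] for entry in kept]
```

Filtering by age first and then truncating keeps the count limit meaningful even when many stale entries are present.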