A community-based topic aggregation platform built on atproto

feat(aggregators): implement Kagi News RSS aggregator core

Implements Phase 1 of the Kagi News aggregator system, a reference
implementation for the Coves aggregator architecture.

Core components:
- RSS Fetcher: Fetches feeds with retry logic and error handling
- HTML Parser: Extracts structured data from Kagi's HTML descriptions
  (summary, highlights, perspectives, quotes, sources)
- Rich Text Formatter: Formats content with proper Coves facets
- State Manager: JSON-based deduplication with rolling window
- Config Manager: YAML configuration with environment variable support
- Coves Client: HTTP client for authentication and post creation
- Main Orchestrator: Coordinates all components with error isolation
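
How the pieces fit together per feed entry, condensed from the _process_feed
logic in src/main.py below (argument lists trimmed for brevity):

    feed = rss_fetcher.fetch_feed(feed_config.url)
    for entry in feed.entries:
        if state_manager.is_posted(feed_config.url, entry.guid):
            continue  # dedupe against state.json
        story = html_parser.parse_to_story(...)       # title, link, guid, date, categories, description
        rich = richtext_formatter.format_full(story)  # {"content": str, "facets": [...]}
        embed = coves_client.create_external_embed(story.link, story.title, story.summary[:200])
        post_uri = coves_client.create_post(feed_config.community_handle,
                                            rich["content"], rich["facets"], embed)
        state_manager.mark_posted(feed_config.url, entry.guid, post_uri)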

Key features:
- Verified feed structure: 3 H3 sections (Highlights, Perspectives, Sources)
- Historical context woven into summary/highlights
- UTF-8 byte position calculation for facets
- Feed-level and item-level error isolation
- Structured logging throughout
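
A quick illustration of why facet indices are computed over UTF-8 bytes rather
than character offsets (see RichTextBuilder below):

    text = "Zürich update"
    len(text)                   # 13 characters
    len(text.encode("utf-8"))   # 14 bytes, because "ü" encodes to 2 bytes
    # byteStart/byteEnd in a facet must use the 14-byte positions.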

Implementation uses Python 3.11+ with:
- feedparser for RSS parsing
- beautifulsoup4 for HTML extraction
- requests for HTTP operations
- pyyaml for configuration
- atproto for the PDS session (imported by the Coves client)
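
These can be installed into the venv/ with, for example:

    pip install feedparser beautifulsoup4 requests pyyaml atproto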

+41
aggregators/kagi-news/.gitignore
···
+
# Environment and config
+
.env
+
config.yaml
+
venv/
+
+
# State files
+
data/*.json
+
data/world.xml
+
+
# Python
+
__pycache__/
+
*.py[cod]
+
*$py.class
+
*.so
+
.Python
+
build/
+
develop-eggs/
+
dist/
+
downloads/
+
eggs/
+
.eggs/
+
lib/
+
lib64/
+
parts/
+
sdist/
+
var/
+
wheels/
+
*.egg-info/
+
.installed.cfg
+
*.egg
+
+
# Testing
+
.pytest_cache/
+
.coverage
+
htmlcov/
+
+
# IDE
+
.vscode/
+
.idea/
+
*.swp
+
*.swo
+3
aggregators/kagi-news/src/__init__.py
···
+
"""Kagi News RSS Aggregator for Coves."""
+
+
__version__ = "0.1.0"
+165
aggregators/kagi-news/src/config.py
···
+
"""
+
Configuration Loader for Kagi News Aggregator.
+
+
Loads and validates configuration from YAML files.
+
"""
+
import os
+
import logging
+
from pathlib import Path
+
from typing import Dict, Any
+
import yaml
+
from urllib.parse import urlparse
+
+
from src.models import AggregatorConfig, FeedConfig
+
+
logger = logging.getLogger(__name__)
+
+
+
class ConfigError(Exception):
+
"""Configuration error."""
+
pass
+
+
+
class ConfigLoader:
+
"""
+
Loads and validates aggregator configuration.
+
+
Supports:
+
- Loading from YAML file
+
- Environment variable overrides
+
- Validation of required fields
+
- URL validation
+
"""
+
+
def __init__(self, config_path: Path):
+
"""
+
Initialize config loader.
+
+
Args:
+
config_path: Path to config.yaml file
+
"""
+
self.config_path = Path(config_path)
+
+
def load(self) -> AggregatorConfig:
+
"""
+
Load and validate configuration.
+
+
Returns:
+
AggregatorConfig object
+
+
Raises:
+
ConfigError: If config is invalid or missing
+
"""
+
# Check file exists
+
if not self.config_path.exists():
+
raise ConfigError(f"Configuration file not found: {self.config_path}")
+
+
# Load YAML
+
try:
+
with open(self.config_path, 'r') as f:
+
config_data = yaml.safe_load(f)
+
except yaml.YAMLError as e:
+
raise ConfigError(f"Failed to parse YAML: {e}")
+
+
if not config_data:
+
raise ConfigError("Configuration file is empty")
+
+
# Validate and parse
+
try:
+
return self._parse_config(config_data)
+
except Exception as e:
+
raise ConfigError(f"Invalid configuration: {e}")
+
+
def _parse_config(self, data: Dict[str, Any]) -> AggregatorConfig:
+
"""
+
Parse and validate configuration data.
+
+
Args:
+
data: Parsed YAML data
+
+
Returns:
+
AggregatorConfig object
+
+
Raises:
+
ConfigError: If validation fails
+
"""
+
# Get coves_api_url (with env override)
+
coves_api_url = os.getenv('COVES_API_URL', data.get('coves_api_url'))
+
if not coves_api_url:
+
raise ConfigError("Missing required field: coves_api_url")
+
+
# Validate URL
+
if not self._is_valid_url(coves_api_url):
+
raise ConfigError(f"Invalid URL for coves_api_url: {coves_api_url}")
+
+
# Get log level (default to info)
+
log_level = data.get('log_level', 'info')
+
+
# Parse feeds
+
feeds_data = data.get('feeds', [])
+
if not feeds_data:
+
raise ConfigError("Configuration must include at least one feed")
+
+
feeds = []
+
for feed_data in feeds_data:
+
feed = self._parse_feed(feed_data)
+
feeds.append(feed)
+
+
logger.info(f"Loaded configuration with {len(feeds)} feeds ({sum(1 for f in feeds if f.enabled)} enabled)")
+
+
return AggregatorConfig(
+
coves_api_url=coves_api_url,
+
feeds=feeds,
+
log_level=log_level
+
)
+
+
def _parse_feed(self, data: Dict[str, Any]) -> FeedConfig:
+
"""
+
Parse and validate a single feed configuration.
+
+
Args:
+
data: Feed configuration data
+
+
Returns:
+
FeedConfig object
+
+
Raises:
+
ConfigError: If validation fails
+
"""
+
# Required fields
+
required_fields = ['name', 'url', 'community_handle']
+
for field in required_fields:
+
if field not in data:
+
raise ConfigError(f"Missing required field in feed config: {field}")
+
+
name = data['name']
+
url = data['url']
+
community_handle = data['community_handle']
+
enabled = data.get('enabled', True) # Default to True
+
+
# Validate URL
+
if not self._is_valid_url(url):
+
raise ConfigError(f"Invalid URL for feed '{name}': {url}")
+
+
return FeedConfig(
+
name=name,
+
url=url,
+
community_handle=community_handle,
+
enabled=enabled
+
)
+
+
def _is_valid_url(self, url: str) -> bool:
+
"""
+
Validate URL format.
+
+
Args:
+
url: URL to validate
+
+
Returns:
+
True if valid, False otherwise
+
"""
+
try:
+
result = urlparse(url)
+
return all([result.scheme, result.netloc])
+
except Exception:
+
return False
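
For reference, a config.yaml that satisfies the loader above might look like
the following sketch (the feed URL is a placeholder, not the real Kagi endpoint):

    coves_api_url: "http://localhost:8081"   # can be overridden via COVES_API_URL
    log_level: info
    feeds:
      - name: "World News"
        url: "https://example.com/world/rss.xml"
        community_handle: "world-news.coves.social"
        enabled: true
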
+175
aggregators/kagi-news/src/coves_client.py
···
+
"""
+
Coves API Client for posting to communities.
+
+
Handles authentication and posting via XRPC.
+
"""
+
import logging
+
import requests
+
from typing import Dict, List, Optional
+
from atproto import Client
+
+
logger = logging.getLogger(__name__)
+
+
+
class CovesClient:
+
"""
+
Client for posting to Coves communities via XRPC.
+
+
Handles:
+
- Authentication with aggregator credentials
+
- Creating posts in communities (social.coves.post.create)
+
- External embed formatting
+
"""
+
+
def __init__(self, api_url: str, handle: str, password: str, pds_url: Optional[str] = None):
+
"""
+
Initialize Coves client.
+
+
Args:
+
api_url: Coves AppView URL for posting (e.g., "http://localhost:8081")
+
handle: Aggregator handle (e.g., "kagi-news.coves.social")
+
password: Aggregator password/app password
+
pds_url: Optional PDS URL for authentication (defaults to api_url)
+
"""
+
self.api_url = api_url
+
self.pds_url = pds_url or api_url # Auth through PDS, post through AppView
+
self.handle = handle
+
self.password = password
+
self.client = Client(base_url=self.pds_url) # Use PDS for auth
+
self._authenticated = False
+
+
def authenticate(self):
+
"""
+
Authenticate with Coves API.
+
+
Uses com.atproto.server.createSession directly to avoid
+
Bluesky-specific endpoints that don't exist on Coves PDS.
+
+
Raises:
+
Exception: If authentication fails
+
"""
+
try:
+
logger.info(f"Authenticating as {self.handle}")
+
+
# Use createSession directly (avoid app.bsky.actor.getProfile)
+
session = self.client.com.atproto.server.create_session(
+
{"identifier": self.handle, "password": self.password}
+
)
+
+
# Manually set session (skip profile fetch)
+
self.client._session = session
+
self._authenticated = True
+
self.did = session.did
+
+
logger.info(f"Authentication successful (DID: {self.did})")
+
except Exception as e:
+
logger.error(f"Authentication failed: {e}")
+
raise
+
+
def create_post(
+
self,
+
community_handle: str,
+
content: str,
+
facets: List[Dict],
+
embed: Optional[Dict] = None
+
) -> str:
+
"""
+
Create a post in a community.
+
+
Args:
+
community_handle: Community handle (e.g., "world-news.coves.social")
+
content: Post content (rich text)
+
facets: Rich text facets (formatting, links)
+
embed: Optional external embed
+
+
Returns:
+
AT Proto URI of created post (e.g., "at://did:plc:.../social.coves.post/...")
+
+
Raises:
+
Exception: If post creation fails
+
"""
+
if not self._authenticated:
+
self.authenticate()
+
+
try:
+
# Prepare post data for social.coves.post.create endpoint
+
post_data = {
+
"community": community_handle,
+
"content": content,
+
"facets": facets
+
}
+
+
# Add embed if provided
+
if embed:
+
post_data["embed"] = embed
+
+
# Use Coves-specific endpoint (not direct PDS write)
+
# This provides validation, authorization, and business logic
+
logger.info(f"Creating post in community: {community_handle}")
+
+
# Make direct HTTP request to XRPC endpoint
+
url = f"{self.api_url}/xrpc/social.coves.post.create"
+
headers = {
+
"Authorization": f"Bearer {self.client._session.access_jwt}",
+
"Content-Type": "application/json"
+
}
+
+
response = requests.post(url, json=post_data, headers=headers, timeout=30)
+
+
# Log detailed error if request fails
+
if not response.ok:
+
error_body = response.text
+
logger.error(f"Post creation failed ({response.status_code}): {error_body}")
+
response.raise_for_status()
+
+
result = response.json()
+
post_uri = result["uri"]
+
logger.info(f"Post created: {post_uri}")
+
return post_uri
+
+
except Exception as e:
+
logger.error(f"Failed to create post: {e}")
+
raise
+
+
def create_external_embed(
+
self,
+
uri: str,
+
title: str,
+
description: str,
+
thumb: Optional[str] = None
+
) -> Dict:
+
"""
+
Create external embed object for hot-linked content.
+
+
Args:
+
uri: External URL (story link)
+
title: Story title
+
description: Story description/summary
+
thumb: Optional thumbnail image URL
+
+
Returns:
+
External embed dictionary
+
"""
+
embed = {
+
"$type": "social.coves.embed.external",
+
"external": {
+
"uri": uri,
+
"title": title,
+
"description": description
+
}
+
}
+
+
if thumb:
+
embed["external"]["thumb"] = thumb
+
+
return embed
+
+
def _get_timestamp(self) -> str:
+
"""
+
Get current timestamp in ISO 8601 format.
+
+
Returns:
+
ISO timestamp string
+
"""
+
from datetime import datetime, timezone
+
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
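
A rough usage sketch of the client (URLs and handles mirror the docstring
examples; rich and story would come from the formatter and parser):

    import os
    from src.coves_client import CovesClient

    client = CovesClient(
        api_url="http://localhost:8081",
        handle="kagi-news.coves.social",
        password=os.environ["AGGREGATOR_PASSWORD"],
        pds_url=os.getenv("PDS_URL"),  # optional separate PDS for auth
    )
    client.authenticate()
    embed = client.create_external_embed(uri=story.link, title=story.title,
                                         description=story.summary[:200])
    post_uri = client.create_post(
        community_handle="world-news.coves.social",
        content=rich["content"],
        facets=rich["facets"],
        embed=embed,
    )
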
+300
aggregators/kagi-news/src/html_parser.py
···
+
"""
+
Kagi News HTML description parser.
+
+
Parses the HTML content from RSS feed item descriptions
+
into structured data.
+
"""
+
import re
+
import logging
+
from typing import Dict, List, Optional
+
from datetime import datetime
+
from bs4 import BeautifulSoup
+
from urllib.parse import urlparse
+
+
from src.models import KagiStory, Perspective, Quote, Source
+
+
logger = logging.getLogger(__name__)
+
+
+
class KagiHTMLParser:
+
"""Parses Kagi News HTML descriptions into structured data."""
+
+
def parse(self, html_description: str) -> Dict:
+
"""
+
Parse HTML description into structured data.
+
+
Args:
+
html_description: HTML content from RSS item description
+
+
Returns:
+
Dictionary with extracted data:
+
- summary: str
+
- image_url: Optional[str]
+
- image_alt: Optional[str]
+
- highlights: List[str]
+
- quote: Optional[Dict[str, str]]
+
- perspectives: List[Dict]
+
- sources: List[Dict]
+
"""
+
soup = BeautifulSoup(html_description, 'html.parser')
+
+
return {
+
'summary': self._extract_summary(soup),
+
'image_url': self._extract_image_url(soup),
+
'image_alt': self._extract_image_alt(soup),
+
'highlights': self._extract_highlights(soup),
+
'quote': self._extract_quote(soup),
+
'perspectives': self._extract_perspectives(soup),
+
'sources': self._extract_sources(soup),
+
}
+
+
def parse_to_story(
+
self,
+
title: str,
+
link: str,
+
guid: str,
+
pub_date: datetime,
+
categories: List[str],
+
html_description: str
+
) -> KagiStory:
+
"""
+
Parse HTML and create a KagiStory object.
+
+
Args:
+
title: Story title
+
link: Story URL
+
guid: Unique identifier
+
pub_date: Publication date
+
categories: List of categories
+
html_description: HTML content from description
+
+
Returns:
+
KagiStory object
+
"""
+
parsed = self.parse(html_description)
+
+
# Convert parsed data to model objects
+
perspectives = [
+
Perspective(
+
actor=p['actor'],
+
description=p['description'],
+
source_url=p['source_url']
+
)
+
for p in parsed['perspectives']
+
]
+
+
sources = [
+
Source(
+
title=s['title'],
+
url=s['url'],
+
domain=s['domain']
+
)
+
for s in parsed['sources']
+
]
+
+
quote = None
+
if parsed['quote']:
+
quote = Quote(
+
text=parsed['quote']['text'],
+
attribution=parsed['quote']['attribution']
+
)
+
+
return KagiStory(
+
title=title,
+
link=link,
+
guid=guid,
+
pub_date=pub_date,
+
categories=categories,
+
summary=parsed['summary'],
+
highlights=parsed['highlights'],
+
perspectives=perspectives,
+
quote=quote,
+
sources=sources,
+
image_url=parsed['image_url'],
+
image_alt=parsed['image_alt']
+
)
+
+
def _extract_summary(self, soup: BeautifulSoup) -> str:
+
"""Extract summary from first <p> tag."""
+
p_tag = soup.find('p')
+
if p_tag:
+
return p_tag.get_text(strip=True)
+
return ""
+
+
def _extract_image_url(self, soup: BeautifulSoup) -> Optional[str]:
+
"""Extract image URL from <img> tag."""
+
img_tag = soup.find('img')
+
if img_tag and img_tag.get('src'):
+
return img_tag['src']
+
return None
+
+
def _extract_image_alt(self, soup: BeautifulSoup) -> Optional[str]:
+
"""Extract image alt text from <img> tag."""
+
img_tag = soup.find('img')
+
if img_tag and img_tag.get('alt'):
+
return img_tag['alt']
+
return None
+
+
def _extract_highlights(self, soup: BeautifulSoup) -> List[str]:
+
"""Extract highlights list from H3 section."""
+
highlights = []
+
+
# Find "Highlights:" h3 tag
+
h3_tags = soup.find_all('h3')
+
for h3 in h3_tags:
+
if 'Highlights' in h3.get_text():
+
# Get the <ul> that follows this h3
+
ul = h3.find_next_sibling('ul')
+
if ul:
+
for li in ul.find_all('li'):
+
highlights.append(li.get_text(strip=True))
+
break
+
+
return highlights
+
+
def _extract_quote(self, soup: BeautifulSoup) -> Optional[Dict[str, str]]:
+
"""Extract quote from <blockquote> tag."""
+
blockquote = soup.find('blockquote')
+
if not blockquote:
+
return None
+
+
text = blockquote.get_text(strip=True)
+
+
# Try to split on " - " to separate quote from attribution
+
if ' - ' in text:
+
quote_text, attribution = text.rsplit(' - ', 1)
+
return {
+
'text': quote_text.strip(),
+
'attribution': attribution.strip()
+
}
+
+
# If no attribution found, entire text is the quote
+
# Try to infer attribution from context (often mentioned in highlights/perspectives)
+
return {
+
'text': text,
+
'attribution': self._infer_quote_attribution(soup, text)
+
}
+
+
def _infer_quote_attribution(self, soup: BeautifulSoup, quote_text: str) -> str:
+
"""
+
Try to infer quote attribution from context.
+
+
This is a fallback when quote doesn't have explicit attribution.
+
"""
+
# For now, check if any perspective mentions similar keywords
+
perspectives_section = soup.find('h3', string=re.compile(r'Perspectives'))
+
if perspectives_section:
+
ul = perspectives_section.find_next_sibling('ul')
+
if ul:
+
for li in ul.find_all('li'):
+
li_text = li.get_text()
+
# Extract actor name (before first colon)
+
if ':' in li_text:
+
actor = li_text.split(':', 1)[0].strip()
+
return actor
+
+
return "Unknown"
+
+
def _extract_perspectives(self, soup: BeautifulSoup) -> List[Dict]:
+
"""Extract perspectives from H3 section."""
+
perspectives = []
+
+
# Find "Perspectives:" h3 tag
+
h3_tags = soup.find_all('h3')
+
for h3 in h3_tags:
+
if 'Perspectives' in h3.get_text():
+
# Get the <ul> that follows this h3
+
ul = h3.find_next_sibling('ul')
+
if ul:
+
for li in ul.find_all('li'):
+
perspective = self._parse_perspective_li(li)
+
if perspective:
+
perspectives.append(perspective)
+
break
+
+
return perspectives
+
+
def _parse_perspective_li(self, li) -> Optional[Dict]:
+
"""
+
Parse a single perspective <li> element.
+
+
Format: "Actor: Description. (Source)"
+
"""
+
# Get full text
+
full_text = li.get_text()
+
+
# Extract actor (before first colon)
+
if ':' not in full_text:
+
return None
+
+
actor, rest = full_text.split(':', 1)
+
actor = actor.strip()
+
+
# Find the <a> tag for source URL
+
a_tag = li.find('a')
+
source_url = a_tag['href'] if a_tag and a_tag.get('href') else ""
+
+
# Extract description (between colon and source link)
+
# Remove the source citation part in parentheses
+
description = rest
+
+
# Remove source citation like "(The Straits Times)" from description
+
if a_tag:
+
# Remove the link text and surrounding parentheses
+
link_text = a_tag.get_text()
+
description = description.replace(f"({link_text})", "").strip()
+
+
# Clean up trailing period
+
description = description.strip('. ')
+
+
return {
+
'actor': actor,
+
'description': description,
+
'source_url': source_url
+
}
+
+
def _extract_sources(self, soup: BeautifulSoup) -> List[Dict]:
+
"""Extract sources list from H3 section."""
+
sources = []
+
+
# Find "Sources:" h3 tag
+
h3_tags = soup.find_all('h3')
+
for h3 in h3_tags:
+
if 'Sources' in h3.get_text():
+
# Get the <ul> that follows this h3
+
ul = h3.find_next_sibling('ul')
+
if ul:
+
for li in ul.find_all('li'):
+
source = self._parse_source_li(li)
+
if source:
+
sources.append(source)
+
break
+
+
return sources
+
+
def _parse_source_li(self, li) -> Optional[Dict]:
+
"""
+
Parse a single source <li> element.
+
+
Format: "<a href='...'>Title</a> - domain.com"
+
"""
+
a_tag = li.find('a')
+
if not a_tag or not a_tag.get('href'):
+
return None
+
+
title = a_tag.get_text(strip=True)
+
url = a_tag['href']
+
+
# Extract domain from URL
+
parsed_url = urlparse(url)
+
domain = parsed_url.netloc
+
+
# Remove "www." prefix if present
+
if domain.startswith('www.'):
+
domain = domain[4:]
+
+
return {
+
'title': title,
+
'url': url,
+
'domain': domain
+
}
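
The parser assumes a description shaped roughly like this hand-written sketch
(not an actual Kagi payload):

    <p>Summary paragraph with historical context woven in.</p>
    <img src="https://example.com/image.jpg" alt="Protest outside parliament" />
    <h3>Highlights:</h3>
    <ul><li>First key development.</li><li>Second key development.</li></ul>
    <blockquote>A notable line from the story - Speaker Name</blockquote>
    <h3>Perspectives:</h3>
    <ul><li>Government: Defends the decision as lawful. (<a href="https://example.com/a">Example Times</a>)</li></ul>
    <h3>Sources:</h3>
    <ul><li><a href="https://example.com/article">Full article title</a> - example.com</li></ul>
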
+243
aggregators/kagi-news/src/main.py
···
+
"""
+
Main Orchestration Script for Kagi News Aggregator.
+
+
Coordinates all components to:
+
1. Fetch RSS feeds
+
2. Parse HTML content
+
3. Format as rich text
+
4. Deduplicate stories
+
5. Post to Coves communities
+
6. Track state
+
+
Designed to run via CRON (single execution, then exit).
+
"""
+
import os
+
import sys
+
import logging
+
from pathlib import Path
+
from datetime import datetime
+
from typing import Optional
+
+
from src.config import ConfigLoader
+
from src.rss_fetcher import RSSFetcher
+
from src.html_parser import KagiHTMLParser
+
from src.richtext_formatter import RichTextFormatter
+
from src.state_manager import StateManager
+
from src.coves_client import CovesClient
+
+
# Setup logging
+
logging.basicConfig(
+
level=logging.INFO,
+
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+
)
+
logger = logging.getLogger(__name__)
+
+
+
class Aggregator:
+
"""
+
Main aggregator orchestration.
+
+
Coordinates all components to fetch, parse, format, and post stories.
+
"""
+
+
def __init__(
+
self,
+
config_path: Path,
+
state_file: Path,
+
coves_client: Optional[CovesClient] = None
+
):
+
"""
+
Initialize aggregator.
+
+
Args:
+
config_path: Path to config.yaml
+
state_file: Path to state.json
+
coves_client: Optional CovesClient (for testing)
+
"""
+
# Load configuration
+
logger.info("Loading configuration...")
+
config_loader = ConfigLoader(config_path)
+
self.config = config_loader.load()
+
+
# Initialize components
+
logger.info("Initializing components...")
+
self.rss_fetcher = RSSFetcher()
+
self.html_parser = KagiHTMLParser()
+
self.richtext_formatter = RichTextFormatter()
+
self.state_manager = StateManager(state_file)
+
self.state_file = state_file
+
+
# Initialize Coves client (or use provided one for testing)
+
if coves_client:
+
self.coves_client = coves_client
+
else:
+
# Get credentials from environment
+
aggregator_handle = os.getenv('AGGREGATOR_HANDLE')
+
aggregator_password = os.getenv('AGGREGATOR_PASSWORD')
+
pds_url = os.getenv('PDS_URL') # Optional: separate PDS for auth
+
+
if not aggregator_handle or not aggregator_password:
+
raise ValueError(
+
"Missing AGGREGATOR_HANDLE or AGGREGATOR_PASSWORD environment variables"
+
)
+
+
self.coves_client = CovesClient(
+
api_url=self.config.coves_api_url,
+
handle=aggregator_handle,
+
password=aggregator_password,
+
pds_url=pds_url # Auth through PDS if specified
+
)
+
+
def run(self):
+
"""
+
Run aggregator: fetch, parse, post, and update state.
+
+
This is the main entry point for CRON execution.
+
"""
+
logger.info("=" * 60)
+
logger.info("Starting Kagi News Aggregator")
+
logger.info("=" * 60)
+
+
# Get enabled feeds only
+
enabled_feeds = [f for f in self.config.feeds if f.enabled]
+
logger.info(f"Processing {len(enabled_feeds)} enabled feeds")
+
+
# Authenticate once at the start
+
try:
+
self.coves_client.authenticate()
+
except Exception as e:
+
logger.error(f"Failed to authenticate: {e}")
+
logger.error("Cannot continue without authentication")
+
return
+
+
# Process each feed
+
for feed_config in enabled_feeds:
+
try:
+
self._process_feed(feed_config)
+
except Exception as e:
+
# Log error but continue with other feeds
+
logger.error(f"Error processing feed '{feed_config.name}': {e}", exc_info=True)
+
continue
+
+
logger.info("=" * 60)
+
logger.info("Aggregator run completed")
+
logger.info("=" * 60)
+
+
def _process_feed(self, feed_config):
+
"""
+
Process a single RSS feed.
+
+
Args:
+
feed_config: FeedConfig object
+
"""
+
logger.info(f"Processing feed: {feed_config.name} -> {feed_config.community_handle}")
+
+
# Fetch RSS feed
+
try:
+
feed = self.rss_fetcher.fetch_feed(feed_config.url)
+
except Exception as e:
+
logger.error(f"Failed to fetch feed '{feed_config.name}': {e}")
+
raise
+
+
# Check for feed errors
+
if feed.bozo:
+
logger.warning(f"Feed '{feed_config.name}' has parsing issues (bozo flag set)")
+
+
# Process entries
+
new_posts = 0
+
skipped_posts = 0
+
+
for entry in feed.entries:
+
try:
+
# Check if already posted
+
guid = entry.guid if hasattr(entry, 'guid') else entry.link
+
if self.state_manager.is_posted(feed_config.url, guid):
+
skipped_posts += 1
+
logger.debug(f"Skipping already-posted story: {guid}")
+
continue
+
+
# Parse story
+
story = self.html_parser.parse_to_story(
+
title=entry.title,
+
link=entry.link,
+
guid=guid,
+
pub_date=datetime(*entry.published_parsed[:6]) if getattr(entry, 'published_parsed', None) else datetime.now(),  # feedparser gives struct_time; model expects datetime
+
categories=[tag.term for tag in entry.tags] if hasattr(entry, 'tags') else [],
+
html_description=entry.description
+
)
+
+
# Format as rich text
+
rich_text = self.richtext_formatter.format_full(story)
+
+
# Create external embed
+
embed = self.coves_client.create_external_embed(
+
uri=story.link,
+
title=story.title,
+
description=story.summary[:200] if len(story.summary) > 200 else story.summary,
+
thumb=story.image_url
+
)
+
+
# Post to community
+
try:
+
post_uri = self.coves_client.create_post(
+
community_handle=feed_config.community_handle,
+
content=rich_text["content"],
+
facets=rich_text["facets"],
+
embed=embed
+
)
+
+
# Mark as posted (only if successful)
+
self.state_manager.mark_posted(feed_config.url, guid, post_uri)
+
new_posts += 1
+
logger.info(f"Posted: {story.title[:50]}... -> {post_uri}")
+
+
except Exception as e:
+
# Don't update state if posting failed
+
logger.error(f"Failed to post story '{story.title}': {e}")
+
continue
+
+
except Exception as e:
+
# Log error but continue with other entries
+
logger.error(f"Error processing entry: {e}", exc_info=True)
+
continue
+
+
# Update last run timestamp
+
self.state_manager.update_last_run(feed_config.url, datetime.now())
+
+
logger.info(
+
f"Feed '{feed_config.name}': {new_posts} new posts, {skipped_posts} duplicates"
+
)
+
+
+
def main():
+
"""
+
Main entry point for command-line execution.
+
+
Usage:
+
python -m src.main
+
"""
+
# Get paths from environment or use defaults
+
config_path = Path(os.getenv('CONFIG_PATH', 'config.yaml'))
+
state_file = Path(os.getenv('STATE_FILE', 'data/state.json'))
+
+
# Validate config file exists
+
if not config_path.exists():
+
logger.error(f"Configuration file not found: {config_path}")
+
logger.error("Please create config.yaml (see config.example.yaml)")
+
sys.exit(1)
+
+
# Create aggregator and run
+
try:
+
aggregator = Aggregator(
+
config_path=config_path,
+
state_file=state_file
+
)
+
aggregator.run()
+
sys.exit(0)
+
except Exception as e:
+
logger.error(f"Aggregator failed: {e}", exc_info=True)
+
sys.exit(1)
+
+
+
if __name__ == '__main__':
+
main()
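
Deployment is then a single cron entry that runs the module once per interval;
a sketch, with the schedule, install path, and handle assumed, and run.sh
standing in for a hypothetical wrapper holding the lines below:

    # crontab: 0 * * * * cd /opt/coves/aggregators/kagi-news && ./run.sh
    export AGGREGATOR_HANDLE="kagi-news.coves.social"
    export AGGREGATOR_PASSWORD="..."              # aggregator app password
    export COVES_API_URL="http://localhost:8081"  # optional override of config.yaml
    venv/bin/python -m src.main                   # CONFIG_PATH and STATE_FILE default to config.yaml and data/state.json
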
+79
aggregators/kagi-news/src/models.py
···
+
"""
+
Data models for Kagi News RSS aggregator.
+
"""
+
from dataclasses import dataclass, field
+
from datetime import datetime
+
from typing import List, Optional
+
+
+
@dataclass
+
class Source:
+
"""A news source citation."""
+
title: str
+
url: str
+
domain: str
+
+
+
@dataclass
+
class Perspective:
+
"""A perspective from a particular actor/stakeholder."""
+
actor: str
+
description: str
+
source_url: str
+
+
+
@dataclass
+
class Quote:
+
"""A notable quote from the story."""
+
text: str
+
attribution: str
+
+
+
@dataclass
+
class KagiStory:
+
"""
+
Structured representation of a Kagi News story.
+
+
Parsed from RSS feed item with HTML description.
+
"""
+
# RSS metadata
+
title: str
+
link: str # Kagi story permalink
+
guid: str
+
pub_date: datetime
+
categories: List[str] = field(default_factory=list)
+
+
# Parsed from HTML description
+
summary: str = ""
+
highlights: List[str] = field(default_factory=list)
+
perspectives: List[Perspective] = field(default_factory=list)
+
quote: Optional[Quote] = None
+
sources: List[Source] = field(default_factory=list)
+
image_url: Optional[str] = None
+
image_alt: Optional[str] = None
+
+
def __post_init__(self):
+
"""Validate required fields."""
+
if not self.title:
+
raise ValueError("title is required")
+
if not self.link:
+
raise ValueError("link is required")
+
if not self.guid:
+
raise ValueError("guid is required")
+
+
+
@dataclass
+
class FeedConfig:
+
"""Configuration for a single RSS feed."""
+
name: str
+
url: str
+
community_handle: str
+
enabled: bool = True
+
+
+
@dataclass
+
class AggregatorConfig:
+
"""Full aggregator configuration."""
+
coves_api_url: str
+
feeds: List[FeedConfig]
+
log_level: str = "info"
+177
aggregators/kagi-news/src/richtext_formatter.py
···
+
"""
+
Rich Text Formatter for Coves posts.
+
+
Converts KagiStory objects to Coves rich text format with facets.
+
Handles UTF-8 byte position calculation for multi-byte characters.
+
"""
+
import logging
+
from typing import Dict, List, Tuple
+
from src.models import KagiStory, Perspective, Source
+
+
logger = logging.getLogger(__name__)
+
+
+
class RichTextFormatter:
+
"""
+
Formats KagiStory into Coves rich text with facets.
+
+
Applies:
+
- Bold facets for section headers and perspective actors
+
- Italic facets for quotes
+
- Link facets for all URLs
+
"""
+
+
def format_full(self, story: KagiStory) -> Dict:
+
"""
+
Format KagiStory into full rich text format.
+
+
Args:
+
story: KagiStory object to format
+
+
Returns:
+
Dictionary with 'content' (str) and 'facets' (list)
+
"""
+
builder = RichTextBuilder()
+
+
# Summary
+
builder.add_text(story.summary)
+
builder.add_text("\n\n")
+
+
# Highlights (if present)
+
if story.highlights:
+
builder.add_bold("Highlights:")
+
builder.add_text("\n")
+
for highlight in story.highlights:
+
builder.add_text(f"• {highlight}\n")
+
builder.add_text("\n")
+
+
# Perspectives (if present)
+
if story.perspectives:
+
builder.add_bold("Perspectives:")
+
builder.add_text("\n")
+
for perspective in story.perspectives:
+
# Bold the actor name
+
actor_with_colon = f"{perspective.actor}:"
+
builder.add_bold(actor_with_colon)
+
builder.add_text(f" {perspective.description} (")
+
+
# Add link to source
+
source_link_text = "Source"
+
builder.add_link(source_link_text, perspective.source_url)
+
builder.add_text(")\n")
+
builder.add_text("\n")
+
+
# Quote (if present)
+
if story.quote:
+
quote_text = f'"{story.quote.text}"'
+
builder.add_italic(quote_text)
+
builder.add_text(f" — {story.quote.attribution}\n\n")
+
+
# Sources (if present)
+
if story.sources:
+
builder.add_bold("Sources:")
+
builder.add_text("\n")
+
for source in story.sources:
+
builder.add_text("• ")
+
builder.add_link(source.title, source.url)
+
builder.add_text(f" - {source.domain}\n")
+
builder.add_text("\n")
+
+
# Kagi News attribution
+
builder.add_text("---\n📰 Story aggregated by ")
+
builder.add_link("Kagi News", story.link)
+
+
return builder.build()
+
+
+
class RichTextBuilder:
+
"""
+
Helper class to build rich text content with facets.
+
+
Handles UTF-8 byte position tracking automatically.
+
"""
+
+
def __init__(self):
+
self.content_parts = []
+
self.facets = []
+
+
def add_text(self, text: str):
+
"""Add plain text without any facets."""
+
self.content_parts.append(text)
+
+
def add_bold(self, text: str):
+
"""Add text with bold facet."""
+
start_byte = self._get_current_byte_position()
+
self.content_parts.append(text)
+
end_byte = self._get_current_byte_position()
+
+
self.facets.append({
+
"index": {
+
"byteStart": start_byte,
+
"byteEnd": end_byte
+
},
+
"features": [
+
{"$type": "social.coves.richtext.facet#bold"}
+
]
+
})
+
+
def add_italic(self, text: str):
+
"""Add text with italic facet."""
+
start_byte = self._get_current_byte_position()
+
self.content_parts.append(text)
+
end_byte = self._get_current_byte_position()
+
+
self.facets.append({
+
"index": {
+
"byteStart": start_byte,
+
"byteEnd": end_byte
+
},
+
"features": [
+
{"$type": "social.coves.richtext.facet#italic"}
+
]
+
})
+
+
def add_link(self, text: str, uri: str):
+
"""Add text with link facet."""
+
start_byte = self._get_current_byte_position()
+
self.content_parts.append(text)
+
end_byte = self._get_current_byte_position()
+
+
self.facets.append({
+
"index": {
+
"byteStart": start_byte,
+
"byteEnd": end_byte
+
},
+
"features": [
+
{
+
"$type": "social.coves.richtext.facet#link",
+
"uri": uri
+
}
+
]
+
})
+
+
def _get_current_byte_position(self) -> int:
+
"""
+
Get the current byte position in the content.
+
+
Uses UTF-8 encoding to handle multi-byte characters correctly.
+
"""
+
current_content = ''.join(self.content_parts)
+
return len(current_content.encode('utf-8'))
+
+
def build(self) -> Dict:
+
"""
+
Build the final rich text object.
+
+
Returns:
+
Dictionary with 'content' and 'facets'
+
"""
+
content = ''.join(self.content_parts)
+
+
# Sort facets by start position for consistency
+
sorted_facets = sorted(self.facets, key=lambda f: f['index']['byteStart'])
+
+
return {
+
"content": content,
+
"facets": sorted_facets
+
}
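
A small worked example of what the builder emits (byte offsets checked by hand):

    builder = RichTextBuilder()
    builder.add_bold("Sources:")        # "Sources:" is 8 bytes -> facet over 0..8
    builder.add_text("\n• ")            # "\n" + "•" (3 bytes) + " " = 5 bytes
    builder.add_link("Example article", "https://example.com/article")
    result = builder.build()
    # result["content"] == "Sources:\n• Example article"
    # facets: bold over bytes 0-8, link over bytes 13-28
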
+71
aggregators/kagi-news/src/rss_fetcher.py
···
+
"""
+
RSS feed fetcher with retry logic and error handling.
+
"""
+
import time
+
import logging
+
import requests
+
import feedparser
+
from typing import Optional
+
+
logger = logging.getLogger(__name__)
+
+
+
class RSSFetcher:
+
"""Fetches RSS feeds with retry logic."""
+
+
def __init__(self, timeout: int = 30, max_retries: int = 3):
+
"""
+
Initialize RSS fetcher.
+
+
Args:
+
timeout: Request timeout in seconds
+
max_retries: Maximum number of retry attempts
+
"""
+
self.timeout = timeout
+
self.max_retries = max_retries
+
+
def fetch_feed(self, url: str) -> feedparser.FeedParserDict:
+
"""
+
Fetch and parse an RSS feed.
+
+
Args:
+
url: RSS feed URL
+
+
Returns:
+
Parsed feed object
+
+
Raises:
+
ValueError: If URL is empty
+
requests.RequestException: If all retry attempts fail
+
"""
+
if not url:
+
raise ValueError("URL cannot be empty")
+
+
last_error = None
+
+
for attempt in range(self.max_retries):
+
try:
+
logger.info(f"Fetching feed from {url} (attempt {attempt + 1}/{self.max_retries})")
+
+
response = requests.get(url, timeout=self.timeout)
+
response.raise_for_status()
+
+
# Parse with feedparser
+
feed = feedparser.parse(response.content)
+
+
logger.info(f"Successfully fetched feed: {feed.feed.get('title', 'Unknown')}")
+
return feed
+
+
except requests.RequestException as e:
+
last_error = e
+
logger.warning(f"Fetch attempt {attempt + 1} failed: {e}")
+
+
if attempt < self.max_retries - 1:
+
# Exponential backoff
+
sleep_time = 2 ** attempt
+
logger.info(f"Retrying in {sleep_time} seconds...")
+
time.sleep(sleep_time)
+
+
# All retries exhausted
+
logger.error(f"Failed to fetch feed after {self.max_retries} attempts")
+
raise last_error
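
Usage is a single call; with the defaults the retry sleeps are 2**0 = 1 s and
2**1 = 2 s before the final attempt, after which the last error is re-raised:

    fetcher = RSSFetcher(timeout=30, max_retries=3)
    feed = fetcher.fetch_feed("https://example.com/world/rss.xml")  # placeholder URL
    for entry in feed.entries:
        print(entry.title, entry.link)
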
+213
aggregators/kagi-news/src/state_manager.py
···
+
"""
+
State Manager for tracking posted stories.
+
+
Handles deduplication by tracking which stories have already been posted.
+
Uses JSON file for persistence.
+
"""
+
import json
+
import logging
+
from pathlib import Path
+
from datetime import datetime, timedelta
+
from typing import Optional, Dict, List
+
+
logger = logging.getLogger(__name__)
+
+
+
class StateManager:
+
"""
+
Manages aggregator state for deduplication.
+
+
Tracks:
+
- Posted GUIDs per feed (with timestamps)
+
- Last successful run timestamp per feed
+
- Automatic cleanup of old entries
+
"""
+
+
def __init__(self, state_file: Path, max_guids_per_feed: int = 100, max_age_days: int = 30):
+
"""
+
Initialize state manager.
+
+
Args:
+
state_file: Path to JSON state file
+
max_guids_per_feed: Maximum GUIDs to keep per feed (default: 100)
+
max_age_days: Maximum age in days for GUIDs (default: 30)
+
"""
+
self.state_file = Path(state_file)
+
self.max_guids_per_feed = max_guids_per_feed
+
self.max_age_days = max_age_days
+
self.state = self._load_state()
+
+
def _load_state(self) -> Dict:
+
"""Load state from file, or create new state if file doesn't exist."""
+
if not self.state_file.exists():
+
logger.info(f"Creating new state file at {self.state_file}")
+
state = {'feeds': {}}
+
self._save_state(state)
+
return state
+
+
try:
+
with open(self.state_file, 'r') as f:
+
state = json.load(f)
+
logger.info(f"Loaded state from {self.state_file}")
+
return state
+
except json.JSONDecodeError as e:
+
logger.error(f"Failed to load state file: {e}. Creating new state.")
+
state = {'feeds': {}}
+
self._save_state(state)
+
return state
+
+
def _save_state(self, state: Optional[Dict] = None):
+
"""Save state to file."""
+
if state is None:
+
state = self.state
+
+
# Ensure parent directory exists
+
self.state_file.parent.mkdir(parents=True, exist_ok=True)
+
+
with open(self.state_file, 'w') as f:
+
json.dump(state, f, indent=2)
+
+
def _ensure_feed_exists(self, feed_url: str):
+
"""Ensure feed entry exists in state."""
+
if feed_url not in self.state['feeds']:
+
self.state['feeds'][feed_url] = {
+
'posted_guids': [],
+
'last_successful_run': None
+
}
+
+
def is_posted(self, feed_url: str, guid: str) -> bool:
+
"""
+
Check if a story has already been posted.
+
+
Args:
+
feed_url: RSS feed URL
+
guid: Story GUID
+
+
Returns:
+
True if already posted, False otherwise
+
"""
+
self._ensure_feed_exists(feed_url)
+
+
posted_guids = self.state['feeds'][feed_url]['posted_guids']
+
return any(entry['guid'] == guid for entry in posted_guids)
+
+
def mark_posted(self, feed_url: str, guid: str, post_uri: str):
+
"""
+
Mark a story as posted.
+
+
Args:
+
feed_url: RSS feed URL
+
guid: Story GUID
+
post_uri: AT Proto URI of created post
+
"""
+
self._ensure_feed_exists(feed_url)
+
+
# Add to posted list
+
entry = {
+
'guid': guid,
+
'post_uri': post_uri,
+
'posted_at': datetime.now().isoformat()
+
}
+
self.state['feeds'][feed_url]['posted_guids'].append(entry)
+
+
# Auto-cleanup to keep state file manageable
+
self.cleanup_old_entries(feed_url)
+
+
# Save state
+
self._save_state()
+
+
logger.info(f"Marked as posted: {guid} -> {post_uri}")
+
+
def get_last_run(self, feed_url: str) -> Optional[datetime]:
+
"""
+
Get last successful run timestamp for a feed.
+
+
Args:
+
feed_url: RSS feed URL
+
+
Returns:
+
Datetime of last run, or None if never run
+
"""
+
self._ensure_feed_exists(feed_url)
+
+
timestamp_str = self.state['feeds'][feed_url]['last_successful_run']
+
if timestamp_str is None:
+
return None
+
+
return datetime.fromisoformat(timestamp_str)
+
+
def update_last_run(self, feed_url: str, timestamp: datetime):
+
"""
+
Update last successful run timestamp.
+
+
Args:
+
feed_url: RSS feed URL
+
timestamp: Timestamp of successful run
+
"""
+
self._ensure_feed_exists(feed_url)
+
+
self.state['feeds'][feed_url]['last_successful_run'] = timestamp.isoformat()
+
self._save_state()
+
+
logger.info(f"Updated last run for {feed_url}: {timestamp}")
+
+
def cleanup_old_entries(self, feed_url: str):
+
"""
+
Remove old entries from state.
+
+
Removes entries that are:
+
- Older than max_age_days
+
- Beyond max_guids_per_feed limit (keeps most recent)
+
+
Args:
+
feed_url: RSS feed URL
+
"""
+
self._ensure_feed_exists(feed_url)
+
+
posted_guids = self.state['feeds'][feed_url]['posted_guids']
+
+
# Filter out entries older than max_age_days
+
cutoff_date = datetime.now() - timedelta(days=self.max_age_days)
+
filtered = [
+
entry for entry in posted_guids
+
if datetime.fromisoformat(entry['posted_at']) > cutoff_date
+
]
+
+
# Keep only most recent max_guids_per_feed entries
+
# Sort by posted_at (most recent first)
+
filtered.sort(key=lambda x: x['posted_at'], reverse=True)
+
filtered = filtered[:self.max_guids_per_feed]
+
+
# Update state
+
old_count = len(posted_guids)
+
new_count = len(filtered)
+
self.state['feeds'][feed_url]['posted_guids'] = filtered
+
+
if old_count != new_count:
+
logger.info(f"Cleaned up {old_count - new_count} old entries for {feed_url}")
+
+
def get_posted_count(self, feed_url: str) -> int:
+
"""
+
Get count of posted items for a feed.
+
+
Args:
+
feed_url: RSS feed URL
+
+
Returns:
+
Number of posted items
+
"""
+
self._ensure_feed_exists(feed_url)
+
return len(self.state['feeds'][feed_url]['posted_guids'])
+
+
def get_all_posted_guids(self, feed_url: str) -> List[str]:
+
"""
+
Get all posted GUIDs for a feed.
+
+
Args:
+
feed_url: RSS feed URL
+
+
Returns:
+
List of GUIDs
+
"""
+
self._ensure_feed_exists(feed_url)
+
return [entry['guid'] for entry in self.state['feeds'][feed_url]['posted_guids']]
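
The resulting state file ends up shaped like this (identifiers illustrative):

    {
      "feeds": {
        "https://example.com/world/rss.xml": {
          "posted_guids": [
            {
              "guid": "https://example.com/story-guid",
              "post_uri": "at://did:plc:exampledid/social.coves.post/examplekey",
              "posted_at": "2025-01-01T12:00:00"
            }
          ],
          "last_successful_run": "2025-01-01T12:30:00"
        }
      }
    }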