A community-based topic aggregation platform built on atproto
1"""
2Main Orchestration Script for Kagi News Aggregator.
3
4Coordinates all components to:
51. Fetch RSS feeds
62. Parse HTML content
73. Format as rich text
84. Deduplicate stories
95. Post to Coves communities
106. Track state
11
12Designed to run via CRON (single execution, then exit).
13"""
14import os
15import sys
16import logging
17from pathlib import Path
18from datetime import datetime
19from typing import Optional
20
21from src.config import ConfigLoader
22from src.rss_fetcher import RSSFetcher
23from src.html_parser import KagiHTMLParser
24from src.richtext_formatter import RichTextFormatter
25from src.state_manager import StateManager
26from src.coves_client import CovesClient
27
# Configure process-wide logging at import time: INFO level, each record
# prefixed with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
34
35
class Aggregator:
    """
    Main aggregator orchestration.

    Coordinates all components to fetch, parse, format, and post stories.
    For every enabled feed, each entry that the state manager does not
    report as already posted is parsed, formatted as rich text and posted
    to the feed's community; successful posts are then reported back to
    the state manager.
    """

    def __init__(
        self,
        config_path: Path,
        state_file: Path,
        coves_client: Optional[CovesClient] = None
    ):
        """
        Initialize aggregator.

        Args:
            config_path: Path to config.yaml
            state_file: Path to state.json
            coves_client: Optional CovesClient (for testing). When omitted,
                a client is built from the AGGREGATOR_HANDLE,
                AGGREGATOR_PASSWORD and (optional) PDS_URL environment
                variables.

        Raises:
            ValueError: If no client is supplied and AGGREGATOR_HANDLE or
                AGGREGATOR_PASSWORD is missing or empty.
        """
        # Load configuration
        logger.info("Loading configuration...")
        self.config = ConfigLoader(config_path).load()

        # Initialize components
        logger.info("Initializing components...")
        self.rss_fetcher = RSSFetcher()
        self.html_parser = KagiHTMLParser()
        self.richtext_formatter = RichTextFormatter()
        self.state_manager = StateManager(state_file)
        self.state_file = state_file

        # Initialize Coves client (or use provided one for testing).
        # Compare against None explicitly: an injected client may evaluate
        # as falsy (e.g. a test double defining __len__ or __bool__) and must
        # still be used instead of silently falling back to the environment.
        if coves_client is not None:
            self.coves_client = coves_client
        else:
            # Get credentials from environment
            aggregator_handle = os.getenv('AGGREGATOR_HANDLE')
            aggregator_password = os.getenv('AGGREGATOR_PASSWORD')
            pds_url = os.getenv('PDS_URL')  # Optional: separate PDS for auth

            if not aggregator_handle or not aggregator_password:
                raise ValueError(
                    "Missing AGGREGATOR_HANDLE or AGGREGATOR_PASSWORD environment variables"
                )

            self.coves_client = CovesClient(
                api_url=self.config.coves_api_url,
                handle=aggregator_handle,
                password=aggregator_password,
                pds_url=pds_url  # Auth through PDS if specified
            )

    def run(self) -> None:
        """
        Run aggregator: fetch, parse, post, and update state.

        This is the main entry point for CRON execution. Authenticates once,
        then processes every enabled feed in turn. An exception raised while
        processing one feed is logged and does not prevent the remaining
        feeds from being processed. If authentication fails, the error is
        logged and the method returns without processing any feed.
        """
        logger.info("=" * 60)
        logger.info("Starting Kagi News Aggregator")
        logger.info("=" * 60)

        # Get enabled feeds only
        enabled_feeds = [f for f in self.config.feeds if f.enabled]
        logger.info(f"Processing {len(enabled_feeds)} enabled feeds")

        # Authenticate once at the start
        try:
            self.coves_client.authenticate()
        except Exception as e:
            logger.error(f"Failed to authenticate: {e}")
            logger.error("Cannot continue without authentication")
            # NOTE(review): returning normally here means a caller cannot
            # tell an authentication failure apart from a completed run.
            return

        # Process each feed
        for feed_config in enabled_feeds:
            try:
                self._process_feed(feed_config)
            except Exception as e:
                # Log error but continue with other feeds
                logger.error(f"Error processing feed '{feed_config.name}': {e}", exc_info=True)

        logger.info("=" * 60)
        logger.info("Aggregator run completed")
        logger.info("=" * 60)

    def _process_feed(self, feed_config) -> None:
        """
        Process a single RSS feed.

        Fetches the feed, skips entries the state manager reports as already
        posted, posts the remaining ones, and finally records the run
        timestamp for the feed. A failure on one entry is logged and the
        loop moves on to the next entry.

        Args:
            feed_config: FeedConfig object

        Raises:
            Exception: Whatever the RSS fetcher raises is logged and
                re-raised; errors from recording the run timestamp also
                propagate to the caller.
        """
        logger.info(f"Processing feed: {feed_config.name} -> {feed_config.community_handle}")

        # Fetch RSS feed
        try:
            feed = self.rss_fetcher.fetch_feed(feed_config.url)
        except Exception as e:
            logger.error(f"Failed to fetch feed '{feed_config.name}': {e}")
            raise

        # Check for feed errors (a set bozo flag is only warned about;
        # whatever entries were parsed are still processed)
        if feed.bozo:
            logger.warning(f"Feed '{feed_config.name}' has parsing issues (bozo flag set)")

        # Process entries
        new_posts = 0
        skipped_posts = 0

        for entry in feed.entries:
            try:
                # Check if already posted; fall back to the link when the
                # entry carries no guid
                guid = entry.guid if hasattr(entry, 'guid') else entry.link
                if self.state_manager.is_posted(feed_config.url, guid):
                    skipped_posts += 1
                    logger.debug(f"Skipping already-posted story: {guid}")
                    continue

                # Parse story
                story = self.html_parser.parse_to_story(
                    title=entry.title,
                    link=entry.link,
                    guid=guid,
                    pub_date=entry.published_parsed,
                    categories=[tag.term for tag in getattr(entry, 'tags', [])],
                    html_description=entry.description
                )

                # Format as rich text
                rich_text = self.richtext_formatter.format_full(story)

                # Create external embed; slicing already leaves summaries of
                # 200 characters or fewer untouched
                embed = self.coves_client.create_external_embed(
                    uri=story.link,
                    title=story.title,
                    description=story.summary[:200]
                )

                # Post to community
                # Pass thumbnail URL from RSS feed at top level for trusted aggregator upload
                try:
                    post_uri = self.coves_client.create_post(
                        community_handle=feed_config.community_handle,
                        title=story.title,
                        content=rich_text["content"],
                        facets=rich_text["facets"],
                        embed=embed,
                        thumbnail_url=story.image_url  # From RSS feed - server will validate and upload
                    )
                except Exception as e:
                    # Don't update state if posting failed
                    logger.error(f"Failed to post story '{story.title}': {e}")
                    continue

                # Mark as posted (only if successful). This is deliberately
                # outside the try above: if recording state fails after the
                # post was created, the error must not be reported as a
                # failed post. It is handled by the per-entry handler below,
                # which logs it with a traceback.
                self.state_manager.mark_posted(feed_config.url, guid, post_uri)
                new_posts += 1
                logger.info(f"Posted: {story.title[:50]}... -> {post_uri}")

            except Exception as e:
                # Log error but continue with other entries
                logger.error(f"Error processing entry: {e}", exc_info=True)

        # Update last run timestamp
        # NOTE(review): datetime.now() is naive local time; confirm what
        # StateManager expects before switching to an aware datetime.
        self.state_manager.update_last_run(feed_config.url, datetime.now())

        logger.info(
            f"Feed '{feed_config.name}': {new_posts} new posts, {skipped_posts} duplicates"
        )
212
213
def main():
    """
    Command-line entry point: build an Aggregator and run it once.

    The config and state locations come from the CONFIG_PATH and
    STATE_FILE environment variables, defaulting to 'config.yaml' and
    'data/state.json'. The process always ends through sys.exit():
    status 1 when the config file is missing or when building/running the
    aggregator raises, status 0 once run() has returned.

    Usage:
        python -m src.main
    """
    # Resolve file locations (environment overrides the defaults)
    env = os.environ
    config_path = Path(env.get('CONFIG_PATH', 'config.yaml'))
    state_file = Path(env.get('STATE_FILE', 'data/state.json'))

    # Bail out early when there is no configuration to load
    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}")
        logger.error("Please create config.yaml (see config.example.yaml)")
        sys.exit(1)

    # Build and run; any exception becomes a logged failure and exit code 1
    exit_status = 0
    try:
        Aggregator(config_path=config_path, state_file=state_file).run()
    except Exception as e:
        logger.error(f"Aggregator failed: {e}", exc_info=True)
        exit_status = 1
    sys.exit(exit_status)
242
243
# Run main() only when this file is executed as a script (e.g. by CRON),
# not when it is imported as a module.
if __name__ == '__main__':
    main()