A community-based topic aggregation platform built on atproto.
1"""
2Main Orchestration Script for Kagi News Aggregator.
3
4Coordinates all components to:
51. Fetch RSS feeds
62. Parse HTML content
73. Format as rich text
84. Deduplicate stories
95. Post to Coves communities
106. Track state
11
12Designed to run via CRON (single execution, then exit).
13"""
14import os
15import sys
16import logging
17from pathlib import Path
18from datetime import datetime
19from typing import Optional
20
21from src.config import ConfigLoader
22from src.rss_fetcher import RSSFetcher
23from src.html_parser import KagiHTMLParser
24from src.richtext_formatter import RichTextFormatter
25from src.state_manager import StateManager
26from src.coves_client import CovesClient
27
# Setup logging: configure the root logger once at import time (basicConfig
# is a no-op if the root logger already has handlers) with timestamped,
# logger-named records at INFO and above.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger(__name__)
34
35
class Aggregator:
    """
    Main aggregator orchestration.

    Coordinates all components to fetch, parse, format, and post stories:
    RSS fetching, HTML parsing into stories, rich-text formatting,
    deduplication through the state manager, and posting via the Coves client.
    """

    def __init__(
        self,
        config_path: Path,
        state_file: Path,
        coves_client: Optional[CovesClient] = None
    ):
        """
        Initialize aggregator.

        Args:
            config_path: Path to config.yaml
            state_file: Path to state.json
            coves_client: Optional CovesClient (for testing). When omitted, a
                client is built from the AGGREGATOR_HANDLE,
                AGGREGATOR_PASSWORD and (optional) PDS_URL environment
                variables.

        Raises:
            ValueError: If no client is supplied and AGGREGATOR_HANDLE or
                AGGREGATOR_PASSWORD is unset or empty.
        """
        # Load configuration
        logger.info("Loading configuration...")
        config_loader = ConfigLoader(config_path)
        self.config = config_loader.load()

        # Initialize components
        logger.info("Initializing components...")
        self.rss_fetcher = RSSFetcher()
        self.html_parser = KagiHTMLParser()
        self.richtext_formatter = RichTextFormatter()
        self.state_manager = StateManager(state_file)
        self.state_file = state_file

        # Use an injected client when one is given. Compare against None
        # explicitly so an injected object is honoured even if it is falsy.
        if coves_client is not None:
            self.coves_client = coves_client
        else:
            # Get credentials from environment
            aggregator_handle = os.getenv('AGGREGATOR_HANDLE')
            aggregator_password = os.getenv('AGGREGATOR_PASSWORD')
            pds_url = os.getenv('PDS_URL')  # Optional: separate PDS for auth

            if not aggregator_handle or not aggregator_password:
                raise ValueError(
                    "Missing AGGREGATOR_HANDLE or AGGREGATOR_PASSWORD environment variables"
                )

            self.coves_client = CovesClient(
                api_url=self.config.coves_api_url,
                handle=aggregator_handle,
                password=aggregator_password,
                pds_url=pds_url  # Auth through PDS if specified
            )

    def run(self) -> bool:
        """
        Run aggregator: fetch, parse, post, and update state.

        This is the main entry point for CRON execution. Errors in an
        individual feed are logged and the remaining feeds are still
        processed.

        Returns:
            True if every enabled feed was attempted; False if the run was
            aborted because authentication failed. The return value lets the
            caller turn an aborted run into a non-zero exit status.
        """
        logger.info("=" * 60)
        logger.info("Starting Kagi News Aggregator")
        logger.info("=" * 60)

        # Get enabled feeds only
        enabled_feeds = [f for f in self.config.feeds if f.enabled]
        logger.info(f"Processing {len(enabled_feeds)} enabled feeds")

        # Authenticate once at the start; nothing can be posted without it.
        try:
            self.coves_client.authenticate()
        except Exception as e:
            logger.error(f"Failed to authenticate: {e}")
            logger.error("Cannot continue without authentication")
            return False

        for feed_config in enabled_feeds:
            try:
                self._process_feed(feed_config)
            except Exception as e:
                # Log error but continue with other feeds
                logger.error(f"Error processing feed '{feed_config.name}': {e}", exc_info=True)

        logger.info("=" * 60)
        logger.info("Aggregator run completed")
        logger.info("=" * 60)
        return True

    def _process_feed(self, feed_config):
        """
        Process a single RSS feed.

        Fetches the feed and posts every entry not already recorded in state.
        It records each successful post and then updates the feed's last-run
        timestamp. Failures on individual entries are logged and skipped.

        Args:
            feed_config: FeedConfig object (reads .name, .url and
                .community_handle)

        Raises:
            Exception: Whatever the RSS fetcher raises when the feed cannot
                be fetched (re-raised after logging).
        """
        logger.info(f"Processing feed: {feed_config.name} -> {feed_config.community_handle}")

        try:
            feed = self.rss_fetcher.fetch_feed(feed_config.url)
        except Exception as e:
            logger.error(f"Failed to fetch feed '{feed_config.name}': {e}")
            raise

        # The bozo flag signals parsing issues; whatever entries are present
        # are still processed.
        if feed.bozo:
            logger.warning(f"Feed '{feed_config.name}' has parsing issues (bozo flag set)")

        new_posts = 0
        skipped_posts = 0

        for entry in feed.entries:
            try:
                # Dedup key: the entry's guid, falling back to its link when
                # the guid is missing OR empty. An empty guid would give every
                # such entry the same (feed_url, guid) key and suppress
                # later stories as "already posted".
                guid = getattr(entry, 'guid', None) or entry.link
                if self.state_manager.is_posted(feed_config.url, guid):
                    skipped_posts += 1
                    logger.debug(f"Skipping already-posted story: {guid}")
                    continue

                story = self.html_parser.parse_to_story(
                    title=entry.title,
                    link=entry.link,
                    guid=guid,
                    pub_date=entry.published_parsed,
                    categories=[tag.term for tag in getattr(entry, 'tags', [])],
                    html_description=entry.description
                )

                rich_text = self.richtext_formatter.format_full(story)

                embed = self.coves_client.create_external_embed(
                    uri=story.link,
                    title=story.title,
                    # Slicing caps the length; shorter summaries pass unchanged.
                    description=story.summary[:200],
                    thumb=story.image_url
                )

                # Only the network call is guarded here. A failure leaves state
                # untouched, so the story is attempted again on the next run.
                try:
                    post_uri = self.coves_client.create_post(
                        community_handle=feed_config.community_handle,
                        title=story.title,
                        content=rich_text["content"],
                        facets=rich_text["facets"],
                        embed=embed
                    )
                except Exception as e:
                    logger.error(f"Failed to post story '{story.title}': {e}")
                    continue

                # Record only after a successful post. A state-write failure is
                # reported by the outer handler instead of being mislabelled as
                # a posting failure.
                self.state_manager.mark_posted(feed_config.url, guid, post_uri)
                new_posts += 1
                logger.info(f"Posted: {story.title[:50]}... -> {post_uri}")

            except Exception as e:
                # Log error but continue with other entries
                logger.error(f"Error processing entry: {e}", exc_info=True)

        # NOTE(review): naive local time; confirm StateManager's serialization
        # before switching to a timezone-aware timestamp.
        self.state_manager.update_last_run(feed_config.url, datetime.now())

        logger.info(
            f"Feed '{feed_config.name}': {new_posts} new posts, {skipped_posts} duplicates"
        )
211
212
def main():
    """
    Main entry point for command-line execution.

    Reads CONFIG_PATH (default ``config.yaml``) and STATE_FILE (default
    ``data/state.json``) from the environment. It runs the aggregator once
    and then terminates the process via ``sys.exit``.

    Exit status:
        0 -- the run completed
        1 -- the config file is missing, an exception escaped the aggregator,
             or run() reported failure by returning False

    Usage:
        python -m src.main
    """
    # Get paths from environment or use defaults
    config_path = Path(os.getenv('CONFIG_PATH', 'config.yaml'))
    state_file = Path(os.getenv('STATE_FILE', 'data/state.json'))

    # Validate config file exists
    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}")
        logger.error("Please create config.yaml (see config.example.yaml)")
        sys.exit(1)

    try:
        aggregator = Aggregator(
            config_path=config_path,
            state_file=state_file
        )
        succeeded = aggregator.run()
    except Exception as e:
        logger.error(f"Aggregator failed: {e}", exc_info=True)
        sys.exit(1)

    # An explicit False from run() means the run was aborted, and the exit
    # status must reflect that. None (no status reported) is treated as
    # success.
    sys.exit(1 if succeeded is False else 0)
241
242
if __name__ == '__main__':
    # Script entry point: runs only when executed directly (or via
    # ``python -m``), never on import. main() exits the process itself.
    main()