A community-based topic aggregation platform built on atproto
"""
Main Orchestration Script for Kagi News Aggregator.

Coordinates all components to:
1. Fetch RSS feeds
2. Parse HTML content
3. Format as rich text
4. Deduplicate stories
5. Post to Coves communities
6. Track state

Designed to run via CRON (single execution, then exit).
"""
import os
import sys
import logging
from pathlib import Path
from datetime import datetime
from typing import Optional

from src.config import ConfigLoader
from src.rss_fetcher import RSSFetcher
from src.html_parser import KagiHTMLParser
from src.richtext_formatter import RichTextFormatter
from src.state_manager import StateManager
from src.coves_client import CovesClient

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class Aggregator:
    """
    Main aggregator orchestration.

    Coordinates all components to fetch, parse, format, and post stories.
    """

    def __init__(
        self,
        config_path: Path,
        state_file: Path,
        coves_client: Optional[CovesClient] = None
    ):
        """
        Initialize aggregator.

        Args:
            config_path: Path to config.yaml
            state_file: Path to state.json
            coves_client: Optional CovesClient (for testing)

        Raises:
            ValueError: If no client is injected and AGGREGATOR_HANDLE or
                AGGREGATOR_PASSWORD is missing from the environment.
        """
        # Load configuration
        logger.info("Loading configuration...")
        config_loader = ConfigLoader(config_path)
        self.config = config_loader.load()

        # Initialize components
        logger.info("Initializing components...")
        self.rss_fetcher = RSSFetcher()
        self.html_parser = KagiHTMLParser()
        self.richtext_formatter = RichTextFormatter()
        self.state_manager = StateManager(state_file)
        self.state_file = state_file

        # Use the injected client when one is given. Compare against None
        # rather than truthiness so a falsy test double is still honoured.
        if coves_client is not None:
            self.coves_client = coves_client
            return

        # Get credentials from environment
        aggregator_handle = os.getenv('AGGREGATOR_HANDLE')
        aggregator_password = os.getenv('AGGREGATOR_PASSWORD')
        pds_url = os.getenv('PDS_URL')  # Optional: separate PDS for auth

        if not aggregator_handle or not aggregator_password:
            raise ValueError(
                "Missing AGGREGATOR_HANDLE or AGGREGATOR_PASSWORD environment variables"
            )

        self.coves_client = CovesClient(
            api_url=self.config.coves_api_url,
            handle=aggregator_handle,
            password=aggregator_password,
            pds_url=pds_url  # Auth through PDS if specified
        )

    def run(self) -> bool:
        """
        Run aggregator: fetch, parse, post, and update state.

        This is the main entry point for CRON execution.

        Returns:
            False if authentication failed and no feed was processed;
            True once every enabled feed has been attempted. Errors in
            individual feeds are logged and do not change the result.
        """
        logger.info("=" * 60)
        logger.info("Starting Kagi News Aggregator")
        logger.info("=" * 60)

        # Get enabled feeds only
        enabled_feeds = [f for f in self.config.feeds if f.enabled]
        logger.info(f"Processing {len(enabled_feeds)} enabled feeds")

        # Authenticate once at the start
        try:
            self.coves_client.authenticate()
        except Exception as e:
            logger.error(f"Failed to authenticate: {e}")
            logger.error("Cannot continue without authentication")
            # Report the failure so the caller can exit non-zero.
            return False

        # Process each feed; one failing feed must not stop the others
        for feed_config in enabled_feeds:
            try:
                self._process_feed(feed_config)
            except Exception as e:
                logger.error(
                    f"Error processing feed '{feed_config.name}': {e}",
                    exc_info=True
                )

        logger.info("=" * 60)
        logger.info("Aggregator run completed")
        logger.info("=" * 60)
        return True

    def _process_feed(self, feed_config):
        """
        Process a single RSS feed.

        Fetches the feed, posts every entry that the state manager does not
        already report as posted, and records the run timestamp.

        Args:
            feed_config: FeedConfig object

        Raises:
            Exception: Re-raises whatever the fetcher raised when the feed
                could not be fetched. Per-entry errors are logged and skipped.
        """
        logger.info(f"Processing feed: {feed_config.name} -> {feed_config.community_handle}")

        # Fetch RSS feed
        try:
            feed = self.rss_fetcher.fetch_feed(feed_config.url)
        except Exception as e:
            logger.error(f"Failed to fetch feed '{feed_config.name}': {e}")
            raise

        # Check for feed errors (non-fatal: entries are still processed)
        if feed.bozo:
            logger.warning(f"Feed '{feed_config.name}' has parsing issues (bozo flag set)")

        # Process entries
        new_posts = 0
        skipped_posts = 0

        for entry in feed.entries:
            try:
                # Check if already posted; fall back to the link when the
                # entry carries no guid
                guid = entry.guid if hasattr(entry, 'guid') else entry.link
                if self.state_manager.is_posted(feed_config.url, guid):
                    skipped_posts += 1
                    logger.debug(f"Skipping already-posted story: {guid}")
                    continue

                # Parse story
                story = self.html_parser.parse_to_story(
                    title=entry.title,
                    link=entry.link,
                    guid=guid,
                    pub_date=entry.published_parsed,
                    categories=[tag.term for tag in entry.tags] if hasattr(entry, 'tags') else [],
                    html_description=entry.description
                )

                # Format as rich text
                rich_text = self.richtext_formatter.format_full(story)

                # Create external embed; slicing already leaves summaries of
                # 200 characters or fewer untouched
                embed = self.coves_client.create_external_embed(
                    uri=story.link,
                    title=story.title,
                    description=story.summary[:200],
                    thumb=story.image_url
                )

                # Post to community
                try:
                    post_uri = self.coves_client.create_post(
                        community_handle=feed_config.community_handle,
                        title=story.title,
                        content=rich_text["content"],
                        facets=rich_text["facets"],
                        embed=embed
                    )
                except Exception as e:
                    # Don't update state if posting failed
                    logger.error(f"Failed to post story '{story.title}': {e}")
                    continue

                # Mark as posted (only if successful)
                self.state_manager.mark_posted(feed_config.url, guid, post_uri)
                new_posts += 1
                logger.info(f"Posted: {story.title[:50]}... -> {post_uri}")

            except Exception as e:
                # Log error but continue with other entries
                logger.error(f"Error processing entry: {e}", exc_info=True)

        # Update last run timestamp (datetime.now() yields a naive local time)
        self.state_manager.update_last_run(feed_config.url, datetime.now())

        logger.info(
            f"Feed '{feed_config.name}': {new_posts} new posts, {skipped_posts} duplicates"
        )


def main():
    """
    Main entry point for command-line execution.

    Reads CONFIG_PATH and STATE_FILE from the environment (defaulting to
    'config.yaml' and 'data/state.json'), runs the aggregator once, and
    exits with status 0 on success or 1 when the config file is missing,
    the aggregator raises, or the run reports an authentication failure.

    Usage:
        python -m src.main
    """
    # Get paths from environment or use defaults
    config_path = Path(os.getenv('CONFIG_PATH', 'config.yaml'))
    state_file = Path(os.getenv('STATE_FILE', 'data/state.json'))

    # Validate config file exists
    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}")
        logger.error("Please create config.yaml (see config.example.yaml)")
        sys.exit(1)

    # Create aggregator and run
    try:
        aggregator = Aggregator(
            config_path=config_path,
            state_file=state_file
        )
        succeeded = aggregator.run()
    except Exception as e:
        logger.error(f"Aggregator failed: {e}", exc_info=True)
        sys.exit(1)

    # Only an explicit False marks a failed run, so overrides of run() that
    # still return None keep exiting 0.
    sys.exit(1 if succeeded is False else 0)


if __name__ == '__main__':
    main()