A community-based topic aggregation platform built on atproto.
at main 8.2 kB view raw
"""
Main Orchestration Script for Kagi News Aggregator.

Coordinates all components to:
1. Fetch RSS feeds
2. Parse HTML content
3. Format as rich text
4. Deduplicate stories
5. Post to Coves communities
6. Track state

Designed to run via CRON (single execution, then exit). The process exit
code reflects whether the run could actually be performed (see ``main``).
"""
import os
import sys
import logging
from pathlib import Path
from datetime import datetime
from typing import Optional

from src.config import ConfigLoader
from src.rss_fetcher import RSSFetcher
from src.html_parser import KagiHTMLParser
from src.richtext_formatter import RichTextFormatter
from src.state_manager import StateManager
from src.coves_client import CovesClient

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Maximum number of characters of the story summary placed in a post's
# external embed description.
EMBED_DESCRIPTION_MAX_LEN = 200


class Aggregator:
    """
    Main aggregator orchestration.

    Coordinates all components to fetch, parse, format, and post stories.
    """

    def __init__(
        self,
        config_path: Path,
        state_file: Path,
        coves_client: Optional[CovesClient] = None
    ):
        """
        Initialize aggregator.

        Args:
            config_path: Path to config.yaml
            state_file: Path to state.json
            coves_client: Optional CovesClient (for testing). When omitted, a
                client is built from the AGGREGATOR_HANDLE,
                AGGREGATOR_PASSWORD and (optional) PDS_URL environment
                variables.

        Raises:
            ValueError: If no client is given and AGGREGATOR_HANDLE or
                AGGREGATOR_PASSWORD is unset/empty.
        """
        # Load configuration
        logger.info("Loading configuration...")
        config_loader = ConfigLoader(config_path)
        self.config = config_loader.load()

        # Initialize components
        logger.info("Initializing components...")
        self.rss_fetcher = RSSFetcher()
        self.html_parser = KagiHTMLParser()
        self.richtext_formatter = RichTextFormatter()
        self.state_manager = StateManager(state_file)
        self.state_file = state_file

        # Use the injected client if one was provided. Compare against None
        # explicitly so that a client object that happens to be falsy is
        # still honoured.
        if coves_client is not None:
            self.coves_client = coves_client
        else:
            # Get credentials from environment
            aggregator_handle = os.getenv('AGGREGATOR_HANDLE')
            aggregator_password = os.getenv('AGGREGATOR_PASSWORD')
            pds_url = os.getenv('PDS_URL')  # Optional: separate PDS for auth

            if not aggregator_handle or not aggregator_password:
                raise ValueError(
                    "Missing AGGREGATOR_HANDLE or AGGREGATOR_PASSWORD environment variables"
                )

            self.coves_client = CovesClient(
                api_url=self.config.coves_api_url,
                handle=aggregator_handle,
                password=aggregator_password,
                pds_url=pds_url  # Auth through PDS if specified
            )

    def run(self) -> bool:
        """
        Run aggregator: fetch, parse, post, and update state.

        This is the main entry point for CRON execution. Errors in an
        individual feed are logged and do not stop the remaining feeds.

        Returns:
            True if the run was carried out (even if some feeds or entries
            failed), False if authentication failed and nothing was
            attempted. ``main`` maps False to a non-zero exit code so
            schedulers can detect the failure.
        """
        logger.info("=" * 60)
        logger.info("Starting Kagi News Aggregator")
        logger.info("=" * 60)

        # Get enabled feeds only
        enabled_feeds = [f for f in self.config.feeds if f.enabled]
        logger.info(f"Processing {len(enabled_feeds)} enabled feeds")

        # Authenticate once at the start
        try:
            self.coves_client.authenticate()
        except Exception as e:
            logger.error(f"Failed to authenticate: {e}")
            logger.error("Cannot continue without authentication")
            return False

        # Process each feed
        for feed_config in enabled_feeds:
            try:
                self._process_feed(feed_config)
            except Exception as e:
                # Log error (with traceback) but continue with other feeds
                logger.exception(f"Error processing feed '{feed_config.name}': {e}")

        logger.info("=" * 60)
        logger.info("Aggregator run completed")
        logger.info("=" * 60)
        return True

    def _process_feed(self, feed_config):
        """
        Process a single RSS feed.

        Fetches the feed, posts every entry not already recorded in the state
        manager, records successful posts, and stamps the feed's last-run
        time. Per-entry failures are logged and skipped.

        Args:
            feed_config: FeedConfig object

        Raises:
            Exception: Re-raises whatever ``RSSFetcher.fetch_feed`` raises
                when the feed cannot be fetched.
        """
        logger.info(f"Processing feed: {feed_config.name} -> {feed_config.community_handle}")

        # Fetch RSS feed
        try:
            feed = self.rss_fetcher.fetch_feed(feed_config.url)
        except Exception as e:
            logger.error(f"Failed to fetch feed '{feed_config.name}': {e}")
            raise

        # Check for feed errors
        if feed.bozo:
            logger.warning(f"Feed '{feed_config.name}' has parsing issues (bozo flag set)")

        # Process entries
        new_posts = 0
        skipped_posts = 0

        for entry in feed.entries:
            try:
                # Dedup key: the entry GUID, falling back to the link when the
                # GUID is missing OR empty. An empty GUID would otherwise make
                # every such entry share one key and all but the first would
                # be skipped as "already posted".
                guid = getattr(entry, 'guid', None) or entry.link
                if self.state_manager.is_posted(feed_config.url, guid):
                    skipped_posts += 1
                    logger.debug(f"Skipping already-posted story: {guid}")
                    continue

                # Parse story
                story = self.html_parser.parse_to_story(
                    title=entry.title,
                    link=entry.link,
                    guid=guid,
                    pub_date=entry.published_parsed,
                    categories=[tag.term for tag in entry.tags] if hasattr(entry, 'tags') else [],
                    html_description=entry.description
                )

                # Format as rich text
                rich_text = self.richtext_formatter.format_full(story)

                # Create external embed (slicing is a no-op for short summaries)
                embed = self.coves_client.create_external_embed(
                    uri=story.link,
                    title=story.title,
                    description=story.summary[:EMBED_DESCRIPTION_MAX_LEN]
                )

                # Post to community
                # Pass thumbnail URL from RSS feed at top level for trusted aggregator upload
                try:
                    post_uri = self.coves_client.create_post(
                        community_handle=feed_config.community_handle,
                        title=story.title,
                        content=rich_text["content"],
                        facets=rich_text["facets"],
                        embed=embed,
                        thumbnail_url=story.image_url  # From RSS feed - server will validate and upload
                    )
                except Exception as e:
                    # Don't update state if posting failed, so the story is
                    # retried on the next run.
                    logger.error(f"Failed to post story '{story.title}': {e}")
                    continue

                # Mark as posted (only reached when create_post succeeded)
                self.state_manager.mark_posted(feed_config.url, guid, post_uri)
                new_posts += 1
                logger.info(f"Posted: {story.title[:50]}... -> {post_uri}")

            except Exception as e:
                # Log error (with traceback) but continue with other entries
                logger.exception(f"Error processing entry: {e}")
                continue

        # Update last run timestamp
        self.state_manager.update_last_run(feed_config.url, datetime.now())

        logger.info(
            f"Feed '{feed_config.name}': {new_posts} new posts, {skipped_posts} duplicates"
        )


def main():
    """
    Main entry point for command-line execution.

    Reads CONFIG_PATH (default ``config.yaml``) and STATE_FILE (default
    ``data/state.json``) from the environment.

    Exit codes:
        0: the run completed (individual feed/entry errors are only logged).
        1: config file missing, aggregator construction or run raised, or
           authentication failed.

    Usage:
        python -m src.main
    """
    # Get paths from environment or use defaults
    config_path = Path(os.getenv('CONFIG_PATH', 'config.yaml'))
    state_file = Path(os.getenv('STATE_FILE', 'data/state.json'))

    # Validate config file exists
    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}")
        logger.error("Please create config.yaml (see config.example.yaml)")
        sys.exit(1)

    # Create aggregator and run
    try:
        aggregator = Aggregator(
            config_path=config_path,
            state_file=state_file
        )
        completed = aggregator.run()
    except Exception as e:
        logger.exception(f"Aggregator failed: {e}")
        sys.exit(1)

    # A run that could not authenticate must not look like success to CRON.
    sys.exit(0 if completed else 1)


if __name__ == '__main__':
    main()