A community based topic aggregation platform built on atproto
at main 6.5 kB view raw
"""
State Manager for tracking posted stories.

Handles deduplication by tracking which stories have already been posted.
Uses JSON file for persistence.
"""
import json
import logging
import os
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, Dict, List

logger = logging.getLogger(__name__)


class StateManager:
    """
    Manages aggregator state for deduplication.

    Tracks:
    - Posted GUIDs per feed (with timestamps)
    - Last successful run timestamp per feed
    - Automatic cleanup of old entries

    State layout (JSON):
        {"feeds": {<feed_url>: {"posted_guids": [{"guid", "post_uri", "posted_at"}, ...],
                                "last_successful_run": <iso timestamp or null>}}}

    NOTE(review): timestamps are naive local time via ``datetime.now()`` —
    internally consistent, but consider switching to UTC-aware timestamps.
    """

    def __init__(self, state_file: Path, max_guids_per_feed: int = 100, max_age_days: int = 30):
        """
        Initialize state manager.

        Args:
            state_file: Path to JSON state file
            max_guids_per_feed: Maximum GUIDs to keep per feed (default: 100)
            max_age_days: Maximum age in days for GUIDs (default: 30)
        """
        self.state_file = Path(state_file)
        self.max_guids_per_feed = max_guids_per_feed
        self.max_age_days = max_age_days
        self.state = self._load_state()

    def _load_state(self) -> Dict:
        """
        Load state from file.

        Creates a fresh state if the file is missing, unreadable, corrupt,
        or structurally invalid — the aggregator should never crash on a
        bad state file, only lose dedup history.

        Returns:
            State dict, guaranteed to contain a 'feeds' dict.
        """
        if not self.state_file.exists():
            logger.info(f"Creating new state file at {self.state_file}")
            state = {'feeds': {}}
            self._save_state(state)
            return state

        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)
        except (json.JSONDecodeError, OSError) as e:
            # Corrupt or unreadable file: reset rather than crash.
            logger.error(f"Failed to load state file: {e}. Creating new state.")
            state = {'feeds': {}}
            self._save_state(state)
            return state

        # Guard against valid-JSON-but-wrong-shape files (e.g. '{}' or '[]'),
        # which would otherwise KeyError in every later method.
        if not isinstance(state, dict) or not isinstance(state.get('feeds'), dict):
            logger.error(f"State file {self.state_file} has invalid structure. Creating new state.")
            state = {'feeds': {}}
            self._save_state(state)
            return state

        logger.info(f"Loaded state from {self.state_file}")
        return state

    def _save_state(self, state: Optional[Dict] = None):
        """
        Save state to file atomically.

        Args:
            state: State dict to persist; defaults to self.state.
        """
        if state is None:
            state = self.state

        # Ensure parent directory exists
        self.state_file.parent.mkdir(parents=True, exist_ok=True)

        # Write to a temp file, then atomically replace the real one, so a
        # crash mid-write can never leave a truncated/corrupt state file.
        tmp_path = self.state_file.with_suffix(self.state_file.suffix + '.tmp')
        with open(tmp_path, 'w') as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, self.state_file)

    def _ensure_feed_exists(self, feed_url: str):
        """Ensure feed entry exists in state (created empty on first use)."""
        if feed_url not in self.state['feeds']:
            self.state['feeds'][feed_url] = {
                'posted_guids': [],
                'last_successful_run': None
            }

    def is_posted(self, feed_url: str, guid: str) -> bool:
        """
        Check if a story has already been posted.

        Args:
            feed_url: RSS feed URL
            guid: Story GUID

        Returns:
            True if already posted, False otherwise
        """
        self._ensure_feed_exists(feed_url)

        posted_guids = self.state['feeds'][feed_url]['posted_guids']
        return any(entry['guid'] == guid for entry in posted_guids)

    def mark_posted(self, feed_url: str, guid: str, post_uri: str):
        """
        Mark a story as posted and persist the state.

        Args:
            feed_url: RSS feed URL
            guid: Story GUID
            post_uri: AT Proto URI of created post
        """
        self._ensure_feed_exists(feed_url)

        # Add to posted list
        entry = {
            'guid': guid,
            'post_uri': post_uri,
            'posted_at': datetime.now().isoformat()
        }
        self.state['feeds'][feed_url]['posted_guids'].append(entry)

        # Auto-cleanup to keep state file manageable
        self.cleanup_old_entries(feed_url)

        # Save state
        self._save_state()

        logger.info(f"Marked as posted: {guid} -> {post_uri}")

    def get_last_run(self, feed_url: str) -> Optional[datetime]:
        """
        Get last successful run timestamp for a feed.

        Args:
            feed_url: RSS feed URL

        Returns:
            Datetime of last run, or None if never run
        """
        self._ensure_feed_exists(feed_url)

        timestamp_str = self.state['feeds'][feed_url]['last_successful_run']
        if timestamp_str is None:
            return None

        return datetime.fromisoformat(timestamp_str)

    def update_last_run(self, feed_url: str, timestamp: datetime):
        """
        Update last successful run timestamp and persist the state.

        Args:
            feed_url: RSS feed URL
            timestamp: Timestamp of successful run
        """
        self._ensure_feed_exists(feed_url)

        self.state['feeds'][feed_url]['last_successful_run'] = timestamp.isoformat()
        self._save_state()

        logger.info(f"Updated last run for {feed_url}: {timestamp}")

    def cleanup_old_entries(self, feed_url: str):
        """
        Remove old entries from state (in memory only; caller persists).

        Removes entries that are:
        - Older than max_age_days
        - Beyond max_guids_per_feed limit (keeps most recent)

        Args:
            feed_url: RSS feed URL
        """
        self._ensure_feed_exists(feed_url)

        posted_guids = self.state['feeds'][feed_url]['posted_guids']

        # Filter out entries older than max_age_days
        cutoff_date = datetime.now() - timedelta(days=self.max_age_days)
        filtered = [
            entry for entry in posted_guids
            if datetime.fromisoformat(entry['posted_at']) > cutoff_date
        ]

        # Keep only the most recent max_guids_per_feed entries.
        # ISO-8601 strings sort chronologically, so a string sort is safe here.
        filtered.sort(key=lambda x: x['posted_at'], reverse=True)
        filtered = filtered[:self.max_guids_per_feed]

        # Update state
        old_count = len(posted_guids)
        new_count = len(filtered)
        self.state['feeds'][feed_url]['posted_guids'] = filtered

        if old_count != new_count:
            logger.info(f"Cleaned up {old_count - new_count} old entries for {feed_url}")

    def get_posted_count(self, feed_url: str) -> int:
        """
        Get count of posted items for a feed.

        Args:
            feed_url: RSS feed URL

        Returns:
            Number of posted items
        """
        self._ensure_feed_exists(feed_url)
        return len(self.state['feeds'][feed_url]['posted_guids'])

    def get_all_posted_guids(self, feed_url: str) -> List[str]:
        """
        Get all posted GUIDs for a feed.

        Args:
            feed_url: RSS feed URL

        Returns:
            List of GUIDs
        """
        self._ensure_feed_exists(feed_url)
        return [entry['guid'] for entry in self.state['feeds'][feed_url]['posted_guids']]