# A community-based topic aggregation platform built on atproto
1"""
2State Manager for tracking posted stories.
3
4Handles deduplication by tracking which stories have already been posted.
5Uses JSON file for persistence.
6"""
7import json
8import logging
9from pathlib import Path
10from datetime import datetime, timedelta
11from typing import Optional, Dict, List
12
13logger = logging.getLogger(__name__)
14
15
class StateManager:
    """
    Manages aggregator state for deduplication.

    Tracks:
    - Posted GUIDs per feed (with timestamps)
    - Last successful run timestamp per feed
    - Automatic cleanup of old entries

    State file layout:
        {"feeds": {"<feed_url>": {"posted_guids": [{"guid", "post_uri",
        "posted_at"}, ...], "last_successful_run": "<iso timestamp>"}}}

    NOTE: timestamps are stored as naive local-time ISO strings
    (datetime.now().isoformat()) for compatibility with existing state
    files; do not mix in timezone-aware datetimes.
    """

    def __init__(self, state_file: Path, max_guids_per_feed: int = 100, max_age_days: int = 30):
        """
        Initialize state manager.

        Args:
            state_file: Path to JSON state file (created if missing)
            max_guids_per_feed: Maximum GUIDs to keep per feed (default: 100)
            max_age_days: Maximum age in days for GUIDs (default: 30)
        """
        self.state_file = Path(state_file)
        self.max_guids_per_feed = max_guids_per_feed
        self.max_age_days = max_age_days
        self.state = self._load_state()

    def _load_state(self) -> Dict:
        """Load state from file, or create fresh state if the file is missing or unusable.

        Returns:
            State dict guaranteed to contain a 'feeds' key.
        """
        if not self.state_file.exists():
            logger.info(f"Creating new state file at {self.state_file}")
            state = {'feeds': {}}
            self._save_state(state)
            return state

        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
            # A file can be valid JSON but the wrong shape (hand-edited,
            # truncated recovery, etc.); without this guard every later
            # method would fail with KeyError/AttributeError.
            if not isinstance(state, dict):
                raise ValueError(
                    f"state root must be a JSON object, got {type(state).__name__}"
                )
            state.setdefault('feeds', {})
            logger.info(f"Loaded state from {self.state_file}")
            return state
        except (json.JSONDecodeError, ValueError, OSError) as e:
            # Unreadable or malformed state: start over rather than crash.
            # Dedup history is lost, which at worst causes re-posts.
            logger.error(f"Failed to load state file: {e}. Creating new state.")
            state = {'feeds': {}}
            self._save_state(state)
            return state

    def _save_state(self, state: Optional[Dict] = None):
        """Atomically save state to file.

        Writes to a temporary sibling file and then renames it over the
        target, so a crash or power loss mid-write cannot corrupt the
        existing state file.

        Args:
            state: State dict to persist; defaults to self.state.
        """
        if state is None:
            state = self.state

        # Ensure parent directory exists
        self.state_file.parent.mkdir(parents=True, exist_ok=True)

        tmp_file = self.state_file.with_suffix(self.state_file.suffix + '.tmp')
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(state, f, indent=2)
        # os.replace semantics: atomic on POSIX, overwrites on Windows.
        tmp_file.replace(self.state_file)

    def _ensure_feed_exists(self, feed_url: str):
        """Ensure feed entry exists in state (idempotent)."""
        if feed_url not in self.state['feeds']:
            self.state['feeds'][feed_url] = {
                'posted_guids': [],
                'last_successful_run': None
            }

    def is_posted(self, feed_url: str, guid: str) -> bool:
        """
        Check if a story has already been posted.

        Args:
            feed_url: RSS feed URL
            guid: Story GUID

        Returns:
            True if already posted, False otherwise
        """
        self._ensure_feed_exists(feed_url)

        posted_guids = self.state['feeds'][feed_url]['posted_guids']
        return any(entry['guid'] == guid for entry in posted_guids)

    def mark_posted(self, feed_url: str, guid: str, post_uri: str):
        """
        Mark a story as posted, persisting the change immediately.

        Re-marking an already-posted GUID replaces the previous entry
        instead of creating a duplicate (e.g. retries after a partial
        failure).

        Args:
            feed_url: RSS feed URL
            guid: Story GUID
            post_uri: AT Proto URI of created post
        """
        self._ensure_feed_exists(feed_url)

        posted_guids = self.state['feeds'][feed_url]['posted_guids']
        # Drop any stale entry for this GUID so the list never holds
        # duplicates for the same story.
        posted_guids[:] = [e for e in posted_guids if e['guid'] != guid]
        posted_guids.append({
            'guid': guid,
            'post_uri': post_uri,
            'posted_at': datetime.now().isoformat()
        })

        # Auto-cleanup to keep state file manageable
        self.cleanup_old_entries(feed_url)

        # Save state
        self._save_state()

        logger.info(f"Marked as posted: {guid} -> {post_uri}")

    def get_last_run(self, feed_url: str) -> Optional[datetime]:
        """
        Get last successful run timestamp for a feed.

        Args:
            feed_url: RSS feed URL

        Returns:
            Datetime of last run, or None if never run
        """
        self._ensure_feed_exists(feed_url)

        timestamp_str = self.state['feeds'][feed_url]['last_successful_run']
        if timestamp_str is None:
            return None

        return datetime.fromisoformat(timestamp_str)

    def update_last_run(self, feed_url: str, timestamp: datetime):
        """
        Update last successful run timestamp and persist it.

        Args:
            feed_url: RSS feed URL
            timestamp: Timestamp of successful run (stored as ISO string)
        """
        self._ensure_feed_exists(feed_url)

        self.state['feeds'][feed_url]['last_successful_run'] = timestamp.isoformat()
        self._save_state()

        logger.info(f"Updated last run for {feed_url}: {timestamp}")

    def cleanup_old_entries(self, feed_url: str):
        """
        Remove old entries from state (in memory only; caller persists).

        Removes entries that are:
        - Older than max_age_days
        - Beyond max_guids_per_feed limit (keeps most recent)

        Args:
            feed_url: RSS feed URL
        """
        self._ensure_feed_exists(feed_url)

        posted_guids = self.state['feeds'][feed_url]['posted_guids']

        # Filter out entries older than max_age_days
        cutoff_date = datetime.now() - timedelta(days=self.max_age_days)
        filtered = [
            entry for entry in posted_guids
            if datetime.fromisoformat(entry['posted_at']) > cutoff_date
        ]

        # Keep only the most recent max_guids_per_feed entries.
        # ISO-8601 strings sort chronologically, so a lexicographic
        # sort on posted_at is a correct recency sort.
        filtered.sort(key=lambda x: x['posted_at'], reverse=True)
        filtered = filtered[:self.max_guids_per_feed]

        # Update state
        old_count = len(posted_guids)
        new_count = len(filtered)
        self.state['feeds'][feed_url]['posted_guids'] = filtered

        if old_count != new_count:
            logger.info(f"Cleaned up {old_count - new_count} old entries for {feed_url}")

    def get_posted_count(self, feed_url: str) -> int:
        """
        Get count of posted items for a feed.

        Args:
            feed_url: RSS feed URL

        Returns:
            Number of posted items
        """
        self._ensure_feed_exists(feed_url)
        return len(self.state['feeds'][feed_url]['posted_guids'])

    def get_all_posted_guids(self, feed_url: str) -> List[str]:
        """
        Get all posted GUIDs for a feed.

        Args:
            feed_url: RSS feed URL

        Returns:
            List of GUIDs, oldest first (insertion order)
        """
        self._ensure_feed_exists(feed_url)
        return [entry['guid'] for entry in self.state['feeds'][feed_url]['posted_guids']]