Atom feed for our EEG site
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
# ]
# ///
# Do not delete the metadata block above as it's needed for `uv run`

import json
import feedparser
import datetime
from time import mktime
from feedgenerator import Atom1Feed
import requests
import sys
import os
import re
from bs4 import BeautifulSoup

def load_feed_urls(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    return [item['url'] for item in data]
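
# Assumed input format (inferred from the loader above): feed.json is a JSON array of
# objects that each carry a "url" key, e.g. [{"url": "https://example.org/atom.xml"}].
# Any other keys on those objects are ignored here.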

def load_mapping(file_path):
    if not os.path.exists(file_path):
        return {}

    with open(file_path, 'r') as f:
        return json.load(f)
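
# Assumed input format (inferred from how the mapping is used below): mapping.json maps
# a feed URL to extra details, at minimum a display name, e.g.
# {"https://example.org/atom.xml": {"name": "Example Author"}}. Feeds without an entry
# fall back to an author of "Unknown".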

def get_feed_data(url, mapping):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        feed_data = feedparser.parse(response.content)
        print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)

        # Add mapping info to feed_data
        if url in mapping:
            feed_data.mapping = mapping[url]
        else:
            feed_data.mapping = None

        return feed_data
    except Exception as e:
        print(f"Error fetching {url}: {e}", file=sys.stderr)
        return None

def create_html_preview(html_content, max_length=800):
    """
    Create a preview from HTML content, preserving links inline while
    stripping all other HTML tags.
    """
    if not html_content:
        return ""

    try:
        # Parse HTML
        soup = BeautifulSoup(html_content, 'html.parser')

        # Copy all <a> tags to preserve them
        links = {}
        for i, a_tag in enumerate(soup.find_all('a', href=True)):
            # Create a unique placeholder for each link
            placeholder = f"__LINK_{i}__"
            links[placeholder] = {
                'href': a_tag['href'],
                'text': a_tag.get_text().strip()
            }
            # Replace the link with a placeholder
            a_tag.replace_with(placeholder)

        # Get text content with placeholders
        text_content = soup.get_text(' ')
        # Clean up whitespace
        text_content = re.sub(r'\s+', ' ', text_content).strip()

        # Truncate if needed
        if len(text_content) > max_length:
            full_text = text_content
            text_content = text_content[:max_length]

            # If the cut falls in the middle of a link placeholder, cut before the placeholder instead
            for placeholder in links.keys():
                pos = full_text.find(placeholder)
                if 0 <= pos < max_length < pos + len(placeholder):
                    text_content = text_content[:pos]

            # Find the last complete word
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:  # Only trim at a space if we're not losing too much text
                text_content = text_content[:last_space]

            text_content += "..."

        # Restore links
        for placeholder, link in links.items():
            if placeholder in text_content and link['text']:
                link_html = f'<a href="{link["href"]}" target="_blank">{link["text"]}</a>'
                text_content = text_content.replace(placeholder, link_html)

        return text_content
    except Exception as e:
        print(f"Error processing HTML preview: {e}", file=sys.stderr)
        # Fall back to plain text with no links
        plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
        plain_text = re.sub(r'\s+', ' ', plain_text).strip()
        if len(plain_text) > max_length:
            plain_text = plain_text[:max_length] + "..."
        return plain_text

def extract_entries(feeds):
    all_entries = []
    for feed_data in feeds:
        if not feed_data or not hasattr(feed_data, 'entries'):
            continue

        # Get feed title and handle mapping
        feed_title = feed_data.feed.get('title', 'Unknown Source')
        author_name = 'Unknown'

        if hasattr(feed_data, 'mapping') and feed_data.mapping:
            author_name = feed_data.mapping.get('name', 'Unknown')

        print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)

        for entry in feed_data.entries:
            # Get publication date
            pub_date = None
            if hasattr(entry, 'published_parsed') and entry.published_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.published_parsed))
            elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.updated_parsed))

            if not pub_date:
                print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
                continue

            # Get title
            title = entry.get('title', 'No title')

            # Get link
            link = entry.get('link', '')

            # Get description/content
            if hasattr(entry, 'content') and entry.content:
                content = entry.content[0].value
            else:
                content = entry.get('summary', '')

            # Create the HTML preview that will be used as the content
            preview = create_html_preview(content)

            # Get unique ID
            entry_id = entry.get('id', link)

            all_entries.append({
                'title': title,
                'link': link,
                'content': content,
                'preview': preview,
                'author': author_name,
                'pub_date': pub_date,
                'feed_title': feed_title,
                'id': entry_id
            })

    # Sort by publication date (newest first)
    sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
    print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)
    return sorted_entries

def format_pubdate(pubdate):
    # Format the date with a short (three-letter) month name
    return pubdate.strftime('%d %b %Y %H:%M:%S')

def create_atom_feed(entries):
    feed = Atom1Feed(
        title="Atomic EEG",
        link="https://example.com/",  # Placeholder link
        description="Aggregated Atom feeds",
        language="en",
        author_name="Feed Aggregator",
        feed_url="https://example.com/eeg.xml"  # Placeholder feed URL
    )

    for entry in entries:
        # Format the date with a short month name
        formatted_date = format_pubdate(entry['pub_date'])
        feed.add_item(
            title=entry['title'],
            link=entry['link'],
            description=entry['preview'],  # Use the preview as the main content
            author_name=entry['author'],
            pubdate=entry['pub_date'],
            unique_id=entry['id'],
            categories=[entry['feed_title']],  # Use feed title as category for attribution
            # Add the formatted date as an extra field
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
        )

    return feed

def main():
    # Load feed URLs
    feed_urls = load_feed_urls('feed.json')

    # Load mapping
    mapping = load_mapping('mapping.json')

    # Fetch feed data
    print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
    feeds = []
    for url in feed_urls:
        feed_data = get_feed_data(url, mapping)
        if feed_data:
            feeds.append(feed_data)

    # Extract and sort entries
    print("Processing entries...", file=sys.stderr)
    entries = extract_entries(feeds)
    print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)

    # Create aggregated feed
    feed = create_atom_feed(entries)

    # Write to file
    with open('eeg.xml', 'w', encoding='utf-8') as f:
        feed.write(f, 'utf-8')

    print("Feed successfully written to eeg.xml", file=sys.stderr)


if __name__ == "__main__":
    main()
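
A quick usage note: run the script from the directory that holds feed.json and mapping.json, for example with `uv run eeg.py` (the filename eeg.py is only an assumption; use whatever name the script is saved under). `uv run` picks up the inline metadata block at the top and installs the listed dependencies before executing. Progress messages go to stderr, and the aggregated feed is written to eeg.xml in the current directory.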