Atom feed for our EEG site
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
#     "urllib3",
# ]
# ///
# Do not delete the block above: it's needed for `uv run`.
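#
# Input files (formats inferred from the loaders below):
#   feed.json    - list of feed sources, e.g. [{"url": "https://example.com/atom.xml"}, ...]
#   mapping.json - optional map from a feed URL to author metadata, e.g.
#                  {"https://example.com/atom.xml": {"name": "Author Name"}}
# Output files:
#   eeg.xml      - the aggregated Atom feed
#   threads.json - cross-reference ("thread") data linking entries that cite each other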

import json
import feedparser
import datetime
from time import mktime
from feedgenerator import Atom1Feed
import requests
import sys
import os
import re
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin

def load_feed_urls(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return [item['url'] for item in data]

def load_mapping(file_path):
    if not os.path.exists(file_path):
        return {}

    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

def get_feed_data(url, mapping):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        feed_data = feedparser.parse(response.content)
        print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)

        # Add mapping info to feed_data
        if url in mapping:
            feed_data.mapping = mapping[url]
        else:
            feed_data.mapping = None

        return feed_data
    except Exception as e:
        print(f"Error fetching {url}: {e}", file=sys.stderr)
        return None

def create_html_preview(html_content, max_length=800):
    """
    Create a preview from HTML content, preserving links inline while stripping all other HTML tags.
    """
    if not html_content:
        return ""

    try:
        # Parse HTML
        soup = BeautifulSoup(html_content, 'html.parser')

        # Copy all <a> tags to preserve them
        links = {}
        for i, a_tag in enumerate(soup.find_all('a', href=True)):
            # Create a unique placeholder for each link
            placeholder = f"__LINK_{i}__"
            links[placeholder] = {
                'href': a_tag['href'],
                'text': a_tag.get_text().strip()
            }
            # Replace the link with a placeholder
            a_tag.replace_with(placeholder)

        # Get text content with placeholders
        text_content = soup.get_text(' ')
        # Clean up whitespace
        text_content = re.sub(r'\s+', ' ', text_content).strip()

        # Truncate if needed
        if len(text_content) > max_length:
            # If the cut point falls inside a placeholder, move the cut to just before it
            cut = max_length
            for placeholder in links:
                pos = text_content.find(placeholder)
                if pos != -1 and pos < cut < pos + len(placeholder):
                    cut = pos
            text_content = text_content[:cut]

            # Trim back to the last complete word
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:  # Only trim at a space if we're not losing too much text
                text_content = text_content[:last_space]

            text_content += "..."

        # Restore links (fall back to the bare URL if the anchor had no text)
        for placeholder, link in links.items():
            if placeholder in text_content:
                link_text = link['text'] or link['href']
                link_html = f'<a href="{link["href"]}" target="_blank">{link_text}</a>'
                text_content = text_content.replace(placeholder, link_html)

        return text_content
    except Exception as e:
        print(f"Error processing HTML preview: {e}", file=sys.stderr)
        # Fall back to plain text with no links
        plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
        plain_text = re.sub(r'\s+', ' ', plain_text).strip()
        if len(plain_text) > max_length:
            plain_text = plain_text[:max_length] + "..."
        return plain_text
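# Example (illustrative):
#   create_html_preview('<p>See <a href="https://example.com/post">this post</a> for details.</p>')
#   returns 'See <a href="https://example.com/post" target="_blank">this post</a> for details.'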

def extract_entries(feeds):
    all_entries = []
    for feed_data in feeds:
        if not feed_data or not hasattr(feed_data, 'entries'):
            continue

        # Get feed title and handle mapping
        feed_title = feed_data.feed.get('title', 'Unknown Source')
        author_name = 'Unknown'

        if hasattr(feed_data, 'mapping') and feed_data.mapping:
            author_name = feed_data.mapping.get('name', 'Unknown')

        print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)

        for entry in feed_data.entries:
            # Get publication date
            # (note: mktime() treats the parsed struct_time as local time, so these are
            # naive local datetimes; that is fine for sorting, but not timezone-exact)
            pub_date = None
            if hasattr(entry, 'published_parsed') and entry.published_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.published_parsed))
            elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.updated_parsed))

            if not pub_date:
                print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
                continue

            # Get title
            title = entry.get('title', 'No title')

            # Get link
            link = entry.get('link', '')

            # Get full content from the feed entry
            if hasattr(entry, 'content') and entry.content:
                content = entry.content[0].value
            else:
                content = entry.get('summary', '')

            # Create an HTML preview that will be used as the feed content
            preview = create_html_preview(content)

            # Get unique ID
            entry_id = entry.get('id', link)

            all_entries.append({
                'title': title,
                'link': link,
                'content': content,  # Full feed content, used later for link extraction
                'preview': preview,
                'author': author_name,
                'pub_date': pub_date,
                'feed_title': feed_title,
                'id': entry_id
            })

    # Sort by publication date (newest first)
    sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
    print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)
    return sorted_entries

def format_pubdate(pubdate):
    # Format the date with a short (three-letter) month name
    return pubdate.strftime('%d %b %Y %H:%M:%S')
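# Example (illustrative, assuming an English locale for %b):
#   format_pubdate(datetime.datetime(2024, 3, 5, 14, 30, 0)) -> '05 Mar 2024 14:30:00'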

def create_atom_feed(entries):
    feed = Atom1Feed(
        title="Atomic EEG",
        link="https://example.com/",  # Placeholder link
        description="Aggregated Atom feeds",
        language="en",
        author_name="Feed Aggregator",
        feed_url="https://example.com/eeg.xml"  # Placeholder feed URL
    )

    for entry in entries:
        # Format the date with a short month name
        formatted_date = format_pubdate(entry['pub_date'])
        feed.add_item(
            title=entry['title'],
            link=entry['link'],
            description=entry['preview'],  # Use the preview as the main content
            author_name=entry['author'],
            pubdate=entry['pub_date'],
            unique_id=entry['id'],
            categories=[entry['feed_title']],  # Use feed title as category for attribution
            # Add the pre-formatted date as an extra field (note: Atom1Feed keeps unknown
            # keyword arguments on the item but does not write them to the XML output)
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
        )

    return feed

# Functions from make_threads.py

def extract_links_from_html(html_content, base_url=None):
    """Extract and normalize links from HTML content"""
    soup = BeautifulSoup(html_content, 'html.parser')
    links = []

    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href'].strip()

        # Skip empty links, anchors, javascript, and mailto
        if not href or href.startswith(('#', 'javascript:', 'mailto:')):
            continue

        # Convert relative URLs to absolute if we have a base URL
        if base_url and not href.startswith(('http://', 'https://')):
            href = urljoin(base_url, href)

        links.append(href)

    return links
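# Example (illustrative):
#   extract_links_from_html('<p><a href="/about">About</a> <a href="#top">Top</a></p>',
#                           base_url='https://example.com/post')
#   -> ['https://example.com/about']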

def normalize_url(url):
    """Normalize URLs to consistently match them"""
    if not url:
        return ""

    # Handle common URL shorteners or redirects (not implemented)

    # Parse the URL
    parsed = urlparse(url)

    # Ensure scheme is consistent
    scheme = parsed.scheme.lower() or 'http'

    # Normalize netloc (lowercase, remove 'www.' prefix)
    netloc = parsed.netloc.lower()
    if netloc.startswith('www.'):
        netloc = netloc[4:]

    # Remove trailing slashes and index files (index.html, index.php, index.htm)
    path = parsed.path.rstrip('/')
    for index_file in ['/index.html', '/index.php', '/index.htm']:
        if path.endswith(index_file):
            path = path[:-len(index_file)]

    # Drop query parameters that don't affect content (tracking params, utm_*, etc.)
    query_parts = []
    if parsed.query:
        for param in parsed.query.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                if not key.startswith(('utm_', 'ref', 'source')):
                    query_parts.append(f"{key}={value}")

    query = '&'.join(query_parts)

    # Drop hash fragments entirely
    fragment = ''

    # Special case for common blogging platforms
    # Medium, WordPress, Ghost, etc. may have specific URL patterns

    # Reconstruct the URL
    normalized = f"{scheme}://{netloc}{path}"
    if query:
        normalized += f"?{query}"
    if fragment:
        normalized += f"#{fragment}"

    return normalized
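# Example (illustrative):
#   normalize_url('https://www.Example.com/blog/post/index.html?utm_source=x&id=7#top')
#   -> 'https://example.com/blog/post?id=7'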

def get_domain(url):
    """Extract domain from a URL"""
    parsed = urlparse(url)
    domain = parsed.netloc.lower()
    # Remove 'www.' prefix if present
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain

def generate_threads(entries):
    """Generate thread data from the entries"""
    print(f"Generating thread data from {len(entries)} entries...", file=sys.stderr)

    entry_urls = {}  # Maps normalized URLs to entry data

    # First pass: collect all entries and their URLs
    for entry in entries:
        # Get link
        link = entry['link']
        if not link:
            continue

        # Normalize the entry URL to help with matching
        normalized_link = normalize_url(link)

        # Get the domain of the entry
        entry_domain = get_domain(link)

        # Use the feed content to extract links
        content_to_extract = entry['content']

        # Extract all links from content, using the entry link as base URL for resolving relative URLs
        content_links = extract_links_from_html(content_to_extract, base_url=link)

        entry_data = {
            'title': entry['title'],
            'link': link,
            'normalized_link': normalized_link,
            'domain': entry_domain,
            'feed_title': entry['feed_title'],
            'id': entry['id'],
            'content_links': content_links,
            'references': [],  # Will be filled in the second pass
            'referenced_by': [],  # Will be filled in the second pass
            'external_links': []  # Links to content outside the feed
        }

        entry_urls[normalized_link] = entry_data

    print("Extracted links from all entries", file=sys.stderr)

    # Second pass: analyze links between entries
    for entry_data in entry_urls.values():
        # Keep track of references to avoid duplicates
        reference_ids = set()
        normalized_content_links = [normalize_url(link) for link in entry_data['content_links']]

        for original_link, normalized_link in zip(entry_data['content_links'], normalized_content_links):
            # Check if this is a link to another entry in the feed
            if normalized_link in entry_urls and normalized_link != entry_data['normalized_link']:
                referenced_entry = entry_urls[normalized_link]

                # Avoid duplicate references
                if referenced_entry['id'] in reference_ids:
                    continue

                reference_ids.add(referenced_entry['id'])

                # Add to the references of the current entry
                entry_data['references'].append({
                    'id': referenced_entry['id'],
                    'link': referenced_entry['link'],
                    'title': referenced_entry['title'],
                    'feed_title': referenced_entry['feed_title'],
                    'in_feed': True  # Mark as a reference to a post in the feed
                })

                # Add to the referenced_by of the referenced entry,
                # unless this entry is already listed there
                already_referenced = any(ref['id'] == entry_data['id'] for ref in referenced_entry['referenced_by'])
                if not already_referenced:
                    referenced_entry['referenced_by'].append({
                        'id': entry_data['id'],
                        'link': entry_data['link'],
                        'title': entry_data['title'],
                        'feed_title': entry_data['feed_title'],
                        'in_feed': True  # Mark as a reference from a post in the feed
                    })
            elif normalized_link != entry_data['normalized_link']:
                # This is a link to something outside the feed
                # Check if it's from the same domain as the entry
                link_domain = get_domain(original_link)

                # Only include external links from different domains
                if link_domain != entry_data['domain']:
                    # Track as an external link if not already in the list
                    if not any(ext_link['url'] == original_link for ext_link in entry_data['external_links']):
                        external_link = {
                            'url': original_link,
                            'normalized_url': normalized_link,
                            'in_feed': False  # Mark as external to the feed
                        }
                        entry_data['external_links'].append(external_link)

    # Create the thread data structure
    thread_data = {}
    for entry_data in entry_urls.values():
        thread_data[entry_data['id']] = {
            'id': entry_data['id'],
            'title': entry_data['title'],
            'link': entry_data['link'],
            'feed_title': entry_data['feed_title'],
            'references': entry_data['references'],
            'referenced_by': entry_data['referenced_by'],
            'external_links': entry_data['external_links']
        }

    # Generate some statistics
    entries_with_references = sum(1 for entry_data in entry_urls.values() if entry_data['references'])
    entries_with_referenced_by = sum(1 for entry_data in entry_urls.values() if entry_data['referenced_by'])
    entries_with_external_links = sum(1 for entry_data in entry_urls.values() if entry_data['external_links'])
    total_internal_references = sum(len(entry_data['references']) for entry_data in entry_urls.values())
    total_external_links = sum(len(entry_data['external_links']) for entry_data in entry_urls.values())

    print("\nThread Analysis:", file=sys.stderr)
    print(f"Total entries: {len(entry_urls)}", file=sys.stderr)
    print(f"Entries that reference other entries in the feed: {entries_with_references}", file=sys.stderr)
    print(f"Entries referenced by other entries in the feed: {entries_with_referenced_by}", file=sys.stderr)
    print(f"Entries with external links: {entries_with_external_links}", file=sys.stderr)
    print(f"Total internal references: {total_internal_references}", file=sys.stderr)
    print(f"Total external links: {total_external_links}", file=sys.stderr)

    return thread_data
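# The resulting thread_data (written to threads.json by main) is keyed by entry ID, e.g. (illustrative):
#   {
#     "https://example.com/post-1": {
#       "id": "...", "title": "...", "link": "...", "feed_title": "...",
#       "references": [...], "referenced_by": [...], "external_links": [...]
#     }
#   }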

def main():
    # Load feed URLs
    feed_urls = load_feed_urls('feed.json')

    # Load mapping
    mapping = load_mapping('mapping.json')

    # Fetch feed data
    print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
    feeds = []
    for url in feed_urls:
        feed_data = get_feed_data(url, mapping)
        if feed_data:
            feeds.append(feed_data)

    # Extract and sort entries
    print("Processing entries...", file=sys.stderr)
    entries = extract_entries(feeds)
    print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)

    # Create aggregated feed
    feed = create_atom_feed(entries)

    # Write to file
    with open('eeg.xml', 'w', encoding='utf-8') as f:
        feed.write(f, 'utf-8')

    print("Feed successfully written to eeg.xml", file=sys.stderr)

    # Generate thread data
    thread_data = generate_threads(entries)

    # Write the thread data to a JSON file
    with open('threads.json', 'w', encoding='utf-8') as f:
        json.dump(thread_data, f, indent=2)

    print("Thread data successfully written to threads.json", file=sys.stderr)

if __name__ == "__main__":
    main()