Atom feed for our EEG site
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
# ]
# ///
# Do not delete the block above; it's needed for `uv run`.

import json
import feedparser
import datetime
from time import mktime
from feedgenerator import Atom1Feed
import requests
import sys
import os
import re
from bs4 import BeautifulSoup

def load_feed_urls(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    return [item['url'] for item in data]

def load_mapping(file_path):
    if not os.path.exists(file_path):
        return {}

    with open(file_path, 'r') as f:
        return json.load(f)

def get_feed_data(url, mapping):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        feed_data = feedparser.parse(response.content)
        print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)

        # Add mapping info to feed_data
        if url in mapping:
            feed_data.mapping = mapping[url]
        else:
            feed_data.mapping = None

        return feed_data
    except Exception as e:
        print(f"Error fetching {url}: {e}", file=sys.stderr)
        return None

def create_html_preview(html_content, max_length=800):
    """
    Create a preview from HTML content, preserving links inline while
    stripping all other HTML tags.
    """
    if not html_content:
        return ""

    try:
        # Parse HTML
        soup = BeautifulSoup(html_content, 'html.parser')

        # Copy all <a> tags to preserve them
        links = {}
        for i, a_tag in enumerate(soup.find_all('a', href=True)):
            # Create a unique placeholder for each link
            placeholder = f"__LINK_{i}__"
            links[placeholder] = {
                'href': a_tag['href'],
                'text': a_tag.get_text().strip()
            }
            # Replace the link with a placeholder
            a_tag.replace_with(placeholder)

        # Get text content with placeholders
        text_content = soup.get_text(' ')
        # Clean up whitespace
        text_content = re.sub(r'\s+', ' ', text_content).strip()

        # Truncate if needed
        if len(text_content) > max_length:
            full_text = text_content
            text_content = text_content[:max_length]

            # If the cut lands in the middle of a placeholder, cut before it instead
            for placeholder in links:
                pos = full_text.find(placeholder)
                if pos != -1 and pos < max_length < pos + len(placeholder):
                    text_content = text_content[:pos]

            # Find the last complete word
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:  # Only trim at a space if we're not losing too much text
                text_content = text_content[:last_space]

            text_content += "..."

        # Restore links
        for placeholder, link in links.items():
            if placeholder in text_content and link['text']:
                link_html = f'<a href="{link["href"]}" target="_blank">{link["text"]}</a>'
                text_content = text_content.replace(placeholder, link_html)

        return text_content
    except Exception as e:
        print(f"Error processing HTML preview: {e}", file=sys.stderr)
        # Fallback to plain text with no links
        plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
        plain_text = re.sub(r'\s+', ' ', plain_text).strip()
        if len(plain_text) > max_length:
            plain_text = plain_text[:max_length] + "..."
        return plain_text

def extract_entries(feeds):
    all_entries = []
    for feed_data in feeds:
        if not feed_data or not hasattr(feed_data, 'entries'):
            continue

        # Get feed title and handle mapping
        feed_title = feed_data.feed.get('title', 'Unknown Source')
        author_name = 'Unknown'

        if hasattr(feed_data, 'mapping') and feed_data.mapping:
            author_name = feed_data.mapping.get('name', 'Unknown')

        print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)

        for entry in feed_data.entries:
            # Get publication date
            pub_date = None
            if hasattr(entry, 'published_parsed') and entry.published_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.published_parsed))
            elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.updated_parsed))

            if not pub_date:
                print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
                continue

            # Get title
            title = entry.get('title', 'No title')

            # Get link
            link = entry.get('link', '')

            # Get description/content
            if hasattr(entry, 'content') and entry.content:
                content = entry.content[0].value
            else:
                content = entry.get('summary', '')

            # Create HTML preview that will be used as the content
            preview = create_html_preview(content)

            # Get unique ID
            entry_id = entry.get('id', link)

            all_entries.append({
                'title': title,
                'link': link,
                'content': content,
                'preview': preview,
                'author': author_name,
                'pub_date': pub_date,
                'feed_title': feed_title,
                'id': entry_id
            })

    # Sort by publication date (newest first)
    sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
    print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)
    return sorted_entries

def format_pubdate(pubdate):
    # Format the date with short month (three-letter)
    return pubdate.strftime('%d %b %Y %H:%M:%S')

def create_atom_feed(entries):
    feed = Atom1Feed(
        title="Atomic EEG",
        link="https://example.com/",  # Placeholder link
        description="Aggregated Atom feeds",
        language="en",
        author_name="Feed Aggregator",
        feed_url="https://example.com/eeg.xml"  # Placeholder feed URL
    )

    for entry in entries:
        # Format the date with short month name
        formatted_date = format_pubdate(entry['pub_date'])
        feed.add_item(
            title=entry['title'],
            link=entry['link'],
            description=entry['preview'],  # Use the preview as the main content
            author_name=entry['author'],
            pubdate=entry['pub_date'],
            unique_id=entry['id'],
            categories=[entry['feed_title']],  # Use feed title as category for attribution
            # Add formatted date as extra field
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
        )

    return feed

def main():
    # Load feed URLs
    feed_urls = load_feed_urls('feed.json')

    # Load mapping
    mapping = load_mapping('mapping.json')

    # Fetch feed data
    print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
    feeds = []
    for url in feed_urls:
        feed_data = get_feed_data(url, mapping)
        if feed_data:
            feeds.append(feed_data)

    # Extract and sort entries
    print("Processing entries...", file=sys.stderr)
    entries = extract_entries(feeds)
    print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)

    # Create aggregated feed
    feed = create_atom_feed(entries)

    # Write to file
    with open('eeg.xml', 'w', encoding='utf-8') as f:
        feed.write(f, 'utf-8')

    print("Feed successfully written to eeg.xml", file=sys.stderr)

if __name__ == "__main__":
    main()
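
For reference, here is a minimal sketch of the two input files the script reads, inferred from load_feed_urls and load_mapping; the URLs and names are placeholder assumptions, not real feeds. The snippet just writes example feed.json and mapping.json so the aggregator has something to fetch.

# Sketch: generate placeholder feed.json and mapping.json for the aggregator.
# The URLs and names are illustrative assumptions, not real feeds.
import json

# feed.json is a list of objects, each with a 'url' key
feed_urls = [
    {"url": "https://example.org/blog/atom.xml"},
    {"url": "https://example.net/news/atom.xml"},
]

# mapping.json is keyed by feed URL; 'name' becomes the per-entry author
# in the aggregated feed
mapping = {
    "https://example.org/blog/atom.xml": {"name": "Example Blog"},
    "https://example.net/news/atom.xml": {"name": "Example News"},
}

with open("feed.json", "w") as f:
    json.dump(feed_urls, f, indent=2)

with open("mapping.json", "w") as f:
    json.dump(mapping, f, indent=2)

With those files alongside the script, running it via `uv run` fetches each feed, builds the link-preserving previews, and writes the merged Atom feed to eeg.xml. mapping.json is optional: if it is missing, load_mapping returns an empty dict and entry authors fall back to "Unknown".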