Atom feed for our EEG site

#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
#     "urllib3",
# ]
# ///
# Do not delete the block above: it's needed for `uv run`.

import json
import feedparser
import datetime
from time import mktime
from feedgenerator import Atom1Feed
import requests
import sys
import os
import re
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin

def load_feed_urls(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    return [item['url'] for item in data]

def load_mapping(file_path):
    if not os.path.exists(file_path):
        return {}

    with open(file_path, 'r') as f:
        return json.load(f)
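
# Expected input shapes (illustrative sketch, inferred from how the loaders and
# get_feed_data / extract_entries below use these files; only the 'url' and
# 'name' fields are actually read):
#
#   feed.json:    [{"url": "https://example.com/feed.xml"}, ...]
#   mapping.json: {"https://example.com/feed.xml": {"name": "Author Name"}, ...}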

def get_feed_data(url, mapping):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        feed_data = feedparser.parse(response.content)
        print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)

        # Add mapping info to feed_data
        if url in mapping:
            feed_data.mapping = mapping[url]
        else:
            feed_data.mapping = None

        return feed_data
    except Exception as e:
        print(f"Error fetching {url}: {e}", file=sys.stderr)
        return None

def create_html_preview(html_content, max_length=800):
    """
    Create a preview from HTML content, preserving links inline while stripping all other HTML tags
    """
    if not html_content:
        return ""

    try:
        # Parse HTML
        soup = BeautifulSoup(html_content, 'html.parser')

        # Copy all <a> tags to preserve them
        links = {}
        for i, a_tag in enumerate(soup.find_all('a', href=True)):
            # Create a unique placeholder for each link
            placeholder = f"__LINK_{i}__"
            links[placeholder] = {
                'href': a_tag['href'],
                'text': a_tag.get_text().strip()
            }
            # Replace the link with a placeholder
            a_tag.replace_with(placeholder)

        # Get text content with placeholders
        text_content = soup.get_text(' ')
        # Clean up whitespace
        text_content = re.sub(r'\s+', ' ', text_content).strip()

        # Truncate if needed
        if len(text_content) > max_length:
            cut_at = max_length
            # If the cut point would land inside a link placeholder, move it to
            # just before that placeholder so the link survives intact
            for placeholder in links:
                start = text_content.find(placeholder, max(0, cut_at - len(placeholder) + 1))
                if start != -1 and start < cut_at:
                    cut_at = start
            text_content = text_content[:cut_at]

            # Find the last complete word
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:  # Only trim at a space if we're not losing too much text
                text_content = text_content[:last_space]

            text_content += "..."

        # Restore links
        for placeholder, link in links.items():
            if placeholder in text_content and link['text']:
                link_html = f'<a href="{link["href"]}" target="_blank">{link["text"]}</a>'
                text_content = text_content.replace(placeholder, link_html)

        return text_content
    except Exception as e:
        print(f"Error processing HTML preview: {e}", file=sys.stderr)
        # Fallback to plain text with no links
        plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
        plain_text = re.sub(r'\s+', ' ', plain_text).strip()
        if len(plain_text) > max_length:
            plain_text = plain_text[:max_length] + "..."
        return plain_text
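
# Illustrative example (not called anywhere in this script):
#   create_html_preview('<p>See <a href="https://example.com/post">this post</a> for details.</p>')
# returns
#   'See <a href="https://example.com/post" target="_blank">this post</a> for details.'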

def extract_entries(feeds):
    all_entries = []
    for feed_data in feeds:
        if not feed_data or not hasattr(feed_data, 'entries'):
            continue

        # Get feed title and handle mapping
        feed_title = feed_data.feed.get('title', 'Unknown Source')
        author_name = 'Unknown'

        if hasattr(feed_data, 'mapping') and feed_data.mapping:
            author_name = feed_data.mapping.get('name', 'Unknown')

        print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)

        for entry in feed_data.entries:
            # Get publication date
            pub_date = None
            if hasattr(entry, 'published_parsed') and entry.published_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.published_parsed))
            elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.updated_parsed))

            if not pub_date:
                print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
                continue

            # Get title
            title = entry.get('title', 'No title')

            # Get link
            link = entry.get('link', '')

            # Get full content from the feed entry
            if hasattr(entry, 'content') and entry.content:
                content = entry.content[0].value
            else:
                content = entry.get('summary', '')

            # Create HTML preview that will be used as the content
            preview = create_html_preview(content)

            # Get unique ID
            entry_id = entry.get('id', link)

            all_entries.append({
                'title': title,
                'link': link,
                'content': content,  # Use the feed content directly
                'preview': preview,
                'author': author_name,
                'pub_date': pub_date,
                'feed_title': feed_title,
                'id': entry_id
            })

    # Sort by publication date (newest first)
    sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
    print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)
    return sorted_entries

def format_pubdate(pubdate):
    # Format the date with short month (three-letter)
    return pubdate.strftime('%d %b %Y %H:%M:%S')

def create_atom_feed(entries):
    feed = Atom1Feed(
        title="Atomic EEG",
        link="https://example.com/",  # Placeholder link
        description="Aggregated Atom feeds",
        language="en",
        author_name="Feed Aggregator",
        feed_url="https://example.com/eeg.xml"  # Placeholder feed URL
    )

    for entry in entries:
        # Format the date with short month name
        formatted_date = format_pubdate(entry['pub_date'])
        feed.add_item(
            title=entry['title'],
            link=entry['link'],
            description=entry['preview'],  # Use the preview as the main content
            author_name=entry['author'],
            pubdate=entry['pub_date'],
            unique_id=entry['id'],
            categories=[entry['feed_title']],  # Use feed title as category for attribution
            # Add formatted date as extra field
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
        )

    return feed

# Functions from make_threads.py

def extract_links_from_html(html_content, base_url=None):
    """Extract and normalize links from HTML content"""
    soup = BeautifulSoup(html_content, 'html.parser')
    links = []

    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href'].strip()

        # Skip empty links, anchors, javascript, and mailto
        if not href or href.startswith(('#', 'javascript:', 'mailto:')):
            continue

        # Convert relative URLs to absolute if we have a base URL
        if base_url and not href.startswith(('http://', 'https://')):
            href = urljoin(base_url, href)

        links.append(href)

    return links

def normalize_url(url):
    """Normalize URLs to consistently match them"""
    if not url:
        return ""

    # Handle common URL shorteners or redirects (not implemented)

    # Parse the URL
    parsed = urlparse(url)

    # Ensure scheme is consistent
    scheme = parsed.scheme.lower() or 'http'

    # Normalize netloc (lowercase, remove 'www.' prefix optionally)
    netloc = parsed.netloc.lower()
    if netloc.startswith('www.'):
        netloc = netloc[4:]

    # Remove trailing slashes and index.html/index.php
    path = parsed.path.rstrip('/')
    for index_file in ['/index.html', '/index.php', '/index.htm']:
        if path.endswith(index_file):
            path = path[:-len(index_file)]

    # Remove common fragments and query parameters that don't affect content
    # (like tracking params, utm_*, etc.)
    query_parts = []
    if parsed.query:
        for param in parsed.query.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                if not key.startswith(('utm_', 'ref', 'source')):
                    query_parts.append(f"{key}={value}")

    query = '&'.join(query_parts)

    # Remove common hash fragments
    fragment = ''

    # Special case for common blogging platforms
    # Medium, WordPress, Ghost, etc. may have specific URL patterns

    # Reconstruct the URL
    normalized = f"{scheme}://{netloc}{path}"
    if query:
        normalized += f"?{query}"
    if fragment:
        normalized += f"#{fragment}"

    return normalized
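
# Illustrative example (not called anywhere in this script):
#   normalize_url('https://www.example.com/post/index.html?utm_source=x&id=7#top')
# returns
#   'https://example.com/post?id=7'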

def get_domain(url):
    """Extract domain from a URL"""
    parsed = urlparse(url)
    domain = parsed.netloc.lower()
    # Remove 'www.' prefix if present
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain

def generate_threads(entries):
    """Generate thread data from the entries"""
    print(f"Generating thread data from {len(entries)} entries...", file=sys.stderr)

    entry_urls = {}  # Maps normalized URLs to entry data

    # First pass: collect all entries and their URLs
    for entry in entries:
        # Get link
        link = entry['link']
        if not link:
            continue

        # Normalize the entry URL to help with matching
        normalized_link = normalize_url(link)

        # Get the domain of the entry
        entry_domain = get_domain(link)

        # Use the feed content to extract links
        content_to_extract = entry['content']

        # Extract all links from content, using the entry link as base URL for resolving relative URLs
        content_links = extract_links_from_html(content_to_extract, base_url=link)

        entry_data = {
            'title': entry['title'],
            'link': link,
            'normalized_link': normalized_link,
            'domain': entry_domain,
            'feed_title': entry['feed_title'],
            'id': entry['id'],
            'content_links': content_links,
            'references': [],      # Will be filled in the second pass
            'referenced_by': [],   # Will be filled in the second pass
            'external_links': []   # Links to content outside the feed
        }

        entry_urls[normalized_link] = entry_data

    print("Extracted links from all entries", file=sys.stderr)

    # Second pass: analyze links between entries
    for entry_data in entry_urls.values():
        # Keep track of references to avoid duplicates
        reference_ids = set()
        normalized_content_links = [normalize_url(link) for link in entry_data['content_links']]

        for i, normalized_link in enumerate(normalized_content_links):
            original_link = entry_data['content_links'][i] if i < len(entry_data['content_links']) else normalized_link

            # Check if this is a link to another entry in the feed
            if normalized_link in entry_urls and normalized_link != entry_data['normalized_link']:
                referenced_entry = entry_urls[normalized_link]

                # Avoid duplicate references
                if referenced_entry['id'] in reference_ids:
                    continue

                reference_ids.add(referenced_entry['id'])

                # Add to the references of the current entry
                entry_data['references'].append({
                    'id': referenced_entry['id'],
                    'link': referenced_entry['link'],
                    'title': referenced_entry['title'],
                    'feed_title': referenced_entry['feed_title'],
                    'in_feed': True  # Mark as a reference to a post in the feed
                })

                # Add to the referenced_by of the referenced entry
                # Check if this entry is already in referenced_by
                already_referenced = any(ref['id'] == entry_data['id'] for ref in referenced_entry['referenced_by'])
                if not already_referenced:
                    referenced_entry['referenced_by'].append({
                        'id': entry_data['id'],
                        'link': entry_data['link'],
                        'title': entry_data['title'],
                        'feed_title': entry_data['feed_title'],
                        'in_feed': True  # Mark as a reference from a post in the feed
                    })
            elif normalized_link != entry_data['normalized_link']:
                # This is a link to something outside the feed
                # Check if it's from the same domain as the entry
                link_domain = get_domain(original_link)

                # Only include external links from different domains
                if link_domain != entry_data['domain']:
                    # Track as an external link if not already in the list
                    if not any(ext_link['url'] == original_link for ext_link in entry_data['external_links']):
                        external_link = {
                            'url': original_link,
                            'normalized_url': normalized_link,
                            'in_feed': False  # Mark as external to the feed
                        }
                        entry_data['external_links'].append(external_link)

    # Create the thread data structure
    thread_data = {}
    for entry_data in entry_urls.values():
        thread_data[entry_data['id']] = {
            'id': entry_data['id'],
            'title': entry_data['title'],
            'link': entry_data['link'],
            'feed_title': entry_data['feed_title'],
            'references': entry_data['references'],
            'referenced_by': entry_data['referenced_by'],
            'external_links': entry_data['external_links']
        }

    # Generate some statistics
    entries_with_references = sum(1 for entry_data in entry_urls.values() if entry_data['references'])
    entries_with_referenced_by = sum(1 for entry_data in entry_urls.values() if entry_data['referenced_by'])
    entries_with_external_links = sum(1 for entry_data in entry_urls.values() if entry_data['external_links'])
    total_internal_references = sum(len(entry_data['references']) for entry_data in entry_urls.values())
    total_external_links = sum(len(entry_data['external_links']) for entry_data in entry_urls.values())

    print("\nThread Analysis:", file=sys.stderr)
    print(f"Total entries: {len(entry_urls)}", file=sys.stderr)
    print(f"Entries that reference other entries in the feed: {entries_with_references}", file=sys.stderr)
    print(f"Entries referenced by other entries in the feed: {entries_with_referenced_by}", file=sys.stderr)
    print(f"Entries with external links: {entries_with_external_links}", file=sys.stderr)
    print(f"Total internal references: {total_internal_references}", file=sys.stderr)
    print(f"Total external links: {total_external_links}", file=sys.stderr)

    return thread_data
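
# Shape of each value in the returned thread_data (and hence in threads.json);
# the keys come from the code above, the values shown are illustrative placeholders:
#
#   {
#     "id": "...",                # entry id (falls back to the entry link)
#     "title": "...",
#     "link": "https://example.com/post",
#     "feed_title": "...",
#     "references": [...],        # in-feed posts this entry links to
#     "referenced_by": [...],     # in-feed posts that link to this entry
#     "external_links": [...]     # off-feed links to other domains found in the content
#   }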

def main():
    # Load feed URLs
    feed_urls = load_feed_urls('feed.json')

    # Load mapping
    mapping = load_mapping('mapping.json')

    # Fetch feed data
    print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
    feeds = []
    for url in feed_urls:
        feed_data = get_feed_data(url, mapping)
        if feed_data:
            feeds.append(feed_data)

    # Extract and sort entries
    print("Processing entries...", file=sys.stderr)
    entries = extract_entries(feeds)
    print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)

    # Create aggregated feed
    feed = create_atom_feed(entries)

    # Write to file
    with open('eeg.xml', 'w', encoding='utf-8') as f:
        feed.write(f, 'utf-8')

    print("Feed successfully written to eeg.xml", file=sys.stderr)

    # Generate thread data
    thread_data = generate_threads(entries)

    # Write the thread data to a JSON file
    with open('threads.json', 'w') as f:
        json.dump(thread_data, f, indent=2)

    print("Thread data successfully written to threads.json", file=sys.stderr)

if __name__ == "__main__":
    main()