#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
# ]
# ///
# Do not delete the above as it's needed for `uv run`

import json
import feedparser
import datetime
from time import mktime
from feedgenerator import Atom1Feed
import requests
import sys
import os
import re
from html import unescape
from bs4 import BeautifulSoup


def load_feed_urls(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    return [item['url'] for item in data]


def load_mapping(file_path):
    if not os.path.exists(file_path):
        return {}
    with open(file_path, 'r') as f:
        return json.load(f)


def get_feed_data(url, mapping):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        feed_data = feedparser.parse(response.content)
        print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)

        # Add mapping info to feed_data
        if url in mapping:
            feed_data.mapping = mapping[url]
        else:
            feed_data.mapping = None

        return feed_data
    except Exception as e:
        print(f"Error fetching {url}: {e}", file=sys.stderr)
        return None


def create_html_preview(html_content, max_length=800):
    """
    Create a preview from HTML content, preserving links inline
    while stripping all other HTML tags.
    """
    if not html_content:
        return ""

    try:
        # Parse HTML
        soup = BeautifulSoup(html_content, 'html.parser')

        # Record every link so it can be restored after the other tags are stripped
        links = {}
        for i, a_tag in enumerate(soup.find_all('a', href=True)):
            # Create a unique placeholder for each link
            placeholder = f"__LINK_{i}__"
            links[placeholder] = {
                'href': a_tag['href'],
                'text': a_tag.get_text().strip()
            }
            # Replace the link with its placeholder
            a_tag.replace_with(placeholder)

        # Get text content with placeholders
        text_content = soup.get_text(' ')

        # Clean up whitespace
        text_content = re.sub(r'\s+', ' ', text_content).strip()

        # Truncate if needed
        if len(text_content) > max_length:
            text_content = text_content[:max_length]

            # If the cut falls in the middle of a placeholder, cut back to just before it
            partial = text_content.rfind("__LINK_")
            if partial != -1 and not any(
                text_content.startswith(placeholder, partial) for placeholder in links
            ):
                text_content = text_content[:partial]

            # Find the last complete word
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:
                # Only trim at a space if we're not losing too much text
                text_content = text_content[:last_space]

            text_content += "..."

        # Restore links as inline anchor tags
        for placeholder, link in links.items():
            if placeholder in text_content and link['text']:
                link_html = f'<a href="{link["href"]}">{link["text"]}</a>'
                text_content = text_content.replace(placeholder, link_html)

        return text_content
    except Exception as e:
        print(f"Error processing HTML preview: {e}", file=sys.stderr)
        # Fall back to plain text with no links
        plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
        plain_text = re.sub(r'\s+', ' ', plain_text).strip()
        if len(plain_text) > max_length:
            plain_text = plain_text[:max_length] + "..."
        return plain_text


def extract_entries(feeds):
    all_entries = []
    for feed_data in feeds:
        if not feed_data or not hasattr(feed_data, 'entries'):
            continue

        # Get feed title and author name from the mapping
        feed_title = feed_data.feed.get('title', 'Unknown Source')
        author_name = 'Unknown'
        if hasattr(feed_data, 'mapping') and feed_data.mapping:
            author_name = feed_data.mapping.get('name', 'Unknown')

        print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)

        for entry in feed_data.entries:
            # Get publication date
            pub_date = None
            if hasattr(entry, 'published_parsed') and entry.published_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.published_parsed))
            elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.updated_parsed))

            if not pub_date:
                print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
                continue

            # Get title
            title = entry.get('title', 'No title')

            # Get link
            link = entry.get('link', '')

            # Get description/content
            if hasattr(entry, 'content') and entry.content:
                content = entry.content[0].value
            else:
                content = entry.get('summary', '')

            # Create the HTML preview that will be used as the entry content
            preview = create_html_preview(content)

            # Get unique ID
            entry_id = entry.get('id', link)

            all_entries.append({
                'title': title,
                'link': link,
                'content': content,
                'preview': preview,
                'author': author_name,
                'pub_date': pub_date,
                'feed_title': feed_title,
                'id': entry_id
            })

    # Sort by publication date (newest first)
    sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
    print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)

    return sorted_entries


def format_pubdate(pubdate):
    # Format the date with a short (three-letter) month name
    return pubdate.strftime('%d %b %Y %H:%M:%S')


def create_atom_feed(entries):
    feed = Atom1Feed(
        title="Atomic EEG",
        link="https://example.com/",  # Placeholder link
        description="Aggregated Atom feeds",
        language="en",
        author_name="Feed Aggregator",
        feed_url="https://example.com/eeg.xml"  # Placeholder feed URL
    )

    for entry in entries:
        # Format the date with the short month name
        formatted_date = format_pubdate(entry['pub_date'])

        feed.add_item(
            title=entry['title'],
            link=entry['link'],
            description=entry['preview'],  # Use the preview as the main content
            author_name=entry['author'],
            pubdate=entry['pub_date'],
            unique_id=entry['id'],
            categories=[entry['feed_title']],  # Use feed title as category for attribution
            # Pass the formatted date as an extra field
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
        )

    return feed


def main():
    # Load feed URLs
    feed_urls = load_feed_urls('feed.json')

    # Load mapping
    mapping = load_mapping('mapping.json')

    # Fetch feed data
    print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
    feeds = []
    for url in feed_urls:
        feed_data = get_feed_data(url, mapping)
        if feed_data:
            feeds.append(feed_data)

    # Extract and sort entries
    print("Processing entries...", file=sys.stderr)
    entries = extract_entries(feeds)
    print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)

    # Create aggregated feed
    feed = create_atom_feed(entries)

    # Write to file
    with open('eeg.xml', 'w') as f:
        feed.write(f, 'utf-8')

    print("Feed successfully written to eeg.xml", file=sys.stderr)


if __name__ == "__main__":
    main()
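
# ---------------------------------------------------------------------------
# Illustrative sketch of the input files this script expects, inferred from
# load_feed_urls() and load_mapping() above. Only the "url" and "name" keys
# are read; the URLs and author name below are placeholders, not real data.
#
# feed.json:
#     [
#         {"url": "https://example.com/blog/atom.xml"}
#     ]
#
# mapping.json (optional; keys are feed URLs from feed.json):
#     {
#         "https://example.com/blog/atom.xml": {"name": "Example Author"}
#     }
# ---------------------------------------------------------------------------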