#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
# ]
# ///
# Do not delete the block above: it is the inline script metadata that `uv run`
# reads to install this script's dependencies.
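# Example invocation (assuming this file is saved as aggregate_feeds.py; the
# name is illustrative):
#   uv run aggregate_feeds.py
# uv reads the metadata block above, resolves the listed dependencies into an
# ephemeral environment, and runs the script. The aggregated feed is written
# to eeg.xml in the current working directory.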
import json
import feedparser
import datetime
import calendar
from feedgenerator import Atom1Feed
import requests
import sys
import os
import re
from bs4 import BeautifulSoup
def load_feed_urls(file_path):
with open(file_path, 'r') as f:
data = json.load(f)
return [item['url'] for item in data]
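# load_feed_urls expects feed.json to be a JSON array of objects that each
# carry a "url" key. A minimal illustrative example (not shipped with the
# script):
#   [
#     {"url": "https://example.com/atom.xml"},
#     {"url": "https://example.org/feed.xml"}
#   ]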
def load_mapping(file_path):
if not os.path.exists(file_path):
return {}
with open(file_path, 'r') as f:
return json.load(f)
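# load_mapping expects mapping.json to map a feed URL to per-feed metadata;
# only the "name" key is read later (see extract_entries). Illustrative
# example:
#   {
#     "https://example.com/atom.xml": {"name": "Example Author"}
#   }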
def get_feed_data(url, mapping):
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
feed_data = feedparser.parse(response.content)
print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)
# Add mapping info to feed_data
if url in mapping:
feed_data.mapping = mapping[url]
else:
feed_data.mapping = None
return feed_data
except Exception as e:
print(f"Error fetching {url}: {e}", file=sys.stderr)
return None
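# Note: get_feed_data returns the feedparser result object with an extra
# .mapping attribute attached above; downstream code relies only on
# .entries, .feed.get('title'), and .mapping.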
def create_html_preview(html_content, max_length=800):
"""
Create a preview from HTML content, preserving links inline while stripping all other HTML tags
"""
if not html_content:
return ""
try:
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
        # Pull out every link so it survives the text extraction: remember its
        # href and text, and swap the tag for a unique placeholder
links = {}
for i, a_tag in enumerate(soup.find_all('a', href=True)):
# Create a unique placeholder for each link
placeholder = f"__LINK_{i}__"
links[placeholder] = {
'href': a_tag['href'],
'text': a_tag.get_text().strip()
}
# Replace the link with a placeholder
a_tag.replace_with(placeholder)
# Get text content with placeholders
text_content = soup.get_text(' ')
# Clean up whitespace
text_content = re.sub(r'\s+', ' ', text_content).strip()
        # Truncate if needed
        if len(text_content) > max_length:
            cut = max_length
            # If the cut point falls inside a link placeholder, move it back to
            # just before that placeholder so the link survives truncation
            for placeholder in links:
                start = text_content.find(placeholder)
                if 0 <= start < cut < start + len(placeholder):
                    cut = start
            text_content = text_content[:cut]
            # Prefer to break at the last complete word, unless that would lose
            # too much of the preview
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:
                text_content = text_content[:last_space]
            text_content += "..."
        # Restore links as inline anchors; fall back to the bare URL when a
        # link had no visible text
        for placeholder, link in links.items():
            if placeholder in text_content:
                label = link['text'] or link['href']
                link_html = f'<a href="{link["href"]}">{label}</a>'
                text_content = text_content.replace(placeholder, link_html)
return text_content
except Exception as e:
print(f"Error processing HTML preview: {e}", file=sys.stderr)
# Fallback to plain text with no links
plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
plain_text = re.sub(r'\s+', ' ', plain_text).strip()
if len(plain_text) > max_length:
plain_text = plain_text[:max_length] + "..."
return plain_text
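# Rough example of the transformation (illustrative): the HTML
#   '<p>Read <a href="https://example.com">this post</a> for details.</p>'
# becomes the preview string
#   'Read <a href="https://example.com">this post</a> for details.'
# with the surrounding markup stripped and the anchor restored inline.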
def extract_entries(feeds):
all_entries = []
for feed_data in feeds:
if not feed_data or not hasattr(feed_data, 'entries'):
continue
# Get feed title and handle mapping
feed_title = feed_data.feed.get('title', 'Unknown Source')
author_name = 'Unknown'
if hasattr(feed_data, 'mapping') and feed_data.mapping:
author_name = feed_data.mapping.get('name', 'Unknown')
print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)
for entry in feed_data.entries:
            # Get publication date; feedparser's *_parsed fields are UTC
            # struct_time values, so convert with calendar.timegm (time.mktime
            # would wrongly treat them as local time)
            pub_date = None
            if hasattr(entry, 'published_parsed') and entry.published_parsed:
                pub_date = datetime.datetime.fromtimestamp(
                    calendar.timegm(entry.published_parsed), tz=datetime.timezone.utc)
            elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                pub_date = datetime.datetime.fromtimestamp(
                    calendar.timegm(entry.updated_parsed), tz=datetime.timezone.utc)
if not pub_date:
print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
continue
# Get title
title = entry.get('title', 'No title')
# Get link
link = entry.get('link', '')
# Get description/content
if hasattr(entry, 'content') and entry.content:
content = entry.content[0].value
else:
content = entry.get('summary', '')
# Create HTML preview that will be used as the content
preview = create_html_preview(content)
# Get unique ID
entry_id = entry.get('id', link)
all_entries.append({
'title': title,
'link': link,
'content': content,
'preview': preview,
'author': author_name,
'pub_date': pub_date,
'feed_title': feed_title,
'id': entry_id
})
# Sort by publication date (newest first)
sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)
return sorted_entries
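# Each element returned by extract_entries is a plain dict shaped like
# (illustrative values):
#   {'title': 'Post title', 'link': 'https://example.com/post',
#    'content': '<full HTML>', 'preview': '<truncated HTML preview>',
#    'author': 'Example Author', 'pub_date': datetime.datetime(...),
#    'feed_title': 'Example Blog', 'id': 'https://example.com/post'}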
def format_pubdate(pubdate):
# Format the date with short month (three-letter)
return pubdate.strftime('%d %b %Y %H:%M:%S')
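# e.g. format_pubdate(datetime.datetime(2024, 3, 5, 14, 30, 0)) returns
# '05 Mar 2024 14:30:00' (month abbreviation follows the default C/English locale)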
def create_atom_feed(entries):
feed = Atom1Feed(
title="Atomic EEG",
link="https://example.com/", # Placeholder link
description="Aggregated Atom feeds",
language="en",
author_name="Feed Aggregator",
feed_url="https://example.com/eeg.xml" # Placeholder feed URL
)
for entry in entries:
# Format the date with short month name
formatted_date = format_pubdate(entry['pub_date'])
feed.add_item(
title=entry['title'],
link=entry['link'],
description=entry['preview'], # Use the preview as the main content
author_name=entry['author'],
pubdate=entry['pub_date'],
unique_id=entry['id'],
categories=[entry['feed_title']], # Use feed title as category for attribution
            # formatted_date is passed through as an extra keyword argument;
            # feedgenerator stores it on the item but Atom1Feed does not
            # render it in the XML output
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
)
return feed
def main():
# Load feed URLs
feed_urls = load_feed_urls('feed.json')
# Load mapping
mapping = load_mapping('mapping.json')
# Fetch feed data
print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
feeds = []
for url in feed_urls:
feed_data = get_feed_data(url, mapping)
if feed_data:
feeds.append(feed_data)
# Extract and sort entries
print("Processing entries...", file=sys.stderr)
entries = extract_entries(feeds)
print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)
# Create aggregated feed
feed = create_atom_feed(entries)
    # Write to file
    with open('eeg.xml', 'w', encoding='utf-8') as f:
        feed.write(f, 'utf-8')
    print("Feed successfully written to eeg.xml", file=sys.stderr)
if __name__ == "__main__":
main()
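# Quick sanity check (illustrative): in an environment where feedparser is
# available, re-parse the generated file and confirm the entry count, e.g.
#   python -c "import feedparser; print(len(feedparser.parse('eeg.xml').entries))"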