#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
#     "urllib3",
# ]
# ///
# Do not delete the block above: it's needed for `uv run`.
import datetime
import json
import os
import re
import sys
from urllib.parse import urljoin, urlparse

import feedparser
import requests
from bs4 import BeautifulSoup
from feedgenerator import Atom1Feed
def load_feed_urls(file_path):
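    """Load the list of feed URLs from a JSON file of objects with a 'url' key."""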
with open(file_path, 'r') as f:
data = json.load(f)
return [item['url'] for item in data]
def load_mapping(file_path):
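    """Load the optional feed-URL-to-author mapping from JSON; return {} if the file is missing."""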
if not os.path.exists(file_path):
return {}
with open(file_path, 'r') as f:
return json.load(f)
def get_feed_data(url, mapping):
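    """Fetch a feed over HTTP, parse it with feedparser, and attach its mapping entry (or None)."""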
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
feed_data = feedparser.parse(response.content)
print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)
        # Attach the author mapping for this feed URL, if any
        feed_data.mapping = mapping.get(url)
return feed_data
except Exception as e:
print(f"Error fetching {url}: {e}", file=sys.stderr)
return None
def create_html_preview(html_content, max_length=800):
"""
Create a preview from HTML content, preserving links inline while stripping all other HTML tags
"""
if not html_content:
return ""
try:
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
        # Replace each <a> tag with a placeholder so links survive text extraction
links = {}
for i, a_tag in enumerate(soup.find_all('a', href=True)):
# Create a unique placeholder for each link
placeholder = f"__LINK_{i}__"
links[placeholder] = {
'href': a_tag['href'],
'text': a_tag.get_text().strip()
}
# Replace the link with a placeholder
a_tag.replace_with(placeholder)
# Get text content with placeholders
text_content = soup.get_text(' ')
# Clean up whitespace
text_content = re.sub(r'\s+', ' ', text_content).strip()
        # Truncate if needed
        if len(text_content) > max_length:
            cut = max_length
            # If the cut would land inside a placeholder, move it to just before that placeholder
            for placeholder in links:
                pos = text_content.find(placeholder)
                if 0 <= pos < cut < pos + len(placeholder):
                    cut = pos
            text_content = text_content[:cut]
            # Find the last complete word
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:  # Only trim at a space if we're not losing too much text
                text_content = text_content[:last_space]
            text_content += "..."
        # Restore links as inline HTML anchors
        for placeholder, link in links.items():
            if placeholder in text_content:
                link_html = f'<a href="{link["href"]}">{link["text"]}</a>' if link['text'] else ''
                text_content = text_content.replace(placeholder, link_html)
return text_content
except Exception as e:
print(f"Error processing HTML preview: {e}", file=sys.stderr)
# Fallback to plain text with no links
plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
plain_text = re.sub(r'\s+', ' ', plain_text).strip()
if len(plain_text) > max_length:
plain_text = plain_text[:max_length] + "..."
return plain_text
def extract_entries(feeds):
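    """Flatten all parsed feeds into a single list of entry dicts, sorted newest first."""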
all_entries = []
for feed_data in feeds:
if not feed_data or not hasattr(feed_data, 'entries'):
continue
# Get feed title and handle mapping
feed_title = feed_data.feed.get('title', 'Unknown Source')
author_name = 'Unknown'
if hasattr(feed_data, 'mapping') and feed_data.mapping:
author_name = feed_data.mapping.get('name', 'Unknown')
print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)
for entry in feed_data.entries:
            # Get publication date (feedparser's *_parsed fields are UTC struct_time values)
            pub_date = None
            parsed_time = entry.get('published_parsed') or entry.get('updated_parsed')
            if parsed_time:
                pub_date = datetime.datetime(*parsed_time[:6], tzinfo=datetime.timezone.utc)
if not pub_date:
print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
continue
# Get title
title = entry.get('title', 'No title')
# Get link
link = entry.get('link', '')
# Get full content from the feed entry
if hasattr(entry, 'content') and entry.content:
content = entry.content[0].value
else:
content = entry.get('summary', '')
# Create HTML preview that will be used as the content
preview = create_html_preview(content)
# Get unique ID
entry_id = entry.get('id', link)
all_entries.append({
'title': title,
'link': link,
'content': content, # Use the feed content directly
'preview': preview,
'author': author_name,
'pub_date': pub_date,
'feed_title': feed_title,
'id': entry_id
})
# Sort by publication date (newest first)
sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)
return sorted_entries
def format_pubdate(pubdate):
    """Format the date with an abbreviated (three-letter) month name."""
return pubdate.strftime('%d %b %Y %H:%M:%S')
def create_atom_feed(entries):
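    """Build the aggregated Atom feed from the sorted entries."""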
feed = Atom1Feed(
title="Atomic EEG",
link="https://example.com/", # Placeholder link
description="Aggregated Atom feeds",
language="en",
author_name="Feed Aggregator",
feed_url="https://example.com/eeg.xml" # Placeholder feed URL
)
for entry in entries:
# Format the date with short month name
formatted_date = format_pubdate(entry['pub_date'])
feed.add_item(
title=entry['title'],
link=entry['link'],
description=entry['preview'], # Use the preview as the main content
author_name=entry['author'],
pubdate=entry['pub_date'],
unique_id=entry['id'],
categories=[entry['feed_title']], # Use feed title as category for attribution
            # Extra kwargs are stored on the item but not serialized by Atom1Feed
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
)
return feed
# Functions from make_threads.py
def extract_links_from_html(html_content, base_url=None):
"""Extract and normalize links from HTML content"""
soup = BeautifulSoup(html_content, 'html.parser')
links = []
for a_tag in soup.find_all('a', href=True):
href = a_tag['href'].strip()
# Skip empty links, anchors, javascript, and mailto
if not href or href.startswith(('#', 'javascript:', 'mailto:')):
continue
# Convert relative URLs to absolute if we have a base URL
if base_url and not href.startswith(('http://', 'https://')):
href = urljoin(base_url, href)
links.append(href)
return links
def normalize_url(url):
"""Normalize URLs to consistently match them"""
if not url:
return ""
# Handle common URL shorteners or redirects (not implemented)
# Parse the URL
parsed = urlparse(url)
# Ensure scheme is consistent
scheme = parsed.scheme.lower() or 'http'
# Normalize netloc (lowercase, remove 'www.' prefix optionally)
netloc = parsed.netloc.lower()
if netloc.startswith('www.'):
netloc = netloc[4:]
# Remove trailing slashes and index.html/index.php
path = parsed.path.rstrip('/')
for index_file in ['/index.html', '/index.php', '/index.htm']:
if path.endswith(index_file):
path = path[:-len(index_file)]
# Remove common fragments and query parameters that don't affect content
# (like tracking params, utm_*, etc.)
query_parts = []
if parsed.query:
for param in parsed.query.split('&'):
if '=' in param:
key, value = param.split('=', 1)
if not key.startswith(('utm_', 'ref', 'source')):
query_parts.append(f"{key}={value}")
query = '&'.join(query_parts)
    # Drop the fragment entirely
    fragment = ''
    # Platform-specific URL patterns (Medium, WordPress, Ghost, etc.) are not special-cased
# Reconstruct the URL
normalized = f"{scheme}://{netloc}{path}"
if query:
normalized += f"?{query}"
if fragment:
normalized += f"#{fragment}"
return normalized
def get_domain(url):
"""Extract domain from a URL"""
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Remove 'www.' prefix if present
if domain.startswith('www.'):
domain = domain[4:]
return domain
def generate_threads(entries):
"""Generate thread data from the entries"""
print(f"Generating thread data from {len(entries)} entries...", file=sys.stderr)
entry_urls = {} # Maps normalized URLs to entry data
# First pass: collect all entries and their URLs
for entry in entries:
# Get link
link = entry['link']
if not link:
continue
# Normalize the entry URL to help with matching
normalized_link = normalize_url(link)
# Get the domain of the entry
entry_domain = get_domain(link)
# Use the feed content to extract links
content_to_extract = entry['content']
# Extract all links from content, using the entry link as base URL for resolving relative URLs
content_links = extract_links_from_html(content_to_extract, base_url=link)
entry_data = {
'title': entry['title'],
'link': link,
'normalized_link': normalized_link,
'domain': entry_domain,
'feed_title': entry['feed_title'],
'id': entry['id'],
'content_links': content_links,
'references': [], # Will be filled in the second pass
'referenced_by': [], # Will be filled in the second pass
'external_links': [] # Links to content outside the feed
}
entry_urls[normalized_link] = entry_data
print(f"Extracted links from all entries", file=sys.stderr)
# Second pass: analyze links between entries
    for entry_data in entry_urls.values():
        # Keep track of references to avoid duplicates
        reference_ids = set()
        for original_link in entry_data['content_links']:
            normalized_link = normalize_url(original_link)
# Check if this is a link to another entry in the feed
if normalized_link in entry_urls and normalized_link != entry_data['normalized_link']:
referenced_entry = entry_urls[normalized_link]
# Avoid duplicate references
if referenced_entry['id'] in reference_ids:
continue
reference_ids.add(referenced_entry['id'])
# Add to the references of the current entry
entry_data['references'].append({
'id': referenced_entry['id'],
'link': referenced_entry['link'],
'title': referenced_entry['title'],
'feed_title': referenced_entry['feed_title'],
'in_feed': True # Mark as a reference to a post in the feed
})
# Add to the referenced_by of the referenced entry
# Check if this entry is already in referenced_by
already_referenced = any(ref['id'] == entry_data['id'] for ref in referenced_entry['referenced_by'])
if not already_referenced:
referenced_entry['referenced_by'].append({
'id': entry_data['id'],
'link': entry_data['link'],
'title': entry_data['title'],
'feed_title': entry_data['feed_title'],
'in_feed': True # Mark as a reference from a post in the feed
})
elif normalized_link != entry_data['normalized_link']:
# This is a link to something outside the feed
# Check if it's from the same domain as the entry
link_domain = get_domain(original_link)
# Only include external links from different domains
if link_domain != entry_data['domain']:
# Track as an external link if not already in the list
if not any(ext_link['url'] == original_link for ext_link in entry_data['external_links']):
external_link = {
'url': original_link,
'normalized_url': normalized_link,
'in_feed': False # Mark as external to the feed
}
entry_data['external_links'].append(external_link)
# Create the thread data structure
thread_data = {}
for _, entry_data in entry_urls.items():
thread_data[entry_data['id']] = {
'id': entry_data['id'],
'title': entry_data['title'],
'link': entry_data['link'],
'feed_title': entry_data['feed_title'],
'references': entry_data['references'],
'referenced_by': entry_data['referenced_by'],
'external_links': entry_data['external_links']
}
# Generate some statistics
entries_with_references = sum(1 for entry_data in entry_urls.values() if entry_data['references'])
entries_with_referenced_by = sum(1 for entry_data in entry_urls.values() if entry_data['referenced_by'])
entries_with_external_links = sum(1 for entry_data in entry_urls.values() if entry_data['external_links'])
total_internal_references = sum(len(entry_data['references']) for entry_data in entry_urls.values())
total_external_links = sum(len(entry_data['external_links']) for entry_data in entry_urls.values())
print(f"\nThread Analysis:", file=sys.stderr)
print(f"Total entries: {len(entry_urls)}", file=sys.stderr)
print(f"Entries that reference other entries in the feed: {entries_with_references}", file=sys.stderr)
print(f"Entries referenced by other entries in the feed: {entries_with_referenced_by}", file=sys.stderr)
print(f"Entries with external links: {entries_with_external_links}", file=sys.stderr)
print(f"Total internal references: {total_internal_references}", file=sys.stderr)
print(f"Total external links: {total_external_links}", file=sys.stderr)
return thread_data
def main():
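    """Fetch all feeds, write the aggregated Atom feed (eeg.xml), and write thread data (threads.json)."""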
# Load feed URLs
feed_urls = load_feed_urls('feed.json')
# Load mapping
mapping = load_mapping('mapping.json')
# Fetch feed data
print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
feeds = []
for url in feed_urls:
feed_data = get_feed_data(url, mapping)
if feed_data:
feeds.append(feed_data)
# Extract and sort entries
print("Processing entries...", file=sys.stderr)
entries = extract_entries(feeds)
print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)
# Create aggregated feed
feed = create_atom_feed(entries)
# Write to file
    with open('eeg.xml', 'w', encoding='utf-8') as f:
        feed.write(f, 'utf-8')
    print("Feed successfully written to eeg.xml", file=sys.stderr)
# Generate thread data
thread_data = generate_threads(entries)
# Write the thread data to a JSON file
    with open('threads.json', 'w', encoding='utf-8') as f:
        json.dump(thread_data, f, indent=2)
    print("Thread data successfully written to threads.json", file=sys.stderr)
if __name__ == "__main__":
main()