#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "feedparser",
#     "feedgenerator",
#     "requests",
#     "beautifulsoup4",
#     "urllib3",
# ]
# ///
# Do not delete the block above; it's needed for `uv run`.

import json
import feedparser
import datetime
from time import mktime
from feedgenerator import Atom1Feed
import requests
import sys
import os
import re
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin


def load_feed_urls(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    return [item['url'] for item in data]


def load_mapping(file_path):
    if not os.path.exists(file_path):
        return {}
    with open(file_path, 'r') as f:
        return json.load(f)


def get_feed_data(url, mapping):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        feed_data = feedparser.parse(response.content)
        print(f"Fetched {url}: found {len(feed_data.entries)} entries", file=sys.stderr)

        # Add mapping info to feed_data
        if url in mapping:
            feed_data.mapping = mapping[url]
        else:
            feed_data.mapping = None

        return feed_data
    except Exception as e:
        print(f"Error fetching {url}: {e}", file=sys.stderr)
        return None


def create_html_preview(html_content, max_length=800):
    """
    Create a preview from HTML content, preserving links inline while
    stripping all other HTML tags.
    """
    if not html_content:
        return ""

    try:
        # Parse HTML
        soup = BeautifulSoup(html_content, 'html.parser')

        # Collect the <a> tags so their targets survive tag stripping
        links = {}
        for i, a_tag in enumerate(soup.find_all('a', href=True)):
            # Create a unique placeholder for each link
            placeholder = f"__LINK_{i}__"
            links[placeholder] = {
                'href': a_tag['href'],
                'text': a_tag.get_text().strip()
            }
            # Replace the link with a placeholder
            a_tag.replace_with(placeholder)

        # Get text content with placeholders
        text_content = soup.get_text(' ')

        # Clean up whitespace
        text_content = re.sub(r'\s+', ' ', text_content).strip()

        # Truncate if needed
        if len(text_content) > max_length:
            text_content = text_content[:max_length]

            # Check if we're cutting in the middle of a placeholder
            for placeholder in links.keys():
                pos = text_content.rfind(placeholder)
                if pos > 0 and pos + len(placeholder) > len(text_content):
                    # We're cutting in the middle of a placeholder, cut before it
                    text_content = text_content[:pos]

            # Find the last complete word
            last_space = text_content.rfind(' ')
            if last_space > max_length * 0.8:
                # Only trim at a space if we're not losing too much text
                text_content = text_content[:last_space]

            text_content += "..."

        # Restore links as inline anchor tags
        for placeholder, link in links.items():
            if placeholder in text_content and link['text']:
                link_html = f'<a href="{link["href"]}">{link["text"]}</a>'
                text_content = text_content.replace(placeholder, link_html)

        return text_content
    except Exception as e:
        print(f"Error processing HTML preview: {e}", file=sys.stderr)
        # Fallback to plain text with no links
        plain_text = BeautifulSoup(html_content, 'html.parser').get_text(' ')
        plain_text = re.sub(r'\s+', ' ', plain_text).strip()
        if len(plain_text) > max_length:
            plain_text = plain_text[:max_length] + "..."
        return plain_text


def extract_entries(feeds):
    all_entries = []

    for feed_data in feeds:
        if not feed_data or not hasattr(feed_data, 'entries'):
            continue

        # Get feed title and handle mapping
        feed_title = feed_data.feed.get('title', 'Unknown Source')
        author_name = 'Unknown'
        if hasattr(feed_data, 'mapping') and feed_data.mapping:
            author_name = feed_data.mapping.get('name', 'Unknown')

        print(f"Processing feed: {feed_title} ({len(feed_data.entries)} entries)", file=sys.stderr)

        for entry in feed_data.entries:
            # Get publication date
            pub_date = None
            if hasattr(entry, 'published_parsed') and entry.published_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.published_parsed))
            elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                pub_date = datetime.datetime.fromtimestamp(mktime(entry.updated_parsed))

            if not pub_date:
                print(f"Skipping entry without date: {entry.get('title', 'Unknown')}", file=sys.stderr)
                continue

            # Get title
            title = entry.get('title', 'No title')

            # Get link
            link = entry.get('link', '')

            # Get full content from the feed entry
            if hasattr(entry, 'content') and entry.content:
                content = entry.content[0].value
            else:
                content = entry.get('summary', '')

            # Create HTML preview that will be used as the content
            preview = create_html_preview(content)

            # Get unique ID
            entry_id = entry.get('id', link)

            all_entries.append({
                'title': title,
                'link': link,
                'content': content,  # Use the feed content directly
                'preview': preview,
                'author': author_name,
                'pub_date': pub_date,
                'feed_title': feed_title,
                'id': entry_id
            })

    # Sort by publication date (newest first)
    sorted_entries = sorted(all_entries, key=lambda x: x['pub_date'], reverse=True)
    print(f"Total entries after sorting: {len(sorted_entries)}", file=sys.stderr)
    return sorted_entries


def format_pubdate(pubdate):
    # Format the date with short month (three-letter)
    return pubdate.strftime('%d %b %Y %H:%M:%S')


def create_atom_feed(entries):
    feed = Atom1Feed(
        title="Atomic EEG",
        link="https://example.com/",  # Placeholder link
        description="Aggregated Atom feeds",
        language="en",
        author_name="Feed Aggregator",
        feed_url="https://example.com/eeg.xml"  # Placeholder feed URL
    )

    for entry in entries:
        # Format the date with short month name
        formatted_date = format_pubdate(entry['pub_date'])

        feed.add_item(
            title=entry['title'],
            link=entry['link'],
            description=entry['preview'],  # Use the preview as the main content
            author_name=entry['author'],
            pubdate=entry['pub_date'],
            unique_id=entry['id'],
            categories=[entry['feed_title']],  # Use feed title as category for attribution
            # Add formatted date as extra field
            updateddate=entry['pub_date'],
            formatted_date=formatted_date
        )

    return feed


# Functions from make_threads.py
def extract_links_from_html(html_content, base_url=None):
    """Extract and normalize links from HTML content"""
    soup = BeautifulSoup(html_content, 'html.parser')
    links = []

    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href'].strip()

        # Skip empty links, anchors, javascript, and mailto
        if not href or href.startswith(('#', 'javascript:', 'mailto:')):
            continue

        # Convert relative URLs to absolute if we have a base URL
        if base_url and not href.startswith(('http://', 'https://')):
            href = urljoin(base_url, href)

        links.append(href)

    return links


def normalize_url(url):
    """Normalize URLs to consistently match them"""
    if not url:
        return ""

    # Handle common URL shorteners or redirects (not implemented)

    # Parse the URL
    parsed = urlparse(url)

    # Ensure scheme is consistent
    scheme = parsed.scheme.lower() or 'http'

    # Normalize netloc (lowercase, remove 'www.' prefix)
    netloc = parsed.netloc.lower()
    if netloc.startswith('www.'):
        netloc = netloc[4:]

    # Remove trailing slashes and index.html/index.php
    path = parsed.path.rstrip('/')
    for index_file in ['/index.html', '/index.php', '/index.htm']:
        if path.endswith(index_file):
            path = path[:-len(index_file)]

    # Remove common fragments and query parameters that don't affect content
    # (like tracking params, utm_*, etc.)
    query_parts = []
    if parsed.query:
        for param in parsed.query.split('&'):
            if '=' in param:
                key, value = param.split('=', 1)
                if not key.startswith(('utm_', 'ref', 'source')):
                    query_parts.append(f"{key}={value}")
    query = '&'.join(query_parts)

    # Remove common hash fragments
    fragment = ''

    # Special case for common blogging platforms
    # Medium, WordPress, Ghost, etc. may have specific URL patterns

    # Reconstruct the URL
    normalized = f"{scheme}://{netloc}{path}"
    if query:
        normalized += f"?{query}"
    if fragment:
        normalized += f"#{fragment}"

    return normalized


def get_domain(url):
    """Extract domain from a URL"""
    parsed = urlparse(url)
    domain = parsed.netloc.lower()

    # Remove 'www.' prefix if present
    if domain.startswith('www.'):
        domain = domain[4:]

    return domain


def generate_threads(entries):
    """Generate thread data from the entries"""
    print(f"Generating thread data from {len(entries)} entries...", file=sys.stderr)

    entry_urls = {}  # Maps normalized URLs to entry data

    # First pass: collect all entries and their URLs
    for entry in entries:
        # Get link
        link = entry['link']
        if not link:
            continue

        # Normalize the entry URL to help with matching
        normalized_link = normalize_url(link)

        # Get the domain of the entry
        entry_domain = get_domain(link)

        # Use the feed content to extract links
        content_to_extract = entry['content']

        # Extract all links from content, using the entry link as base URL for resolving relative URLs
        content_links = extract_links_from_html(content_to_extract, base_url=link)

        entry_data = {
            'title': entry['title'],
            'link': link,
            'normalized_link': normalized_link,
            'domain': entry_domain,
            'feed_title': entry['feed_title'],
            'id': entry['id'],
            'content_links': content_links,
            'references': [],      # Will be filled in the second pass
            'referenced_by': [],   # Will be filled in the second pass
            'external_links': []   # Links to content outside the feed
        }

        entry_urls[normalized_link] = entry_data

    print("Extracted links from all entries", file=sys.stderr)

    # Second pass: analyze links between entries
    for entry_id, entry_data in entry_urls.items():
        # Keep track of references to avoid duplicates
        reference_ids = set()

        normalized_content_links = [normalize_url(link) for link in entry_data['content_links']]

        for i, normalized_link in enumerate(normalized_content_links):
            original_link = entry_data['content_links'][i] if i < len(entry_data['content_links']) else normalized_link

            # Check if this is a link to another entry in the feed
            if normalized_link in entry_urls and normalized_link != entry_data['normalized_link']:
                referenced_entry = entry_urls[normalized_link]

                # Avoid duplicate references
                if referenced_entry['id'] in reference_ids:
                    continue
                reference_ids.add(referenced_entry['id'])

                # Add to the references of the current entry
                entry_data['references'].append({
                    'id': referenced_entry['id'],
                    'link': referenced_entry['link'],
                    'title': referenced_entry['title'],
                    'feed_title': referenced_entry['feed_title'],
                    'in_feed': True  # Mark as a reference to a post in the feed
                })

                # Add to the referenced_by of the referenced entry
                # Check if this entry is already in referenced_by
                already_referenced = any(ref['id'] == entry_data['id'] for ref in referenced_entry['referenced_by'])
                if not already_referenced:
                    referenced_entry['referenced_by'].append({
                        'id': entry_data['id'],
                        'link': entry_data['link'],
                        'title': entry_data['title'],
                        'feed_title': entry_data['feed_title'],
                        'in_feed': True  # Mark as a reference from a post in the feed
                    })
            elif normalized_link != entry_data['normalized_link']:
                # This is a link to something outside the feed
                # Check if it's from the same domain as the entry
                link_domain = get_domain(original_link)

                # Only include external links from different domains
                if link_domain != entry_data['domain']:
                    # Track as an external link if not already in the list
                    if not any(ext_link['url'] == original_link for ext_link in entry_data['external_links']):
                        external_link = {
                            'url': original_link,
                            'normalized_url': normalized_link,
                            'in_feed': False  # Mark as external to the feed
                        }
                        entry_data['external_links'].append(external_link)

    # Create the thread data structure
    thread_data = {}
    for _, entry_data in entry_urls.items():
        thread_data[entry_data['id']] = {
            'id': entry_data['id'],
            'title': entry_data['title'],
            'link': entry_data['link'],
            'feed_title': entry_data['feed_title'],
            'references': entry_data['references'],
            'referenced_by': entry_data['referenced_by'],
            'external_links': entry_data['external_links']
        }

    # Generate some statistics
    entries_with_references = sum(1 for entry_data in entry_urls.values() if entry_data['references'])
    entries_with_referenced_by = sum(1 for entry_data in entry_urls.values() if entry_data['referenced_by'])
    entries_with_external_links = sum(1 for entry_data in entry_urls.values() if entry_data['external_links'])
    total_internal_references = sum(len(entry_data['references']) for entry_data in entry_urls.values())
    total_external_links = sum(len(entry_data['external_links']) for entry_data in entry_urls.values())

    print("\nThread Analysis:", file=sys.stderr)
    print(f"Total entries: {len(entry_urls)}", file=sys.stderr)
    print(f"Entries that reference other entries in the feed: {entries_with_references}", file=sys.stderr)
    print(f"Entries referenced by other entries in the feed: {entries_with_referenced_by}", file=sys.stderr)
    print(f"Entries with external links: {entries_with_external_links}", file=sys.stderr)
    print(f"Total internal references: {total_internal_references}", file=sys.stderr)
    print(f"Total external links: {total_external_links}", file=sys.stderr)

    return thread_data


def main():
    # Load feed URLs
    feed_urls = load_feed_urls('feed.json')

    # Load mapping
    mapping = load_mapping('mapping.json')

    # Fetch feed data
    print(f"Fetching {len(feed_urls)} feeds...", file=sys.stderr)
    feeds = []
    for url in feed_urls:
        feed_data = get_feed_data(url, mapping)
        if feed_data:
            feeds.append(feed_data)

    # Extract and sort entries
    print("Processing entries...", file=sys.stderr)
    entries = extract_entries(feeds)
    print(f"Found {len(entries)} entries to include in feed", file=sys.stderr)

    # Create aggregated feed
    feed = create_atom_feed(entries)

    # Write to file (force UTF-8 so the bytes match the feed's declared encoding)
    with open('eeg.xml', 'w', encoding='utf-8') as f:
        feed.write(f, 'utf-8')
    print("Feed successfully written to eeg.xml", file=sys.stderr)

    # Generate thread data
    thread_data = generate_threads(entries)

    # Write the thread data to a JSON file
    with open('threads.json', 'w') as f:
        json.dump(thread_data, f, indent=2)
    print("Thread data successfully written to threads.json", file=sys.stderr)


if __name__ == "__main__":
    main()
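
# Note: the expected shapes of the two input files below are inferred from
# load_feed_urls() and load_mapping() above; the URLs shown are illustrative
# placeholders, not values from the original project.
#
#   feed.json    -- JSON array of objects, each with a "url" key:
#                   [{"url": "https://example.org/atom.xml"}]
#   mapping.json -- JSON object keyed by feed URL, each value providing the
#                   "name" used as the entry author:
#                   {"https://example.org/atom.xml": {"name": "Example Author"}}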