A community-based topic-aggregation platform built on atproto
at main 2.1 kB view raw
"""
RSS feed fetcher with retry logic and error handling.
"""
import time
import logging
import requests
import feedparser
from typing import Optional

logger = logging.getLogger(__name__)


class RSSFetcher:
    """Fetches RSS feeds over HTTP with bounded exponential-backoff retries."""

    def __init__(self, timeout: int = 30, max_retries: int = 3):
        """
        Initialize RSS fetcher.

        Args:
            timeout: Request timeout in seconds
            max_retries: Maximum number of fetch attempts (must be >= 1)

        Raises:
            ValueError: If max_retries is less than 1.
        """
        # Guard against max_retries < 1: the retry loop in fetch_feed would
        # never run, leaving last_error as None, and `raise last_error`
        # would then fail with an unrelated TypeError (`raise None`).
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        self.timeout = timeout
        self.max_retries = max_retries

    def fetch_feed(self, url: str) -> feedparser.FeedParserDict:
        """
        Fetch and parse an RSS feed.

        Args:
            url: RSS feed URL

        Returns:
            Parsed feed object

        Raises:
            ValueError: If URL is empty
            requests.RequestException: If all retry attempts fail
        """
        if not url:
            raise ValueError("URL cannot be empty")

        last_error: Optional[requests.RequestException] = None

        for attempt in range(self.max_retries):
            try:
                # Lazy %-style args so formatting is skipped when INFO is off.
                logger.info(
                    "Fetching feed from %s (attempt %d/%d)",
                    url, attempt + 1, self.max_retries,
                )

                response = requests.get(url, timeout=self.timeout)
                response.raise_for_status()

                # Parse with feedparser
                feed = feedparser.parse(response.content)

                # feedparser never raises on malformed XML; it flags problems
                # via `bozo`/`bozo_exception` instead. Surface that for
                # debugging rather than silently returning a broken feed.
                if feed.get("bozo"):
                    logger.warning(
                        "Feed %s parsed with errors: %s",
                        url, feed.get("bozo_exception"),
                    )

                logger.info(
                    "Successfully fetched feed: %s",
                    feed.feed.get("title", "Unknown"),
                )
                return feed

            except requests.RequestException as e:
                last_error = e
                logger.warning("Fetch attempt %d failed: %s", attempt + 1, e)

                if attempt < self.max_retries - 1:
                    # Exponential backoff: 1s, 2s, 4s, ...
                    sleep_time = 2 ** attempt
                    logger.info("Retrying in %d seconds...", sleep_time)
                    time.sleep(sleep_time)

        # All retries exhausted; re-raise the last network error.
        logger.error("Failed to fetch feed after %d attempts", self.max_retries)
        raise last_error