1"""
2RSS feed fetcher with retry logic and error handling.
3"""
4import time
5import logging
6import requests
7import feedparser
8from typing import Optional
9
10logger = logging.getLogger(__name__)
11
12
13class RSSFetcher:
14 """Fetches RSS feeds with retry logic."""
15
16 def __init__(self, timeout: int = 30, max_retries: int = 3):
17 """
18 Initialize RSS fetcher.
19
20 Args:
21 timeout: Request timeout in seconds
22 max_retries: Maximum number of retry attempts
23 """
24 self.timeout = timeout
25 self.max_retries = max_retries
26
27 def fetch_feed(self, url: str) -> feedparser.FeedParserDict:
28 """
29 Fetch and parse an RSS feed.
30
31 Args:
32 url: RSS feed URL
33
34 Returns:
35 Parsed feed object
36
37 Raises:
38 ValueError: If URL is empty
39 requests.RequestException: If all retry attempts fail
40 """
41 if not url:
42 raise ValueError("URL cannot be empty")
43
44 last_error = None
45
46 for attempt in range(self.max_retries):
47 try:
48 logger.info(f"Fetching feed from {url} (attempt {attempt + 1}/{self.max_retries})")
49
50 response = requests.get(url, timeout=self.timeout)
51 response.raise_for_status()
52
53 # Parse with feedparser
54 feed = feedparser.parse(response.content)
55
56 logger.info(f"Successfully fetched feed: {feed.feed.get('title', 'Unknown')}")
57 return feed
58
59 except requests.RequestException as e:
60 last_error = e
61 logger.warning(f"Fetch attempt {attempt + 1} failed: {e}")
62
63 if attempt < self.max_retries - 1:
64 # Exponential backoff
65 sleep_time = 2 ** attempt
66 logger.info(f"Retrying in {sleep_time} seconds...")
67 time.sleep(sleep_time)
68
69 # All retries exhausted
70 logger.error(f"Failed to fetch feed after {self.max_retries} attempts")
71 raise last_error
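

if __name__ == "__main__":
    # Minimal usage sketch for running this module directly. The feed URL
    # below is a placeholder, not part of the module; substitute any real
    # RSS/Atom endpoint.
    logging.basicConfig(level=logging.INFO)

    fetcher = RSSFetcher(timeout=10, max_retries=3)
    try:
        feed = fetcher.fetch_feed("https://example.com/feed.xml")
        # Print the first few entry titles to confirm the feed parsed
        for entry in feed.entries[:5]:
            print(entry.get("title", "Untitled"))
    except (ValueError, requests.RequestException) as e:
        print(f"Could not fetch feed: {e}")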