Manage Atom feeds in a persistent git repository

Trim None values from serialized JSON

Changed files: +55 -30

src/thicket/cli/utils.py  +1 -1
···
"""Save thicket configuration to file."""
import yaml
- config_data = config.model_dump(mode="json")
# Convert Path objects to strings for YAML serialization
config_data["git_store"] = str(config_data["git_store"])
···
"""Save thicket configuration to file."""
import yaml
+ config_data = config.model_dump(mode="json", exclude_none=True)
# Convert Path objects to strings for YAML serialization
config_data["git_store"] = str(config_data["git_store"])
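For context, a minimal sketch of what exclude_none=True changes in Pydantic v2's model_dump (the ThicketConfig fields here are illustrative, not necessarily the real config model's):

from pathlib import Path
from typing import Optional

from pydantic import BaseModel

class ThicketConfig(BaseModel):  # illustrative stand-in
    git_store: Path
    editor: Optional[str] = None  # optional field left unset

config = ThicketConfig(git_store=Path("feeds"))

# Without exclude_none, unset optionals serialize as explicit nulls:
print(config.model_dump(mode="json"))  # {'git_store': 'feeds', 'editor': None}

# With exclude_none=True, they are dropped, trimming the output:
print(config.model_dump(mode="json", exclude_none=True))  # {'git_store': 'feeds'}

Note that with mode="json" the git_store value is already a plain string, so the str() conversion above acts as an extra safeguard before YAML serialization.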
src/thicket/core/git_store.py  +3 -3
···
"""Save the index to index.json."""
index_path = self.repo_path / "index.json"
with open(index_path, "w") as f:
- json.dump(index.model_dump(mode="json"), f, indent=2, default=str)
def _load_index(self) -> GitStoreIndex:
"""Load the index from index.json."""
···
"""Save duplicates map to duplicates.json."""
duplicates_path = self.repo_path / "duplicates.json"
with open(duplicates_path, "w") as f:
- json.dump(duplicates.model_dump(), f, indent=2)
def _load_duplicates(self) -> DuplicateMap:
"""Load duplicates map from duplicates.json."""
···
# Save entry
with open(entry_path, "w") as f:
- json.dump(entry.model_dump(mode="json"), f, indent=2, default=str)
# Update user metadata if new entry
if not entry_exists:
···
"""Save the index to index.json."""
index_path = self.repo_path / "index.json"
with open(index_path, "w") as f:
+ json.dump(index.model_dump(mode="json", exclude_none=True), f, indent=2, default=str)
def _load_index(self) -> GitStoreIndex:
"""Load the index from index.json."""
···
"""Save duplicates map to duplicates.json."""
duplicates_path = self.repo_path / "duplicates.json"
with open(duplicates_path, "w") as f:
+ json.dump(duplicates.model_dump(exclude_none=True), f, indent=2)
def _load_duplicates(self) -> DuplicateMap:
"""Load duplicates map from duplicates.json."""
···
# Save entry
with open(entry_path, "w") as f:
+ json.dump(entry.model_dump(mode="json", exclude_none=True), f, indent=2, default=str)
# Update user metadata if new entry
if not entry_exists:
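A rough sketch of the save path after this change, assuming an index model with a datetime and an optional field (field names invented for illustration; the real GitStoreIndex is defined in the package's models):

import json
from datetime import datetime, timezone
from typing import Optional

from pydantic import BaseModel

class Index(BaseModel):  # illustrative stand-in for GitStoreIndex
    version: int = 1
    updated: datetime
    comment: Optional[str] = None

index = Index(updated=datetime.now(timezone.utc))

with open("index.json", "w") as f:
    # mode="json" renders the datetime as an ISO string, exclude_none drops the
    # unset optional, and default=str stays as a fallback for any stray type
    json.dump(index.model_dump(mode="json", exclude_none=True), f, indent=2, default=str)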
src/thicket/core/reference_parser.py  +51 -26
···
class BlogReference:
"""Represents a reference from one blog entry to another."""
- def __init__(self, source_entry_id: str, source_username: str,
-              target_url: str, target_username: Optional[str] = None,
-              target_entry_id: Optional[str] = None):
self.source_entry_id = source_entry_id
self.source_username = source_username
self.target_url = target_url
···
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
- return {
"source_entry_id": self.source_entry_id,
"source_username": self.source_username,
"target_url": self.target_url,
- "target_username": self.target_username,
- "target_entry_id": self.target_entry_id
}
@classmethod
def from_dict(cls, data: dict) -> "BlogReference":
"""Create from dictionary."""
···
source_username=data["source_username"],
target_url=data["target_url"],
target_username=data.get("target_username"),
- target_entry_id=data.get("target_entry_id")
)
···
def __init__(self):
self.references: list[BlogReference] = []
- self.outbound_refs: dict[str, list[BlogReference]] = {} # entry_id -> outbound refs
- self.inbound_refs: dict[str, list[BlogReference]] = {} # entry_id -> inbound refs
self.user_domains: dict[str, set[str]] = {} # username -> set of domains
def add_reference(self, ref: BlogReference) -> None:
···
"""Convert to dictionary for JSON serialization."""
return {
"references": [ref.to_dict() for ref in self.references],
- "user_domains": {k: list(v) for k, v in self.user_domains.items()}
}
@classmethod
···
def __init__(self):
# Common blog platforms and patterns
self.blog_patterns = [
- r'https?://[^/]+\.(?:org|com|net|io|dev|me|co\.uk)/.*', # Common blog domains
- r'https?://[^/]+\.github\.io/.*', # GitHub Pages
- r'https?://[^/]+\.substack\.com/.*', # Substack
- r'https?://medium\.com/.*', # Medium
- r'https?://[^/]+\.wordpress\.com/.*', # WordPress.com
- r'https?://[^/]+\.blogspot\.com/.*', # Blogger
]
# Compile regex patterns
- self.link_pattern = re.compile(r'<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>', re.IGNORECASE | re.DOTALL)
self.url_pattern = re.compile(r'https?://[^\s<>"]+')
def extract_links_from_html(self, html_content: str) -> list[tuple[str, str]]:
···
# Extract links from <a> tags
for match in self.link_pattern.finditer(html_content):
url = match.group(1)
- text = re.sub(r'<[^>]+>', '', match.group(2)).strip() # Remove HTML tags from link text
links.append((url, text))
return links
···
return True
return False
-
- def resolve_target_user(self, url: str, user_domains: dict[str, set[str]]) -> Optional[str]:
"""Try to resolve a URL to a known user based on domain mapping."""
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
···
return None
- def extract_references(self, entry: AtomEntry, username: str,
-                        user_domains: dict[str, set[str]]) -> list[BlogReference]:
"""Extract all blog references from an entry."""
references = []
···
for url, _link_text in links:
# Skip internal links (same domain as the entry)
- entry_domain = urlparse(str(entry.link)).netloc.lower() if entry.link else ""
link_domain = urlparse(url).netloc.lower()
if link_domain == entry_domain:
···
source_username=username,
target_url=url,
target_username=target_username,
- target_entry_id=None # Will be resolved later if possible
)
references.append(ref)
···
return user_domains
- def resolve_target_entry_ids(self, references: list[BlogReference], git_store: "GitStore") -> list[BlogReference]:
"""Resolve target_entry_id for references that have target_username but no target_entry_id."""
resolved_refs = []
···
source_username=ref.source_username,
target_url=ref.target_url,
target_username=ref.target_username,
- target_entry_id=resolved_entry_id
)
resolved_refs.append(resolved_ref)
···
class BlogReference:
"""Represents a reference from one blog entry to another."""
+ def __init__(
+     self,
+     source_entry_id: str,
+     source_username: str,
+     target_url: str,
+     target_username: Optional[str] = None,
+     target_entry_id: Optional[str] = None,
+ ):
self.source_entry_id = source_entry_id
self.source_username = source_username
self.target_url = target_url
···
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
+ result = {
"source_entry_id": self.source_entry_id,
"source_username": self.source_username,
"target_url": self.target_url,
}
+ # Only include optional fields if they are not None
+ if self.target_username is not None:
+     result["target_username"] = self.target_username
+ if self.target_entry_id is not None:
+     result["target_entry_id"] = self.target_entry_id
+
+ return result
+
@classmethod
def from_dict(cls, data: dict) -> "BlogReference":
"""Create from dictionary."""
···
source_username=data["source_username"],
target_url=data["target_url"],
target_username=data.get("target_username"),
+ target_entry_id=data.get("target_entry_id"),
)
···
def __init__(self):
self.references: list[BlogReference] = []
+ self.outbound_refs: dict[
+     str, list[BlogReference]
+ ] = {} # entry_id -> outbound refs
+ self.inbound_refs: dict[
+     str, list[BlogReference]
+ ] = {} # entry_id -> inbound refs
self.user_domains: dict[str, set[str]] = {} # username -> set of domains
def add_reference(self, ref: BlogReference) -> None:
···
"""Convert to dictionary for JSON serialization."""
return {
"references": [ref.to_dict() for ref in self.references],
+ "user_domains": {k: list(v) for k, v in self.user_domains.items()},
}
@classmethod
···
def __init__(self):
# Common blog platforms and patterns
self.blog_patterns = [
+ r"https?://[^/]+\.(?:org|com|net|io|dev|me|co\.uk)/.*", # Common blog domains
+ r"https?://[^/]+\.github\.io/.*", # GitHub Pages
+ r"https?://[^/]+\.substack\.com/.*", # Substack
+ r"https?://medium\.com/.*", # Medium
+ r"https?://[^/]+\.wordpress\.com/.*", # WordPress.com
+ r"https?://[^/]+\.blogspot\.com/.*", # Blogger
]
# Compile regex patterns
+ self.link_pattern = re.compile(
+     r'<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>', re.IGNORECASE | re.DOTALL
+ )
self.url_pattern = re.compile(r'https?://[^\s<>"]+')
def extract_links_from_html(self, html_content: str) -> list[tuple[str, str]]:
···
# Extract links from <a> tags
for match in self.link_pattern.finditer(html_content):
url = match.group(1)
+ text = re.sub(
+     r"<[^>]+>", "", match.group(2)
+ ).strip() # Remove HTML tags from link text
links.append((url, text))
return links
···
return True
return False
+ def resolve_target_user(
+     self, url: str, user_domains: dict[str, set[str]]
+ ) -> Optional[str]:
"""Try to resolve a URL to a known user based on domain mapping."""
parsed_url = urlparse(url)
domain = parsed_url.netloc.lower()
···
return None
+ def extract_references(
+     self, entry: AtomEntry, username: str, user_domains: dict[str, set[str]]
+ ) -> list[BlogReference]:
"""Extract all blog references from an entry."""
references = []
···
for url, _link_text in links:
# Skip internal links (same domain as the entry)
+ entry_domain = (
+     urlparse(str(entry.link)).netloc.lower() if entry.link else ""
+ )
link_domain = urlparse(url).netloc.lower()
if link_domain == entry_domain:
···
source_username=username,
target_url=url,
target_username=target_username,
+ target_entry_id=None, # Will be resolved later if possible
)
references.append(ref)
···
return user_domains
+ def resolve_target_entry_ids(
+     self, references: list[BlogReference], git_store: "GitStore"
+ ) -> list[BlogReference]:
"""Resolve target_entry_id for references that have target_username but no target_entry_id."""
resolved_refs = []
···
source_username=ref.source_username,
target_url=ref.target_url,
target_username=ref.target_username,
+ target_entry_id=resolved_entry_id,
)
resolved_refs.append(resolved_ref)
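The net effect of the to_dict change shows up on round-trips: None-valued optionals disappear from the serialized dict, and from_dict restores them as None through its .get calls. A quick illustration using the class as patched above (field values invented):

ref = BlogReference(
    source_entry_id="entry-1",
    source_username="alice",
    target_url="https://example.com/post",
)

data = ref.to_dict()
# {'source_entry_id': 'entry-1', 'source_username': 'alice',
#  'target_url': 'https://example.com/post'} -- no None-valued keys

restored = BlogReference.from_dict(data)
assert restored.target_username is None  # .get() defaults missing keys to None
assert restored.target_entry_id is None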