Manage Atom feeds in a persistent git repository

+26 -25
ARCH.md
···
git-store/
├── index.json # User directory index
├── duplicates.json # Manual curation of duplicate entries
-├── links.json # All outbound links categorized by type
-├── references.json # Cross-reference index for threading
+├── links.json # Unified links, references, and mapping data
├── user1/
│ ├── entry_id_1.json # Sanitized entry files
│ ├── entry_id_2.json
···
### Data Structures
-#### links.json Format
+#### links.json Format (Unified Structure)
```json
{
-  "links": [
-    {
-      "url": "https://example.com/post/123",
-      "entry_id": "https://blog.user.com/entry/456",
-      "username": "user1",
-      "context": "As mentioned in this post...",
-      "category": "user",
+  "links": {
+    "https://example.com/post/123": {
+      "referencing_entries": ["https://blog.user.com/entry/456"],
      "target_username": "user2"
+    },
+    "https://external-site.com/article": {
+      "referencing_entries": ["https://blog.user.com/entry/789"]
    }
-  ],
-  "categories": {
-    "internal": 1234,
-    "user": 456,
-    "unknown": 7890
  },
-  "user_domains": {
-    "user1": ["blog.user.com", "user.com"],
-    "user2": ["example.com"]
-  }
-}
-```
-
-#### references.json Format
-
-```json
-{
+  "reverse_mapping": {
+    "https://blog.user.com/entry/456": ["https://example.com/post/123"],
+    "https://blog.user.com/entry/789": ["https://external-site.com/article"]
+  },
  "references": [
    {
      "source_entry_id": "https://blog.user.com/entry/456",
···
  }
}
```
+
+This unified structure eliminates duplication by:
+
+- Storing each URL only once with minimal metadata
+- Including all link data, reference data, and mappings in one file
+- Using the presence of `target_username` to identify tracked vs. external links
+- Providing bidirectional mappings for efficient queries (see the sketch below)
+
+### Unified Structure Benefits
+
+- **Eliminates Duplication**: Each URL appears only once with its metadata
+- **Single Source of Truth**: All link-related data lives in one file
+- **Efficient Queries**: Fast lookups in both directions (URL → entries, entry → URLs)
+- **Atomic Updates**: All link data changes together
+- **Reduced I/O**: Fewer file operations
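A minimal sketch of how a consumer could query the unified file (the layout matches the example above; `load_links` and the `git-store` path are illustrative, not part of the codebase):

```python
import json
from pathlib import Path


def load_links(git_store: Path) -> dict:
    """Load the unified link index from the git store (hypothetical helper)."""
    with open(git_store / "links.json") as f:
        return json.load(f)


data = load_links(Path("git-store"))

# Forward lookup: which entries reference this URL?
info = data["links"].get("https://example.com/post/123", {})
print(info.get("referencing_entries", []))

# A link points at a tracked user exactly when target_username is present.
print("tracked" if "target_username" in info else "external")

# Reverse lookup: which URLs does this entry link out to?
print(data["reverse_mapping"].get("https://blog.user.com/entry/456", []))
```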
### Implementation Benefits
+6 -5
pyproject.toml
···
"bleach>=6.0.0",
"platformdirs>=4.0.0",
"pyyaml>=6.0.0",
-
"email_validator"
+
"email_validator",
]
[project.optional-dependencies]
···
"-ra",
"--strict-markers",
"--strict-config",
-
"--cov=src/thicket",
-
"--cov-report=term-missing",
-
"--cov-report=html",
-
"--cov-report=xml",
]
filterwarnings = [
"error",
···
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
]
+
+[dependency-groups]
+dev = [
+    "pytest>=8.4.1",
+]
+2 -2
src/thicket/cli/commands/__init__.py
···
"""CLI commands for thicket."""
# Import all commands to register them with the main app
-
from . import add, duplicates, index_cmd, info_cmd, init, links_cmd, list_cmd, sync
+
from . import add, duplicates, info_cmd, init, list_cmd, sync
-
__all__ = ["add", "duplicates", "index_cmd", "info_cmd", "init", "links_cmd", "list_cmd", "sync"]
+
__all__ = ["add", "duplicates", "info_cmd", "init", "list_cmd", "sync"]
+44 -9
src/thicket/cli/commands/add.py
···
def add_command(
    subcommand: str = typer.Argument(..., help="Subcommand: 'user' or 'feed'"),
    username: str = typer.Argument(..., help="Username"),
-   feed_url: Optional[str] = typer.Argument(None, help="Feed URL (required for 'user' command)"),
+   feed_url: Optional[str] = typer.Argument(
+       None, help="Feed URL (required for 'user' command)"
+   ),
    email: Optional[str] = typer.Option(None, "--email", "-e", help="User email"),
-   homepage: Optional[str] = typer.Option(None, "--homepage", "-h", help="User homepage"),
+   homepage: Optional[str] = typer.Option(
+       None, "--homepage", "-h", help="User homepage"
+   ),
    icon: Optional[str] = typer.Option(None, "--icon", "-i", help="User icon URL"),
-   display_name: Optional[str] = typer.Option(None, "--display-name", "-d", help="User display name"),
+   display_name: Optional[str] = typer.Option(
+       None, "--display-name", "-d", help="User display name"
+   ),
    config_file: Optional[Path] = typer.Option(
        Path("thicket.yaml"), "--config", help="Configuration file path"
    ),
    auto_discover: bool = typer.Option(
-       True, "--auto-discover/--no-auto-discover", help="Auto-discover user metadata from feed"
+       True,
+       "--auto-discover/--no-auto-discover",
+       help="Auto-discover user metadata from feed",
    ),
) -> None:
    """Add a user or feed to thicket."""
    if subcommand == "user":
-       add_user(username, feed_url, email, homepage, icon, display_name, config_file, auto_discover)
+       add_user(
+           username,
+           feed_url,
+           email,
+           homepage,
+           icon,
+           display_name,
+           config_file,
+           auto_discover,
+       )
    elif subcommand == "feed":
        add_feed(username, feed_url, config_file)
    else:
···
    discovered_metadata = asyncio.run(discover_feed_metadata(validated_feed_url))

    # Prepare user data with manual overrides taking precedence
-   user_display_name = display_name or (discovered_metadata.author_name or discovered_metadata.title if discovered_metadata else None)
-   user_email = email or (discovered_metadata.author_email if discovered_metadata else None)
-   user_homepage = homepage or (str(discovered_metadata.author_uri or discovered_metadata.link) if discovered_metadata else None)
-   user_icon = icon or (str(discovered_metadata.logo or discovered_metadata.icon or discovered_metadata.image_url) if discovered_metadata else None)
+   user_display_name = display_name or (
+       discovered_metadata.author_name or discovered_metadata.title
+       if discovered_metadata
+       else None
+   )
+   user_email = email or (
+       discovered_metadata.author_email if discovered_metadata else None
+   )
+   user_homepage = homepage or (
+       str(discovered_metadata.author_uri or discovered_metadata.link)
+       if discovered_metadata
+       else None
+   )
+   user_icon = icon or (
+       str(
+           discovered_metadata.logo
+           or discovered_metadata.icon
+           or discovered_metadata.image_url
+       )
+       if discovered_metadata
+       else None
+   )

    # Add user to Git store
    git_store.add_user(
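The discovery fallbacks above all follow one precedence rule; a tiny self-contained sketch of it (the `pick` helper is illustrative, not part of the codebase):

```python
def pick(manual, discovered):
    # Explicit CLI value wins; discovered feed metadata is the fallback.
    return manual or discovered


assert pick("Alice", "Feed Title") == "Alice"    # manual override wins
assert pick(None, "Feed Title") == "Feed Title"  # fall back to discovery
assert pick(None, None) is None                  # nothing provided or discovered
```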
+7 -3
src/thicket/cli/commands/duplicates.py
···
from ..main import app
from ..utils import (
    console,
+   get_tsv_mode,
    load_config,
    print_error,
    print_info,
    print_success,
-   get_tsv_mode,
)
···
    print_info(f"Total duplicates: {len(duplicates.duplicates)}")

-def add_duplicate(git_store: GitStore, duplicate_id: Optional[str], canonical_id: Optional[str]) -> None:
+def add_duplicate(
+    git_store: GitStore, duplicate_id: Optional[str], canonical_id: Optional[str]
+) -> None:
    """Add a duplicate mapping."""
    if not duplicate_id:
        print_error("Duplicate ID is required")
···
    # Remove the mapping
    if git_store.remove_duplicate(duplicate_id):
        # Commit changes
-       git_store.commit_changes(f"Remove duplicate mapping: {duplicate_id} -> {canonical_id}")
+       git_store.commit_changes(
+           f"Remove duplicate mapping: {duplicate_id} -> {canonical_id}"
+       )
        print_success(f"Removed duplicate mapping: {duplicate_id} -> {canonical_id}")
    else:
        print_error(f"Failed to remove duplicate mapping: {duplicate_id}")
-396
src/thicket/cli/commands/index_cmd.py
···
-"""CLI command for building reference index from blog entries."""
-
-import json
-from pathlib import Path
-from typing import Optional
-
-import typer
-from rich.console import Console
-from rich.progress import (
-    BarColumn,
-    Progress,
-    SpinnerColumn,
-    TaskProgressColumn,
-    TextColumn,
-)
-from rich.table import Table
-
-from ...core.git_store import GitStore
-from ...core.reference_parser import ReferenceIndex, ReferenceParser
-from ..main import app
-from ..utils import get_tsv_mode, load_config
-
-console = Console()
-
-
-@app.command()
-def index(
-    config_file: Optional[Path] = typer.Option(
-        None,
-        "--config",
-        "-c",
-        help="Path to configuration file",
-    ),
-    output_file: Optional[Path] = typer.Option(
-        None,
-        "--output",
-        "-o",
-        help="Path to output index file (default: references.json in git store)",
-    ),
-    verbose: bool = typer.Option(
-        False,
-        "--verbose",
-        "-v",
-        help="Show detailed progress information",
-    ),
-) -> None:
-    """Build a reference index showing which blog entries reference others.
-
-    This command analyzes all blog entries to detect cross-references between
-    different blogs, creating an index that can be used to build threaded
-    views of related content.
-    """
-    try:
-        # Load configuration
-        config = load_config(config_file)
-
-        # Initialize Git store
-        git_store = GitStore(config.git_store)
-
-        # Initialize reference parser
-        parser = ReferenceParser()
-
-        # Build user domain mapping
-        if verbose:
-            console.print("Building user domain mapping...")
-        user_domains = parser.build_user_domain_mapping(git_store)
-
-        if verbose:
-            console.print(f"Found {len(user_domains)} users with {sum(len(d) for d in user_domains.values())} total domains")
-
-        # Initialize reference index
-        ref_index = ReferenceIndex()
-        ref_index.user_domains = user_domains
-
-        # Get all users
-        index = git_store._load_index()
-        users = list(index.users.keys())
-
-        if not users:
-            console.print("[yellow]No users found in Git store[/yellow]")
-            raise typer.Exit(0)
-
-        # Process all entries
-        total_entries = 0
-        total_references = 0
-        all_references = []
-
-        with Progress(
-            SpinnerColumn(),
-            TextColumn("[progress.description]{task.description}"),
-            BarColumn(),
-            TaskProgressColumn(),
-            console=console,
-        ) as progress:
-
-            # Count total entries first
-            counting_task = progress.add_task("Counting entries...", total=len(users))
-            entry_counts = {}
-            for username in users:
-                entries = git_store.list_entries(username)
-                entry_counts[username] = len(entries)
-                total_entries += len(entries)
-                progress.advance(counting_task)
-
-            progress.remove_task(counting_task)
-
-            # Process entries - extract references
-            processing_task = progress.add_task(
-                f"Extracting references from {total_entries} entries...",
-                total=total_entries
-            )
-
-            for username in users:
-                entries = git_store.list_entries(username)
-
-                for entry in entries:
-                    # Extract references from this entry
-                    references = parser.extract_references(entry, username, user_domains)
-                    all_references.extend(references)
-
-                    progress.advance(processing_task)
-
-                    if verbose and references:
-                        console.print(f"  Found {len(references)} references in {username}:{entry.title[:50]}...")
-
-            progress.remove_task(processing_task)
-
-            # Resolve target_entry_ids for references
-            if all_references:
-                resolve_task = progress.add_task(
-                    f"Resolving {len(all_references)} references...",
-                    total=len(all_references)
-                )
-
-                if verbose:
-                    console.print(f"Resolving target entry IDs for {len(all_references)} references...")
-
-                resolved_references = parser.resolve_target_entry_ids(all_references, git_store)
-
-                # Count resolved references
-                resolved_count = sum(1 for ref in resolved_references if ref.target_entry_id is not None)
-                if verbose:
-                    console.print(f"Resolved {resolved_count} out of {len(all_references)} references")
-
-                # Add resolved references to index
-                for ref in resolved_references:
-                    ref_index.add_reference(ref)
-                    total_references += 1
-                    progress.advance(resolve_task)
-
-                progress.remove_task(resolve_task)
-
-        # Determine output path
-        if output_file:
-            output_path = output_file
-        else:
-            output_path = config.git_store / "references.json"
-
-        # Save reference index
-        with open(output_path, "w") as f:
-            json.dump(ref_index.to_dict(), f, indent=2, default=str)
-
-        # Show summary
-        if not get_tsv_mode():
-            console.print("\n[green]✓ Reference index built successfully[/green]")
-
-        # Create summary table or TSV output
-        if get_tsv_mode():
-            print("Metric\tCount")
-            print(f"Total Users\t{len(users)}")
-            print(f"Total Entries\t{total_entries}")
-            print(f"Total References\t{total_references}")
-            print(f"Outbound Refs\t{len(ref_index.outbound_refs)}")
-            print(f"Inbound Refs\t{len(ref_index.inbound_refs)}")
-            print(f"Output File\t{output_path}")
-        else:
-            table = Table(title="Reference Index Summary")
-            table.add_column("Metric", style="cyan")
-            table.add_column("Count", style="green")
-
-            table.add_row("Total Users", str(len(users)))
-            table.add_row("Total Entries", str(total_entries))
-            table.add_row("Total References", str(total_references))
-            table.add_row("Outbound Refs", str(len(ref_index.outbound_refs)))
-            table.add_row("Inbound Refs", str(len(ref_index.inbound_refs)))
-            table.add_row("Output File", str(output_path))
-
-            console.print(table)
-
-        # Show some interesting statistics
-        if total_references > 0:
-            if not get_tsv_mode():
-                console.print("\n[bold]Reference Statistics:[/bold]")
-
-            # Most referenced users
-            target_counts = {}
-            unresolved_domains = set()
-
-            for ref in ref_index.references:
-                if ref.target_username:
-                    target_counts[ref.target_username] = target_counts.get(ref.target_username, 0) + 1
-                else:
-                    # Track unresolved domains
-                    from urllib.parse import urlparse
-                    domain = urlparse(ref.target_url).netloc.lower()
-                    unresolved_domains.add(domain)
-
-            if target_counts:
-                if get_tsv_mode():
-                    print("Referenced User\tReference Count")
-                    for username, count in sorted(target_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
-                        print(f"{username}\t{count}")
-                else:
-                    console.print("\nMost referenced users:")
-                    for username, count in sorted(target_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
-                        console.print(f"  {username}: {count} references")
-
-            if unresolved_domains and verbose:
-                if get_tsv_mode():
-                    print("Unresolved Domain\tCount")
-                    for domain in sorted(list(unresolved_domains)[:10]):
-                        print(f"{domain}\t1")
-                    if len(unresolved_domains) > 10:
-                        print(f"... and {len(unresolved_domains) - 10} more\t...")
-                else:
-                    console.print(f"\nUnresolved domains: {len(unresolved_domains)}")
-                    for domain in sorted(list(unresolved_domains)[:10]):
-                        console.print(f"  {domain}")
-                    if len(unresolved_domains) > 10:
-                        console.print(f"  ... and {len(unresolved_domains) - 10} more")
-
-    except Exception as e:
-        console.print(f"[red]Error building reference index: {e}[/red]")
-        if verbose:
-            console.print_exception()
-        raise typer.Exit(1)
-
-
-@app.command()
-def threads(
-    config_file: Optional[Path] = typer.Option(
-        None,
-        "--config",
-        "-c",
-        help="Path to configuration file",
-    ),
-    index_file: Optional[Path] = typer.Option(
-        None,
-        "--index",
-        "-i",
-        help="Path to reference index file (default: references.json in git store)",
-    ),
-    username: Optional[str] = typer.Option(
-        None,
-        "--username",
-        "-u",
-        help="Show threads for specific username only",
-    ),
-    entry_id: Optional[str] = typer.Option(
-        None,
-        "--entry",
-        "-e",
-        help="Show thread for specific entry ID",
-    ),
-    min_size: int = typer.Option(
-        2,
-        "--min-size",
-        "-m",
-        help="Minimum thread size to display",
-    ),
-) -> None:
-    """Show threaded view of related blog entries.
-
-    This command uses the reference index to show which blog entries
-    are connected through cross-references, creating an email-style
-    threaded view of the conversation.
-    """
-    try:
-        # Load configuration
-        config = load_config(config_file)
-
-        # Determine index file path
-        if index_file:
-            index_path = index_file
-        else:
-            index_path = config.git_store / "references.json"
-
-        if not index_path.exists():
-            console.print(f"[red]Reference index not found: {index_path}[/red]")
-            console.print("Run 'thicket index' first to build the reference index")
-            raise typer.Exit(1)
-
-        # Load reference index
-        with open(index_path) as f:
-            index_data = json.load(f)
-
-        ref_index = ReferenceIndex.from_dict(index_data)
-
-        # Initialize Git store to get entry details
-        git_store = GitStore(config.git_store)
-
-        if entry_id and username:
-            # Show specific thread
-            thread_members = ref_index.get_thread_members(username, entry_id)
-            _display_thread(thread_members, ref_index, git_store, f"Thread for {username}:{entry_id}")
-
-        elif username:
-            # Show all threads involving this user
-            user_index = git_store._load_index()
-            user = user_index.get_user(username)
-            if not user:
-                console.print(f"[red]User not found: {username}[/red]")
-                raise typer.Exit(1)
-
-            entries = git_store.list_entries(username)
-            threads_found = set()
-
-            console.print(f"[bold]Threads involving {username}:[/bold]\n")
-
-            for entry in entries:
-                thread_members = ref_index.get_thread_members(username, entry.id)
-                if len(thread_members) >= min_size:
-                    thread_key = tuple(sorted(thread_members))
-                    if thread_key not in threads_found:
-                        threads_found.add(thread_key)
-                        _display_thread(thread_members, ref_index, git_store, f"Thread #{len(threads_found)}")
-
-        else:
-            # Show all threads
-            console.print("[bold]All conversation threads:[/bold]\n")
-
-            all_threads = set()
-            processed_entries = set()
-
-            # Get all entries
-            user_index = git_store._load_index()
-            for username in user_index.users.keys():
-                entries = git_store.list_entries(username)
-                for entry in entries:
-                    entry_key = (username, entry.id)
-                    if entry_key in processed_entries:
-                        continue
-
-                    thread_members = ref_index.get_thread_members(username, entry.id)
-                    if len(thread_members) >= min_size:
-                        thread_key = tuple(sorted(thread_members))
-                        if thread_key not in all_threads:
-                            all_threads.add(thread_key)
-                            _display_thread(thread_members, ref_index, git_store, f"Thread #{len(all_threads)}")
-
-                            # Mark all members as processed
-                            for member in thread_members:
-                                processed_entries.add(member)
-
-            if not all_threads:
-                console.print("[yellow]No conversation threads found[/yellow]")
-                console.print(f"(minimum thread size: {min_size})")
-
-    except Exception as e:
-        console.print(f"[red]Error showing threads: {e}[/red]")
-        raise typer.Exit(1)
-
-
-def _display_thread(thread_members, ref_index, git_store, title):
-    """Display a single conversation thread."""
-    console.print(f"[bold cyan]{title}[/bold cyan]")
-    console.print(f"Thread size: {len(thread_members)} entries")
-
-    # Get entry details for each member
-    thread_entries = []
-    for username, entry_id in thread_members:
-        entry = git_store.get_entry(username, entry_id)
-        if entry:
-            thread_entries.append((username, entry))
-
-    # Sort by publication date
-    thread_entries.sort(key=lambda x: x[1].published or x[1].updated)
-
-    # Display entries
-    for i, (username, entry) in enumerate(thread_entries):
-        prefix = "├─" if i < len(thread_entries) - 1 else "└─"
-
-        # Get references for this entry
-        outbound = ref_index.get_outbound_refs(username, entry.id)
-        inbound = ref_index.get_inbound_refs(username, entry.id)
-
-        ref_info = ""
-        if outbound or inbound:
-            ref_info = f" ({len(outbound)} out, {len(inbound)} in)"
-
-        console.print(f" {prefix} [{username}] {entry.title[:60]}...{ref_info}")
-
-        if entry.published:
-            console.print(f"    Published: {entry.published.strftime('%Y-%m-%d')}")
-
-    console.print()  # Empty line after each thread
+105 -112
src/thicket/cli/commands/info_cmd.py
···
"""CLI command for displaying detailed information about a specific atom entry."""
-
import json
from pathlib import Path
from typing import Optional
···
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
-
from rich.text import Text
from ...core.git_store import GitStore
-
from ...core.reference_parser import ReferenceIndex
from ..main import app
-
from ..utils import load_config, get_tsv_mode
+
from ..utils import get_tsv_mode, load_config
console = Console()
···
@app.command()
def info(
identifier: str = typer.Argument(
-
...,
-
help="The atom ID or URL of the entry to display information about"
+
..., help="The atom ID or URL of the entry to display information about"
),
username: Optional[str] = typer.Option(
None,
"--username",
"-u",
-
help="Username to search for the entry (if not provided, searches all users)"
+
help="Username to search for the entry (if not provided, searches all users)",
),
config_file: Optional[Path] = typer.Option(
Path("thicket.yaml"),
···
help="Path to configuration file",
),
show_content: bool = typer.Option(
-
False,
-
"--content",
-
help="Include the full content of the entry in the output"
+
False, "--content", help="Include the full content of the entry in the output"
),
) -> None:
"""Display detailed information about a specific atom entry.
-
+
You can specify the entry using either its atom ID or URL.
Shows all metadata for the given entry, including title, dates, categories,
and summarizes all inbound and outbound links to/from other posts.
···
try:
# Load configuration
config = load_config(config_file)
-
+
# Initialize Git store
git_store = GitStore(config.git_store)
-
+
# Find the entry
entry = None
found_username = None
-
+
# Check if identifier looks like a URL
-
is_url = identifier.startswith(('http://', 'https://'))
-
+
is_url = identifier.startswith(("http://", "https://"))
+
if username:
# Search specific username
if is_url:
···
if entry:
found_username = user
break
-
+
if not entry or not found_username:
if username:
-
console.print(f"[red]Entry with {'URL' if is_url else 'atom ID'} '{identifier}' not found for user '{username}'[/red]")
+
console.print(
+
f"[red]Entry with {'URL' if is_url else 'atom ID'} '{identifier}' not found for user '{username}'[/red]"
+
)
else:
-
console.print(f"[red]Entry with {'URL' if is_url else 'atom ID'} '{identifier}' not found in any user's entries[/red]")
+
console.print(
+
f"[red]Entry with {'URL' if is_url else 'atom ID'} '{identifier}' not found in any user's entries[/red]"
+
)
raise typer.Exit(1)
-
-
# Load reference index if available
-
references_path = config.git_store / "references.json"
-
ref_index = None
-
if references_path.exists():
-
with open(references_path) as f:
-
index_data = json.load(f)
-
ref_index = ReferenceIndex.from_dict(index_data)
-
+
# Display information
if get_tsv_mode():
-
_display_entry_info_tsv(entry, found_username, ref_index, show_content)
+
_display_entry_info_tsv(entry, found_username, show_content)
else:
_display_entry_info(entry, found_username)
-
-
if ref_index:
-
_display_link_info(entry, found_username, ref_index)
-
else:
-
console.print("\n[yellow]No reference index found. Run 'thicket index' to build cross-reference data.[/yellow]")
-
+
+
# Display links and backlinks from entry fields
+
_display_link_info(entry, found_username, git_store)
+
# Optionally display content
if show_content and entry.content:
_display_content(entry.content)
-
+
except Exception as e:
console.print(f"[red]Error displaying entry info: {e}[/red]")
raise typer.Exit(1)
···
def _display_entry_info(entry, username: str) -> None:
"""Display basic entry information in a structured format."""
-
+
# Create main info panel
info_table = Table.grid(padding=(0, 2))
info_table.add_column("Field", style="cyan bold", width=15)
info_table.add_column("Value", style="white")
-
+
info_table.add_row("User", f"[green]{username}[/green]")
info_table.add_row("Atom ID", f"[blue]{entry.id}[/blue]")
info_table.add_row("Title", entry.title)
info_table.add_row("Link", str(entry.link))
-
+
if entry.published:
-
info_table.add_row("Published", entry.published.strftime("%Y-%m-%d %H:%M:%S UTC"))
-
+
info_table.add_row(
+
"Published", entry.published.strftime("%Y-%m-%d %H:%M:%S UTC")
+
)
+
info_table.add_row("Updated", entry.updated.strftime("%Y-%m-%d %H:%M:%S UTC"))
-
+
if entry.summary:
# Truncate long summaries
-
summary = entry.summary[:200] + "..." if len(entry.summary) > 200 else entry.summary
+
summary = (
+
entry.summary[:200] + "..." if len(entry.summary) > 200 else entry.summary
+
)
info_table.add_row("Summary", summary)
-
+
if entry.categories:
categories_text = ", ".join(entry.categories)
info_table.add_row("Categories", categories_text)
-
+
if entry.author:
author_info = []
if "name" in entry.author:
···
author_info.append(f"<{entry.author['email']}>")
if author_info:
info_table.add_row("Author", " ".join(author_info))
-
+
if entry.content_type:
info_table.add_row("Content Type", entry.content_type)
-
+
if entry.rights:
info_table.add_row("Rights", entry.rights)
-
+
if entry.source:
info_table.add_row("Source Feed", entry.source)
-
+
panel = Panel(
-
info_table,
-
title=f"[bold]Entry Information[/bold]",
-
border_style="blue"
+
info_table, title="[bold]Entry Information[/bold]", border_style="blue"
)
-
+
console.print(panel)
-
def _display_link_info(entry, username: str, ref_index: ReferenceIndex) -> None:
+
def _display_link_info(entry, username: str, git_store: GitStore) -> None:
"""Display inbound and outbound link information."""
-
-
# Get links
-
outbound_refs = ref_index.get_outbound_refs(username, entry.id)
-
inbound_refs = ref_index.get_inbound_refs(username, entry.id)
-
-
if not outbound_refs and not inbound_refs:
+
+
# Get links from entry fields
+
outbound_links = getattr(entry, "links", [])
+
backlinks = getattr(entry, "backlinks", [])
+
+
if not outbound_links and not backlinks:
console.print("\n[dim]No cross-references found for this entry.[/dim]")
return
-
+
# Create links table
links_table = Table(title="Cross-References")
links_table.add_column("Direction", style="cyan", width=10)
-
links_table.add_column("Target/Source", style="green", width=20)
-
links_table.add_column("URL", style="blue", width=50)
-
-
# Add outbound references
-
for ref in outbound_refs:
-
target_info = f"{ref.target_username}:{ref.target_entry_id}" if ref.target_username and ref.target_entry_id else "External"
-
links_table.add_row("→ Out", target_info, ref.target_url)
-
-
# Add inbound references
-
for ref in inbound_refs:
-
source_info = f"{ref.source_username}:{ref.source_entry_id}"
-
links_table.add_row("← In", source_info, ref.target_url)
-
+
links_table.add_column("Target/Source", style="green", width=30)
+
links_table.add_column("URL/ID", style="blue", width=60)
+
+
# Add outbound links
+
for link in outbound_links:
+
links_table.add_row("→ Out", "External/Other", link)
+
+
# Add backlinks (inbound references)
+
for backlink_id in backlinks:
+
# Try to find which user this entry belongs to
+
source_info = backlink_id
+
# Could enhance this by looking up the actual entry to get username
+
links_table.add_row("← In", "Entry", source_info)
+
console.print()
console.print(links_table)
-
+
# Summary
-
console.print(f"\n[bold]Summary:[/bold] {len(outbound_refs)} outbound, {len(inbound_refs)} inbound references")
+
console.print(
+
f"\n[bold]Summary:[/bold] {len(outbound_links)} outbound links, {len(backlinks)} inbound backlinks"
+
)
def _display_content(content: str) -> None:
"""Display the full content of the entry."""
-
+
# Truncate very long content
display_content = content
if len(content) > 5000:
display_content = content[:5000] + "\n\n[... content truncated ...]"
-
+
panel = Panel(
display_content,
title="[bold]Entry Content[/bold]",
border_style="green",
-
expand=False
+
expand=False,
)
-
+
console.print()
console.print(panel)
-
def _display_entry_info_tsv(entry, username: str, ref_index: Optional[ReferenceIndex], show_content: bool) -> None:
+
def _display_entry_info_tsv(entry, username: str, show_content: bool) -> None:
"""Display entry information in TSV format."""
-
+
# Basic info
print("Field\tValue")
print(f"User\t{username}")
print(f"Atom ID\t{entry.id}")
-
print(f"Title\t{entry.title.replace(chr(9), ' ').replace(chr(10), ' ').replace(chr(13), ' ')}")
+
print(
+
f"Title\t{entry.title.replace(chr(9), ' ').replace(chr(10), ' ').replace(chr(13), ' ')}"
+
)
print(f"Link\t{entry.link}")
-
+
if entry.published:
print(f"Published\t{entry.published.strftime('%Y-%m-%d %H:%M:%S UTC')}")
-
+
print(f"Updated\t{entry.updated.strftime('%Y-%m-%d %H:%M:%S UTC')}")
-
+
if entry.summary:
# Escape tabs and newlines in summary
-
summary = entry.summary.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ')
+
summary = entry.summary.replace("\t", " ").replace("\n", " ").replace("\r", " ")
print(f"Summary\t{summary}")
-
+
if entry.categories:
print(f"Categories\t{', '.join(entry.categories)}")
-
+
if entry.author:
author_info = []
if "name" in entry.author:
···
author_info.append(f"<{entry.author['email']}>")
if author_info:
print(f"Author\t{' '.join(author_info)}")
-
+
if entry.content_type:
print(f"Content Type\t{entry.content_type}")
-
+
if entry.rights:
print(f"Rights\t{entry.rights}")
-
+
if entry.source:
print(f"Source Feed\t{entry.source}")
-
-
# Add reference info if available
-
if ref_index:
-
outbound_refs = ref_index.get_outbound_refs(username, entry.id)
-
inbound_refs = ref_index.get_inbound_refs(username, entry.id)
-
-
print(f"Outbound References\t{len(outbound_refs)}")
-
print(f"Inbound References\t{len(inbound_refs)}")
-
-
# Show each reference
-
for ref in outbound_refs:
-
target_info = f"{ref.target_username}:{ref.target_entry_id}" if ref.target_username and ref.target_entry_id else "External"
-
print(f"Outbound Reference\t{target_info}\t{ref.target_url}")
-
-
for ref in inbound_refs:
-
source_info = f"{ref.source_username}:{ref.source_entry_id}"
-
print(f"Inbound Reference\t{source_info}\t{ref.target_url}")
-
+
+
# Add links info from entry fields
+
outbound_links = getattr(entry, "links", [])
+
backlinks = getattr(entry, "backlinks", [])
+
+
if outbound_links or backlinks:
+
print(f"Outbound Links\t{len(outbound_links)}")
+
print(f"Backlinks\t{len(backlinks)}")
+
+
# Show each link
+
for link in outbound_links:
+
print(f"→ Link\t{link}")
+
+
for backlink_id in backlinks:
+
print(f"← Backlink\t{backlink_id}")
+
# Show content if requested
if show_content and entry.content:
# Escape tabs and newlines in content
-
content = entry.content.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ')
-
print(f"Content\t{content}")
+
content = entry.content.replace("\t", " ").replace("\n", " ").replace("\r", " ")
+
print(f"Content\t{content}")
+5 -6
src/thicket/cli/commands/init.py
···
@app.command()
def init(
-   git_store: Path = typer.Argument(..., help="Path to Git repository for storing feeds"),
+   git_store: Path = typer.Argument(
+       ..., help="Path to Git repository for storing feeds"
+   ),
    cache_dir: Optional[Path] = typer.Option(
        None, "--cache-dir", "-c", help="Cache directory (default: ~/.cache/thicket)"
    ),
···
    # Set default paths
    if cache_dir is None:
        from platformdirs import user_cache_dir
+
        cache_dir = Path(user_cache_dir("thicket"))

    if config_file is None:
···
    # Create configuration
    try:
-       config = ThicketConfig(
-           git_store=git_store,
-           cache_dir=cache_dir,
-           users=[]
-       )
+       config = ThicketConfig(git_store=git_store, cache_dir=cache_dir, users=[])
        save_config(config, config_file)
        print_success(f"Created configuration file: {config_file}")
-422
src/thicket/cli/commands/links_cmd.py
···
-"""CLI command for extracting and categorizing all outbound links from blog entries."""
-
-import json
-import re
-from pathlib import Path
-from typing import Dict, List, Optional, Set
-from urllib.parse import urljoin, urlparse
-
-import typer
-from rich.console import Console
-from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
-from rich.table import Table
-
-from ...core.git_store import GitStore
-from ..main import app
-from ..utils import load_config, get_tsv_mode
-
-console = Console()
-
-
-class LinkData:
-    """Represents a link found in a blog entry."""
-
-    def __init__(self, url: str, entry_id: str, username: str):
-        self.url = url
-        self.entry_id = entry_id
-        self.username = username
-
-    def to_dict(self) -> dict:
-        """Convert to dictionary for JSON serialization."""
-        return {
-            "url": self.url,
-            "entry_id": self.entry_id,
-            "username": self.username
-        }
-
-    @classmethod
-    def from_dict(cls, data: dict) -> "LinkData":
-        """Create from dictionary."""
-        return cls(
-            url=data["url"],
-            entry_id=data["entry_id"],
-            username=data["username"]
-        )
-
-
-class LinkCategorizer:
-    """Categorizes links as internal, user, or unknown."""
-
-    def __init__(self, user_domains: Dict[str, Set[str]]):
-        self.user_domains = user_domains
-        # Create reverse mapping of domain -> username
-        self.domain_to_user = {}
-        for username, domains in user_domains.items():
-            for domain in domains:
-                self.domain_to_user[domain] = username
-
-    def categorize_url(self, url: str, source_username: str) -> tuple[str, Optional[str]]:
-        """
-        Categorize a URL as 'internal', 'user', or 'unknown'.
-        Returns (category, target_username).
-        """
-        try:
-            parsed = urlparse(url)
-            domain = parsed.netloc.lower()
-
-            # Check if it's a link to the same user's domain (internal)
-            if domain in self.user_domains.get(source_username, set()):
-                return "internal", source_username
-
-            # Check if it's a link to another user's domain
-            if domain in self.domain_to_user:
-                return "user", self.domain_to_user[domain]
-
-            # Everything else is unknown
-            return "unknown", None
-
-        except Exception:
-            return "unknown", None
-
-
-class LinkExtractor:
-    """Extracts and resolves links from blog entries."""
-
-    def __init__(self):
-        # Pattern for extracting links from HTML
-        self.link_pattern = re.compile(r'<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>', re.IGNORECASE | re.DOTALL)
-        self.url_pattern = re.compile(r'https?://[^\s<>"]+')
-
-    def extract_links_from_html(self, html_content: str, base_url: str) -> List[tuple[str, str]]:
-        """Extract all links from HTML content and resolve them against base URL."""
-        links = []
-
-        # Extract links from <a> tags
-        for match in self.link_pattern.finditer(html_content):
-            url = match.group(1)
-            text = re.sub(r'<[^>]+>', '', match.group(2)).strip()  # Remove HTML tags from link text
-
-            # Resolve relative URLs against base URL
-            resolved_url = urljoin(base_url, url)
-            links.append((resolved_url, text))
-
-        return links
-
-    def extract_links_from_entry(self, entry, username: str, base_url: str) -> List[LinkData]:
-        """Extract all links from a blog entry."""
-        links = []
-
-        # Combine all text content for analysis
-        content_to_search = []
-        if entry.content:
-            content_to_search.append(entry.content)
-        if entry.summary:
-            content_to_search.append(entry.summary)
-
-        for content in content_to_search:
-            extracted_links = self.extract_links_from_html(content, base_url)
-
-            for url, link_text in extracted_links:
-                # Skip empty URLs
-                if not url or url.startswith('#'):
-                    continue
-
-                link_data = LinkData(
-                    url=url,
-                    entry_id=entry.id,
-                    username=username
-                )
-
-                links.append(link_data)
-
-        return links
-
-
-@app.command()
-def links(
-    config_file: Optional[Path] = typer.Option(
-        Path("thicket.yaml"),
-        "--config",
-        "-c",
-        help="Path to configuration file",
-    ),
-    output_file: Optional[Path] = typer.Option(
-        None,
-        "--output",
-        "-o",
-        help="Path to output links file (default: links.json in git store)",
-    ),
-    mapping_file: Optional[Path] = typer.Option(
-        None,
-        "--mapping",
-        "-m",
-        help="Path to output URL <-> atom ID mapping file (default: url_mapping.json in git store)",
-    ),
-    verbose: bool = typer.Option(
-        False,
-        "--verbose",
-        "-v",
-        help="Show detailed progress information",
-    ),
-) -> None:
-    """Extract and categorize all outbound links from blog entries.
-
-    This command analyzes all blog entries to extract outbound links,
-    resolve them properly with respect to the feed's base URL, and
-    categorize them as internal, user, or unknown links.
-    """
-    try:
-        # Load configuration
-        config = load_config(config_file)
-
-        # Initialize Git store
-        git_store = GitStore(config.git_store)
-
-        # Build user domain mapping
-        if verbose:
-            console.print("Building user domain mapping...")
-
-        index = git_store._load_index()
-        user_domains = {}
-
-        for username, user_metadata in index.users.items():
-            domains = set()
-
-            # Add domains from feeds
-            for feed_url in user_metadata.feeds:
-                domain = urlparse(feed_url).netloc.lower()
-                if domain:
-                    domains.add(domain)
-
-            # Add domain from homepage
-            if user_metadata.homepage:
-                domain = urlparse(str(user_metadata.homepage)).netloc.lower()
-                if domain:
-                    domains.add(domain)
-
-            user_domains[username] = domains
-
-        if verbose:
-            console.print(f"Found {len(user_domains)} users with {sum(len(d) for d in user_domains.values())} total domains")
-
-        # Initialize components
-        link_extractor = LinkExtractor()
-        categorizer = LinkCategorizer(user_domains)
-
-        # Get all users
-        users = list(index.users.keys())
-
-        if not users:
-            console.print("[yellow]No users found in Git store[/yellow]")
-            raise typer.Exit(0)
-
-        # Process all entries
-        all_links = []
-        link_categories = {"internal": [], "user": [], "unknown": []}
-        link_dict = {}  # Dictionary with link URL as key, maps to list of atom IDs
-        reverse_dict = {}  # Dictionary with atom ID as key, maps to list of URLs
-
-        with Progress(
-            SpinnerColumn(),
-            TextColumn("[progress.description]{task.description}"),
-            BarColumn(),
-            TaskProgressColumn(),
-            console=console,
-        ) as progress:
-
-            # Count total entries first
-            counting_task = progress.add_task("Counting entries...", total=len(users))
-            total_entries = 0
-
-            for username in users:
-                entries = git_store.list_entries(username)
-                total_entries += len(entries)
-                progress.advance(counting_task)
-
-            progress.remove_task(counting_task)
-
-            # Process entries
-            processing_task = progress.add_task(
-                f"Processing {total_entries} entries...",
-                total=total_entries
-            )
-
-            for username in users:
-                entries = git_store.list_entries(username)
-                user_metadata = index.users[username]
-
-                # Get base URL for this user (use first feed URL)
-                base_url = str(user_metadata.feeds[0]) if user_metadata.feeds else "https://example.com"
-
-                for entry in entries:
-                    # Extract links from this entry
-                    entry_links = link_extractor.extract_links_from_entry(entry, username, base_url)
-
-                    # Track unique links per entry
-                    entry_urls_seen = set()
-
-                    # Categorize each link
-                    for link_data in entry_links:
-                        # Skip if we've already seen this URL in this entry
-                        if link_data.url in entry_urls_seen:
-                            continue
-                        entry_urls_seen.add(link_data.url)
-
-                        category, target_username = categorizer.categorize_url(link_data.url, username)
-
-                        # Add to link dictionary (URL as key, maps to list of atom IDs)
-                        if link_data.url not in link_dict:
-                            link_dict[link_data.url] = []
-                        if link_data.entry_id not in link_dict[link_data.url]:
-                            link_dict[link_data.url].append(link_data.entry_id)
-
-                        # Also add to reverse mapping (atom ID -> list of URLs)
-                        if link_data.entry_id not in reverse_dict:
-                            reverse_dict[link_data.entry_id] = []
-                        if link_data.url not in reverse_dict[link_data.entry_id]:
-                            reverse_dict[link_data.entry_id].append(link_data.url)
-
-                        # Add category info to link data for categories tracking
-                        link_info = link_data.to_dict()
-                        link_info["category"] = category
-                        link_info["target_username"] = target_username
-
-                        all_links.append(link_info)
-                        link_categories[category].append(link_info)
-
-                    progress.advance(processing_task)
-
-                    if verbose and entry_links:
-                        console.print(f"  Found {len(entry_links)} links in {username}:{entry.title[:50]}...")
-
-        # Determine output paths
-        if output_file:
-            output_path = output_file
-        else:
-            output_path = config.git_store / "links.json"
-
-        if mapping_file:
-            mapping_path = mapping_file
-        else:
-            mapping_path = config.git_store / "url_mapping.json"
-
-        # Save all extracted links (not just filtered ones)
-        if verbose:
-            console.print("Preparing output data...")
-
-        # Build a set of all URLs that correspond to posts in the git database
-        registered_urls = set()
-
-        # Get all entries from all users and build URL mappings
-        for username in users:
-            entries = git_store.list_entries(username)
-            user_metadata = index.users[username]
-
-            for entry in entries:
-                # Try to match entry URLs with extracted links
-                if hasattr(entry, 'link') and entry.link:
-                    registered_urls.add(str(entry.link))
-
-                # Also check entry alternate links if they exist
-                if hasattr(entry, 'links') and entry.links:
-                    for link in entry.links:
-                        if hasattr(link, 'href') and link.href:
-                            registered_urls.add(str(link.href))
-
-        # Create filtered version for URL mapping (only links to registered posts)
-        filtered_link_dict = {}
-        filtered_reverse_dict = {}
-
-        for url, entry_ids in link_dict.items():
-            if url in registered_urls:
-                filtered_link_dict[url] = entry_ids
-
-                # Also update reverse mapping
-                for entry_id in entry_ids:
-                    if entry_id not in filtered_reverse_dict:
-                        filtered_reverse_dict[entry_id] = []
-                    if url not in filtered_reverse_dict[entry_id]:
-                        filtered_reverse_dict[entry_id].append(url)
-
-        # Use all links for main output, not filtered ones
-        output_data = link_dict
-
-        if verbose:
-            console.print(f"Found {len(registered_urls)} registered post URLs")
-            console.print(f"Found {len(link_dict)} total links, {len(filtered_link_dict)} links to registered posts")
-
-        # Save links data (URL -> atom ID mapping, all links)
-        with open(output_path, "w") as f:
-            json.dump(output_data, f, indent=2, default=str)
-
-        # Save bidirectional mapping file (filtered)
-        mapping_data = {
-            "url_to_atom": filtered_link_dict,
-            "atom_to_urls": filtered_reverse_dict
-        }
-
-        with open(mapping_path, "w") as f:
-            json.dump(mapping_data, f, indent=2, default=str)
-
-        # Show summary
-        if not get_tsv_mode():
-            console.print("\n[green]✓ Links extraction completed successfully[/green]")
-
-        # Create summary table or TSV output
-        if get_tsv_mode():
-            print("Category\tCount\tDescription")
-            print(f"Internal\t{len(link_categories['internal'])}\tLinks to same user's domain")
-            print(f"User\t{len(link_categories['user'])}\tLinks to other tracked users")
-            print(f"Unknown\t{len(link_categories['unknown'])}\tLinks to external sites")
-            print(f"Total Extracted\t{len(all_links)}\tAll extracted links")
-            print(f"Saved to Output\t{len(output_data)}\tLinks saved to output file")
-            print(f"Cross-references\t{len(filtered_link_dict)}\tLinks to registered posts only")
-        else:
-            table = Table(title="Links Summary")
-            table.add_column("Category", style="cyan")
-            table.add_column("Count", style="green")
-            table.add_column("Description", style="white")
-
-            table.add_row("Internal", str(len(link_categories["internal"])), "Links to same user's domain")
-            table.add_row("User", str(len(link_categories["user"])), "Links to other tracked users")
-            table.add_row("Unknown", str(len(link_categories["unknown"])), "Links to external sites")
-            table.add_row("Total Extracted", str(len(all_links)), "All extracted links")
-            table.add_row("Saved to Output", str(len(output_data)), "Links saved to output file")
-            table.add_row("Cross-references", str(len(filtered_link_dict)), "Links to registered posts only")
-
-            console.print(table)
-
-        # Show user links if verbose
-        if verbose and link_categories["user"]:
-            if get_tsv_mode():
-                print("User Link Source\tUser Link Target\tLink Count")
-                user_link_counts = {}
-
-                for link in link_categories["user"]:
-                    key = f"{link['username']} -> {link['target_username']}"
-                    user_link_counts[key] = user_link_counts.get(key, 0) + 1
-
-                for link_pair, count in sorted(user_link_counts.items(), key=lambda x: x[1], reverse=True)[:10]:
-                    source, target = link_pair.split(" -> ")
-                    print(f"{source}\t{target}\t{count}")
-            else:
-                console.print("\n[bold]User-to-user links:[/bold]")
-                user_link_counts = {}
-
-                for link in link_categories["user"]:
-                    key = f"{link['username']} -> {link['target_username']}"
-                    user_link_counts[key] = user_link_counts.get(key, 0) + 1
-
-                for link_pair, count in sorted(user_link_counts.items(), key=lambda x: x[1], reverse=True)[:10]:
-                    console.print(f"  {link_pair}: {count} links")
-
-        if not get_tsv_mode():
-            console.print(f"\nLinks output saved to: {output_path}")
-            console.print(f"URL mapping saved to: {mapping_path}")
-
-    except Exception as e:
-        console.print(f"[red]Error extracting links: {e}[/red]")
-        if verbose:
-            console.print_exception()
-        raise typer.Exit(1)
+11 -11
src/thicket/cli/commands/list_cmd.py
···
from ..main import app
from ..utils import (
    console,
+   get_tsv_mode,
    load_config,
+   print_entries_tsv,
    print_error,
-   print_feeds_table,
    print_feeds_table_from_git,
    print_info,
-   print_users_table,
    print_users_table_from_git,
-   print_entries_tsv,
-   get_tsv_mode,
)
···
    """List all users."""
    index = git_store._load_index()
    users = list(index.users.values())

    if not users:
        print_info("No users configured")
        return
···
    print_feeds_table_from_git(git_store, username)

-def list_entries(git_store: GitStore, username: Optional[str] = None, limit: Optional[int] = None) -> None:
+def list_entries(
+    git_store: GitStore, username: Optional[str] = None, limit: Optional[int] = None
+) -> None:
    """List entries, optionally filtered by user."""
    if username:
···
    """Clean HTML content for display in table."""
    if not content:
        return ""

    # Remove HTML tags
-   clean_text = re.sub(r'<[^>]+>', ' ', content)
+   clean_text = re.sub(r"<[^>]+>", " ", content)
    # Replace multiple whitespace with single space
-   clean_text = re.sub(r'\s+', ' ', clean_text)
+   clean_text = re.sub(r"\s+", " ", clean_text)
    # Strip and limit length
    clean_text = clean_text.strip()
    if len(clean_text) > 100:
        clean_text = clean_text[:97] + "..."

    return clean_text
···
    if get_tsv_mode():
        print_entries_tsv(entries_by_user, usernames)
        return

    table = Table(title="Feed Entries")
    table.add_column("User", style="cyan", no_wrap=True)
    table.add_column("Title", style="bold")
+15 -5
src/thicket/cli/commands/sync.py
···
    user_updated_entries = 0

    # Sync each feed for the user
-   for feed_url in track(user_metadata.feeds, description=f"Syncing {user_metadata.username}'s feeds"):
+   for feed_url in track(
+       user_metadata.feeds, description=f"Syncing {user_metadata.username}'s feeds"
+   ):
        try:
            new_entries, updated_entries = asyncio.run(
                sync_feed(git_store, user_metadata.username, feed_url, dry_run)
···
            print_error(f"Failed to sync feed {feed_url}: {e}")
            continue

-   print_info(f"User {user_metadata.username}: {user_new_entries} new, {user_updated_entries} updated")
+   print_info(
+       f"User {user_metadata.username}: {user_new_entries} new, {user_updated_entries} updated"
+   )

    total_new_entries += user_new_entries
    total_updated_entries += user_updated_entries
···
    # Summary
    if dry_run:
-       print_info(f"Dry run complete: would sync {total_new_entries} new entries, {total_updated_entries} updated")
+       print_info(
+           f"Dry run complete: would sync {total_new_entries} new entries, {total_updated_entries} updated"
+       )
    else:
-       print_success(f"Sync complete: {total_new_entries} new entries, {total_updated_entries} updated")
+       print_success(
+           f"Sync complete: {total_new_entries} new entries, {total_updated_entries} updated"
+       )

-async def sync_feed(git_store: GitStore, username: str, feed_url, dry_run: bool) -> tuple[int, int]:
+async def sync_feed(
+    git_store: GitStore, username: str, feed_url, dry_run: bool
+) -> tuple[int, int]:
    """Sync a single feed for a user."""
    parser = FeedParser()
+1 -1
src/thicket/cli/main.py
···
# Import commands to register them
-from .commands import add, duplicates, index_cmd, info_cmd, init, links_cmd, list_cmd, sync
+from .commands import add, duplicates, info_cmd, init, list_cmd, sync  # noqa: F401

if __name__ == "__main__":
    app()
+32 -20
src/thicket/cli/utils.py
···
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table

-from ..models import ThicketConfig, UserMetadata
from ..core.git_store import GitStore
+from ..models import ThicketConfig, UserMetadata

console = Console()
···
def get_tsv_mode() -> bool:
    """Get the global TSV mode setting."""
    from .main import tsv_mode
+
    return tsv_mode
···
        default_config = Path("thicket.yaml")
        if default_config.exists():
            import yaml
+
            with open(default_config) as f:
                config_data = yaml.safe_load(f)
            return ThicketConfig(**config_data)

        # Fall back to environment variables
        return ThicketConfig()
    except Exception as e:
        console.print(f"[red]Error loading configuration: {e}[/red]")
-       console.print("[yellow]Run 'thicket init' to create a new configuration.[/yellow]")
+       console.print(
+           "[yellow]Run 'thicket init' to create a new configuration.[/yellow]"
+       )
        raise typer.Exit(1) from e
···
    if get_tsv_mode():
        print_users_tsv(config)
        return

    table = Table(title="Users and Feeds")
    table.add_column("Username", style="cyan", no_wrap=True)
    table.add_column("Display Name", style="magenta")
···
    if get_tsv_mode():
        print_feeds_tsv(config, username)
        return

    table = Table(title=f"Feeds{f' for {username}' if username else ''}")
    table.add_column("Username", style="cyan", no_wrap=True)
    table.add_column("Feed URL", style="blue")
···
    if get_tsv_mode():
        print_users_tsv_from_git(users)
        return

    table = Table(title="Users and Feeds")
    table.add_column("Username", style="cyan", no_wrap=True)
    table.add_column("Display Name", style="magenta")
···
    console.print(table)

-def print_feeds_table_from_git(git_store: GitStore, username: Optional[str] = None) -> None:
+def print_feeds_table_from_git(
+    git_store: GitStore, username: Optional[str] = None
+) -> None:
    """Print a table of feeds from git repository."""
    if get_tsv_mode():
        print_feeds_tsv_from_git(git_store, username)
        return

    table = Table(title=f"Feeds{f' for {username}' if username else ''}")
    table.add_column("Username", style="cyan", no_wrap=True)
    table.add_column("Feed URL", style="blue")
···
    print("Username\tDisplay Name\tEmail\tHomepage\tFeeds")
    for user in config.users:
        feeds_str = ",".join(str(feed) for feed in user.feeds)
-       print(f"{user.username}\t{user.display_name or ''}\t{user.email or ''}\t{user.homepage or ''}\t{feeds_str}")
+       print(
+           f"{user.username}\t{user.display_name or ''}\t{user.email or ''}\t{user.homepage or ''}\t{feeds_str}"
+       )

def print_users_tsv_from_git(users: list[UserMetadata]) -> None:
···
    print("Username\tDisplay Name\tEmail\tHomepage\tFeeds")
    for user in users:
        feeds_str = ",".join(user.feeds)
-       print(f"{user.username}\t{user.display_name or ''}\t{user.email or ''}\t{user.homepage or ''}\t{feeds_str}")
+       print(
+           f"{user.username}\t{user.display_name or ''}\t{user.email or ''}\t{user.homepage or ''}\t{feeds_str}"
+       )

def print_feeds_tsv(config: ThicketConfig, username: Optional[str] = None) -> None:
···
    print("Username\tFeed URL\tStatus")
    users = [config.find_user(username)] if username else config.users
    users = [u for u in users if u is not None]

    for user in users:
        for feed in user.feeds:
            print(f"{user.username}\t{feed}\tActive")

-def print_feeds_tsv_from_git(git_store: GitStore, username: Optional[str] = None) -> None:
+def print_feeds_tsv_from_git(
+    git_store: GitStore, username: Optional[str] = None
+) -> None:
    """Print feeds from git repository in TSV format."""
    print("Username\tFeed URL\tStatus")

    if username:
        user = git_store.get_user(username)
        users = [user] if user else []
    else:
        index = git_store._load_index()
        users = list(index.users.values())

    for user in users:
        for feed in user.feeds:
            print(f"{user.username}\t{feed}\tActive")
···
def print_entries_tsv(entries_by_user: list[list], usernames: list[str]) -> None:
    """Print entries in TSV format."""
    print("User\tAtom ID\tTitle\tUpdated\tURL")

    # Combine all entries with usernames
    all_entries = []
    for entries, username in zip(entries_by_user, usernames):
        for entry in entries:
            all_entries.append((username, entry))

    # Sort by updated time (newest first)
    all_entries.sort(key=lambda x: x[1].updated, reverse=True)

    for username, entry in all_entries:
        # Format updated time
        updated_str = entry.updated.strftime("%Y-%m-%d %H:%M")

        # Escape tabs and newlines in title to preserve TSV format
-       title = entry.title.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ')
+       title = entry.title.replace("\t", " ").replace("\n", " ").replace("\r", " ")

        print(f"{username}\t{entry.id}\t{title}\t{updated_str}\t{entry.link}")
+84 -55
src/thicket/core/feed_parser.py
···
"""Initialize the feed parser."""
self.user_agent = user_agent
self.allowed_tags = [
-
"a", "abbr", "acronym", "b", "blockquote", "br", "code", "em",
-
"i", "li", "ol", "p", "pre", "strong", "ul", "h1", "h2", "h3",
-
"h4", "h5", "h6", "img", "div", "span",
+
"a",
+
"abbr",
+
"acronym",
+
"b",
+
"blockquote",
+
"br",
+
"code",
+
"em",
+
"i",
+
"li",
+
"ol",
+
"p",
+
"pre",
+
"strong",
+
"ul",
+
"h1",
+
"h2",
+
"h3",
+
"h4",
+
"h5",
+
"h6",
+
"img",
+
"div",
+
"span",
]
self.allowed_attributes = {
"a": ["href", "title"],
···
response.raise_for_status()
return response.text
-
def parse_feed(self, content: str, source_url: Optional[HttpUrl] = None) -> tuple[FeedMetadata, list[AtomEntry]]:
+
def parse_feed(
+
self, content: str, source_url: Optional[HttpUrl] = None
+
) -> tuple[FeedMetadata, list[AtomEntry]]:
"""Parse feed content and return metadata and entries."""
parsed = feedparser.parse(content)
···
author_email = None
author_uri = None
-
if hasattr(feed, 'author_detail'):
-
author_name = feed.author_detail.get('name')
-
author_email = feed.author_detail.get('email')
-
author_uri = feed.author_detail.get('href')
-
elif hasattr(feed, 'author'):
+
if hasattr(feed, "author_detail"):
+
author_name = feed.author_detail.get("name")
+
author_email = feed.author_detail.get("email")
+
author_uri = feed.author_detail.get("href")
+
elif hasattr(feed, "author"):
author_name = feed.author
# Parse managing editor for RSS feeds
-
if not author_email and hasattr(feed, 'managingEditor'):
+
if not author_email and hasattr(feed, "managingEditor"):
author_email = feed.managingEditor
# Parse feed link
feed_link = None
-
if hasattr(feed, 'link'):
+
if hasattr(feed, "link"):
try:
feed_link = HttpUrl(feed.link)
except ValidationError:
···
icon = None
image_url = None
-
if hasattr(feed, 'image'):
+
if hasattr(feed, "image"):
try:
-
image_url = HttpUrl(feed.image.get('href', feed.image.get('url', '')))
+
image_url = HttpUrl(feed.image.get("href", feed.image.get("url", "")))
except (ValidationError, AttributeError):
pass
-
if hasattr(feed, 'icon'):
+
if hasattr(feed, "icon"):
try:
icon = HttpUrl(feed.icon)
except ValidationError:
pass
-
if hasattr(feed, 'logo'):
+
if hasattr(feed, "logo"):
try:
logo = HttpUrl(feed.logo)
except ValidationError:
pass
return FeedMetadata(
-
title=getattr(feed, 'title', None),
+
title=getattr(feed, "title", None),
author_name=author_name,
author_email=author_email,
author_uri=HttpUrl(author_uri) if author_uri else None,
···
logo=logo,
icon=icon,
image_url=image_url,
-
description=getattr(feed, 'description', None),
+
description=getattr(feed, "description", None),
)
-
def _normalize_entry(self, entry: feedparser.FeedParserDict, source_url: Optional[HttpUrl] = None) -> AtomEntry:
+
def _normalize_entry(
+
self, entry: feedparser.FeedParserDict, source_url: Optional[HttpUrl] = None
+
) -> AtomEntry:
"""Normalize an entry to Atom format."""
# Parse timestamps
-
updated = self._parse_timestamp(entry.get('updated_parsed') or entry.get('published_parsed'))
-
published = self._parse_timestamp(entry.get('published_parsed'))
+
updated = self._parse_timestamp(
+
entry.get("updated_parsed") or entry.get("published_parsed")
+
)
+
published = self._parse_timestamp(entry.get("published_parsed"))
# Parse content
content = self._extract_content(entry)
···
# Parse categories/tags
categories = []
-
if hasattr(entry, 'tags'):
-
categories = [tag.get('term', '') for tag in entry.tags if tag.get('term')]
+
if hasattr(entry, "tags"):
+
categories = [tag.get("term", "") for tag in entry.tags if tag.get("term")]
# Sanitize HTML content
if content:
content = self._sanitize_html(content)
-
summary = entry.get('summary', '')
+
summary = entry.get("summary", "")
if summary:
summary = self._sanitize_html(summary)
return AtomEntry(
-
id=entry.get('id', entry.get('link', '')),
-
title=entry.get('title', ''),
-
link=HttpUrl(entry.get('link', '')),
+
id=entry.get("id", entry.get("link", "")),
+
title=entry.get("title", ""),
+
link=HttpUrl(entry.get("link", "")),
updated=updated,
published=published,
summary=summary or None,
···
content_type=content_type,
author=author,
categories=categories,
-
rights=entry.get('rights', None),
+
rights=entry.get("rights", None),
source=str(source_url) if source_url else None,
)
···
def _extract_content(self, entry: feedparser.FeedParserDict) -> Optional[str]:
"""Extract the best content from an entry."""
# Prefer content over summary
-
if hasattr(entry, 'content') and entry.content:
+
if hasattr(entry, "content") and entry.content:
# Find the best content (prefer text/html, then text/plain)
for content_item in entry.content:
-
if content_item.get('type') in ['text/html', 'html']:
-
return content_item.get('value', '')
-
elif content_item.get('type') in ['text/plain', 'text']:
-
return content_item.get('value', '')
+
if content_item.get("type") in ["text/html", "html"]:
+
return content_item.get("value", "")
+
elif content_item.get("type") in ["text/plain", "text"]:
+
return content_item.get("value", "")
# Fallback to first content item
-
return entry.content[0].get('value', '')
+
return entry.content[0].get("value", "")
# Fallback to summary
-
return entry.get('summary', '')
+
return entry.get("summary", "")
def _extract_content_type(self, entry: feedparser.FeedParserDict) -> str:
"""Extract content type from entry."""
-
if hasattr(entry, 'content') and entry.content:
-
content_type = entry.content[0].get('type', 'html')
+
if hasattr(entry, "content") and entry.content:
+
content_type = entry.content[0].get("type", "html")
# Normalize content type
-
if content_type in ['text/html', 'html']:
-
return 'html'
-
elif content_type in ['text/plain', 'text']:
-
return 'text'
-
elif content_type == 'xhtml':
-
return 'xhtml'
-
return 'html'
+
if content_type in ["text/html", "html"]:
+
return "html"
+
elif content_type in ["text/plain", "text"]:
+
return "text"
+
elif content_type == "xhtml":
+
return "xhtml"
+
return "html"
def _extract_author(self, entry: feedparser.FeedParserDict) -> Optional[dict]:
"""Extract author information from entry."""
author = {}
-
if hasattr(entry, 'author_detail'):
-
author.update({
-
'name': entry.author_detail.get('name'),
-
'email': entry.author_detail.get('email'),
-
'uri': entry.author_detail.get('href'),
-
})
-
elif hasattr(entry, 'author'):
-
author['name'] = entry.author
+
if hasattr(entry, "author_detail"):
+
author.update(
+
{
+
"name": entry.author_detail.get("name"),
+
"email": entry.author_detail.get("email"),
+
"uri": entry.author_detail.get("href"),
+
}
+
)
+
elif hasattr(entry, "author"):
+
author["name"] = entry.author
return author if author else None
···
# Start with the path component
if parsed.path:
# Remove leading slash and replace problematic characters
-
safe_id = parsed.path.lstrip('/').replace('/', '_').replace('\\', '_')
+
safe_id = parsed.path.lstrip("/").replace("/", "_").replace("\\", "_")
else:
# Use the entire ID as fallback
safe_id = entry_id
···
# Replace problematic characters
safe_chars = []
for char in safe_id:
-
if char.isalnum() or char in '-_.':
+
if char.isalnum() or char in "-_.":
safe_chars.append(char)
else:
-
safe_chars.append('_')
+
safe_chars.append("_")
-
safe_id = ''.join(safe_chars)
+
safe_id = "".join(safe_chars)
# Ensure it's not too long (max 200 chars)
if len(safe_id) > 200:
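For reference, the sanitization above maps an Atom entry ID to a filesystem-safe filename: take the URL path (or the raw ID when there is no path), replace slashes and anything outside alphanumerics and `-_.` with underscores, then cap the length. A minimal standalone sketch of the same logic — the behavior past 200 characters is an assumption, since the hunk cuts off there:

```python
from urllib.parse import urlparse

def sanitize_entry_id(entry_id: str) -> str:
    """Mirror of FeedParser.sanitize_entry_id: entry ID -> safe filename stem."""
    parsed = urlparse(entry_id)
    # Prefer the URL path; fall back to the raw ID when there is no path
    if parsed.path:
        safe_id = parsed.path.lstrip("/").replace("/", "_").replace("\\", "_")
    else:
        safe_id = entry_id
    # Keep alphanumerics and "-_."; everything else becomes an underscore
    safe_id = "".join(c if c.isalnum() or c in "-_." else "_" for c in safe_id)
    return safe_id[:200]  # assumed: plain truncation at 200 chars

# sanitize_entry_id("https://blog.user.com/2024/06/hello-world?x=1")
# -> "2024_06_hello-world"   (the query string is dropped with the rest of the URL)
```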
+45 -18
src/thicket/core/git_store.py
···
"""Save the index to index.json."""
index_path = self.repo_path / "index.json"
with open(index_path, "w") as f:
-
json.dump(index.model_dump(mode="json", exclude_none=True), f, indent=2, default=str)
+
json.dump(
+
index.model_dump(mode="json", exclude_none=True),
+
f,
+
indent=2,
+
default=str,
+
)
def _load_index(self) -> GitStoreIndex:
"""Load the index from index.json."""
···
return DuplicateMap(**data)
-
def add_user(self, username: str, display_name: Optional[str] = None,
-
email: Optional[str] = None, homepage: Optional[str] = None,
-
icon: Optional[str] = None, feeds: Optional[list[str]] = None) -> UserMetadata:
+
def add_user(
+
self,
+
username: str,
+
display_name: Optional[str] = None,
+
email: Optional[str] = None,
+
homepage: Optional[str] = None,
+
icon: Optional[str] = None,
+
feeds: Optional[list[str]] = None,
+
) -> UserMetadata:
"""Add a new user to the Git store."""
index = self._load_index()
···
created=datetime.now(),
last_updated=datetime.now(),
)
-
# Update index
index.add_user(user_metadata)
···
user.update_timestamp()
-
# Update index
index.add_user(user)
self._save_index(index)
···
# Sanitize entry ID for filename
from .feed_parser import FeedParser
+
parser = FeedParser()
safe_id = parser.sanitize_entry_id(entry.id)
···
# Save entry
with open(entry_path, "w") as f:
-
json.dump(entry.model_dump(mode="json", exclude_none=True), f, indent=2, default=str)
+
json.dump(
+
entry.model_dump(mode="json", exclude_none=True),
+
f,
+
indent=2,
+
default=str,
+
)
# Update user metadata if new entry
if not entry_exists:
···
# Sanitize entry ID
from .feed_parser import FeedParser
+
parser = FeedParser()
safe_id = parser.sanitize_entry_id(entry_id)
···
return AtomEntry(**data)
-
def list_entries(self, username: str, limit: Optional[int] = None) -> list[AtomEntry]:
+
def list_entries(
+
self, username: str, limit: Optional[int] = None
+
) -> list[AtomEntry]:
"""List entries for a user."""
user = self.get_user(username)
if not user:
···
return []
entries = []
-
entry_files = sorted(user_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
-
+
entry_files = sorted(
+
user_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True
+
)
if limit:
entry_files = entry_files[:limit]
···
"total_entries": index.total_entries,
"total_duplicates": len(duplicates.duplicates),
"last_updated": index.last_updated,
-
"repository_size": sum(f.stat().st_size for f in self.repo_path.rglob("*") if f.is_file()),
+
"repository_size": sum(
+
f.stat().st_size for f in self.repo_path.rglob("*") if f.is_file()
+
),
}
-
def search_entries(self, query: str, username: Optional[str] = None,
-
limit: Optional[int] = None) -> list[tuple[str, AtomEntry]]:
+
def search_entries(
+
self, query: str, username: Optional[str] = None, limit: Optional[int] = None
+
) -> list[tuple[str, AtomEntry]]:
"""Search entries by content."""
results = []
···
entry = AtomEntry(**data)
# Simple text search in title, summary, and content
-
searchable_text = " ".join(filter(None, [
-
entry.title,
-
entry.summary or "",
-
entry.content or "",
-
])).lower()
+
searchable_text = " ".join(
+
filter(
+
None,
+
[
+
entry.title,
+
entry.summary or "",
+
entry.content or "",
+
],
+
)
+
).lower()
if query.lower() in searchable_text:
results.append((user.username, entry))
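The search here is a plain case-insensitive substring match over title, summary, and content — no ranking or tokenization. A usage sketch (the `GitStore` constructor signature and import path are assumptions; the `search_entries` signature follows the diff above):

```python
from pathlib import Path

from thicket.core.git_store import GitStore  # assumed import path

store = GitStore(Path("./git-store"))  # assumed: takes the repo root path
for username, entry in store.search_entries("atom", limit=10):
    print(f"{username}: {entry.title}")
```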
-301
src/thicket/core/reference_parser.py
···
-
"""Reference detection and parsing for blog entries."""
-
-
import re
-
from typing import Optional
-
from urllib.parse import urlparse
-
-
from ..models import AtomEntry
-
-
-
class BlogReference:
-
"""Represents a reference from one blog entry to another."""
-
-
def __init__(
-
self,
-
source_entry_id: str,
-
source_username: str,
-
target_url: str,
-
target_username: Optional[str] = None,
-
target_entry_id: Optional[str] = None,
-
):
-
self.source_entry_id = source_entry_id
-
self.source_username = source_username
-
self.target_url = target_url
-
self.target_username = target_username
-
self.target_entry_id = target_entry_id
-
-
def to_dict(self) -> dict:
-
"""Convert to dictionary for JSON serialization."""
-
result = {
-
"source_entry_id": self.source_entry_id,
-
"source_username": self.source_username,
-
"target_url": self.target_url,
-
}
-
-
# Only include optional fields if they are not None
-
if self.target_username is not None:
-
result["target_username"] = self.target_username
-
if self.target_entry_id is not None:
-
result["target_entry_id"] = self.target_entry_id
-
-
return result
-
-
@classmethod
-
def from_dict(cls, data: dict) -> "BlogReference":
-
"""Create from dictionary."""
-
return cls(
-
source_entry_id=data["source_entry_id"],
-
source_username=data["source_username"],
-
target_url=data["target_url"],
-
target_username=data.get("target_username"),
-
target_entry_id=data.get("target_entry_id"),
-
)
-
-
-
class ReferenceIndex:
-
"""Index of blog-to-blog references for creating threaded views."""
-
-
def __init__(self):
-
self.references: list[BlogReference] = []
-
self.outbound_refs: dict[
-
str, list[BlogReference]
-
] = {} # entry_id -> outbound refs
-
self.inbound_refs: dict[
-
str, list[BlogReference]
-
] = {} # entry_id -> inbound refs
-
self.user_domains: dict[str, set[str]] = {} # username -> set of domains
-
-
def add_reference(self, ref: BlogReference) -> None:
-
"""Add a reference to the index."""
-
self.references.append(ref)
-
-
# Update outbound references
-
source_key = f"{ref.source_username}:{ref.source_entry_id}"
-
if source_key not in self.outbound_refs:
-
self.outbound_refs[source_key] = []
-
self.outbound_refs[source_key].append(ref)
-
-
# Update inbound references if we can identify the target
-
if ref.target_username and ref.target_entry_id:
-
target_key = f"{ref.target_username}:{ref.target_entry_id}"
-
if target_key not in self.inbound_refs:
-
self.inbound_refs[target_key] = []
-
self.inbound_refs[target_key].append(ref)
-
-
def get_outbound_refs(self, username: str, entry_id: str) -> list[BlogReference]:
-
"""Get all outbound references from an entry."""
-
key = f"{username}:{entry_id}"
-
return self.outbound_refs.get(key, [])
-
-
def get_inbound_refs(self, username: str, entry_id: str) -> list[BlogReference]:
-
"""Get all inbound references to an entry."""
-
key = f"{username}:{entry_id}"
-
return self.inbound_refs.get(key, [])
-
-
def get_thread_members(self, username: str, entry_id: str) -> set[tuple[str, str]]:
-
"""Get all entries that are part of the same thread."""
-
visited = set()
-
to_visit = [(username, entry_id)]
-
thread_members = set()
-
-
while to_visit:
-
current_user, current_entry = to_visit.pop()
-
if (current_user, current_entry) in visited:
-
continue
-
-
visited.add((current_user, current_entry))
-
thread_members.add((current_user, current_entry))
-
-
# Add outbound references
-
for ref in self.get_outbound_refs(current_user, current_entry):
-
if ref.target_username and ref.target_entry_id:
-
to_visit.append((ref.target_username, ref.target_entry_id))
-
-
# Add inbound references
-
for ref in self.get_inbound_refs(current_user, current_entry):
-
to_visit.append((ref.source_username, ref.source_entry_id))
-
-
return thread_members
-
-
def to_dict(self) -> dict:
-
"""Convert to dictionary for JSON serialization."""
-
return {
-
"references": [ref.to_dict() for ref in self.references],
-
"user_domains": {k: list(v) for k, v in self.user_domains.items()},
-
}
-
-
@classmethod
-
def from_dict(cls, data: dict) -> "ReferenceIndex":
-
"""Create from dictionary."""
-
index = cls()
-
for ref_data in data.get("references", []):
-
ref = BlogReference.from_dict(ref_data)
-
index.add_reference(ref)
-
-
for username, domains in data.get("user_domains", {}).items():
-
index.user_domains[username] = set(domains)
-
-
return index
-
-
-
class ReferenceParser:
-
"""Parses blog entries to detect references to other blogs."""
-
-
def __init__(self):
-
# Common blog platforms and patterns
-
self.blog_patterns = [
-
r"https?://[^/]+\.(?:org|com|net|io|dev|me|co\.uk)/.*", # Common blog domains
-
r"https?://[^/]+\.github\.io/.*", # GitHub Pages
-
r"https?://[^/]+\.substack\.com/.*", # Substack
-
r"https?://medium\.com/.*", # Medium
-
r"https?://[^/]+\.wordpress\.com/.*", # WordPress.com
-
r"https?://[^/]+\.blogspot\.com/.*", # Blogger
-
]
-
-
# Compile regex patterns
-
self.link_pattern = re.compile(
-
r'<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>', re.IGNORECASE | re.DOTALL
-
)
-
self.url_pattern = re.compile(r'https?://[^\s<>"]+')
-
-
def extract_links_from_html(self, html_content: str) -> list[tuple[str, str]]:
-
"""Extract all links from HTML content."""
-
links = []
-
-
# Extract links from <a> tags
-
for match in self.link_pattern.finditer(html_content):
-
url = match.group(1)
-
text = re.sub(
-
r"<[^>]+>", "", match.group(2)
-
).strip() # Remove HTML tags from link text
-
links.append((url, text))
-
-
return links
-
-
def is_blog_url(self, url: str) -> bool:
-
"""Check if a URL likely points to a blog post."""
-
for pattern in self.blog_patterns:
-
if re.match(pattern, url):
-
return True
-
return False
-
-
def resolve_target_user(
-
self, url: str, user_domains: dict[str, set[str]]
-
) -> Optional[str]:
-
"""Try to resolve a URL to a known user based on domain mapping."""
-
parsed_url = urlparse(url)
-
domain = parsed_url.netloc.lower()
-
-
for username, domains in user_domains.items():
-
if domain in domains:
-
return username
-
-
return None
-
-
def extract_references(
-
self, entry: AtomEntry, username: str, user_domains: dict[str, set[str]]
-
) -> list[BlogReference]:
-
"""Extract all blog references from an entry."""
-
references = []
-
-
# Combine all text content for analysis
-
content_to_search = []
-
if entry.content:
-
content_to_search.append(entry.content)
-
if entry.summary:
-
content_to_search.append(entry.summary)
-
-
for content in content_to_search:
-
links = self.extract_links_from_html(content)
-
-
for url, _link_text in links:
-
# Skip internal links (same domain as the entry)
-
entry_domain = (
-
urlparse(str(entry.link)).netloc.lower() if entry.link else ""
-
)
-
link_domain = urlparse(url).netloc.lower()
-
-
if link_domain == entry_domain:
-
continue
-
-
# Check if this looks like a blog URL
-
if not self.is_blog_url(url):
-
continue
-
-
# Try to resolve to a known user
-
target_username = self.resolve_target_user(url, user_domains)
-
-
ref = BlogReference(
-
source_entry_id=entry.id,
-
source_username=username,
-
target_url=url,
-
target_username=target_username,
-
target_entry_id=None, # Will be resolved later if possible
-
)
-
-
references.append(ref)
-
-
return references
-
-
def build_user_domain_mapping(self, git_store: "GitStore") -> dict[str, set[str]]:
-
"""Build mapping of usernames to their known domains."""
-
user_domains = {}
-
index = git_store._load_index()
-
-
for username, user_metadata in index.users.items():
-
domains = set()
-
-
# Add domains from feeds
-
for feed_url in user_metadata.feeds:
-
domain = urlparse(feed_url).netloc.lower()
-
if domain:
-
domains.add(domain)
-
-
# Add domain from homepage
-
if user_metadata.homepage:
-
domain = urlparse(str(user_metadata.homepage)).netloc.lower()
-
if domain:
-
domains.add(domain)
-
-
user_domains[username] = domains
-
-
return user_domains
-
-
def resolve_target_entry_ids(
-
self, references: list[BlogReference], git_store: "GitStore"
-
) -> list[BlogReference]:
-
"""Resolve target_entry_id for references that have target_username but no target_entry_id."""
-
resolved_refs = []
-
-
for ref in references:
-
# If we already have a target_entry_id, keep the reference as-is
-
if ref.target_entry_id is not None:
-
resolved_refs.append(ref)
-
continue
-
-
# If we don't have a target_username, we can't resolve it
-
if ref.target_username is None:
-
resolved_refs.append(ref)
-
continue
-
-
# Try to find the entry by matching the URL
-
entries = git_store.list_entries(ref.target_username)
-
resolved_entry_id = None
-
-
for entry in entries:
-
# Check if the entry's link matches the target URL
-
if entry.link and str(entry.link) == ref.target_url:
-
resolved_entry_id = entry.id
-
break
-
-
# Create a new reference with the resolved target_entry_id
-
resolved_ref = BlogReference(
-
source_entry_id=ref.source_entry_id,
-
source_username=ref.source_username,
-
target_url=ref.target_url,
-
target_username=ref.target_username,
-
target_entry_id=resolved_entry_id,
-
)
-
resolved_refs.append(resolved_ref)
-
-
return resolved_refs
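With this module deleted, the in-memory outbound/inbound indexes it maintained are superseded by the unified links.json described in ARCH.md, which stores both directions on disk. A minimal sketch of the equivalent bidirectional lookup against that file (field names follow the unified format; the load path is an assumption):

```python
import json
from pathlib import Path

def load_links(repo_path: Path) -> dict:
    """Load the unified link index from the Git store root."""
    with open(repo_path / "links.json") as f:
        return json.load(f)

def entries_referencing(links: dict, url: str) -> list[str]:
    """URL -> IDs of entries that link to it (inbound)."""
    return links["links"].get(url, {}).get("referencing_entries", [])

def urls_referenced_by(links: dict, entry_id: str) -> list[str]:
    """Entry ID -> outbound URLs, via the reverse mapping."""
    return links["reverse_mapping"].get(entry_id, [])
```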
+24
src/thicket/models/config.py
···
git_store: Path
cache_dir: Path
users: list[UserConfig] = []
+
+
def find_user(self, username: str) -> Optional[UserConfig]:
+
"""Find a user by username."""
+
for user in self.users:
+
if user.username == username:
+
return user
+
return None
+
+
def add_user(self, user: UserConfig) -> bool:
+
"""Add a user to the configuration. Returns True if added, False if already exists."""
+
if self.find_user(user.username) is not None:
+
return False
+
self.users.append(user)
+
return True
+
+
def add_feed_to_user(self, username: str, feed_url: HttpUrl) -> bool:
+
"""Add a feed to an existing user. Returns True if added, False if user not found or feed already exists."""
+
user = self.find_user(username)
+
if user is None:
+
return False
+
if feed_url in user.feeds:
+
return False
+
user.feeds.append(feed_url)
+
return True
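A quick usage sketch of the new config helpers — the enclosing model's name, the import path, and `UserConfig`'s constructor are assumptions, but the return-value semantics follow the docstrings above:

```python
from pathlib import Path

from pydantic import HttpUrl

from thicket.models.config import ThicketConfig, UserConfig  # assumed names and path

config = ThicketConfig(git_store=Path("./git-store"), cache_dir=Path("./cache"))
config.add_user(UserConfig(username="user1", feeds=[]))  # True: new user
config.add_user(UserConfig(username="user1", feeds=[]))  # False: username taken
config.add_feed_to_user("user1", HttpUrl("https://blog.user.com/atom.xml"))  # True
config.add_feed_to_user("user2", HttpUrl("https://example.com/feed.xml"))    # False: no such user
```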
+2 -2
src/thicket/models/feed.py
···
"""Feed and entry models for thicket."""
from datetime import datetime
-
from typing import TYPE_CHECKING, Optional
+
from typing import TYPE_CHECKING, Any, Optional
from pydantic import BaseModel, ConfigDict, EmailStr, HttpUrl
···
summary: Optional[str] = None
content: Optional[str] = None # Full body content from Atom entry
content_type: Optional[str] = "html" # text, html, xhtml
-
author: Optional[dict] = None
+
author: Optional[dict[str, Any]] = None
categories: list[str] = []
rights: Optional[str] = None # Copyright info
source: Optional[str] = None # Source feed URL
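The tightened `dict[str, Any]` annotation matches the shape `_extract_author` produces in feed_parser.py. For illustration (hypothetical values):

```python
from typing import Any

author: dict[str, Any] = {
    "name": "user1",                  # from entry.author_detail.name
    "email": "user1@example.com",     # from entry.author_detail.email
    "uri": "https://blog.user.com/",  # from entry.author_detail.href
}
```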
+1 -3
src/thicket/models/user.py
···
class GitStoreIndex(BaseModel):
"""Index of all users and their directories in the Git store."""
-
model_config = ConfigDict(
-
json_encoders={datetime: lambda v: v.isoformat()}
-
)
+
model_config = ConfigDict(json_encoders={datetime: lambda v: v.isoformat()})
users: dict[str, UserMetadata] = {} # username -> UserMetadata
created: datetime
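The collapsed `model_config` keeps the same behavior: datetimes are emitted as ISO-8601 strings when the index is dumped to JSON. A self-contained illustration with a hypothetical `Stamp` model (note that Pydantic v2 serializes `datetime` to ISO-8601 in JSON mode even without the custom encoder):

```python
from datetime import datetime

from pydantic import BaseModel, ConfigDict

class Stamp(BaseModel):
    model_config = ConfigDict(json_encoders={datetime: lambda v: v.isoformat()})
    created: datetime

print(Stamp(created=datetime(2024, 6, 1)).model_dump_json())
# {"created":"2024-06-01T00:00:00"}
```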
+9 -1
uv.lock
···
version = 1
-
revision = 2
+
revision = 3
requires-python = ">=3.9"
resolution-markers = [
"python_full_version >= '3.10'",
···
{ name = "types-pyyaml" },
]
+
[package.dev-dependencies]
+
dev = [
+
{ name = "pytest" },
+
]
+
[package.metadata]
requires-dist = [
{ name = "black", marker = "extra == 'dev'", specifier = ">=24.0.0" },
···
{ name = "types-pyyaml", marker = "extra == 'dev'", specifier = ">=6.0.0" },
]
provides-extras = ["dev"]
+
+
[package.metadata.requires-dev]
+
dev = [{ name = "pytest", specifier = ">=8.4.1" }]
[[package]]
name = "tomli"