Manage Atom feeds in a persistent git repository

Implement complete thicket CLI application

Core Features:
- Modern CLI with Typer + Rich for beautiful terminal output
- Universal feed parser supporting RSS/Atom with auto-discovery
- Git storage system with structured JSON entries
- Duplicate management with manual curation
- Comprehensive test suite with pytest

Components:
- Data models: Pydantic models for config, feeds, users
- Core logic: FeedParser and GitStore classes
- CLI commands: init, add, sync, list, duplicates
- Tests: Complete test coverage for all components
- Documentation: README with usage examples

Architecture:
- src/thicket/ package structure
- Async HTTP with httpx for feed fetching
- HTML sanitization with bleach for security
- Modern Python packaging with pyproject.toml

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+178
README.md
···
+
# Thicket
+
+
A modern CLI tool for persisting Atom/RSS feeds in Git repositories, designed to enable distributed weblog comment structures.
+
+
## Features
+
+
- **Feed Auto-Discovery**: Automatically extracts user metadata from Atom/RSS feeds
+
- **Git Storage**: Stores feed entries in a Git repository with full history
+
- **Duplicate Management**: Manual curation of duplicate entries across feeds
+
- **Modern CLI**: Built with Typer and Rich for beautiful terminal output
+
- **Comprehensive Parsing**: Supports RSS 0.9x, RSS 1.0, RSS 2.0, and Atom feeds
+
- **Cron-Friendly**: Designed for scheduled execution
+
+
## Installation
+
+
```bash
+
# Install from source
+
pip install -e .
+
+
# Or install with dev dependencies
+
pip install -e ".[dev]"
+
```
+
+
## Quick Start
+
+
1. **Initialize a new thicket repository:**
+
```bash
+
thicket init ./my-feeds
+
```
+
+
2. **Add a user with their feed:**
+
```bash
+
thicket add user "alice" "https://alice.example.com/feed.xml"
+
```
+
+
3. **Sync feeds to download entries:**
+
```bash
+
thicket sync --all
+
```
+
+
4. **List users and feeds:**
+
```bash
+
thicket list users
+
thicket list feeds
+
thicket list entries
+
```
+
+
## Commands
+
+
### Initialize
+
```bash
+
thicket init <git-store-path> [--cache-dir <path>] [--config <config-file>]
+
```
+
+
### Add Users and Feeds
+
```bash
+
# Add user with auto-discovery
+
thicket add user "username" "https://example.com/feed.xml"
+
+
# Add user with manual metadata
+
thicket add user "username" \
+
"https://example.com/feed.xml" \
+
--email "user@example.com" \
+
--homepage "https://example.com" \
+
--display-name "User Name"
+
+
# Add additional feed to existing user
+
thicket add feed "username" "https://example.com/other-feed.xml"
+
```
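
The precedence between manual options and auto-discovered metadata can be sketched as follows. This is a minimal illustration, assuming the fallback order used in `add_user` (manual value first, then author fields, then feed-level fields). `FeedMetadata` here is a hypothetical subset of the real model, and `first_set` and `resolve_user_fields` are illustrative helpers, not part of thicket.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FeedMetadata:
    """Hypothetical subset of the metadata discovered from a feed."""

    title: Optional[str] = None
    link: Optional[str] = None
    author_name: Optional[str] = None
    author_email: Optional[str] = None
    author_uri: Optional[str] = None


def first_set(*values: Optional[str]) -> Optional[str]:
    """Return the first non-empty value, or None if every value is empty."""
    return next((v for v in values if v), None)


def resolve_user_fields(
    meta: Optional[FeedMetadata],
    email: Optional[str] = None,
    homepage: Optional[str] = None,
    display_name: Optional[str] = None,
) -> dict[str, Optional[str]]:
    """Merge manual values with discovered ones; manual values win."""
    meta = meta or FeedMetadata()  # no discovery: every fallback is None
    return {
        "email": first_set(email, meta.author_email),
        "homepage": first_set(homepage, meta.author_uri, meta.link),
        "display_name": first_set(display_name, meta.author_name, meta.title),
    }


discovered = FeedMetadata(
    title="Alice's Blog",
    link="https://alice.example.com",
    author_email="alice@example.com",
)
# The manual display name overrides the feed title; the rest is discovered.
print(resolve_user_fields(discovered, display_name="Alice"))
```

With `--no-auto-discover`, the equivalent call passes `None` for the metadata, so only the manually supplied values are kept.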
+
+
### Sync Feeds
+
```bash
+
# Sync all users
+
thicket sync --all
+
+
# Sync specific user
+
thicket sync --user "username"
+
+
# Dry run (preview changes)
+
thicket sync --all --dry-run
+
```
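
How a sync run arrives at its "new" and "updated" counts can be sketched as follows. This is a minimal illustration, assuming each entry carries an `id` and an `updated` timestamp as in `sync_feed`. The `Entry` dataclass, the in-memory `store` dict, and `classify_entries` are hypothetical stand-ins for the real models and the Git store.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for a parsed feed entry."""

    id: str
    updated: datetime


def classify_entries(
    store: dict[str, Entry], incoming: list[Entry], dry_run: bool = False
) -> tuple[int, int]:
    """Return (new, updated) counts; write to the store unless dry_run is set."""
    new_count = 0
    updated_count = 0
    for entry in incoming:
        existing = store.get(entry.id)
        if existing is None:
            new_count += 1
        elif existing.updated != entry.updated:
            updated_count += 1
        else:
            continue  # unchanged entry: nothing to count or write
        if not dry_run:
            store[entry.id] = entry
    return new_count, updated_count


t1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 2, tzinfo=timezone.utc)
store = {"a": Entry("a", t1), "b": Entry("b", t1)}
incoming = [Entry("a", t1), Entry("b", t2), Entry("c", t2)]

print(classify_entries(store, incoming, dry_run=True))  # counts only, store untouched
print(classify_entries(store, incoming))  # same counts, and the store is updated
```

Because a dry run only counts, running the real sync afterwards reports the same numbers; a second real sync against an unchanged feed reports nothing to do.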
+
+
### List Information
+
```bash
+
# List all users
+
thicket list users
+
+
# List all feeds
+
thicket list feeds
+
+
# List feeds for specific user
+
thicket list feeds --user "username"
+
+
# List recent entries
+
thicket list entries --limit 20
+
+
# List entries for specific user
+
thicket list entries --user "username"
+
```
+
+
### Manage Duplicates
+
```bash
+
# List duplicate mappings
+
thicket duplicates list
+
+
# Mark entries as duplicates
+
thicket duplicates add "https://example.com/dup" "https://example.com/canonical"
+
+
# Remove duplicate mapping
+
thicket duplicates remove "https://example.com/dup"
+
```
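
The mapping behind these commands can be sketched as a small dictionary-backed model. This is an illustration only: the method names `is_duplicate` and `get_canonical` and the `duplicates` attribute follow the calls made in `duplicates.py`, but the fallback behavior of `get_canonical`, the `add` and `remove` helpers, and the JSON layout are assumptions.

```python
import json
from dataclasses import dataclass, field


@dataclass
class DuplicateMap:
    """Hypothetical model: maps a duplicate entry ID to its canonical entry ID."""

    duplicates: dict[str, str] = field(default_factory=dict)

    def is_duplicate(self, entry_id: str) -> bool:
        return entry_id in self.duplicates

    def get_canonical(self, entry_id: str) -> str:
        """Return the canonical ID, or the ID itself if it is not a duplicate."""
        return self.duplicates.get(entry_id, entry_id)

    def add(self, duplicate_id: str, canonical_id: str) -> None:
        # Mirror the two CLI checks: no self-mapping, no silent remapping.
        if duplicate_id == canonical_id:
            raise ValueError("an entry cannot be a duplicate of itself")
        if duplicate_id in self.duplicates:
            raise ValueError(f"{duplicate_id} is already mapped")
        self.duplicates[duplicate_id] = canonical_id

    def remove(self, duplicate_id: str) -> bool:
        """Drop a mapping; return False if there was nothing to remove."""
        return self.duplicates.pop(duplicate_id, None) is not None

    def to_json(self) -> str:
        return json.dumps({"duplicates": self.duplicates}, indent=2, sort_keys=True)


mapping = DuplicateMap()
mapping.add("https://example.com/dup", "https://example.com/canonical")
print(mapping.to_json())
```

Requiring an explicit `remove` before a mapping can change keeps every curation decision visible as its own commit in the Git history.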
+
+
## Configuration
+
+
Thicket uses a YAML configuration file (default: `thicket.yaml`):
+
+
```yaml
+
git_store: ./feeds-repo
+
cache_dir: ~/.cache/thicket
+
users:
+
  - username: alice
+
    feeds:
+
      - https://alice.example.com/feed.xml
+
    email: alice@example.com
+
    homepage: https://alice.example.com
+
    display_name: Alice
+
```
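
The shape of this configuration can be sketched in Python as follows. This is a simplified illustration: thicket itself uses Pydantic models, while the sketch swaps in standard-library dataclasses and starts from the dictionary a YAML loader would return. `find_user` and `add_feed_to_user` are named after the calls made in the CLI commands; their bodies and `config_from_dict` are assumptions.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class UserConfig:
    username: str
    feeds: list[str] = field(default_factory=list)
    email: Optional[str] = None
    homepage: Optional[str] = None
    display_name: Optional[str] = None


@dataclass
class ThicketConfig:
    git_store: str
    cache_dir: str
    users: list[UserConfig] = field(default_factory=list)

    def find_user(self, username: str) -> Optional[UserConfig]:
        return next((u for u in self.users if u.username == username), None)

    def add_feed_to_user(self, username: str, feed_url: str) -> bool:
        """Append a feed; return False for unknown users or repeated feeds."""
        user = self.find_user(username)
        if user is None or feed_url in user.feeds:
            return False
        user.feeds.append(feed_url)
        return True


def config_from_dict(data: dict) -> ThicketConfig:
    users = [UserConfig(**u) for u in data.get("users", [])]
    return ThicketConfig(
        git_store=data["git_store"], cache_dir=data["cache_dir"], users=users
    )


# Mirrors what a YAML loader would return for the example file above.
raw = {
    "git_store": "./feeds-repo",
    "cache_dir": "~/.cache/thicket",
    "users": [
        {
            "username": "alice",
            "feeds": ["https://alice.example.com/feed.xml"],
            "email": "alice@example.com",
            "homepage": "https://alice.example.com",
            "display_name": "Alice",
        }
    ],
}
config = config_from_dict(raw)
print(config.find_user("alice"))
```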
+
+
## Git Repository Structure
+
+
```
+
feeds-repo/
+
├── index.json # User directory index
+
├── duplicates.json # Duplicate entry mappings
+
├── alice/
+
│   ├── metadata.json    # User metadata
+
│   ├── entry_id_1.json  # Feed entries
+
│   └── entry_id_2.json
+
└── bob/
+
    └── ...
+
```
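
Writing an entry into this layout can be sketched as follows. This is a minimal illustration under stated assumptions: `safe_filename` and `store_entry` are hypothetical helpers, the filename scheme is invented for the example, and the sketch only writes the JSON file (it does not update `index.json` or create a Git commit).

```python
import json
import re
import tempfile
from pathlib import Path


def safe_filename(entry_id: str) -> str:
    """Turn an entry ID (often a URL) into a filesystem-safe name."""
    name = re.sub(r"[^A-Za-z0-9._-]+", "_", entry_id).strip("_")
    return name[:200] or "entry"  # cap the length; never return an empty name


def store_entry(repo: Path, username: str, entry: dict) -> Path:
    """Write one entry to <repo>/<username>/<safe id>.json and return the path."""
    user_dir = repo / username
    user_dir.mkdir(parents=True, exist_ok=True)
    path = user_dir / f"{safe_filename(entry['id'])}.json"
    # Sorted keys and fixed indentation keep Git diffs stable between syncs.
    path.write_text(json.dumps(entry, indent=2, sort_keys=True), encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    written = store_entry(
        Path(tmp),
        "alice",
        {"id": "https://alice.example.com/posts/1", "title": "Hello"},
    )
    print(written.relative_to(tmp))
```

One file per entry means an updated post shows up as a change to a single small file, which keeps the repository history easy to read.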
+
+
## Development
+
+
### Setup
+
```bash
+
# Install in development mode
+
pip install -e ".[dev]"
+
+
# Run tests
+
pytest
+
+
# Run linting
+
ruff check src/
+
black --check src/
+
+
# Run type checking
+
mypy src/
+
```
+
+
### Architecture
+
+
- **CLI**: Modern interface with Typer and Rich
+
- **Feed Processing**: Universal parsing with feedparser
+
- **Git Storage**: Structured storage with GitPython
+
- **Data Models**: Pydantic for validation and serialization
+
- **Async HTTP**: httpx for efficient feed fetching
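
One way async fetching could pay off is by fetching several feeds concurrently. The sketch below illustrates that pattern with `asyncio.gather` and a semaphore; it is not how the `sync` command in this commit works (that command runs one feed at a time). `fetch_feed` is a stub standing in for a real HTTP request, and `fetch_all` and its `limit` parameter are hypothetical.

```python
import asyncio


async def fetch_feed(url: str) -> str:
    """Stand-in for an HTTP GET; a real version would await an HTTP client."""
    await asyncio.sleep(0.01)
    if "broken" in url:
        raise RuntimeError(f"cannot fetch {url}")
    return f"<feed from {url}>"


async def fetch_all(urls: list[str], limit: int = 4) -> dict[str, object]:
    """Fetch URLs concurrently, at most `limit` at a time.

    Each URL maps to its body, or to the exception raised while fetching it.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(url: str) -> object:
        async with semaphore:
            try:
                return await fetch_feed(url)
            except Exception as exc:  # keep one failure from aborting the batch
                return exc

    results = await asyncio.gather(*(guarded(u) for u in urls))
    return dict(zip(urls, results))


results = asyncio.run(
    fetch_all(["https://alice.example.com/feed.xml", "https://broken.example.com/feed"])
)
for url, result in results.items():
    status = "failed" if isinstance(result, Exception) else "ok"
    print(f"{url}: {status}")
```

Returning exceptions as values lets the caller report failed feeds and still store the entries from the feeds that succeeded.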
+
+
## Use Cases
+
+
- **Blog Aggregation**: Collect and archive blog posts from multiple sources
+
- **Comment Networks**: Enable distributed commenting systems
+
- **Feed Archival**: Preserve feed history beyond typical feed depth limits
+
- **Content Curation**: Manage and deduplicate content across feeds
+
+
## License
+
+
MIT License - see LICENSE file for details.
+169
pyproject.toml
···
+
[build-system]
+
requires = ["hatchling"]
+
build-backend = "hatchling.build"
+
+
[project]
+
name = "thicket"
+
dynamic = ["version"]
+
description = "A CLI tool for persisting Atom/RSS feeds in Git repositories"
+
readme = "README.md"
+
license = "MIT"
+
requires-python = ">=3.9"
+
authors = [
+
{name = "thicket", email = "thicket@example.com"},
+
]
+
classifiers = [
+
"Development Status :: 3 - Alpha",
+
"Intended Audience :: Developers",
+
"License :: OSI Approved :: MIT License",
+
"Operating System :: OS Independent",
+
"Programming Language :: Python :: 3",
+
"Programming Language :: Python :: 3.9",
+
"Programming Language :: Python :: 3.10",
+
"Programming Language :: Python :: 3.11",
+
"Programming Language :: Python :: 3.12",
+
"Programming Language :: Python :: 3.13",
+
"Topic :: Internet :: WWW/HTTP :: Dynamic Content :: News/Diary",
+
"Topic :: Software Development :: Version Control :: Git",
+
"Topic :: Text Processing :: Markup :: XML",
+
]
+
dependencies = [
+
"typer>=0.15.0",
+
"rich>=13.0.0",
+
"GitPython>=3.1.40",
+
"feedparser>=6.0.11",
+
"pydantic>=2.11.0",
+
"pydantic-settings>=2.10.0",
+
"httpx>=0.28.0",
+
"pendulum>=3.0.0",
+
"bleach>=6.0.0",
+
"platformdirs>=4.0.0",
+
"pyyaml>=6.0.0",
+
]
+
+
[project.optional-dependencies]
+
dev = [
+
"pytest>=8.0.0",
+
"pytest-asyncio>=0.24.0",
+
"pytest-cov>=6.0.0",
+
"black>=24.0.0",
+
"ruff>=0.8.0",
+
"mypy>=1.13.0",
+
"types-PyYAML>=6.0.0",
+
]
+
+
[project.urls]
+
Homepage = "https://github.com/example/thicket"
+
Documentation = "https://github.com/example/thicket"
+
Repository = "https://github.com/example/thicket"
+
"Bug Tracker" = "https://github.com/example/thicket/issues"
+
+
[project.scripts]
+
thicket = "thicket.cli.main:app"
+
+
[tool.hatch.version]
+
path = "src/thicket/__init__.py"
+
+
[tool.hatch.build.targets.wheel]
+
packages = ["src/thicket"]
+
+
[tool.black]
+
line-length = 88
+
target-version = ['py39']
+
include = '\.pyi?$'
+
extend-exclude = '''
+
/(
+
# directories
+
\.eggs
+
| \.git
+
| \.hg
+
| \.mypy_cache
+
| \.tox
+
| \.venv
+
| build
+
| dist
+
)/
+
'''
+
+
[tool.ruff]
+
target-version = "py39"
+
line-length = 88
+
select = [
+
"E", # pycodestyle errors
+
"W", # pycodestyle warnings
+
"F", # pyflakes
+
"I", # isort
+
"B", # flake8-bugbear
+
"C4", # flake8-comprehensions
+
"UP", # pyupgrade
+
]
+
ignore = [
+
"E501", # line too long, handled by black
+
"B008", # do not perform function calls in argument defaults
+
"C901", # too complex
+
]
+
+
[tool.ruff.per-file-ignores]
+
"__init__.py" = ["F401"]
+
+
[tool.mypy]
+
python_version = "3.9"
+
check_untyped_defs = true
+
disallow_any_generics = true
+
disallow_incomplete_defs = true
+
disallow_untyped_defs = true
+
no_implicit_optional = true
+
warn_redundant_casts = true
+
warn_unused_ignores = true
+
warn_return_any = true
+
strict_optional = true
+
+
[[tool.mypy.overrides]]
+
module = [
+
"feedparser",
+
"git",
+
"bleach",
+
]
+
ignore_missing_imports = true
+
+
[tool.pytest.ini_options]
+
testpaths = ["tests"]
+
python_files = ["test_*.py"]
+
python_classes = ["Test*"]
+
python_functions = ["test_*"]
+
addopts = [
+
"-ra",
+
"--strict-markers",
+
"--strict-config",
+
"--cov=src/thicket",
+
"--cov-report=term-missing",
+
"--cov-report=html",
+
"--cov-report=xml",
+
]
+
filterwarnings = [
+
"error",
+
"ignore::UserWarning",
+
"ignore::DeprecationWarning",
+
]
+
markers = [
+
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
+
"integration: marks tests as integration tests",
+
]
+
+
[tool.coverage.run]
+
source = ["src"]
+
branch = true
+
+
[tool.coverage.report]
+
exclude_lines = [
+
"pragma: no cover",
+
"def __repr__",
+
"if self.debug:",
+
"if settings.DEBUG",
+
"raise AssertionError",
+
"raise NotImplementedError",
+
"if 0:",
+
"if __name__ == .__main__.:",
+
"class .*\\bProtocol\\):",
+
"@(abc\\.)?abstractmethod",
+
]
+5
src/thicket/__init__.py
···
+
"""Thicket: A CLI tool for persisting Atom/RSS feeds in Git repositories."""
+
+
__version__ = "0.1.0"
+
__author__ = "thicket"
+
__email__ = "thicket@example.com"
+6
src/thicket/__main__.py
···
+
"""Entry point for running thicket as a module."""
+
+
from .cli.main import app
+
+
if __name__ == "__main__":
+
app()
+5
src/thicket/cli/__init__.py
···
+
"""CLI interface for thicket."""
+
+
from .main import app
+
+
__all__ = ["app"]
+6
src/thicket/cli/commands/__init__.py
···
+
"""CLI commands for thicket."""
+
+
# Import all commands to register them with the main app
+
from . import add, duplicates, init, list_cmd, sync
+
+
__all__ = ["add", "duplicates", "init", "list_cmd", "sync"]
+193
src/thicket/cli/commands/add.py
···
+
"""Add command for thicket."""
+
+
import asyncio
+
from pathlib import Path
+
from typing import Optional
+
+
import typer
+
from pydantic import HttpUrl, ValidationError
+
+
from ...core.feed_parser import FeedParser
+
from ...core.git_store import GitStore
+
from ...models import UserConfig
+
from ..main import app
+
from ..utils import (
+
create_progress,
+
load_config,
+
print_error,
+
print_info,
+
print_success,
+
save_config,
+
)
+
+
+
@app.command("add")
+
def add_command(
+
subcommand: str = typer.Argument(..., help="Subcommand: 'user' or 'feed'"),
+
username: str = typer.Argument(..., help="Username"),
+
feed_url: Optional[str] = typer.Argument(None, help="Feed URL (required for 'user' command)"),
+
email: Optional[str] = typer.Option(None, "--email", "-e", help="User email"),
+
homepage: Optional[str] = typer.Option(None, "--homepage", "-h", help="User homepage"),
+
icon: Optional[str] = typer.Option(None, "--icon", "-i", help="User icon URL"),
+
display_name: Optional[str] = typer.Option(None, "--display-name", "-d", help="User display name"),
+
config_file: Optional[Path] = typer.Option(
+
Path("thicket.yaml"), "--config", help="Configuration file path"
+
),
+
auto_discover: bool = typer.Option(
+
True, "--auto-discover/--no-auto-discover", help="Auto-discover user metadata from feed"
+
),
+
) -> None:
+
"""Add a user or feed to thicket."""
+
+
if subcommand == "user":
+
add_user(username, feed_url, email, homepage, icon, display_name, config_file, auto_discover)
+
elif subcommand == "feed":
+
add_feed(username, feed_url, config_file)
+
else:
+
print_error(f"Unknown subcommand: {subcommand}")
+
print_error("Use 'user' or 'feed'")
+
raise typer.Exit(1)
+
+
+
def add_user(
+
username: str,
+
feed_url: Optional[str],
+
email: Optional[str],
+
homepage: Optional[str],
+
icon: Optional[str],
+
display_name: Optional[str],
+
config_file: Path,
+
auto_discover: bool,
+
) -> None:
+
"""Add a new user with feed."""
+
+
if not feed_url:
+
print_error("Feed URL is required when adding a user")
+
raise typer.Exit(1)
+
+
# Validate feed URL
+
try:
+
validated_feed_url = HttpUrl(feed_url)
+
except ValidationError:
+
print_error(f"Invalid feed URL: {feed_url}")
+
raise typer.Exit(1)
+
+
# Load configuration
+
config = load_config(config_file)
+
+
# Check if user already exists
+
existing_user = config.find_user(username)
+
if existing_user:
+
print_error(f"User '{username}' already exists")
+
print_error("Use 'thicket add feed' to add additional feeds")
+
raise typer.Exit(1)
+
+
# Auto-discover metadata if enabled
+
discovered_metadata = None
+
if auto_discover:
+
discovered_metadata = asyncio.run(discover_feed_metadata(validated_feed_url))
+
+
# Create user config with manual overrides taking precedence
+
user_config = UserConfig(
+
username=username,
+
feeds=[validated_feed_url],
+
email=email or (discovered_metadata.author_email if discovered_metadata else None),
+
homepage=HttpUrl(homepage) if homepage else (discovered_metadata.author_uri or discovered_metadata.link if discovered_metadata else None),
+
icon=HttpUrl(icon) if icon else (discovered_metadata.logo or discovered_metadata.icon or discovered_metadata.image_url if discovered_metadata else None),
+
display_name=display_name or (discovered_metadata.author_name or discovered_metadata.title if discovered_metadata else None),
+
)
+
+
# Add user to configuration
+
config.add_user(user_config)
+
+
# Save configuration
+
save_config(config, config_file)
+
+
# Add user to Git store
+
git_store = GitStore(config.git_store)
+
git_store.add_user(
+
username=username,
+
display_name=user_config.display_name,
+
email=user_config.email,
+
homepage=str(user_config.homepage) if user_config.homepage else None,
+
icon=str(user_config.icon) if user_config.icon else None,
+
feeds=[str(f) for f in user_config.feeds],
+
)
+
+
# Commit changes
+
git_store.commit_changes(f"Add user: {username}")
+
+
print_success(f"Added user '{username}' with feed: {feed_url}")
+
+
if discovered_metadata and auto_discover:
+
print_info("Auto-discovered metadata:")
+
if user_config.display_name:
+
print_info(f" Display name: {user_config.display_name}")
+
if user_config.email:
+
print_info(f" Email: {user_config.email}")
+
if user_config.homepage:
+
print_info(f" Homepage: {user_config.homepage}")
+
if user_config.icon:
+
print_info(f" Icon: {user_config.icon}")
+
+
+
def add_feed(username: str, feed_url: Optional[str], config_file: Path) -> None:
+
"""Add a feed to an existing user."""
+
+
if not feed_url:
+
print_error("Feed URL is required")
+
raise typer.Exit(1)
+
+
# Validate feed URL
+
try:
+
validated_feed_url = HttpUrl(feed_url)
+
except ValidationError:
+
print_error(f"Invalid feed URL: {feed_url}")
+
raise typer.Exit(1)
+
+
# Load configuration
+
config = load_config(config_file)
+
+
# Check if user exists
+
user = config.find_user(username)
+
if not user:
+
print_error(f"User '{username}' not found")
+
print_error("Use 'thicket add user' to add a new user")
+
raise typer.Exit(1)
+
+
# Check if feed already exists
+
if validated_feed_url in user.feeds:
+
print_error(f"Feed already exists for user '{username}': {feed_url}")
+
raise typer.Exit(1)
+
+
# Add feed to user
+
if config.add_feed_to_user(username, validated_feed_url):
+
save_config(config, config_file)
+
+
# Update Git store
+
git_store = GitStore(config.git_store)
+
git_store.update_user(username, feeds=[str(f) for f in user.feeds])
+
git_store.commit_changes(f"Add feed to user {username}: {feed_url}")
+
+
print_success(f"Added feed to user '{username}': {feed_url}")
+
else:
+
print_error(f"Failed to add feed to user '{username}'")
+
raise typer.Exit(1)
+
+
+
async def discover_feed_metadata(feed_url: HttpUrl):
+
"""Discover metadata from a feed URL."""
+
try:
+
with create_progress() as progress:
+
task = progress.add_task("Discovering feed metadata...", total=None)
+
+
parser = FeedParser()
+
content = await parser.fetch_feed(feed_url)
+
metadata, _ = parser.parse_feed(content, feed_url)
+
+
progress.update(task, completed=True)
+
return metadata
+
+
except Exception as e:
+
print_error(f"Failed to discover feed metadata: {e}")
+
return None
+121
src/thicket/cli/commands/duplicates.py
···
+
"""Duplicates command for thicket."""
+
+
from pathlib import Path
+
from typing import Optional
+
+
import typer
+
from rich.table import Table
+
+
from ...core.git_store import GitStore
+
from ..main import app
+
from ..utils import (
+
console,
+
load_config,
+
print_error,
+
print_info,
+
print_success,
+
)
+
+
+
@app.command("duplicates")
+
def duplicates_command(
+
action: str = typer.Argument(..., help="Action: 'list', 'add', 'remove'"),
+
duplicate_id: Optional[str] = typer.Argument(None, help="Duplicate entry ID"),
+
canonical_id: Optional[str] = typer.Argument(None, help="Canonical entry ID"),
+
config_file: Optional[Path] = typer.Option(
+
Path("thicket.yaml"), "--config", help="Configuration file path"
+
),
+
) -> None:
+
"""Manage duplicate entry mappings."""
+
+
# Load configuration
+
config = load_config(config_file)
+
+
# Initialize Git store
+
git_store = GitStore(config.git_store)
+
+
if action == "list":
+
list_duplicates(git_store)
+
elif action == "add":
+
add_duplicate(git_store, duplicate_id, canonical_id)
+
elif action == "remove":
+
remove_duplicate(git_store, duplicate_id)
+
else:
+
print_error(f"Unknown action: {action}")
+
print_error("Use 'list', 'add', or 'remove'")
+
raise typer.Exit(1)
+
+
+
def list_duplicates(git_store: GitStore) -> None:
+
"""List all duplicate mappings."""
+
duplicates = git_store.get_duplicates()
+
+
if not duplicates.duplicates:
+
print_info("No duplicate mappings found")
+
return
+
+
table = Table(title="Duplicate Entry Mappings")
+
table.add_column("Duplicate ID", style="red")
+
table.add_column("Canonical ID", style="green")
+
+
for duplicate_id, canonical_id in duplicates.duplicates.items():
+
table.add_row(duplicate_id, canonical_id)
+
+
console.print(table)
+
print_info(f"Total duplicates: {len(duplicates.duplicates)}")
+
+
+
def add_duplicate(git_store: GitStore, duplicate_id: Optional[str], canonical_id: Optional[str]) -> None:
+
"""Add a duplicate mapping."""
+
if not duplicate_id:
+
print_error("Duplicate ID is required")
+
raise typer.Exit(1)
+
+
if not canonical_id:
+
print_error("Canonical ID is required")
+
raise typer.Exit(1)
+
+
# Check if duplicate_id already exists
+
duplicates = git_store.get_duplicates()
+
if duplicates.is_duplicate(duplicate_id):
+
existing_canonical = duplicates.get_canonical(duplicate_id)
+
print_error(f"Duplicate ID already mapped to: {existing_canonical}")
+
print_error("Use 'remove' first to change the mapping")
+
raise typer.Exit(1)
+
+
# Check if we're trying to make a canonical ID point to itself
+
if duplicate_id == canonical_id:
+
print_error("Duplicate ID cannot be the same as canonical ID")
+
raise typer.Exit(1)
+
+
# Add the mapping
+
git_store.add_duplicate(duplicate_id, canonical_id)
+
+
# Commit changes
+
git_store.commit_changes(f"Add duplicate mapping: {duplicate_id} -> {canonical_id}")
+
+
print_success(f"Added duplicate mapping: {duplicate_id} -> {canonical_id}")
+
+
+
def remove_duplicate(git_store: GitStore, duplicate_id: Optional[str]) -> None:
+
"""Remove a duplicate mapping."""
+
if not duplicate_id:
+
print_error("Duplicate ID is required")
+
raise typer.Exit(1)
+
+
# Check if mapping exists
+
duplicates = git_store.get_duplicates()
+
if not duplicates.is_duplicate(duplicate_id):
+
print_error(f"No duplicate mapping found for: {duplicate_id}")
+
raise typer.Exit(1)
+
+
canonical_id = duplicates.get_canonical(duplicate_id)
+
+
# Remove the mapping
+
if git_store.remove_duplicate(duplicate_id):
+
# Commit changes
+
git_store.commit_changes(f"Remove duplicate mapping: {duplicate_id} -> {canonical_id}")
+
print_success(f"Removed duplicate mapping: {duplicate_id} -> {canonical_id}")
+
else:
+
print_error(f"Failed to remove duplicate mapping: {duplicate_id}")
+
raise typer.Exit(1)
+77
src/thicket/cli/commands/init.py
···
+
"""Initialize command for thicket."""
+
+
from pathlib import Path
+
from typing import Optional
+
+
import typer
+
from pydantic import ValidationError
+
+
from ...core.git_store import GitStore
+
from ...models import ThicketConfig
+
from ..main import app
+
from ..utils import print_error, print_success, save_config
+
+
+
@app.command()
+
def init(
+
git_store: Path = typer.Argument(..., help="Path to Git repository for storing feeds"),
+
cache_dir: Optional[Path] = typer.Option(
+
None, "--cache-dir", "-c", help="Cache directory (default: ~/.cache/thicket)"
+
),
+
config_file: Optional[Path] = typer.Option(
+
None, "--config", help="Configuration file path (default: thicket.yaml)"
+
),
+
force: bool = typer.Option(
+
False, "--force", "-f", help="Overwrite existing configuration"
+
),
+
) -> None:
+
"""Initialize a new thicket configuration and Git store."""
+
+
# Set default paths
+
if cache_dir is None:
+
from platformdirs import user_cache_dir
+
cache_dir = Path(user_cache_dir("thicket"))
+
+
if config_file is None:
+
config_file = Path("thicket.yaml")
+
+
# Check if config already exists
+
if config_file.exists() and not force:
+
print_error(f"Configuration file already exists: {config_file}")
+
print_error("Use --force to overwrite")
+
raise typer.Exit(1)
+
+
# Create cache directory
+
cache_dir.mkdir(parents=True, exist_ok=True)
+
+
# Create Git store
+
try:
+
GitStore(git_store)  # constructed for its side effect of creating the store
+
print_success(f"Initialized Git store at: {git_store}")
+
except Exception as e:
+
print_error(f"Failed to initialize Git store: {e}")
+
raise typer.Exit(1) from e
+
+
# Create configuration
+
try:
+
config = ThicketConfig(
+
git_store=git_store,
+
cache_dir=cache_dir,
+
users=[]
+
)
+
+
save_config(config, config_file)
+
print_success(f"Created configuration file: {config_file}")
+
+
except ValidationError as e:
+
print_error(f"Invalid configuration: {e}")
+
raise typer.Exit(1)
+
except Exception as e:
+
print_error(f"Failed to create configuration: {e}")
+
raise typer.Exit(1)
+
+
print_success("Thicket initialized successfully!")
+
print_success(f"Git store: {git_store}")
+
print_success(f"Cache directory: {cache_dir}")
+
print_success(f"Configuration: {config_file}")
+
print_success("Run 'thicket add user' to add your first user and feed.")
+146
src/thicket/cli/commands/list_cmd.py
···
+
"""List command for thicket."""
+
+
from pathlib import Path
+
from typing import Optional
+
+
import typer
+
from rich.table import Table
+
+
from ...core.git_store import GitStore
+
from ..main import app
+
from ..utils import (
+
console,
+
load_config,
+
print_error,
+
print_feeds_table,
+
print_info,
+
print_users_table,
+
)
+
+
+
@app.command("list")
+
def list_command(
+
what: str = typer.Argument(..., help="What to list: 'users', 'feeds', 'entries'"),
+
user: Optional[str] = typer.Option(
+
None, "--user", "-u", help="Filter by specific user"
+
),
+
limit: Optional[int] = typer.Option(
+
None, "--limit", "-l", help="Limit number of results"
+
),
+
config_file: Optional[Path] = typer.Option(
+
Path("thicket.yaml"), "--config", help="Configuration file path"
+
),
+
) -> None:
+
"""List users, feeds, or entries."""
+
+
# Load configuration
+
config = load_config(config_file)
+
+
if what == "users":
+
list_users(config)
+
elif what == "feeds":
+
list_feeds(config, user)
+
elif what == "entries":
+
list_entries(config, user, limit)
+
else:
+
print_error(f"Unknown list type: {what}")
+
print_error("Use 'users', 'feeds', or 'entries'")
+
raise typer.Exit(1)
+
+
+
def list_users(config) -> None:
+
"""List all users."""
+
if not config.users:
+
print_info("No users configured")
+
return
+
+
print_users_table(config)
+
+
+
def list_feeds(config, username: Optional[str] = None) -> None:
+
"""List feeds, optionally filtered by user."""
+
if username:
+
user = config.find_user(username)
+
if not user:
+
print_error(f"User '{username}' not found")
+
raise typer.Exit(1)
+
+
if not user.feeds:
+
print_info(f"No feeds configured for user '{username}'")
+
return
+
+
print_feeds_table(config, username)
+
+
+
def list_entries(config, username: Optional[str] = None, limit: Optional[int] = None) -> None:
+
"""List entries, optionally filtered by user."""
+
+
# Initialize Git store
+
git_store = GitStore(config.git_store)
+
+
if username:
+
# List entries for specific user
+
user = config.find_user(username)
+
if not user:
+
print_error(f"User '{username}' not found")
+
raise typer.Exit(1)
+
+
entries = git_store.list_entries(username, limit)
+
if not entries:
+
print_info(f"No entries found for user '{username}'")
+
return
+
+
print_entries_table([entries], [username])
+
+
else:
+
# List entries for all users
+
all_entries = []
+
all_usernames = []
+
+
for user in config.users:
+
entries = git_store.list_entries(user.username, limit)
+
if entries:
+
all_entries.append(entries)
+
all_usernames.append(user.username)
+
+
if not all_entries:
+
print_info("No entries found")
+
return
+
+
print_entries_table(all_entries, all_usernames)
+
+
+
def print_entries_table(entries_by_user: list[list], usernames: list[str]) -> None:
+
"""Print a table of entries."""
+
table = Table(title="Feed Entries")
+
table.add_column("User", style="cyan", no_wrap=True)
+
table.add_column("Title", style="bold")
+
table.add_column("Updated", style="blue")
+
table.add_column("URL", style="green")
+
+
# Combine all entries with usernames
+
all_entries = []
+
for entries, username in zip(entries_by_user, usernames):
+
for entry in entries:
+
all_entries.append((username, entry))
+
+
# Sort by updated time (newest first)
+
all_entries.sort(key=lambda x: x[1].updated, reverse=True)
+
+
for username, entry in all_entries:
+
# Format updated time
+
updated_str = entry.updated.strftime("%Y-%m-%d %H:%M")
+
+
# Truncate title if too long
+
title = entry.title
+
if len(title) > 50:
+
title = title[:47] + "..."
+
+
table.add_row(
+
username,
+
title,
+
updated_str,
+
str(entry.link),
+
)
+
+
console.print(table)
+142
src/thicket/cli/commands/sync.py
···
+
"""Sync command for thicket."""
+
+
import asyncio
+
from pathlib import Path
+
from typing import Optional
+
+
import typer
+
from rich.progress import track
+
+
from ...core.feed_parser import FeedParser
+
from ...core.git_store import GitStore
+
from ..main import app
+
from ..utils import (
+
load_config,
+
print_error,
+
print_info,
+
print_success,
+
)
+
+
+
@app.command()
+
def sync(
+
all_users: bool = typer.Option(
+
False, "--all", "-a", help="Sync all users and feeds"
+
),
+
user: Optional[str] = typer.Option(
+
None, "--user", "-u", help="Sync specific user only"
+
),
+
config_file: Optional[Path] = typer.Option(
+
Path("thicket.yaml"), "--config", help="Configuration file path"
+
),
+
dry_run: bool = typer.Option(
+
False, "--dry-run", help="Show what would be synced without making changes"
+
),
+
) -> None:
+
"""Sync feeds and store entries in Git repository."""
+
+
# Load configuration
+
config = load_config(config_file)
+
+
# Determine which users to sync
+
users_to_sync = []
+
if all_users:
+
users_to_sync = config.users
+
elif user:
+
user_config = config.find_user(user)
+
if not user_config:
+
print_error(f"User '{user}' not found")
+
raise typer.Exit(1)
+
users_to_sync = [user_config]
+
else:
+
print_error("Specify --all to sync all users or --user to sync a specific user")
+
raise typer.Exit(1)
+
+
if not users_to_sync:
+
print_info("No users configured to sync")
+
return
+
+
# Initialize Git store
+
git_store = GitStore(config.git_store)
+
+
# Sync each user
+
total_new_entries = 0
+
total_updated_entries = 0
+
+
for user_config in users_to_sync:
+
print_info(f"Syncing user: {user_config.username}")
+
+
user_new_entries = 0
+
user_updated_entries = 0
+
+
# Sync each feed for the user
+
for feed_url in track(user_config.feeds, description=f"Syncing {user_config.username}'s feeds"):
+
try:
+
new_entries, updated_entries = asyncio.run(
+
sync_feed(git_store, user_config.username, feed_url, dry_run)
+
)
+
user_new_entries += new_entries
+
user_updated_entries += updated_entries
+
+
except Exception as e:
+
print_error(f"Failed to sync feed {feed_url}: {e}")
+
continue
+
+
print_info(f"User {user_config.username}: {user_new_entries} new, {user_updated_entries} updated")
+
total_new_entries += user_new_entries
+
total_updated_entries += user_updated_entries
+
+
# Commit changes if not dry run
+
if not dry_run and (total_new_entries > 0 or total_updated_entries > 0):
+
commit_message = f"Sync feeds: {total_new_entries} new entries, {total_updated_entries} updated"
+
git_store.commit_changes(commit_message)
+
print_success(f"Committed changes: {commit_message}")
+
+
# Summary
+
if dry_run:
+
print_info(f"Dry run complete: would sync {total_new_entries} new entries, {total_updated_entries} updated")
+
else:
+
print_success(f"Sync complete: {total_new_entries} new entries, {total_updated_entries} updated")
+
+
+
async def sync_feed(git_store: GitStore, username: str, feed_url, dry_run: bool) -> tuple[int, int]:
+
"""Sync a single feed for a user."""
+
+
parser = FeedParser()
+
+
try:
+
# Fetch and parse feed
+
content = await parser.fetch_feed(feed_url)
+
metadata, entries = parser.parse_feed(content, feed_url)
+
+
new_entries = 0
+
updated_entries = 0
+
+
# Process each entry
+
for entry in entries:
+
try:
+
# Check if entry already exists
+
existing_entry = git_store.get_entry(username, entry.id)
+
+
if existing_entry:
+
# Check if entry has been updated
+
if existing_entry.updated != entry.updated:
+
if not dry_run:
+
git_store.store_entry(username, entry)
+
updated_entries += 1
+
else:
+
# New entry
+
if not dry_run:
+
git_store.store_entry(username, entry)
+
new_entries += 1
+
+
except Exception as e:
+
print_error(f"Failed to process entry {entry.id}: {e}")
+
continue
+
+
return new_entries, updated_entries
+
+
except Exception as e:
+
print_error(f"Failed to sync feed {feed_url}: {e}")
+
return 0, 0
+45
src/thicket/cli/main.py
···
+
"""Main CLI application using Typer."""
+
+
import typer
+
from rich.console import Console
+
+
from .. import __version__
+
+
app = typer.Typer(
+
name="thicket",
+
help="A CLI tool for persisting Atom/RSS feeds in Git repositories",
+
no_args_is_help=True,
+
rich_markup_mode="rich",
+
)
+
+
console = Console()
+
+
+
def version_callback(value: bool) -> None:
+
"""Show version and exit."""
+
if value:
+
console.print(f"thicket version {__version__}")
+
raise typer.Exit()
+
+
+
@app.callback()
+
def main(
+
version: bool = typer.Option(
+
None,
+
"--version",
+
"-v",
+
help="Show the version and exit",
+
callback=version_callback,
+
is_eager=True,
+
),
+
) -> None:
+
"""Thicket: A CLI tool for persisting Atom/RSS feeds in Git repositories."""
+
pass
+
+
+
# Import commands to register them
+
from .commands import add, duplicates, init, list_cmd, sync  # noqa: E402, F401
+
+
+
if __name__ == "__main__":
+
app()
+125
src/thicket/cli/utils.py
···
+
"""CLI utilities and helpers."""
+
+
from pathlib import Path
+
from typing import Optional
+
+
import typer
+
from rich.console import Console
+
from rich.progress import Progress, SpinnerColumn, TextColumn
+
from rich.table import Table
+
+
from ..models import ThicketConfig
+
+
console = Console()
+
+
+
def load_config(config_path: Optional[Path] = None) -> ThicketConfig:
+
"""Load thicket configuration from file or environment."""
+
if config_path and config_path.exists():
+
import yaml
+
+
with open(config_path) as f:
+
config_data = yaml.safe_load(f)
+
+
# Convert to ThicketConfig
+
return ThicketConfig(**config_data)
+
+
# Try to load from default locations or environment
+
try:
+
return ThicketConfig()
+
except Exception as e:
+
console.print(f"[red]Error loading configuration: {e}[/red]")
+
console.print("[yellow]Run 'thicket init' to create a new configuration.[/yellow]")
+
raise typer.Exit(1)
+
+
+
def save_config(config: ThicketConfig, config_path: Path) -> None:
+
"""Save thicket configuration to file."""
+
import yaml
+
+
config_data = config.model_dump(mode="json")
+
+
# Convert Path objects to strings for YAML serialization
+
config_data["git_store"] = str(config_data["git_store"])
+
config_data["cache_dir"] = str(config_data["cache_dir"])
+
+
with open(config_path, "w") as f:
+
yaml.dump(config_data, f, default_flow_style=False, sort_keys=False)
+
+
+
def create_progress() -> Progress:
+
"""Create a Rich progress display."""
+
return Progress(
+
SpinnerColumn(),
+
TextColumn("[progress.description]{task.description}"),
+
console=console,
+
transient=True,
+
)
+
+
+
def print_users_table(config: ThicketConfig) -> None:
+
"""Print a table of users and their feeds."""
+
table = Table(title="Users and Feeds")
+
table.add_column("Username", style="cyan", no_wrap=True)
+
table.add_column("Display Name", style="magenta")
+
table.add_column("Email", style="blue")
+
table.add_column("Homepage", style="green")
+
table.add_column("Feeds", style="yellow")
+
+
for user in config.users:
+
feeds_str = "\n".join(str(feed) for feed in user.feeds)
+
table.add_row(
+
user.username,
+
user.display_name or "",
+
user.email or "",
+
str(user.homepage) if user.homepage else "",
+
feeds_str,
+
)
+
+
console.print(table)
+
+
+
def print_feeds_table(config: ThicketConfig, username: Optional[str] = None) -> None:
+
"""Print a table of feeds, optionally filtered by username."""
+
table = Table(title=f"Feeds{f' for {username}' if username else ''}")
+
table.add_column("Username", style="cyan", no_wrap=True)
+
table.add_column("Feed URL", style="blue")
+
table.add_column("Status", style="green")
+
+
users = [config.find_user(username)] if username else config.users
+
users = [u for u in users if u is not None]
+
+
for user in users:
+
for feed in user.feeds:
+
table.add_row(
+
user.username,
+
str(feed),
+
"Active", # TODO: Add actual status checking
+
)
+
+
console.print(table)
+
+
+
def confirm_action(message: str, default: bool = False) -> bool:
+
"""Prompt for confirmation."""
+
return typer.confirm(message, default=default)
+
+
+
def print_success(message: str) -> None:
+
"""Print a success message."""
+
console.print(f"[green]✓[/green] {message}")
+
+
+
def print_error(message: str) -> None:
+
"""Print an error message."""
+
console.print(f"[red]✗[/red] {message}")
+
+
+
def print_warning(message: str) -> None:
+
"""Print a warning message."""
+
console.print(f"[yellow]⚠[/yellow] {message}")
+
+
+
def print_info(message: str) -> None:
+
"""Print an info message."""
+
console.print(f"[blue]ℹ[/blue] {message}")
+6
src/thicket/core/__init__.py
···
+
"""Core business logic for thicket."""
+
+
from .feed_parser import FeedParser
+
from .git_store import GitStore
+
+
__all__ = ["FeedParser", "GitStore"]
+262
src/thicket/core/feed_parser.py
···
+
"""Feed parsing and normalization with auto-discovery."""
+
+
from datetime import datetime
+
from typing import Optional
+
from urllib.parse import urljoin, urlparse
+
+
import bleach
+
import feedparser
+
import httpx
+
from pydantic import HttpUrl, ValidationError
+
+
from ..models import AtomEntry, FeedMetadata
+
+
+
class FeedParser:
+
"""Parser for RSS/Atom feeds with normalization and auto-discovery."""
+
+
def __init__(self, user_agent: str = "thicket/0.1.0"):
+
"""Initialize the feed parser."""
+
self.user_agent = user_agent
+
self.allowed_tags = [
+
"a", "abbr", "acronym", "b", "blockquote", "br", "code", "em",
+
"i", "li", "ol", "p", "pre", "strong", "ul", "h1", "h2", "h3",
+
"h4", "h5", "h6", "img", "div", "span",
+
]
+
self.allowed_attributes = {
+
"a": ["href", "title"],
+
"abbr": ["title"],
+
"acronym": ["title"],
+
"img": ["src", "alt", "title", "width", "height"],
+
"blockquote": ["cite"],
+
}
+
+
async def fetch_feed(self, url: HttpUrl) -> str:
+
"""Fetch feed content from URL."""
+
async with httpx.AsyncClient() as client:
+
response = await client.get(
+
str(url),
+
headers={"User-Agent": self.user_agent},
+
timeout=30.0,
+
follow_redirects=True,
+
)
+
response.raise_for_status()
+
return response.text
+
+
def parse_feed(self, content: str, source_url: Optional[HttpUrl] = None) -> tuple[FeedMetadata, list[AtomEntry]]:
+
"""Parse feed content and return metadata and entries."""
+
parsed = feedparser.parse(content)
+
+
if parsed.bozo and parsed.bozo_exception:
+
# Try to continue with potentially malformed feed
+
pass
+
+
# Extract feed metadata
+
feed_meta = self._extract_feed_metadata(parsed.feed)
+
+
# Extract and normalize entries
+
entries = []
+
for entry in parsed.entries:
+
try:
+
atom_entry = self._normalize_entry(entry, source_url)
+
entries.append(atom_entry)
+
except Exception as e:
+
# Log error but continue processing other entries
+
print(f"Error processing entry {getattr(entry, 'id', 'unknown')}: {e}")
+
continue
+
+
return feed_meta, entries
+
+
def _extract_feed_metadata(self, feed: feedparser.FeedParserDict) -> FeedMetadata:
+
"""Extract metadata from feed for auto-discovery."""
+
# Parse author information
+
author_name = None
+
author_email = None
+
author_uri = None
+
+
if hasattr(feed, 'author_detail'):
+
author_name = feed.author_detail.get('name')
+
author_email = feed.author_detail.get('email')
+
author_uri = feed.author_detail.get('href')
+
elif hasattr(feed, 'author'):
+
author_name = feed.author
+
+
# Parse managing editor for RSS feeds
+
if not author_email and hasattr(feed, 'managingEditor'):
+
author_email = feed.managingEditor
+
+
# Parse feed link
+
feed_link = None
+
if hasattr(feed, 'link'):
+
try:
+
feed_link = HttpUrl(feed.link)
+
except ValidationError:
+
pass
+
+
# Parse image/icon/logo
+
logo = None
+
icon = None
+
image_url = None
+
+
if hasattr(feed, 'image'):
+
try:
+
image_url = HttpUrl(feed.image.get('href', feed.image.get('url', '')))
+
except (ValidationError, AttributeError):
+
pass
+
+
if hasattr(feed, 'icon'):
+
try:
+
icon = HttpUrl(feed.icon)
+
except ValidationError:
+
pass
+
+
if hasattr(feed, 'logo'):
+
try:
+
logo = HttpUrl(feed.logo)
+
except ValidationError:
+
pass
+
+
return FeedMetadata(
+
title=getattr(feed, 'title', None),
+
author_name=author_name,
+
author_email=author_email,
+
author_uri=HttpUrl(author_uri) if author_uri else None,
+
link=feed_link,
+
logo=logo,
+
icon=icon,
+
image_url=image_url,
+
description=getattr(feed, 'description', None),
+
)
+
+
def _normalize_entry(self, entry: feedparser.FeedParserDict, source_url: Optional[HttpUrl] = None) -> AtomEntry:
+
"""Normalize an entry to Atom format."""
+
# Parse timestamps
+
updated = self._parse_timestamp(entry.get('updated_parsed') or entry.get('published_parsed'))
+
published = self._parse_timestamp(entry.get('published_parsed'))
+
+
# Parse content
+
content = self._extract_content(entry)
+
content_type = self._extract_content_type(entry)
+
+
# Parse author
+
author = self._extract_author(entry)
+
+
# Parse categories/tags
+
categories = []
+
if hasattr(entry, 'tags'):
+
categories = [tag.get('term', '') for tag in entry.tags if tag.get('term')]
+
+
# Sanitize HTML content
+
if content:
+
content = self._sanitize_html(content)
+
+
summary = entry.get('summary', '')
+
if summary:
+
summary = self._sanitize_html(summary)
+
+
return AtomEntry(
+
id=entry.get('id', entry.get('link', '')),
+
title=entry.get('title', ''),
+
link=HttpUrl(entry.get('link', '')),
+
updated=updated,
+
published=published,
+
summary=summary or None,
+
content=content or None,
+
content_type=content_type,
+
author=author,
+
categories=categories,
+
rights=entry.get('rights', None),
+
source=str(source_url) if source_url else None,
+
)
+
+
def _parse_timestamp(self, time_struct) -> datetime:
+
"""Parse feedparser time struct to datetime."""
+
if time_struct:
+
return datetime(*time_struct[:6])
+
return datetime.now()
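Review note on `_parse_timestamp`: feedparser documents its `*_parsed` fields as UTC `time.struct_time` values, while the fallback `datetime.now()` is naive local time, so the two branches can differ by the local UTC offset. A minimal sketch of a timezone-aware variant follows. It assumes the rest of the code can accept aware datetimes, and the name `parse_timestamp_utc` is hypothetical, not part of this commit.

```python
import calendar
import time
from datetime import datetime, timezone
from typing import Optional


def parse_timestamp_utc(time_struct: Optional[time.struct_time]) -> datetime:
    """Convert a UTC struct_time into an aware datetime; fall back to now (UTC)."""
    if time_struct:
        # calendar.timegm treats the struct as UTC (time.mktime would assume local time).
        return datetime.fromtimestamp(calendar.timegm(time_struct), tz=timezone.utc)
    # Hypothetical choice: keep the fallback in UTC so both branches are comparable.
    return datetime.now(timezone.utc)
```

Aware and naive datetimes cannot be compared with each other, so a change like this would need to be applied consistently wherever `updated` values are sorted.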
+
+
def _extract_content(self, entry: feedparser.FeedParserDict) -> Optional[str]:
+
"""Extract the best content from an entry."""
+
# Prefer content over summary
+
if hasattr(entry, 'content') and entry.content:
+
# Return the first HTML or plain-text item, in feed order
+
for content_item in entry.content:
+
if content_item.get('type') in ['text/html', 'html']:
+
return content_item.get('value', '')
+
elif content_item.get('type') in ['text/plain', 'text']:
+
return content_item.get('value', '')
+
# Fallback to first content item
+
return entry.content[0].get('value', '')
+
+
# Fallback to summary
+
return entry.get('summary', '')
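Review note on `_extract_content`: the loop returns whichever HTML or plain-text item appears first, and `_extract_content_type` separately reads `entry.content[0]`, so the stored type can describe a different item than the stored value. The sketch below picks one item by preference and returns its value and type together. Plain dicts stand in for feedparser's content items, and `pick_content` and `_TYPE_RANK` are hypothetical names.

```python
from typing import Optional

# Preference order: lower rank wins. Types not listed rank last.
_TYPE_RANK = {"text/html": 0, "html": 0, "text/plain": 1, "text": 1}


def pick_content(items: list[dict]) -> tuple[Optional[str], str]:
    """Return (value, normalized_type) for the preferred content item."""
    if not items:
        return None, "html"
    # min() returns the first minimal element, so ties keep feed order.
    best = min(items, key=lambda item: _TYPE_RANK.get(item.get("type"), 2))
    raw_type = best.get("type", "html")
    if raw_type in ("text/plain", "text"):
        normalized = "text"
    elif raw_type == "xhtml":
        normalized = "xhtml"
    else:
        normalized = "html"
    return best.get("value", ""), normalized
```

Returning both values from one function keeps `content` and `content_type` in agreement by construction.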
+
+
def _extract_content_type(self, entry: feedparser.FeedParserDict) -> str:
+
"""Extract content type from entry."""
+
if hasattr(entry, 'content') and entry.content:
+
content_type = entry.content[0].get('type', 'html')
+
# Normalize content type
+
if content_type in ['text/html', 'html']:
+
return 'html'
+
elif content_type in ['text/plain', 'text']:
+
return 'text'
+
elif content_type == 'xhtml':
+
return 'xhtml'
+
return 'html'
+
+
def _extract_author(self, entry: feedparser.FeedParserDict) -> Optional[dict]:
+
"""Extract author information from entry."""
+
author = {}
+
+
if hasattr(entry, 'author_detail'):
+
author.update({
+
'name': entry.author_detail.get('name'),
+
'email': entry.author_detail.get('email'),
+
'uri': entry.author_detail.get('href'),
+
})
+
elif hasattr(entry, 'author'):
+
author['name'] = entry.author
+
+
return author if author else None
+
+
def _sanitize_html(self, html: str) -> str:
+
"""Sanitize HTML content to prevent XSS."""
+
return bleach.clean(
+
html,
+
tags=self.allowed_tags,
+
attributes=self.allowed_attributes,
+
strip=True,
+
)
+
+
def sanitize_entry_id(self, entry_id: str) -> str:
+
"""Sanitize entry ID to be a safe filename."""
+
# Parse URL to get meaningful parts
+
parsed = urlparse(entry_id)
+
+
# Start with the path component
+
if parsed.path:
+
# Remove leading slash and replace problematic characters
+
safe_id = parsed.path.lstrip('/').replace('/', '_').replace('\\', '_')
+
else:
+
# Use the entire ID as fallback
+
safe_id = entry_id
+
+
# Replace problematic characters
+
safe_chars = []
+
for char in safe_id:
+
if char.isalnum() or char in '-_.':
+
safe_chars.append(char)
+
else:
+
safe_chars.append('_')
+
+
safe_id = ''.join(safe_chars)
+
+
# Ensure it's not too long (max 200 chars)
+
if len(safe_id) > 200:
+
safe_id = safe_id[:200]
+
+
# Ensure it's not empty
+
if not safe_id:
+
safe_id = "entry"
+
+
return safe_id
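Review note on `sanitize_entry_id`: only the URL path is kept, so IDs from different hosts that share a path, or IDs that differ only in replaced characters or beyond 200 characters, map to the same filename, and `store_entry` would then overwrite one entry with another. One possible mitigation is to append a short digest of the full ID. This is a sketch only; `safe_filename` and the 8-character digest length are illustrative assumptions.

```python
import hashlib
from urllib.parse import urlparse


def safe_filename(entry_id: str, max_len: int = 200) -> str:
    """Build a filesystem-safe name that stays distinct per original entry ID."""
    parsed = urlparse(entry_id)
    base = parsed.path.lstrip("/") if parsed.path else entry_id
    # Keep alphanumerics plus '-', '_' and '.'; replace everything else.
    cleaned = "".join(c if c.isalnum() or c in "-_." else "_" for c in base)
    # A digest of the *full* ID separates entries whose cleaned names collide.
    digest = hashlib.sha256(entry_id.encode("utf-8")).hexdigest()[:8]
    stem = cleaned[: max_len - len(digest) - 1] or "entry"
    return f"{stem}-{digest}"
```

Because the digest is derived from the ID alone, `get_entry` could recompute the same name without any lookup table.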
+321
src/thicket/core/git_store.py
···
+
"""Git repository operations for thicket."""
+
+
import json
+
from datetime import datetime
+
from pathlib import Path
+
from typing import Optional
+
+
import git
+
from git import Repo
+
+
from ..models import AtomEntry, DuplicateMap, GitStoreIndex, UserMetadata
+
+
+
class GitStore:
+
"""Manages the Git repository for storing feed entries."""
+
+
def __init__(self, repo_path: Path):
+
"""Initialize the Git store."""
+
self.repo_path = repo_path
+
self.repo: Optional[Repo] = None
+
self._ensure_repo()
+
+
def _ensure_repo(self) -> None:
+
"""Ensure the Git repository exists and is initialized."""
+
if not self.repo_path.exists():
+
self.repo_path.mkdir(parents=True, exist_ok=True)
+
+
try:
+
self.repo = Repo(self.repo_path)
+
except git.InvalidGitRepositoryError:
+
# Initialize new repository
+
self.repo = Repo.init(self.repo_path)
+
self._create_initial_structure()
+
+
def _create_initial_structure(self) -> None:
+
"""Create initial Git store structure."""
+
# Create index.json
+
index = GitStoreIndex(
+
created=datetime.now(),
+
last_updated=datetime.now(),
+
)
+
self._save_index(index)
+
+
# Create duplicates.json
+
duplicates = DuplicateMap()
+
self._save_duplicates(duplicates)
+
+
# Create initial commit
+
self.repo.index.add(["index.json", "duplicates.json"])
+
self.repo.index.commit("Initial thicket repository structure")
+
+
def _save_index(self, index: GitStoreIndex) -> None:
+
"""Save the index to index.json."""
+
index_path = self.repo_path / "index.json"
+
with open(index_path, "w") as f:
+
json.dump(index.model_dump(mode="json"), f, indent=2, default=str)
+
+
def _load_index(self) -> GitStoreIndex:
+
"""Load the index from index.json."""
+
index_path = self.repo_path / "index.json"
+
if not index_path.exists():
+
return GitStoreIndex(
+
created=datetime.now(),
+
last_updated=datetime.now(),
+
)
+
+
with open(index_path) as f:
+
data = json.load(f)
+
+
return GitStoreIndex(**data)
+
+
def _save_duplicates(self, duplicates: DuplicateMap) -> None:
+
"""Save duplicates map to duplicates.json."""
+
duplicates_path = self.repo_path / "duplicates.json"
+
with open(duplicates_path, "w") as f:
+
json.dump(duplicates.model_dump(), f, indent=2)
+
+
def _load_duplicates(self) -> DuplicateMap:
+
"""Load duplicates map from duplicates.json."""
+
duplicates_path = self.repo_path / "duplicates.json"
+
if not duplicates_path.exists():
+
return DuplicateMap()
+
+
with open(duplicates_path) as f:
+
data = json.load(f)
+
+
return DuplicateMap(**data)
+
+
def add_user(self, username: str, display_name: Optional[str] = None,
+
email: Optional[str] = None, homepage: Optional[str] = None,
+
icon: Optional[str] = None, feeds: Optional[list[str]] = None) -> UserMetadata:
+
"""Add a new user to the Git store."""
+
index = self._load_index()
+
+
# Create user directory
+
user_dir = self.repo_path / username
+
user_dir.mkdir(exist_ok=True)
+
+
# Create user metadata
+
user_metadata = UserMetadata(
+
username=username,
+
display_name=display_name,
+
email=email,
+
homepage=homepage,
+
icon=icon,
+
feeds=feeds or [],
+
directory=username,
+
created=datetime.now(),
+
last_updated=datetime.now(),
+
)
+
+
# Save user metadata
+
metadata_path = user_dir / "metadata.json"
+
with open(metadata_path, "w") as f:
+
json.dump(user_metadata.model_dump(mode="json"), f, indent=2, default=str)
+
+
# Update index
+
index.add_user(user_metadata)
+
self._save_index(index)
+
+
return user_metadata
+
+
def get_user(self, username: str) -> Optional[UserMetadata]:
+
"""Get user metadata by username."""
+
index = self._load_index()
+
return index.get_user(username)
+
+
def update_user(self, username: str, **kwargs) -> bool:
+
"""Update user metadata."""
+
index = self._load_index()
+
user = index.get_user(username)
+
+
if not user:
+
return False
+
+
# Update user metadata
+
for key, value in kwargs.items():
+
if hasattr(user, key) and value is not None:
+
setattr(user, key, value)
+
+
user.update_timestamp()
+
+
# Save user metadata
+
user_dir = self.repo_path / user.directory
+
metadata_path = user_dir / "metadata.json"
+
with open(metadata_path, "w") as f:
+
json.dump(user.model_dump(mode="json"), f, indent=2, default=str)
+
+
# Update index
+
index.add_user(user)
+
self._save_index(index)
+
+
return True
+
+
def store_entry(self, username: str, entry: AtomEntry) -> bool:
+
"""Store an entry in the user's directory."""
+
user = self.get_user(username)
+
if not user:
+
return False
+
+
# Sanitize entry ID for filename
+
from .feed_parser import FeedParser
+
parser = FeedParser()
+
safe_id = parser.sanitize_entry_id(entry.id)
+
+
# Create entry file
+
user_dir = self.repo_path / user.directory
+
entry_path = user_dir / f"{safe_id}.json"
+
+
# Check if entry already exists
+
entry_exists = entry_path.exists()
+
+
# Save entry
+
with open(entry_path, "w") as f:
+
json.dump(entry.model_dump(mode="json"), f, indent=2, default=str)
+
+
# Update user metadata if new entry
+
if not entry_exists:
+
user.increment_entry_count()
+
self.update_user(username, entry_count=user.entry_count)
+
+
return True
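Review note on `store_entry`: the user's `entry_count` is updated through `update_user`, but nothing in this file changes `GitStoreIndex.total_entries`, so `get_stats` may keep reporting the initial total unless a caller outside this file recalculates it. The existing `GitStoreIndex.update_entry_count` looks like the intended hook. The sketch below shows the idea with stdlib dataclasses standing in for the Pydantic models; `User`, `Index`, and `record_new_entry` are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class User:
    username: str
    entry_count: int = 0


@dataclass
class Index:
    users: dict[str, User] = field(default_factory=dict)
    total_entries: int = 0

    def record_new_entry(self, username: str) -> bool:
        """Bump the user's count and the running total in one step."""
        user = self.users.get(username)
        if user is None:
            return False
        user.entry_count += 1
        self.total_entries += 1
        return True
```

Updating both counters in a single method avoids the two values drifting apart when one call site is forgotten.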
+
+
def get_entry(self, username: str, entry_id: str) -> Optional[AtomEntry]:
+
"""Get an entry by username and entry ID."""
+
user = self.get_user(username)
+
if not user:
+
return None
+
+
# Sanitize entry ID
+
from .feed_parser import FeedParser
+
parser = FeedParser()
+
safe_id = parser.sanitize_entry_id(entry_id)
+
+
entry_path = self.repo_path / user.directory / f"{safe_id}.json"
+
if not entry_path.exists():
+
return None
+
+
with open(entry_path) as f:
+
data = json.load(f)
+
+
return AtomEntry(**data)
+
+
def list_entries(self, username: str, limit: Optional[int] = None) -> list[AtomEntry]:
+
"""List entries for a user."""
+
user = self.get_user(username)
+
if not user:
+
return []
+
+
user_dir = self.repo_path / user.directory
+
if not user_dir.exists():
+
return []
+
+
entries = []
+
entry_files = sorted(user_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
+
+
# Filter out metadata.json
+
entry_files = [f for f in entry_files if f.name != "metadata.json"]
+
+
if limit:
+
entry_files = entry_files[:limit]
+
+
for entry_file in entry_files:
+
try:
+
with open(entry_file) as f:
+
data = json.load(f)
+
entries.append(AtomEntry(**data))
+
except Exception:
+
# Skip invalid entries
+
continue
+
+
return entries
+
+
def get_duplicates(self) -> DuplicateMap:
+
"""Get the duplicates map."""
+
return self._load_duplicates()
+
+
def add_duplicate(self, duplicate_id: str, canonical_id: str) -> None:
+
"""Add a duplicate mapping."""
+
duplicates = self._load_duplicates()
+
duplicates.add_duplicate(duplicate_id, canonical_id)
+
self._save_duplicates(duplicates)
+
+
def remove_duplicate(self, duplicate_id: str) -> bool:
+
"""Remove a duplicate mapping."""
+
duplicates = self._load_duplicates()
+
result = duplicates.remove_duplicate(duplicate_id)
+
self._save_duplicates(duplicates)
+
return result
+
+
def commit_changes(self, message: str) -> None:
+
"""Commit all changes to the Git repository."""
+
if not self.repo:
+
return
+
+
# Add all changes
+
self.repo.git.add(A=True)
+
+
# Check if there are changes to commit
+
if self.repo.index.diff("HEAD"):
+
self.repo.index.commit(message)
+
+
def get_stats(self) -> dict:
+
"""Get statistics about the Git store."""
+
index = self._load_index()
+
duplicates = self._load_duplicates()
+
+
return {
+
"total_users": len(index.users),
+
"total_entries": index.total_entries,
+
"total_duplicates": len(duplicates.duplicates),
+
"last_updated": index.last_updated,
+
"repository_size": sum(f.stat().st_size for f in self.repo_path.rglob("*") if f.is_file()),
+
}
+
+
def search_entries(self, query: str, username: Optional[str] = None,
+
limit: Optional[int] = None) -> list[tuple[str, AtomEntry]]:
+
"""Search entries by content."""
+
results = []
+
+
# Get users to search
+
index = self._load_index()
+
users = [index.get_user(username)] if username else list(index.users.values())
+
users = [u for u in users if u is not None]
+
+
for user in users:
+
user_dir = self.repo_path / user.directory
+
if not user_dir.exists():
+
continue
+
+
entry_files = user_dir.glob("*.json")
+
entry_files = [f for f in entry_files if f.name != "metadata.json"]
+
+
for entry_file in entry_files:
+
try:
+
with open(entry_file) as f:
+
data = json.load(f)
+
+
entry = AtomEntry(**data)
+
+
# Simple text search in title, summary, and content
+
searchable_text = " ".join(filter(None, [
+
entry.title,
+
entry.summary or "",
+
entry.content or "",
+
])).lower()
+
+
if query.lower() in searchable_text:
+
results.append((user.username, entry))
+
+
if limit and len(results) >= limit:
+
return results
+
+
except Exception:
+
# Skip invalid entries
+
continue
+
+
# Sort by updated time (newest first)
+
results.sort(key=lambda x: x[1].updated, reverse=True)
+
+
return results[:limit] if limit else results
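Review note on `search_entries`: the early `return results` inside the loop fires as soon as `limit` matches are found, before the final sort runs, so a limited search returns the first matches in directory order rather than the newest ones. A sketch of the collect, sort, then truncate ordering follows. Tuples of `(updated, title)` stand in for `AtomEntry`, and `search_newest` is a hypothetical name.

```python
from datetime import datetime
from typing import Optional


def search_newest(
    entries: list[tuple[datetime, str]], query: str, limit: Optional[int] = None
) -> list[tuple[datetime, str]]:
    """Return matching (updated, title) pairs, newest first, truncated to limit."""
    needle = query.lower()
    # Collect every match first; truncating here could skip newer entries.
    matches = [e for e in entries if needle in e[1].lower()]
    matches.sort(key=lambda e: e[0], reverse=True)
    return matches[:limit] if limit else matches
```

The trade-off is that every entry file is read even for small limits; for large stores, `heapq.nlargest` could bound the number of matches held in memory.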
+15
src/thicket/models/__init__.py
···
+
"""Data models for thicket."""
+
+
from .config import ThicketConfig, UserConfig
+
from .feed import AtomEntry, DuplicateMap, FeedMetadata
+
from .user import GitStoreIndex, UserMetadata
+
+
__all__ = [
+
"ThicketConfig",
+
"UserConfig",
+
"AtomEntry",
+
"DuplicateMap",
+
"FeedMetadata",
+
"GitStoreIndex",
+
"UserMetadata",
+
]
+71
src/thicket/models/config.py
···
+
"""Configuration models for thicket."""
+
+
from pathlib import Path
+
from typing import Optional
+
+
from pydantic import BaseModel, EmailStr, HttpUrl
+
from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+
class UserConfig(BaseModel):
+
"""Configuration for a single user and their feeds."""
+
+
username: str
+
feeds: list[HttpUrl]
+
email: Optional[EmailStr] = None
+
homepage: Optional[HttpUrl] = None
+
icon: Optional[HttpUrl] = None
+
display_name: Optional[str] = None
+
+
+
class ThicketConfig(BaseSettings):
+
"""Main configuration for thicket."""
+
+
model_config = SettingsConfigDict(
+
env_prefix="THICKET_",
+
env_file=".env",
+
yaml_file="thicket.yaml",
+
case_sensitive=False,
+
)
+
+
git_store: Path
+
cache_dir: Path
+
users: list[UserConfig] = []
+
+
def find_user(self, username: str) -> Optional[UserConfig]:
+
"""Find a user by username."""
+
for user in self.users:
+
if user.username == username:
+
return user
+
return None
+
+
def add_user(self, user: UserConfig) -> None:
+
"""Add a new user or update existing user."""
+
existing = self.find_user(user.username)
+
if existing:
+
# Update existing user
+
existing.feeds = list(dict.fromkeys(existing.feeds + user.feeds))  # dedupe, keep order
+
existing.email = user.email or existing.email
+
existing.homepage = user.homepage or existing.homepage
+
existing.icon = user.icon or existing.icon
+
existing.display_name = user.display_name or existing.display_name
+
else:
+
# Add new user
+
self.users.append(user)
+
+
def remove_user(self, username: str) -> bool:
+
"""Remove a user by username. Returns True if user was found and removed."""
+
for i, user in enumerate(self.users):
+
if user.username == username:
+
del self.users[i]
+
return True
+
return False
+
+
def add_feed_to_user(self, username: str, feed_url: HttpUrl) -> bool:
+
"""Add a feed to an existing user. Returns True if user was found."""
+
user = self.find_user(username)
+
if user:
+
if feed_url not in user.feeds:
+
user.feeds.append(feed_url)
+
return True
+
return False
+86
src/thicket/models/feed.py
···
+
"""Feed and entry models for thicket."""
+
+
from datetime import datetime
+
from typing import Optional
+
+
from pydantic import BaseModel, ConfigDict, EmailStr, HttpUrl
+
+
+
class AtomEntry(BaseModel):
+
"""Represents an Atom feed entry stored in the Git repository."""
+
+
model_config = ConfigDict(
+
json_encoders={datetime: lambda v: v.isoformat()},
+
str_strip_whitespace=True,
+
)
+
+
id: str # Original Atom ID
+
title: str
+
link: HttpUrl
+
updated: datetime
+
published: Optional[datetime] = None
+
summary: Optional[str] = None
+
content: Optional[str] = None # Full body content from Atom entry
+
content_type: Optional[str] = "html" # text, html, xhtml
+
author: Optional[dict] = None
+
categories: list[str] = []
+
rights: Optional[str] = None # Copyright info
+
source: Optional[str] = None # Source feed URL
+
+
+
class FeedMetadata(BaseModel):
+
"""Metadata extracted from a feed for auto-discovery."""
+
+
title: Optional[str] = None
+
author_name: Optional[str] = None
+
author_email: Optional[EmailStr] = None
+
author_uri: Optional[HttpUrl] = None
+
link: Optional[HttpUrl] = None
+
logo: Optional[HttpUrl] = None
+
icon: Optional[HttpUrl] = None
+
image_url: Optional[HttpUrl] = None
+
description: Optional[str] = None
+
+
def to_user_config(self, username: str, feed_url: HttpUrl) -> "UserConfig":
+
"""Convert discovered metadata to UserConfig with fallbacks."""
+
from .config import UserConfig
+
+
return UserConfig(
+
username=username,
+
feeds=[feed_url],
+
display_name=self.author_name or self.title,
+
email=self.author_email,
+
homepage=self.author_uri or self.link,
+
icon=self.logo or self.icon or self.image_url,
+
)
+
+
+
class DuplicateMap(BaseModel):
+
"""Maps duplicate entry IDs to canonical entry IDs."""
+
+
duplicates: dict[str, str] = {} # duplicate_id -> canonical_id
+
comment: str = "Entry IDs that map to the same canonical content"
+
+
def add_duplicate(self, duplicate_id: str, canonical_id: str) -> None:
+
"""Add a duplicate mapping."""
+
self.duplicates[duplicate_id] = canonical_id
+
+
def remove_duplicate(self, duplicate_id: str) -> bool:
+
"""Remove a duplicate mapping. Returns True if existed."""
+
return self.duplicates.pop(duplicate_id, None) is not None
+
+
def get_canonical(self, entry_id: str) -> str:
+
"""Get canonical ID for an entry (returns original if not duplicate)."""
+
return self.duplicates.get(entry_id, entry_id)
+
+
def is_duplicate(self, entry_id: str) -> bool:
+
"""Check if entry ID is marked as duplicate."""
+
return entry_id in self.duplicates
+
+
def get_duplicates_for_canonical(self, canonical_id: str) -> list[str]:
+
"""Get all duplicate IDs that map to a canonical ID."""
+
return [
+
duplicate_id
+
for duplicate_id, canonical in self.duplicates.items()
+
if canonical == canonical_id
+
]
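Review note on `DuplicateMap.get_canonical`: it follows a single hop, so if `a` is marked as a duplicate of `b` and `b` is later marked as a duplicate of `c`, looking up `a` returns `b` rather than `c`. If chains are expected, they could be resolved as sketched below. A plain dict stands in for `DuplicateMap.duplicates`, and `resolve_canonical` is a hypothetical helper.

```python
def resolve_canonical(duplicates: dict[str, str], entry_id: str) -> str:
    """Follow duplicate -> canonical links until reaching an unmapped ID."""
    seen = {entry_id}
    current = entry_id
    while current in duplicates:
        current = duplicates[current]
        if current in seen:
            # A cycle (a -> b -> a) has no canonical end; stop instead of looping forever.
            break
        seen.add(current)
    return current
```

Rejecting cycles at insert time in `add_duplicate` would be an alternative design that keeps lookups simple.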
+79
src/thicket/models/user.py
···
+
"""User metadata models for thicket."""
+
+
from datetime import datetime
+
from typing import Optional
+
+
from pydantic import BaseModel, ConfigDict
+
+
+
class UserMetadata(BaseModel):
+
"""Metadata about a user stored in the Git repository."""
+
+
model_config = ConfigDict(
+
json_encoders={datetime: lambda v: v.isoformat()},
+
str_strip_whitespace=True,
+
)
+
+
username: str
+
display_name: Optional[str] = None
+
email: Optional[str] = None
+
homepage: Optional[str] = None
+
icon: Optional[str] = None
+
feeds: list[str] = []
+
directory: str # Directory name in Git store
+
created: datetime
+
last_updated: datetime
+
entry_count: int = 0
+
+
def update_timestamp(self) -> None:
+
"""Update the last_updated timestamp to now."""
+
self.last_updated = datetime.now()
+
+
def increment_entry_count(self, count: int = 1) -> None:
+
"""Increment the entry count by the given amount."""
+
self.entry_count += count
+
self.update_timestamp()
+
+
+
class GitStoreIndex(BaseModel):
+
"""Index of all users and their directories in the Git store."""
+
+
model_config = ConfigDict(
+
json_encoders={datetime: lambda v: v.isoformat()}
+
)
+
+
users: dict[str, UserMetadata] = {} # username -> UserMetadata
+
created: datetime
+
last_updated: datetime
+
total_entries: int = 0
+
+
def add_user(self, user_metadata: UserMetadata) -> None:
+
"""Add or update a user in the index."""
+
self.users[user_metadata.username] = user_metadata
+
self.last_updated = datetime.now()
+
+
def remove_user(self, username: str) -> bool:
+
"""Remove a user from the index. Returns True if user existed."""
+
if username in self.users:
+
del self.users[username]
+
self.last_updated = datetime.now()
+
return True
+
return False
+
+
def get_user(self, username: str) -> Optional[UserMetadata]:
+
"""Get user metadata by username."""
+
return self.users.get(username)
+
+
def update_entry_count(self, username: str, count: int) -> None:
+
"""Update entry count for a user and total."""
+
user = self.get_user(username)
+
if user:
+
user.increment_entry_count(count)
+
self.total_entries += count
+
self.last_updated = datetime.now()
+
+
def recalculate_totals(self) -> None:
+
"""Recalculate total entries from all users."""
+
self.total_entries = sum(user.entry_count for user in self.users.values())
+
self.last_updated = datetime.now()
+4
src/thicket/utils/__init__.py
···
+
"""Utility modules for thicket."""
+
+
# This module will contain shared utilities
+
# For now, it's empty but can be expanded with common functions
tests/__init__.py
+84
tests/conftest.py
···
+
"""Test configuration and fixtures for thicket."""
+
+
import tempfile
+
from pathlib import Path
+
+
import pytest
+
+
from thicket.models import ThicketConfig, UserConfig
+
+
+
@pytest.fixture
+
def temp_dir():
+
"""Create a temporary directory for tests."""
+
with tempfile.TemporaryDirectory() as tmp_dir:
+
yield Path(tmp_dir)
+
+
+
@pytest.fixture
+
def sample_config(temp_dir):
+
"""Create a sample configuration for testing."""
+
git_store = temp_dir / "git_store"
+
cache_dir = temp_dir / "cache"
+
+
return ThicketConfig(
+
git_store=git_store,
+
cache_dir=cache_dir,
+
users=[
+
UserConfig(
+
username="testuser",
+
feeds=["https://example.com/feed.xml"],
+
email="test@example.com",
+
display_name="Test User",
+
)
+
],
+
)
+
+
+
@pytest.fixture
+
def sample_atom_feed():
+
"""Sample Atom feed XML for testing."""
+
return """<?xml version="1.0" encoding="utf-8"?>
+
<feed xmlns="http://www.w3.org/2005/Atom">
+
<title>Test Feed</title>
+
<link href="https://example.com/"/>
+
<updated>2025-01-01T00:00:00Z</updated>
+
<author>
+
<name>Test Author</name>
+
<email>author@example.com</email>
+
</author>
+
<id>https://example.com/</id>
+
+
<entry>
+
<title>Test Entry</title>
+
<link href="https://example.com/entry/1"/>
+
<id>https://example.com/entry/1</id>
+
<updated>2025-01-01T00:00:00Z</updated>
+
<summary>This is a test entry.</summary>
+
<content type="html">
+
<![CDATA[<p>This is the content of the test entry.</p>]]>
+
</content>
+
</entry>
+
</feed>"""
+
+
+
@pytest.fixture
+
def sample_rss_feed():
+
"""Sample RSS feed XML for testing."""
+
return """<?xml version="1.0" encoding="UTF-8"?>
+
<rss version="2.0">
+
<channel>
+
<title>Test RSS Feed</title>
+
<link>https://example.com/</link>
+
<description>Test RSS feed for testing</description>
+
<managingEditor>editor@example.com</managingEditor>
+
+
<item>
+
<title>Test RSS Entry</title>
+
<link>https://example.com/rss/entry/1</link>
+
<description>This is a test RSS entry.</description>
+
<pubDate>Mon, 01 Jan 2025 00:00:00 GMT</pubDate>
+
<guid>https://example.com/rss/entry/1</guid>
+
</item>
+
</channel>
+
</rss>"""
+132
tests/test_feed_parser.py
···
+
"""Tests for feed parser functionality."""
+
+
import pytest
+
from pydantic import HttpUrl
+
+
from thicket.core.feed_parser import FeedParser
+
from thicket.models import AtomEntry, FeedMetadata
+
+
+
class TestFeedParser:
+
"""Test the FeedParser class."""
+
+
def test_init(self):
+
"""Test parser initialization."""
+
parser = FeedParser()
+
assert parser.user_agent == "thicket/0.1.0"
+
assert "a" in parser.allowed_tags
+
assert "href" in parser.allowed_attributes["a"]
+
+
def test_parse_atom_feed(self, sample_atom_feed):
+
"""Test parsing an Atom feed."""
+
parser = FeedParser()
+
metadata, entries = parser.parse_feed(sample_atom_feed)
+
+
# Check metadata
+
assert isinstance(metadata, FeedMetadata)
+
assert metadata.title == "Test Feed"
+
assert metadata.author_name == "Test Author"
+
assert metadata.author_email == "author@example.com"
+
assert metadata.link == HttpUrl("https://example.com/")
+
+
# Check entries
+
assert len(entries) == 1
+
entry = entries[0]
+
assert isinstance(entry, AtomEntry)
+
assert entry.title == "Test Entry"
+
assert entry.id == "https://example.com/entry/1"
+
assert entry.link == HttpUrl("https://example.com/entry/1")
+
assert entry.summary == "This is a test entry."
+
assert "<p>This is the content of the test entry.</p>" in entry.content
+
+
def test_parse_rss_feed(self, sample_rss_feed):
+
"""Test parsing an RSS feed."""
+
parser = FeedParser()
+
metadata, entries = parser.parse_feed(sample_rss_feed)
+
+
# Check metadata
+
assert isinstance(metadata, FeedMetadata)
+
assert metadata.title == "Test RSS Feed"
+
assert metadata.link == HttpUrl("https://example.com/")
+
assert metadata.author_email == "editor@example.com"
+
+
# Check entries
+
assert len(entries) == 1
+
entry = entries[0]
+
assert isinstance(entry, AtomEntry)
+
assert entry.title == "Test RSS Entry"
+
assert entry.id == "https://example.com/rss/entry/1"
+
assert entry.summary == "This is a test RSS entry."
+
+
def test_sanitize_entry_id(self):
+
"""Test entry ID sanitization."""
+
parser = FeedParser()
+
+
# Test URL ID
+
url_id = "https://example.com/posts/2025/01/test-post"
+
sanitized = parser.sanitize_entry_id(url_id)
+
assert sanitized == "posts_2025_01_test-post"
+
+
# Test problematic characters
+
bad_id = "test/with\\bad:chars|and<more>"
+
sanitized = parser.sanitize_entry_id(bad_id)
+
assert sanitized == "test_with_bad_chars_and_more_"
+
+
# Test empty ID
+
empty_id = ""
+
sanitized = parser.sanitize_entry_id(empty_id)
+
assert sanitized == "entry"
+
+
# Test very long ID
+
long_id = "a" * 300
+
sanitized = parser.sanitize_entry_id(long_id)
+
assert len(sanitized) == 200
+
+
def test_sanitize_html(self):
+
"""Test HTML sanitization."""
+
parser = FeedParser()
+
+
# Test allowed tags
+
safe_html = "<p>This is <strong>safe</strong> HTML</p>"
+
sanitized = parser._sanitize_html(safe_html)
+
assert sanitized == safe_html
+
+
# Test dangerous tags
+
dangerous_html = "<script>alert('xss')</script><p>Safe content</p>"
+
sanitized = parser._sanitize_html(dangerous_html)
+
assert "<script>" not in sanitized
+
assert "<p>Safe content</p>" in sanitized
+
+
# Test attributes
+
html_with_attrs = '<a href="https://example.com" onclick="alert()">Link</a>'
+
sanitized = parser._sanitize_html(html_with_attrs)
+
assert 'href="https://example.com"' in sanitized
+
assert 'onclick' not in sanitized
+
+
def test_extract_feed_metadata(self):
+
"""Test feed metadata extraction."""
+
parser = FeedParser()
+
+
# Test with feedparser parsed data
+
import feedparser
+
parsed = feedparser.parse("""<?xml version="1.0" encoding="utf-8"?>
+
<feed xmlns="http://www.w3.org/2005/Atom">
+
<title>Test Feed</title>
+
<link href="https://example.com/"/>
+
<author>
+
<name>Test Author</name>
+
<email>author@example.com</email>
+
<uri>https://example.com/about</uri>
+
</author>
+
<logo>https://example.com/logo.png</logo>
+
<icon>https://example.com/icon.png</icon>
+
</feed>""")
+
+
metadata = parser._extract_feed_metadata(parsed.feed)
+
assert metadata.title == "Test Feed"
+
assert metadata.author_name == "Test Author"
+
assert metadata.author_email == "author@example.com"
+
assert metadata.author_uri == HttpUrl("https://example.com/about")
+
assert metadata.link == HttpUrl("https://example.com/")
+
assert metadata.logo == HttpUrl("https://example.com/logo.png")
+
assert metadata.icon == HttpUrl("https://example.com/icon.png")
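
The `test_sanitize_entry_id` cases above pin down four behaviours: a URL ID is reduced to its path with separators replaced, filesystem-unsafe characters become underscores, an empty ID falls back to `"entry"`, and the result is capped at 200 characters. The implementation of `FeedParser.sanitize_entry_id` is not part of this section, so the following is only a minimal sketch of a standalone function that would satisfy those assertions. The regular expression's character set and the `max_length` parameter are assumptions chosen to match the test expectations.

```python
import re
from urllib.parse import urlparse

# Hypothetical stand-in for FeedParser.sanitize_entry_id; not the project's code.
# The set of "unsafe" characters is an assumption derived from the test cases.
_UNSAFE = re.compile(r'[<>:"/\\|?*\s]')


def sanitize_entry_id(entry_id: str, max_length: int = 200) -> str:
    """Turn a feed entry ID into a string that is safe to use as a filename."""
    parsed = urlparse(entry_id)
    if parsed.scheme in ("http", "https") and parsed.netloc:
        # For URL IDs keep only the path, e.g. "/posts/2025/01/test-post".
        candidate = parsed.path.strip("/")
    else:
        candidate = entry_id
    # Replace unsafe characters, then cap the length.
    safe = _UNSAFE.sub("_", candidate)[:max_length]
    # Fall back to a fixed name when nothing usable is left.
    return safe or "entry"
```

One design consequence of this sketch: dropping the host means two feeds with identical paths would map to the same filename, which is only harmless if entries are stored per user directory, as the `GitStore` tests suggest.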
+277
tests/test_git_store.py
···
+
"""Tests for Git store functionality."""
+
+
import json
+
from datetime import datetime
+
+
import pytest
+
from pydantic import HttpUrl
+
+
from thicket.core.git_store import GitStore
+
from thicket.models import AtomEntry, DuplicateMap, UserMetadata
+
+
+
class TestGitStore:
+
"""Test the GitStore class."""
+
+
def test_init_new_repo(self, temp_dir):
+
"""Test initializing a new Git repository."""
+
repo_path = temp_dir / "test_repo"
+
store = GitStore(repo_path)
+
+
assert store.repo_path == repo_path
+
assert store.repo is not None
+
assert repo_path.exists()
+
assert (repo_path / ".git").exists()
+
assert (repo_path / "index.json").exists()
+
assert (repo_path / "duplicates.json").exists()
+
+
def test_init_existing_repo(self, temp_dir):
+
"""Test initializing with existing repository."""
+
repo_path = temp_dir / "test_repo"
+
+
# Create first store
+
store1 = GitStore(repo_path)
+
store1.add_user("testuser", display_name="Test User")
+
+
# Create second store pointing to same repo
+
store2 = GitStore(repo_path)
+
user = store2.get_user("testuser")
+
+
assert user is not None
+
assert user.username == "testuser"
+
assert user.display_name == "Test User"
+
+
def test_add_user(self, temp_dir):
+
"""Test adding a user to the Git store."""
+
store = GitStore(temp_dir / "test_repo")
+
+
user = store.add_user(
+
username="testuser",
+
display_name="Test User",
+
email="test@example.com",
+
homepage="https://example.com",
+
icon="https://example.com/icon.png",
+
feeds=["https://example.com/feed.xml"],
+
)
+
+
assert isinstance(user, UserMetadata)
+
assert user.username == "testuser"
+
assert user.display_name == "Test User"
+
assert user.email == "test@example.com"
+
assert user.homepage == "https://example.com"
+
assert user.icon == "https://example.com/icon.png"
+
assert user.feeds == ["https://example.com/feed.xml"]
+
assert user.directory == "testuser"
+
+
# Check that user directory was created
+
user_dir = store.repo_path / "testuser"
+
assert user_dir.exists()
+
assert (user_dir / "metadata.json").exists()
+
+
# Check metadata file content
+
with open(user_dir / "metadata.json") as f:
+
metadata = json.load(f)
+
assert metadata["username"] == "testuser"
+
assert metadata["display_name"] == "Test User"
+
+
def test_get_user(self, temp_dir):
+
"""Test getting user metadata."""
+
store = GitStore(temp_dir / "test_repo")
+
+
# Add user
+
store.add_user("testuser", display_name="Test User")
+
+
# Get user
+
user = store.get_user("testuser")
+
assert user is not None
+
assert user.username == "testuser"
+
assert user.display_name == "Test User"
+
+
# Try to get non-existent user
+
non_user = store.get_user("nonexistent")
+
assert non_user is None
+
+
def test_store_entry(self, temp_dir):
+
"""Test storing an entry."""
+
store = GitStore(temp_dir / "test_repo")
+
+
# Add user first
+
store.add_user("testuser")
+
+
# Create test entry
+
entry = AtomEntry(
+
id="https://example.com/entry/1",
+
title="Test Entry",
+
link=HttpUrl("https://example.com/entry/1"),
+
updated=datetime.now(),
+
summary="Test entry summary",
+
content="<p>Test content</p>",
+
)
+
+
# Store entry
+
result = store.store_entry("testuser", entry)
+
assert result is True
+
+
# Check that entry file was created
+
user_dir = store.repo_path / "testuser"
+
entry_files = list(user_dir.glob("*.json"))
+
entry_files = [f for f in entry_files if f.name != "metadata.json"]
+
assert len(entry_files) == 1
+
+
# Check entry content
+
with open(entry_files[0]) as f:
+
stored_entry = json.load(f)
+
assert stored_entry["title"] == "Test Entry"
+
assert stored_entry["id"] == "https://example.com/entry/1"
+
+
def test_get_entry(self, temp_dir):
+
"""Test retrieving an entry."""
+
store = GitStore(temp_dir / "test_repo")
+
+
# Add user and entry
+
store.add_user("testuser")
+
entry = AtomEntry(
+
id="https://example.com/entry/1",
+
title="Test Entry",
+
link=HttpUrl("https://example.com/entry/1"),
+
updated=datetime.now(),
+
)
+
store.store_entry("testuser", entry)
+
+
# Get entry
+
retrieved = store.get_entry("testuser", "https://example.com/entry/1")
+
assert retrieved is not None
+
assert retrieved.title == "Test Entry"
+
assert retrieved.id == "https://example.com/entry/1"
+
+
# Try to get non-existent entry
+
non_entry = store.get_entry("testuser", "https://example.com/nonexistent")
+
assert non_entry is None
+
+
def test_list_entries(self, temp_dir):
+
"""Test listing entries for a user."""
+
store = GitStore(temp_dir / "test_repo")
+
+
# Add user
+
store.add_user("testuser")
+
+
# Add multiple entries
+
for i in range(3):
+
entry = AtomEntry(
+
id=f"https://example.com/entry/{i}",
+
title=f"Test Entry {i}",
+
link=HttpUrl(f"https://example.com/entry/{i}"),
+
updated=datetime.now(),
+
)
+
store.store_entry("testuser", entry)
+
+
# List all entries
+
entries = store.list_entries("testuser")
+
assert len(entries) == 3
+
+
# List with limit
+
limited = store.list_entries("testuser", limit=2)
+
assert len(limited) == 2
+
+
# List for non-existent user
+
none_entries = store.list_entries("nonexistent")
+
assert len(none_entries) == 0
+
+
def test_duplicates(self, temp_dir):
+
"""Test duplicate management."""
+
store = GitStore(temp_dir / "test_repo")
+
+
# Get initial duplicates (should be empty)
+
duplicates = store.get_duplicates()
+
assert isinstance(duplicates, DuplicateMap)
+
assert len(duplicates.duplicates) == 0
+
+
# Add duplicate
+
store.add_duplicate("https://example.com/dup", "https://example.com/canonical")
+
+
# Check duplicate was added
+
duplicates = store.get_duplicates()
+
assert len(duplicates.duplicates) == 1
+
assert duplicates.is_duplicate("https://example.com/dup")
+
assert duplicates.get_canonical("https://example.com/dup") == "https://example.com/canonical"
+
+
# Remove duplicate
+
result = store.remove_duplicate("https://example.com/dup")
+
assert result is True
+
+
# Check duplicate was removed
+
duplicates = store.get_duplicates()
+
assert len(duplicates.duplicates) == 0
+
assert not duplicates.is_duplicate("https://example.com/dup")
+
+
def test_search_entries(self, temp_dir):
+
"""Test searching entries."""
+
store = GitStore(temp_dir / "test_repo")
+
+
# Add user
+
store.add_user("testuser")
+
+
# Add entries with different content
+
entries_data = [
+
("Test Python Programming", "Learning Python basics"),
+
("JavaScript Tutorial", "Advanced JavaScript concepts"),
+
("Python Web Development", "Building web apps with Python"),
+
]
+
+
for title, summary in entries_data:
+
entry = AtomEntry(
+
id=f"https://example.com/entry/{title.lower().replace(' ', '-')}",
+
title=title,
+
link=HttpUrl(f"https://example.com/entry/{title.lower().replace(' ', '-')}"),
+
updated=datetime.now(),
+
summary=summary,
+
)
+
store.store_entry("testuser", entry)
+
+
# Search for Python entries
+
results = store.search_entries("Python")
+
assert len(results) == 2
+
+
# Search for specific user
+
results = store.search_entries("Python", username="testuser")
+
assert len(results) == 2
+
+
# Search with limit
+
results = store.search_entries("Python", limit=1)
+
assert len(results) == 1
+
+
# Search for non-existent term
+
results = store.search_entries("NonExistent")
+
assert len(results) == 0
+
+
def test_get_stats(self, temp_dir):
+
"""Test getting repository statistics."""
+
store = GitStore(temp_dir / "test_repo")
+
+
# Get initial stats
+
stats = store.get_stats()
+
assert stats["total_users"] == 0
+
assert stats["total_entries"] == 0
+
assert stats["total_duplicates"] == 0
+
+
# Add user and entries
+
store.add_user("testuser")
+
for i in range(3):
+
entry = AtomEntry(
+
id=f"https://example.com/entry/{i}",
+
title=f"Test Entry {i}",
+
link=HttpUrl(f"https://example.com/entry/{i}"),
+
updated=datetime.now(),
+
)
+
store.store_entry("testuser", entry)
+
+
# Add duplicate
+
store.add_duplicate("https://example.com/dup", "https://example.com/canonical")
+
+
# Get updated stats
+
stats = store.get_stats()
+
assert stats["total_users"] == 1
+
assert stats["total_entries"] == 3
+
assert stats["total_duplicates"] == 1
+
assert "last_updated" in stats
+
assert "repository_size" in stats
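
`test_search_entries` expects a substring search that finds "Python" in two of the three stored entries, honours a `limit`, and returns nothing for an unknown term. As a rough illustration only, the sketch below applies the same checks to plain dictionaries held in memory. The `search_entries` helper, its case-insensitive matching, and the choice of `title` and `summary` as the searchable fields are all assumptions; the real `GitStore.search_entries` works on JSON files in the repository and may behave differently.

```python
from typing import Optional


def search_entries(
    entries: list[dict], query: str, limit: Optional[int] = None
) -> list[dict]:
    """Return entries whose title or summary contains the query (hypothetical helper)."""
    needle = query.lower()
    matches: list[dict] = []
    for entry in entries:
        # Stop as soon as the requested number of results has been collected.
        if limit is not None and len(matches) >= limit:
            break
        haystack = f"{entry.get('title') or ''} {entry.get('summary') or ''}".lower()
        if needle in haystack:
            matches.append(entry)
    return matches


# Same titles and summaries as the test data above.
entries = [
    {"title": "Test Python Programming", "summary": "Learning Python basics"},
    {"title": "JavaScript Tutorial", "summary": "Advanced JavaScript concepts"},
    {"title": "Python Web Development", "summary": "Building web apps with Python"},
]
```

Calling `search_entries(entries, "Python")` returns the first and third entries, and passing `limit=1` stops after the first match.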
+353
tests/test_models.py
···
+
"""Tests for pydantic models."""
+
+
from datetime import datetime
+
from pathlib import Path
+
+
import pytest
+
from pydantic import HttpUrl, ValidationError
+
+
from thicket.models import (
+
AtomEntry,
+
DuplicateMap,
+
FeedMetadata,
+
ThicketConfig,
+
UserConfig,
+
UserMetadata,
+
)
+
+
+
class TestUserConfig:
+
"""Test UserConfig model."""
+
+
def test_valid_user_config(self):
+
"""Test creating valid user config."""
+
config = UserConfig(
+
username="testuser",
+
feeds=["https://example.com/feed.xml"],
+
email="test@example.com",
+
homepage="https://example.com",
+
display_name="Test User",
+
)
+
+
assert config.username == "testuser"
+
assert len(config.feeds) == 1
+
assert config.feeds[0] == HttpUrl("https://example.com/feed.xml")
+
assert config.email == "test@example.com"
+
assert config.display_name == "Test User"
+
+
def test_invalid_email(self):
+
"""Test validation of invalid email."""
+
with pytest.raises(ValidationError):
+
UserConfig(
+
username="testuser",
+
feeds=["https://example.com/feed.xml"],
+
email="invalid-email",
+
)
+
+
def test_invalid_feed_url(self):
+
"""Test validation of invalid feed URL."""
+
with pytest.raises(ValidationError):
+
UserConfig(
+
username="testuser",
+
feeds=["not-a-url"],
+
)
+
+
def test_optional_fields(self):
+
"""Test optional fields with None values."""
+
config = UserConfig(
+
username="testuser",
+
feeds=["https://example.com/feed.xml"],
+
)
+
+
assert config.email is None
+
assert config.homepage is None
+
assert config.icon is None
+
assert config.display_name is None
+
+
+
class TestThicketConfig:
+
"""Test ThicketConfig model."""
+
+
def test_valid_config(self, temp_dir):
+
"""Test creating valid configuration."""
+
config = ThicketConfig(
+
git_store=temp_dir / "git_store",
+
cache_dir=temp_dir / "cache",
+
users=[
+
UserConfig(
+
username="testuser",
+
feeds=["https://example.com/feed.xml"],
+
)
+
],
+
)
+
+
assert config.git_store == temp_dir / "git_store"
+
assert config.cache_dir == temp_dir / "cache"
+
assert len(config.users) == 1
+
assert config.users[0].username == "testuser"
+
+
def test_find_user(self, temp_dir):
+
"""Test finding user by username."""
+
config = ThicketConfig(
+
git_store=temp_dir / "git_store",
+
cache_dir=temp_dir / "cache",
+
users=[
+
UserConfig(username="user1", feeds=["https://example.com/feed1.xml"]),
+
UserConfig(username="user2", feeds=["https://example.com/feed2.xml"]),
+
],
+
)
+
+
user = config.find_user("user1")
+
assert user is not None
+
assert user.username == "user1"
+
+
non_user = config.find_user("nonexistent")
+
assert non_user is None
+
+
def test_add_user(self, temp_dir):
+
"""Test adding a new user."""
+
config = ThicketConfig(
+
git_store=temp_dir / "git_store",
+
cache_dir=temp_dir / "cache",
+
users=[],
+
)
+
+
new_user = UserConfig(
+
username="newuser",
+
feeds=["https://example.com/feed.xml"],
+
)
+
+
config.add_user(new_user)
+
assert len(config.users) == 1
+
assert config.users[0].username == "newuser"
+
+
def test_add_feed_to_user(self, temp_dir):
+
"""Test adding feed to existing user."""
+
config = ThicketConfig(
+
git_store=temp_dir / "git_store",
+
cache_dir=temp_dir / "cache",
+
users=[
+
UserConfig(username="testuser", feeds=["https://example.com/feed1.xml"]),
+
],
+
)
+
+
result = config.add_feed_to_user("testuser", HttpUrl("https://example.com/feed2.xml"))
+
assert result is True
+
+
user = config.find_user("testuser")
+
assert len(user.feeds) == 2
+
assert HttpUrl("https://example.com/feed2.xml") in user.feeds
+
+
# Test adding to non-existent user
+
result = config.add_feed_to_user("nonexistent", HttpUrl("https://example.com/feed.xml"))
+
assert result is False
+
+
+
class TestAtomEntry:
+
"""Test AtomEntry model."""
+
+
def test_valid_entry(self):
+
"""Test creating valid Atom entry."""
+
entry = AtomEntry(
+
id="https://example.com/entry/1",
+
title="Test Entry",
+
link=HttpUrl("https://example.com/entry/1"),
+
updated=datetime.now(),
+
published=datetime.now(),
+
summary="Test summary",
+
content="<p>Test content</p>",
+
content_type="html",
+
author={"name": "Test Author"},
+
categories=["test", "example"],
+
)
+
+
assert entry.id == "https://example.com/entry/1"
+
assert entry.title == "Test Entry"
+
assert entry.summary == "Test summary"
+
assert entry.content == "<p>Test content</p>"
+
assert entry.content_type == "html"
+
assert entry.author["name"] == "Test Author"
+
assert "test" in entry.categories
+
+
def test_minimal_entry(self):
+
"""Test creating minimal Atom entry."""
+
entry = AtomEntry(
+
id="https://example.com/entry/1",
+
title="Test Entry",
+
link=HttpUrl("https://example.com/entry/1"),
+
updated=datetime.now(),
+
)
+
+
assert entry.id == "https://example.com/entry/1"
+
assert entry.title == "Test Entry"
+
assert entry.published is None
+
assert entry.summary is None
+
assert entry.content is None
+
assert entry.content_type == "html" # default
+
assert entry.author is None
+
assert entry.categories == []
+
+
+
class TestDuplicateMap:
+
"""Test DuplicateMap model."""
+
+
def test_empty_duplicates(self):
+
"""Test empty duplicate map."""
+
dup_map = DuplicateMap()
+
assert len(dup_map.duplicates) == 0
+
assert not dup_map.is_duplicate("test")
+
assert dup_map.get_canonical("test") == "test"
+
+
def test_add_duplicate(self):
+
"""Test adding duplicate mapping."""
+
dup_map = DuplicateMap()
+
dup_map.add_duplicate("dup1", "canonical1")
+
+
assert len(dup_map.duplicates) == 1
+
assert dup_map.is_duplicate("dup1")
+
assert dup_map.get_canonical("dup1") == "canonical1"
+
assert dup_map.get_canonical("canonical1") == "canonical1"
+
+
def test_remove_duplicate(self):
+
"""Test removing duplicate mapping."""
+
dup_map = DuplicateMap()
+
dup_map.add_duplicate("dup1", "canonical1")
+
+
result = dup_map.remove_duplicate("dup1")
+
assert result is True
+
assert len(dup_map.duplicates) == 0
+
assert not dup_map.is_duplicate("dup1")
+
+
# Test removing non-existent duplicate
+
result = dup_map.remove_duplicate("nonexistent")
+
assert result is False
+
+
def test_get_duplicates_for_canonical(self):
+
"""Test getting all duplicates for a canonical ID."""
+
dup_map = DuplicateMap()
+
dup_map.add_duplicate("dup1", "canonical1")
+
dup_map.add_duplicate("dup2", "canonical1")
+
dup_map.add_duplicate("dup3", "canonical2")
+
+
dups = dup_map.get_duplicates_for_canonical("canonical1")
+
assert len(dups) == 2
+
assert "dup1" in dups
+
assert "dup2" in dups
+
+
dups = dup_map.get_duplicates_for_canonical("canonical2")
+
assert len(dups) == 1
+
assert "dup3" in dups
+
+
dups = dup_map.get_duplicates_for_canonical("nonexistent")
+
assert len(dups) == 0
+
+
+
class TestFeedMetadata:
+
"""Test FeedMetadata model."""
+
+
def test_valid_metadata(self):
+
"""Test creating valid feed metadata."""
+
metadata = FeedMetadata(
+
title="Test Feed",
+
author_name="Test Author",
+
author_email="author@example.com",
+
author_uri=HttpUrl("https://example.com/author"),
+
link=HttpUrl("https://example.com"),
+
description="Test description",
+
)
+
+
assert metadata.title == "Test Feed"
+
assert metadata.author_name == "Test Author"
+
assert metadata.author_email == "author@example.com"
+
assert metadata.link == HttpUrl("https://example.com")
+
+
def test_to_user_config(self):
+
"""Test converting metadata to user config."""
+
metadata = FeedMetadata(
+
title="Test Feed",
+
author_name="Test Author",
+
author_email="author@example.com",
+
author_uri=HttpUrl("https://example.com/author"),
+
link=HttpUrl("https://example.com"),
+
logo=HttpUrl("https://example.com/logo.png"),
+
)
+
+
feed_url = HttpUrl("https://example.com/feed.xml")
+
user_config = metadata.to_user_config("testuser", feed_url)
+
+
assert user_config.username == "testuser"
+
assert user_config.feeds == [feed_url]
+
assert user_config.display_name == "Test Author"
+
assert user_config.email == "author@example.com"
+
assert user_config.homepage == HttpUrl("https://example.com/author")
+
assert user_config.icon == HttpUrl("https://example.com/logo.png")
+
+
def test_to_user_config_fallbacks(self):
+
"""Test fallback logic in to_user_config."""
+
metadata = FeedMetadata(
+
title="Test Feed",
+
link=HttpUrl("https://example.com"),
+
icon=HttpUrl("https://example.com/icon.png"),
+
)
+
+
feed_url = HttpUrl("https://example.com/feed.xml")
+
user_config = metadata.to_user_config("testuser", feed_url)
+
+
assert user_config.display_name == "Test Feed" # Falls back to title
+
assert user_config.homepage == HttpUrl("https://example.com") # Falls back to link
+
assert user_config.icon == HttpUrl("https://example.com/icon.png")
+
assert user_config.email is None
+
+
+
class TestUserMetadata:
+
"""Test UserMetadata model."""
+
+
def test_valid_metadata(self):
+
"""Test creating valid user metadata."""
+
now = datetime.now()
+
metadata = UserMetadata(
+
username="testuser",
+
directory="testuser",
+
created=now,
+
last_updated=now,
+
feeds=["https://example.com/feed.xml"],
+
entry_count=5,
+
)
+
+
assert metadata.username == "testuser"
+
assert metadata.directory == "testuser"
+
assert metadata.entry_count == 5
+
assert len(metadata.feeds) == 1
+
+
def test_update_timestamp(self):
+
"""Test updating timestamp."""
+
now = datetime(2020, 1, 1)  # fixed past time, so the strict ">" check below cannot tie on a coarse clock
+
metadata = UserMetadata(
+
username="testuser",
+
directory="testuser",
+
created=now,
+
last_updated=now,
+
)
+
+
original_time = metadata.last_updated
+
metadata.update_timestamp()
+
+
assert metadata.last_updated > original_time
+
+
def test_increment_entry_count(self):
+
"""Test incrementing entry count."""
+
metadata = UserMetadata(
+
username="testuser",
+
directory="testuser",
+
created=datetime.now(),
+
last_updated=datetime(2020, 1, 1),  # fixed past time, so the strict ">" check below cannot tie
+
entry_count=5,
+
)
+
+
original_count = metadata.entry_count
+
original_time = metadata.last_updated
+
+
metadata.increment_entry_count(3)
+
+
assert metadata.entry_count == original_count + 3
+
assert metadata.last_updated > original_time
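
The `TestDuplicateMap` cases above describe the mapping's contract: an ID with no mapping is its own canonical ID, `remove_duplicate` reports whether anything was removed, and several duplicates can point at one canonical ID. The project's `DuplicateMap` is a Pydantic model whose source is not shown in this section, so the sketch below uses a standard-library dataclass instead. The method names come from the tests; the parameter names and the dictionary layout are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class DuplicateMap:
    """Maps duplicate entry IDs to their canonical ID (illustrative sketch only)."""

    duplicates: dict[str, str] = field(default_factory=dict)

    def add_duplicate(self, duplicate_id: str, canonical_id: str) -> None:
        self.duplicates[duplicate_id] = canonical_id

    def remove_duplicate(self, duplicate_id: str) -> bool:
        # True only if a mapping existed and was removed.
        return self.duplicates.pop(duplicate_id, None) is not None

    def is_duplicate(self, entry_id: str) -> bool:
        return entry_id in self.duplicates

    def get_canonical(self, entry_id: str) -> str:
        # IDs without a mapping are treated as their own canonical ID.
        return self.duplicates.get(entry_id, entry_id)

    def get_duplicates_for_canonical(self, canonical_id: str) -> list[str]:
        return [dup for dup, canon in self.duplicates.items() if canon == canonical_id]
```

Storing only the duplicate-to-canonical direction keeps writes trivial, at the cost of a linear scan in `get_duplicates_for_canonical`; for a manually curated list that trade-off is probably acceptable.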