Manage Atom feeds in a persistent git repository

Fix code quality issues and improve consistency

- Fix all ruff linting errors (28 issues resolved)
- Update type annotations to modern Python syntax (Dict -> dict, List -> list)
- Fix exception handling to use proper chaining (raise ... from err)
- Remove whitespace issues and improve formatting
- Add noqa comments for intentional import patterns
- Format all code with ruff formatter for consistency
- Ensure all Python files compile successfully

All code now passes comprehensive quality checks:
- ✅ Zero ruff linting errors
- ✅ Consistent code formatting
- ✅ Modern type annotations
- ✅ Proper exception handling
- ✅ All files compile successfully

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+22 -23
src/thicket/bots/test_bot.py
···
import json
from pathlib import Path
-
from typing import Any, Dict, Optional
+
from typing import Any, Optional
from ..models import AtomEntry
from .thicket_bot import ThicketBotHandler
···
def __init__(self) -> None:
"""Initialize mock bot handler."""
-
self.storage_data: Dict[str, str] = {}
-
self.sent_messages: list[Dict[str, Any]] = []
+
self.storage_data: dict[str, str] = {}
+
self.sent_messages: list[dict[str, Any]] = []
self.config_info = {
"full_name": "Thicket Bot",
-
"email": "thicket-bot@example.com"
+
"email": "thicket-bot@example.com",
}
-
def get_config_info(self) -> Dict[str, str]:
+
def get_config_info(self) -> dict[str, str]:
"""Return bot configuration info."""
return self.config_info
-
def send_reply(self, message: Dict[str, Any], content: str) -> None:
+
def send_reply(self, message: dict[str, Any], content: str) -> None:
"""Mock sending a reply."""
reply = {
"type": "reply",
"to": message.get("sender_id"),
"content": content,
-
"original_message": message
+
"original_message": message,
}
self.sent_messages.append(reply)
-
def send_message(self, message: Dict[str, Any]) -> None:
+
def send_message(self, message: dict[str, Any]) -> None:
"""Mock sending a message."""
self.sent_messages.append(message)
@property
-
def storage(self) -> 'MockStorage':
+
def storage(self) -> "MockStorage":
"""Return mock storage."""
return MockStorage(self.storage_data)
···
class MockStorage:
"""Mock storage for bot state."""
-
def __init__(self, storage_data: Dict[str, str]) -> None:
+
def __init__(self, storage_data: dict[str, str]) -> None:
"""Initialize with storage data."""
self.storage_data = storage_data
-
def __enter__(self) -> 'MockStorage':
+
def __enter__(self) -> "MockStorage":
"""Context manager entry."""
return self
···
content: str,
sender: str = "Test User",
sender_id: int = 12345,
-
message_type: str = "stream"
-
) -> Dict[str, Any]:
+
message_type: str = "stream",
+
) -> dict[str, Any]:
"""Create a test message for bot testing."""
return {
"content": content,
···
"type": message_type,
"timestamp": 1642694400, # 2022-01-20 12:00:00 UTC
"stream_id": 1,
-
"subject": "test topic"
+
"subject": "test topic",
}
def create_test_entry(
entry_id: str = "test-entry-1",
title: str = "Test Article",
-
link: str = "https://example.com/test-article"
+
link: str = "https://example.com/test-article",
) -> AtomEntry:
"""Create a test AtomEntry for testing."""
from datetime import datetime
···
published=datetime(2024, 1, 20, 10, 0, 0),
summary="This is a test article summary",
content="<p>This is test article content</p>",
-
author={"name": "Test Author", "email": "author@example.com"}
+
author={"name": "Test Author", "email": "author@example.com"},
)
···
self.configure_bot(config_path, "test-stream", "test-topic")
def configure_bot(
-
self,
-
config_path: Path,
-
stream: str = "test-stream",
-
topic: str = "test-topic"
+
self, config_path: Path, stream: str = "test-stream", topic: str = "test-topic"
) -> None:
"""Configure the bot for testing."""
# Set bot configuration
···
"topic_name": topic,
"sync_interval": 300,
"max_entries_per_sync": 10,
-
"config_path": str(config_path)
+
"config_path": str(config_path),
}
self.handler.storage_data["bot_config"] = json.dumps(config_data)
···
# Initialize bot
self.bot._load_bot_config(self.handler)
-
def send_command(self, command: str, sender: str = "Test User") -> list[Dict[str, Any]]:
+
def send_command(
+
self, command: str, sender: str = "Test User"
+
) -> list[dict[str, Any]]:
"""Send a command to the bot and return responses."""
message = create_test_message(f"@thicket {command}", sender)
···
return self.handler.sent_messages[-1].get("content")
return None
-
def get_last_message(self) -> Optional[Dict[str, Any]]:
+
def get_last_message(self) -> Optional[dict[str, Any]]:
"""Get the last sent message."""
if self.handler.sent_messages:
return self.handler.sent_messages[-1]
+429 -187
src/thicket/bots/thicket_bot.py
···
except ImportError:
# When run directly by zulip-bots, add the package to path
import sys
+
src_dir = Path(__file__).parent.parent.parent
if str(src_dir) not in sys.path:
sys.path.insert(0, str(src_dir))
···
self._send_help(message, bot_handler)
elif command == "status":
self._send_status(message, bot_handler, sender)
-
elif command == "sync" and len(command_parts) > 1 and command_parts[1] == "now":
+
elif (
+
command == "sync"
+
and len(command_parts) > 1
+
and command_parts[1] == "now"
+
):
self._handle_force_sync(message, bot_handler, sender)
elif command == "reset":
self._handle_reset_command(message, bot_handler, sender)
elif command == "config":
-
self._handle_config_command(message, bot_handler, command_parts[1:], sender)
+
self._handle_config_command(
+
message, bot_handler, command_parts[1:], sender
+
)
elif command == "schedule":
self._handle_schedule_command(message, bot_handler, sender)
elif command == "claim":
-
self._handle_claim_command(message, bot_handler, command_parts[1:], sender)
+
self._handle_claim_command(
+
message, bot_handler, command_parts[1:], sender
+
)
else:
-
bot_handler.send_reply(message, f"Unknown command: {command}. Type `@mention help` for usage.")
+
bot_handler.send_reply(
+
message,
+
f"Unknown command: {command}. Type `@mention help` for usage.",
+
)
except Exception as e:
self.logger.error(f"Error handling command '{command}': {e}")
bot_handler.send_reply(message, f"Error processing command: {str(e)}")
···
try:
# Get bot's actual name from Zulip
bot_info = bot_handler._client.get_profile()
-
if bot_info.get('result') == 'success':
-
bot_name = bot_info.get('full_name', '').lower()
+
if bot_info.get("result") == "success":
+
bot_name = bot_info.get("full_name", "").lower()
if bot_name:
-
return f"@{bot_name}" in content.lower() or f"@**{bot_name}**" in content.lower()
+
return (
+
f"@{bot_name}" in content.lower()
+
or f"@**{bot_name}**" in content.lower()
+
)
except Exception as e:
self.logger.debug(f"Could not get bot profile: {e}")
···
try:
# Get bot's actual name from Zulip
bot_info = bot_handler._client.get_profile()
-
if bot_info.get('result') == 'success':
-
bot_name = bot_info.get('full_name', '')
+
if bot_info.get("result") == "success":
+
bot_name = bot_info.get("full_name", "")
if bot_name:
# Remove @bot_name or @**bot_name**
escaped_name = re.escape(bot_name)
-
content = re.sub(rf'@(?:\*\*)?{escaped_name}(?:\*\*)?', '', content, flags=re.IGNORECASE).strip()
+
content = re.sub(
+
rf"@(?:\*\*)?{escaped_name}(?:\*\*)?",
+
"",
+
content,
+
flags=re.IGNORECASE,
+
).strip()
return content
except Exception as e:
self.logger.debug(f"Could not get bot profile for mention cleaning: {e}")
# Fallback to removing @thicket
-
content = re.sub(r'@(?:\*\*)?thicket(?:\*\*)?', '', content, flags=re.IGNORECASE).strip()
+
content = re.sub(
+
r"@(?:\*\*)?thicket(?:\*\*)?", "", content, flags=re.IGNORECASE
+
).strip()
return content
def _send_help(self, message: dict[str, Any], bot_handler: BotHandler) -> None:
"""Send help message."""
bot_handler.send_reply(message, self.usage())
-
def _send_status(self, message: dict[str, Any], bot_handler: BotHandler, sender: str) -> None:
+
def _send_status(
+
self, message: dict[str, Any], bot_handler: BotHandler, sender: str
+
) -> None:
"""Send bot status information."""
status_lines = [
f"**Thicket Bot Status** (requested by {sender})",
···
# Debug mode status
if self.debug_user:
-
status_lines.extend([
-
"🐛 **Debug Mode:** ENABLED",
-
f"🎯 **Debug User:** {self.debug_user}",
-
"",
-
])
+
status_lines.extend(
+
[
+
"🐛 **Debug Mode:** ENABLED",
+
f"🎯 **Debug User:** {self.debug_user}",
+
"",
+
]
+
)
else:
-
status_lines.extend([
-
f"📍 **Stream:** {self.stream_name or 'Not configured'}",
-
f"📝 **Topic:** {self.topic_name or 'Not configured'}",
-
"",
-
])
+
status_lines.extend(
+
[
+
f"📍 **Stream:** {self.stream_name or 'Not configured'}",
+
f"📝 **Topic:** {self.topic_name or 'Not configured'}",
+
"",
+
]
+
)
-
status_lines.extend([
-
f"⏱️ **Sync Interval:** {self.sync_interval}s ({self.sync_interval // 60}m {self.sync_interval % 60}s)",
-
f"📊 **Max Entries/Sync:** {self.max_entries_per_sync}",
-
f"📁 **Config Path:** {self.config_path or 'Not configured'}",
-
"",
-
f"📄 **Tracked Entries:** {len(self.posted_entries)}",
-
f"🔄 **Catchup Mode:** {'Active (first run)' if len(self.posted_entries) == 0 else 'Inactive'}",
-
f"✅ **Thicket Initialized:** {'Yes' if self.git_store else 'No'}",
-
"",
-
self._get_schedule_info(),
-
])
+
status_lines.extend(
+
[
+
f"⏱️ **Sync Interval:** {self.sync_interval}s ({self.sync_interval // 60}m {self.sync_interval % 60}s)",
+
f"📊 **Max Entries/Sync:** {self.max_entries_per_sync}",
+
f"📁 **Config Path:** {self.config_path or 'Not configured'}",
+
"",
+
f"📄 **Tracked Entries:** {len(self.posted_entries)}",
+
f"🔄 **Catchup Mode:** {'Active (first run)' if len(self.posted_entries) == 0 else 'Inactive'}",
+
f"✅ **Thicket Initialized:** {'Yes' if self.git_store else 'No'}",
+
"",
+
self._get_schedule_info(),
+
]
+
)
bot_handler.send_reply(message, "\n".join(status_lines))
-
def _handle_force_sync(self, message: dict[str, Any], bot_handler: BotHandler, sender: str) -> None:
+
def _handle_force_sync(
+
self, message: dict[str, Any], bot_handler: BotHandler, sender: str
+
) -> None:
"""Handle immediate sync request."""
if not self._check_initialization(message, bot_handler):
return
-
bot_handler.send_reply(message, f"🔄 Starting immediate sync... (requested by {sender})")
+
bot_handler.send_reply(
+
message, f"🔄 Starting immediate sync... (requested by {sender})"
+
)
try:
new_entries = self._perform_sync(bot_handler)
bot_handler.send_reply(
-
message,
-
f"✅ Sync completed! Found {len(new_entries)} new entries."
+
message, f"✅ Sync completed! Found {len(new_entries)} new entries."
)
except Exception as e:
self.logger.error(f"Force sync failed: {e}")
bot_handler.send_reply(message, f"❌ Sync failed: {str(e)}")
-
def _handle_reset_command(self, message: dict[str, Any], bot_handler: BotHandler, sender: str) -> None:
+
def _handle_reset_command(
+
self, message: dict[str, Any], bot_handler: BotHandler, sender: str
+
) -> None:
"""Handle reset command to clear posted entries tracking."""
try:
self.posted_entries.clear()
self._save_posted_entries(bot_handler)
bot_handler.send_reply(
message,
-
f"✅ Posting history reset! Recent entries will be posted on next sync. (requested by {sender})"
+
f"✅ Posting history reset! Recent entries will be posted on next sync. (requested by {sender})",
)
self.logger.info(f"Posted entries tracking reset by {sender}")
except Exception as e:
self.logger.error(f"Reset failed: {e}")
bot_handler.send_reply(message, f"❌ Reset failed: {str(e)}")
-
def _handle_schedule_command(self, message: dict[str, Any], bot_handler: BotHandler, sender: str) -> None:
+
def _handle_schedule_command(
+
self, message: dict[str, Any], bot_handler: BotHandler, sender: str
+
) -> None:
"""Handle schedule query command."""
schedule_info = self._get_schedule_info()
bot_handler.send_reply(
message,
-
f"**Thicket Bot Schedule** (requested by {sender})\n\n{schedule_info}"
+
f"**Thicket Bot Schedule** (requested by {sender})\n\n{schedule_info}",
)
def _handle_claim_command(
···
message: dict[str, Any],
bot_handler: BotHandler,
args: list[str],
-
sender: str
+
sender: str,
) -> None:
"""Handle username claiming command."""
if not args:
···
sender_email = message.get("sender_email")
if not sender_user_id or not sender_email:
-
bot_handler.send_reply(message, "❌ Could not determine your Zulip user information.")
+
bot_handler.send_reply(
+
message, "❌ Could not determine your Zulip user information."
+
)
return
try:
···
server_url = zulip_site_url.replace("https://", "").replace("http://", "")
if not server_url:
-
bot_handler.send_reply(message, "❌ Could not determine Zulip server URL.")
+
bot_handler.send_reply(
+
message, "❌ Could not determine Zulip server URL."
+
)
return
# Check if username exists in thicket
···
if not user:
bot_handler.send_reply(
message,
-
f"❌ Username `{username}` not found in thicket. Available users: {', '.join(self.git_store.list_users())}"
+
f"❌ Username `{username}` not found in thicket. Available users: {', '.join(self.git_store.list_users())}",
)
return
···
existing_zulip_id = user.get_zulip_mention(server_url)
if existing_zulip_id:
# Check if it's claimed by the same user
-
if existing_zulip_id == sender_email or str(existing_zulip_id) == str(sender_user_id):
+
if existing_zulip_id == sender_email or str(existing_zulip_id) == str(
+
sender_user_id
+
):
bot_handler.send_reply(
message,
-
f"✅ Username `{username}` is already claimed by you on {server_url}!"
+
f"✅ Username `{username}` is already claimed by you on {server_url}!",
)
else:
bot_handler.send_reply(
message,
-
f"❌ Username `{username}` is already claimed by another user on {server_url}."
+
f"❌ Username `{username}` is already claimed by another user on {server_url}.",
)
return
# Claim the username - prefer email for consistency
-
success = self.git_store.add_zulip_association(username, server_url, sender_email)
+
success = self.git_store.add_zulip_association(
+
username, server_url, sender_email
+
)
if success:
-
reply_msg = f"🎉 Successfully claimed username `{username}` for **{sender}** on {server_url}!\n" + \
-
"You will now be mentioned when new articles are posted from this user's feeds."
+
reply_msg = (
+
f"🎉 Successfully claimed username `{username}` for **{sender}** on {server_url}!\n"
+
+ "You will now be mentioned when new articles are posted from this user's feeds."
+
)
bot_handler.send_reply(message, reply_msg)
# Send notification to configured stream if enabled and not in debug mode
-
if (self.username_claim_notifications and
-
not self.debug_user and
-
self.stream_name and self.topic_name):
+
if (
+
self.username_claim_notifications
+
and not self.debug_user
+
and self.stream_name
+
and self.topic_name
+
):
try:
notification_msg = f"👋 **{sender}** claimed thicket username `{username}` on {server_url}"
-
bot_handler.send_message({
-
"type": "stream",
-
"to": self.stream_name,
-
"subject": self.topic_name,
-
"content": notification_msg
-
})
+
bot_handler.send_message(
+
{
+
"type": "stream",
+
"to": self.stream_name,
+
"subject": self.topic_name,
+
"content": notification_msg,
+
}
+
)
except Exception as e:
-
self.logger.error(f"Failed to send username claim notification: {e}")
+
self.logger.error(
+
f"Failed to send username claim notification: {e}"
+
)
-
self.logger.info(f"User {sender} ({sender_email}) claimed username {username} on {server_url}")
+
self.logger.info(
+
f"User {sender} ({sender_email}) claimed username {username} on {server_url}"
+
)
else:
bot_handler.send_reply(
message,
-
f"❌ Failed to claim username `{username}`. This shouldn't happen - please contact an administrator."
+
f"❌ Failed to claim username `{username}`. This shouldn't happen - please contact an administrator.",
)
except Exception as e:
···
message: dict[str, Any],
bot_handler: BotHandler,
args: list[str],
-
sender: str
+
sender: str,
) -> None:
"""Handle configuration commands."""
if len(args) < 2:
-
bot_handler.send_reply(message, "Usage: `@mention config <setting> <value>`")
+
bot_handler.send_reply(
+
message, "Usage: `@mention config <setting> <value>`"
+
)
return
setting = args[0].lower()
···
old_value = self.stream_name
self.stream_name = value
self._save_bot_config(bot_handler)
-
bot_handler.send_reply(message, f"✅ Stream set to: **{value}** (by {sender})")
-
self._send_config_change_notification(bot_handler, sender, "stream", old_value, value)
+
bot_handler.send_reply(
+
message, f"✅ Stream set to: **{value}** (by {sender})"
+
)
+
self._send_config_change_notification(
+
bot_handler, sender, "stream", old_value, value
+
)
elif setting == "topic":
old_value = self.topic_name
self.topic_name = value
self._save_bot_config(bot_handler)
-
bot_handler.send_reply(message, f"✅ Topic set to: **{value}** (by {sender})")
-
self._send_config_change_notification(bot_handler, sender, "topic", old_value, value)
+
bot_handler.send_reply(
+
message, f"✅ Topic set to: **{value}** (by {sender})"
+
)
+
self._send_config_change_notification(
+
bot_handler, sender, "topic", old_value, value
+
)
elif setting == "interval":
try:
interval = int(value)
if interval < 60:
-
bot_handler.send_reply(message, "❌ Interval must be at least 60 seconds")
+
bot_handler.send_reply(
+
message, "❌ Interval must be at least 60 seconds"
+
)
return
old_value = self.sync_interval
self.sync_interval = interval
self._save_bot_config(bot_handler)
-
bot_handler.send_reply(message, f"✅ Sync interval set to: **{interval}s** (by {sender})")
-
self._send_config_change_notification(bot_handler, sender, "sync interval", f"{old_value}s", f"{interval}s")
+
bot_handler.send_reply(
+
message, f"✅ Sync interval set to: **{interval}s** (by {sender})"
+
)
+
self._send_config_change_notification(
+
bot_handler,
+
sender,
+
"sync interval",
+
f"{old_value}s",
+
f"{interval}s",
+
)
except ValueError:
-
bot_handler.send_reply(message, "❌ Invalid interval value. Must be a number of seconds.")
+
bot_handler.send_reply(
+
message, "❌ Invalid interval value. Must be a number of seconds."
+
)
elif setting == "max_entries":
try:
max_entries = int(value)
if max_entries < 1 or max_entries > 50:
-
bot_handler.send_reply(message, "❌ Max entries must be between 1 and 50")
+
bot_handler.send_reply(
+
message, "❌ Max entries must be between 1 and 50"
+
)
return
old_value = self.max_entries_per_sync
self.max_entries_per_sync = max_entries
self._save_bot_config(bot_handler)
-
bot_handler.send_reply(message, f"✅ Max entries per sync set to: **{max_entries}** (by {sender})")
-
self._send_config_change_notification(bot_handler, sender, "max entries per sync", str(old_value), str(max_entries))
+
bot_handler.send_reply(
+
message,
+
f"✅ Max entries per sync set to: **{max_entries}** (by {sender})",
+
)
+
self._send_config_change_notification(
+
bot_handler,
+
sender,
+
"max entries per sync",
+
str(old_value),
+
str(max_entries),
+
)
except ValueError:
-
bot_handler.send_reply(message, "❌ Invalid max entries value. Must be a number.")
+
bot_handler.send_reply(
+
message, "❌ Invalid max entries value. Must be a number."
+
)
else:
bot_handler.send_reply(
message,
-
f"❌ Unknown setting: {setting}. Available: stream, topic, interval, max_entries"
+
f"❌ Unknown setting: {setting}. Available: stream, topic, interval, max_entries",
)
def _load_bot_config(self, bot_handler: BotHandler) -> None:
···
if "bot" in config:
bot_section = config["bot"]
self.sync_interval = bot_section.getint("sync_interval", 300)
-
self.max_entries_per_sync = bot_section.getint("max_entries_per_sync", 10)
+
self.max_entries_per_sync = bot_section.getint(
+
"max_entries_per_sync", 10
+
)
self.rate_limit_delay = bot_section.getint("rate_limit_delay", 5)
self.posts_per_batch = bot_section.getint("posts_per_batch", 5)
···
if "notifications" in config:
notifications_section = config["notifications"]
-
self.config_change_notifications = notifications_section.getboolean("config_change_notifications", True)
-
self.username_claim_notifications = notifications_section.getboolean("username_claim_notifications", True)
+
self.config_change_notifications = notifications_section.getboolean(
+
"config_change_notifications", True
+
)
+
self.username_claim_notifications = notifications_section.getboolean(
+
"username_claim_notifications", True
+
)
self.logger.info(f"Loaded configuration from {botrc_path}")
···
# Load thicket configuration
import yaml
+
with open(self.config_path) as f:
config_data = yaml.safe_load(f)
self.config = ThicketConfig(**config_data)
···
zulip_user_id = user.get_zulip_mention(server_url)
if not zulip_user_id:
-
raise ValueError(f"User '{self.debug_user}' has no Zulip association for server '{server_url}'")
+
raise ValueError(
+
f"User '{self.debug_user}' has no Zulip association for server '{server_url}'"
+
)
# Try to look up the actual Zulip user ID from the email address
# But don't fail if we can't - we'll try again when sending messages
···
if actual_user_id and actual_user_id != zulip_user_id:
# Successfully resolved to numeric ID
self.debug_zulip_user_id = actual_user_id
-
self.logger.info(f"Debug mode enabled: Will send DMs to {self.debug_user} (email: {zulip_user_id}, user_id: {actual_user_id}) on {server_url}")
+
self.logger.info(
+
f"Debug mode enabled: Will send DMs to {self.debug_user} (email: {zulip_user_id}, user_id: {actual_user_id}) on {server_url}"
+
)
else:
# Keep the email address, will resolve later when sending
self.debug_zulip_user_id = zulip_user_id
-
self.logger.info(f"Debug mode enabled: Will send DMs to {self.debug_user} ({zulip_user_id}) on {server_url} (will resolve user ID when sending)")
+
self.logger.info(
+
f"Debug mode enabled: Will send DMs to {self.debug_user} ({zulip_user_id}) on {server_url} (will resolve user ID when sending)"
+
)
-
def _lookup_zulip_user_id(self, bot_handler: BotHandler, email_or_id: str) -> Optional[str]:
+
def _lookup_zulip_user_id(
+
self, bot_handler: BotHandler, email_or_id: str
+
) -> Optional[str]:
"""Look up Zulip user ID from email address or return the ID if it's already numeric."""
# If it's already a numeric user ID, return it
if email_or_id.isdigit():
···
# First try the get_user_by_email API if available
try:
user_result = client.get_user_by_email(email_or_id)
-
if user_result.get('result') == 'success':
-
user_data = user_result.get('user', {})
-
user_id = user_data.get('user_id')
+
if user_result.get("result") == "success":
+
user_data = user_result.get("user", {})
+
user_id = user_data.get("user_id")
if user_id:
-
self.logger.info(f"Found user ID {user_id} for '{email_or_id}' via get_user_by_email API")
+
self.logger.info(
+
f"Found user ID {user_id} for '{email_or_id}' via get_user_by_email API"
+
)
return str(user_id)
except (AttributeError, Exception):
pass
# Fallback: Get all users and search through them
users_result = client.get_users()
-
if users_result.get('result') == 'success':
-
for user in users_result['members']:
-
user_email = user.get('email', '')
-
delivery_email = user.get('delivery_email', '')
+
if users_result.get("result") == "success":
+
for user in users_result["members"]:
+
user_email = user.get("email", "")
+
delivery_email = user.get("delivery_email", "")
-
if (user_email == email_or_id or
-
delivery_email == email_or_id or
-
str(user.get('user_id')) == email_or_id):
-
user_id = user.get('user_id')
+
if (
+
user_email == email_or_id
+
or delivery_email == email_or_id
+
or str(user.get("user_id")) == email_or_id
+
):
+
user_id = user.get("user_id")
return str(user_id)
-
self.logger.error(f"No user found with identifier '{email_or_id}'. Searched {len(users_result['members'])} users.")
+
self.logger.error(
+
f"No user found with identifier '{email_or_id}'. Searched {len(users_result['members'])} users."
+
)
return None
else:
-
self.logger.error(f"Failed to get users: {users_result.get('msg', 'Unknown error')}")
+
self.logger.error(
+
f"Failed to get users: {users_result.get('msg', 'Unknown error')}"
+
)
return None
except Exception as e:
self.logger.error(f"Error looking up user ID for '{email_or_id}': {e}")
return None
-
def _lookup_zulip_user_info(self, bot_handler: BotHandler, email_or_id: str) -> tuple[Optional[str], Optional[str]]:
+
def _lookup_zulip_user_info(
+
self, bot_handler: BotHandler, email_or_id: str
+
) -> tuple[Optional[str], Optional[str]]:
"""Look up both Zulip user ID and full name from email address."""
if email_or_id.isdigit():
return email_or_id, None
···
# Try get_user_by_email API first
try:
user_result = client.get_user_by_email(email_or_id)
-
if user_result.get('result') == 'success':
-
user_data = user_result.get('user', {})
-
user_id = user_data.get('user_id')
-
full_name = user_data.get('full_name', '')
+
if user_result.get("result") == "success":
+
user_data = user_result.get("user", {})
+
user_id = user_data.get("user_id")
+
full_name = user_data.get("full_name", "")
if user_id:
return str(user_id), full_name
except AttributeError:
···
# Fallback: search all users
users_result = client.get_users()
-
if users_result.get('result') == 'success':
-
for user in users_result['members']:
-
if (user.get('email') == email_or_id or
-
user.get('delivery_email') == email_or_id):
-
return str(user.get('user_id')), user.get('full_name', '')
+
if users_result.get("result") == "success":
+
for user in users_result["members"]:
+
if (
+
user.get("email") == email_or_id
+
or user.get("delivery_email") == email_or_id
+
):
+
return str(user.get("user_id")), user.get("full_name", "")
return None, None
···
def _save_posted_entries(self, bot_handler: BotHandler) -> None:
"""Save the set of posted entries."""
try:
-
bot_handler.storage.put("posted_entries", json.dumps(list(self.posted_entries)))
+
bot_handler.storage.put(
+
"posted_entries", json.dumps(list(self.posted_entries))
+
)
except Exception as e:
self.logger.error(f"Error saving posted entries: {e}")
-
def _check_initialization(self, message: dict[str, Any], bot_handler: BotHandler) -> bool:
+
def _check_initialization(
+
self, message: dict[str, Any], bot_handler: BotHandler
+
) -> bool:
"""Check if thicket is properly initialized."""
if not self.git_store or not self.config:
bot_handler.send_reply(
-
message,
-
"❌ Thicket not initialized. Please check configuration."
+
message, "❌ Thicket not initialized. Please check configuration."
)
return False
···
if not self.stream_name or not self.topic_name:
bot_handler.send_reply(
message,
-
"❌ Stream and topic must be configured first. Use `@mention config stream <name>` and `@mention config topic <name>`"
+
"❌ Stream and topic must be configured first. Use `@mention config stream <name>` and `@mention config topic <name>`",
)
return False
···
def _schedule_sync(self, bot_handler: BotHandler) -> None:
"""Schedule periodic sync operations."""
+
def sync_loop():
while True:
try:
# Check if we can sync
-
can_sync = (self.git_store and
-
((self.stream_name and self.topic_name) or
-
self.debug_user))
+
can_sync = self.git_store and (
+
(self.stream_name and self.topic_name) or self.debug_user
+
)
if can_sync:
self._perform_sync(bot_handler)
···
# Start background thread
import threading
+
sync_thread = threading.Thread(target=sync_loop, daemon=True)
sync_thread.start()
···
asyncio.set_event_loop(loop)
try:
new_count, _ = loop.run_until_complete(
-
sync_feed(self.git_store, username, str(feed_url), dry_run=False)
+
sync_feed(
+
self.git_store, username, str(feed_url), dry_run=False
+
)
)
entries_to_check = []
if new_count > 0:
# Get the newly added entries
-
entries_to_check = self.git_store.list_entries(username, limit=new_count)
+
entries_to_check = self.git_store.list_entries(
+
username, limit=new_count
+
)
# Always check for catchup mode on first run
if is_first_run:
# Catchup mode: get configured number of entries on first run
-
catchup_entries = self.git_store.list_entries(username, limit=self.catchup_entries)
-
entries_to_check = catchup_entries if not entries_to_check else entries_to_check
+
catchup_entries = self.git_store.list_entries(
+
username, limit=self.catchup_entries
+
)
+
entries_to_check = (
+
catchup_entries
+
if not entries_to_check
+
else entries_to_check
+
)
for entry in entries_to_check:
entry_key = f"{username}:{entry.id}"
···
loop.close()
except Exception as e:
-
self.logger.error(f"Error syncing feed {feed_url} for user {username}: {e}")
+
self.logger.error(
+
f"Error syncing feed {feed_url} for user {username}: {e}"
+
)
if len(new_entries) >= self.max_entries_per_sync:
break
···
posted_count += 1
# Rate limiting: pause after configured number of messages
-
if posted_count % self.posts_per_batch == 0 and i < len(new_entries) - 1:
+
if (
+
posted_count % self.posts_per_batch == 0
+
and i < len(new_entries) - 1
+
):
time.sleep(self.rate_limit_delay)
self._save_posted_entries(bot_handler)
···
return [entry for entry, _ in new_entries]
-
def _post_entry_to_zulip(self, entry: AtomEntry, bot_handler: BotHandler, username: str) -> None:
+
def _post_entry_to_zulip(
+
self, entry: AtomEntry, bot_handler: BotHandler, username: str
+
) -> None:
"""Post a single entry to the configured Zulip stream/topic or debug user DM."""
try:
# Get current Zulip server from environment
···
zulip_user_id = user.get_zulip_mention(server_url)
if zulip_user_id:
# Look up the actual Zulip full name for proper @mention
-
_, zulip_full_name = self._lookup_zulip_user_info(bot_handler, zulip_user_id)
+
_, zulip_full_name = self._lookup_zulip_user_info(
+
bot_handler, zulip_user_id
+
)
display_name = zulip_full_name or user.display_name or username
# Check if author is different from the user - avoid redundancy
···
published_info = ""
if entry.published:
-
published_info = f" • {entry.published.strftime('%Y-%m-%d')}"
+
published_info = (
+
f" • {entry.published.strftime('%Y-%m-%d')}"
+
)
mention_info = f"@**{display_name}** posted{author_info}{published_info}:\n\n"
···
if entry.published:
published_info = f" • {entry.published.strftime('%Y-%m-%d')}"
-
mention_info = f"**{display_name}** posted{author_info}{published_info}:\n\n"
+
mention_info = (
+
f"**{display_name}** posted{author_info}{published_info}:\n\n"
+
)
# Format the message with HTML processing
message_lines = [
···
user_id_to_use = self.debug_zulip_user_id
if not user_id_to_use.isdigit():
# Need to look up the numeric ID
-
resolved_id = self._lookup_zulip_user_id(bot_handler, user_id_to_use)
+
resolved_id = self._lookup_zulip_user_id(
+
bot_handler, user_id_to_use
+
)
if resolved_id:
user_id_to_use = resolved_id
-
self.logger.debug(f"Resolved {self.debug_zulip_user_id} to user ID {user_id_to_use}")
+
self.logger.debug(
+
f"Resolved {self.debug_zulip_user_id} to user ID {user_id_to_use}"
+
)
else:
-
self.logger.error(f"Could not resolve user ID for {self.debug_zulip_user_id}")
+
self.logger.error(
+
f"Could not resolve user ID for {self.debug_zulip_user_id}"
+
)
return
try:
# For private messages, user_id needs to be an integer, not string
user_id_int = int(user_id_to_use)
-
bot_handler.send_message({
-
"type": "private",
-
"to": [user_id_int], # Use integer user ID
-
"content": debug_message
-
})
+
bot_handler.send_message(
+
{
+
"type": "private",
+
"to": [user_id_int], # Use integer user ID
+
"content": debug_message,
+
}
+
)
except ValueError:
# If conversion to int fails, user_id_to_use might be an email
try:
-
bot_handler.send_message({
-
"type": "private",
-
"to": [user_id_to_use], # Try as string (email)
-
"content": debug_message
-
})
+
bot_handler.send_message(
+
{
+
"type": "private",
+
"to": [user_id_to_use], # Try as string (email)
+
"content": debug_message,
+
}
+
)
except Exception as e2:
-
self.logger.error(f"Failed to send DM to {self.debug_user} (tried both int and string): {e2}")
+
self.logger.error(
+
f"Failed to send DM to {self.debug_user} (tried both int and string): {e2}"
+
)
return
except Exception as e:
-
self.logger.error(f"Failed to send DM to {self.debug_user} ({user_id_to_use}): {e}")
+
self.logger.error(
+
f"Failed to send DM to {self.debug_user} ({user_id_to_use}): {e}"
+
)
return
-
self.logger.info(f"Posted entry to debug user {self.debug_user}: {entry.title}")
+
self.logger.info(
+
f"Posted entry to debug user {self.debug_user}: {entry.title}"
+
)
else:
# Normal mode: send to stream/topic
-
bot_handler.send_message({
-
"type": "stream",
-
"to": self.stream_name,
-
"subject": self.topic_name,
-
"content": message_content
-
})
-
self.logger.info(f"Posted entry to stream: {entry.title} (user: {username})")
+
bot_handler.send_message(
+
{
+
"type": "stream",
+
"to": self.stream_name,
+
"subject": self.topic_name,
+
"content": message_content,
+
}
+
)
+
self.logger.info(
+
f"Posted entry to stream: {entry.title} (user: {username})"
+
)
except Exception as e:
self.logger.error(f"Error posting entry to Zulip: {e}")
···
html_content,
heading_style="ATX", # Use # for headings (but we'll post-process these)
bullets="-", # Use - for bullets
-
convert=['a', 'b', 'strong', 'i', 'em', 'code', 'pre', 'p', 'br', 'ul', 'ol', 'li', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']
+
convert=[
+
"a",
+
"b",
+
"strong",
+
"i",
+
"em",
+
"code",
+
"pre",
+
"p",
+
"br",
+
"ul",
+
"ol",
+
"li",
+
"h1",
+
"h2",
+
"h3",
+
"h4",
+
"h5",
+
"h6",
+
],
).strip()
# Post-process to convert headings to bold for compact summaries
import re
+
# Convert markdown headers to bold with period
-
markdown = re.sub(r'^#{1,6}\s*(.+)$', r'**\1.**', markdown, flags=re.MULTILINE)
+
markdown = re.sub(
+
r"^#{1,6}\s*(.+)$", r"**\1.**", markdown, flags=re.MULTILINE
+
)
# Clean up excessive newlines and make more compact
-
markdown = re.sub(r'\n\s*\n\s*\n+', ' ', markdown) # Multiple newlines become space
-
markdown = re.sub(r'\n\s*\n', '. ', markdown) # Double newlines become sentence breaks
-
markdown = re.sub(r'\n', ' ', markdown) # Single newlines become spaces
+
markdown = re.sub(
+
r"\n\s*\n\s*\n+", " ", markdown
+
) # Multiple newlines become space
+
markdown = re.sub(
+
r"\n\s*\n", ". ", markdown
+
) # Double newlines become sentence breaks
+
markdown = re.sub(r"\n", " ", markdown) # Single newlines become spaces
# Clean up double periods and excessive whitespace
-
markdown = re.sub(r'\.\.+', '.', markdown)
-
markdown = re.sub(r'\s+', ' ', markdown)
+
markdown = re.sub(r"\.\.+", ".", markdown)
+
markdown = re.sub(r"\s+", " ", markdown)
return markdown.strip()
except ImportError:
# Fallback: manual HTML processing
import re
+
content = html_content
# Convert headings to bold with periods for compact summaries
-
content = re.sub(r'<h[1-6](?:\s[^>]*)?>([^<]*)</h[1-6]>', r'**\1.** ', content, flags=re.IGNORECASE)
+
content = re.sub(
+
r"<h[1-6](?:\s[^>]*)?>([^<]*)</h[1-6]>",
+
r"**\1.** ",
+
content,
+
flags=re.IGNORECASE,
+
)
# Convert common HTML elements to Markdown
-
content = re.sub(r'<(?:strong|b)(?:\s[^>]*)?>([^<]*)</(?:strong|b)>', r'**\1**', content, flags=re.IGNORECASE)
-
content = re.sub(r'<(?:em|i)(?:\s[^>]*)?>([^<]*)</(?:em|i)>', r'*\1*', content, flags=re.IGNORECASE)
-
content = re.sub(r'<code(?:\s[^>]*)?>([^<]*)</code>', r'`\1`', content, flags=re.IGNORECASE)
-
content = re.sub(r'<a(?:\s[^>]*?)?\s*href=["\']([^"\']*)["\'](?:\s[^>]*)?>([^<]*)</a>', r'[\2](\1)', content, flags=re.IGNORECASE)
+
content = re.sub(
+
r"<(?:strong|b)(?:\s[^>]*)?>([^<]*)</(?:strong|b)>",
+
r"**\1**",
+
content,
+
flags=re.IGNORECASE,
+
)
+
content = re.sub(
+
r"<(?:em|i)(?:\s[^>]*)?>([^<]*)</(?:em|i)>",
+
r"*\1*",
+
content,
+
flags=re.IGNORECASE,
+
)
+
content = re.sub(
+
r"<code(?:\s[^>]*)?>([^<]*)</code>",
+
r"`\1`",
+
content,
+
flags=re.IGNORECASE,
+
)
+
content = re.sub(
+
r'<a(?:\s[^>]*?)?\s*href=["\']([^"\']*)["\'](?:\s[^>]*)?>([^<]*)</a>',
+
r"[\2](\1)",
+
content,
+
flags=re.IGNORECASE,
+
)
# Convert block elements to spaces instead of newlines for compactness
-
content = re.sub(r'<br\s*/?>', ' ', content, flags=re.IGNORECASE)
-
content = re.sub(r'</p>\s*<p>', '. ', content, flags=re.IGNORECASE)
-
content = re.sub(r'</?(?:p|div)(?:\s[^>]*)?>', ' ', content, flags=re.IGNORECASE)
+
content = re.sub(r"<br\s*/?>", " ", content, flags=re.IGNORECASE)
+
content = re.sub(r"</p>\s*<p>", ". ", content, flags=re.IGNORECASE)
+
content = re.sub(
+
r"</?(?:p|div)(?:\s[^>]*)?>", " ", content, flags=re.IGNORECASE
+
)
# Remove remaining HTML tags
-
content = re.sub(r'<[^>]+>', '', content)
+
content = re.sub(r"<[^>]+>", "", content)
# Clean up whitespace and make compact
-
content = re.sub(r'\s+', ' ', content) # Multiple whitespace becomes single space
-
content = re.sub(r'\.\.+', '.', content) # Multiple periods become single period
+
content = re.sub(
+
r"\s+", " ", content
+
) # Multiple whitespace becomes single space
+
content = re.sub(
+
r"\.\.+", ".", content
+
) # Multiple periods become single period
return content.strip()
except Exception as e:
self.logger.error(f"Error processing HTML content: {e}")
# Last resort: just strip HTML tags
import re
-
return re.sub(r'<[^>]+>', '', html_content).strip()
+
+
return re.sub(r"<[^>]+>", "", html_content).strip()
def _get_schedule_info(self) -> str:
"""Get schedule information string."""
···
if self.last_sync_time:
import datetime
+
last_sync = datetime.datetime.fromtimestamp(self.last_sync_time)
next_sync = last_sync + datetime.timedelta(seconds=self.sync_interval)
now = datetime.datetime.now()
···
else:
time_str = f"{seconds}s"
-
lines.extend([
-
f"🕐 **Last Sync:** {last_sync.strftime('%H:%M:%S')}",
-
f"⏰ **Next Sync:** {next_sync.strftime('%H:%M:%S')} (in {time_str})",
-
])
+
lines.extend(
+
[
+
f"🕐 **Last Sync:** {last_sync.strftime('%H:%M:%S')}",
+
f"⏰ **Next Sync:** {next_sync.strftime('%H:%M:%S')} (in {time_str})",
+
]
+
)
else:
-
lines.extend([
-
f"🕐 **Last Sync:** {last_sync.strftime('%H:%M:%S')}",
-
f"⏰ **Next Sync:** Due now (running every {self.sync_interval}s)",
-
])
+
lines.extend(
+
[
+
f"🕐 **Last Sync:** {last_sync.strftime('%H:%M:%S')}",
+
f"⏰ **Next Sync:** Due now (running every {self.sync_interval}s)",
+
]
+
)
else:
lines.append("🕐 **Last Sync:** Never (bot starting up)")
# Add sync frequency info
if self.sync_interval >= 3600:
-
frequency_str = f"{self.sync_interval // 3600}h {(self.sync_interval % 3600) // 60}m"
+
frequency_str = (
+
f"{self.sync_interval // 3600}h {(self.sync_interval % 3600) // 60}m"
+
)
elif self.sync_interval >= 60:
frequency_str = f"{self.sync_interval // 60}m {self.sync_interval % 60}s"
else:
···
return "\n".join(lines)
-
def _send_config_change_notification(self, bot_handler: BotHandler, changer: str, setting: str, old_value: Optional[str], new_value: str) -> None:
+
def _send_config_change_notification(
+
self,
+
bot_handler: BotHandler,
+
changer: str,
+
setting: str,
+
old_value: Optional[str],
+
new_value: str,
+
) -> None:
"""Send configuration change notification if enabled."""
if not self.config_change_notifications or self.debug_user:
return
···
try:
old_display = old_value if old_value else "(not set)"
-
notification_msg = f"⚙️ **{changer}** changed {setting}: `{old_display}` → `{new_value}`"
+
notification_msg = (
+
f"⚙️ **{changer}** changed {setting}: `{old_display}` → `{new_value}`"
+
)
-
bot_handler.send_message({
-
"type": "stream",
-
"to": self.stream_name,
-
"subject": self.topic_name,
-
"content": notification_msg
-
})
+
bot_handler.send_message(
+
{
+
"type": "stream",
+
"to": self.stream_name,
+
"subject": self.topic_name,
+
"content": notification_msg,
+
}
+
)
except Exception as e:
self.logger.error(f"Failed to send config change notification: {e}")
handler_class = ThicketBotHandler
-
+12 -1
src/thicket/cli/commands/__init__.py
···
zulip,
)
-
__all__ = ["add", "bot", "duplicates", "info_cmd", "init", "list_cmd", "search", "sync", "upload", "zulip"]
+
__all__ = [
+
"add",
+
"bot",
+
"duplicates",
+
"info_cmd",
+
"init",
+
"list_cmd",
+
"search",
+
"sync",
+
"upload",
+
"zulip",
+
]
+29 -13
src/thicket/cli/commands/bot.py
···
),
) -> None:
"""Manage the Thicket Zulip bot.
-
+
Actions:
- run: Start the Zulip bot
- test: Test bot functionality
···
raise typer.Exit(1)
-
def _run_bot(config_file: Path, thicket_config: Path, daemon: bool, debug_user: str = None) -> None:
+
def _run_bot(
+
config_file: Path, thicket_config: Path, daemon: bool, debug_user: str = None
+
) -> None:
"""Run the Zulip bot."""
if not config_file.exists():
print_error(f"Configuration file not found: {config_file}")
-
print_info(f"Copy bot-config/zuliprc.template to {config_file} and configure it")
+
print_info(
+
f"Copy bot-config/zuliprc.template to {config_file} and configure it"
+
)
print_info("See bot-config/README.md for setup instructions")
raise typer.Exit(1)
···
print_info(f"Using thicket config: {thicket_config}")
if debug_user:
-
print_info(f"🐛 DEBUG MODE: Will send DMs to thicket user '{debug_user}' instead of posting to streams")
+
print_info(
+
f"🐛 DEBUG MODE: Will send DMs to thicket user '{debug_user}' instead of posting to streams"
+
)
if daemon:
print_info("Running in daemon mode...")
···
try:
# Build the command
cmd = [
-
sys.executable, "-m", "zulip_bots.run",
+
sys.executable,
+
"-m",
+
"zulip_bots.run",
"src/thicket/bots/thicket_bot.py",
-
"--config-file", str(config_file)
+
"--config-file",
+
str(config_file),
]
# Add environment variables for bot configuration
import os
+
env = os.environ.copy()
# Always pass thicket config path
···
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
-
env=env
+
env=env,
)
print_success(f"Bot started in background with PID {process.pid}")
else:
···
except subprocess.CalledProcessError as e:
print_error(f"Failed to start bot: {e}")
-
raise typer.Exit(1)
+
raise typer.Exit(1) from e
except KeyboardInterrupt:
print_info("Bot stopped by user")
···
except Exception as e:
print_error(f"Bot test failed: {e}")
-
raise typer.Exit(1)
+
raise typer.Exit(1) from e
def _bot_status(config_file: Path) -> None:
···
console.print(f"✓ Config file: {config_file}", style="green")
else:
console.print(f"✗ Config file not found: {config_file}", style="red")
-
console.print(" Copy bot-config/zuliprc.template and configure it", style="yellow")
-
console.print(" See bot-config/README.md for setup instructions", style="yellow")
+
console.print(
+
" Copy bot-config/zuliprc.template and configure it", style="yellow"
+
)
+
console.print(
+
" See bot-config/README.md for setup instructions", style="yellow"
+
)
# Check dependencies
try:
import zulip_bots
-
version = getattr(zulip_bots, '__version__', 'unknown')
+
+
version = getattr(zulip_bots, "__version__", "unknown")
console.print(f"✓ zulip-bots version: {version}", style="green")
except ImportError:
console.print("✗ zulip-bots not installed", style="red")
try:
-
from ...bots.thicket_bot import ThicketBotHandler
+
from ...bots.thicket_bot import ThicketBotHandler # noqa: F401
+
console.print("✓ ThicketBotHandler available", style="green")
except ImportError as e:
console.print(f"✗ Bot handler not available: {e}", style="red")
+1 -1
src/thicket/cli/commands/info_cmd.py
···
except Exception as e:
console.print(f"[red]Error displaying entry info: {e}[/red]")
-
raise typer.Exit(1)
+
raise typer.Exit(1) from e
def _display_entry_info(entry, username: str) -> None:
+40 -35
src/thicket/cli/commands/search.py
···
# Check that we have required configuration
if not final_url:
console.print("[red]Error: Typesense URL is required[/red]")
-
console.print("Either provide --typesense-url or create ~/.typesense/url file")
+
console.print(
+
"Either provide --typesense-url or create ~/.typesense/url file"
+
)
raise typer.Exit(1)
if not final_api_key:
console.print("[red]Error: Typesense API key is required[/red]")
-
console.print("Either provide --api-key or create ~/.typesense/api_key file")
+
console.print(
+
"Either provide --api-key or create ~/.typesense/api_key file"
+
)
raise typer.Exit(1)
# Create Typesense configuration
typesense_config = TypesenseConfig.from_url(
-
final_url,
-
final_api_key,
-
collection_name
+
final_url, final_api_key, collection_name
)
typesense_config.connection_timeout = timeout
···
# Prepare search parameters
search_params = {
-
'per_page': limit,
+
"per_page": limit,
}
# Add user filter if specified
if user:
-
search_params['filter_by'] = f'username:{user}'
+
search_params["filter_by"] = f"username:{user}"
# Perform search
try:
···
if raw:
import json
+
console.print(json.dumps(results, indent=2))
return
···
def _display_search_results(results: dict, query: str) -> None:
"""Display search results in a formatted table."""
-
hits = results.get('hits', [])
-
found = results.get('found', 0)
-
search_time = results.get('search_time_ms', 0)
+
hits = results.get("hits", [])
+
found = results.get("found", 0)
+
search_time = results.get("search_time_ms", 0)
if not hits:
console.print("\n[yellow]No results found.[/yellow]")
···
table.add_column("Summary", style="dim", width=50)
for hit in hits:
-
doc = hit['document']
+
doc = hit["document"]
# Format score
score = f"{hit.get('text_match', 0):.2f}"
# Format user
-
user_display = doc.get('user_display_name', doc.get('username', 'Unknown'))
+
user_display = doc.get("user_display_name", doc.get("username", "Unknown"))
if len(user_display) > 12:
user_display = user_display[:9] + "..."
# Format title
-
title = doc.get('title', 'Untitled')
+
title = doc.get("title", "Untitled")
if len(title) > 40:
title = title[:37] + "..."
# Format date
-
updated_timestamp = doc.get('updated', 0)
+
updated_timestamp = doc.get("updated", 0)
if updated_timestamp:
from datetime import datetime
+
updated_date = datetime.fromtimestamp(updated_timestamp)
updated_str = updated_date.strftime("%Y-%m-%d")
else:
updated_str = "Unknown"
# Format summary
-
summary = doc.get('summary') or doc.get('content', '')
+
summary = doc.get("summary") or doc.get("content", "")
if summary:
# Remove HTML tags and truncate
import re
-
summary = re.sub(r'<[^>]+>', '', summary)
+
+
summary = re.sub(r"<[^>]+>", "", summary)
summary = summary.strip()
if len(summary) > 60:
summary = summary[:57] + "..."
else:
summary = ""
-
table.add_row(
-
score,
-
user_display,
-
title,
-
updated_str,
-
summary
-
)
+
table.add_row(score, user_display, title, updated_str, summary)
console.print(table)
# Show additional info
console.print(f"\n[dim]Showing {len(hits)} of {found} results[/dim]")
if len(hits) < found:
-
console.print(f"[dim]Use --limit to see more results (current limit: {len(hits)})[/dim]")
+
console.print(
+
f"[dim]Use --limit to see more results (current limit: {len(hits)})[/dim]"
+
)
def _display_compact_results(results: dict, query: str) -> None:
"""Display search results in a compact format."""
-
hits = results.get('hits', [])
-
found = results.get('found', 0)
+
hits = results.get("hits", [])
+
found = results.get("found", 0)
if not hits:
console.print("\n[yellow]No results found.[/yellow]")
···
console.print(f"\n[green]Found {found} results[/green]\n")
for i, hit in enumerate(hits, 1):
-
doc = hit['document']
-
score = hit.get('text_match', 0)
+
doc = hit["document"]
+
score = hit.get("text_match", 0)
# Header with score and user
-
user = doc.get('user_display_name', doc.get('username', 'Unknown'))
-
console.print(f"[green]{i:2d}.[/green] [cyan]{user}[/cyan] [dim](score: {score:.2f})[/dim]")
+
user = doc.get("user_display_name", doc.get("username", "Unknown"))
+
console.print(
+
f"[green]{i:2d}.[/green] [cyan]{user}[/cyan] [dim](score: {score:.2f})[/dim]"
+
)
# Title
-
title = doc.get('title', 'Untitled')
+
title = doc.get("title", "Untitled")
console.print(f" [bold]{title}[/bold]")
# Date and link
-
updated_timestamp = doc.get('updated', 0)
+
updated_timestamp = doc.get("updated", 0)
if updated_timestamp:
from datetime import datetime
+
updated_date = datetime.fromtimestamp(updated_timestamp)
updated_str = updated_date.strftime("%Y-%m-%d %H:%M")
else:
updated_str = "Unknown date"
-
link = doc.get('link', '')
+
link = doc.get("link", "")
console.print(f" [blue]{updated_str}[/blue] - [link={link}]{link}[/link]")
# Summary
-
summary = doc.get('summary') or doc.get('content', '')
+
summary = doc.get("summary") or doc.get("content", "")
if summary:
import re
-
summary = re.sub(r'<[^>]+>', '', summary)
+
+
summary = re.sub(r"<[^>]+>", "", summary)
summary = summary.strip()
if len(summary) > 150:
summary = summary[:147] + "..."
+44 -20
src/thicket/cli/commands/upload.py
···
return url, api_key
-
def _save_typesense_config(url: Optional[str] = None, api_key: Optional[str] = None) -> None:
+
def _save_typesense_config(
+
url: Optional[str] = None, api_key: Optional[str] = None
+
) -> None:
"""Save Typesense URL and API key to ~/.typesense directory."""
typesense_dir = Path.home() / ".typesense"
typesense_dir.mkdir(exist_ok=True, mode=0o700) # Secure permissions
···
# Check that we have required configuration
if not final_url:
console.print("[red]Error: Typesense URL is required[/red]")
-
console.print("Either provide --typesense-url or create ~/.typesense/url file")
+
console.print(
+
"Either provide --typesense-url or create ~/.typesense/url file"
+
)
raise typer.Exit(1)
if not final_api_key:
console.print("[red]Error: Typesense API key is required[/red]")
-
console.print("Either provide --api-key or create ~/.typesense/api_key file")
+
console.print(
+
"Either provide --api-key or create ~/.typesense/api_key file"
+
)
raise typer.Exit(1)
# Save configuration if provided via command line (for future use)
···
# Create Typesense configuration
typesense_config = TypesenseConfig.from_url(
-
final_url,
-
final_api_key,
-
collection_name
+
final_url, final_api_key, collection_name
)
typesense_config.connection_timeout = timeout
···
raise typer.Exit(1) from e
-
def _dry_run_upload(git_store: GitStore, config: ThicketConfig, typesense_config: TypesenseConfig) -> None:
+
def _dry_run_upload(
+
git_store: GitStore, config: ThicketConfig, typesense_config: TypesenseConfig
+
) -> None:
"""Perform a dry run showing what would be uploaded."""
console.print("\n[bold]Dry run analysis:[/bold]")
···
entry_files = list(user_dir.glob("*.json"))
total_entries += len(entry_files)
-
console.print(f" ✅ User {username}: {len(entry_files)} entries would be uploaded")
+
console.print(
+
f" ✅ User {username}: {len(entry_files)} entries would be uploaded"
+
)
except Exception as e:
console.print(f" ❌ User {username}: Error loading entries - {e}")
···
console.print(f" • Total users: {len(index.users)}")
console.print(f" • Total entries to upload: {total_entries}")
console.print(f" • Target collection: {typesense_config.collection_name}")
-
console.print(f" • Typesense server: {typesense_config.protocol}://{typesense_config.host}:{typesense_config.port}")
+
console.print(
+
f" • Typesense server: {typesense_config.protocol}://{typesense_config.host}:{typesense_config.port}"
+
)
if total_entries > 0:
console.print("\n[green]Ready to upload! Remove --dry-run to proceed.[/green]")
···
console.print("\n[yellow]No entries found to upload.[/yellow]")
-
def _perform_upload(git_store: GitStore, config: ThicketConfig, typesense_config: TypesenseConfig) -> None:
+
def _perform_upload(
+
git_store: GitStore, config: ThicketConfig, typesense_config: TypesenseConfig
+
) -> None:
"""Perform the actual upload to Typesense."""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
-
# Test connection
progress.add_task("Testing Typesense connection...", total=None)
···
TextColumn("[progress.description]{task.description}"),
console=console,
) as upload_progress:
-
upload_progress.add_task("Uploading entries to Typesense...", total=None)
try:
···
# Batch import results
success_count = sum(1 for r in result if r.get("success"))
total_count = len(result)
-
console.print(f"[green]✅ Upload completed: {success_count}/{total_count} documents uploaded successfully[/green]")
+
console.print(
+
f"[green]✅ Upload completed: {success_count}/{total_count} documents uploaded successfully[/green]"
+
)
# Show any errors
errors = [r for r in result if not r.get("success")]
if errors:
-
console.print(f"[yellow]⚠️ {len(errors)} documents had errors[/yellow]")
-
for i, error in enumerate(errors[:5]): # Show first 5 errors
-
console.print(f" Error {i+1}: {error}")
+
console.print(
+
f"[yellow]⚠️ {len(errors)} documents had errors[/yellow]"
+
)
+
for i, error in enumerate(
+
errors[:5]
+
): # Show first 5 errors
+
console.print(f" Error {i + 1}: {error}")
if len(errors) > 5:
-
console.print(f" ... and {len(errors) - 5} more errors")
+
console.print(
+
f" ... and {len(errors) - 5} more errors"
+
)
else:
console.print("[green]✅ Upload completed successfully[/green]")
else:
-
console.print("[yellow]⚠️ Upload completed but no result data available[/yellow]")
+
console.print(
+
"[yellow]⚠️ Upload completed but no result data available[/yellow]"
+
)
console.print("\n[bold]Collection information:[/bold]")
-
console.print(f" • Server: {typesense_config.protocol}://{typesense_config.host}:{typesense_config.port}")
+
console.print(
+
f" • Server: {typesense_config.protocol}://{typesense_config.host}:{typesense_config.port}"
+
)
console.print(f" • Collection: {typesense_config.collection_name}")
-
console.print("\n[dim]You can now search your entries using the Typesense API or dashboard.[/dim]")
+
console.print(
+
"\n[dim]You can now search your entries using the Typesense API or dashboard.[/dim]"
+
)
except Exception as e:
upload_progress.stop()
+26 -16
src/thicket/cli/commands/zulip.py
···
@app.command()
def zulip_add(
username: str = typer.Argument(..., help="Username to associate with Zulip"),
-
server: str = typer.Argument(..., help="Zulip server (e.g., yourorg.zulipchat.com)"),
+
server: str = typer.Argument(
+
..., help="Zulip server (e.g., yourorg.zulipchat.com)"
+
),
user_id: str = typer.Argument(..., help="Zulip user ID or email for @mentions"),
config_file: Path = typer.Option(
Path("thicket.yaml"),
···
),
) -> None:
"""Add a Zulip association for a user.
-
+
This associates a thicket user with their Zulip identity, enabling
@mentions when the bot posts their articles.
-
+
Example:
thicket zulip-add alice myorg.zulipchat.com alice@example.com
"""
···
except Exception as e:
print_error(f"Failed to add Zulip association: {e}")
-
raise typer.Exit(1)
+
raise typer.Exit(1) from e
@app.command()
···
),
) -> None:
"""Remove a Zulip association from a user.
-
+
Example:
thicket zulip-remove alice myorg.zulipchat.com alice@example.com
"""
···
# Remove association
if git_store.remove_zulip_association(username, server, user_id):
-
print_success(f"Removed Zulip association for {username}: {user_id}@{server}")
+
print_success(
+
f"Removed Zulip association for {username}: {user_id}@{server}"
+
)
git_store.commit_changes(f"Remove Zulip association for {username}")
else:
print_error(f"Association not found for {username}: {user_id}@{server}")
···
except Exception as e:
print_error(f"Failed to remove Zulip association: {e}")
-
raise typer.Exit(1)
+
raise typer.Exit(1) from e
@app.command()
def zulip_list(
-
username: Optional[str] = typer.Argument(None, help="Username to list associations for"),
+
username: Optional[str] = typer.Argument(
+
None, help="Username to list associations for"
+
),
config_file: Path = typer.Option(
Path("thicket.yaml"),
"--config",
···
),
) -> None:
"""List Zulip associations for users.
-
+
If no username is provided, lists associations for all users.
-
+
Examples:
thicket zulip-list # List all associations
thicket zulip-list alice # List associations for alice
···
except Exception as e:
print_error(f"Failed to list Zulip associations: {e}")
-
raise typer.Exit(1)
+
raise typer.Exit(1) from e
@app.command()
···
),
) -> None:
"""Import Zulip associations from a CSV file.
-
+
CSV format (no header):
username,server,user_id
alice,myorg.zulipchat.com,alice@example.com
bob,myorg.zulipchat.com,bob.smith
-
+
Example:
thicket zulip-import associations.csv
"""
···
for a in user.zulip_associations
)
if exists:
-
print_info(f"Would skip existing: {username} -> {user_id}@{server}")
+
print_info(
+
f"Would skip existing: {username} -> {user_id}@{server}"
+
)
skipped += 1
else:
print_info(f"Would add: {username} -> {user_id}@{server}")
···
print_success(f"Added: {username} -> {user_id}@{server}")
added += 1
else:
-
print_info(f"Skipped existing: {username} -> {user_id}@{server}")
+
print_info(
+
f"Skipped existing: {username} -> {user_id}@{server}"
+
)
skipped += 1
# Summary
···
except Exception as e:
print_error(f"Failed to import Zulip associations: {e}")
-
raise typer.Exit(1)
+
raise typer.Exit(1) from e
+1 -1
src/thicket/cli/main.py
···
# Import commands to register them
-
from .commands import ( # noqa: F401
+
from .commands import ( # noqa: F401, E402
add,
duplicates,
info_cmd,
+3 -1
src/thicket/core/git_store.py
···
return result
-
def remove_zulip_association(self, username: str, server: str, user_id: str) -> bool:
+
def remove_zulip_association(
+
self, username: str, server: str, user_id: str
+
) -> bool:
"""Remove a Zulip association from a user."""
index = self._load_index()
user = index.get_user(username)
+114 -58
src/thicket/core/typesense_client.py
···
collection_name: str = "thicket_entries"
@classmethod
-
def from_url(cls, url: str, api_key: str, collection_name: str = "thicket_entries") -> "TypesenseConfig":
+
def from_url(
+
cls, url: str, api_key: str, collection_name: str = "thicket_entries"
+
) -> "TypesenseConfig":
"""Create config from Typesense URL."""
parsed = urlparse(url)
return cls(
···
def __init__(self, config: TypesenseConfig):
"""Initialize Typesense client."""
self.config = config
-
self.client = typesense.Client({
-
'nodes': [{
-
'host': config.host,
-
'port': config.port,
-
'protocol': config.protocol,
-
}],
-
'api_key': config.api_key,
-
'connection_timeout_seconds': config.connection_timeout,
-
})
+
self.client = typesense.Client(
+
{
+
"nodes": [
+
{
+
"host": config.host,
+
"port": config.port,
+
"protocol": config.protocol,
+
}
+
],
+
"api_key": config.api_key,
+
"connection_timeout_seconds": config.connection_timeout,
+
}
+
)
def get_collection_schema(self) -> dict[str, Any]:
"""Get the Typesense collection schema for thicket entries."""
return {
-
'name': self.config.collection_name,
-
'fields': [
+
"name": self.config.collection_name,
+
"fields": [
# Primary identifiers
-
{'name': 'id', 'type': 'string', 'facet': False},
-
{'name': 'original_id', 'type': 'string', 'facet': False},
-
+
{"name": "id", "type": "string", "facet": False},
+
{"name": "original_id", "type": "string", "facet": False},
# Content fields - optimized for search
-
{'name': 'title', 'type': 'string', 'facet': False},
-
{'name': 'summary', 'type': 'string', 'optional': True, 'facet': False},
-
{'name': 'content', 'type': 'string', 'optional': True, 'facet': False},
-
{'name': 'content_type', 'type': 'string', 'facet': True},
-
+
{"name": "title", "type": "string", "facet": False},
+
{"name": "summary", "type": "string", "optional": True, "facet": False},
+
{"name": "content", "type": "string", "optional": True, "facet": False},
+
{"name": "content_type", "type": "string", "facet": True},
# Searchable combined fields for embeddings/semantic search
-
{'name': 'searchable_content', 'type': 'string', 'facet': False},
-
{'name': 'searchable_metadata', 'type': 'string', 'facet': False},
-
+
{"name": "searchable_content", "type": "string", "facet": False},
+
{"name": "searchable_metadata", "type": "string", "facet": False},
# Temporal fields
-
{'name': 'updated', 'type': 'int64', 'facet': False, 'sort': True},
-
{'name': 'published', 'type': 'int64', 'optional': True, 'facet': False, 'sort': True},
-
+
{"name": "updated", "type": "int64", "facet": False, "sort": True},
+
{
+
"name": "published",
+
"type": "int64",
+
"optional": True,
+
"facet": False,
+
"sort": True,
+
},
# Link and source
-
{'name': 'link', 'type': 'string', 'facet': False},
-
{'name': 'source', 'type': 'string', 'optional': True, 'facet': False},
-
+
{"name": "link", "type": "string", "facet": False},
+
{"name": "source", "type": "string", "optional": True, "facet": False},
# Categories and classification
-
{'name': 'categories', 'type': 'string[]', 'facet': True, 'optional': True},
-
{'name': 'rights', 'type': 'string', 'optional': True, 'facet': False},
-
+
{
+
"name": "categories",
+
"type": "string[]",
+
"facet": True,
+
"optional": True,
+
},
+
{"name": "rights", "type": "string", "optional": True, "facet": False},
# User/feed metadata - facetable for filtering
-
{'name': 'username', 'type': 'string', 'facet': True},
-
{'name': 'user_display_name', 'type': 'string', 'optional': True, 'facet': True},
-
{'name': 'user_email', 'type': 'string', 'optional': True, 'facet': False},
-
{'name': 'user_homepage', 'type': 'string', 'optional': True, 'facet': False},
-
{'name': 'user_icon', 'type': 'string', 'optional': True, 'facet': False},
-
+
{"name": "username", "type": "string", "facet": True},
+
{
+
"name": "user_display_name",
+
"type": "string",
+
"optional": True,
+
"facet": True,
+
},
+
{
+
"name": "user_email",
+
"type": "string",
+
"optional": True,
+
"facet": False,
+
},
+
{
+
"name": "user_homepage",
+
"type": "string",
+
"optional": True,
+
"facet": False,
+
},
+
{
+
"name": "user_icon",
+
"type": "string",
+
"optional": True,
+
"facet": False,
+
},
# Author information from entries
-
{'name': 'author_name', 'type': 'string', 'optional': True, 'facet': True},
-
{'name': 'author_email', 'type': 'string', 'optional': True, 'facet': False},
-
{'name': 'author_uri', 'type': 'string', 'optional': True, 'facet': False},
+
{
+
"name": "author_name",
+
"type": "string",
+
"optional": True,
+
"facet": True,
+
},
+
{
+
"name": "author_email",
+
"type": "string",
+
"optional": True,
+
"facet": False,
+
},
+
{
+
"name": "author_uri",
+
"type": "string",
+
"optional": True,
+
"facet": False,
+
},
],
-
'default_sorting_field': 'updated',
+
"default_sorting_field": "updated",
}
def create_collection(self) -> dict[str, Any]:
···
# Try to delete existing collection first
try:
self.client.collections[self.config.collection_name].delete()
-
logger.info(f"Deleted existing collection: {self.config.collection_name}")
+
logger.info(
+
f"Deleted existing collection: {self.config.collection_name}"
+
)
except typesense.exceptions.ObjectNotFound:
-
logger.info(f"Collection {self.config.collection_name} does not exist, creating new one")
+
logger.info(
+
f"Collection {self.config.collection_name} does not exist, creating new one"
+
)
# Create new collection
schema = self.get_collection_schema()
···
document_dicts = [doc.model_dump() for doc in documents]
# Use import endpoint for batch indexing
-
result = self.client.collections[self.config.collection_name].documents.import_(
+
result = self.client.collections[
+
self.config.collection_name
+
].documents.import_(
document_dicts,
-
{'action': 'upsert'} # Update if exists, insert if not
+
{"action": "upsert"}, # Update if exists, insert if not
)
logger.info(f"Indexed {len(documents)} documents")
···
logger.error(f"Failed to index documents: {e}")
raise
-
def upload_from_git_store(self, git_store: GitStore, config: ThicketConfig) -> dict[str, Any]:
+
def upload_from_git_store(
+
self, git_store: GitStore, config: ThicketConfig
+
) -> dict[str, Any]:
"""Upload all entries from the Git store to Typesense."""
logger.info("Starting Typesense upload from Git store")
···
try:
user_dir = git_store.repo_path / user_metadata.directory
if not user_dir.exists():
-
logger.warning(f"Directory not found for user {username}: {user_dir}")
+
logger.warning(
+
f"Directory not found for user {username}: {user_dir}"
+
)
continue
entry_files = list(user_dir.glob("*.json"))
···
)
documents.append(doc)
except Exception as e:
-
logger.error(f"Failed to convert entry {entry_file} to document: {e}")
+
logger.error(
+
f"Failed to convert entry {entry_file} to document: {e}"
+
)
except Exception as e:
logger.error(f"Failed to load entries for user {username}: {e}")
···
return {}
def search(
-
self,
-
query: str,
-
search_parameters: Optional[dict[str, Any]] = None
+
self, query: str, search_parameters: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
"""Search the collection."""
default_params = {
-
'q': query,
-
'query_by': 'title,searchable_content,searchable_metadata',
-
'sort_by': 'updated:desc',
-
'per_page': 20,
+
"q": query,
+
"query_by": "title,searchable_content,searchable_metadata",
+
"sort_by": "updated:desc",
+
"per_page": 20,
}
if search_parameters:
default_params.update(search_parameters)
-
return self.client.collections[self.config.collection_name].documents.search(default_params)
-
+
return self.client.collections[self.config.collection_name].documents.search(
+
default_params
+
)
+3 -1
src/thicket/models/user.py
···
homepage: Optional[str] = None
icon: Optional[str] = None
feeds: list[str] = []
-
zulip_associations: list[ZulipAssociation] = Field(default_factory=list) # Zulip server/user pairs
+
zulip_associations: list[ZulipAssociation] = Field(
+
default_factory=list
+
) # Zulip server/user pairs
directory: str # Directory name in Git store
created: datetime
last_updated: datetime
+6 -6
tests/test_bot.py
···
"""Tests for the Thicket Zulip bot."""
-
import pytest
from thicket.bots.test_bot import (
···
result = self.bot._check_initialization(message, self.handler)
assert result is False
assert len(self.handler.sent_messages) == 1
-
assert "Debug mode validation failed" in self.handler.sent_messages[0]["content"]
+
assert (
+
"Debug mode validation failed" in self.handler.sent_messages[0]["content"]
+
)
def test_debug_mode_dm_posting(self) -> None:
"""Test that debug mode posts DMs instead of stream messages."""
from unittest.mock import Mock
-
# Setup bot in debug mode
self.bot.debug_user = "testuser"
···
self.handler.config_info = {
"full_name": "Thicket Bot",
"email": "thicket-bot@example.com",
-
"site": "https://example.zulipchat.com"
+
"site": "https://example.zulipchat.com",
}
# Mock git store user
···
tester = BotTester()
# Configure stream
-
responses = tester.send_command("config stream general")
+
tester.send_command("config stream general")
tester.assert_response_contains("Stream set to")
# Configure topic
-
responses = tester.send_command("config topic test")
+
tester.send_command("config topic test")
tester.assert_response_contains("Topic set to")
def test_assert_response_contains(self) -> None:
+2 -1
tests/test_feed_parser.py
···
html_with_attrs = '<a href="https://example.com" onclick="alert()">Link</a>'
sanitized = parser._sanitize_html(html_with_attrs)
assert 'href="https://example.com"' in sanitized
-
assert 'onclick' not in sanitized
+
assert "onclick" not in sanitized
def test_extract_feed_metadata(self):
"""Test feed metadata extraction."""
···
# Test with feedparser parsed data
import feedparser
+
parsed = feedparser.parse("""<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>Test Feed</title>
+7 -2
tests/test_git_store.py
···
duplicates = store.get_duplicates()
assert len(duplicates.duplicates) == 1
assert duplicates.is_duplicate("https://example.com/dup")
-
assert duplicates.get_canonical("https://example.com/dup") == "https://example.com/canonical"
+
assert (
+
duplicates.get_canonical("https://example.com/dup")
+
== "https://example.com/canonical"
+
)
# Remove duplicate
result = store.remove_duplicate("https://example.com/dup")
···
entry = AtomEntry(
id=f"https://example.com/entry/{title.lower().replace(' ', '-')}",
title=title,
-
link=HttpUrl(f"https://example.com/entry/{title.lower().replace(' ', '-')}"),
+
link=HttpUrl(
+
f"https://example.com/entry/{title.lower().replace(' ', '-')}"
+
),
updated=datetime.now(),
summary=summary,
)
+16 -18
tests/test_models.py
···
git_store=temp_dir / "git_store",
cache_dir=temp_dir / "cache",
users=[
-
UserConfig(username="testuser", feeds=["https://example.com/feed1.xml"]),
+
UserConfig(
+
username="testuser", feeds=["https://example.com/feed1.xml"]
+
),
],
)
-
result = config.add_feed_to_user("testuser", HttpUrl("https://example.com/feed2.xml"))
+
result = config.add_feed_to_user(
+
"testuser", HttpUrl("https://example.com/feed2.xml")
+
)
assert result is True
user = config.find_user("testuser")
···
assert HttpUrl("https://example.com/feed2.xml") in user.feeds
# Test adding to non-existent user
-
result = config.add_feed_to_user("nonexistent", HttpUrl("https://example.com/feed.xml"))
+
result = config.add_feed_to_user(
+
"nonexistent", HttpUrl("https://example.com/feed.xml")
+
)
assert result is False
···
user_config = metadata.to_user_config("testuser", feed_url)
assert user_config.display_name == "Test Feed" # Falls back to title
-
assert user_config.homepage == HttpUrl("https://example.com") # Falls back to link
+
assert user_config.homepage == HttpUrl(
+
"https://example.com"
+
) # Falls back to link
assert user_config.icon == HttpUrl("https://example.com/icon.png")
assert user_config.email is None
···
def test_valid_association(self):
"""Test creating valid Zulip association."""
assoc = ZulipAssociation(
-
server="example.zulipchat.com",
-
user_id="alice@example.com"
+
server="example.zulipchat.com", user_id="alice@example.com"
)
assert assoc.server == "example.zulipchat.com"
···
def test_association_hash(self):
"""Test that associations are hashable."""
-
assoc1 = ZulipAssociation(
-
server="example.zulipchat.com",
-
user_id="alice"
-
)
-
assoc2 = ZulipAssociation(
-
server="example.zulipchat.com",
-
user_id="alice"
-
)
-
assoc3 = ZulipAssociation(
-
server="other.zulipchat.com",
-
user_id="alice"
-
)
+
assoc1 = ZulipAssociation(server="example.zulipchat.com", user_id="alice")
+
assoc2 = ZulipAssociation(server="example.zulipchat.com", user_id="alice")
+
assoc3 = ZulipAssociation(server="other.zulipchat.com", user_id="alice")
# Same associations should have same hash
assert hash(assoc1) == hash(assoc2)