A community-based topic aggregation platform built on atproto
1"""
2Coves API Client for posting to communities.
3
4Handles authentication and posting via XRPC.
5"""
6import logging
7import requests
8from typing import Dict, List, Optional
9from atproto import Client
10
11logger = logging.getLogger(__name__)
12
13
class CovesClient:
    """
    Client for posting to Coves communities via XRPC.

    Handles:
    - Authentication with aggregator credentials
    - Creating posts in communities (social.coves.community.post.create)
    - External embed formatting

    Authentication goes through the PDS (``pds_url``) while posts are sent
    to the AppView (``api_url``); both default to the same URL.
    """

    # XRPC method used to create a post through the Coves AppView.
    POST_CREATE_NSID = "social.coves.community.post.create"

    # Timeout (seconds) applied to the post-creation HTTP request.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_url: str, handle: str, password: str, pds_url: Optional[str] = None):
        """
        Initialize Coves client.

        Args:
            api_url: Coves AppView URL for posting (e.g., "http://localhost:8081")
            handle: Aggregator handle (e.g., "kagi-news.coves.social")
            password: Aggregator password/app password
            pds_url: Optional PDS URL for authentication (defaults to api_url)
        """
        self.api_url = api_url
        self.pds_url = pds_url or api_url  # Auth through PDS, post through AppView
        self.handle = handle
        self.password = password
        self.client = Client(base_url=self.pds_url)  # Use PDS for auth
        self._authenticated = False
        # Populated by authenticate(); defined up front so that reading it
        # before authentication yields None instead of AttributeError.
        self.did: Optional[str] = None

    def authenticate(self):
        """
        Authenticate with Coves API.

        Uses com.atproto.server.createSession directly to avoid
        Bluesky-specific endpoints that don't exist on Coves PDS.

        On success the session is stored on the underlying atproto client,
        ``self.did`` is set from the session, and the client is marked as
        authenticated.

        Raises:
            Exception: If authentication fails (logged, then re-raised unchanged)
        """
        try:
            logger.info("Authenticating as %s", self.handle)

            # Use createSession directly (avoid app.bsky.actor.getProfile)
            session = self.client.com.atproto.server.create_session(
                {"identifier": self.handle, "password": self.password}
            )

            # Manually set session (skip profile fetch)
            self.client._session = session
            self._authenticated = True
            self.did = session.did

            logger.info("Authentication successful (DID: %s)", self.did)
        except Exception as e:
            logger.error("Authentication failed: %s", e)
            raise

    def _xrpc_url(self, nsid: str) -> str:
        """Return the AppView XRPC URL for ``nsid``, tolerating a trailing slash in ``api_url``."""
        return f"{self.api_url.rstrip('/')}/xrpc/{nsid}"

    @staticmethod
    def _build_post_payload(
        community_handle: str,
        content: str,
        facets: List[Dict],
        title: Optional[str] = None,
        embed: Optional[Dict] = None,
        thumbnail_url: Optional[str] = None
    ) -> Dict:
        """Assemble the JSON body for post creation, omitting optional fields that are empty."""
        post_data = {
            "community": community_handle,
            "content": content,
            "facets": facets,
        }
        optional_fields = {
            "title": title,
            "embed": embed,
            # Thumbnail URL lives at the top level (for trusted aggregators)
            "thumbnailUrl": thumbnail_url,
        }
        # Falsy values (None, "", {}) are left out of the request entirely.
        post_data.update({key: value for key, value in optional_fields.items() if value})
        return post_data

    def create_post(
        self,
        community_handle: str,
        content: str,
        facets: List[Dict],
        title: Optional[str] = None,
        embed: Optional[Dict] = None,
        thumbnail_url: Optional[str] = None
    ) -> str:
        """
        Create a post in a community.

        Authenticates first if no session has been established yet, then
        sends the post to the Coves-specific XRPC endpoint on the AppView.

        Args:
            community_handle: Community handle (e.g., "world-news.coves.social")
            content: Post content (rich text)
            facets: Rich text facets (formatting, links)
            title: Optional post title
            embed: Optional external embed
            thumbnail_url: Optional thumbnail URL (for trusted aggregators only)

        Returns:
            AT Proto URI of created post (e.g., "at://did:plc:.../social.coves.post/...")

        Raises:
            Exception: If post creation fails (HTTP error, network error, or a
                response without a "uri" field); logged, then re-raised unchanged
        """
        if not self._authenticated:
            self.authenticate()

        try:
            post_data = self._build_post_payload(
                community_handle,
                content,
                facets,
                title=title,
                embed=embed,
                thumbnail_url=thumbnail_url,
            )

            # Use Coves-specific endpoint (not direct PDS write)
            # This provides validation, authorization, and business logic
            logger.info("Creating post in community: %s", community_handle)

            headers = {
                "Authorization": f"Bearer {self.client._session.access_jwt}",
                "Content-Type": "application/json"
            }

            # Make direct HTTP request to XRPC endpoint
            response = requests.post(
                self._xrpc_url(self.POST_CREATE_NSID),
                json=post_data,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )

            # Log the response body before raising so the server's error detail is not lost
            if not response.ok:
                logger.error("Post creation failed (%s): %s", response.status_code, response.text)
                response.raise_for_status()

            post_uri = response.json()["uri"]
            logger.info("Post created: %s", post_uri)
            return post_uri

        except Exception as e:
            logger.error("Failed to create post: %s", e)
            raise

    def create_external_embed(
        self,
        uri: str,
        title: str,
        description: str
    ) -> Dict:
        """
        Create external embed object for hot-linked content.

        Args:
            uri: URL of the external content
            title: Title of the content
            description: Description/summary

        Returns:
            Embed dictionary ready for post creation
        """
        return {
            "$type": "social.coves.embed.external",
            "external": {
                "uri": uri,
                "title": title,
                "description": description
            }
        }

    def _get_timestamp(self) -> str:
        """
        Get current timestamp in ISO 8601 format.

        Returns:
            ISO timestamp string in UTC, with the "+00:00" offset written as "Z"
        """
        # Imported locally: datetime is not imported at module level.
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")