A community-based topic aggregation platform built on atproto
1package votes
2
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/bluesky-social/indigo/atproto/auth/oauth"
	"github.com/bluesky-social/indigo/atproto/syntax"

	oauthclient "Coves/internal/atproto/oauth"
)
19
20// voteService implements the Service interface for vote operations
21type voteService struct {
22 repo Repository
23 oauthClient *oauthclient.OAuthClient
24 oauthStore oauth.ClientAuthStore
25 logger *slog.Logger
26}
27
28// NewService creates a new vote service instance
29func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
30 if logger == nil {
31 logger = slog.Default()
32 }
33 return &voteService{
34 repo: repo,
35 oauthClient: oauthClient,
36 oauthStore: oauthStore,
37 logger: logger,
38 }
39}
40
41// CreateVote creates a new vote or toggles off an existing vote
42// Implements the toggle behavior:
43// - No existing vote → Create new vote with given direction
44// - Vote exists with same direction → Delete vote (toggle off)
45// - Vote exists with different direction → Update to new direction
46func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) {
47 // Validate direction
48 if req.Direction != "up" && req.Direction != "down" {
49 return nil, ErrInvalidDirection
50 }
51
52 // Validate subject URI format
53 if req.Subject.URI == "" {
54 return nil, ErrInvalidSubject
55 }
56 if !strings.HasPrefix(req.Subject.URI, "at://") {
57 return nil, ErrInvalidSubject
58 }
59
60 // Validate subject CID is provided
61 if req.Subject.CID == "" {
62 return nil, ErrInvalidSubject
63 }
64
65 // Check for existing vote by querying PDS directly (source of truth)
66 // This avoids eventual consistency issues with the AppView database
67 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
68 if err != nil {
69 s.logger.Error("failed to check existing vote on PDS",
70 "error", err,
71 "voter", session.AccountDID,
72 "subject", req.Subject.URI)
73 return nil, fmt.Errorf("failed to check existing vote: %w", err)
74 }
75
76 // Toggle logic
77 if existing != nil {
78 // Vote exists - check if same direction
79 if existing.Direction == req.Direction {
80 // Same direction - toggle off (delete)
81 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
82 s.logger.Error("failed to delete vote on PDS",
83 "error", err,
84 "voter", session.AccountDID,
85 "rkey", existing.RKey)
86 return nil, fmt.Errorf("failed to delete vote: %w", err)
87 }
88
89 s.logger.Info("vote toggled off",
90 "voter", session.AccountDID,
91 "subject", req.Subject.URI,
92 "direction", req.Direction)
93
94 // Return empty response to indicate deletion
95 return &CreateVoteResponse{
96 URI: "",
97 CID: "",
98 }, nil
99 }
100
101 // Different direction - delete old vote first, then create new one
102 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
103 s.logger.Error("failed to delete existing vote on PDS",
104 "error", err,
105 "voter", session.AccountDID,
106 "rkey", existing.RKey)
107 return nil, fmt.Errorf("failed to delete existing vote: %w", err)
108 }
109
110 s.logger.Info("deleted existing vote before creating new direction",
111 "voter", session.AccountDID,
112 "subject", req.Subject.URI,
113 "old_direction", existing.Direction,
114 "new_direction", req.Direction)
115 }
116
117 // Create new vote
118 uri, cid, err := s.createVoteRecord(ctx, session, req)
119 if err != nil {
120 s.logger.Error("failed to create vote on PDS",
121 "error", err,
122 "voter", session.AccountDID,
123 "subject", req.Subject.URI,
124 "direction", req.Direction)
125 return nil, fmt.Errorf("failed to create vote: %w", err)
126 }
127
128 s.logger.Info("vote created",
129 "voter", session.AccountDID,
130 "subject", req.Subject.URI,
131 "direction", req.Direction,
132 "uri", uri,
133 "cid", cid)
134
135 return &CreateVoteResponse{
136 URI: uri,
137 CID: cid,
138 }, nil
139}
140
141// DeleteVote removes a vote on the specified subject
142func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error {
143 // Validate subject URI format
144 if req.Subject.URI == "" {
145 return ErrInvalidSubject
146 }
147 if !strings.HasPrefix(req.Subject.URI, "at://") {
148 return ErrInvalidSubject
149 }
150
151 // Find existing vote by querying PDS directly (source of truth)
152 // This avoids eventual consistency issues with the AppView database
153 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
154 if err != nil {
155 s.logger.Error("failed to find vote on PDS",
156 "error", err,
157 "voter", session.AccountDID,
158 "subject", req.Subject.URI)
159 return fmt.Errorf("failed to find vote: %w", err)
160 }
161 if existing == nil {
162 return ErrVoteNotFound
163 }
164
165 // Delete the vote record from user's PDS
166 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
167 s.logger.Error("failed to delete vote on PDS",
168 "error", err,
169 "voter", session.AccountDID,
170 "rkey", existing.RKey)
171 return fmt.Errorf("failed to delete vote: %w", err)
172 }
173
174 s.logger.Info("vote deleted",
175 "voter", session.AccountDID,
176 "subject", req.Subject.URI,
177 "uri", existing.URI)
178
179 return nil
180}
181
182// createVoteRecord writes a vote record to the user's PDS
183func (s *voteService) createVoteRecord(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (string, string, error) {
184 // Generate TID for the record key
185 tid := syntax.NewTIDNow(0)
186
187 // Build vote record following the lexicon schema
188 record := VoteRecord{
189 Type: "social.coves.feed.vote",
190 Subject: StrongRef{
191 URI: req.Subject.URI,
192 CID: req.Subject.CID,
193 },
194 Direction: req.Direction,
195 CreatedAt: time.Now().UTC().Format(time.RFC3339),
196 }
197
198 // Call com.atproto.repo.createRecord on the user's PDS
199 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(session.HostURL, "/"))
200
201 payload := map[string]interface{}{
202 "repo": session.AccountDID.String(),
203 "collection": "social.coves.feed.vote",
204 "rkey": tid.String(),
205 "record": record,
206 }
207
208 uri, cid, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
209 if err != nil {
210 return "", "", err
211 }
212
213 return uri, cid, nil
214}
215
216// getVoteFromPDS queries the user's PDS directly to find an existing vote for a subject.
217// This avoids eventual consistency issues with the AppView database populated by Jetstream.
218// Paginates through all vote records to handle users with >100 votes.
219// Returns the vote record with rkey, or nil if no vote exists for the subject.
220func (s *voteService) getVoteFromPDS(ctx context.Context, session *oauth.ClientSessionData, subjectURI string) (*existingVote, error) {
221 baseURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.feed.vote&limit=100",
222 strings.TrimSuffix(session.HostURL, "/"),
223 session.AccountDID.String())
224
225 client := &http.Client{Timeout: 10 * time.Second}
226 cursor := ""
227
228 // Paginate through all vote records
229 for {
230 endpoint := baseURL
231 if cursor != "" {
232 endpoint = fmt.Sprintf("%s&cursor=%s", baseURL, cursor)
233 }
234
235 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
236 if err != nil {
237 return nil, fmt.Errorf("failed to create request: %w", err)
238 }
239 req.Header.Set("Authorization", "Bearer "+session.AccessToken)
240
241 resp, err := client.Do(req)
242 if err != nil {
243 return nil, fmt.Errorf("failed to call PDS: %w", err)
244 }
245
246 body, err := io.ReadAll(resp.Body)
247 closeErr := resp.Body.Close()
248 if closeErr != nil {
249 s.logger.Warn("failed to close response body", "error", closeErr)
250 }
251 if err != nil {
252 return nil, fmt.Errorf("failed to read response: %w", err)
253 }
254
255 // Handle auth errors - map to ErrNotAuthorized per lexicon
256 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
257 s.logger.Warn("PDS auth failure",
258 "status", resp.StatusCode,
259 "did", session.AccountDID)
260 return nil, ErrNotAuthorized
261 }
262
263 if resp.StatusCode != http.StatusOK {
264 return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
265 }
266
267 // Parse the listRecords response
268 var result struct {
269 Records []struct {
270 URI string `json:"uri"`
271 CID string `json:"cid"`
272 Value struct {
273 Type string `json:"$type"`
274 Subject struct {
275 URI string `json:"uri"`
276 CID string `json:"cid"`
277 } `json:"subject"`
278 Direction string `json:"direction"`
279 CreatedAt string `json:"createdAt"`
280 } `json:"value"`
281 } `json:"records"`
282 Cursor string `json:"cursor"`
283 }
284
285 if err := json.Unmarshal(body, &result); err != nil {
286 return nil, fmt.Errorf("failed to parse PDS response: %w", err)
287 }
288
289 // Search for the vote matching our subject in this page
290 for _, rec := range result.Records {
291 if rec.Value.Subject.URI == subjectURI {
292 // Extract rkey from the URI (at://did/collection/rkey)
293 parts := strings.Split(rec.URI, "/")
294 if len(parts) < 5 {
295 continue
296 }
297 rkey := parts[len(parts)-1]
298
299 return &existingVote{
300 URI: rec.URI,
301 CID: rec.CID,
302 RKey: rkey,
303 Direction: rec.Value.Direction,
304 }, nil
305 }
306 }
307
308 // Check if there are more pages
309 if result.Cursor == "" {
310 break // No more pages
311 }
312 cursor = result.Cursor
313 }
314
315 // No vote found for this subject after checking all pages
316 return nil, nil
317}
318
// existingVote is the subset of a vote record, as found on the PDS, that the
// service needs in order to toggle or delete it.
type existingVote struct {
	// URI is the record's URI as returned by the PDS.
	URI string
	// CID is the record's CID as returned by the PDS.
	CID string
	// RKey is the record key, i.e. the last path segment of URI.
	RKey string
	// Direction is the record's direction value.
	Direction string
}
326
327// deleteVoteRecord removes a vote record from the user's PDS
328func (s *voteService) deleteVoteRecord(ctx context.Context, session *oauth.ClientSessionData, rkey string) error {
329 // Call com.atproto.repo.deleteRecord on the user's PDS
330 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(session.HostURL, "/"))
331
332 payload := map[string]interface{}{
333 "repo": session.AccountDID.String(),
334 "collection": "social.coves.feed.vote",
335 "rkey": rkey,
336 }
337
338 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
339 return err
340}
341
342// callPDSWithAuth makes an authenticated HTTP call to the PDS
343// Returns URI and CID from the response (for create/update operations)
344func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
345 jsonData, err := json.Marshal(payload)
346 if err != nil {
347 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
348 }
349
350 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
351 if err != nil {
352 return "", "", fmt.Errorf("failed to create request: %w", err)
353 }
354 req.Header.Set("Content-Type", "application/json")
355
356 // Add OAuth bearer token for authentication
357 if accessToken != "" {
358 req.Header.Set("Authorization", "Bearer "+accessToken)
359 }
360
361 // Set reasonable timeout for PDS operations
362 timeout := 10 * time.Second
363 if strings.Contains(endpoint, "createRecord") || strings.Contains(endpoint, "putRecord") {
364 timeout = 15 * time.Second // Slightly longer for write operations
365 }
366
367 client := &http.Client{Timeout: timeout}
368 resp, err := client.Do(req)
369 if err != nil {
370 return "", "", fmt.Errorf("failed to call PDS: %w", err)
371 }
372 defer func() {
373 if closeErr := resp.Body.Close(); closeErr != nil {
374 s.logger.Warn("failed to close response body", "error", closeErr)
375 }
376 }()
377
378 body, err := io.ReadAll(resp.Body)
379 if err != nil {
380 return "", "", fmt.Errorf("failed to read response: %w", err)
381 }
382
383 // Handle auth errors - map to ErrNotAuthorized per lexicon
384 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
385 s.logger.Warn("PDS auth failure during write operation",
386 "status", resp.StatusCode,
387 "endpoint", endpoint)
388 return "", "", ErrNotAuthorized
389 }
390
391 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
392 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
393 }
394
395 // Parse response to extract URI and CID (for create/update operations)
396 var result struct {
397 URI string `json:"uri"`
398 CID string `json:"cid"`
399 }
400 if err := json.Unmarshal(body, &result); err != nil {
401 // For delete operations, there might not be a response body with URI/CID
402 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
403 return "", "", nil
404 }
405 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
406 }
407
408 return result.URI, result.CID, nil
409}