A community based topic aggregation platform built on atproto
1package votes
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/auth/oauth"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16
17 oauthclient "Coves/internal/atproto/oauth"
18)
19
20// voteService implements the Service interface for vote operations
21type voteService struct {
22 repo Repository
23 oauthClient *oauthclient.OAuthClient
24 oauthStore oauth.ClientAuthStore
25 logger *slog.Logger
26}
27
28// NewService creates a new vote service instance
29func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
30 if logger == nil {
31 logger = slog.Default()
32 }
33 return &voteService{
34 repo: repo,
35 oauthClient: oauthClient,
36 oauthStore: oauthStore,
37 logger: logger,
38 }
39}
40
41// CreateVote creates a new vote or toggles off an existing vote
42// Implements the toggle behavior:
43// - No existing vote → Create new vote with given direction
44// - Vote exists with same direction → Delete vote (toggle off)
45// - Vote exists with different direction → Update to new direction
46func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) {
47 // Validate direction
48 if req.Direction != "up" && req.Direction != "down" {
49 return nil, ErrInvalidDirection
50 }
51
52 // Validate subject URI format
53 if req.Subject.URI == "" {
54 return nil, ErrInvalidSubject
55 }
56 if !strings.HasPrefix(req.Subject.URI, "at://") {
57 return nil, ErrInvalidSubject
58 }
59
60 // Validate subject CID is provided
61 if req.Subject.CID == "" {
62 return nil, ErrInvalidSubject
63 }
64
65 // Note: We intentionally don't validate subject existence here.
66 // The vote record goes to the user's PDS regardless. The Jetstream consumer
67 // handles orphaned votes correctly by only updating counts for non-deleted subjects.
68 // This avoids race conditions and eventual consistency issues.
69
70 // Check for existing vote by querying PDS directly (source of truth)
71 // This avoids eventual consistency issues with the AppView database
72 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
73 if err != nil {
74 s.logger.Error("failed to check existing vote on PDS",
75 "error", err,
76 "voter", session.AccountDID,
77 "subject", req.Subject.URI)
78 return nil, fmt.Errorf("failed to check existing vote: %w", err)
79 }
80
81 // Toggle logic
82 if existing != nil {
83 // Vote exists - check if same direction
84 if existing.Direction == req.Direction {
85 // Same direction - toggle off (delete)
86 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
87 s.logger.Error("failed to delete vote on PDS",
88 "error", err,
89 "voter", session.AccountDID,
90 "rkey", existing.RKey)
91 return nil, fmt.Errorf("failed to delete vote: %w", err)
92 }
93
94 s.logger.Info("vote toggled off",
95 "voter", session.AccountDID,
96 "subject", req.Subject.URI,
97 "direction", req.Direction)
98
99 // Return empty response to indicate deletion
100 return &CreateVoteResponse{
101 URI: "",
102 CID: "",
103 }, nil
104 }
105
106 // Different direction - delete old vote first, then create new one
107 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
108 s.logger.Error("failed to delete existing vote on PDS",
109 "error", err,
110 "voter", session.AccountDID,
111 "rkey", existing.RKey)
112 return nil, fmt.Errorf("failed to delete existing vote: %w", err)
113 }
114
115 s.logger.Info("deleted existing vote before creating new direction",
116 "voter", session.AccountDID,
117 "subject", req.Subject.URI,
118 "old_direction", existing.Direction,
119 "new_direction", req.Direction)
120 }
121
122 // Create new vote
123 uri, cid, err := s.createVoteRecord(ctx, session, req)
124 if err != nil {
125 s.logger.Error("failed to create vote on PDS",
126 "error", err,
127 "voter", session.AccountDID,
128 "subject", req.Subject.URI,
129 "direction", req.Direction)
130 return nil, fmt.Errorf("failed to create vote: %w", err)
131 }
132
133 s.logger.Info("vote created",
134 "voter", session.AccountDID,
135 "subject", req.Subject.URI,
136 "direction", req.Direction,
137 "uri", uri,
138 "cid", cid)
139
140 return &CreateVoteResponse{
141 URI: uri,
142 CID: cid,
143 }, nil
144}
145
146// DeleteVote removes a vote on the specified subject
147func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error {
148 // Validate subject URI format
149 if req.Subject.URI == "" {
150 return ErrInvalidSubject
151 }
152 if !strings.HasPrefix(req.Subject.URI, "at://") {
153 return ErrInvalidSubject
154 }
155
156 // Find existing vote by querying PDS directly (source of truth)
157 // This avoids eventual consistency issues with the AppView database
158 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
159 if err != nil {
160 s.logger.Error("failed to find vote on PDS",
161 "error", err,
162 "voter", session.AccountDID,
163 "subject", req.Subject.URI)
164 return fmt.Errorf("failed to find vote: %w", err)
165 }
166 if existing == nil {
167 return ErrVoteNotFound
168 }
169
170 // Delete the vote record from user's PDS
171 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
172 s.logger.Error("failed to delete vote on PDS",
173 "error", err,
174 "voter", session.AccountDID,
175 "rkey", existing.RKey)
176 return fmt.Errorf("failed to delete vote: %w", err)
177 }
178
179 s.logger.Info("vote deleted",
180 "voter", session.AccountDID,
181 "subject", req.Subject.URI,
182 "uri", existing.URI)
183
184 return nil
185}
186
187// createVoteRecord writes a vote record to the user's PDS
188func (s *voteService) createVoteRecord(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (string, string, error) {
189 // Generate TID for the record key
190 tid := syntax.NewTIDNow(0)
191
192 // Build vote record following the lexicon schema
193 record := VoteRecord{
194 Type: "social.coves.feed.vote",
195 Subject: StrongRef{
196 URI: req.Subject.URI,
197 CID: req.Subject.CID,
198 },
199 Direction: req.Direction,
200 CreatedAt: time.Now().UTC().Format(time.RFC3339),
201 }
202
203 // Call com.atproto.repo.createRecord on the user's PDS
204 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(session.HostURL, "/"))
205
206 payload := map[string]interface{}{
207 "repo": session.AccountDID.String(),
208 "collection": "social.coves.feed.vote",
209 "rkey": tid.String(),
210 "record": record,
211 }
212
213 uri, cid, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
214 if err != nil {
215 return "", "", err
216 }
217
218 return uri, cid, nil
219}
220
221// getVoteFromPDS queries the user's PDS directly to find an existing vote for a subject.
222// This avoids eventual consistency issues with the AppView database populated by Jetstream.
223// Paginates through all vote records to handle users with >100 votes.
224// Returns the vote record with rkey, or nil if no vote exists for the subject.
225func (s *voteService) getVoteFromPDS(ctx context.Context, session *oauth.ClientSessionData, subjectURI string) (*existingVote, error) {
226 baseURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.feed.vote&limit=100",
227 strings.TrimSuffix(session.HostURL, "/"),
228 session.AccountDID.String())
229
230 client := &http.Client{Timeout: 10 * time.Second}
231 cursor := ""
232
233 // Paginate through all vote records
234 for {
235 endpoint := baseURL
236 if cursor != "" {
237 endpoint = fmt.Sprintf("%s&cursor=%s", baseURL, cursor)
238 }
239
240 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
241 if err != nil {
242 return nil, fmt.Errorf("failed to create request: %w", err)
243 }
244 req.Header.Set("Authorization", "Bearer "+session.AccessToken)
245
246 resp, err := client.Do(req)
247 if err != nil {
248 return nil, fmt.Errorf("failed to call PDS: %w", err)
249 }
250
251 body, err := io.ReadAll(resp.Body)
252 closeErr := resp.Body.Close()
253 if closeErr != nil {
254 s.logger.Warn("failed to close response body", "error", closeErr)
255 }
256 if err != nil {
257 return nil, fmt.Errorf("failed to read response: %w", err)
258 }
259
260 // Handle auth errors - map to ErrNotAuthorized per lexicon
261 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
262 s.logger.Warn("PDS auth failure",
263 "status", resp.StatusCode,
264 "did", session.AccountDID)
265 return nil, ErrNotAuthorized
266 }
267
268 if resp.StatusCode != http.StatusOK {
269 return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
270 }
271
272 // Parse the listRecords response
273 var result struct {
274 Cursor string `json:"cursor"`
275 Records []struct {
276 URI string `json:"uri"`
277 CID string `json:"cid"`
278 Value struct {
279 Type string `json:"$type"`
280 Subject struct {
281 URI string `json:"uri"`
282 CID string `json:"cid"`
283 } `json:"subject"`
284 Direction string `json:"direction"`
285 CreatedAt string `json:"createdAt"`
286 } `json:"value"`
287 } `json:"records"`
288 }
289
290 if err := json.Unmarshal(body, &result); err != nil {
291 return nil, fmt.Errorf("failed to parse PDS response: %w", err)
292 }
293
294 // Search for the vote matching our subject in this page
295 for _, rec := range result.Records {
296 if rec.Value.Subject.URI == subjectURI {
297 // Extract rkey from the URI (at://did/collection/rkey)
298 parts := strings.Split(rec.URI, "/")
299 if len(parts) < 5 {
300 continue
301 }
302 rkey := parts[len(parts)-1]
303
304 return &existingVote{
305 URI: rec.URI,
306 CID: rec.CID,
307 RKey: rkey,
308 Direction: rec.Value.Direction,
309 }, nil
310 }
311 }
312
313 // Check if there are more pages
314 if result.Cursor == "" {
315 break // No more pages
316 }
317 cursor = result.Cursor
318 }
319
320 // No vote found for this subject after checking all pages
321 return nil, nil
322}
323
324// existingVote represents a vote record found on the PDS
325type existingVote struct {
326 URI string
327 CID string
328 RKey string
329 Direction string
330}
331
332// deleteVoteRecord removes a vote record from the user's PDS
333func (s *voteService) deleteVoteRecord(ctx context.Context, session *oauth.ClientSessionData, rkey string) error {
334 // Call com.atproto.repo.deleteRecord on the user's PDS
335 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(session.HostURL, "/"))
336
337 payload := map[string]interface{}{
338 "repo": session.AccountDID.String(),
339 "collection": "social.coves.feed.vote",
340 "rkey": rkey,
341 }
342
343 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
344 return err
345}
346
347// callPDSWithAuth makes an authenticated HTTP call to the PDS
348// Returns URI and CID from the response (for create/update operations)
349func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
350 jsonData, err := json.Marshal(payload)
351 if err != nil {
352 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
353 }
354
355 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
356 if err != nil {
357 return "", "", fmt.Errorf("failed to create request: %w", err)
358 }
359 req.Header.Set("Content-Type", "application/json")
360
361 // Add OAuth bearer token for authentication
362 if accessToken != "" {
363 req.Header.Set("Authorization", "Bearer "+accessToken)
364 }
365
366 // Set reasonable timeout for PDS operations
367 timeout := 10 * time.Second
368 if strings.Contains(endpoint, "createRecord") || strings.Contains(endpoint, "putRecord") {
369 timeout = 15 * time.Second // Slightly longer for write operations
370 }
371
372 client := &http.Client{Timeout: timeout}
373 resp, err := client.Do(req)
374 if err != nil {
375 return "", "", fmt.Errorf("failed to call PDS: %w", err)
376 }
377 defer func() {
378 if closeErr := resp.Body.Close(); closeErr != nil {
379 s.logger.Warn("failed to close response body", "error", closeErr)
380 }
381 }()
382
383 body, err := io.ReadAll(resp.Body)
384 if err != nil {
385 return "", "", fmt.Errorf("failed to read response: %w", err)
386 }
387
388 // Handle auth errors - map to ErrNotAuthorized per lexicon
389 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
390 s.logger.Warn("PDS auth failure during write operation",
391 "status", resp.StatusCode,
392 "endpoint", endpoint)
393 return "", "", ErrNotAuthorized
394 }
395
396 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
397 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
398 }
399
400 // Parse response to extract URI and CID (for create/update operations)
401 var result struct {
402 URI string `json:"uri"`
403 CID string `json:"cid"`
404 }
405 if err := json.Unmarshal(body, &result); err != nil {
406 // For delete operations, there might not be a response body with URI/CID
407 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
408 return "", "", nil
409 }
410 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
411 }
412
413 return result.URI, result.CID, nil
414}