A community based topic aggregation platform built on atproto
1package votes
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/auth/oauth"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16
17 oauthclient "Coves/internal/atproto/oauth"
18)
19
20// voteService implements the Service interface for vote operations
21type voteService struct {
22 repo Repository
23 subjectValidator SubjectValidator
24 oauthClient *oauthclient.OAuthClient
25 oauthStore oauth.ClientAuthStore
26 logger *slog.Logger
27}
28
29// NewService creates a new vote service instance
30// subjectValidator can be nil to skip subject existence checks (not recommended for production)
31func NewService(repo Repository, subjectValidator SubjectValidator, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
32 if logger == nil {
33 logger = slog.Default()
34 }
35 return &voteService{
36 repo: repo,
37 subjectValidator: subjectValidator,
38 oauthClient: oauthClient,
39 oauthStore: oauthStore,
40 logger: logger,
41 }
42}
43
44// CreateVote creates a new vote or toggles off an existing vote
45// Implements the toggle behavior:
46// - No existing vote → Create new vote with given direction
47// - Vote exists with same direction → Delete vote (toggle off)
48// - Vote exists with different direction → Update to new direction
49func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) {
50 // Validate direction
51 if req.Direction != "up" && req.Direction != "down" {
52 return nil, ErrInvalidDirection
53 }
54
55 // Validate subject URI format
56 if req.Subject.URI == "" {
57 return nil, ErrInvalidSubject
58 }
59 if !strings.HasPrefix(req.Subject.URI, "at://") {
60 return nil, ErrInvalidSubject
61 }
62
63 // Validate subject CID is provided
64 if req.Subject.CID == "" {
65 return nil, ErrInvalidSubject
66 }
67
68 // Validate subject exists in AppView (post or comment)
69 // This prevents creating votes on non-existent content
70 if s.subjectValidator != nil {
71 exists, err := s.subjectValidator.SubjectExists(ctx, req.Subject.URI)
72 if err != nil {
73 s.logger.Error("failed to validate subject existence",
74 "error", err,
75 "subject", req.Subject.URI)
76 return nil, fmt.Errorf("failed to validate subject: %w", err)
77 }
78 if !exists {
79 return nil, ErrSubjectNotFound
80 }
81 }
82
83 // Check for existing vote by querying PDS directly (source of truth)
84 // This avoids eventual consistency issues with the AppView database
85 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
86 if err != nil {
87 s.logger.Error("failed to check existing vote on PDS",
88 "error", err,
89 "voter", session.AccountDID,
90 "subject", req.Subject.URI)
91 return nil, fmt.Errorf("failed to check existing vote: %w", err)
92 }
93
94 // Toggle logic
95 if existing != nil {
96 // Vote exists - check if same direction
97 if existing.Direction == req.Direction {
98 // Same direction - toggle off (delete)
99 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
100 s.logger.Error("failed to delete vote on PDS",
101 "error", err,
102 "voter", session.AccountDID,
103 "rkey", existing.RKey)
104 return nil, fmt.Errorf("failed to delete vote: %w", err)
105 }
106
107 s.logger.Info("vote toggled off",
108 "voter", session.AccountDID,
109 "subject", req.Subject.URI,
110 "direction", req.Direction)
111
112 // Return empty response to indicate deletion
113 return &CreateVoteResponse{
114 URI: "",
115 CID: "",
116 }, nil
117 }
118
119 // Different direction - delete old vote first, then create new one
120 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
121 s.logger.Error("failed to delete existing vote on PDS",
122 "error", err,
123 "voter", session.AccountDID,
124 "rkey", existing.RKey)
125 return nil, fmt.Errorf("failed to delete existing vote: %w", err)
126 }
127
128 s.logger.Info("deleted existing vote before creating new direction",
129 "voter", session.AccountDID,
130 "subject", req.Subject.URI,
131 "old_direction", existing.Direction,
132 "new_direction", req.Direction)
133 }
134
135 // Create new vote
136 uri, cid, err := s.createVoteRecord(ctx, session, req)
137 if err != nil {
138 s.logger.Error("failed to create vote on PDS",
139 "error", err,
140 "voter", session.AccountDID,
141 "subject", req.Subject.URI,
142 "direction", req.Direction)
143 return nil, fmt.Errorf("failed to create vote: %w", err)
144 }
145
146 s.logger.Info("vote created",
147 "voter", session.AccountDID,
148 "subject", req.Subject.URI,
149 "direction", req.Direction,
150 "uri", uri,
151 "cid", cid)
152
153 return &CreateVoteResponse{
154 URI: uri,
155 CID: cid,
156 }, nil
157}
158
159// DeleteVote removes a vote on the specified subject
160func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error {
161 // Validate subject URI format
162 if req.Subject.URI == "" {
163 return ErrInvalidSubject
164 }
165 if !strings.HasPrefix(req.Subject.URI, "at://") {
166 return ErrInvalidSubject
167 }
168
169 // Find existing vote by querying PDS directly (source of truth)
170 // This avoids eventual consistency issues with the AppView database
171 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
172 if err != nil {
173 s.logger.Error("failed to find vote on PDS",
174 "error", err,
175 "voter", session.AccountDID,
176 "subject", req.Subject.URI)
177 return fmt.Errorf("failed to find vote: %w", err)
178 }
179 if existing == nil {
180 return ErrVoteNotFound
181 }
182
183 // Delete the vote record from user's PDS
184 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
185 s.logger.Error("failed to delete vote on PDS",
186 "error", err,
187 "voter", session.AccountDID,
188 "rkey", existing.RKey)
189 return fmt.Errorf("failed to delete vote: %w", err)
190 }
191
192 s.logger.Info("vote deleted",
193 "voter", session.AccountDID,
194 "subject", req.Subject.URI,
195 "uri", existing.URI)
196
197 return nil
198}
199
200// createVoteRecord writes a vote record to the user's PDS
201func (s *voteService) createVoteRecord(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (string, string, error) {
202 // Generate TID for the record key
203 tid := syntax.NewTIDNow(0)
204
205 // Build vote record following the lexicon schema
206 record := VoteRecord{
207 Type: "social.coves.feed.vote",
208 Subject: StrongRef{
209 URI: req.Subject.URI,
210 CID: req.Subject.CID,
211 },
212 Direction: req.Direction,
213 CreatedAt: time.Now().UTC().Format(time.RFC3339),
214 }
215
216 // Call com.atproto.repo.createRecord on the user's PDS
217 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(session.HostURL, "/"))
218
219 payload := map[string]interface{}{
220 "repo": session.AccountDID.String(),
221 "collection": "social.coves.feed.vote",
222 "rkey": tid.String(),
223 "record": record,
224 }
225
226 uri, cid, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
227 if err != nil {
228 return "", "", err
229 }
230
231 return uri, cid, nil
232}
233
234// getVoteFromPDS queries the user's PDS directly to find an existing vote for a subject.
235// This avoids eventual consistency issues with the AppView database populated by Jetstream.
236// Paginates through all vote records to handle users with >100 votes.
237// Returns the vote record with rkey, or nil if no vote exists for the subject.
238func (s *voteService) getVoteFromPDS(ctx context.Context, session *oauth.ClientSessionData, subjectURI string) (*existingVote, error) {
239 baseURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.feed.vote&limit=100",
240 strings.TrimSuffix(session.HostURL, "/"),
241 session.AccountDID.String())
242
243 client := &http.Client{Timeout: 10 * time.Second}
244 cursor := ""
245
246 // Paginate through all vote records
247 for {
248 endpoint := baseURL
249 if cursor != "" {
250 endpoint = fmt.Sprintf("%s&cursor=%s", baseURL, cursor)
251 }
252
253 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
254 if err != nil {
255 return nil, fmt.Errorf("failed to create request: %w", err)
256 }
257 req.Header.Set("Authorization", "Bearer "+session.AccessToken)
258
259 resp, err := client.Do(req)
260 if err != nil {
261 return nil, fmt.Errorf("failed to call PDS: %w", err)
262 }
263
264 body, err := io.ReadAll(resp.Body)
265 closeErr := resp.Body.Close()
266 if closeErr != nil {
267 s.logger.Warn("failed to close response body", "error", closeErr)
268 }
269 if err != nil {
270 return nil, fmt.Errorf("failed to read response: %w", err)
271 }
272
273 // Handle auth errors - map to ErrNotAuthorized per lexicon
274 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
275 s.logger.Warn("PDS auth failure",
276 "status", resp.StatusCode,
277 "did", session.AccountDID)
278 return nil, ErrNotAuthorized
279 }
280
281 if resp.StatusCode != http.StatusOK {
282 return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
283 }
284
285 // Parse the listRecords response
286 var result struct {
287 Records []struct {
288 URI string `json:"uri"`
289 CID string `json:"cid"`
290 Value struct {
291 Type string `json:"$type"`
292 Subject struct {
293 URI string `json:"uri"`
294 CID string `json:"cid"`
295 } `json:"subject"`
296 Direction string `json:"direction"`
297 CreatedAt string `json:"createdAt"`
298 } `json:"value"`
299 } `json:"records"`
300 Cursor string `json:"cursor"`
301 }
302
303 if err := json.Unmarshal(body, &result); err != nil {
304 return nil, fmt.Errorf("failed to parse PDS response: %w", err)
305 }
306
307 // Search for the vote matching our subject in this page
308 for _, rec := range result.Records {
309 if rec.Value.Subject.URI == subjectURI {
310 // Extract rkey from the URI (at://did/collection/rkey)
311 parts := strings.Split(rec.URI, "/")
312 if len(parts) < 5 {
313 continue
314 }
315 rkey := parts[len(parts)-1]
316
317 return &existingVote{
318 URI: rec.URI,
319 CID: rec.CID,
320 RKey: rkey,
321 Direction: rec.Value.Direction,
322 }, nil
323 }
324 }
325
326 // Check if there are more pages
327 if result.Cursor == "" {
328 break // No more pages
329 }
330 cursor = result.Cursor
331 }
332
333 // No vote found for this subject after checking all pages
334 return nil, nil
335}
336
337// existingVote represents a vote record found on the PDS
338type existingVote struct {
339 URI string
340 CID string
341 RKey string
342 Direction string
343}
344
345// deleteVoteRecord removes a vote record from the user's PDS
346func (s *voteService) deleteVoteRecord(ctx context.Context, session *oauth.ClientSessionData, rkey string) error {
347 // Call com.atproto.repo.deleteRecord on the user's PDS
348 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(session.HostURL, "/"))
349
350 payload := map[string]interface{}{
351 "repo": session.AccountDID.String(),
352 "collection": "social.coves.feed.vote",
353 "rkey": rkey,
354 }
355
356 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
357 return err
358}
359
360// callPDSWithAuth makes an authenticated HTTP call to the PDS
361// Returns URI and CID from the response (for create/update operations)
362func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
363 jsonData, err := json.Marshal(payload)
364 if err != nil {
365 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
366 }
367
368 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
369 if err != nil {
370 return "", "", fmt.Errorf("failed to create request: %w", err)
371 }
372 req.Header.Set("Content-Type", "application/json")
373
374 // Add OAuth bearer token for authentication
375 if accessToken != "" {
376 req.Header.Set("Authorization", "Bearer "+accessToken)
377 }
378
379 // Set reasonable timeout for PDS operations
380 timeout := 10 * time.Second
381 if strings.Contains(endpoint, "createRecord") || strings.Contains(endpoint, "putRecord") {
382 timeout = 15 * time.Second // Slightly longer for write operations
383 }
384
385 client := &http.Client{Timeout: timeout}
386 resp, err := client.Do(req)
387 if err != nil {
388 return "", "", fmt.Errorf("failed to call PDS: %w", err)
389 }
390 defer func() {
391 if closeErr := resp.Body.Close(); closeErr != nil {
392 s.logger.Warn("failed to close response body", "error", closeErr)
393 }
394 }()
395
396 body, err := io.ReadAll(resp.Body)
397 if err != nil {
398 return "", "", fmt.Errorf("failed to read response: %w", err)
399 }
400
401 // Handle auth errors - map to ErrNotAuthorized per lexicon
402 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
403 s.logger.Warn("PDS auth failure during write operation",
404 "status", resp.StatusCode,
405 "endpoint", endpoint)
406 return "", "", ErrNotAuthorized
407 }
408
409 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
410 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
411 }
412
413 // Parse response to extract URI and CID (for create/update operations)
414 var result struct {
415 URI string `json:"uri"`
416 CID string `json:"cid"`
417 }
418 if err := json.Unmarshal(body, &result); err != nil {
419 // For delete operations, there might not be a response body with URI/CID
420 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
421 return "", "", nil
422 }
423 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
424 }
425
426 return result.URI, result.CID, nil
427}