A community based topic aggregation platform built on atproto
1package votes
2
3import (
4 "Coves/internal/core/posts"
5 "bytes"
6 "context"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log"
11 "net/http"
12 "strings"
13 "time"
14)
15
16type voteService struct {
17 repo Repository
18 postRepo posts.Repository
19 pdsURL string
20}
21
22// NewVoteService creates a new vote service
23func NewVoteService(
24 repo Repository,
25 postRepo posts.Repository,
26 pdsURL string,
27) Service {
28 return &voteService{
29 repo: repo,
30 postRepo: postRepo,
31 pdsURL: pdsURL,
32 }
33}
34
35// CreateVote creates a new vote or toggles an existing vote
36// Toggle logic:
37// - No vote -> Create vote
38// - Same direction -> Delete vote (toggle off)
39// - Different direction -> Delete old + Create new (toggle direction)
40func (s *voteService) CreateVote(ctx context.Context, voterDID string, userAccessToken string, req CreateVoteRequest) (*CreateVoteResponse, error) {
41 // 1. Validate input
42 if voterDID == "" {
43 return nil, NewValidationError("voterDid", "required")
44 }
45 if userAccessToken == "" {
46 return nil, NewValidationError("userAccessToken", "required")
47 }
48 if req.Subject == "" {
49 return nil, NewValidationError("subject", "required")
50 }
51 if req.Direction != "up" && req.Direction != "down" {
52 return nil, ErrInvalidDirection
53 }
54
55 // 2. Validate subject URI format (should be at://...)
56 if !strings.HasPrefix(req.Subject, "at://") {
57 return nil, ErrInvalidSubject
58 }
59
60 // 3. Get subject post/comment to verify it exists and get its CID (for strong reference)
61 // For now, we assume the subject is a post. In the future, we'll support comments too.
62 post, err := s.postRepo.GetByURI(ctx, req.Subject)
63 if err != nil {
64 if err == posts.ErrNotFound {
65 return nil, ErrSubjectNotFound
66 }
67 return nil, fmt.Errorf("failed to get subject post: %w", err)
68 }
69
70 // 4. Check for existing vote on PDS (source of truth for toggle logic)
71 // IMPORTANT: We query the user's PDS directly instead of AppView to avoid race conditions.
72 // AppView is eventually consistent (updated via Jetstream), so querying it can cause
73 // duplicate vote records if the user toggles before Jetstream catches up.
74 existingVoteRecord, err := s.findVoteOnPDS(ctx, voterDID, userAccessToken, req.Subject)
75 if err != nil {
76 return nil, fmt.Errorf("failed to check existing vote on PDS: %w", err)
77 }
78
79 // 5. Handle toggle logic
80 var existingVoteURI *string
81
82 if existingVoteRecord != nil {
83 // Vote exists on PDS - implement toggle logic
84 if existingVoteRecord.Direction == req.Direction {
85 // Same direction -> Delete vote (toggle off)
86 log.Printf("[VOTE-CREATE] Toggle off: deleting existing %s vote on %s", req.Direction, req.Subject)
87
88 // Delete from user's PDS
89 if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil {
90 return nil, fmt.Errorf("failed to delete vote on PDS: %w", err)
91 }
92
93 // Return empty response (vote was deleted, not created)
94 return &CreateVoteResponse{
95 URI: "",
96 CID: "",
97 }, nil
98 }
99
100 // Different direction -> Delete old vote first, then create new one below
101 log.Printf("[VOTE-CREATE] Toggle direction: %s -> %s on %s", existingVoteRecord.Direction, req.Direction, req.Subject)
102
103 if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil {
104 return nil, fmt.Errorf("failed to delete old vote on PDS: %w", err)
105 }
106
107 existingVoteURI = &existingVoteRecord.URI
108 }
109
110 // 6. Build vote record with strong reference
111 voteRecord := map[string]interface{}{
112 "$type": "social.coves.interaction.vote",
113 "subject": map[string]interface{}{
114 "uri": req.Subject,
115 "cid": post.CID,
116 },
117 "direction": req.Direction,
118 "createdAt": time.Now().Format(time.RFC3339),
119 }
120
121 // 7. Write to user's PDS repository
122 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", "", voteRecord, userAccessToken)
123 if err != nil {
124 return nil, fmt.Errorf("failed to create vote on PDS: %w", err)
125 }
126
127 log.Printf("[VOTE-CREATE] Created %s vote: %s (CID: %s)", req.Direction, recordURI, recordCID)
128
129 // 8. Return response
130 return &CreateVoteResponse{
131 URI: recordURI,
132 CID: recordCID,
133 Existing: existingVoteURI,
134 }, nil
135}
136
137// DeleteVote removes a vote from a post/comment
138func (s *voteService) DeleteVote(ctx context.Context, voterDID string, userAccessToken string, req DeleteVoteRequest) error {
139 // 1. Validate input
140 if voterDID == "" {
141 return NewValidationError("voterDid", "required")
142 }
143 if userAccessToken == "" {
144 return NewValidationError("userAccessToken", "required")
145 }
146 if req.Subject == "" {
147 return NewValidationError("subject", "required")
148 }
149
150 // 2. Find existing vote on PDS (source of truth)
151 // IMPORTANT: Query PDS directly to avoid race conditions with AppView indexing
152 existingVoteRecord, err := s.findVoteOnPDS(ctx, voterDID, userAccessToken, req.Subject)
153 if err != nil {
154 return fmt.Errorf("failed to check existing vote on PDS: %w", err)
155 }
156
157 if existingVoteRecord == nil {
158 return ErrVoteNotFound
159 }
160
161 // 3. Delete from user's PDS
162 if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil {
163 return fmt.Errorf("failed to delete vote on PDS: %w", err)
164 }
165
166 log.Printf("[VOTE-DELETE] Deleted vote: %s", existingVoteRecord.URI)
167
168 return nil
169}
170
171// GetVote retrieves a user's vote on a specific subject
172func (s *voteService) GetVote(ctx context.Context, voterDID string, subjectURI string) (*Vote, error) {
173 return s.repo.GetByVoterAndSubject(ctx, voterDID, subjectURI)
174}
175
176// Helper methods for PDS operations
177
178// createRecordOnPDSAs creates a record on the PDS using the user's access token
179func (s *voteService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
180 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
181
182 payload := map[string]interface{}{
183 "repo": repoDID,
184 "collection": collection,
185 "record": record,
186 }
187
188 if rkey != "" {
189 payload["rkey"] = rkey
190 }
191
192 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
193}
194
195// deleteRecordOnPDSAs deletes a record from the PDS using the user's access token
196func (s *voteService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
197 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
198
199 payload := map[string]interface{}{
200 "repo": repoDID,
201 "collection": collection,
202 "rkey": rkey,
203 }
204
205 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
206 return err
207}
208
209// callPDSWithAuth makes a PDS call with a specific access token
210func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
211 jsonData, err := json.Marshal(payload)
212 if err != nil {
213 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
214 }
215
216 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
217 if err != nil {
218 return "", "", fmt.Errorf("failed to create request: %w", err)
219 }
220 req.Header.Set("Content-Type", "application/json")
221
222 // Add authentication with provided access token
223 if accessToken != "" {
224 req.Header.Set("Authorization", "Bearer "+accessToken)
225 }
226
227 // Use 30 second timeout for write operations
228 timeout := 30 * time.Second
229 client := &http.Client{Timeout: timeout}
230 resp, err := client.Do(req)
231 if err != nil {
232 return "", "", fmt.Errorf("failed to call PDS: %w", err)
233 }
234 defer func() {
235 if closeErr := resp.Body.Close(); closeErr != nil {
236 log.Printf("Failed to close response body: %v", closeErr)
237 }
238 }()
239
240 body, err := io.ReadAll(resp.Body)
241 if err != nil {
242 return "", "", fmt.Errorf("failed to read response: %w", err)
243 }
244
245 if resp.StatusCode < 200 || resp.StatusCode >= 300 {
246 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, string(body))
247 }
248
249 // Parse response to extract URI and CID
250 var result struct {
251 URI string `json:"uri"`
252 CID string `json:"cid"`
253 }
254 if err := json.Unmarshal(body, &result); err != nil {
255 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
256 }
257
258 return result.URI, result.CID, nil
259}
260
261// Helper functions
262
263// PDSVoteRecord represents a vote record returned from PDS listRecords
264type PDSVoteRecord struct {
265 URI string
266 RKey string
267 Direction string
268 Subject struct {
269 URI string
270 CID string
271 }
272}
273
274// findVoteOnPDS queries the user's PDS to find an existing vote on a specific subject
275// This is the source of truth for toggle logic (avoiding AppView race conditions)
276//
277// IMPORTANT: This function paginates through ALL user votes with reverse=true (newest first)
278// to handle users with >100 votes. Without pagination, votes on older posts would not be found,
279// causing duplicate vote records and 404 errors on delete operations.
280func (s *voteService) findVoteOnPDS(ctx context.Context, voterDID, accessToken, subjectURI string) (*PDSVoteRecord, error) {
281 const maxPages = 50 // Safety limit: prevent infinite loops (50 pages * 100 = 5000 votes max)
282 var cursor string
283 pageCount := 0
284
285 client := &http.Client{Timeout: 10 * time.Second}
286
287 for {
288 pageCount++
289 if pageCount > maxPages {
290 log.Printf("[VOTE-PDS] Reached max pagination limit (%d pages) searching for vote on %s", maxPages, subjectURI)
291 break
292 }
293
294 // Build endpoint with pagination cursor and reverse=true (newest first)
295 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.interaction.vote&limit=100&reverse=true",
296 strings.TrimSuffix(s.pdsURL, "/"), voterDID)
297
298 if cursor != "" {
299 endpoint += fmt.Sprintf("&cursor=%s", cursor)
300 }
301
302 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
303 if err != nil {
304 return nil, fmt.Errorf("failed to create request: %w", err)
305 }
306
307 req.Header.Set("Authorization", "Bearer "+accessToken)
308
309 resp, err := client.Do(req)
310 if err != nil {
311 return nil, fmt.Errorf("failed to query PDS: %w", err)
312 }
313
314 if resp.StatusCode != http.StatusOK {
315 body, _ := io.ReadAll(resp.Body)
316 resp.Body.Close()
317 return nil, fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, string(body))
318 }
319
320 var result struct {
321 Records []struct {
322 URI string `json:"uri"`
323 Value struct {
324 Subject struct {
325 URI string `json:"uri"`
326 CID string `json:"cid"`
327 } `json:"subject"`
328 Direction string `json:"direction"`
329 } `json:"value"`
330 } `json:"records"`
331 Cursor string `json:"cursor,omitempty"` // Pagination cursor for next page
332 }
333
334 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
335 resp.Body.Close()
336 return nil, fmt.Errorf("failed to decode PDS response: %w", err)
337 }
338 resp.Body.Close()
339
340 // Find vote on this specific subject in current page
341 for _, record := range result.Records {
342 if record.Value.Subject.URI == subjectURI {
343 rkey := extractRKeyFromURI(record.URI)
344 log.Printf("[VOTE-PDS] Found existing vote on page %d: %s (direction: %s)", pageCount, record.URI, record.Value.Direction)
345 return &PDSVoteRecord{
346 URI: record.URI,
347 RKey: rkey,
348 Direction: record.Value.Direction,
349 Subject: struct {
350 URI string
351 CID string
352 }{
353 URI: record.Value.Subject.URI,
354 CID: record.Value.Subject.CID,
355 },
356 }, nil
357 }
358 }
359
360 // No more pages to check
361 if result.Cursor == "" {
362 log.Printf("[VOTE-PDS] No existing vote found after checking %d page(s)", pageCount)
363 break
364 }
365
366 // Move to next page
367 cursor = result.Cursor
368 }
369
370 // No vote found on this subject after paginating through all records
371 return nil, nil
372}
373
374// extractRKeyFromURI extracts the rkey from an AT-URI (at://did/collection/rkey)
375func extractRKeyFromURI(uri string) string {
376 parts := strings.Split(uri, "/")
377 if len(parts) >= 4 {
378 return parts[len(parts)-1]
379 }
380 return ""
381}
382
383// ValidationError represents a validation error
384type ValidationError struct {
385 Field string
386 Message string
387}
388
389func (e *ValidationError) Error() string {
390 return fmt.Sprintf("validation error for field '%s': %s", e.Field, e.Message)
391}
392
393// NewValidationError creates a new validation error
394func NewValidationError(field, message string) error {
395 return &ValidationError{
396 Field: field,
397 Message: message,
398 }
399}