···
4
-
"Coves/internal/core/posts"
16
-
type voteService struct {
18
-
postRepo posts.Repository
22
-
// NewVoteService creates a new vote service
23
-
func NewVoteService(
25
-
postRepo posts.Repository,
28
-
return &voteService{
35
-
// CreateVote creates a new vote or toggles an existing vote
37
-
// - No vote -> Create vote
38
-
// - Same direction -> Delete vote (toggle off)
39
-
// - Different direction -> Delete old + Create new (toggle direction)
40
-
func (s *voteService) CreateVote(ctx context.Context, voterDID string, userAccessToken string, req CreateVoteRequest) (*CreateVoteResponse, error) {
41
-
// 1. Validate input
43
-
return nil, NewValidationError("voterDid", "required")
45
-
if userAccessToken == "" {
46
-
return nil, NewValidationError("userAccessToken", "required")
48
-
if req.Subject == "" {
49
-
return nil, NewValidationError("subject", "required")
51
-
if req.Direction != "up" && req.Direction != "down" {
52
-
return nil, ErrInvalidDirection
55
-
// 2. Validate subject URI format (should be at://...)
56
-
if !strings.HasPrefix(req.Subject, "at://") {
57
-
return nil, ErrInvalidSubject
60
-
// 3. Get subject post/comment to verify it exists and get its CID (for strong reference)
61
-
// For now, we assume the subject is a post. In the future, we'll support comments too.
62
-
post, err := s.postRepo.GetByURI(ctx, req.Subject)
64
-
if err == posts.ErrNotFound {
65
-
return nil, ErrSubjectNotFound
67
-
return nil, fmt.Errorf("failed to get subject post: %w", err)
70
-
// 4. Check for existing vote on PDS (source of truth for toggle logic)
71
-
// IMPORTANT: We query the user's PDS directly instead of AppView to avoid race conditions.
72
-
// AppView is eventually consistent (updated via Jetstream), so querying it can cause
73
-
// duplicate vote records if the user toggles before Jetstream catches up.
74
-
existingVoteRecord, err := s.findVoteOnPDS(ctx, voterDID, userAccessToken, req.Subject)
76
-
return nil, fmt.Errorf("failed to check existing vote on PDS: %w", err)
79
-
// 5. Handle toggle logic
80
-
var existingVoteURI *string
82
-
if existingVoteRecord != nil {
83
-
// Vote exists on PDS - implement toggle logic
84
-
if existingVoteRecord.Direction == req.Direction {
85
-
// Same direction -> Delete vote (toggle off)
86
-
log.Printf("[VOTE-CREATE] Toggle off: deleting existing %s vote on %s", req.Direction, req.Subject)
88
-
// Delete from user's PDS
89
-
if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil {
90
-
return nil, fmt.Errorf("failed to delete vote on PDS: %w", err)
93
-
// Return empty response (vote was deleted, not created)
94
-
return &CreateVoteResponse{
100
-
// Different direction -> Delete old vote first, then create new one below
101
-
log.Printf("[VOTE-CREATE] Toggle direction: %s -> %s on %s", existingVoteRecord.Direction, req.Direction, req.Subject)
103
-
if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil {
104
-
return nil, fmt.Errorf("failed to delete old vote on PDS: %w", err)
107
-
existingVoteURI = &existingVoteRecord.URI
110
-
// 6. Build vote record with strong reference
111
-
voteRecord := map[string]interface{}{
112
-
"$type": "social.coves.interaction.vote",
113
-
"subject": map[string]interface{}{
114
-
"uri": req.Subject,
117
-
"direction": req.Direction,
118
-
"createdAt": time.Now().Format(time.RFC3339),
121
-
// 7. Write to user's PDS repository
122
-
recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", "", voteRecord, userAccessToken)
124
-
return nil, fmt.Errorf("failed to create vote on PDS: %w", err)
127
-
log.Printf("[VOTE-CREATE] Created %s vote: %s (CID: %s)", req.Direction, recordURI, recordCID)
129
-
// 8. Return response
130
-
return &CreateVoteResponse{
133
-
Existing: existingVoteURI,
137
-
// DeleteVote removes a vote from a post/comment
138
-
func (s *voteService) DeleteVote(ctx context.Context, voterDID string, userAccessToken string, req DeleteVoteRequest) error {
139
-
// 1. Validate input
140
-
if voterDID == "" {
141
-
return NewValidationError("voterDid", "required")
143
-
if userAccessToken == "" {
144
-
return NewValidationError("userAccessToken", "required")
146
-
if req.Subject == "" {
147
-
return NewValidationError("subject", "required")
150
-
// 2. Find existing vote on PDS (source of truth)
151
-
// IMPORTANT: Query PDS directly to avoid race conditions with AppView indexing
152
-
existingVoteRecord, err := s.findVoteOnPDS(ctx, voterDID, userAccessToken, req.Subject)
154
-
return fmt.Errorf("failed to check existing vote on PDS: %w", err)
157
-
if existingVoteRecord == nil {
158
-
return ErrVoteNotFound
161
-
// 3. Delete from user's PDS
162
-
if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil {
163
-
return fmt.Errorf("failed to delete vote on PDS: %w", err)
166
-
log.Printf("[VOTE-DELETE] Deleted vote: %s", existingVoteRecord.URI)
171
-
// GetVote retrieves a user's vote on a specific subject
172
-
func (s *voteService) GetVote(ctx context.Context, voterDID string, subjectURI string) (*Vote, error) {
173
-
return s.repo.GetByVoterAndSubject(ctx, voterDID, subjectURI)
176
-
// Helper methods for PDS operations
178
-
// createRecordOnPDSAs creates a record on the PDS using the user's access token
179
-
func (s *voteService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
180
-
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
182
-
payload := map[string]interface{}{
184
-
"collection": collection,
189
-
payload["rkey"] = rkey
192
-
return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
195
-
// deleteRecordOnPDSAs deletes a record from the PDS using the user's access token
196
-
func (s *voteService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
197
-
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
199
-
payload := map[string]interface{}{
201
-
"collection": collection,
205
-
_, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
209
-
// callPDSWithAuth makes a PDS call with a specific access token
210
-
func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
211
-
jsonData, err := json.Marshal(payload)
213
-
return "", "", fmt.Errorf("failed to marshal payload: %w", err)
216
-
req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
218
-
return "", "", fmt.Errorf("failed to create request: %w", err)
220
-
req.Header.Set("Content-Type", "application/json")
222
-
// Add authentication with provided access token
223
-
if accessToken != "" {
224
-
req.Header.Set("Authorization", "Bearer "+accessToken)
227
-
// Use 30 second timeout for write operations
228
-
timeout := 30 * time.Second
229
-
client := &http.Client{Timeout: timeout}
230
-
resp, err := client.Do(req)
232
-
return "", "", fmt.Errorf("failed to call PDS: %w", err)
235
-
if closeErr := resp.Body.Close(); closeErr != nil {
236
-
log.Printf("Failed to close response body: %v", closeErr)
240
-
body, err := io.ReadAll(resp.Body)
242
-
return "", "", fmt.Errorf("failed to read response: %w", err)
245
-
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
246
-
return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, string(body))
249
-
// Parse response to extract URI and CID
250
-
var result struct {
251
-
URI string `json:"uri"`
252
-
CID string `json:"cid"`
254
-
if err := json.Unmarshal(body, &result); err != nil {
255
-
return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
258
-
return result.URI, result.CID, nil
261
-
// Helper functions
263
-
// PDSVoteRecord represents a vote record returned from PDS listRecords
264
-
type PDSVoteRecord struct {
274
-
// findVoteOnPDS queries the user's PDS to find an existing vote on a specific subject
275
-
// This is the source of truth for toggle logic (avoiding AppView race conditions)
277
-
// IMPORTANT: This function paginates through ALL user votes with reverse=true (newest first)
278
-
// to handle users with >100 votes. Without pagination, votes on older posts would not be found,
279
-
// causing duplicate vote records and 404 errors on delete operations.
280
-
func (s *voteService) findVoteOnPDS(ctx context.Context, voterDID, accessToken, subjectURI string) (*PDSVoteRecord, error) {
281
-
const maxPages = 50 // Safety limit: prevent infinite loops (50 pages * 100 = 5000 votes max)
285
-
client := &http.Client{Timeout: 10 * time.Second}
289
-
if pageCount > maxPages {
290
-
log.Printf("[VOTE-PDS] Reached max pagination limit (%d pages) searching for vote on %s", maxPages, subjectURI)
294
-
// Build endpoint with pagination cursor and reverse=true (newest first)
295
-
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.interaction.vote&limit=100&reverse=true",
296
-
strings.TrimSuffix(s.pdsURL, "/"), voterDID)
299
-
endpoint += fmt.Sprintf("&cursor=%s", cursor)
302
-
req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
304
-
return nil, fmt.Errorf("failed to create request: %w", err)
307
-
req.Header.Set("Authorization", "Bearer "+accessToken)
309
-
resp, err := client.Do(req)
311
-
return nil, fmt.Errorf("failed to query PDS: %w", err)
314
-
if resp.StatusCode != http.StatusOK {
315
-
body, _ := io.ReadAll(resp.Body)
317
-
return nil, fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, string(body))
320
-
var result struct {
322
-
URI string `json:"uri"`
325
-
URI string `json:"uri"`
326
-
CID string `json:"cid"`
328
-
Direction string `json:"direction"`
331
-
Cursor string `json:"cursor,omitempty"` // Pagination cursor for next page
334
-
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
336
-
return nil, fmt.Errorf("failed to decode PDS response: %w", err)
340
-
// Find vote on this specific subject in current page
341
-
for _, record := range result.Records {
342
-
if record.Value.Subject.URI == subjectURI {
343
-
rkey := extractRKeyFromURI(record.URI)
344
-
log.Printf("[VOTE-PDS] Found existing vote on page %d: %s (direction: %s)", pageCount, record.URI, record.Value.Direction)
345
-
return &PDSVoteRecord{
348
-
Direction: record.Value.Direction,
353
-
URI: record.Value.Subject.URI,
354
-
CID: record.Value.Subject.CID,
360
-
// No more pages to check
361
-
if result.Cursor == "" {
362
-
log.Printf("[VOTE-PDS] No existing vote found after checking %d page(s)", pageCount)
366
-
// Move to next page
367
-
cursor = result.Cursor
370
-
// No vote found on this subject after paginating through all records
374
-
// extractRKeyFromURI extracts the rkey from an AT-URI (at://did/collection/rkey)
375
-
func extractRKeyFromURI(uri string) string {
376
-
parts := strings.Split(uri, "/")
377
-
if len(parts) >= 4 {
378
-
return parts[len(parts)-1]
383
-
// ValidationError represents a validation error
384
-
type ValidationError struct {
389
-
func (e *ValidationError) Error() string {
390
-
return fmt.Sprintf("validation error for field '%s': %s", e.Field, e.Message)
393
-
// NewValidationError creates a new validation error
394
-
func NewValidationError(field, message string) error {
395
-
return &ValidationError{