A community based topic aggregation platform built on atproto
1package votes 2 3import ( 4 "Coves/internal/core/posts" 5 "bytes" 6 "context" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log" 11 "net/http" 12 "strings" 13 "time" 14) 15 16type voteService struct { 17 repo Repository 18 postRepo posts.Repository 19 pdsURL string 20} 21 22// NewVoteService creates a new vote service 23func NewVoteService( 24 repo Repository, 25 postRepo posts.Repository, 26 pdsURL string, 27) Service { 28 return &voteService{ 29 repo: repo, 30 postRepo: postRepo, 31 pdsURL: pdsURL, 32 } 33} 34 35// CreateVote creates a new vote or toggles an existing vote 36// Toggle logic: 37// - No vote -> Create vote 38// - Same direction -> Delete vote (toggle off) 39// - Different direction -> Delete old + Create new (toggle direction) 40func (s *voteService) CreateVote(ctx context.Context, voterDID string, userAccessToken string, req CreateVoteRequest) (*CreateVoteResponse, error) { 41 // 1. Validate input 42 if voterDID == "" { 43 return nil, NewValidationError("voterDid", "required") 44 } 45 if userAccessToken == "" { 46 return nil, NewValidationError("userAccessToken", "required") 47 } 48 if req.Subject == "" { 49 return nil, NewValidationError("subject", "required") 50 } 51 if req.Direction != "up" && req.Direction != "down" { 52 return nil, ErrInvalidDirection 53 } 54 55 // 2. Validate subject URI format (should be at://...) 56 if !strings.HasPrefix(req.Subject, "at://") { 57 return nil, ErrInvalidSubject 58 } 59 60 // 3. Get subject post/comment to verify it exists and get its CID (for strong reference) 61 // For now, we assume the subject is a post. In the future, we'll support comments too. 62 post, err := s.postRepo.GetByURI(ctx, req.Subject) 63 if err != nil { 64 if err == posts.ErrNotFound { 65 return nil, ErrSubjectNotFound 66 } 67 return nil, fmt.Errorf("failed to get subject post: %w", err) 68 } 69 70 // 4. Check for existing vote on PDS (source of truth for toggle logic) 71 // IMPORTANT: We query the user's PDS directly instead of AppView to avoid race conditions. 72 // AppView is eventually consistent (updated via Jetstream), so querying it can cause 73 // duplicate vote records if the user toggles before Jetstream catches up. 74 existingVoteRecord, err := s.findVoteOnPDS(ctx, voterDID, userAccessToken, req.Subject) 75 if err != nil { 76 return nil, fmt.Errorf("failed to check existing vote on PDS: %w", err) 77 } 78 79 // 5. Handle toggle logic 80 var existingVoteURI *string 81 82 if existingVoteRecord != nil { 83 // Vote exists on PDS - implement toggle logic 84 if existingVoteRecord.Direction == req.Direction { 85 // Same direction -> Delete vote (toggle off) 86 log.Printf("[VOTE-CREATE] Toggle off: deleting existing %s vote on %s", req.Direction, req.Subject) 87 88 // Delete from user's PDS 89 if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil { 90 return nil, fmt.Errorf("failed to delete vote on PDS: %w", err) 91 } 92 93 // Return empty response (vote was deleted, not created) 94 return &CreateVoteResponse{ 95 URI: "", 96 CID: "", 97 }, nil 98 } 99 100 // Different direction -> Delete old vote first, then create new one below 101 log.Printf("[VOTE-CREATE] Toggle direction: %s -> %s on %s", existingVoteRecord.Direction, req.Direction, req.Subject) 102 103 if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil { 104 return nil, fmt.Errorf("failed to delete old vote on PDS: %w", err) 105 } 106 107 existingVoteURI = &existingVoteRecord.URI 108 } 109 110 // 6. Build vote record with strong reference 111 voteRecord := map[string]interface{}{ 112 "$type": "social.coves.interaction.vote", 113 "subject": map[string]interface{}{ 114 "uri": req.Subject, 115 "cid": post.CID, 116 }, 117 "direction": req.Direction, 118 "createdAt": time.Now().Format(time.RFC3339), 119 } 120 121 // 7. Write to user's PDS repository 122 recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", "", voteRecord, userAccessToken) 123 if err != nil { 124 return nil, fmt.Errorf("failed to create vote on PDS: %w", err) 125 } 126 127 log.Printf("[VOTE-CREATE] Created %s vote: %s (CID: %s)", req.Direction, recordURI, recordCID) 128 129 // 8. Return response 130 return &CreateVoteResponse{ 131 URI: recordURI, 132 CID: recordCID, 133 Existing: existingVoteURI, 134 }, nil 135} 136 137// DeleteVote removes a vote from a post/comment 138func (s *voteService) DeleteVote(ctx context.Context, voterDID string, userAccessToken string, req DeleteVoteRequest) error { 139 // 1. Validate input 140 if voterDID == "" { 141 return NewValidationError("voterDid", "required") 142 } 143 if userAccessToken == "" { 144 return NewValidationError("userAccessToken", "required") 145 } 146 if req.Subject == "" { 147 return NewValidationError("subject", "required") 148 } 149 150 // 2. Find existing vote on PDS (source of truth) 151 // IMPORTANT: Query PDS directly to avoid race conditions with AppView indexing 152 existingVoteRecord, err := s.findVoteOnPDS(ctx, voterDID, userAccessToken, req.Subject) 153 if err != nil { 154 return fmt.Errorf("failed to check existing vote on PDS: %w", err) 155 } 156 157 if existingVoteRecord == nil { 158 return ErrVoteNotFound 159 } 160 161 // 3. Delete from user's PDS 162 if err := s.deleteRecordOnPDSAs(ctx, voterDID, "social.coves.interaction.vote", existingVoteRecord.RKey, userAccessToken); err != nil { 163 return fmt.Errorf("failed to delete vote on PDS: %w", err) 164 } 165 166 log.Printf("[VOTE-DELETE] Deleted vote: %s", existingVoteRecord.URI) 167 168 return nil 169} 170 171// GetVote retrieves a user's vote on a specific subject 172func (s *voteService) GetVote(ctx context.Context, voterDID string, subjectURI string) (*Vote, error) { 173 return s.repo.GetByVoterAndSubject(ctx, voterDID, subjectURI) 174} 175 176// Helper methods for PDS operations 177 178// createRecordOnPDSAs creates a record on the PDS using the user's access token 179func (s *voteService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) { 180 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 181 182 payload := map[string]interface{}{ 183 "repo": repoDID, 184 "collection": collection, 185 "record": record, 186 } 187 188 if rkey != "" { 189 payload["rkey"] = rkey 190 } 191 192 return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 193} 194 195// deleteRecordOnPDSAs deletes a record from the PDS using the user's access token 196func (s *voteService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error { 197 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/")) 198 199 payload := map[string]interface{}{ 200 "repo": repoDID, 201 "collection": collection, 202 "rkey": rkey, 203 } 204 205 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken) 206 return err 207} 208 209// callPDSWithAuth makes a PDS call with a specific access token 210func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 211 jsonData, err := json.Marshal(payload) 212 if err != nil { 213 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 214 } 215 216 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 217 if err != nil { 218 return "", "", fmt.Errorf("failed to create request: %w", err) 219 } 220 req.Header.Set("Content-Type", "application/json") 221 222 // Add authentication with provided access token 223 if accessToken != "" { 224 req.Header.Set("Authorization", "Bearer "+accessToken) 225 } 226 227 // Use 30 second timeout for write operations 228 timeout := 30 * time.Second 229 client := &http.Client{Timeout: timeout} 230 resp, err := client.Do(req) 231 if err != nil { 232 return "", "", fmt.Errorf("failed to call PDS: %w", err) 233 } 234 defer func() { 235 if closeErr := resp.Body.Close(); closeErr != nil { 236 log.Printf("Failed to close response body: %v", closeErr) 237 } 238 }() 239 240 body, err := io.ReadAll(resp.Body) 241 if err != nil { 242 return "", "", fmt.Errorf("failed to read response: %w", err) 243 } 244 245 if resp.StatusCode < 200 || resp.StatusCode >= 300 { 246 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, string(body)) 247 } 248 249 // Parse response to extract URI and CID 250 var result struct { 251 URI string `json:"uri"` 252 CID string `json:"cid"` 253 } 254 if err := json.Unmarshal(body, &result); err != nil { 255 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 256 } 257 258 return result.URI, result.CID, nil 259} 260 261// Helper functions 262 263// PDSVoteRecord represents a vote record returned from PDS listRecords 264type PDSVoteRecord struct { 265 URI string 266 RKey string 267 Direction string 268 Subject struct { 269 URI string 270 CID string 271 } 272} 273 274// findVoteOnPDS queries the user's PDS to find an existing vote on a specific subject 275// This is the source of truth for toggle logic (avoiding AppView race conditions) 276// 277// IMPORTANT: This function paginates through ALL user votes with reverse=true (newest first) 278// to handle users with >100 votes. Without pagination, votes on older posts would not be found, 279// causing duplicate vote records and 404 errors on delete operations. 280func (s *voteService) findVoteOnPDS(ctx context.Context, voterDID, accessToken, subjectURI string) (*PDSVoteRecord, error) { 281 const maxPages = 50 // Safety limit: prevent infinite loops (50 pages * 100 = 5000 votes max) 282 var cursor string 283 pageCount := 0 284 285 client := &http.Client{Timeout: 10 * time.Second} 286 287 for { 288 pageCount++ 289 if pageCount > maxPages { 290 log.Printf("[VOTE-PDS] Reached max pagination limit (%d pages) searching for vote on %s", maxPages, subjectURI) 291 break 292 } 293 294 // Build endpoint with pagination cursor and reverse=true (newest first) 295 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.interaction.vote&limit=100&reverse=true", 296 strings.TrimSuffix(s.pdsURL, "/"), voterDID) 297 298 if cursor != "" { 299 endpoint += fmt.Sprintf("&cursor=%s", cursor) 300 } 301 302 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 303 if err != nil { 304 return nil, fmt.Errorf("failed to create request: %w", err) 305 } 306 307 req.Header.Set("Authorization", "Bearer "+accessToken) 308 309 resp, err := client.Do(req) 310 if err != nil { 311 return nil, fmt.Errorf("failed to query PDS: %w", err) 312 } 313 314 if resp.StatusCode != http.StatusOK { 315 body, _ := io.ReadAll(resp.Body) 316 resp.Body.Close() 317 return nil, fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, string(body)) 318 } 319 320 var result struct { 321 Records []struct { 322 URI string `json:"uri"` 323 Value struct { 324 Subject struct { 325 URI string `json:"uri"` 326 CID string `json:"cid"` 327 } `json:"subject"` 328 Direction string `json:"direction"` 329 } `json:"value"` 330 } `json:"records"` 331 Cursor string `json:"cursor,omitempty"` // Pagination cursor for next page 332 } 333 334 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 335 resp.Body.Close() 336 return nil, fmt.Errorf("failed to decode PDS response: %w", err) 337 } 338 resp.Body.Close() 339 340 // Find vote on this specific subject in current page 341 for _, record := range result.Records { 342 if record.Value.Subject.URI == subjectURI { 343 rkey := extractRKeyFromURI(record.URI) 344 log.Printf("[VOTE-PDS] Found existing vote on page %d: %s (direction: %s)", pageCount, record.URI, record.Value.Direction) 345 return &PDSVoteRecord{ 346 URI: record.URI, 347 RKey: rkey, 348 Direction: record.Value.Direction, 349 Subject: struct { 350 URI string 351 CID string 352 }{ 353 URI: record.Value.Subject.URI, 354 CID: record.Value.Subject.CID, 355 }, 356 }, nil 357 } 358 } 359 360 // No more pages to check 361 if result.Cursor == "" { 362 log.Printf("[VOTE-PDS] No existing vote found after checking %d page(s)", pageCount) 363 break 364 } 365 366 // Move to next page 367 cursor = result.Cursor 368 } 369 370 // No vote found on this subject after paginating through all records 371 return nil, nil 372} 373 374// extractRKeyFromURI extracts the rkey from an AT-URI (at://did/collection/rkey) 375func extractRKeyFromURI(uri string) string { 376 parts := strings.Split(uri, "/") 377 if len(parts) >= 4 { 378 return parts[len(parts)-1] 379 } 380 return "" 381} 382 383// ValidationError represents a validation error 384type ValidationError struct { 385 Field string 386 Message string 387} 388 389func (e *ValidationError) Error() string { 390 return fmt.Sprintf("validation error for field '%s': %s", e.Field, e.Message) 391} 392 393// NewValidationError creates a new validation error 394func NewValidationError(field, message string) error { 395 return &ValidationError{ 396 Field: field, 397 Message: message, 398 } 399}