A community-based topic aggregation platform built on atproto
1package votes 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/auth/oauth" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 17 oauthclient "Coves/internal/atproto/oauth" 18) 19 20// voteService implements the Service interface for vote operations 21type voteService struct { 22 repo Repository 23 subjectValidator SubjectValidator 24 oauthClient *oauthclient.OAuthClient 25 oauthStore oauth.ClientAuthStore 26 logger *slog.Logger 27} 28 29// NewService creates a new vote service instance 30// subjectValidator can be nil to skip subject existence checks (not recommended for production) 31func NewService(repo Repository, subjectValidator SubjectValidator, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service { 32 if logger == nil { 33 logger = slog.Default() 34 } 35 return &voteService{ 36 repo: repo, 37 subjectValidator: subjectValidator, 38 oauthClient: oauthClient, 39 oauthStore: oauthStore, 40 logger: logger, 41 } 42} 43 44// CreateVote creates a new vote or toggles off an existing vote 45// Implements the toggle behavior: 46// - No existing vote → Create new vote with given direction 47// - Vote exists with same direction → Delete vote (toggle off) 48// - Vote exists with different direction → Update to new direction 49func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) { 50 // Validate direction 51 if req.Direction != "up" && req.Direction != "down" { 52 return nil, ErrInvalidDirection 53 } 54 55 // Validate subject URI format 56 if req.Subject.URI == "" { 57 return nil, ErrInvalidSubject 58 } 59 if !strings.HasPrefix(req.Subject.URI, "at://") { 60 return nil, ErrInvalidSubject 61 } 62 63 // Validate subject CID is provided 64 if req.Subject.CID == "" { 65 return nil, ErrInvalidSubject 66 } 67 68 // Validate 
subject exists in AppView (post or comment) 69 // This prevents creating votes on non-existent content 70 if s.subjectValidator != nil { 71 exists, err := s.subjectValidator.SubjectExists(ctx, req.Subject.URI) 72 if err != nil { 73 s.logger.Error("failed to validate subject existence", 74 "error", err, 75 "subject", req.Subject.URI) 76 return nil, fmt.Errorf("failed to validate subject: %w", err) 77 } 78 if !exists { 79 return nil, ErrSubjectNotFound 80 } 81 } 82 83 // Check for existing vote by querying PDS directly (source of truth) 84 // This avoids eventual consistency issues with the AppView database 85 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI) 86 if err != nil { 87 s.logger.Error("failed to check existing vote on PDS", 88 "error", err, 89 "voter", session.AccountDID, 90 "subject", req.Subject.URI) 91 return nil, fmt.Errorf("failed to check existing vote: %w", err) 92 } 93 94 // Toggle logic 95 if existing != nil { 96 // Vote exists - check if same direction 97 if existing.Direction == req.Direction { 98 // Same direction - toggle off (delete) 99 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 100 s.logger.Error("failed to delete vote on PDS", 101 "error", err, 102 "voter", session.AccountDID, 103 "rkey", existing.RKey) 104 return nil, fmt.Errorf("failed to delete vote: %w", err) 105 } 106 107 s.logger.Info("vote toggled off", 108 "voter", session.AccountDID, 109 "subject", req.Subject.URI, 110 "direction", req.Direction) 111 112 // Return empty response to indicate deletion 113 return &CreateVoteResponse{ 114 URI: "", 115 CID: "", 116 }, nil 117 } 118 119 // Different direction - delete old vote first, then create new one 120 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 121 s.logger.Error("failed to delete existing vote on PDS", 122 "error", err, 123 "voter", session.AccountDID, 124 "rkey", existing.RKey) 125 return nil, fmt.Errorf("failed to delete existing vote: %w", err) 126 } 127 
128 s.logger.Info("deleted existing vote before creating new direction", 129 "voter", session.AccountDID, 130 "subject", req.Subject.URI, 131 "old_direction", existing.Direction, 132 "new_direction", req.Direction) 133 } 134 135 // Create new vote 136 uri, cid, err := s.createVoteRecord(ctx, session, req) 137 if err != nil { 138 s.logger.Error("failed to create vote on PDS", 139 "error", err, 140 "voter", session.AccountDID, 141 "subject", req.Subject.URI, 142 "direction", req.Direction) 143 return nil, fmt.Errorf("failed to create vote: %w", err) 144 } 145 146 s.logger.Info("vote created", 147 "voter", session.AccountDID, 148 "subject", req.Subject.URI, 149 "direction", req.Direction, 150 "uri", uri, 151 "cid", cid) 152 153 return &CreateVoteResponse{ 154 URI: uri, 155 CID: cid, 156 }, nil 157} 158 159// DeleteVote removes a vote on the specified subject 160func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error { 161 // Validate subject URI format 162 if req.Subject.URI == "" { 163 return ErrInvalidSubject 164 } 165 if !strings.HasPrefix(req.Subject.URI, "at://") { 166 return ErrInvalidSubject 167 } 168 169 // Find existing vote by querying PDS directly (source of truth) 170 // This avoids eventual consistency issues with the AppView database 171 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI) 172 if err != nil { 173 s.logger.Error("failed to find vote on PDS", 174 "error", err, 175 "voter", session.AccountDID, 176 "subject", req.Subject.URI) 177 return fmt.Errorf("failed to find vote: %w", err) 178 } 179 if existing == nil { 180 return ErrVoteNotFound 181 } 182 183 // Delete the vote record from user's PDS 184 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 185 s.logger.Error("failed to delete vote on PDS", 186 "error", err, 187 "voter", session.AccountDID, 188 "rkey", existing.RKey) 189 return fmt.Errorf("failed to delete vote: %w", err) 190 } 191 192 
s.logger.Info("vote deleted", 193 "voter", session.AccountDID, 194 "subject", req.Subject.URI, 195 "uri", existing.URI) 196 197 return nil 198} 199 200// createVoteRecord writes a vote record to the user's PDS 201func (s *voteService) createVoteRecord(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (string, string, error) { 202 // Generate TID for the record key 203 tid := syntax.NewTIDNow(0) 204 205 // Build vote record following the lexicon schema 206 record := VoteRecord{ 207 Type: "social.coves.feed.vote", 208 Subject: StrongRef{ 209 URI: req.Subject.URI, 210 CID: req.Subject.CID, 211 }, 212 Direction: req.Direction, 213 CreatedAt: time.Now().UTC().Format(time.RFC3339), 214 } 215 216 // Call com.atproto.repo.createRecord on the user's PDS 217 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(session.HostURL, "/")) 218 219 payload := map[string]interface{}{ 220 "repo": session.AccountDID.String(), 221 "collection": "social.coves.feed.vote", 222 "rkey": tid.String(), 223 "record": record, 224 } 225 226 uri, cid, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken) 227 if err != nil { 228 return "", "", err 229 } 230 231 return uri, cid, nil 232} 233 234// getVoteFromPDS queries the user's PDS directly to find an existing vote for a subject. 235// This avoids eventual consistency issues with the AppView database populated by Jetstream. 236// Paginates through all vote records to handle users with >100 votes. 237// Returns the vote record with rkey, or nil if no vote exists for the subject. 
238func (s *voteService) getVoteFromPDS(ctx context.Context, session *oauth.ClientSessionData, subjectURI string) (*existingVote, error) { 239 baseURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.feed.vote&limit=100", 240 strings.TrimSuffix(session.HostURL, "/"), 241 session.AccountDID.String()) 242 243 client := &http.Client{Timeout: 10 * time.Second} 244 cursor := "" 245 246 // Paginate through all vote records 247 for { 248 endpoint := baseURL 249 if cursor != "" { 250 endpoint = fmt.Sprintf("%s&cursor=%s", baseURL, cursor) 251 } 252 253 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 254 if err != nil { 255 return nil, fmt.Errorf("failed to create request: %w", err) 256 } 257 req.Header.Set("Authorization", "Bearer "+session.AccessToken) 258 259 resp, err := client.Do(req) 260 if err != nil { 261 return nil, fmt.Errorf("failed to call PDS: %w", err) 262 } 263 264 body, err := io.ReadAll(resp.Body) 265 closeErr := resp.Body.Close() 266 if closeErr != nil { 267 s.logger.Warn("failed to close response body", "error", closeErr) 268 } 269 if err != nil { 270 return nil, fmt.Errorf("failed to read response: %w", err) 271 } 272 273 // Handle auth errors - map to ErrNotAuthorized per lexicon 274 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { 275 s.logger.Warn("PDS auth failure", 276 "status", resp.StatusCode, 277 "did", session.AccountDID) 278 return nil, ErrNotAuthorized 279 } 280 281 if resp.StatusCode != http.StatusOK { 282 return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 283 } 284 285 // Parse the listRecords response 286 var result struct { 287 Records []struct { 288 URI string `json:"uri"` 289 CID string `json:"cid"` 290 Value struct { 291 Type string `json:"$type"` 292 Subject struct { 293 URI string `json:"uri"` 294 CID string `json:"cid"` 295 } `json:"subject"` 296 Direction string `json:"direction"` 297 CreatedAt string 
`json:"createdAt"` 298 } `json:"value"` 299 } `json:"records"` 300 Cursor string `json:"cursor"` 301 } 302 303 if err := json.Unmarshal(body, &result); err != nil { 304 return nil, fmt.Errorf("failed to parse PDS response: %w", err) 305 } 306 307 // Search for the vote matching our subject in this page 308 for _, rec := range result.Records { 309 if rec.Value.Subject.URI == subjectURI { 310 // Extract rkey from the URI (at://did/collection/rkey) 311 parts := strings.Split(rec.URI, "/") 312 if len(parts) < 5 { 313 continue 314 } 315 rkey := parts[len(parts)-1] 316 317 return &existingVote{ 318 URI: rec.URI, 319 CID: rec.CID, 320 RKey: rkey, 321 Direction: rec.Value.Direction, 322 }, nil 323 } 324 } 325 326 // Check if there are more pages 327 if result.Cursor == "" { 328 break // No more pages 329 } 330 cursor = result.Cursor 331 } 332 333 // No vote found for this subject after checking all pages 334 return nil, nil 335} 336 337// existingVote represents a vote record found on the PDS 338type existingVote struct { 339 URI string 340 CID string 341 RKey string 342 Direction string 343} 344 345// deleteVoteRecord removes a vote record from the user's PDS 346func (s *voteService) deleteVoteRecord(ctx context.Context, session *oauth.ClientSessionData, rkey string) error { 347 // Call com.atproto.repo.deleteRecord on the user's PDS 348 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(session.HostURL, "/")) 349 350 payload := map[string]interface{}{ 351 "repo": session.AccountDID.String(), 352 "collection": "social.coves.feed.vote", 353 "rkey": rkey, 354 } 355 356 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken) 357 return err 358} 359 360// callPDSWithAuth makes an authenticated HTTP call to the PDS 361// Returns URI and CID from the response (for create/update operations) 362func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken 
string) (string, string, error) { 363 jsonData, err := json.Marshal(payload) 364 if err != nil { 365 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 366 } 367 368 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 369 if err != nil { 370 return "", "", fmt.Errorf("failed to create request: %w", err) 371 } 372 req.Header.Set("Content-Type", "application/json") 373 374 // Add OAuth bearer token for authentication 375 if accessToken != "" { 376 req.Header.Set("Authorization", "Bearer "+accessToken) 377 } 378 379 // Set reasonable timeout for PDS operations 380 timeout := 10 * time.Second 381 if strings.Contains(endpoint, "createRecord") || strings.Contains(endpoint, "putRecord") { 382 timeout = 15 * time.Second // Slightly longer for write operations 383 } 384 385 client := &http.Client{Timeout: timeout} 386 resp, err := client.Do(req) 387 if err != nil { 388 return "", "", fmt.Errorf("failed to call PDS: %w", err) 389 } 390 defer func() { 391 if closeErr := resp.Body.Close(); closeErr != nil { 392 s.logger.Warn("failed to close response body", "error", closeErr) 393 } 394 }() 395 396 body, err := io.ReadAll(resp.Body) 397 if err != nil { 398 return "", "", fmt.Errorf("failed to read response: %w", err) 399 } 400 401 // Handle auth errors - map to ErrNotAuthorized per lexicon 402 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { 403 s.logger.Warn("PDS auth failure during write operation", 404 "status", resp.StatusCode, 405 "endpoint", endpoint) 406 return "", "", ErrNotAuthorized 407 } 408 409 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 410 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 411 } 412 413 // Parse response to extract URI and CID (for create/update operations) 414 var result struct { 415 URI string `json:"uri"` 416 CID string `json:"cid"` 417 } 418 if err := json.Unmarshal(body, &result); 
err != nil { 419 // For delete operations, there might not be a response body with URI/CID 420 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 421 return "", "", nil 422 } 423 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 424 } 425 426 return result.URI, result.CID, nil 427}