A community based topic aggregation platform built on atproto
1package votes 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/auth/oauth" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 17 oauthclient "Coves/internal/atproto/oauth" 18) 19 20// voteService implements the Service interface for vote operations 21type voteService struct { 22 repo Repository 23 oauthClient *oauthclient.OAuthClient 24 oauthStore oauth.ClientAuthStore 25 logger *slog.Logger 26} 27 28// NewService creates a new vote service instance 29func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service { 30 if logger == nil { 31 logger = slog.Default() 32 } 33 return &voteService{ 34 repo: repo, 35 oauthClient: oauthClient, 36 oauthStore: oauthStore, 37 logger: logger, 38 } 39} 40 41// CreateVote creates a new vote or toggles off an existing vote 42// Implements the toggle behavior: 43// - No existing vote → Create new vote with given direction 44// - Vote exists with same direction → Delete vote (toggle off) 45// - Vote exists with different direction → Update to new direction 46func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) { 47 // Validate direction 48 if req.Direction != "up" && req.Direction != "down" { 49 return nil, ErrInvalidDirection 50 } 51 52 // Validate subject URI format 53 if req.Subject.URI == "" { 54 return nil, ErrInvalidSubject 55 } 56 if !strings.HasPrefix(req.Subject.URI, "at://") { 57 return nil, ErrInvalidSubject 58 } 59 60 // Validate subject CID is provided 61 if req.Subject.CID == "" { 62 return nil, ErrInvalidSubject 63 } 64 65 // Note: We intentionally don't validate subject existence here. 66 // The vote record goes to the user's PDS regardless. The Jetstream consumer 67 // handles orphaned votes correctly by only updating counts for non-deleted subjects. 68 // This avoids race conditions and eventual consistency issues. 69 70 // Check for existing vote by querying PDS directly (source of truth) 71 // This avoids eventual consistency issues with the AppView database 72 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI) 73 if err != nil { 74 s.logger.Error("failed to check existing vote on PDS", 75 "error", err, 76 "voter", session.AccountDID, 77 "subject", req.Subject.URI) 78 return nil, fmt.Errorf("failed to check existing vote: %w", err) 79 } 80 81 // Toggle logic 82 if existing != nil { 83 // Vote exists - check if same direction 84 if existing.Direction == req.Direction { 85 // Same direction - toggle off (delete) 86 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 87 s.logger.Error("failed to delete vote on PDS", 88 "error", err, 89 "voter", session.AccountDID, 90 "rkey", existing.RKey) 91 return nil, fmt.Errorf("failed to delete vote: %w", err) 92 } 93 94 s.logger.Info("vote toggled off", 95 "voter", session.AccountDID, 96 "subject", req.Subject.URI, 97 "direction", req.Direction) 98 99 // Return empty response to indicate deletion 100 return &CreateVoteResponse{ 101 URI: "", 102 CID: "", 103 }, nil 104 } 105 106 // Different direction - delete old vote first, then create new one 107 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 108 s.logger.Error("failed to delete existing vote on PDS", 109 "error", err, 110 "voter", session.AccountDID, 111 "rkey", existing.RKey) 112 return nil, fmt.Errorf("failed to delete existing vote: %w", err) 113 } 114 115 s.logger.Info("deleted existing vote before creating new direction", 116 "voter", session.AccountDID, 117 "subject", req.Subject.URI, 118 "old_direction", existing.Direction, 119 "new_direction", req.Direction) 120 } 121 122 // Create new vote 123 uri, cid, err := s.createVoteRecord(ctx, session, req) 124 if err != nil { 125 s.logger.Error("failed to create vote on PDS", 126 "error", err, 127 "voter", session.AccountDID, 128 "subject", req.Subject.URI, 129 "direction", req.Direction) 130 return nil, fmt.Errorf("failed to create vote: %w", err) 131 } 132 133 s.logger.Info("vote created", 134 "voter", session.AccountDID, 135 "subject", req.Subject.URI, 136 "direction", req.Direction, 137 "uri", uri, 138 "cid", cid) 139 140 return &CreateVoteResponse{ 141 URI: uri, 142 CID: cid, 143 }, nil 144} 145 146// DeleteVote removes a vote on the specified subject 147func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error { 148 // Validate subject URI format 149 if req.Subject.URI == "" { 150 return ErrInvalidSubject 151 } 152 if !strings.HasPrefix(req.Subject.URI, "at://") { 153 return ErrInvalidSubject 154 } 155 156 // Find existing vote by querying PDS directly (source of truth) 157 // This avoids eventual consistency issues with the AppView database 158 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI) 159 if err != nil { 160 s.logger.Error("failed to find vote on PDS", 161 "error", err, 162 "voter", session.AccountDID, 163 "subject", req.Subject.URI) 164 return fmt.Errorf("failed to find vote: %w", err) 165 } 166 if existing == nil { 167 return ErrVoteNotFound 168 } 169 170 // Delete the vote record from user's PDS 171 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 172 s.logger.Error("failed to delete vote on PDS", 173 "error", err, 174 "voter", session.AccountDID, 175 "rkey", existing.RKey) 176 return fmt.Errorf("failed to delete vote: %w", err) 177 } 178 179 s.logger.Info("vote deleted", 180 "voter", session.AccountDID, 181 "subject", req.Subject.URI, 182 "uri", existing.URI) 183 184 return nil 185} 186 187// createVoteRecord writes a vote record to the user's PDS 188func (s *voteService) createVoteRecord(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (string, string, error) { 189 // Generate TID for the record key 190 tid := syntax.NewTIDNow(0) 191 192 // Build vote record following the lexicon schema 193 record := VoteRecord{ 194 Type: "social.coves.feed.vote", 195 Subject: StrongRef{ 196 URI: req.Subject.URI, 197 CID: req.Subject.CID, 198 }, 199 Direction: req.Direction, 200 CreatedAt: time.Now().UTC().Format(time.RFC3339), 201 } 202 203 // Call com.atproto.repo.createRecord on the user's PDS 204 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(session.HostURL, "/")) 205 206 payload := map[string]interface{}{ 207 "repo": session.AccountDID.String(), 208 "collection": "social.coves.feed.vote", 209 "rkey": tid.String(), 210 "record": record, 211 } 212 213 uri, cid, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken) 214 if err != nil { 215 return "", "", err 216 } 217 218 return uri, cid, nil 219} 220 221// getVoteFromPDS queries the user's PDS directly to find an existing vote for a subject. 222// This avoids eventual consistency issues with the AppView database populated by Jetstream. 223// Paginates through all vote records to handle users with >100 votes. 224// Returns the vote record with rkey, or nil if no vote exists for the subject. 225func (s *voteService) getVoteFromPDS(ctx context.Context, session *oauth.ClientSessionData, subjectURI string) (*existingVote, error) { 226 baseURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.feed.vote&limit=100", 227 strings.TrimSuffix(session.HostURL, "/"), 228 session.AccountDID.String()) 229 230 client := &http.Client{Timeout: 10 * time.Second} 231 cursor := "" 232 233 // Paginate through all vote records 234 for { 235 endpoint := baseURL 236 if cursor != "" { 237 endpoint = fmt.Sprintf("%s&cursor=%s", baseURL, cursor) 238 } 239 240 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 241 if err != nil { 242 return nil, fmt.Errorf("failed to create request: %w", err) 243 } 244 req.Header.Set("Authorization", "Bearer "+session.AccessToken) 245 246 resp, err := client.Do(req) 247 if err != nil { 248 return nil, fmt.Errorf("failed to call PDS: %w", err) 249 } 250 251 body, err := io.ReadAll(resp.Body) 252 closeErr := resp.Body.Close() 253 if closeErr != nil { 254 s.logger.Warn("failed to close response body", "error", closeErr) 255 } 256 if err != nil { 257 return nil, fmt.Errorf("failed to read response: %w", err) 258 } 259 260 // Handle auth errors - map to ErrNotAuthorized per lexicon 261 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { 262 s.logger.Warn("PDS auth failure", 263 "status", resp.StatusCode, 264 "did", session.AccountDID) 265 return nil, ErrNotAuthorized 266 } 267 268 if resp.StatusCode != http.StatusOK { 269 return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 270 } 271 272 // Parse the listRecords response 273 var result struct { 274 Cursor string `json:"cursor"` 275 Records []struct { 276 URI string `json:"uri"` 277 CID string `json:"cid"` 278 Value struct { 279 Type string `json:"$type"` 280 Subject struct { 281 URI string `json:"uri"` 282 CID string `json:"cid"` 283 } `json:"subject"` 284 Direction string `json:"direction"` 285 CreatedAt string `json:"createdAt"` 286 } `json:"value"` 287 } `json:"records"` 288 } 289 290 if err := json.Unmarshal(body, &result); err != nil { 291 return nil, fmt.Errorf("failed to parse PDS response: %w", err) 292 } 293 294 // Search for the vote matching our subject in this page 295 for _, rec := range result.Records { 296 if rec.Value.Subject.URI == subjectURI { 297 // Extract rkey from the URI (at://did/collection/rkey) 298 parts := strings.Split(rec.URI, "/") 299 if len(parts) < 5 { 300 continue 301 } 302 rkey := parts[len(parts)-1] 303 304 return &existingVote{ 305 URI: rec.URI, 306 CID: rec.CID, 307 RKey: rkey, 308 Direction: rec.Value.Direction, 309 }, nil 310 } 311 } 312 313 // Check if there are more pages 314 if result.Cursor == "" { 315 break // No more pages 316 } 317 cursor = result.Cursor 318 } 319 320 // No vote found for this subject after checking all pages 321 return nil, nil 322} 323 324// existingVote represents a vote record found on the PDS 325type existingVote struct { 326 URI string 327 CID string 328 RKey string 329 Direction string 330} 331 332// deleteVoteRecord removes a vote record from the user's PDS 333func (s *voteService) deleteVoteRecord(ctx context.Context, session *oauth.ClientSessionData, rkey string) error { 334 // Call com.atproto.repo.deleteRecord on the user's PDS 335 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(session.HostURL, "/")) 336 337 payload := map[string]interface{}{ 338 "repo": session.AccountDID.String(), 339 "collection": "social.coves.feed.vote", 340 "rkey": rkey, 341 } 342 343 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken) 344 return err 345} 346 347// callPDSWithAuth makes an authenticated HTTP call to the PDS 348// Returns URI and CID from the response (for create/update operations) 349func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 350 jsonData, err := json.Marshal(payload) 351 if err != nil { 352 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 353 } 354 355 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 356 if err != nil { 357 return "", "", fmt.Errorf("failed to create request: %w", err) 358 } 359 req.Header.Set("Content-Type", "application/json") 360 361 // Add OAuth bearer token for authentication 362 if accessToken != "" { 363 req.Header.Set("Authorization", "Bearer "+accessToken) 364 } 365 366 // Set reasonable timeout for PDS operations 367 timeout := 10 * time.Second 368 if strings.Contains(endpoint, "createRecord") || strings.Contains(endpoint, "putRecord") { 369 timeout = 15 * time.Second // Slightly longer for write operations 370 } 371 372 client := &http.Client{Timeout: timeout} 373 resp, err := client.Do(req) 374 if err != nil { 375 return "", "", fmt.Errorf("failed to call PDS: %w", err) 376 } 377 defer func() { 378 if closeErr := resp.Body.Close(); closeErr != nil { 379 s.logger.Warn("failed to close response body", "error", closeErr) 380 } 381 }() 382 383 body, err := io.ReadAll(resp.Body) 384 if err != nil { 385 return "", "", fmt.Errorf("failed to read response: %w", err) 386 } 387 388 // Handle auth errors - map to ErrNotAuthorized per lexicon 389 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { 390 s.logger.Warn("PDS auth failure during write operation", 391 "status", resp.StatusCode, 392 "endpoint", endpoint) 393 return "", "", ErrNotAuthorized 394 } 395 396 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 397 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 398 } 399 400 // Parse response to extract URI and CID (for create/update operations) 401 var result struct { 402 URI string `json:"uri"` 403 CID string `json:"cid"` 404 } 405 if err := json.Unmarshal(body, &result); err != nil { 406 // For delete operations, there might not be a response body with URI/CID 407 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 408 return "", "", nil 409 } 410 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 411 } 412 413 return result.URI, result.CID, nil 414}