A community based topic aggregation platform built on atproto
1package votes 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/auth/oauth" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 17 oauthclient "Coves/internal/atproto/oauth" 18) 19 20// voteService implements the Service interface for vote operations 21type voteService struct { 22 repo Repository 23 oauthClient *oauthclient.OAuthClient 24 oauthStore oauth.ClientAuthStore 25 logger *slog.Logger 26} 27 28// NewService creates a new vote service instance 29func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service { 30 if logger == nil { 31 logger = slog.Default() 32 } 33 return &voteService{ 34 repo: repo, 35 oauthClient: oauthClient, 36 oauthStore: oauthStore, 37 logger: logger, 38 } 39} 40 41// CreateVote creates a new vote or toggles off an existing vote 42// Implements the toggle behavior: 43// - No existing vote → Create new vote with given direction 44// - Vote exists with same direction → Delete vote (toggle off) 45// - Vote exists with different direction → Update to new direction 46func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) { 47 // Validate direction 48 if req.Direction != "up" && req.Direction != "down" { 49 return nil, ErrInvalidDirection 50 } 51 52 // Validate subject URI format 53 if req.Subject.URI == "" { 54 return nil, ErrInvalidSubject 55 } 56 if !strings.HasPrefix(req.Subject.URI, "at://") { 57 return nil, ErrInvalidSubject 58 } 59 60 // Validate subject CID is provided 61 if req.Subject.CID == "" { 62 return nil, ErrInvalidSubject 63 } 64 65 // Check for existing vote by querying PDS directly (source of truth) 66 // This avoids eventual consistency issues with the AppView database 67 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI) 68 if err != nil { 69 s.logger.Error("failed to check existing vote on PDS", 70 "error", err, 71 "voter", session.AccountDID, 72 "subject", req.Subject.URI) 73 return nil, fmt.Errorf("failed to check existing vote: %w", err) 74 } 75 76 // Toggle logic 77 if existing != nil { 78 // Vote exists - check if same direction 79 if existing.Direction == req.Direction { 80 // Same direction - toggle off (delete) 81 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 82 s.logger.Error("failed to delete vote on PDS", 83 "error", err, 84 "voter", session.AccountDID, 85 "rkey", existing.RKey) 86 return nil, fmt.Errorf("failed to delete vote: %w", err) 87 } 88 89 s.logger.Info("vote toggled off", 90 "voter", session.AccountDID, 91 "subject", req.Subject.URI, 92 "direction", req.Direction) 93 94 // Return empty response to indicate deletion 95 return &CreateVoteResponse{ 96 URI: "", 97 CID: "", 98 }, nil 99 } 100 101 // Different direction - delete old vote first, then create new one 102 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 103 s.logger.Error("failed to delete existing vote on PDS", 104 "error", err, 105 "voter", session.AccountDID, 106 "rkey", existing.RKey) 107 return nil, fmt.Errorf("failed to delete existing vote: %w", err) 108 } 109 110 s.logger.Info("deleted existing vote before creating new direction", 111 "voter", session.AccountDID, 112 "subject", req.Subject.URI, 113 "old_direction", existing.Direction, 114 "new_direction", req.Direction) 115 } 116 117 // Create new vote 118 uri, cid, err := s.createVoteRecord(ctx, session, req) 119 if err != nil { 120 s.logger.Error("failed to create vote on PDS", 121 "error", err, 122 "voter", session.AccountDID, 123 "subject", req.Subject.URI, 124 "direction", req.Direction) 125 return nil, fmt.Errorf("failed to create vote: %w", err) 126 } 127 128 s.logger.Info("vote created", 129 "voter", session.AccountDID, 130 "subject", req.Subject.URI, 131 "direction", req.Direction, 132 "uri", uri, 133 "cid", cid) 134 135 return &CreateVoteResponse{ 136 URI: uri, 137 CID: cid, 138 }, nil 139} 140 141// DeleteVote removes a vote on the specified subject 142func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error { 143 // Validate subject URI format 144 if req.Subject.URI == "" { 145 return ErrInvalidSubject 146 } 147 if !strings.HasPrefix(req.Subject.URI, "at://") { 148 return ErrInvalidSubject 149 } 150 151 // Find existing vote by querying PDS directly (source of truth) 152 // This avoids eventual consistency issues with the AppView database 153 existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI) 154 if err != nil { 155 s.logger.Error("failed to find vote on PDS", 156 "error", err, 157 "voter", session.AccountDID, 158 "subject", req.Subject.URI) 159 return fmt.Errorf("failed to find vote: %w", err) 160 } 161 if existing == nil { 162 return ErrVoteNotFound 163 } 164 165 // Delete the vote record from user's PDS 166 if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil { 167 s.logger.Error("failed to delete vote on PDS", 168 "error", err, 169 "voter", session.AccountDID, 170 "rkey", existing.RKey) 171 return fmt.Errorf("failed to delete vote: %w", err) 172 } 173 174 s.logger.Info("vote deleted", 175 "voter", session.AccountDID, 176 "subject", req.Subject.URI, 177 "uri", existing.URI) 178 179 return nil 180} 181 182// createVoteRecord writes a vote record to the user's PDS 183func (s *voteService) createVoteRecord(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (string, string, error) { 184 // Generate TID for the record key 185 tid := syntax.NewTIDNow(0) 186 187 // Build vote record following the lexicon schema 188 record := VoteRecord{ 189 Type: "social.coves.feed.vote", 190 Subject: StrongRef{ 191 URI: req.Subject.URI, 192 CID: req.Subject.CID, 193 }, 194 Direction: req.Direction, 195 CreatedAt: time.Now().UTC().Format(time.RFC3339), 196 } 197 198 // Call com.atproto.repo.createRecord on the user's PDS 199 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(session.HostURL, "/")) 200 201 payload := map[string]interface{}{ 202 "repo": session.AccountDID.String(), 203 "collection": "social.coves.feed.vote", 204 "rkey": tid.String(), 205 "record": record, 206 } 207 208 uri, cid, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken) 209 if err != nil { 210 return "", "", err 211 } 212 213 return uri, cid, nil 214} 215 216// getVoteFromPDS queries the user's PDS directly to find an existing vote for a subject. 217// This avoids eventual consistency issues with the AppView database populated by Jetstream. 218// Paginates through all vote records to handle users with >100 votes. 219// Returns the vote record with rkey, or nil if no vote exists for the subject. 220func (s *voteService) getVoteFromPDS(ctx context.Context, session *oauth.ClientSessionData, subjectURI string) (*existingVote, error) { 221 baseURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.feed.vote&limit=100", 222 strings.TrimSuffix(session.HostURL, "/"), 223 session.AccountDID.String()) 224 225 client := &http.Client{Timeout: 10 * time.Second} 226 cursor := "" 227 228 // Paginate through all vote records 229 for { 230 endpoint := baseURL 231 if cursor != "" { 232 endpoint = fmt.Sprintf("%s&cursor=%s", baseURL, cursor) 233 } 234 235 req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 236 if err != nil { 237 return nil, fmt.Errorf("failed to create request: %w", err) 238 } 239 req.Header.Set("Authorization", "Bearer "+session.AccessToken) 240 241 resp, err := client.Do(req) 242 if err != nil { 243 return nil, fmt.Errorf("failed to call PDS: %w", err) 244 } 245 246 body, err := io.ReadAll(resp.Body) 247 closeErr := resp.Body.Close() 248 if closeErr != nil { 249 s.logger.Warn("failed to close response body", "error", closeErr) 250 } 251 if err != nil { 252 return nil, fmt.Errorf("failed to read response: %w", err) 253 } 254 255 // Handle auth errors - map to ErrNotAuthorized per lexicon 256 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { 257 s.logger.Warn("PDS auth failure", 258 "status", resp.StatusCode, 259 "did", session.AccountDID) 260 return nil, ErrNotAuthorized 261 } 262 263 if resp.StatusCode != http.StatusOK { 264 return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 265 } 266 267 // Parse the listRecords response 268 var result struct { 269 Records []struct { 270 URI string `json:"uri"` 271 CID string `json:"cid"` 272 Value struct { 273 Type string `json:"$type"` 274 Subject struct { 275 URI string `json:"uri"` 276 CID string `json:"cid"` 277 } `json:"subject"` 278 Direction string `json:"direction"` 279 CreatedAt string `json:"createdAt"` 280 } `json:"value"` 281 } `json:"records"` 282 Cursor string `json:"cursor"` 283 } 284 285 if err := json.Unmarshal(body, &result); err != nil { 286 return nil, fmt.Errorf("failed to parse PDS response: %w", err) 287 } 288 289 // Search for the vote matching our subject in this page 290 for _, rec := range result.Records { 291 if rec.Value.Subject.URI == subjectURI { 292 // Extract rkey from the URI (at://did/collection/rkey) 293 parts := strings.Split(rec.URI, "/") 294 if len(parts) < 5 { 295 continue 296 } 297 rkey := parts[len(parts)-1] 298 299 return &existingVote{ 300 URI: rec.URI, 301 CID: rec.CID, 302 RKey: rkey, 303 Direction: rec.Value.Direction, 304 }, nil 305 } 306 } 307 308 // Check if there are more pages 309 if result.Cursor == "" { 310 break // No more pages 311 } 312 cursor = result.Cursor 313 } 314 315 // No vote found for this subject after checking all pages 316 return nil, nil 317} 318 319// existingVote represents a vote record found on the PDS 320type existingVote struct { 321 URI string 322 CID string 323 RKey string 324 Direction string 325} 326 327// deleteVoteRecord removes a vote record from the user's PDS 328func (s *voteService) deleteVoteRecord(ctx context.Context, session *oauth.ClientSessionData, rkey string) error { 329 // Call com.atproto.repo.deleteRecord on the user's PDS 330 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(session.HostURL, "/")) 331 332 payload := map[string]interface{}{ 333 "repo": session.AccountDID.String(), 334 "collection": "social.coves.feed.vote", 335 "rkey": rkey, 336 } 337 338 _, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken) 339 return err 340} 341 342// callPDSWithAuth makes an authenticated HTTP call to the PDS 343// Returns URI and CID from the response (for create/update operations) 344func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) { 345 jsonData, err := json.Marshal(payload) 346 if err != nil { 347 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 348 } 349 350 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 351 if err != nil { 352 return "", "", fmt.Errorf("failed to create request: %w", err) 353 } 354 req.Header.Set("Content-Type", "application/json") 355 356 // Add OAuth bearer token for authentication 357 if accessToken != "" { 358 req.Header.Set("Authorization", "Bearer "+accessToken) 359 } 360 361 // Set reasonable timeout for PDS operations 362 timeout := 10 * time.Second 363 if strings.Contains(endpoint, "createRecord") || strings.Contains(endpoint, "putRecord") { 364 timeout = 15 * time.Second // Slightly longer for write operations 365 } 366 367 client := &http.Client{Timeout: timeout} 368 resp, err := client.Do(req) 369 if err != nil { 370 return "", "", fmt.Errorf("failed to call PDS: %w", err) 371 } 372 defer func() { 373 if closeErr := resp.Body.Close(); closeErr != nil { 374 s.logger.Warn("failed to close response body", "error", closeErr) 375 } 376 }() 377 378 body, err := io.ReadAll(resp.Body) 379 if err != nil { 380 return "", "", fmt.Errorf("failed to read response: %w", err) 381 } 382 383 // Handle auth errors - map to ErrNotAuthorized per lexicon 384 if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { 385 s.logger.Warn("PDS auth failure during write operation", 386 "status", resp.StatusCode, 387 "endpoint", endpoint) 388 return "", "", ErrNotAuthorized 389 } 390 391 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 392 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 393 } 394 395 // Parse response to extract URI and CID (for create/update operations) 396 var result struct { 397 URI string `json:"uri"` 398 CID string `json:"cid"` 399 } 400 if err := json.Unmarshal(body, &result); err != nil { 401 // For delete operations, there might not be a response body with URI/CID 402 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 403 return "", "", nil 404 } 405 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 406 } 407 408 return result.URI, result.CID, nil 409}