A community-based topic aggregation platform built on atproto
at main 13 kB view raw
1package votes 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/auth/oauth" 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 13 oauthclient "Coves/internal/atproto/oauth" 14 "Coves/internal/atproto/pds" 15) 16 17const ( 18 // voteCollection is the AT Protocol collection for vote records 19 voteCollection = "social.coves.feed.vote" 20) 21 22// PDSClientFactory creates PDS clients from session data. 23// Used to allow injection of different auth mechanisms (OAuth for production, password for tests). 24type PDSClientFactory func(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) 25 26// voteService implements the Service interface for vote operations 27type voteService struct { 28 repo Repository 29 oauthClient *oauthclient.OAuthClient 30 oauthStore oauth.ClientAuthStore 31 logger *slog.Logger 32 pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth. 33 cache *VoteCache // In-memory cache of user votes from PDS 34} 35 36// NewService creates a new vote service instance 37func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, cache *VoteCache, logger *slog.Logger) Service { 38 if logger == nil { 39 logger = slog.Default() 40 } 41 return &voteService{ 42 repo: repo, 43 oauthClient: oauthClient, 44 oauthStore: oauthStore, 45 cache: cache, 46 logger: logger, 47 } 48} 49 50// NewServiceWithPDSFactory creates a vote service with a custom PDS client factory. 51// This is primarily for testing with password-based authentication. 52func NewServiceWithPDSFactory(repo Repository, cache *VoteCache, logger *slog.Logger, factory PDSClientFactory) Service { 53 if logger == nil { 54 logger = slog.Default() 55 } 56 return &voteService{ 57 repo: repo, 58 cache: cache, 59 logger: logger, 60 pdsClientFactory: factory, 61 } 62} 63 64// getPDSClient creates a PDS client from an OAuth session. 
65// If a custom factory was provided (for testing), uses that. 66// Otherwise, uses DPoP authentication via indigo's APIClient for proper OAuth token handling. 67func (s *voteService) getPDSClient(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) { 68 // Use custom factory if provided (e.g., for testing with password auth) 69 if s.pdsClientFactory != nil { 70 return s.pdsClientFactory(ctx, session) 71 } 72 73 // Production path: use OAuth with DPoP 74 if s.oauthClient == nil || s.oauthClient.ClientApp == nil { 75 return nil, fmt.Errorf("OAuth client not configured") 76 } 77 78 client, err := pds.NewFromOAuthSession(ctx, s.oauthClient.ClientApp, session) 79 if err != nil { 80 return nil, fmt.Errorf("failed to create PDS client: %w", err) 81 } 82 83 return client, nil 84} 85 86// CreateVote creates a new vote or toggles off an existing vote 87// Implements the toggle behavior: 88// - No existing vote → Create new vote with given direction 89// - Vote exists with same direction → Delete vote (toggle off) 90// - Vote exists with different direction → Update to new direction 91func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) { 92 // Validate direction 93 if req.Direction != "up" && req.Direction != "down" { 94 return nil, ErrInvalidDirection 95 } 96 97 // Validate subject URI format 98 if req.Subject.URI == "" { 99 return nil, ErrInvalidSubject 100 } 101 if !strings.HasPrefix(req.Subject.URI, "at://") { 102 return nil, ErrInvalidSubject 103 } 104 105 // Validate subject CID is provided 106 if req.Subject.CID == "" { 107 return nil, ErrInvalidSubject 108 } 109 110 // Create PDS client for this session 111 pdsClient, err := s.getPDSClient(ctx, session) 112 if err != nil { 113 s.logger.Error("failed to create PDS client", 114 "error", err, 115 "voter", session.AccountDID) 116 return nil, fmt.Errorf("failed to create PDS client: %w", err) 117 } 118 119 // 
Note: We intentionally don't validate subject existence here. 120 // The vote record goes to the user's PDS regardless. The Jetstream consumer 121 // handles orphaned votes correctly by only updating counts for non-deleted subjects. 122 // This avoids race conditions and eventual consistency issues. 123 124 // Check for existing vote by querying PDS directly (source of truth) 125 // This avoids eventual consistency issues with the AppView database 126 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI) 127 if err != nil { 128 s.logger.Error("failed to check existing vote on PDS", 129 "error", err, 130 "voter", session.AccountDID, 131 "subject", req.Subject.URI) 132 return nil, fmt.Errorf("failed to check existing vote: %w", err) 133 } 134 135 // Toggle logic 136 if existing != nil { 137 // Vote exists - check if same direction 138 if existing.Direction == req.Direction { 139 // Same direction - toggle off (delete) 140 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil { 141 s.logger.Error("failed to delete vote on PDS", 142 "error", err, 143 "voter", session.AccountDID, 144 "rkey", existing.RKey) 145 if pds.IsAuthError(err) { 146 return nil, ErrNotAuthorized 147 } 148 return nil, fmt.Errorf("failed to delete vote: %w", err) 149 } 150 151 s.logger.Info("vote toggled off", 152 "voter", session.AccountDID, 153 "subject", req.Subject.URI, 154 "direction", req.Direction) 155 156 // Update cache - remove the vote 157 if s.cache != nil { 158 s.cache.RemoveVote(session.AccountDID.String(), req.Subject.URI) 159 } 160 161 // Return empty response to indicate deletion 162 return &CreateVoteResponse{ 163 URI: "", 164 CID: "", 165 }, nil 166 } 167 168 // Different direction - delete old vote first, then create new one 169 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil { 170 s.logger.Error("failed to delete existing vote on PDS", 171 "error", err, 172 "voter", session.AccountDID, 173 "rkey", 
existing.RKey) 174 if pds.IsAuthError(err) { 175 return nil, ErrNotAuthorized 176 } 177 return nil, fmt.Errorf("failed to delete existing vote: %w", err) 178 } 179 180 s.logger.Info("deleted existing vote before creating new direction", 181 "voter", session.AccountDID, 182 "subject", req.Subject.URI, 183 "old_direction", existing.Direction, 184 "new_direction", req.Direction) 185 } 186 187 // Create new vote 188 uri, cid, err := s.createVoteRecord(ctx, pdsClient, req) 189 if err != nil { 190 s.logger.Error("failed to create vote on PDS", 191 "error", err, 192 "voter", session.AccountDID, 193 "subject", req.Subject.URI, 194 "direction", req.Direction) 195 if pds.IsAuthError(err) { 196 return nil, ErrNotAuthorized 197 } 198 return nil, fmt.Errorf("failed to create vote: %w", err) 199 } 200 201 s.logger.Info("vote created", 202 "voter", session.AccountDID, 203 "subject", req.Subject.URI, 204 "direction", req.Direction, 205 "uri", uri, 206 "cid", cid) 207 208 // Update cache - add the new vote 209 if s.cache != nil { 210 s.cache.SetVote(session.AccountDID.String(), req.Subject.URI, &CachedVote{ 211 Direction: req.Direction, 212 URI: uri, 213 RKey: extractRKeyFromURI(uri), 214 }) 215 } 216 217 return &CreateVoteResponse{ 218 URI: uri, 219 CID: cid, 220 }, nil 221} 222 223// DeleteVote removes a vote on the specified subject 224func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error { 225 // Validate subject URI format 226 if req.Subject.URI == "" { 227 return ErrInvalidSubject 228 } 229 if !strings.HasPrefix(req.Subject.URI, "at://") { 230 return ErrInvalidSubject 231 } 232 233 // Create PDS client for this session 234 pdsClient, err := s.getPDSClient(ctx, session) 235 if err != nil { 236 s.logger.Error("failed to create PDS client", 237 "error", err, 238 "voter", session.AccountDID) 239 return fmt.Errorf("failed to create PDS client: %w", err) 240 } 241 242 // Find existing vote by querying PDS directly 
(source of truth) 243 // This avoids eventual consistency issues with the AppView database 244 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI) 245 if err != nil { 246 s.logger.Error("failed to find vote on PDS", 247 "error", err, 248 "voter", session.AccountDID, 249 "subject", req.Subject.URI) 250 return fmt.Errorf("failed to find vote: %w", err) 251 } 252 if existing == nil { 253 return ErrVoteNotFound 254 } 255 256 // Delete the vote record from user's PDS 257 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil { 258 s.logger.Error("failed to delete vote on PDS", 259 "error", err, 260 "voter", session.AccountDID, 261 "rkey", existing.RKey) 262 if pds.IsAuthError(err) { 263 return ErrNotAuthorized 264 } 265 return fmt.Errorf("failed to delete vote: %w", err) 266 } 267 268 s.logger.Info("vote deleted", 269 "voter", session.AccountDID, 270 "subject", req.Subject.URI, 271 "uri", existing.URI) 272 273 // Update cache - remove the vote 274 if s.cache != nil { 275 s.cache.RemoveVote(session.AccountDID.String(), req.Subject.URI) 276 } 277 278 return nil 279} 280 281// createVoteRecord writes a vote record to the user's PDS using PDSClient 282func (s *voteService) createVoteRecord(ctx context.Context, pdsClient pds.Client, req CreateVoteRequest) (string, string, error) { 283 // Generate TID for the record key 284 tid := syntax.NewTIDNow(0) 285 286 // Build vote record following the lexicon schema 287 record := VoteRecord{ 288 Type: voteCollection, 289 Subject: StrongRef{ 290 URI: req.Subject.URI, 291 CID: req.Subject.CID, 292 }, 293 Direction: req.Direction, 294 CreatedAt: time.Now().UTC().Format(time.RFC3339), 295 } 296 297 uri, cid, err := pdsClient.CreateRecord(ctx, voteCollection, tid.String(), record) 298 if err != nil { 299 return "", "", fmt.Errorf("createRecord failed: %w", err) 300 } 301 302 return uri, cid, nil 303} 304 305// existingVote represents a vote record found on the PDS 306type existingVote struct { 307 
URI string 308 CID string 309 RKey string 310 Direction string 311} 312 313// findExistingVote queries the user's PDS directly to find an existing vote for a subject. 314// This avoids eventual consistency issues with the AppView database populated by Jetstream. 315// Paginates through all vote records to handle users with >100 votes. 316// Returns the vote record with rkey, or nil if no vote exists for the subject. 317func (s *voteService) findExistingVote(ctx context.Context, pdsClient pds.Client, subjectURI string) (*existingVote, error) { 318 cursor := "" 319 const pageSize = 100 320 321 // Paginate through all vote records 322 for { 323 result, err := pdsClient.ListRecords(ctx, voteCollection, pageSize, cursor) 324 if err != nil { 325 // Check for auth errors using typed errors 326 if pds.IsAuthError(err) { 327 return nil, ErrNotAuthorized 328 } 329 return nil, fmt.Errorf("listRecords failed: %w", err) 330 } 331 332 // Search for the vote matching our subject in this page 333 for _, rec := range result.Records { 334 // Extract subject from record value 335 subject, ok := rec.Value["subject"].(map[string]any) 336 if !ok { 337 continue 338 } 339 340 subjectURIValue, ok := subject["uri"].(string) 341 if !ok { 342 continue 343 } 344 345 if subjectURIValue == subjectURI { 346 // Extract rkey from the URI (at://did/collection/rkey) 347 parts := strings.Split(rec.URI, "/") 348 if len(parts) < 5 { 349 continue 350 } 351 rkey := parts[len(parts)-1] 352 353 // Extract direction 354 direction, _ := rec.Value["direction"].(string) 355 356 return &existingVote{ 357 URI: rec.URI, 358 CID: rec.CID, 359 RKey: rkey, 360 Direction: direction, 361 }, nil 362 } 363 } 364 365 // Check if there are more pages 366 if result.Cursor == "" { 367 break // No more pages 368 } 369 cursor = result.Cursor 370 } 371 372 // No vote found for this subject after checking all pages 373 return nil, nil 374} 375 376// EnsureCachePopulated fetches the user's votes from their PDS if not already 
cached. 377func (s *voteService) EnsureCachePopulated(ctx context.Context, session *oauth.ClientSessionData) error { 378 if s.cache == nil { 379 return nil // No cache configured 380 } 381 382 // Check if already cached 383 if s.cache.IsCached(session.AccountDID.String()) { 384 return nil 385 } 386 387 // Create PDS client for this session 388 pdsClient, err := s.getPDSClient(ctx, session) 389 if err != nil { 390 s.logger.Error("failed to create PDS client for cache population", 391 "error", err, 392 "user", session.AccountDID) 393 return fmt.Errorf("failed to create PDS client: %w", err) 394 } 395 396 // Fetch and cache votes from PDS 397 if err := s.cache.FetchAndCacheFromPDS(ctx, pdsClient); err != nil { 398 s.logger.Error("failed to populate vote cache from PDS", 399 "error", err, 400 "user", session.AccountDID) 401 return fmt.Errorf("failed to populate vote cache: %w", err) 402 } 403 404 return nil 405} 406 407// GetViewerVote returns the viewer's vote for a specific subject, or nil if not voted. 408func (s *voteService) GetViewerVote(userDID, subjectURI string) *CachedVote { 409 if s.cache == nil { 410 return nil 411 } 412 return s.cache.GetVote(userDID, subjectURI) 413} 414 415// GetViewerVotesForSubjects returns the viewer's votes for multiple subjects. 416func (s *voteService) GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*CachedVote { 417 result := make(map[string]*CachedVote) 418 if s.cache == nil { 419 return result 420 } 421 422 allVotes := s.cache.GetVotesForUser(userDID) 423 if allVotes == nil { 424 return result 425 } 426 427 for _, uri := range subjectURIs { 428 if vote, exists := allVotes[uri]; exists { 429 result[uri] = vote 430 } 431 } 432 433 return result 434}