A community-based topic aggregation platform built on atproto
1package votes 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/auth/oauth" 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 13 oauthclient "Coves/internal/atproto/oauth" 14 "Coves/internal/atproto/pds" 15) 16 17const ( 18 // voteCollection is the AT Protocol collection for vote records 19 voteCollection = "social.coves.feed.vote" 20) 21 22// PDSClientFactory creates PDS clients from session data. 23// Used to allow injection of different auth mechanisms (OAuth for production, password for tests). 24type PDSClientFactory func(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) 25 26// voteService implements the Service interface for vote operations 27type voteService struct { 28 repo Repository 29 oauthClient *oauthclient.OAuthClient 30 oauthStore oauth.ClientAuthStore 31 logger *slog.Logger 32 pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth. 33} 34 35// NewService creates a new vote service instance 36func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service { 37 if logger == nil { 38 logger = slog.Default() 39 } 40 return &voteService{ 41 repo: repo, 42 oauthClient: oauthClient, 43 oauthStore: oauthStore, 44 logger: logger, 45 } 46} 47 48// NewServiceWithPDSFactory creates a vote service with a custom PDS client factory. 49// This is primarily for testing with password-based authentication. 50func NewServiceWithPDSFactory(repo Repository, logger *slog.Logger, factory PDSClientFactory) Service { 51 if logger == nil { 52 logger = slog.Default() 53 } 54 return &voteService{ 55 repo: repo, 56 logger: logger, 57 pdsClientFactory: factory, 58 } 59} 60 61// getPDSClient creates a PDS client from an OAuth session. 62// If a custom factory was provided (for testing), uses that. 63// Otherwise, uses DPoP authentication via indigo's APIClient for proper OAuth token handling. 
64func (s *voteService) getPDSClient(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) { 65 // Use custom factory if provided (e.g., for testing with password auth) 66 if s.pdsClientFactory != nil { 67 return s.pdsClientFactory(ctx, session) 68 } 69 70 // Production path: use OAuth with DPoP 71 if s.oauthClient == nil || s.oauthClient.ClientApp == nil { 72 return nil, fmt.Errorf("OAuth client not configured") 73 } 74 75 client, err := pds.NewFromOAuthSession(ctx, s.oauthClient.ClientApp, session) 76 if err != nil { 77 return nil, fmt.Errorf("failed to create PDS client: %w", err) 78 } 79 80 return client, nil 81} 82 83// CreateVote creates a new vote or toggles off an existing vote 84// Implements the toggle behavior: 85// - No existing vote → Create new vote with given direction 86// - Vote exists with same direction → Delete vote (toggle off) 87// - Vote exists with different direction → Update to new direction 88func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) { 89 // Validate direction 90 if req.Direction != "up" && req.Direction != "down" { 91 return nil, ErrInvalidDirection 92 } 93 94 // Validate subject URI format 95 if req.Subject.URI == "" { 96 return nil, ErrInvalidSubject 97 } 98 if !strings.HasPrefix(req.Subject.URI, "at://") { 99 return nil, ErrInvalidSubject 100 } 101 102 // Validate subject CID is provided 103 if req.Subject.CID == "" { 104 return nil, ErrInvalidSubject 105 } 106 107 // Create PDS client for this session 108 pdsClient, err := s.getPDSClient(ctx, session) 109 if err != nil { 110 s.logger.Error("failed to create PDS client", 111 "error", err, 112 "voter", session.AccountDID) 113 return nil, fmt.Errorf("failed to create PDS client: %w", err) 114 } 115 116 // Note: We intentionally don't validate subject existence here. 117 // The vote record goes to the user's PDS regardless. 
The Jetstream consumer 118 // handles orphaned votes correctly by only updating counts for non-deleted subjects. 119 // This avoids race conditions and eventual consistency issues. 120 121 // Check for existing vote by querying PDS directly (source of truth) 122 // This avoids eventual consistency issues with the AppView database 123 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI) 124 if err != nil { 125 s.logger.Error("failed to check existing vote on PDS", 126 "error", err, 127 "voter", session.AccountDID, 128 "subject", req.Subject.URI) 129 return nil, fmt.Errorf("failed to check existing vote: %w", err) 130 } 131 132 // Toggle logic 133 if existing != nil { 134 // Vote exists - check if same direction 135 if existing.Direction == req.Direction { 136 // Same direction - toggle off (delete) 137 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil { 138 s.logger.Error("failed to delete vote on PDS", 139 "error", err, 140 "voter", session.AccountDID, 141 "rkey", existing.RKey) 142 if pds.IsAuthError(err) { 143 return nil, ErrNotAuthorized 144 } 145 return nil, fmt.Errorf("failed to delete vote: %w", err) 146 } 147 148 s.logger.Info("vote toggled off", 149 "voter", session.AccountDID, 150 "subject", req.Subject.URI, 151 "direction", req.Direction) 152 153 // Return empty response to indicate deletion 154 return &CreateVoteResponse{ 155 URI: "", 156 CID: "", 157 }, nil 158 } 159 160 // Different direction - delete old vote first, then create new one 161 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil { 162 s.logger.Error("failed to delete existing vote on PDS", 163 "error", err, 164 "voter", session.AccountDID, 165 "rkey", existing.RKey) 166 if pds.IsAuthError(err) { 167 return nil, ErrNotAuthorized 168 } 169 return nil, fmt.Errorf("failed to delete existing vote: %w", err) 170 } 171 172 s.logger.Info("deleted existing vote before creating new direction", 173 "voter", 
session.AccountDID, 174 "subject", req.Subject.URI, 175 "old_direction", existing.Direction, 176 "new_direction", req.Direction) 177 } 178 179 // Create new vote 180 uri, cid, err := s.createVoteRecord(ctx, pdsClient, req) 181 if err != nil { 182 s.logger.Error("failed to create vote on PDS", 183 "error", err, 184 "voter", session.AccountDID, 185 "subject", req.Subject.URI, 186 "direction", req.Direction) 187 if pds.IsAuthError(err) { 188 return nil, ErrNotAuthorized 189 } 190 return nil, fmt.Errorf("failed to create vote: %w", err) 191 } 192 193 s.logger.Info("vote created", 194 "voter", session.AccountDID, 195 "subject", req.Subject.URI, 196 "direction", req.Direction, 197 "uri", uri, 198 "cid", cid) 199 200 return &CreateVoteResponse{ 201 URI: uri, 202 CID: cid, 203 }, nil 204} 205 206// DeleteVote removes a vote on the specified subject 207func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error { 208 // Validate subject URI format 209 if req.Subject.URI == "" { 210 return ErrInvalidSubject 211 } 212 if !strings.HasPrefix(req.Subject.URI, "at://") { 213 return ErrInvalidSubject 214 } 215 216 // Create PDS client for this session 217 pdsClient, err := s.getPDSClient(ctx, session) 218 if err != nil { 219 s.logger.Error("failed to create PDS client", 220 "error", err, 221 "voter", session.AccountDID) 222 return fmt.Errorf("failed to create PDS client: %w", err) 223 } 224 225 // Find existing vote by querying PDS directly (source of truth) 226 // This avoids eventual consistency issues with the AppView database 227 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI) 228 if err != nil { 229 s.logger.Error("failed to find vote on PDS", 230 "error", err, 231 "voter", session.AccountDID, 232 "subject", req.Subject.URI) 233 return fmt.Errorf("failed to find vote: %w", err) 234 } 235 if existing == nil { 236 return ErrVoteNotFound 237 } 238 239 // Delete the vote record from user's PDS 240 if 
err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil { 241 s.logger.Error("failed to delete vote on PDS", 242 "error", err, 243 "voter", session.AccountDID, 244 "rkey", existing.RKey) 245 if pds.IsAuthError(err) { 246 return ErrNotAuthorized 247 } 248 return fmt.Errorf("failed to delete vote: %w", err) 249 } 250 251 s.logger.Info("vote deleted", 252 "voter", session.AccountDID, 253 "subject", req.Subject.URI, 254 "uri", existing.URI) 255 256 return nil 257} 258 259// createVoteRecord writes a vote record to the user's PDS using PDSClient 260func (s *voteService) createVoteRecord(ctx context.Context, pdsClient pds.Client, req CreateVoteRequest) (string, string, error) { 261 // Generate TID for the record key 262 tid := syntax.NewTIDNow(0) 263 264 // Build vote record following the lexicon schema 265 record := VoteRecord{ 266 Type: voteCollection, 267 Subject: StrongRef{ 268 URI: req.Subject.URI, 269 CID: req.Subject.CID, 270 }, 271 Direction: req.Direction, 272 CreatedAt: time.Now().UTC().Format(time.RFC3339), 273 } 274 275 uri, cid, err := pdsClient.CreateRecord(ctx, voteCollection, tid.String(), record) 276 if err != nil { 277 return "", "", fmt.Errorf("createRecord failed: %w", err) 278 } 279 280 return uri, cid, nil 281} 282 283// existingVote represents a vote record found on the PDS 284type existingVote struct { 285 URI string 286 CID string 287 RKey string 288 Direction string 289} 290 291// findExistingVote queries the user's PDS directly to find an existing vote for a subject. 292// This avoids eventual consistency issues with the AppView database populated by Jetstream. 293// Paginates through all vote records to handle users with >100 votes. 294// Returns the vote record with rkey, or nil if no vote exists for the subject. 
295func (s *voteService) findExistingVote(ctx context.Context, pdsClient pds.Client, subjectURI string) (*existingVote, error) { 296 cursor := "" 297 const pageSize = 100 298 299 // Paginate through all vote records 300 for { 301 result, err := pdsClient.ListRecords(ctx, voteCollection, pageSize, cursor) 302 if err != nil { 303 // Check for auth errors using typed errors 304 if pds.IsAuthError(err) { 305 return nil, ErrNotAuthorized 306 } 307 return nil, fmt.Errorf("listRecords failed: %w", err) 308 } 309 310 // Search for the vote matching our subject in this page 311 for _, rec := range result.Records { 312 // Extract subject from record value 313 subject, ok := rec.Value["subject"].(map[string]any) 314 if !ok { 315 continue 316 } 317 318 subjectURIValue, ok := subject["uri"].(string) 319 if !ok { 320 continue 321 } 322 323 if subjectURIValue == subjectURI { 324 // Extract rkey from the URI (at://did/collection/rkey) 325 parts := strings.Split(rec.URI, "/") 326 if len(parts) < 5 { 327 continue 328 } 329 rkey := parts[len(parts)-1] 330 331 // Extract direction 332 direction, _ := rec.Value["direction"].(string) 333 334 return &existingVote{ 335 URI: rec.URI, 336 CID: rec.CID, 337 RKey: rkey, 338 Direction: direction, 339 }, nil 340 } 341 } 342 343 // Check if there are more pages 344 if result.Cursor == "" { 345 break // No more pages 346 } 347 cursor = result.Cursor 348 } 349 350 // No vote found for this subject after checking all pages 351 return nil, nil 352}