A community based topic aggregation platform built on atproto
1package votes
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/auth/oauth"
11 "github.com/bluesky-social/indigo/atproto/syntax"
12
13 oauthclient "Coves/internal/atproto/oauth"
14 "Coves/internal/atproto/pds"
15)
16
17const (
18 // voteCollection is the AT Protocol collection for vote records
19 voteCollection = "social.coves.feed.vote"
20)
21
22// PDSClientFactory creates PDS clients from session data.
23// Used to allow injection of different auth mechanisms (OAuth for production, password for tests).
24type PDSClientFactory func(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error)
25
26// voteService implements the Service interface for vote operations
27type voteService struct {
28 repo Repository
29 oauthClient *oauthclient.OAuthClient
30 oauthStore oauth.ClientAuthStore
31 logger *slog.Logger
32 pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth.
33 cache *VoteCache // In-memory cache of user votes from PDS
34}
35
36// NewService creates a new vote service instance
37func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, cache *VoteCache, logger *slog.Logger) Service {
38 if logger == nil {
39 logger = slog.Default()
40 }
41 return &voteService{
42 repo: repo,
43 oauthClient: oauthClient,
44 oauthStore: oauthStore,
45 cache: cache,
46 logger: logger,
47 }
48}
49
50// NewServiceWithPDSFactory creates a vote service with a custom PDS client factory.
51// This is primarily for testing with password-based authentication.
52func NewServiceWithPDSFactory(repo Repository, cache *VoteCache, logger *slog.Logger, factory PDSClientFactory) Service {
53 if logger == nil {
54 logger = slog.Default()
55 }
56 return &voteService{
57 repo: repo,
58 cache: cache,
59 logger: logger,
60 pdsClientFactory: factory,
61 }
62}
63
64// getPDSClient creates a PDS client from an OAuth session.
65// If a custom factory was provided (for testing), uses that.
66// Otherwise, uses DPoP authentication via indigo's APIClient for proper OAuth token handling.
67func (s *voteService) getPDSClient(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) {
68 // Use custom factory if provided (e.g., for testing with password auth)
69 if s.pdsClientFactory != nil {
70 return s.pdsClientFactory(ctx, session)
71 }
72
73 // Production path: use OAuth with DPoP
74 if s.oauthClient == nil || s.oauthClient.ClientApp == nil {
75 return nil, fmt.Errorf("OAuth client not configured")
76 }
77
78 client, err := pds.NewFromOAuthSession(ctx, s.oauthClient.ClientApp, session)
79 if err != nil {
80 return nil, fmt.Errorf("failed to create PDS client: %w", err)
81 }
82
83 return client, nil
84}
85
86// CreateVote creates a new vote or toggles off an existing vote
87// Implements the toggle behavior:
88// - No existing vote → Create new vote with given direction
89// - Vote exists with same direction → Delete vote (toggle off)
90// - Vote exists with different direction → Update to new direction
91func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) {
92 // Validate direction
93 if req.Direction != "up" && req.Direction != "down" {
94 return nil, ErrInvalidDirection
95 }
96
97 // Validate subject URI format
98 if req.Subject.URI == "" {
99 return nil, ErrInvalidSubject
100 }
101 if !strings.HasPrefix(req.Subject.URI, "at://") {
102 return nil, ErrInvalidSubject
103 }
104
105 // Validate subject CID is provided
106 if req.Subject.CID == "" {
107 return nil, ErrInvalidSubject
108 }
109
110 // Create PDS client for this session
111 pdsClient, err := s.getPDSClient(ctx, session)
112 if err != nil {
113 s.logger.Error("failed to create PDS client",
114 "error", err,
115 "voter", session.AccountDID)
116 return nil, fmt.Errorf("failed to create PDS client: %w", err)
117 }
118
119 // Note: We intentionally don't validate subject existence here.
120 // The vote record goes to the user's PDS regardless. The Jetstream consumer
121 // handles orphaned votes correctly by only updating counts for non-deleted subjects.
122 // This avoids race conditions and eventual consistency issues.
123
124 // Check for existing vote by querying PDS directly (source of truth)
125 // This avoids eventual consistency issues with the AppView database
126 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI)
127 if err != nil {
128 s.logger.Error("failed to check existing vote on PDS",
129 "error", err,
130 "voter", session.AccountDID,
131 "subject", req.Subject.URI)
132 return nil, fmt.Errorf("failed to check existing vote: %w", err)
133 }
134
135 // Toggle logic
136 if existing != nil {
137 // Vote exists - check if same direction
138 if existing.Direction == req.Direction {
139 // Same direction - toggle off (delete)
140 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil {
141 s.logger.Error("failed to delete vote on PDS",
142 "error", err,
143 "voter", session.AccountDID,
144 "rkey", existing.RKey)
145 if pds.IsAuthError(err) {
146 return nil, ErrNotAuthorized
147 }
148 return nil, fmt.Errorf("failed to delete vote: %w", err)
149 }
150
151 s.logger.Info("vote toggled off",
152 "voter", session.AccountDID,
153 "subject", req.Subject.URI,
154 "direction", req.Direction)
155
156 // Update cache - remove the vote
157 if s.cache != nil {
158 s.cache.RemoveVote(session.AccountDID.String(), req.Subject.URI)
159 }
160
161 // Return empty response to indicate deletion
162 return &CreateVoteResponse{
163 URI: "",
164 CID: "",
165 }, nil
166 }
167
168 // Different direction - delete old vote first, then create new one
169 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil {
170 s.logger.Error("failed to delete existing vote on PDS",
171 "error", err,
172 "voter", session.AccountDID,
173 "rkey", existing.RKey)
174 if pds.IsAuthError(err) {
175 return nil, ErrNotAuthorized
176 }
177 return nil, fmt.Errorf("failed to delete existing vote: %w", err)
178 }
179
180 s.logger.Info("deleted existing vote before creating new direction",
181 "voter", session.AccountDID,
182 "subject", req.Subject.URI,
183 "old_direction", existing.Direction,
184 "new_direction", req.Direction)
185 }
186
187 // Create new vote
188 uri, cid, err := s.createVoteRecord(ctx, pdsClient, req)
189 if err != nil {
190 s.logger.Error("failed to create vote on PDS",
191 "error", err,
192 "voter", session.AccountDID,
193 "subject", req.Subject.URI,
194 "direction", req.Direction)
195 if pds.IsAuthError(err) {
196 return nil, ErrNotAuthorized
197 }
198 return nil, fmt.Errorf("failed to create vote: %w", err)
199 }
200
201 s.logger.Info("vote created",
202 "voter", session.AccountDID,
203 "subject", req.Subject.URI,
204 "direction", req.Direction,
205 "uri", uri,
206 "cid", cid)
207
208 // Update cache - add the new vote
209 if s.cache != nil {
210 s.cache.SetVote(session.AccountDID.String(), req.Subject.URI, &CachedVote{
211 Direction: req.Direction,
212 URI: uri,
213 RKey: extractRKeyFromURI(uri),
214 })
215 }
216
217 return &CreateVoteResponse{
218 URI: uri,
219 CID: cid,
220 }, nil
221}
222
223// DeleteVote removes a vote on the specified subject
224func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error {
225 // Validate subject URI format
226 if req.Subject.URI == "" {
227 return ErrInvalidSubject
228 }
229 if !strings.HasPrefix(req.Subject.URI, "at://") {
230 return ErrInvalidSubject
231 }
232
233 // Create PDS client for this session
234 pdsClient, err := s.getPDSClient(ctx, session)
235 if err != nil {
236 s.logger.Error("failed to create PDS client",
237 "error", err,
238 "voter", session.AccountDID)
239 return fmt.Errorf("failed to create PDS client: %w", err)
240 }
241
242 // Find existing vote by querying PDS directly (source of truth)
243 // This avoids eventual consistency issues with the AppView database
244 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI)
245 if err != nil {
246 s.logger.Error("failed to find vote on PDS",
247 "error", err,
248 "voter", session.AccountDID,
249 "subject", req.Subject.URI)
250 return fmt.Errorf("failed to find vote: %w", err)
251 }
252 if existing == nil {
253 return ErrVoteNotFound
254 }
255
256 // Delete the vote record from user's PDS
257 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil {
258 s.logger.Error("failed to delete vote on PDS",
259 "error", err,
260 "voter", session.AccountDID,
261 "rkey", existing.RKey)
262 if pds.IsAuthError(err) {
263 return ErrNotAuthorized
264 }
265 return fmt.Errorf("failed to delete vote: %w", err)
266 }
267
268 s.logger.Info("vote deleted",
269 "voter", session.AccountDID,
270 "subject", req.Subject.URI,
271 "uri", existing.URI)
272
273 // Update cache - remove the vote
274 if s.cache != nil {
275 s.cache.RemoveVote(session.AccountDID.String(), req.Subject.URI)
276 }
277
278 return nil
279}
280
281// createVoteRecord writes a vote record to the user's PDS using PDSClient
282func (s *voteService) createVoteRecord(ctx context.Context, pdsClient pds.Client, req CreateVoteRequest) (string, string, error) {
283 // Generate TID for the record key
284 tid := syntax.NewTIDNow(0)
285
286 // Build vote record following the lexicon schema
287 record := VoteRecord{
288 Type: voteCollection,
289 Subject: StrongRef{
290 URI: req.Subject.URI,
291 CID: req.Subject.CID,
292 },
293 Direction: req.Direction,
294 CreatedAt: time.Now().UTC().Format(time.RFC3339),
295 }
296
297 uri, cid, err := pdsClient.CreateRecord(ctx, voteCollection, tid.String(), record)
298 if err != nil {
299 return "", "", fmt.Errorf("createRecord failed: %w", err)
300 }
301
302 return uri, cid, nil
303}
304
305// existingVote represents a vote record found on the PDS
306type existingVote struct {
307 URI string
308 CID string
309 RKey string
310 Direction string
311}
312
313// findExistingVote queries the user's PDS directly to find an existing vote for a subject.
314// This avoids eventual consistency issues with the AppView database populated by Jetstream.
315// Paginates through all vote records to handle users with >100 votes.
316// Returns the vote record with rkey, or nil if no vote exists for the subject.
317func (s *voteService) findExistingVote(ctx context.Context, pdsClient pds.Client, subjectURI string) (*existingVote, error) {
318 cursor := ""
319 const pageSize = 100
320
321 // Paginate through all vote records
322 for {
323 result, err := pdsClient.ListRecords(ctx, voteCollection, pageSize, cursor)
324 if err != nil {
325 // Check for auth errors using typed errors
326 if pds.IsAuthError(err) {
327 return nil, ErrNotAuthorized
328 }
329 return nil, fmt.Errorf("listRecords failed: %w", err)
330 }
331
332 // Search for the vote matching our subject in this page
333 for _, rec := range result.Records {
334 // Extract subject from record value
335 subject, ok := rec.Value["subject"].(map[string]any)
336 if !ok {
337 continue
338 }
339
340 subjectURIValue, ok := subject["uri"].(string)
341 if !ok {
342 continue
343 }
344
345 if subjectURIValue == subjectURI {
346 // Extract rkey from the URI (at://did/collection/rkey)
347 parts := strings.Split(rec.URI, "/")
348 if len(parts) < 5 {
349 continue
350 }
351 rkey := parts[len(parts)-1]
352
353 // Extract direction
354 direction, _ := rec.Value["direction"].(string)
355
356 return &existingVote{
357 URI: rec.URI,
358 CID: rec.CID,
359 RKey: rkey,
360 Direction: direction,
361 }, nil
362 }
363 }
364
365 // Check if there are more pages
366 if result.Cursor == "" {
367 break // No more pages
368 }
369 cursor = result.Cursor
370 }
371
372 // No vote found for this subject after checking all pages
373 return nil, nil
374}
375
376// EnsureCachePopulated fetches the user's votes from their PDS if not already cached.
377func (s *voteService) EnsureCachePopulated(ctx context.Context, session *oauth.ClientSessionData) error {
378 if s.cache == nil {
379 return nil // No cache configured
380 }
381
382 // Check if already cached
383 if s.cache.IsCached(session.AccountDID.String()) {
384 return nil
385 }
386
387 // Create PDS client for this session
388 pdsClient, err := s.getPDSClient(ctx, session)
389 if err != nil {
390 s.logger.Error("failed to create PDS client for cache population",
391 "error", err,
392 "user", session.AccountDID)
393 return fmt.Errorf("failed to create PDS client: %w", err)
394 }
395
396 // Fetch and cache votes from PDS
397 if err := s.cache.FetchAndCacheFromPDS(ctx, pdsClient); err != nil {
398 s.logger.Error("failed to populate vote cache from PDS",
399 "error", err,
400 "user", session.AccountDID)
401 return fmt.Errorf("failed to populate vote cache: %w", err)
402 }
403
404 return nil
405}
406
407// GetViewerVote returns the viewer's vote for a specific subject, or nil if not voted.
408func (s *voteService) GetViewerVote(userDID, subjectURI string) *CachedVote {
409 if s.cache == nil {
410 return nil
411 }
412 return s.cache.GetVote(userDID, subjectURI)
413}
414
415// GetViewerVotesForSubjects returns the viewer's votes for multiple subjects.
416func (s *voteService) GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*CachedVote {
417 result := make(map[string]*CachedVote)
418 if s.cache == nil {
419 return result
420 }
421
422 allVotes := s.cache.GetVotesForUser(userDID)
423 if allVotes == nil {
424 return result
425 }
426
427 for _, uri := range subjectURIs {
428 if vote, exists := allVotes[uri]; exists {
429 result[uri] = vote
430 }
431 }
432
433 return result
434}