A community based topic aggregation platform built on atproto
1package votes
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/auth/oauth"
11 "github.com/bluesky-social/indigo/atproto/syntax"
12
13 oauthclient "Coves/internal/atproto/oauth"
14 "Coves/internal/atproto/pds"
15)
16
17const (
18 // voteCollection is the AT Protocol collection for vote records
19 voteCollection = "social.coves.feed.vote"
20)
21
22// PDSClientFactory creates PDS clients from session data.
23// Used to allow injection of different auth mechanisms (OAuth for production, password for tests).
24type PDSClientFactory func(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error)
25
26// voteService implements the Service interface for vote operations
27type voteService struct {
28 repo Repository
29 oauthClient *oauthclient.OAuthClient
30 oauthStore oauth.ClientAuthStore
31 logger *slog.Logger
32 pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth.
33}
34
35// NewService creates a new vote service instance
36func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
37 if logger == nil {
38 logger = slog.Default()
39 }
40 return &voteService{
41 repo: repo,
42 oauthClient: oauthClient,
43 oauthStore: oauthStore,
44 logger: logger,
45 }
46}
47
48// NewServiceWithPDSFactory creates a vote service with a custom PDS client factory.
49// This is primarily for testing with password-based authentication.
50func NewServiceWithPDSFactory(repo Repository, logger *slog.Logger, factory PDSClientFactory) Service {
51 if logger == nil {
52 logger = slog.Default()
53 }
54 return &voteService{
55 repo: repo,
56 logger: logger,
57 pdsClientFactory: factory,
58 }
59}
60
61// getPDSClient creates a PDS client from an OAuth session.
62// If a custom factory was provided (for testing), uses that.
63// Otherwise, uses DPoP authentication via indigo's APIClient for proper OAuth token handling.
64func (s *voteService) getPDSClient(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) {
65 // Use custom factory if provided (e.g., for testing with password auth)
66 if s.pdsClientFactory != nil {
67 return s.pdsClientFactory(ctx, session)
68 }
69
70 // Production path: use OAuth with DPoP
71 if s.oauthClient == nil || s.oauthClient.ClientApp == nil {
72 return nil, fmt.Errorf("OAuth client not configured")
73 }
74
75 client, err := pds.NewFromOAuthSession(ctx, s.oauthClient.ClientApp, session)
76 if err != nil {
77 return nil, fmt.Errorf("failed to create PDS client: %w", err)
78 }
79
80 return client, nil
81}
82
83// CreateVote creates a new vote or toggles off an existing vote
84// Implements the toggle behavior:
85// - No existing vote → Create new vote with given direction
86// - Vote exists with same direction → Delete vote (toggle off)
87// - Vote exists with different direction → Update to new direction
88func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) {
89 // Validate direction
90 if req.Direction != "up" && req.Direction != "down" {
91 return nil, ErrInvalidDirection
92 }
93
94 // Validate subject URI format
95 if req.Subject.URI == "" {
96 return nil, ErrInvalidSubject
97 }
98 if !strings.HasPrefix(req.Subject.URI, "at://") {
99 return nil, ErrInvalidSubject
100 }
101
102 // Validate subject CID is provided
103 if req.Subject.CID == "" {
104 return nil, ErrInvalidSubject
105 }
106
107 // Create PDS client for this session
108 pdsClient, err := s.getPDSClient(ctx, session)
109 if err != nil {
110 s.logger.Error("failed to create PDS client",
111 "error", err,
112 "voter", session.AccountDID)
113 return nil, fmt.Errorf("failed to create PDS client: %w", err)
114 }
115
116 // Note: We intentionally don't validate subject existence here.
117 // The vote record goes to the user's PDS regardless. The Jetstream consumer
118 // handles orphaned votes correctly by only updating counts for non-deleted subjects.
119 // This avoids race conditions and eventual consistency issues.
120
121 // Check for existing vote by querying PDS directly (source of truth)
122 // This avoids eventual consistency issues with the AppView database
123 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI)
124 if err != nil {
125 s.logger.Error("failed to check existing vote on PDS",
126 "error", err,
127 "voter", session.AccountDID,
128 "subject", req.Subject.URI)
129 return nil, fmt.Errorf("failed to check existing vote: %w", err)
130 }
131
132 // Toggle logic
133 if existing != nil {
134 // Vote exists - check if same direction
135 if existing.Direction == req.Direction {
136 // Same direction - toggle off (delete)
137 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil {
138 s.logger.Error("failed to delete vote on PDS",
139 "error", err,
140 "voter", session.AccountDID,
141 "rkey", existing.RKey)
142 if pds.IsAuthError(err) {
143 return nil, ErrNotAuthorized
144 }
145 return nil, fmt.Errorf("failed to delete vote: %w", err)
146 }
147
148 s.logger.Info("vote toggled off",
149 "voter", session.AccountDID,
150 "subject", req.Subject.URI,
151 "direction", req.Direction)
152
153 // Return empty response to indicate deletion
154 return &CreateVoteResponse{
155 URI: "",
156 CID: "",
157 }, nil
158 }
159
160 // Different direction - delete old vote first, then create new one
161 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil {
162 s.logger.Error("failed to delete existing vote on PDS",
163 "error", err,
164 "voter", session.AccountDID,
165 "rkey", existing.RKey)
166 if pds.IsAuthError(err) {
167 return nil, ErrNotAuthorized
168 }
169 return nil, fmt.Errorf("failed to delete existing vote: %w", err)
170 }
171
172 s.logger.Info("deleted existing vote before creating new direction",
173 "voter", session.AccountDID,
174 "subject", req.Subject.URI,
175 "old_direction", existing.Direction,
176 "new_direction", req.Direction)
177 }
178
179 // Create new vote
180 uri, cid, err := s.createVoteRecord(ctx, pdsClient, req)
181 if err != nil {
182 s.logger.Error("failed to create vote on PDS",
183 "error", err,
184 "voter", session.AccountDID,
185 "subject", req.Subject.URI,
186 "direction", req.Direction)
187 if pds.IsAuthError(err) {
188 return nil, ErrNotAuthorized
189 }
190 return nil, fmt.Errorf("failed to create vote: %w", err)
191 }
192
193 s.logger.Info("vote created",
194 "voter", session.AccountDID,
195 "subject", req.Subject.URI,
196 "direction", req.Direction,
197 "uri", uri,
198 "cid", cid)
199
200 return &CreateVoteResponse{
201 URI: uri,
202 CID: cid,
203 }, nil
204}
205
206// DeleteVote removes a vote on the specified subject
207func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error {
208 // Validate subject URI format
209 if req.Subject.URI == "" {
210 return ErrInvalidSubject
211 }
212 if !strings.HasPrefix(req.Subject.URI, "at://") {
213 return ErrInvalidSubject
214 }
215
216 // Create PDS client for this session
217 pdsClient, err := s.getPDSClient(ctx, session)
218 if err != nil {
219 s.logger.Error("failed to create PDS client",
220 "error", err,
221 "voter", session.AccountDID)
222 return fmt.Errorf("failed to create PDS client: %w", err)
223 }
224
225 // Find existing vote by querying PDS directly (source of truth)
226 // This avoids eventual consistency issues with the AppView database
227 existing, err := s.findExistingVote(ctx, pdsClient, req.Subject.URI)
228 if err != nil {
229 s.logger.Error("failed to find vote on PDS",
230 "error", err,
231 "voter", session.AccountDID,
232 "subject", req.Subject.URI)
233 return fmt.Errorf("failed to find vote: %w", err)
234 }
235 if existing == nil {
236 return ErrVoteNotFound
237 }
238
239 // Delete the vote record from user's PDS
240 if err := pdsClient.DeleteRecord(ctx, voteCollection, existing.RKey); err != nil {
241 s.logger.Error("failed to delete vote on PDS",
242 "error", err,
243 "voter", session.AccountDID,
244 "rkey", existing.RKey)
245 if pds.IsAuthError(err) {
246 return ErrNotAuthorized
247 }
248 return fmt.Errorf("failed to delete vote: %w", err)
249 }
250
251 s.logger.Info("vote deleted",
252 "voter", session.AccountDID,
253 "subject", req.Subject.URI,
254 "uri", existing.URI)
255
256 return nil
257}
258
259// createVoteRecord writes a vote record to the user's PDS using PDSClient
260func (s *voteService) createVoteRecord(ctx context.Context, pdsClient pds.Client, req CreateVoteRequest) (string, string, error) {
261 // Generate TID for the record key
262 tid := syntax.NewTIDNow(0)
263
264 // Build vote record following the lexicon schema
265 record := VoteRecord{
266 Type: voteCollection,
267 Subject: StrongRef{
268 URI: req.Subject.URI,
269 CID: req.Subject.CID,
270 },
271 Direction: req.Direction,
272 CreatedAt: time.Now().UTC().Format(time.RFC3339),
273 }
274
275 uri, cid, err := pdsClient.CreateRecord(ctx, voteCollection, tid.String(), record)
276 if err != nil {
277 return "", "", fmt.Errorf("createRecord failed: %w", err)
278 }
279
280 return uri, cid, nil
281}
282
283// existingVote represents a vote record found on the PDS
284type existingVote struct {
285 URI string
286 CID string
287 RKey string
288 Direction string
289}
290
291// findExistingVote queries the user's PDS directly to find an existing vote for a subject.
292// This avoids eventual consistency issues with the AppView database populated by Jetstream.
293// Paginates through all vote records to handle users with >100 votes.
294// Returns the vote record with rkey, or nil if no vote exists for the subject.
295func (s *voteService) findExistingVote(ctx context.Context, pdsClient pds.Client, subjectURI string) (*existingVote, error) {
296 cursor := ""
297 const pageSize = 100
298
299 // Paginate through all vote records
300 for {
301 result, err := pdsClient.ListRecords(ctx, voteCollection, pageSize, cursor)
302 if err != nil {
303 // Check for auth errors using typed errors
304 if pds.IsAuthError(err) {
305 return nil, ErrNotAuthorized
306 }
307 return nil, fmt.Errorf("listRecords failed: %w", err)
308 }
309
310 // Search for the vote matching our subject in this page
311 for _, rec := range result.Records {
312 // Extract subject from record value
313 subject, ok := rec.Value["subject"].(map[string]any)
314 if !ok {
315 continue
316 }
317
318 subjectURIValue, ok := subject["uri"].(string)
319 if !ok {
320 continue
321 }
322
323 if subjectURIValue == subjectURI {
324 // Extract rkey from the URI (at://did/collection/rkey)
325 parts := strings.Split(rec.URI, "/")
326 if len(parts) < 5 {
327 continue
328 }
329 rkey := parts[len(parts)-1]
330
331 // Extract direction
332 direction, _ := rec.Value["direction"].(string)
333
334 return &existingVote{
335 URI: rec.URI,
336 CID: rec.CID,
337 RKey: rkey,
338 Direction: direction,
339 }, nil
340 }
341 }
342
343 // Check if there are more pages
344 if result.Cursor == "" {
345 break // No more pages
346 }
347 cursor = result.Cursor
348 }
349
350 // No vote found for this subject after checking all pages
351 return nil, nil
352}