A community-based topic aggregation platform built on atproto
1package votes
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "strings"
8 "sync"
9 "time"
10
11 "Coves/internal/atproto/pds"
12)
13
// CachedVote is a single vote record as held in the vote cache.
type CachedVote struct {
	Direction string // vote direction: "up" or "down"
	URI       string // AT-URI of the vote record (at://did/collection/rkey)
	RKey      string // record key of the vote record
}
20
// VoteCache provides an in-memory cache of user votes fetched from their PDS.
// This avoids eventual consistency issues with the AppView database.
//
// Entries are tracked per user: votes holds the data and expiry holds the
// time after which that user's data is treated as absent. The methods of
// VoteCache take mu before touching either map.
type VoteCache struct {
	mu     sync.RWMutex                      // guards votes and expiry
	votes  map[string]map[string]*CachedVote // userDID -> subjectURI -> vote
	expiry map[string]time.Time              // userDID -> expiry time
	ttl    time.Duration                     // added to time.Now() whenever a user's expiry is (re)set
	logger *slog.Logger                      // receives the cache's log messages
}
30
31// NewVoteCache creates a new vote cache with the specified TTL
32func NewVoteCache(ttl time.Duration, logger *slog.Logger) *VoteCache {
33 if logger == nil {
34 logger = slog.Default()
35 }
36 return &VoteCache{
37 votes: make(map[string]map[string]*CachedVote),
38 expiry: make(map[string]time.Time),
39 ttl: ttl,
40 logger: logger,
41 }
42}
43
44// GetVotesForUser returns all cached votes for a user.
45// Returns nil if cache is empty or expired for this user.
46func (c *VoteCache) GetVotesForUser(userDID string) map[string]*CachedVote {
47 c.mu.RLock()
48 defer c.mu.RUnlock()
49
50 // Check if cache exists and is not expired
51 expiry, exists := c.expiry[userDID]
52 if !exists || time.Now().After(expiry) {
53 return nil
54 }
55
56 return c.votes[userDID]
57}
58
59// GetVote returns the cached vote for a specific subject, or nil if not found/expired
60func (c *VoteCache) GetVote(userDID, subjectURI string) *CachedVote {
61 votes := c.GetVotesForUser(userDID)
62 if votes == nil {
63 return nil
64 }
65 return votes[subjectURI]
66}
67
68// IsCached returns true if the user's votes are cached and not expired
69func (c *VoteCache) IsCached(userDID string) bool {
70 c.mu.RLock()
71 defer c.mu.RUnlock()
72
73 expiry, exists := c.expiry[userDID]
74 return exists && time.Now().Before(expiry)
75}
76
77// SetVotesForUser replaces all cached votes for a user
78func (c *VoteCache) SetVotesForUser(userDID string, votes map[string]*CachedVote) {
79 c.mu.Lock()
80 defer c.mu.Unlock()
81
82 c.votes[userDID] = votes
83 c.expiry[userDID] = time.Now().Add(c.ttl)
84
85 c.logger.Debug("vote cache updated",
86 "user", userDID,
87 "vote_count", len(votes),
88 "expires_at", c.expiry[userDID])
89}
90
91// SetVote adds or updates a single vote in the cache
92func (c *VoteCache) SetVote(userDID, subjectURI string, vote *CachedVote) {
93 c.mu.Lock()
94 defer c.mu.Unlock()
95
96 if c.votes[userDID] == nil {
97 c.votes[userDID] = make(map[string]*CachedVote)
98 }
99
100 c.votes[userDID][subjectURI] = vote
101
102 // Always extend expiry on vote action - active users keep their cache fresh
103 c.expiry[userDID] = time.Now().Add(c.ttl)
104
105 c.logger.Debug("vote cached",
106 "user", userDID,
107 "subject", subjectURI,
108 "direction", vote.Direction)
109}
110
111// RemoveVote removes a vote from the cache (for toggle-off)
112func (c *VoteCache) RemoveVote(userDID, subjectURI string) {
113 c.mu.Lock()
114 defer c.mu.Unlock()
115
116 if c.votes[userDID] != nil {
117 delete(c.votes[userDID], subjectURI)
118
119 // Extend expiry on vote action - active users keep their cache fresh
120 c.expiry[userDID] = time.Now().Add(c.ttl)
121
122 c.logger.Debug("vote removed from cache",
123 "user", userDID,
124 "subject", subjectURI)
125 }
126}
127
128// Invalidate removes all cached votes for a user
129func (c *VoteCache) Invalidate(userDID string) {
130 c.mu.Lock()
131 defer c.mu.Unlock()
132
133 delete(c.votes, userDID)
134 delete(c.expiry, userDID)
135
136 c.logger.Debug("vote cache invalidated", "user", userDID)
137}
138
139// FetchAndCacheFromPDS fetches all votes from the user's PDS and caches them.
140// This should be called on first authenticated request or when cache is expired.
141func (c *VoteCache) FetchAndCacheFromPDS(ctx context.Context, pdsClient pds.Client) error {
142 userDID := pdsClient.DID()
143
144 c.logger.Debug("fetching votes from PDS",
145 "user", userDID,
146 "pds", pdsClient.HostURL())
147
148 votes, err := c.fetchAllVotesFromPDS(ctx, pdsClient)
149 if err != nil {
150 return fmt.Errorf("failed to fetch votes from PDS: %w", err)
151 }
152
153 c.SetVotesForUser(userDID, votes)
154
155 c.logger.Info("vote cache populated from PDS",
156 "user", userDID,
157 "vote_count", len(votes))
158
159 return nil
160}
161
162// fetchAllVotesFromPDS paginates through all vote records on the user's PDS
163func (c *VoteCache) fetchAllVotesFromPDS(ctx context.Context, pdsClient pds.Client) (map[string]*CachedVote, error) {
164 votes := make(map[string]*CachedVote)
165 cursor := ""
166 const pageSize = 100
167 const collection = "social.coves.feed.vote"
168
169 for {
170 result, err := pdsClient.ListRecords(ctx, collection, pageSize, cursor)
171 if err != nil {
172 if pds.IsAuthError(err) {
173 return nil, ErrNotAuthorized
174 }
175 return nil, fmt.Errorf("listRecords failed: %w", err)
176 }
177
178 for _, rec := range result.Records {
179 // Extract subject from record value
180 subject, ok := rec.Value["subject"].(map[string]any)
181 if !ok {
182 continue
183 }
184
185 subjectURI, ok := subject["uri"].(string)
186 if !ok || subjectURI == "" {
187 continue
188 }
189
190 direction, _ := rec.Value["direction"].(string)
191 if direction == "" {
192 continue
193 }
194
195 // Extract rkey from URI
196 rkey := extractRKeyFromURI(rec.URI)
197
198 votes[subjectURI] = &CachedVote{
199 Direction: direction,
200 URI: rec.URI,
201 RKey: rkey,
202 }
203 }
204
205 if result.Cursor == "" {
206 break
207 }
208 cursor = result.Cursor
209 }
210
211 return votes, nil
212}
213
// extractRKeyFromURI returns the text after the final "/" of an AT-URI
// (at://did/collection/rkey). A URI with fewer than four slashes has no
// rkey segment and yields the empty string.
func extractRKeyFromURI(uri string) string {
	// "at://did/collection/rkey" contains exactly four slashes.
	if strings.Count(uri, "/") < 4 {
		return ""
	}
	return uri[strings.LastIndex(uri, "/")+1:]
}