A community based topic aggregation platform built on atproto
at main 5.6 kB view raw
1package votes 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "strings" 8 "sync" 9 "time" 10 11 "Coves/internal/atproto/pds" 12) 13 14// CachedVote represents a vote stored in the cache 15type CachedVote struct { 16 Direction string // "up" or "down" 17 URI string // vote record URI (at://did/collection/rkey) 18 RKey string // record key 19} 20 21// VoteCache provides an in-memory cache of user votes fetched from their PDS. 22// This avoids eventual consistency issues with the AppView database. 23type VoteCache struct { 24 mu sync.RWMutex 25 votes map[string]map[string]*CachedVote // userDID -> subjectURI -> vote 26 expiry map[string]time.Time // userDID -> expiry time 27 ttl time.Duration 28 logger *slog.Logger 29} 30 31// NewVoteCache creates a new vote cache with the specified TTL 32func NewVoteCache(ttl time.Duration, logger *slog.Logger) *VoteCache { 33 if logger == nil { 34 logger = slog.Default() 35 } 36 return &VoteCache{ 37 votes: make(map[string]map[string]*CachedVote), 38 expiry: make(map[string]time.Time), 39 ttl: ttl, 40 logger: logger, 41 } 42} 43 44// GetVotesForUser returns all cached votes for a user. 45// Returns nil if cache is empty or expired for this user. 46func (c *VoteCache) GetVotesForUser(userDID string) map[string]*CachedVote { 47 c.mu.RLock() 48 defer c.mu.RUnlock() 49 50 // Check if cache exists and is not expired 51 expiry, exists := c.expiry[userDID] 52 if !exists || time.Now().After(expiry) { 53 return nil 54 } 55 56 return c.votes[userDID] 57} 58 59// GetVote returns the cached vote for a specific subject, or nil if not found/expired 60func (c *VoteCache) GetVote(userDID, subjectURI string) *CachedVote { 61 votes := c.GetVotesForUser(userDID) 62 if votes == nil { 63 return nil 64 } 65 return votes[subjectURI] 66} 67 68// IsCached returns true if the user's votes are cached and not expired 69func (c *VoteCache) IsCached(userDID string) bool { 70 c.mu.RLock() 71 defer c.mu.RUnlock() 72 73 expiry, exists := c.expiry[userDID] 74 return exists && time.Now().Before(expiry) 75} 76 77// SetVotesForUser replaces all cached votes for a user 78func (c *VoteCache) SetVotesForUser(userDID string, votes map[string]*CachedVote) { 79 c.mu.Lock() 80 defer c.mu.Unlock() 81 82 c.votes[userDID] = votes 83 c.expiry[userDID] = time.Now().Add(c.ttl) 84 85 c.logger.Debug("vote cache updated", 86 "user", userDID, 87 "vote_count", len(votes), 88 "expires_at", c.expiry[userDID]) 89} 90 91// SetVote adds or updates a single vote in the cache 92func (c *VoteCache) SetVote(userDID, subjectURI string, vote *CachedVote) { 93 c.mu.Lock() 94 defer c.mu.Unlock() 95 96 if c.votes[userDID] == nil { 97 c.votes[userDID] = make(map[string]*CachedVote) 98 } 99 100 c.votes[userDID][subjectURI] = vote 101 102 // Always extend expiry on vote action - active users keep their cache fresh 103 c.expiry[userDID] = time.Now().Add(c.ttl) 104 105 c.logger.Debug("vote cached", 106 "user", userDID, 107 "subject", subjectURI, 108 "direction", vote.Direction) 109} 110 111// RemoveVote removes a vote from the cache (for toggle-off) 112func (c *VoteCache) RemoveVote(userDID, subjectURI string) { 113 c.mu.Lock() 114 defer c.mu.Unlock() 115 116 if c.votes[userDID] != nil { 117 delete(c.votes[userDID], subjectURI) 118 119 // Extend expiry on vote action - active users keep their cache fresh 120 c.expiry[userDID] = time.Now().Add(c.ttl) 121 122 c.logger.Debug("vote removed from cache", 123 "user", userDID, 124 "subject", subjectURI) 125 } 126} 127 128// Invalidate removes all cached votes for a user 129func (c *VoteCache) Invalidate(userDID string) { 130 c.mu.Lock() 131 defer c.mu.Unlock() 132 133 delete(c.votes, userDID) 134 delete(c.expiry, userDID) 135 136 c.logger.Debug("vote cache invalidated", "user", userDID) 137} 138 139// FetchAndCacheFromPDS fetches all votes from the user's PDS and caches them. 140// This should be called on first authenticated request or when cache is expired. 141func (c *VoteCache) FetchAndCacheFromPDS(ctx context.Context, pdsClient pds.Client) error { 142 userDID := pdsClient.DID() 143 144 c.logger.Debug("fetching votes from PDS", 145 "user", userDID, 146 "pds", pdsClient.HostURL()) 147 148 votes, err := c.fetchAllVotesFromPDS(ctx, pdsClient) 149 if err != nil { 150 return fmt.Errorf("failed to fetch votes from PDS: %w", err) 151 } 152 153 c.SetVotesForUser(userDID, votes) 154 155 c.logger.Info("vote cache populated from PDS", 156 "user", userDID, 157 "vote_count", len(votes)) 158 159 return nil 160} 161 162// fetchAllVotesFromPDS paginates through all vote records on the user's PDS 163func (c *VoteCache) fetchAllVotesFromPDS(ctx context.Context, pdsClient pds.Client) (map[string]*CachedVote, error) { 164 votes := make(map[string]*CachedVote) 165 cursor := "" 166 const pageSize = 100 167 const collection = "social.coves.feed.vote" 168 169 for { 170 result, err := pdsClient.ListRecords(ctx, collection, pageSize, cursor) 171 if err != nil { 172 if pds.IsAuthError(err) { 173 return nil, ErrNotAuthorized 174 } 175 return nil, fmt.Errorf("listRecords failed: %w", err) 176 } 177 178 for _, rec := range result.Records { 179 // Extract subject from record value 180 subject, ok := rec.Value["subject"].(map[string]any) 181 if !ok { 182 continue 183 } 184 185 subjectURI, ok := subject["uri"].(string) 186 if !ok || subjectURI == "" { 187 continue 188 } 189 190 direction, _ := rec.Value["direction"].(string) 191 if direction == "" { 192 continue 193 } 194 195 // Extract rkey from URI 196 rkey := extractRKeyFromURI(rec.URI) 197 198 votes[subjectURI] = &CachedVote{ 199 Direction: direction, 200 URI: rec.URI, 201 RKey: rkey, 202 } 203 } 204 205 if result.Cursor == "" { 206 break 207 } 208 cursor = result.Cursor 209 } 210 211 return votes, nil 212} 213 214// extractRKeyFromURI extracts the rkey from an AT-URI (at://did/collection/rkey) 215func extractRKeyFromURI(uri string) string { 216 parts := strings.Split(uri, "/") 217 if len(parts) >= 5 { 218 return parts[len(parts)-1] 219 } 220 return "" 221}