A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"

	"Coves/internal/core/votes"
)
10
// postgresVoteRepo is the vote repository backed by a database/sql handle.
// It carries no state other than the connection pool its methods query.
type postgresVoteRepo struct {
	db *sql.DB // handle every repository method issues its statements through
}
14
15// NewVoteRepository creates a new PostgreSQL vote repository
16func NewVoteRepository(db *sql.DB) votes.Repository {
17 return &postgresVoteRepo{db: db}
18}
19
20// Create inserts a new vote into the votes table
21// Called by Jetstream consumer after vote is created on PDS
22// Idempotent: Returns success if vote already exists (for Jetstream replays)
23func (r *postgresVoteRepo) Create(ctx context.Context, vote *votes.Vote) error {
24 query := `
25 INSERT INTO votes (
26 uri, cid, rkey, voter_did,
27 subject_uri, subject_cid, direction,
28 created_at, indexed_at
29 ) VALUES (
30 $1, $2, $3, $4,
31 $5, $6, $7,
32 $8, NOW()
33 )
34 ON CONFLICT (uri) DO NOTHING
35 RETURNING id, indexed_at
36 `
37
38 err := r.db.QueryRowContext(
39 ctx, query,
40 vote.URI, vote.CID, vote.RKey, vote.VoterDID,
41 vote.SubjectURI, vote.SubjectCID, vote.Direction,
42 vote.CreatedAt,
43 ).Scan(&vote.ID, &vote.IndexedAt)
44
45 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
46 if err == sql.ErrNoRows {
47 return nil // Vote already exists, no error for idempotency
48 }
49
50 if err != nil {
51 // Check for unique constraint violation (voter + subject)
52 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "unique_voter_subject") {
53 return votes.ErrVoteAlreadyExists
54 }
55
56 // Check for DID format constraint violation
57 if strings.Contains(err.Error(), "chk_voter_did_format") {
58 return fmt.Errorf("invalid voter DID format: %s", vote.VoterDID)
59 }
60
61 return fmt.Errorf("failed to insert vote: %w", err)
62 }
63
64 return nil
65}
66
67// GetByURI retrieves an active vote by its AT-URI
68// Used by Jetstream consumer for DELETE operations
69// Returns ErrVoteNotFound for soft-deleted votes
70func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) {
71 query := `
72 SELECT
73 id, uri, cid, rkey, voter_did,
74 subject_uri, subject_cid, direction,
75 created_at, indexed_at, deleted_at
76 FROM votes
77 WHERE uri = $1 AND deleted_at IS NULL
78 `
79
80 var vote votes.Vote
81
82 err := r.db.QueryRowContext(ctx, query, uri).Scan(
83 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
84 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
85 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
86 )
87
88 if err == sql.ErrNoRows {
89 return nil, votes.ErrVoteNotFound
90 }
91 if err != nil {
92 return nil, fmt.Errorf("failed to get vote by URI: %w", err)
93 }
94
95 return &vote, nil
96}
97
98// GetByVoterAndSubject retrieves a user's vote on a specific subject
99// Used by service to check existing vote state before creating/toggling
100func (r *postgresVoteRepo) GetByVoterAndSubject(ctx context.Context, voterDID, subjectURI string) (*votes.Vote, error) {
101 query := `
102 SELECT
103 id, uri, cid, rkey, voter_did,
104 subject_uri, subject_cid, direction,
105 created_at, indexed_at, deleted_at
106 FROM votes
107 WHERE voter_did = $1 AND subject_uri = $2 AND deleted_at IS NULL
108 `
109
110 var vote votes.Vote
111
112 err := r.db.QueryRowContext(ctx, query, voterDID, subjectURI).Scan(
113 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
114 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
115 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
116 )
117
118 if err == sql.ErrNoRows {
119 return nil, votes.ErrVoteNotFound
120 }
121 if err != nil {
122 return nil, fmt.Errorf("failed to get vote by voter and subject: %w", err)
123 }
124
125 return &vote, nil
126}
127
128// Delete soft-deletes a vote (sets deleted_at)
129// Called by Jetstream consumer after vote is deleted from PDS
130// Idempotent: Returns success if vote already deleted
131func (r *postgresVoteRepo) Delete(ctx context.Context, uri string) error {
132 query := `
133 UPDATE votes
134 SET deleted_at = NOW()
135 WHERE uri = $1 AND deleted_at IS NULL
136 `
137
138 result, err := r.db.ExecContext(ctx, query, uri)
139 if err != nil {
140 return fmt.Errorf("failed to delete vote: %w", err)
141 }
142
143 rowsAffected, err := result.RowsAffected()
144 if err != nil {
145 return fmt.Errorf("failed to check delete result: %w", err)
146 }
147
148 // Idempotent: If no rows affected, vote already deleted (OK for Jetstream replays)
149 if rowsAffected == 0 {
150 return nil
151 }
152
153 return nil
154}
155
156// ListBySubject retrieves all active votes on a specific post/comment
157// Future: Used for vote detail views
158func (r *postgresVoteRepo) ListBySubject(ctx context.Context, subjectURI string, limit, offset int) ([]*votes.Vote, error) {
159 query := `
160 SELECT
161 id, uri, cid, rkey, voter_did,
162 subject_uri, subject_cid, direction,
163 created_at, indexed_at, deleted_at
164 FROM votes
165 WHERE subject_uri = $1 AND deleted_at IS NULL
166 ORDER BY created_at DESC
167 LIMIT $2 OFFSET $3
168 `
169
170 rows, err := r.db.QueryContext(ctx, query, subjectURI, limit, offset)
171 if err != nil {
172 return nil, fmt.Errorf("failed to list votes by subject: %w", err)
173 }
174 defer func() { _ = rows.Close() }()
175
176 var result []*votes.Vote
177 for rows.Next() {
178 var vote votes.Vote
179 err := rows.Scan(
180 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
181 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
182 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
183 )
184 if err != nil {
185 return nil, fmt.Errorf("failed to scan vote: %w", err)
186 }
187 result = append(result, &vote)
188 }
189
190 if err = rows.Err(); err != nil {
191 return nil, fmt.Errorf("error iterating votes: %w", err)
192 }
193
194 return result, nil
195}
196
197// ListByVoter retrieves all active votes by a specific user
198// Future: Used for user voting history
199func (r *postgresVoteRepo) ListByVoter(ctx context.Context, voterDID string, limit, offset int) ([]*votes.Vote, error) {
200 query := `
201 SELECT
202 id, uri, cid, rkey, voter_did,
203 subject_uri, subject_cid, direction,
204 created_at, indexed_at, deleted_at
205 FROM votes
206 WHERE voter_did = $1 AND deleted_at IS NULL
207 ORDER BY created_at DESC
208 LIMIT $2 OFFSET $3
209 `
210
211 rows, err := r.db.QueryContext(ctx, query, voterDID, limit, offset)
212 if err != nil {
213 return nil, fmt.Errorf("failed to list votes by voter: %w", err)
214 }
215 defer func() { _ = rows.Close() }()
216
217 var result []*votes.Vote
218 for rows.Next() {
219 var vote votes.Vote
220 err := rows.Scan(
221 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
222 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
223 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
224 )
225 if err != nil {
226 return nil, fmt.Errorf("failed to scan vote: %w", err)
227 }
228 result = append(result, &vote)
229 }
230
231 if err = rows.Err(); err != nil {
232 return nil, fmt.Errorf("error iterating votes: %w", err)
233 }
234
235 return result, nil
236}