A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "strings"
8
9 "Coves/internal/core/votes"
10)
11
12type postgresVoteRepo struct {
13 db *sql.DB
14}
15
16// NewVoteRepository creates a new PostgreSQL vote repository
17func NewVoteRepository(db *sql.DB) votes.Repository {
18 return &postgresVoteRepo{db: db}
19}
20
21// Create inserts a new vote into the votes table
22// Called by Jetstream consumer after vote is created on PDS
23// Idempotent: Returns success if vote already exists (for Jetstream replays)
24func (r *postgresVoteRepo) Create(ctx context.Context, vote *votes.Vote) error {
25 query := `
26 INSERT INTO votes (
27 uri, cid, rkey, voter_did,
28 subject_uri, subject_cid, direction,
29 created_at, indexed_at
30 ) VALUES (
31 $1, $2, $3, $4,
32 $5, $6, $7,
33 $8, NOW()
34 )
35 ON CONFLICT (uri) DO NOTHING
36 RETURNING id, indexed_at
37 `
38
39 err := r.db.QueryRowContext(
40 ctx, query,
41 vote.URI, vote.CID, vote.RKey, vote.VoterDID,
42 vote.SubjectURI, vote.SubjectCID, vote.Direction,
43 vote.CreatedAt,
44 ).Scan(&vote.ID, &vote.IndexedAt)
45
46 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
47 if err == sql.ErrNoRows {
48 return nil // Vote already exists, no error for idempotency
49 }
50
51 if err != nil {
52 // Check for unique constraint violation (voter + subject)
53 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "unique_voter_subject") {
54 return votes.ErrVoteAlreadyExists
55 }
56
57 // Check for DID format constraint violation
58 if strings.Contains(err.Error(), "chk_voter_did_format") {
59 return fmt.Errorf("invalid voter DID format: %s", vote.VoterDID)
60 }
61
62 return fmt.Errorf("failed to insert vote: %w", err)
63 }
64
65 return nil
66}
67
68// GetByURI retrieves a vote by its AT-URI
69// Used by Jetstream consumer for DELETE operations
70func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) {
71 query := `
72 SELECT
73 id, uri, cid, rkey, voter_did,
74 subject_uri, subject_cid, direction,
75 created_at, indexed_at, deleted_at
76 FROM votes
77 WHERE uri = $1
78 `
79
80 var vote votes.Vote
81
82 err := r.db.QueryRowContext(ctx, query, uri).Scan(
83 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
84 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
85 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
86 )
87
88 if err == sql.ErrNoRows {
89 return nil, votes.ErrVoteNotFound
90 }
91 if err != nil {
92 return nil, fmt.Errorf("failed to get vote by URI: %w", err)
93 }
94
95 return &vote, nil
96}
97
98// GetByVoterAndSubject retrieves a user's vote on a specific subject
99// Used by service to check existing vote state before creating/toggling
100func (r *postgresVoteRepo) GetByVoterAndSubject(ctx context.Context, voterDID, subjectURI string) (*votes.Vote, error) {
101 query := `
102 SELECT
103 id, uri, cid, rkey, voter_did,
104 subject_uri, subject_cid, direction,
105 created_at, indexed_at, deleted_at
106 FROM votes
107 WHERE voter_did = $1 AND subject_uri = $2 AND deleted_at IS NULL
108 `
109
110 var vote votes.Vote
111
112 err := r.db.QueryRowContext(ctx, query, voterDID, subjectURI).Scan(
113 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
114 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
115 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
116 )
117
118 if err == sql.ErrNoRows {
119 return nil, votes.ErrVoteNotFound
120 }
121 if err != nil {
122 return nil, fmt.Errorf("failed to get vote by voter and subject: %w", err)
123 }
124
125 return &vote, nil
126}
127
128// Delete soft-deletes a vote (sets deleted_at)
129// Called by Jetstream consumer after vote is deleted from PDS
130// Idempotent: Returns success if vote already deleted
131func (r *postgresVoteRepo) Delete(ctx context.Context, uri string) error {
132 query := `
133 UPDATE votes
134 SET deleted_at = NOW()
135 WHERE uri = $1 AND deleted_at IS NULL
136 `
137
138 result, err := r.db.ExecContext(ctx, query, uri)
139 if err != nil {
140 return fmt.Errorf("failed to delete vote: %w", err)
141 }
142
143 rowsAffected, err := result.RowsAffected()
144 if err != nil {
145 return fmt.Errorf("failed to check delete result: %w", err)
146 }
147
148 // Idempotent: If no rows affected, vote already deleted (OK for Jetstream replays)
149 if rowsAffected == 0 {
150 return nil
151 }
152
153 return nil
154}
155
156// ListBySubject retrieves all active votes on a specific post/comment
157// Future: Used for vote detail views
158func (r *postgresVoteRepo) ListBySubject(ctx context.Context, subjectURI string, limit, offset int) ([]*votes.Vote, error) {
159 query := `
160 SELECT
161 id, uri, cid, rkey, voter_did,
162 subject_uri, subject_cid, direction,
163 created_at, indexed_at, deleted_at
164 FROM votes
165 WHERE subject_uri = $1 AND deleted_at IS NULL
166 ORDER BY created_at DESC
167 LIMIT $2 OFFSET $3
168 `
169
170 rows, err := r.db.QueryContext(ctx, query, subjectURI, limit, offset)
171 if err != nil {
172 return nil, fmt.Errorf("failed to list votes by subject: %w", err)
173 }
174 defer func() { _ = rows.Close() }()
175
176 var result []*votes.Vote
177 for rows.Next() {
178 var vote votes.Vote
179 err := rows.Scan(
180 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
181 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
182 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
183 )
184 if err != nil {
185 return nil, fmt.Errorf("failed to scan vote: %w", err)
186 }
187 result = append(result, &vote)
188 }
189
190 if err = rows.Err(); err != nil {
191 return nil, fmt.Errorf("error iterating votes: %w", err)
192 }
193
194 return result, nil
195}
196
197// ListByVoter retrieves all active votes by a specific user
198// Future: Used for user voting history
199func (r *postgresVoteRepo) ListByVoter(ctx context.Context, voterDID string, limit, offset int) ([]*votes.Vote, error) {
200 query := `
201 SELECT
202 id, uri, cid, rkey, voter_did,
203 subject_uri, subject_cid, direction,
204 created_at, indexed_at, deleted_at
205 FROM votes
206 WHERE voter_did = $1 AND deleted_at IS NULL
207 ORDER BY created_at DESC
208 LIMIT $2 OFFSET $3
209 `
210
211 rows, err := r.db.QueryContext(ctx, query, voterDID, limit, offset)
212 if err != nil {
213 return nil, fmt.Errorf("failed to list votes by voter: %w", err)
214 }
215 defer func() { _ = rows.Close() }()
216
217 var result []*votes.Vote
218 for rows.Next() {
219 var vote votes.Vote
220 err := rows.Scan(
221 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
222 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
223 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
224 )
225 if err != nil {
226 return nil, fmt.Errorf("failed to scan vote: %w", err)
227 }
228 result = append(result, &vote)
229 }
230
231 if err = rows.Err(); err != nil {
232 return nil, fmt.Errorf("error iterating votes: %w", err)
233 }
234
235 return result, nil
236}