A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"Coves/internal/core/votes"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
)
10
// postgresVoteRepo is the PostgreSQL-backed implementation returned by
// NewVoteRepository. All of its methods run their queries through db.
type postgresVoteRepo struct {
	// db is the database handle every query in this repository is issued on.
	db *sql.DB
}
14
15// NewVoteRepository creates a new PostgreSQL vote repository
16func NewVoteRepository(db *sql.DB) votes.Repository {
17 return &postgresVoteRepo{db: db}
18}
19
20// Create inserts a new vote into the votes table
21// Called by Jetstream consumer after vote is created on PDS
22// Idempotent: Returns success if vote already exists (for Jetstream replays)
23func (r *postgresVoteRepo) Create(ctx context.Context, vote *votes.Vote) error {
24 query := `
25 INSERT INTO votes (
26 uri, cid, rkey, voter_did,
27 subject_uri, subject_cid, direction,
28 created_at, indexed_at
29 ) VALUES (
30 $1, $2, $3, $4,
31 $5, $6, $7,
32 $8, NOW()
33 )
34 ON CONFLICT (uri) DO NOTHING
35 RETURNING id, indexed_at
36 `
37
38 err := r.db.QueryRowContext(
39 ctx, query,
40 vote.URI, vote.CID, vote.RKey, vote.VoterDID,
41 vote.SubjectURI, vote.SubjectCID, vote.Direction,
42 vote.CreatedAt,
43 ).Scan(&vote.ID, &vote.IndexedAt)
44
45 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
46 if err == sql.ErrNoRows {
47 return nil // Vote already exists, no error for idempotency
48 }
49
50 if err != nil {
51 // Check for unique constraint violation (voter + subject)
52 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "unique_voter_subject") {
53 return votes.ErrVoteAlreadyExists
54 }
55
56 // Check for DID format constraint violation
57 if strings.Contains(err.Error(), "chk_voter_did_format") {
58 return fmt.Errorf("invalid voter DID format: %s", vote.VoterDID)
59 }
60
61 return fmt.Errorf("failed to insert vote: %w", err)
62 }
63
64 return nil
65}
66
67// GetByURI retrieves a vote by its AT-URI
68// Used by Jetstream consumer for DELETE operations
69func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) {
70 query := `
71 SELECT
72 id, uri, cid, rkey, voter_did,
73 subject_uri, subject_cid, direction,
74 created_at, indexed_at, deleted_at
75 FROM votes
76 WHERE uri = $1
77 `
78
79 var vote votes.Vote
80
81 err := r.db.QueryRowContext(ctx, query, uri).Scan(
82 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
83 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
84 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
85 )
86
87 if err == sql.ErrNoRows {
88 return nil, votes.ErrVoteNotFound
89 }
90 if err != nil {
91 return nil, fmt.Errorf("failed to get vote by URI: %w", err)
92 }
93
94 return &vote, nil
95}
96
97// GetByVoterAndSubject retrieves a user's vote on a specific subject
98// Used by service to check existing vote state before creating/toggling
99func (r *postgresVoteRepo) GetByVoterAndSubject(ctx context.Context, voterDID, subjectURI string) (*votes.Vote, error) {
100 query := `
101 SELECT
102 id, uri, cid, rkey, voter_did,
103 subject_uri, subject_cid, direction,
104 created_at, indexed_at, deleted_at
105 FROM votes
106 WHERE voter_did = $1 AND subject_uri = $2 AND deleted_at IS NULL
107 `
108
109 var vote votes.Vote
110
111 err := r.db.QueryRowContext(ctx, query, voterDID, subjectURI).Scan(
112 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
113 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
114 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
115 )
116
117 if err == sql.ErrNoRows {
118 return nil, votes.ErrVoteNotFound
119 }
120 if err != nil {
121 return nil, fmt.Errorf("failed to get vote by voter and subject: %w", err)
122 }
123
124 return &vote, nil
125}
126
127// Delete soft-deletes a vote (sets deleted_at)
128// Called by Jetstream consumer after vote is deleted from PDS
129// Idempotent: Returns success if vote already deleted
130func (r *postgresVoteRepo) Delete(ctx context.Context, uri string) error {
131 query := `
132 UPDATE votes
133 SET deleted_at = NOW()
134 WHERE uri = $1 AND deleted_at IS NULL
135 `
136
137 result, err := r.db.ExecContext(ctx, query, uri)
138 if err != nil {
139 return fmt.Errorf("failed to delete vote: %w", err)
140 }
141
142 rowsAffected, err := result.RowsAffected()
143 if err != nil {
144 return fmt.Errorf("failed to check delete result: %w", err)
145 }
146
147 // Idempotent: If no rows affected, vote already deleted (OK for Jetstream replays)
148 if rowsAffected == 0 {
149 return nil
150 }
151
152 return nil
153}
154
155// ListBySubject retrieves all active votes on a specific post/comment
156// Future: Used for vote detail views
157func (r *postgresVoteRepo) ListBySubject(ctx context.Context, subjectURI string, limit, offset int) ([]*votes.Vote, error) {
158 query := `
159 SELECT
160 id, uri, cid, rkey, voter_did,
161 subject_uri, subject_cid, direction,
162 created_at, indexed_at, deleted_at
163 FROM votes
164 WHERE subject_uri = $1 AND deleted_at IS NULL
165 ORDER BY created_at DESC
166 LIMIT $2 OFFSET $3
167 `
168
169 rows, err := r.db.QueryContext(ctx, query, subjectURI, limit, offset)
170 if err != nil {
171 return nil, fmt.Errorf("failed to list votes by subject: %w", err)
172 }
173 defer func() { _ = rows.Close() }()
174
175 var result []*votes.Vote
176 for rows.Next() {
177 var vote votes.Vote
178 err := rows.Scan(
179 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
180 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
181 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
182 )
183 if err != nil {
184 return nil, fmt.Errorf("failed to scan vote: %w", err)
185 }
186 result = append(result, &vote)
187 }
188
189 if err = rows.Err(); err != nil {
190 return nil, fmt.Errorf("error iterating votes: %w", err)
191 }
192
193 return result, nil
194}
195
196// ListByVoter retrieves all active votes by a specific user
197// Future: Used for user voting history
198func (r *postgresVoteRepo) ListByVoter(ctx context.Context, voterDID string, limit, offset int) ([]*votes.Vote, error) {
199 query := `
200 SELECT
201 id, uri, cid, rkey, voter_did,
202 subject_uri, subject_cid, direction,
203 created_at, indexed_at, deleted_at
204 FROM votes
205 WHERE voter_did = $1 AND deleted_at IS NULL
206 ORDER BY created_at DESC
207 LIMIT $2 OFFSET $3
208 `
209
210 rows, err := r.db.QueryContext(ctx, query, voterDID, limit, offset)
211 if err != nil {
212 return nil, fmt.Errorf("failed to list votes by voter: %w", err)
213 }
214 defer func() { _ = rows.Close() }()
215
216 var result []*votes.Vote
217 for rows.Next() {
218 var vote votes.Vote
219 err := rows.Scan(
220 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID,
221 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction,
222 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt,
223 )
224 if err != nil {
225 return nil, fmt.Errorf("failed to scan vote: %w", err)
226 }
227 result = append(result, &vote)
228 }
229
230 if err = rows.Err(); err != nil {
231 return nil, fmt.Errorf("error iterating votes: %w", err)
232 }
233
234 return result, nil
235}