A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/votes" 5 "context" 6 "database/sql" 7 "fmt" 8 "strings" 9) 10 11type postgresVoteRepo struct { 12 db *sql.DB 13} 14 15// NewVoteRepository creates a new PostgreSQL vote repository 16func NewVoteRepository(db *sql.DB) votes.Repository { 17 return &postgresVoteRepo{db: db} 18} 19 20// Create inserts a new vote into the votes table 21// Called by Jetstream consumer after vote is created on PDS 22// Idempotent: Returns success if vote already exists (for Jetstream replays) 23func (r *postgresVoteRepo) Create(ctx context.Context, vote *votes.Vote) error { 24 query := ` 25 INSERT INTO votes ( 26 uri, cid, rkey, voter_did, 27 subject_uri, subject_cid, direction, 28 created_at, indexed_at 29 ) VALUES ( 30 $1, $2, $3, $4, 31 $5, $6, $7, 32 $8, NOW() 33 ) 34 ON CONFLICT (uri) DO NOTHING 35 RETURNING id, indexed_at 36 ` 37 38 err := r.db.QueryRowContext( 39 ctx, query, 40 vote.URI, vote.CID, vote.RKey, vote.VoterDID, 41 vote.SubjectURI, vote.SubjectCID, vote.Direction, 42 vote.CreatedAt, 43 ).Scan(&vote.ID, &vote.IndexedAt) 44 45 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 46 if err == sql.ErrNoRows { 47 return nil // Vote already exists, no error for idempotency 48 } 49 50 if err != nil { 51 // Check for unique constraint violation (voter + subject) 52 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "unique_voter_subject") { 53 return votes.ErrVoteAlreadyExists 54 } 55 56 // Check for DID format constraint violation 57 if strings.Contains(err.Error(), "chk_voter_did_format") { 58 return fmt.Errorf("invalid voter DID format: %s", vote.VoterDID) 59 } 60 61 return fmt.Errorf("failed to insert vote: %w", err) 62 } 63 64 return nil 65} 66 67// GetByURI retrieves a vote by its AT-URI 68// Used by Jetstream consumer for DELETE operations 69func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) { 70 query := ` 71 SELECT 72 
id, uri, cid, rkey, voter_did, 73 subject_uri, subject_cid, direction, 74 created_at, indexed_at, deleted_at 75 FROM votes 76 WHERE uri = $1 77 ` 78 79 var vote votes.Vote 80 81 err := r.db.QueryRowContext(ctx, query, uri).Scan( 82 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 83 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 84 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 85 ) 86 87 if err == sql.ErrNoRows { 88 return nil, votes.ErrVoteNotFound 89 } 90 if err != nil { 91 return nil, fmt.Errorf("failed to get vote by URI: %w", err) 92 } 93 94 return &vote, nil 95} 96 97// GetByVoterAndSubject retrieves a user's vote on a specific subject 98// Used by service to check existing vote state before creating/toggling 99func (r *postgresVoteRepo) GetByVoterAndSubject(ctx context.Context, voterDID, subjectURI string) (*votes.Vote, error) { 100 query := ` 101 SELECT 102 id, uri, cid, rkey, voter_did, 103 subject_uri, subject_cid, direction, 104 created_at, indexed_at, deleted_at 105 FROM votes 106 WHERE voter_did = $1 AND subject_uri = $2 AND deleted_at IS NULL 107 ` 108 109 var vote votes.Vote 110 111 err := r.db.QueryRowContext(ctx, query, voterDID, subjectURI).Scan( 112 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 113 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 114 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 115 ) 116 117 if err == sql.ErrNoRows { 118 return nil, votes.ErrVoteNotFound 119 } 120 if err != nil { 121 return nil, fmt.Errorf("failed to get vote by voter and subject: %w", err) 122 } 123 124 return &vote, nil 125} 126 127// Delete soft-deletes a vote (sets deleted_at) 128// Called by Jetstream consumer after vote is deleted from PDS 129// Idempotent: Returns success if vote already deleted 130func (r *postgresVoteRepo) Delete(ctx context.Context, uri string) error { 131 query := ` 132 UPDATE votes 133 SET deleted_at = NOW() 134 WHERE uri = $1 AND deleted_at IS NULL 135 ` 136 137 result, err := 
r.db.ExecContext(ctx, query, uri) 138 if err != nil { 139 return fmt.Errorf("failed to delete vote: %w", err) 140 } 141 142 rowsAffected, err := result.RowsAffected() 143 if err != nil { 144 return fmt.Errorf("failed to check delete result: %w", err) 145 } 146 147 // Idempotent: If no rows affected, vote already deleted (OK for Jetstream replays) 148 if rowsAffected == 0 { 149 return nil 150 } 151 152 return nil 153} 154 155// ListBySubject retrieves all active votes on a specific post/comment 156// Future: Used for vote detail views 157func (r *postgresVoteRepo) ListBySubject(ctx context.Context, subjectURI string, limit, offset int) ([]*votes.Vote, error) { 158 query := ` 159 SELECT 160 id, uri, cid, rkey, voter_did, 161 subject_uri, subject_cid, direction, 162 created_at, indexed_at, deleted_at 163 FROM votes 164 WHERE subject_uri = $1 AND deleted_at IS NULL 165 ORDER BY created_at DESC 166 LIMIT $2 OFFSET $3 167 ` 168 169 rows, err := r.db.QueryContext(ctx, query, subjectURI, limit, offset) 170 if err != nil { 171 return nil, fmt.Errorf("failed to list votes by subject: %w", err) 172 } 173 defer func() { _ = rows.Close() }() 174 175 var result []*votes.Vote 176 for rows.Next() { 177 var vote votes.Vote 178 err := rows.Scan( 179 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 180 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 181 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 182 ) 183 if err != nil { 184 return nil, fmt.Errorf("failed to scan vote: %w", err) 185 } 186 result = append(result, &vote) 187 } 188 189 if err = rows.Err(); err != nil { 190 return nil, fmt.Errorf("error iterating votes: %w", err) 191 } 192 193 return result, nil 194} 195 196// ListByVoter retrieves all active votes by a specific user 197// Future: Used for user voting history 198func (r *postgresVoteRepo) ListByVoter(ctx context.Context, voterDID string, limit, offset int) ([]*votes.Vote, error) { 199 query := ` 200 SELECT 201 id, uri, cid, rkey, voter_did, 202 
subject_uri, subject_cid, direction, 203 created_at, indexed_at, deleted_at 204 FROM votes 205 WHERE voter_did = $1 AND deleted_at IS NULL 206 ORDER BY created_at DESC 207 LIMIT $2 OFFSET $3 208 ` 209 210 rows, err := r.db.QueryContext(ctx, query, voterDID, limit, offset) 211 if err != nil { 212 return nil, fmt.Errorf("failed to list votes by voter: %w", err) 213 } 214 defer func() { _ = rows.Close() }() 215 216 var result []*votes.Vote 217 for rows.Next() { 218 var vote votes.Vote 219 err := rows.Scan( 220 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 221 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 222 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 223 ) 224 if err != nil { 225 return nil, fmt.Errorf("failed to scan vote: %w", err) 226 } 227 result = append(result, &vote) 228 } 229 230 if err = rows.Err(); err != nil { 231 return nil, fmt.Errorf("error iterating votes: %w", err) 232 } 233 234 return result, nil 235}