A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "strings" 8 9 "Coves/internal/core/votes" 10) 11 12type postgresVoteRepo struct { 13 db *sql.DB 14} 15 16// NewVoteRepository creates a new PostgreSQL vote repository 17func NewVoteRepository(db *sql.DB) votes.Repository { 18 return &postgresVoteRepo{db: db} 19} 20 21// Create inserts a new vote into the votes table 22// Called by Jetstream consumer after vote is created on PDS 23// Idempotent: Returns success if vote already exists (for Jetstream replays) 24func (r *postgresVoteRepo) Create(ctx context.Context, vote *votes.Vote) error { 25 query := ` 26 INSERT INTO votes ( 27 uri, cid, rkey, voter_did, 28 subject_uri, subject_cid, direction, 29 created_at, indexed_at 30 ) VALUES ( 31 $1, $2, $3, $4, 32 $5, $6, $7, 33 $8, NOW() 34 ) 35 ON CONFLICT (uri) DO NOTHING 36 RETURNING id, indexed_at 37 ` 38 39 err := r.db.QueryRowContext( 40 ctx, query, 41 vote.URI, vote.CID, vote.RKey, vote.VoterDID, 42 vote.SubjectURI, vote.SubjectCID, vote.Direction, 43 vote.CreatedAt, 44 ).Scan(&vote.ID, &vote.IndexedAt) 45 46 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 47 if err == sql.ErrNoRows { 48 return nil // Vote already exists, no error for idempotency 49 } 50 51 if err != nil { 52 // Check for unique constraint violation (voter + subject) 53 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "unique_voter_subject") { 54 return votes.ErrVoteAlreadyExists 55 } 56 57 // Check for DID format constraint violation 58 if strings.Contains(err.Error(), "chk_voter_did_format") { 59 return fmt.Errorf("invalid voter DID format: %s", vote.VoterDID) 60 } 61 62 return fmt.Errorf("failed to insert vote: %w", err) 63 } 64 65 return nil 66} 67 68// GetByURI retrieves a vote by its AT-URI 69// Used by Jetstream consumer for DELETE operations 70func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) { 71 query := ` 72 SELECT 73 id, uri, cid, rkey, voter_did, 74 subject_uri, subject_cid, direction, 75 created_at, indexed_at, deleted_at 76 FROM votes 77 WHERE uri = $1 78 ` 79 80 var vote votes.Vote 81 82 err := r.db.QueryRowContext(ctx, query, uri).Scan( 83 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 84 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 85 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 86 ) 87 88 if err == sql.ErrNoRows { 89 return nil, votes.ErrVoteNotFound 90 } 91 if err != nil { 92 return nil, fmt.Errorf("failed to get vote by URI: %w", err) 93 } 94 95 return &vote, nil 96} 97 98// GetByVoterAndSubject retrieves a user's vote on a specific subject 99// Used by service to check existing vote state before creating/toggling 100func (r *postgresVoteRepo) GetByVoterAndSubject(ctx context.Context, voterDID, subjectURI string) (*votes.Vote, error) { 101 query := ` 102 SELECT 103 id, uri, cid, rkey, voter_did, 104 subject_uri, subject_cid, direction, 105 created_at, indexed_at, deleted_at 106 FROM votes 107 WHERE voter_did = $1 AND subject_uri = $2 AND deleted_at IS NULL 108 ` 109 110 var vote votes.Vote 111 112 err := r.db.QueryRowContext(ctx, query, voterDID, subjectURI).Scan( 113 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 114 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 115 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 116 ) 117 118 if err == sql.ErrNoRows { 119 return nil, votes.ErrVoteNotFound 120 } 121 if err != nil { 122 return nil, fmt.Errorf("failed to get vote by voter and subject: %w", err) 123 } 124 125 return &vote, nil 126} 127 128// Delete soft-deletes a vote (sets deleted_at) 129// Called by Jetstream consumer after vote is deleted from PDS 130// Idempotent: Returns success if vote already deleted 131func (r *postgresVoteRepo) Delete(ctx context.Context, uri string) error { 132 query := ` 133 UPDATE votes 134 SET deleted_at = NOW() 135 WHERE uri = $1 AND deleted_at IS NULL 136 ` 137 138 result, err := r.db.ExecContext(ctx, query, uri) 139 if err != nil { 140 return fmt.Errorf("failed to delete vote: %w", err) 141 } 142 143 rowsAffected, err := result.RowsAffected() 144 if err != nil { 145 return fmt.Errorf("failed to check delete result: %w", err) 146 } 147 148 // Idempotent: If no rows affected, vote already deleted (OK for Jetstream replays) 149 if rowsAffected == 0 { 150 return nil 151 } 152 153 return nil 154} 155 156// ListBySubject retrieves all active votes on a specific post/comment 157// Future: Used for vote detail views 158func (r *postgresVoteRepo) ListBySubject(ctx context.Context, subjectURI string, limit, offset int) ([]*votes.Vote, error) { 159 query := ` 160 SELECT 161 id, uri, cid, rkey, voter_did, 162 subject_uri, subject_cid, direction, 163 created_at, indexed_at, deleted_at 164 FROM votes 165 WHERE subject_uri = $1 AND deleted_at IS NULL 166 ORDER BY created_at DESC 167 LIMIT $2 OFFSET $3 168 ` 169 170 rows, err := r.db.QueryContext(ctx, query, subjectURI, limit, offset) 171 if err != nil { 172 return nil, fmt.Errorf("failed to list votes by subject: %w", err) 173 } 174 defer func() { _ = rows.Close() }() 175 176 var result []*votes.Vote 177 for rows.Next() { 178 var vote votes.Vote 179 err := rows.Scan( 180 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 181 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 182 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 183 ) 184 if err != nil { 185 return nil, fmt.Errorf("failed to scan vote: %w", err) 186 } 187 result = append(result, &vote) 188 } 189 190 if err = rows.Err(); err != nil { 191 return nil, fmt.Errorf("error iterating votes: %w", err) 192 } 193 194 return result, nil 195} 196 197// ListByVoter retrieves all active votes by a specific user 198// Future: Used for user voting history 199func (r *postgresVoteRepo) ListByVoter(ctx context.Context, voterDID string, limit, offset int) ([]*votes.Vote, error) { 200 query := ` 201 SELECT 202 id, uri, cid, rkey, voter_did, 203 subject_uri, subject_cid, direction, 204 created_at, indexed_at, deleted_at 205 FROM votes 206 WHERE voter_did = $1 AND deleted_at IS NULL 207 ORDER BY created_at DESC 208 LIMIT $2 OFFSET $3 209 ` 210 211 rows, err := r.db.QueryContext(ctx, query, voterDID, limit, offset) 212 if err != nil { 213 return nil, fmt.Errorf("failed to list votes by voter: %w", err) 214 } 215 defer func() { _ = rows.Close() }() 216 217 var result []*votes.Vote 218 for rows.Next() { 219 var vote votes.Vote 220 err := rows.Scan( 221 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 222 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 223 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 224 ) 225 if err != nil { 226 return nil, fmt.Errorf("failed to scan vote: %w", err) 227 } 228 result = append(result, &vote) 229 } 230 231 if err = rows.Err(); err != nil { 232 return nil, fmt.Errorf("error iterating votes: %w", err) 233 } 234 235 return result, nil 236}