A community-based topic aggregation platform built on atproto
at main 6.5 kB view raw
1package postgres 2 3import ( 4 "Coves/internal/core/votes" 5 "context" 6 "database/sql" 7 "fmt" 8 "strings" 9) 10 11type postgresVoteRepo struct { 12 db *sql.DB 13} 14 15// NewVoteRepository creates a new PostgreSQL vote repository 16func NewVoteRepository(db *sql.DB) votes.Repository { 17 return &postgresVoteRepo{db: db} 18} 19 20// Create inserts a new vote into the votes table 21// Called by Jetstream consumer after vote is created on PDS 22// Idempotent: Returns success if vote already exists (for Jetstream replays) 23func (r *postgresVoteRepo) Create(ctx context.Context, vote *votes.Vote) error { 24 query := ` 25 INSERT INTO votes ( 26 uri, cid, rkey, voter_did, 27 subject_uri, subject_cid, direction, 28 created_at, indexed_at 29 ) VALUES ( 30 $1, $2, $3, $4, 31 $5, $6, $7, 32 $8, NOW() 33 ) 34 ON CONFLICT (uri) DO NOTHING 35 RETURNING id, indexed_at 36 ` 37 38 err := r.db.QueryRowContext( 39 ctx, query, 40 vote.URI, vote.CID, vote.RKey, vote.VoterDID, 41 vote.SubjectURI, vote.SubjectCID, vote.Direction, 42 vote.CreatedAt, 43 ).Scan(&vote.ID, &vote.IndexedAt) 44 45 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 46 if err == sql.ErrNoRows { 47 return nil // Vote already exists, no error for idempotency 48 } 49 50 if err != nil { 51 // Check for unique constraint violation (voter + subject) 52 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "unique_voter_subject") { 53 return votes.ErrVoteAlreadyExists 54 } 55 56 // Check for DID format constraint violation 57 if strings.Contains(err.Error(), "chk_voter_did_format") { 58 return fmt.Errorf("invalid voter DID format: %s", vote.VoterDID) 59 } 60 61 return fmt.Errorf("failed to insert vote: %w", err) 62 } 63 64 return nil 65} 66 67// GetByURI retrieves an active vote by its AT-URI 68// Used by Jetstream consumer for DELETE operations 69// Returns ErrVoteNotFound for soft-deleted votes 70func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri 
string) (*votes.Vote, error) { 71 query := ` 72 SELECT 73 id, uri, cid, rkey, voter_did, 74 subject_uri, subject_cid, direction, 75 created_at, indexed_at, deleted_at 76 FROM votes 77 WHERE uri = $1 AND deleted_at IS NULL 78 ` 79 80 var vote votes.Vote 81 82 err := r.db.QueryRowContext(ctx, query, uri).Scan( 83 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 84 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 85 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 86 ) 87 88 if err == sql.ErrNoRows { 89 return nil, votes.ErrVoteNotFound 90 } 91 if err != nil { 92 return nil, fmt.Errorf("failed to get vote by URI: %w", err) 93 } 94 95 return &vote, nil 96} 97 98// GetByVoterAndSubject retrieves a user's vote on a specific subject 99// Used by service to check existing vote state before creating/toggling 100func (r *postgresVoteRepo) GetByVoterAndSubject(ctx context.Context, voterDID, subjectURI string) (*votes.Vote, error) { 101 query := ` 102 SELECT 103 id, uri, cid, rkey, voter_did, 104 subject_uri, subject_cid, direction, 105 created_at, indexed_at, deleted_at 106 FROM votes 107 WHERE voter_did = $1 AND subject_uri = $2 AND deleted_at IS NULL 108 ` 109 110 var vote votes.Vote 111 112 err := r.db.QueryRowContext(ctx, query, voterDID, subjectURI).Scan( 113 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 114 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 115 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 116 ) 117 118 if err == sql.ErrNoRows { 119 return nil, votes.ErrVoteNotFound 120 } 121 if err != nil { 122 return nil, fmt.Errorf("failed to get vote by voter and subject: %w", err) 123 } 124 125 return &vote, nil 126} 127 128// Delete soft-deletes a vote (sets deleted_at) 129// Called by Jetstream consumer after vote is deleted from PDS 130// Idempotent: Returns success if vote already deleted 131func (r *postgresVoteRepo) Delete(ctx context.Context, uri string) error { 132 query := ` 133 UPDATE votes 134 SET deleted_at = NOW() 
135 WHERE uri = $1 AND deleted_at IS NULL 136 ` 137 138 result, err := r.db.ExecContext(ctx, query, uri) 139 if err != nil { 140 return fmt.Errorf("failed to delete vote: %w", err) 141 } 142 143 rowsAffected, err := result.RowsAffected() 144 if err != nil { 145 return fmt.Errorf("failed to check delete result: %w", err) 146 } 147 148 // Idempotent: If no rows affected, vote already deleted (OK for Jetstream replays) 149 if rowsAffected == 0 { 150 return nil 151 } 152 153 return nil 154} 155 156// ListBySubject retrieves all active votes on a specific post/comment 157// Future: Used for vote detail views 158func (r *postgresVoteRepo) ListBySubject(ctx context.Context, subjectURI string, limit, offset int) ([]*votes.Vote, error) { 159 query := ` 160 SELECT 161 id, uri, cid, rkey, voter_did, 162 subject_uri, subject_cid, direction, 163 created_at, indexed_at, deleted_at 164 FROM votes 165 WHERE subject_uri = $1 AND deleted_at IS NULL 166 ORDER BY created_at DESC 167 LIMIT $2 OFFSET $3 168 ` 169 170 rows, err := r.db.QueryContext(ctx, query, subjectURI, limit, offset) 171 if err != nil { 172 return nil, fmt.Errorf("failed to list votes by subject: %w", err) 173 } 174 defer func() { _ = rows.Close() }() 175 176 var result []*votes.Vote 177 for rows.Next() { 178 var vote votes.Vote 179 err := rows.Scan( 180 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 181 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 182 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 183 ) 184 if err != nil { 185 return nil, fmt.Errorf("failed to scan vote: %w", err) 186 } 187 result = append(result, &vote) 188 } 189 190 if err = rows.Err(); err != nil { 191 return nil, fmt.Errorf("error iterating votes: %w", err) 192 } 193 194 return result, nil 195} 196 197// ListByVoter retrieves all active votes by a specific user 198// Future: Used for user voting history 199func (r *postgresVoteRepo) ListByVoter(ctx context.Context, voterDID string, limit, offset int) ([]*votes.Vote, 
error) { 200 query := ` 201 SELECT 202 id, uri, cid, rkey, voter_did, 203 subject_uri, subject_cid, direction, 204 created_at, indexed_at, deleted_at 205 FROM votes 206 WHERE voter_did = $1 AND deleted_at IS NULL 207 ORDER BY created_at DESC 208 LIMIT $2 OFFSET $3 209 ` 210 211 rows, err := r.db.QueryContext(ctx, query, voterDID, limit, offset) 212 if err != nil { 213 return nil, fmt.Errorf("failed to list votes by voter: %w", err) 214 } 215 defer func() { _ = rows.Close() }() 216 217 var result []*votes.Vote 218 for rows.Next() { 219 var vote votes.Vote 220 err := rows.Scan( 221 &vote.ID, &vote.URI, &vote.CID, &vote.RKey, &vote.VoterDID, 222 &vote.SubjectURI, &vote.SubjectCID, &vote.Direction, 223 &vote.CreatedAt, &vote.IndexedAt, &vote.DeletedAt, 224 ) 225 if err != nil { 226 return nil, fmt.Errorf("failed to scan vote: %w", err) 227 } 228 result = append(result, &vote) 229 } 230 231 if err = rows.Err(); err != nil { 232 return nil, fmt.Errorf("error iterating votes: %w", err) 233 } 234 235 return result, nil 236}