···
// Atomically: Index vote + Update post counts
+
wasNew, err := c.indexVoteAndUpdateCounts(ctx, vote)
return fmt.Errorf("failed to index vote and update counts: %w", err)
+
log.Printf("✓ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI)
···
// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts
+
// Returns (true, nil) if vote was newly inserted, (false, nil) if already existed (idempotent)
+
func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) (bool, error) {
tx, err := c.db.BeginTx(ctx, nil)
+
return false, fmt.Errorf("failed to begin transaction: %w", err)
if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
···
+
// 1. Check for existing active vote with different URI (stale record)
+
// This handles cases where:
+
// - User voted on another client and we missed the delete event
+
// - Vote was reindexed but user created a new vote with different rkey
+
// - Any other state mismatch between PDS and AppView
+
var existingDirection sql.NullString
+
SELECT direction FROM votes
+
if err := tx.QueryRowContext(ctx, checkQuery, vote.VoterDID, vote.SubjectURI, vote.URI).Scan(&existingDirection); err != nil && err != sql.ErrNoRows {
+
return false, fmt.Errorf("failed to check existing vote: %w", err)
+
// If there's a stale vote, soft-delete it and adjust counts
+
if existingDirection.Valid {
+
if _, err := tx.ExecContext(ctx, softDeleteQuery, vote.VoterDID, vote.SubjectURI, vote.URI); err != nil {
+
return false, fmt.Errorf("failed to soft-delete existing votes: %w", err)
+
// Decrement the old vote's count (will be re-incremented below if same direction)
+
collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
+
var decrementQuery string
+
if existingDirection.String == "up" {
+
if collection == "social.coves.community.post" {
+
decrementQuery = `UPDATE posts SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
+
} else if collection == "social.coves.community.comment" {
+
decrementQuery = `UPDATE comments SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
+
if collection == "social.coves.community.post" {
+
decrementQuery = `UPDATE posts SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`
+
} else if collection == "social.coves.community.comment" {
+
decrementQuery = `UPDATE comments SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`
+
if decrementQuery != "" {
+
if _, err := tx.ExecContext(ctx, decrementQuery, vote.SubjectURI); err != nil {
+
return false, fmt.Errorf("failed to decrement old vote count: %w", err)
+
log.Printf("Cleaned up stale vote for %s on %s (was %s)", vote.VoterDID, vote.SubjectURI, existingDirection.String)
+
// 2. Index the vote (idempotent with ON CONFLICT DO NOTHING)
uri, cid, rkey, voter_did,
···
// If no rows returned, vote already exists (idempotent - OK for Jetstream replays)
if err == sql.ErrNoRows {
+
// Silently handle idempotent case - no log needed for replayed events
if commitErr := tx.Commit(); commitErr != nil {
+
return false, fmt.Errorf("failed to commit transaction: %w", commitErr)
+
return false, nil // Vote already existed
+
return false, fmt.Errorf("failed to insert vote: %w", err)
+
// 3. Update vote counts on the subject (post or comment)
// Parse collection from subject URI to determine target table
collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
···
// Vote is still indexed in votes table, we just don't update denormalized counts
log.Printf("Vote subject has unsupported collection: %s (vote indexed, counts not updated)", collection)
if commitErr := tx.Commit(); commitErr != nil {
+
return false, fmt.Errorf("failed to commit transaction: %w", commitErr)
+
return true, nil // Vote was newly indexed
result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
+
return false, fmt.Errorf("failed to update vote counts: %w", err)
rowsAffected, err := result.RowsAffected()
+
return false, fmt.Errorf("failed to check update result: %w", err)
// If subject doesn't exist or is deleted, that's OK (vote still indexed)
···
if err := tx.Commit(); err != nil {
+
return false, fmt.Errorf("failed to commit transaction: %w", err)
+
return true, nil // Vote was newly indexed
// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts