A community based topic aggregation platform built on atproto

fix(jetstream): handle duplicate comment events gracefully

Use ON CONFLICT DO NOTHING for comment indexing to handle race
conditions from duplicate Jetstream events (at-least-once delivery).
This eliminates duplicate key constraint errors when the same event
is delivered multiple times.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+12
internal
atproto
+12
internal/atproto/jetstream/comment_consumer.go
···
} else if checkErr == sql.ErrNoRows {
// Comment doesn't exist - insert new comment
insertQuery := `
INSERT INTO comments (
uri, cid, rkey, commenter_did,
···
$9, $10, $11, $12, $13,
$14, $15
)
RETURNING id
`
···
comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
comment.CreatedAt, time.Now(),
).Scan(&commentID)
if err != nil {
return fmt.Errorf("failed to insert comment: %w", err)
}
···
} else if checkErr == sql.ErrNoRows {
// Comment doesn't exist - insert new comment
+
// Use ON CONFLICT DO NOTHING to handle race conditions gracefully
+
// (e.g., duplicate Jetstream events from reconnections/retries)
insertQuery := `
INSERT INTO comments (
uri, cid, rkey, commenter_did,
···
$9, $10, $11, $12, $13,
$14, $15
)
+
ON CONFLICT (uri) DO NOTHING
RETURNING id
`
···
comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
comment.CreatedAt, time.Now(),
).Scan(&commentID)
+
if err == sql.ErrNoRows {
+
// ON CONFLICT triggered - comment was inserted by concurrent process
+
// This is an idempotent replay, skip gracefully
+
log.Printf("Comment already indexed (concurrent insert): %s", comment.URI)
+
if commitErr := tx.Commit(); commitErr != nil {
+
return fmt.Errorf("failed to commit transaction: %w", commitErr)
+
}
+
return nil
+
}
if err != nil {
return fmt.Errorf("failed to insert comment: %w", err)
}