A community based topic aggregation platform built on atproto

feat(repository): implement PostgreSQL comment repository

Add repository implementation for comment CRUD and thread queries.
Handles PostgreSQL-specific operations including array marshaling for langs
field and proper NULL handling for optional JSON fields.

Key operations:
- Create/Update/Delete with soft delete support
- GetByURI with ErrCommentNotFound for missing records
- ListByRoot/ListByParent for thread traversal
- ListByCommenter for user history
- CountByParent for pagination

All queries filter out soft-deleted comments (deleted_at IS NULL).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+354
internal
db
postgres
+354
internal/db/postgres/comment_repo.go
···
+
package postgres
+
+
import (
+
"Coves/internal/core/comments"
+
"context"
+
"database/sql"
+
"fmt"
+
"log"
+
"strings"
+
+
"github.com/lib/pq"
+
)
+
+
type postgresCommentRepo struct {
+
db *sql.DB
+
}
+
+
// NewCommentRepository creates a new PostgreSQL comment repository
+
func NewCommentRepository(db *sql.DB) comments.Repository {
+
return &postgresCommentRepo{db: db}
+
}
+
+
// Create inserts a new comment into the comments table
+
// Called by Jetstream consumer after comment is created on PDS
+
// Idempotent: Returns success if comment already exists (for Jetstream replays)
+
func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error {
+
query := `
+
INSERT INTO comments (
+
uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at
+
) VALUES (
+
$1, $2, $3, $4,
+
$5, $6, $7, $8,
+
$9, $10, $11, $12, $13,
+
$14, NOW()
+
)
+
ON CONFLICT (uri) DO NOTHING
+
RETURNING id, indexed_at
+
`
+
+
err := r.db.QueryRowContext(
+
ctx, query,
+
comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
+
comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
+
comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
+
comment.CreatedAt,
+
).Scan(&comment.ID, &comment.IndexedAt)
+
+
// ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
+
if err == sql.ErrNoRows {
+
return nil // Comment already exists, no error for idempotency
+
}
+
+
if err != nil {
+
// Check for unique constraint violation
+
if strings.Contains(err.Error(), "duplicate key") {
+
return comments.ErrCommentAlreadyExists
+
}
+
+
return fmt.Errorf("failed to insert comment: %w", err)
+
}
+
+
return nil
+
}
+
+
// Update modifies an existing comment's content fields
+
// Called by Jetstream consumer after comment is updated on PDS
+
// Preserves vote counts and created_at timestamp
+
func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error {
+
query := `
+
UPDATE comments
+
SET
+
cid = $1,
+
content = $2,
+
content_facets = $3,
+
embed = $4,
+
content_labels = $5,
+
langs = $6
+
WHERE uri = $7 AND deleted_at IS NULL
+
RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count
+
`
+
+
err := r.db.QueryRowContext(
+
ctx, query,
+
comment.CID,
+
comment.Content,
+
comment.ContentFacets,
+
comment.Embed,
+
comment.ContentLabels,
+
pq.Array(comment.Langs),
+
comment.URI,
+
).Scan(
+
&comment.ID,
+
&comment.IndexedAt,
+
&comment.CreatedAt,
+
&comment.UpvoteCount,
+
&comment.DownvoteCount,
+
&comment.Score,
+
&comment.ReplyCount,
+
)
+
+
if err == sql.ErrNoRows {
+
return comments.ErrCommentNotFound
+
}
+
if err != nil {
+
return fmt.Errorf("failed to update comment: %w", err)
+
}
+
+
return nil
+
}
+
+
// GetByURI retrieves a comment by its AT-URI
+
// Used by Jetstream consumer for UPDATE/DELETE operations
+
func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE uri = $1
+
`
+
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := r.db.QueryRowContext(ctx, query, uri).Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, comments.ErrCommentNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to get comment by URI: %w", err)
+
}
+
+
comment.Langs = langs
+
+
return &comment, nil
+
}
+
+
// Delete soft-deletes a comment (sets deleted_at)
+
// Called by Jetstream consumer after comment is deleted from PDS
+
// Idempotent: Returns success if comment already deleted
+
func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
+
query := `
+
UPDATE comments
+
SET deleted_at = NOW()
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
result, err := r.db.ExecContext(ctx, query, uri)
+
if err != nil {
+
return fmt.Errorf("failed to delete comment: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check delete result: %w", err)
+
}
+
+
// Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays)
+
if rowsAffected == 0 {
+
return nil
+
}
+
+
return nil
+
}
+
+
// ListByRoot retrieves all active comments in a thread (flat)
+
// Used for fetching entire comment threads on posts
+
func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE root_uri = $1 AND deleted_at IS NULL
+
ORDER BY created_at ASC
+
LIMIT $2 OFFSET $3
+
`
+
+
rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list comments by root: %w", err)
+
}
+
defer func() {
+
if err := rows.Close(); err != nil {
+
log.Printf("Failed to close rows: %v", err)
+
}
+
}()
+
+
var result []*comments.Comment
+
for rows.Next() {
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := rows.Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan comment: %w", err)
+
}
+
+
comment.Langs = langs
+
result = append(result, &comment)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating comments: %w", err)
+
}
+
+
return result, nil
+
}
+
+
// ListByParent retrieves direct replies to a post or comment
+
// Used for building nested/threaded comment views
+
func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE parent_uri = $1 AND deleted_at IS NULL
+
ORDER BY created_at ASC
+
LIMIT $2 OFFSET $3
+
`
+
+
rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list comments by parent: %w", err)
+
}
+
defer func() {
+
if err := rows.Close(); err != nil {
+
log.Printf("Failed to close rows: %v", err)
+
}
+
}()
+
+
var result []*comments.Comment
+
for rows.Next() {
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := rows.Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan comment: %w", err)
+
}
+
+
comment.Langs = langs
+
result = append(result, &comment)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating comments: %w", err)
+
}
+
+
return result, nil
+
}
+
+
// CountByParent counts direct replies to a post or comment
+
// Used for showing reply counts in threading UI
+
func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) {
+
query := `
+
SELECT COUNT(*)
+
FROM comments
+
WHERE parent_uri = $1 AND deleted_at IS NULL
+
`
+
+
var count int
+
err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count)
+
if err != nil {
+
return 0, fmt.Errorf("failed to count comments by parent: %w", err)
+
}
+
+
return count, nil
+
}
+
+
// ListByCommenter retrieves all active comments by a specific user
+
// Future: Used for user comment history
+
func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE commenter_did = $1 AND deleted_at IS NULL
+
ORDER BY created_at DESC
+
LIMIT $2 OFFSET $3
+
`
+
+
rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list comments by commenter: %w", err)
+
}
+
defer func() {
+
if err := rows.Close(); err != nil {
+
log.Printf("Failed to close rows: %v", err)
+
}
+
}()
+
+
var result []*comments.Comment
+
for rows.Next() {
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := rows.Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan comment: %w", err)
+
}
+
+
comment.Langs = langs
+
result = append(result, &comment)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating comments: %w", err)
+
}
+
+
return result, nil
+
}