A community based topic aggregation platform built on atproto

Merge branch 'feat/comment-system-indexing-phase1'

Changed files
+3218
cmd
server
docs
internal
tests
+25
cmd/server/main.go
···
voteRepo := postgresRepo.NewVoteRepository(db)
log.Println("✅ Vote repository initialized (Jetstream indexing only)")
+
// Initialize comment repository (used by Jetstream consumer for indexing)
+
commentRepo := postgresRepo.NewCommentRepository(db)
+
log.Println("✅ Comment repository initialized (Jetstream indexing only)")
+
// Initialize feed service
feedRepo := postgresRepo.NewCommunityFeedRepository(db)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
···
log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
log.Println(" - Updating: Post vote counts atomically")
+
+
// Start Jetstream consumer for comments
+
// This consumer indexes comments from user repositories and updates parent counts
+
commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
+
if commentJetstreamURL == "" {
+
// Listen to comment record CREATE/UPDATE/DELETE events from user repositories
+
commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment"
+
}
+
+
commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
+
+
go func() {
+
if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
+
log.Printf("Comment Jetstream consumer stopped: %v", startErr)
+
}
+
}()
+
+
log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
+
log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations")
+
log.Println(" - Updating: Post comment counts and comment reply counts atomically")
// Register XRPC routes
routes.RegisterUserRoutes(r, userService)
+54
docs/PRD_BACKLOG.md
···
---
+
### Post comment_count Reconciliation Missing
+
**Added:** 2025-11-04 | **Effort:** 2-3 hours | **Priority:** ALPHA BLOCKER
+
+
**Problem:**
+
When comments arrive before their parent post is indexed (common with cross-repo Jetstream ordering), the post's `comment_count` is never reconciled. Later, when the post consumer indexes the post, there's no logic to count pre-existing comments. This causes posts to have permanently stale `comment_count` values.
+
+
**End-User Impact:**
+
- 🔴 Posts show "0 comments" when they actually have comments
+
- ❌ Broken engagement signals (users don't know there are discussions)
+
- ❌ UI inconsistency (thread page shows comments, but counter says "0")
+
- ⚠️ Users may not click into posts thinking they're empty
+
- 📉 Reduced engagement due to misleading counters
+
+
**Root Cause:**
+
- Comment consumer updates post counts when processing comment events ([comment_consumer.go:323-343](../internal/atproto/jetstream/comment_consumer.go#L323-L343))
+
- If comment arrives BEFORE post is indexed, update query returns 0 rows (only logs warning)
+
- When post consumer later indexes the post, it sets `comment_count = 0` with NO reconciliation
+
- Comments already exist in DB, but post never "discovers" them
+
+
**Solution:**
+
Post consumer MUST implement the same reconciliation pattern as comment consumer (see [comment_consumer.go:292-305](../internal/atproto/jetstream/comment_consumer.go#L292-L305)):
+
+
```go
+
// After inserting new post, reconcile comment_count for out-of-order comments
+
reconcileQuery := `
+
UPDATE posts
+
SET comment_count = (
+
SELECT COUNT(*)
+
FROM comments c
+
WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
+
)
+
WHERE id = $2
+
`
+
_, reconcileErr := tx.ExecContext(ctx, reconcileQuery, postURI, postID)
+
```
+
+
**Affected Operations:**
+
- Post indexing from Jetstream ([post_consumer.go](../internal/atproto/jetstream/post_consumer.go))
+
- Any cross-repo event ordering (community DID ≠ author DID)
+
+
**Current Status:**
+
- 🔴 Issue documented with FIXME(P1) comment at [comment_consumer.go:311-321](../internal/atproto/jetstream/comment_consumer.go#L311-L321)
+
- ⚠️ Test demonstrating limitation exists: `TestCommentConsumer_PostCountReconciliation_Limitation`
+
- 📋 Fix required in post consumer (out of scope for comment system PR)
+
+
**Files to Modify:**
+
- `internal/atproto/jetstream/post_consumer.go` - Add reconciliation after post creation
+
- `tests/integration/post_consumer_test.go` - Add test for out-of-order comment reconciliation
+
+
**Similar Issue Fixed:**
+
- ✅ Comment reply_count reconciliation - Fixed in comment system implementation (2025-11-04)
+
+
---
+
## 🔴 P1.5: Federation Blockers (Beta Launch)
### Cross-PDS Write-Forward Support for Community Service
+665
internal/atproto/jetstream/comment_consumer.go
···
+
package jetstream
+
+
import (
+
"Coves/internal/core/comments"
+
"context"
+
"database/sql"
+
"encoding/json"
+
"fmt"
+
"log"
+
"strings"
+
"time"
+
+
"github.com/lib/pq"
+
)
+
+
// Constants for comment validation and processing
+
const (
+
// CommentCollection is the lexicon collection identifier for comments
+
CommentCollection = "social.coves.feed.comment"
+
+
// ATProtoScheme is the URI scheme for atProto AT-URIs
+
ATProtoScheme = "at://"
+
+
// MaxCommentContentBytes is the maximum allowed size for comment content
+
// Per lexicon: max 3000 graphemes, ~30000 bytes
+
MaxCommentContentBytes = 30000
+
)
+
+
// CommentEventConsumer consumes comment-related events from Jetstream
+
// Handles CREATE, UPDATE, and DELETE operations for social.coves.feed.comment
+
type CommentEventConsumer struct {
+
commentRepo comments.Repository
+
db *sql.DB // Direct DB access for atomic count updates
+
}
+
+
// NewCommentEventConsumer creates a new Jetstream consumer for comment events
+
func NewCommentEventConsumer(
+
commentRepo comments.Repository,
+
db *sql.DB,
+
) *CommentEventConsumer {
+
return &CommentEventConsumer{
+
commentRepo: commentRepo,
+
db: db,
+
}
+
}
+
+
// HandleEvent processes a Jetstream event for comment records
+
func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
+
// We only care about commit events for comment records
+
if event.Kind != "commit" || event.Commit == nil {
+
return nil
+
}
+
+
commit := event.Commit
+
+
// Handle comment record operations
+
if commit.Collection == CommentCollection {
+
switch commit.Operation {
+
case "create":
+
return c.createComment(ctx, event.Did, commit)
+
case "update":
+
return c.updateComment(ctx, event.Did, commit)
+
case "delete":
+
return c.deleteComment(ctx, event.Did, commit)
+
}
+
}
+
+
// Silently ignore other operations and collections
+
return nil
+
}
+
+
// createComment indexes a new comment from the firehose and updates parent counts
+
func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
+
if commit.Record == nil {
+
return fmt.Errorf("comment create event missing record data")
+
}
+
+
// Parse the comment record
+
commentRecord, err := parseCommentRecord(commit.Record)
+
if err != nil {
+
return fmt.Errorf("failed to parse comment record: %w", err)
+
}
+
+
// SECURITY: Validate this is a legitimate comment event
+
if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
+
log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
+
return err
+
}
+
+
// Build AT-URI for this comment
+
// Format: at://commenter_did/social.coves.feed.comment/rkey
+
uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
+
+
// Parse timestamp from record
+
createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
+
if err != nil {
+
log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
+
createdAt = time.Now()
+
}
+
+
// Serialize optional JSON fields
+
facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
+
+
// Build comment entity
+
comment := &comments.Comment{
+
URI: uri,
+
CID: commit.CID,
+
RKey: commit.RKey,
+
CommenterDID: repoDID, // Comment comes from user's repository
+
RootURI: commentRecord.Reply.Root.URI,
+
RootCID: commentRecord.Reply.Root.CID,
+
ParentURI: commentRecord.Reply.Parent.URI,
+
ParentCID: commentRecord.Reply.Parent.CID,
+
Content: commentRecord.Content,
+
ContentFacets: facetsJSON,
+
Embed: embedJSON,
+
ContentLabels: labelsJSON,
+
Langs: commentRecord.Langs,
+
CreatedAt: createdAt,
+
IndexedAt: time.Now(),
+
}
+
+
// Atomically: Index comment + Update parent counts
+
if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
+
return fmt.Errorf("failed to index comment and update counts: %w", err)
+
}
+
+
log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
+
return nil
+
}
+
+
// updateComment updates an existing comment's content fields
+
func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
+
if commit.Record == nil {
+
return fmt.Errorf("comment update event missing record data")
+
}
+
+
// Parse the updated comment record
+
commentRecord, err := parseCommentRecord(commit.Record)
+
if err != nil {
+
return fmt.Errorf("failed to parse comment record: %w", err)
+
}
+
+
// SECURITY: Validate this is a legitimate update
+
if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
+
log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
+
return err
+
}
+
+
// Build AT-URI for the comment being updated
+
uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
+
+
// Fetch existing comment to validate threading references are immutable
+
existingComment, err := c.commentRepo.GetByURI(ctx, uri)
+
if err != nil {
+
if err == comments.ErrCommentNotFound {
+
// Comment doesn't exist yet - might arrive out of order
+
log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
+
return nil
+
}
+
return fmt.Errorf("failed to get existing comment for validation: %w", err)
+
}
+
+
// SECURITY: Threading references are IMMUTABLE after creation
+
// Reject updates that attempt to change root/parent (prevents thread hijacking)
+
if existingComment.RootURI != commentRecord.Reply.Root.URI ||
+
existingComment.RootCID != commentRecord.Reply.Root.CID ||
+
existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
+
existingComment.ParentCID != commentRecord.Reply.Parent.CID {
+
log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
+
log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
+
log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
+
log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
+
log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
+
return fmt.Errorf("comment threading references cannot be changed after creation")
+
}
+
+
// Serialize optional JSON fields
+
facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
+
+
// Build comment update entity (preserves vote counts and created_at)
+
comment := &comments.Comment{
+
URI: uri,
+
CID: commit.CID,
+
Content: commentRecord.Content,
+
ContentFacets: facetsJSON,
+
Embed: embedJSON,
+
ContentLabels: labelsJSON,
+
Langs: commentRecord.Langs,
+
}
+
+
// Update the comment in repository
+
if err := c.commentRepo.Update(ctx, comment); err != nil {
+
return fmt.Errorf("failed to update comment: %w", err)
+
}
+
+
log.Printf("✓ Updated comment: %s", uri)
+
return nil
+
}
+
+
// deleteComment soft-deletes a comment and updates parent counts
+
func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
+
// Build AT-URI for the comment being deleted
+
uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
+
+
// Get existing comment to know its parent (for decrementing the right counter)
+
existingComment, err := c.commentRepo.GetByURI(ctx, uri)
+
if err != nil {
+
if err == comments.ErrCommentNotFound {
+
// Idempotent: Comment already deleted or never existed
+
log.Printf("Comment already deleted or not found: %s", uri)
+
return nil
+
}
+
return fmt.Errorf("failed to get existing comment: %w", err)
+
}
+
+
// Atomically: Soft-delete comment + Update parent counts
+
if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
+
return fmt.Errorf("failed to delete comment and update counts: %w", err)
+
}
+
+
log.Printf("✓ Deleted comment: %s", uri)
+
return nil
+
}
+
+
// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
+
func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
+
tx, err := c.db.BeginTx(ctx, nil)
+
if err != nil {
+
return fmt.Errorf("failed to begin transaction: %w", err)
+
}
+
defer func() {
+
if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
+
log.Printf("Failed to rollback transaction: %v", rollbackErr)
+
}
+
}()
+
+
// 1. Check if comment exists and handle resurrection case
+
// In atProto, deleted records' rkeys become available - users can recreate with same rkey
+
// We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
+
var existingID int64
+
var existingDeletedAt *time.Time
+
checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
+
checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
+
+
var commentID int64
+
+
if checkErr == nil {
+
// Comment exists
+
if existingDeletedAt == nil {
+
// Not deleted - this is an idempotent replay, skip gracefully
+
log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
+
if commitErr := tx.Commit(); commitErr != nil {
+
return fmt.Errorf("failed to commit transaction: %w", commitErr)
+
}
+
return nil
+
}
+
+
// Comment was soft-deleted, now being recreated (resurrection)
+
// This is a NEW record with same rkey - update ALL fields including threading refs
+
// User may have deleted old comment and created a new one on a different parent/root
+
log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
+
commentID = existingID
+
+
resurrectQuery := `
+
UPDATE comments
+
SET
+
cid = $1,
+
commenter_did = $2,
+
root_uri = $3,
+
root_cid = $4,
+
parent_uri = $5,
+
parent_cid = $6,
+
content = $7,
+
content_facets = $8,
+
embed = $9,
+
content_labels = $10,
+
langs = $11,
+
created_at = $12,
+
indexed_at = $13,
+
deleted_at = NULL,
+
reply_count = 0
+
WHERE id = $14
+
`
+
+
_, err = tx.ExecContext(
+
ctx, resurrectQuery,
+
comment.CID,
+
comment.CommenterDID,
+
comment.RootURI,
+
comment.RootCID,
+
comment.ParentURI,
+
comment.ParentCID,
+
comment.Content,
+
comment.ContentFacets,
+
comment.Embed,
+
comment.ContentLabels,
+
pq.Array(comment.Langs),
+
comment.CreatedAt,
+
time.Now(),
+
commentID,
+
)
+
+
if err != nil {
+
return fmt.Errorf("failed to resurrect comment: %w", err)
+
}
+
+
} else if checkErr == sql.ErrNoRows {
+
// Comment doesn't exist - insert new comment
+
insertQuery := `
+
INSERT INTO comments (
+
uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at
+
) VALUES (
+
$1, $2, $3, $4,
+
$5, $6, $7, $8,
+
$9, $10, $11, $12, $13,
+
$14, $15
+
)
+
RETURNING id
+
`
+
+
err = tx.QueryRowContext(
+
ctx, insertQuery,
+
comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
+
comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
+
comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
+
comment.CreatedAt, time.Now(),
+
).Scan(&commentID)
+
+
if err != nil {
+
return fmt.Errorf("failed to insert comment: %w", err)
+
}
+
+
} else {
+
// Unexpected error checking for existing comment
+
return fmt.Errorf("failed to check for existing comment: %w", checkErr)
+
}
+
+
// 1.5. Reconcile reply_count for this newly inserted comment
+
// In case any replies arrived out-of-order before this parent was indexed
+
reconcileQuery := `
+
UPDATE comments
+
SET reply_count = (
+
SELECT COUNT(*)
+
FROM comments c
+
WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
+
)
+
WHERE id = $2
+
`
+
_, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
+
if reconcileErr != nil {
+
log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
+
// Continue anyway - this is a best-effort reconciliation
+
}
+
+
// 2. Update parent counts atomically
+
// Parent could be a post (increment comment_count) or a comment (increment reply_count)
+
// Try posts table first
+
//
+
// FIXME(P1): Post comment_count reconciliation not implemented
+
// When a comment arrives before its parent post (common with cross-repo Jetstream ordering),
+
// the post update below returns 0 rows and we only log a warning. Later, when the post
+
// is indexed by the post consumer, there's NO reconciliation logic to count pre-existing
+
// comments. This causes posts to have permanently stale comment_count values.
+
//
+
// FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments
+
// (see lines 292-305 above). When indexing a new post, count any comments where parent_uri
+
// matches the post URI and set comment_count accordingly.
+
//
+
// Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation
+
updatePostQuery := `
+
UPDATE posts
+
SET comment_count = comment_count + 1
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
result, err := tx.ExecContext(ctx, updatePostQuery, comment.ParentURI)
+
if err != nil {
+
return fmt.Errorf("failed to update post comment count: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check update result: %w", err)
+
}
+
+
// If no post was updated, parent is probably a comment
+
if rowsAffected == 0 {
+
updateCommentQuery := `
+
UPDATE comments
+
SET reply_count = reply_count + 1
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI)
+
if err != nil {
+
return fmt.Errorf("failed to update comment reply count: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check update result: %w", err)
+
}
+
+
// If neither post nor comment was found, that's OK (parent might not be indexed yet)
+
if rowsAffected == 0 {
+
log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
+
}
+
}
+
+
// Commit transaction
+
if err := tx.Commit(); err != nil {
+
return fmt.Errorf("failed to commit transaction: %w", err)
+
}
+
+
return nil
+
}
+
+
// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
+
func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
+
tx, err := c.db.BeginTx(ctx, nil)
+
if err != nil {
+
return fmt.Errorf("failed to begin transaction: %w", err)
+
}
+
defer func() {
+
if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
+
log.Printf("Failed to rollback transaction: %v", rollbackErr)
+
}
+
}()
+
+
// 1. Soft-delete the comment (idempotent)
+
deleteQuery := `
+
UPDATE comments
+
SET deleted_at = $2
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now())
+
if err != nil {
+
return fmt.Errorf("failed to delete comment: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check delete result: %w", err)
+
}
+
+
// Idempotent: If no rows affected, comment already deleted
+
if rowsAffected == 0 {
+
log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
+
if commitErr := tx.Commit(); commitErr != nil {
+
return fmt.Errorf("failed to commit transaction: %w", commitErr)
+
}
+
return nil
+
}
+
+
// 2. Decrement parent counts atomically
+
// Parent could be a post or comment - try both (use GREATEST to prevent negative)
+
updatePostQuery := `
+
UPDATE posts
+
SET comment_count = GREATEST(0, comment_count - 1)
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
result, err = tx.ExecContext(ctx, updatePostQuery, comment.ParentURI)
+
if err != nil {
+
return fmt.Errorf("failed to update post comment count: %w", err)
+
}
+
+
rowsAffected, err = result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check update result: %w", err)
+
}
+
+
// If no post was updated, parent is probably a comment
+
if rowsAffected == 0 {
+
updateCommentQuery := `
+
UPDATE comments
+
SET reply_count = GREATEST(0, reply_count - 1)
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI)
+
if err != nil {
+
return fmt.Errorf("failed to update comment reply count: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check update result: %w", err)
+
}
+
+
// If neither was found, that's OK (parent might be deleted)
+
if rowsAffected == 0 {
+
log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
+
}
+
}
+
+
// Commit transaction
+
if err := tx.Commit(); err != nil {
+
return fmt.Errorf("failed to commit transaction: %w", err)
+
}
+
+
return nil
+
}
+
+
// validateCommentEvent performs security validation on comment events
+
func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
+
// SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
+
// The repository owner (repoDID) IS the commenter - comments are stored in user repos.
+
//
+
// We do NOT check if the user exists in AppView because:
+
// 1. Comment events may arrive before user events in Jetstream (race condition)
+
// 2. The comment came from the user's PDS repository (authenticated by PDS)
+
// 3. The database FK constraint was removed to allow out-of-order indexing
+
// 4. Orphaned comments (from never-indexed users) are harmless
+
//
+
// Security is maintained because:
+
// - Comment must come from user's own PDS repository (verified by atProto)
+
// - Fake DIDs will fail PDS authentication
+
+
// Validate DID format (basic sanity check)
+
if !strings.HasPrefix(repoDID, "did:") {
+
return fmt.Errorf("invalid commenter DID format: %s", repoDID)
+
}
+
+
// Validate content is not empty (required per lexicon)
+
if comment.Content == "" {
+
return fmt.Errorf("comment content is required")
+
}
+
+
// Validate content length (defensive check - PDS should enforce this)
+
// Per lexicon: max 3000 graphemes, ~30000 bytes
+
// We check bytes as a simple defensive measure
+
if len(comment.Content) > MaxCommentContentBytes {
+
return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
+
}
+
+
// Validate reply references exist
+
if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
+
return fmt.Errorf("invalid root reference: must have both URI and CID")
+
}
+
+
if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
+
return fmt.Errorf("invalid parent reference: must have both URI and CID")
+
}
+
+
// Validate AT-URI structure for root and parent
+
if err := validateATURI(comment.Reply.Root.URI); err != nil {
+
return fmt.Errorf("invalid root URI: %w", err)
+
}
+
+
if err := validateATURI(comment.Reply.Parent.URI); err != nil {
+
return fmt.Errorf("invalid parent URI: %w", err)
+
}
+
+
return nil
+
}
+
+
// validateATURI performs basic structure validation on AT-URIs
+
// Format: at://did:method:id/collection/rkey
+
// This is defensive validation - we trust PDS but catch obviously malformed URIs
+
func validateATURI(uri string) error {
+
if !strings.HasPrefix(uri, ATProtoScheme) {
+
return fmt.Errorf("must start with %s", ATProtoScheme)
+
}
+
+
// Remove at:// prefix and split by /
+
withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
+
parts := strings.Split(withoutScheme, "/")
+
+
// Must have at least 3 parts: did, collection, rkey
+
if len(parts) < 3 {
+
return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
+
}
+
+
// First part should be a DID
+
if !strings.HasPrefix(parts[0], "did:") {
+
return fmt.Errorf("repository identifier must be a DID")
+
}
+
+
// Collection and rkey should not be empty
+
if parts[1] == "" || parts[2] == "" {
+
return fmt.Errorf("collection and rkey cannot be empty")
+
}
+
+
return nil
+
}
+
+
// CommentRecordFromJetstream represents a comment record as received from Jetstream
+
// Matches social.coves.feed.comment lexicon
+
type CommentRecordFromJetstream struct {
+
Type string `json:"$type"`
+
Reply ReplyRefFromJetstream `json:"reply"`
+
Content string `json:"content"`
+
Facets []interface{} `json:"facets,omitempty"`
+
Embed map[string]interface{} `json:"embed,omitempty"`
+
Langs []string `json:"langs,omitempty"`
+
Labels interface{} `json:"labels,omitempty"`
+
CreatedAt string `json:"createdAt"`
+
}
+
+
// ReplyRefFromJetstream represents the threading structure
+
type ReplyRefFromJetstream struct {
+
Root StrongRefFromJetstream `json:"root"`
+
Parent StrongRefFromJetstream `json:"parent"`
+
}
+
+
// parseCommentRecord parses a comment record from Jetstream event data
+
func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
+
// Marshal to JSON and back for proper type conversion
+
recordJSON, err := json.Marshal(record)
+
if err != nil {
+
return nil, fmt.Errorf("failed to marshal record: %w", err)
+
}
+
+
var comment CommentRecordFromJetstream
+
if err := json.Unmarshal(recordJSON, &comment); err != nil {
+
return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
+
}
+
+
// Validate required fields
+
if comment.Content == "" {
+
return nil, fmt.Errorf("comment record missing content field")
+
}
+
+
if comment.CreatedAt == "" {
+
return nil, fmt.Errorf("comment record missing createdAt field")
+
}
+
+
return &comment, nil
+
}
+
+
// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
+
// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
+
func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
+
// Serialize facets if present
+
if commentRecord.Facets != nil && len(commentRecord.Facets) > 0 {
+
if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
+
facetsStr := string(facetsBytes)
+
facetsJSON = &facetsStr
+
}
+
}
+
+
// Serialize embed if present
+
if commentRecord.Embed != nil && len(commentRecord.Embed) > 0 {
+
if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
+
embedStr := string(embedBytes)
+
embedJSON = &embedStr
+
}
+
}
+
+
// Serialize labels if present
+
if commentRecord.Labels != nil {
+
if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
+
labelsStr := string(labelsBytes)
+
labelsJSON = &labelsStr
+
}
+
}
+
+
return facetsJSON, embedJSON, labelsJSON
+
}
+125
internal/atproto/jetstream/comment_jetstream_connector.go
···
+
package jetstream
+
+
import (
+
"context"
+
"encoding/json"
+
"fmt"
+
"log"
+
"sync"
+
"time"
+
+
"github.com/gorilla/websocket"
+
)
+
+
// CommentJetstreamConnector handles WebSocket connection to Jetstream for comment events
+
type CommentJetstreamConnector struct {
+
consumer *CommentEventConsumer
+
wsURL string
+
}
+
+
// NewCommentJetstreamConnector creates a new Jetstream WebSocket connector for comment events
+
func NewCommentJetstreamConnector(consumer *CommentEventConsumer, wsURL string) *CommentJetstreamConnector {
+
return &CommentJetstreamConnector{
+
consumer: consumer,
+
wsURL: wsURL,
+
}
+
}
+
+
// Start begins consuming events from Jetstream
+
// Runs indefinitely, reconnecting on errors
+
func (c *CommentJetstreamConnector) Start(ctx context.Context) error {
+
log.Printf("Starting Jetstream comment consumer: %s", c.wsURL)
+
+
for {
+
select {
+
case <-ctx.Done():
+
log.Println("Jetstream comment consumer shutting down")
+
return ctx.Err()
+
default:
+
if err := c.connect(ctx); err != nil {
+
log.Printf("Jetstream comment connection error: %v. Retrying in 5s...", err)
+
time.Sleep(5 * time.Second)
+
continue
+
}
+
}
+
}
+
}
+
+
// connect establishes WebSocket connection and processes events
+
func (c *CommentJetstreamConnector) connect(ctx context.Context) error {
+
conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
+
if err != nil {
+
return fmt.Errorf("failed to connect to Jetstream: %w", err)
+
}
+
defer func() {
+
if closeErr := conn.Close(); closeErr != nil {
+
log.Printf("Failed to close WebSocket connection: %v", closeErr)
+
}
+
}()
+
+
log.Println("Connected to Jetstream (comment consumer)")
+
+
// Set read deadline to detect connection issues
+
if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
+
log.Printf("Failed to set read deadline: %v", err)
+
}
+
+
// Set pong handler to keep connection alive
+
conn.SetPongHandler(func(string) error {
+
if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
+
log.Printf("Failed to set read deadline in pong handler: %v", err)
+
}
+
return nil
+
})
+
+
// Start ping ticker
+
ticker := time.NewTicker(30 * time.Second)
+
defer ticker.Stop()
+
+
done := make(chan struct{})
+
var closeOnce sync.Once // Ensure done channel is only closed once
+
+
// Ping goroutine
+
go func() {
+
for {
+
select {
+
case <-ticker.C:
+
if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil {
+
log.Printf("Failed to send ping: %v", err)
+
closeOnce.Do(func() { close(done) })
+
return
+
}
+
case <-done:
+
return
+
}
+
}
+
}()
+
+
// Read loop
+
for {
+
select {
+
case <-done:
+
return fmt.Errorf("connection closed by ping failure")
+
default:
+
}
+
+
_, message, err := conn.ReadMessage()
+
if err != nil {
+
closeOnce.Do(func() { close(done) })
+
return fmt.Errorf("read error: %w", err)
+
}
+
+
// Parse Jetstream event
+
var event JetstreamEvent
+
if err := json.Unmarshal(message, &event); err != nil {
+
log.Printf("Failed to parse Jetstream event: %v", err)
+
continue
+
}
+
+
// Process event through consumer
+
if err := c.consumer.HandleEvent(ctx, &event); err != nil {
+
log.Printf("Failed to handle comment event: %v", err)
+
// Continue processing other events even if one fails
+
}
+
}
+
}
+80
internal/core/comments/comment.go
···
+
package comments
+
+
import (
+
"time"
+
)
+
+
// Comment represents a comment in the AppView database
+
// Comments are indexed from the firehose after being written to user repositories
+
type Comment struct {
+
ID int64 `json:"id" db:"id"`
+
URI string `json:"uri" db:"uri"`
+
CID string `json:"cid" db:"cid"`
+
RKey string `json:"rkey" db:"rkey"`
+
CommenterDID string `json:"commenterDid" db:"commenter_did"`
+
+
// Threading (reply references)
+
RootURI string `json:"rootUri" db:"root_uri"`
+
RootCID string `json:"rootCid" db:"root_cid"`
+
ParentURI string `json:"parentUri" db:"parent_uri"`
+
ParentCID string `json:"parentCid" db:"parent_cid"`
+
+
// Content
+
Content string `json:"content" db:"content"`
+
ContentFacets *string `json:"contentFacets,omitempty" db:"content_facets"`
+
Embed *string `json:"embed,omitempty" db:"embed"`
+
ContentLabels *string `json:"labels,omitempty" db:"content_labels"`
+
Langs []string `json:"langs,omitempty" db:"langs"`
+
+
// Timestamps
+
CreatedAt time.Time `json:"createdAt" db:"created_at"`
+
IndexedAt time.Time `json:"indexedAt" db:"indexed_at"`
+
DeletedAt *time.Time `json:"deletedAt,omitempty" db:"deleted_at"`
+
+
// Stats (denormalized for performance)
+
UpvoteCount int `json:"upvoteCount" db:"upvote_count"`
+
DownvoteCount int `json:"downvoteCount" db:"downvote_count"`
+
Score int `json:"score" db:"score"`
+
ReplyCount int `json:"replyCount" db:"reply_count"`
+
}
+
+
// CommentRecord represents the atProto record structure indexed from Jetstream
+
// This is the data structure that gets stored in the user's repository
+
// Matches social.coves.feed.comment lexicon
+
type CommentRecord struct {
+
Type string `json:"$type"`
+
Reply ReplyRef `json:"reply"`
+
Content string `json:"content"`
+
Facets []interface{} `json:"facets,omitempty"`
+
Embed map[string]interface{} `json:"embed,omitempty"`
+
Langs []string `json:"langs,omitempty"`
+
Labels *SelfLabels `json:"labels,omitempty"`
+
CreatedAt string `json:"createdAt"`
+
}
+
+
// ReplyRef represents the threading structure from the comment lexicon
+
// Root always points to the original post, parent points to the immediate parent
+
type ReplyRef struct {
+
Root StrongRef `json:"root"`
+
Parent StrongRef `json:"parent"`
+
}
+
+
// StrongRef represents a strong reference to a record (URI + CID)
+
// Matches com.atproto.repo.strongRef
+
type StrongRef struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
+
// SelfLabels represents self-applied content labels per com.atproto.label.defs#selfLabels
+
// This is the structured format used in atProto for content warnings
+
type SelfLabels struct {
+
Values []SelfLabel `json:"values"`
+
}
+
+
// SelfLabel represents a single label value per com.atproto.label.defs#selfLabel
+
// Neg is optional and negates the label when true
+
type SelfLabel struct {
+
Val string `json:"val"` // Required: label value (max 128 chars)
+
Neg *bool `json:"neg,omitempty"` // Optional: negates the label if true
+
}
+44
internal/core/comments/errors.go
···
+
package comments
+
+
import "errors"
+
+
var (
+
// ErrCommentNotFound indicates the requested comment doesn't exist
+
ErrCommentNotFound = errors.New("comment not found")
+
+
// ErrInvalidReply indicates the reply reference is malformed or invalid
+
ErrInvalidReply = errors.New("invalid reply reference")
+
+
// ErrParentNotFound indicates the parent post/comment doesn't exist
+
ErrParentNotFound = errors.New("parent post or comment not found")
+
+
// ErrRootNotFound indicates the root post doesn't exist
+
ErrRootNotFound = errors.New("root post not found")
+
+
// ErrContentTooLong indicates comment content exceeds 3000 graphemes
+
ErrContentTooLong = errors.New("comment content exceeds 3000 graphemes")
+
+
// ErrContentEmpty indicates comment content is empty
+
ErrContentEmpty = errors.New("comment content is required")
+
+
// ErrNotAuthorized indicates the user is not authorized to perform this action
+
ErrNotAuthorized = errors.New("not authorized")
+
+
// ErrBanned indicates the user is banned from the community
+
ErrBanned = errors.New("user is banned from this community")
+
+
// ErrCommentAlreadyExists indicates a comment with this URI already exists
+
ErrCommentAlreadyExists = errors.New("comment already exists")
+
)
+
+
// IsNotFound checks if an error is a "not found" error
+
func IsNotFound(err error) bool {
+
return errors.Is(err, ErrCommentNotFound) ||
+
errors.Is(err, ErrParentNotFound) ||
+
errors.Is(err, ErrRootNotFound)
+
}
+
+
// IsConflict checks if an error is a conflict/already exists error
+
func IsConflict(err error) bool {
+
return errors.Is(err, ErrCommentAlreadyExists)
+
}
+45
internal/core/comments/interfaces.go
···
+
package comments
+
+
import "context"
+
+
// Repository defines the data access interface for comments
+
// Used by Jetstream consumer to index comments from firehose
+
//
+
// Architecture: Comments are written directly by clients to their PDS using
+
// com.atproto.repo.createRecord/updateRecord/deleteRecord. This AppView indexes
+
// comments from Jetstream for aggregation and querying.
+
type Repository interface {
+
// Create inserts a new comment into the AppView database
+
// Called by Jetstream consumer after comment is created on PDS
+
// Idempotent: ON CONFLICT DO NOTHING for duplicate URIs
+
Create(ctx context.Context, comment *Comment) error
+
+
// Update modifies an existing comment's content fields
+
// Called by Jetstream consumer after comment is updated on PDS
+
// Preserves vote counts and created_at timestamp
+
Update(ctx context.Context, comment *Comment) error
+
+
// GetByURI retrieves a comment by its AT-URI
+
// Used for Jetstream UPDATE/DELETE operations and queries
+
GetByURI(ctx context.Context, uri string) (*Comment, error)
+
+
// Delete soft-deletes a comment (sets deleted_at)
+
// Called by Jetstream consumer after comment is deleted from PDS
+
Delete(ctx context.Context, uri string) error
+
+
// ListByRoot retrieves all comments in a thread (flat)
+
// Used for fetching entire comment threads on posts
+
ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*Comment, error)
+
+
// ListByParent retrieves direct replies to a post or comment
+
// Used for building nested/threaded comment views
+
ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*Comment, error)
+
+
// CountByParent counts direct replies to a post or comment
+
// Used for showing reply counts in threading UI
+
CountByParent(ctx context.Context, parentURI string) (int, error)
+
+
// ListByCommenter retrieves all comments by a specific user
+
// Future: Used for user comment history
+
ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*Comment, error)
+
}
+63
internal/db/migrations/016_create_comments_table.sql
···
+
-- +goose Up
+
-- Create comments table for AppView indexing
+
-- Comments are indexed from the firehose after being written to user repositories
+
CREATE TABLE comments (
+
id BIGSERIAL PRIMARY KEY,
+
uri TEXT UNIQUE NOT NULL, -- AT-URI (at://commenter_did/social.coves.feed.comment/rkey)
+
cid TEXT NOT NULL, -- Content ID
+
rkey TEXT NOT NULL, -- Record key (TID)
+
commenter_did TEXT NOT NULL, -- User who commented (from AT-URI repo field)
+
+
-- Threading structure (reply references)
+
root_uri TEXT NOT NULL, -- Strong reference to original post (at://...)
+
root_cid TEXT NOT NULL, -- CID of root post (version pinning)
+
parent_uri TEXT NOT NULL, -- Strong reference to immediate parent (post or comment)
+
parent_cid TEXT NOT NULL, -- CID of parent (version pinning)
+
+
-- Content (content is required per lexicon, others optional)
+
content TEXT NOT NULL, -- Comment text (max 3000 graphemes, 30000 bytes)
+
content_facets JSONB, -- Rich text facets (social.coves.richtext.facet)
+
embed JSONB, -- Embedded content (images, quoted posts)
+
content_labels JSONB, -- Self-applied labels (com.atproto.label.defs#selfLabels)
+
langs TEXT[], -- Languages (ISO 639-1, max 3)
+
+
-- Timestamps
+
created_at TIMESTAMPTZ NOT NULL, -- Commenter's timestamp from record
+
indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- When indexed by AppView
+
deleted_at TIMESTAMPTZ, -- Soft delete (for firehose delete events)
+
+
-- Stats (denormalized for performance)
+
upvote_count INT NOT NULL DEFAULT 0, -- Comments can be voted on (per vote lexicon)
+
downvote_count INT NOT NULL DEFAULT 0,
+
score INT NOT NULL DEFAULT 0, -- upvote_count - downvote_count (for sorting)
+
reply_count INT NOT NULL DEFAULT 0 -- Number of direct replies to this comment
+
+
-- NO foreign key constraint on commenter_did to allow out-of-order indexing from Jetstream
+
-- Comment events may arrive before user events, which is acceptable since:
+
-- 1. Comments are authenticated by the user's PDS (security maintained)
+
-- 2. Orphaned comments from never-indexed users are harmless
+
-- 3. This prevents race conditions in the firehose consumer
+
);
+
+
-- Indexes for threading queries (most important for comment UX)
+
CREATE INDEX idx_comments_root ON comments(root_uri, created_at DESC) WHERE deleted_at IS NULL;
+
CREATE INDEX idx_comments_parent ON comments(parent_uri, created_at DESC) WHERE deleted_at IS NULL;
+
CREATE INDEX idx_comments_parent_score ON comments(parent_uri, score DESC, created_at DESC) WHERE deleted_at IS NULL;
+
+
-- Indexes for user queries
+
CREATE INDEX idx_comments_commenter ON comments(commenter_did, created_at DESC);
+
CREATE INDEX idx_comments_uri ON comments(uri);
+
+
-- Index for vote targeting (when votes target comments)
+
CREATE INDEX idx_comments_uri_active ON comments(uri) WHERE deleted_at IS NULL;
+
+
-- Comment on table
+
COMMENT ON TABLE comments IS 'Comments indexed from user repositories via Jetstream firehose consumer';
+
COMMENT ON COLUMN comments.uri IS 'AT-URI in format: at://commenter_did/social.coves.feed.comment/rkey';
+
COMMENT ON COLUMN comments.root_uri IS 'Strong reference to the original post that started the thread';
+
COMMENT ON COLUMN comments.parent_uri IS 'Strong reference to immediate parent (post or comment)';
+
COMMENT ON COLUMN comments.score IS 'Computed as upvote_count - downvote_count for ranking replies';
+
COMMENT ON COLUMN comments.content_labels IS 'Self-applied labels per com.atproto.label.defs#selfLabels (JSONB: {"values":[{"val":"nsfw","neg":false}]})';
+
+
-- +goose Down
+
DROP TABLE IF EXISTS comments CASCADE;
+354
internal/db/postgres/comment_repo.go
···
+
package postgres
+
+
import (
+
"Coves/internal/core/comments"
+
"context"
+
"database/sql"
+
"fmt"
+
"log"
+
"strings"
+
+
"github.com/lib/pq"
+
)
+
+
type postgresCommentRepo struct {
+
db *sql.DB
+
}
+
+
// NewCommentRepository creates a new PostgreSQL comment repository
+
func NewCommentRepository(db *sql.DB) comments.Repository {
+
return &postgresCommentRepo{db: db}
+
}
+
+
// Create inserts a new comment into the comments table
+
// Called by Jetstream consumer after comment is created on PDS
+
// Idempotent: Returns success if comment already exists (for Jetstream replays)
+
func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error {
+
query := `
+
INSERT INTO comments (
+
uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at
+
) VALUES (
+
$1, $2, $3, $4,
+
$5, $6, $7, $8,
+
$9, $10, $11, $12, $13,
+
$14, NOW()
+
)
+
ON CONFLICT (uri) DO NOTHING
+
RETURNING id, indexed_at
+
`
+
+
err := r.db.QueryRowContext(
+
ctx, query,
+
comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
+
comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
+
comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
+
comment.CreatedAt,
+
).Scan(&comment.ID, &comment.IndexedAt)
+
+
// ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
+
if err == sql.ErrNoRows {
+
return nil // Comment already exists, no error for idempotency
+
}
+
+
if err != nil {
+
// Check for unique constraint violation
+
if strings.Contains(err.Error(), "duplicate key") {
+
return comments.ErrCommentAlreadyExists
+
}
+
+
return fmt.Errorf("failed to insert comment: %w", err)
+
}
+
+
return nil
+
}
+
+
// Update modifies an existing comment's content fields
+
// Called by Jetstream consumer after comment is updated on PDS
+
// Preserves vote counts and created_at timestamp
+
func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error {
+
query := `
+
UPDATE comments
+
SET
+
cid = $1,
+
content = $2,
+
content_facets = $3,
+
embed = $4,
+
content_labels = $5,
+
langs = $6
+
WHERE uri = $7 AND deleted_at IS NULL
+
RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count
+
`
+
+
err := r.db.QueryRowContext(
+
ctx, query,
+
comment.CID,
+
comment.Content,
+
comment.ContentFacets,
+
comment.Embed,
+
comment.ContentLabels,
+
pq.Array(comment.Langs),
+
comment.URI,
+
).Scan(
+
&comment.ID,
+
&comment.IndexedAt,
+
&comment.CreatedAt,
+
&comment.UpvoteCount,
+
&comment.DownvoteCount,
+
&comment.Score,
+
&comment.ReplyCount,
+
)
+
+
if err == sql.ErrNoRows {
+
return comments.ErrCommentNotFound
+
}
+
if err != nil {
+
return fmt.Errorf("failed to update comment: %w", err)
+
}
+
+
return nil
+
}
+
+
// GetByURI retrieves a comment by its AT-URI
+
// Used by Jetstream consumer for UPDATE/DELETE operations
+
func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE uri = $1
+
`
+
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := r.db.QueryRowContext(ctx, query, uri).Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, comments.ErrCommentNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to get comment by URI: %w", err)
+
}
+
+
comment.Langs = langs
+
+
return &comment, nil
+
}
+
+
// Delete soft-deletes a comment (sets deleted_at)
+
// Called by Jetstream consumer after comment is deleted from PDS
+
// Idempotent: Returns success if comment already deleted
+
func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
+
query := `
+
UPDATE comments
+
SET deleted_at = NOW()
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
result, err := r.db.ExecContext(ctx, query, uri)
+
if err != nil {
+
return fmt.Errorf("failed to delete comment: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check delete result: %w", err)
+
}
+
+
// Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays)
+
if rowsAffected == 0 {
+
return nil
+
}
+
+
return nil
+
}
+
+
// ListByRoot retrieves all active comments in a thread (flat)
+
// Used for fetching entire comment threads on posts
+
func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE root_uri = $1 AND deleted_at IS NULL
+
ORDER BY created_at ASC
+
LIMIT $2 OFFSET $3
+
`
+
+
rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list comments by root: %w", err)
+
}
+
defer func() {
+
if err := rows.Close(); err != nil {
+
log.Printf("Failed to close rows: %v", err)
+
}
+
}()
+
+
var result []*comments.Comment
+
for rows.Next() {
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := rows.Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan comment: %w", err)
+
}
+
+
comment.Langs = langs
+
result = append(result, &comment)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating comments: %w", err)
+
}
+
+
return result, nil
+
}
+
+
// ListByParent retrieves direct replies to a post or comment
+
// Used for building nested/threaded comment views
+
func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE parent_uri = $1 AND deleted_at IS NULL
+
ORDER BY created_at ASC
+
LIMIT $2 OFFSET $3
+
`
+
+
rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list comments by parent: %w", err)
+
}
+
defer func() {
+
if err := rows.Close(); err != nil {
+
log.Printf("Failed to close rows: %v", err)
+
}
+
}()
+
+
var result []*comments.Comment
+
for rows.Next() {
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := rows.Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan comment: %w", err)
+
}
+
+
comment.Langs = langs
+
result = append(result, &comment)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating comments: %w", err)
+
}
+
+
return result, nil
+
}
+
+
// CountByParent counts direct replies to a post or comment
+
// Used for showing reply counts in threading UI
+
func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) {
+
query := `
+
SELECT COUNT(*)
+
FROM comments
+
WHERE parent_uri = $1 AND deleted_at IS NULL
+
`
+
+
var count int
+
err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count)
+
if err != nil {
+
return 0, fmt.Errorf("failed to count comments by parent: %w", err)
+
}
+
+
return count, nil
+
}
+
+
// ListByCommenter retrieves all active comments by a specific user
+
// Future: Used for user comment history
+
func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, commenter_did,
+
root_uri, root_cid, parent_uri, parent_cid,
+
content, content_facets, embed, content_labels, langs,
+
created_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, reply_count
+
FROM comments
+
WHERE commenter_did = $1 AND deleted_at IS NULL
+
ORDER BY created_at DESC
+
LIMIT $2 OFFSET $3
+
`
+
+
rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list comments by commenter: %w", err)
+
}
+
defer func() {
+
if err := rows.Close(); err != nil {
+
log.Printf("Failed to close rows: %v", err)
+
}
+
}()
+
+
var result []*comments.Comment
+
for rows.Next() {
+
var comment comments.Comment
+
var langs pq.StringArray
+
+
err := rows.Scan(
+
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
+
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
+
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan comment: %w", err)
+
}
+
+
comment.Langs = langs
+
result = append(result, &comment)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating comments: %w", err)
+
}
+
+
return result, nil
+
}
+1763
tests/integration/comment_consumer_test.go
···
+
package integration
+
+
import (
+
"Coves/internal/atproto/jetstream"
+
"Coves/internal/core/comments"
+
"Coves/internal/db/postgres"
+
"context"
+
"fmt"
+
"testing"
+
"time"
+
)
+
+
func TestCommentConsumer_CreateComment(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
// Setup test data
+
testUser := createTestUser(t, db, "commenter.test", "did:plc:commenter123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "testcommunity", "owner.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Test Post", 0, time.Now())
+
+
t.Run("Create comment on post", func(t *testing.T) {
+
rkey := generateTID()
+
uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
+
+
// Simulate Jetstream comment create event
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "test-rev",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafytest123",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "This is a test comment on a post!",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
// Handle the event
+
err := consumer.HandleEvent(ctx, event)
+
if err != nil {
+
t.Fatalf("Failed to handle comment create event: %v", err)
+
}
+
+
// Verify comment was indexed
+
comment, err := commentRepo.GetByURI(ctx, uri)
+
if err != nil {
+
t.Fatalf("Failed to get indexed comment: %v", err)
+
}
+
+
if comment.URI != uri {
+
t.Errorf("Expected URI %s, got %s", uri, comment.URI)
+
}
+
+
if comment.CommenterDID != testUser.DID {
+
t.Errorf("Expected commenter %s, got %s", testUser.DID, comment.CommenterDID)
+
}
+
+
if comment.Content != "This is a test comment on a post!" {
+
t.Errorf("Expected content 'This is a test comment on a post!', got %s", comment.Content)
+
}
+
+
if comment.RootURI != testPostURI {
+
t.Errorf("Expected root URI %s, got %s", testPostURI, comment.RootURI)
+
}
+
+
if comment.ParentURI != testPostURI {
+
t.Errorf("Expected parent URI %s, got %s", testPostURI, comment.ParentURI)
+
}
+
+
// Verify post comment count was incremented
+
var commentCount int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&commentCount)
+
if err != nil {
+
t.Fatalf("Failed to get post comment count: %v", err)
+
}
+
+
if commentCount != 1 {
+
t.Errorf("Expected post comment_count to be 1, got %d", commentCount)
+
}
+
})
+
+
t.Run("Idempotent create - duplicate event", func(t *testing.T) {
+
rkey := generateTID()
+
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "test-rev",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafytest456",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Idempotent test comment",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
// First creation
+
err := consumer.HandleEvent(ctx, event)
+
if err != nil {
+
t.Fatalf("First creation failed: %v", err)
+
}
+
+
// Get initial comment count
+
var initialCount int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount)
+
if err != nil {
+
t.Fatalf("Failed to get initial comment count: %v", err)
+
}
+
+
// Duplicate creation - should be idempotent
+
err = consumer.HandleEvent(ctx, event)
+
if err != nil {
+
t.Fatalf("Duplicate event should be handled gracefully: %v", err)
+
}
+
+
// Verify count wasn't incremented again
+
var finalCount int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount)
+
if err != nil {
+
t.Fatalf("Failed to get final comment count: %v", err)
+
}
+
+
if finalCount != initialCount {
+
t.Errorf("Comment count should not increase on duplicate event. Initial: %d, Final: %d", initialCount, finalCount)
+
}
+
})
+
}
+
+
func TestCommentConsumer_Threading(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
// Setup test data
+
testUser := createTestUser(t, db, "threader.test", "did:plc:threader123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "threadcommunity", "owner2.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Threading Test", 0, time.Now())
+
+
t.Run("Create nested comment replies", func(t *testing.T) {
+
// Create first-level comment on post
+
comment1Rkey := generateTID()
+
comment1URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment1Rkey)
+
+
event1 := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: comment1Rkey,
+
CID: "bafycomment1",
+
Record: map[string]interface{}{
+
"content": "First level comment",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event1)
+
if err != nil {
+
t.Fatalf("Failed to create first-level comment: %v", err)
+
}
+
+
// Create second-level comment (reply to first comment)
+
comment2Rkey := generateTID()
+
comment2URI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, comment2Rkey)
+
+
event2 := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: comment2Rkey,
+
CID: "bafycomment2",
+
Record: map[string]interface{}{
+
"content": "Second level comment (reply to first)",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": comment1URI,
+
"cid": "bafycomment1",
+
},
+
},
+
"createdAt": time.Now().Add(1 * time.Second).Format(time.RFC3339),
+
},
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, event2)
+
if err != nil {
+
t.Fatalf("Failed to create second-level comment: %v", err)
+
}
+
+
// Verify threading structure
+
comment1, err := commentRepo.GetByURI(ctx, comment1URI)
+
if err != nil {
+
t.Fatalf("Failed to get first comment: %v", err)
+
}
+
+
comment2, err := commentRepo.GetByURI(ctx, comment2URI)
+
if err != nil {
+
t.Fatalf("Failed to get second comment: %v", err)
+
}
+
+
// Both should have same root (original post)
+
if comment1.RootURI != testPostURI {
+
t.Errorf("Comment1 root should be post URI, got %s", comment1.RootURI)
+
}
+
+
if comment2.RootURI != testPostURI {
+
t.Errorf("Comment2 root should be post URI, got %s", comment2.RootURI)
+
}
+
+
// Comment1 parent should be post
+
if comment1.ParentURI != testPostURI {
+
t.Errorf("Comment1 parent should be post URI, got %s", comment1.ParentURI)
+
}
+
+
// Comment2 parent should be comment1
+
if comment2.ParentURI != comment1URI {
+
t.Errorf("Comment2 parent should be comment1 URI, got %s", comment2.ParentURI)
+
}
+
+
// Verify reply count on comment1
+
if comment1.ReplyCount != 1 {
+
t.Errorf("Comment1 should have 1 reply, got %d", comment1.ReplyCount)
+
}
+
+
// Query all comments by root
+
allComments, err := commentRepo.ListByRoot(ctx, testPostURI, 100, 0)
+
if err != nil {
+
t.Fatalf("Failed to list comments by root: %v", err)
+
}
+
+
if len(allComments) != 2 {
+
t.Errorf("Expected 2 comments in thread, got %d", len(allComments))
+
}
+
+
// Query direct replies to post
+
directReplies, err := commentRepo.ListByParent(ctx, testPostURI, 100, 0)
+
if err != nil {
+
t.Fatalf("Failed to list direct replies to post: %v", err)
+
}
+
+
if len(directReplies) != 1 {
+
t.Errorf("Expected 1 direct reply to post, got %d", len(directReplies))
+
}
+
+
// Query replies to comment1
+
comment1Replies, err := commentRepo.ListByParent(ctx, comment1URI, 100, 0)
+
if err != nil {
+
t.Fatalf("Failed to list replies to comment1: %v", err)
+
}
+
+
if len(comment1Replies) != 1 {
+
t.Errorf("Expected 1 reply to comment1, got %d", len(comment1Replies))
+
}
+
})
+
}
+
+
func TestCommentConsumer_UpdateComment(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
// Setup test data
+
testUser := createTestUser(t, db, "editor.test", "did:plc:editor123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "editcommunity", "owner3.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Edit Test", 0, time.Now())
+
+
t.Run("Update comment content preserves vote counts", func(t *testing.T) {
+
rkey := generateTID()
+
uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
+
+
// Create initial comment
+
createEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafyoriginal",
+
Record: map[string]interface{}{
+
"content": "Original comment content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, createEvent)
+
if err != nil {
+
t.Fatalf("Failed to create comment: %v", err)
+
}
+
+
// Manually set vote counts to simulate votes
+
_, err = db.ExecContext(ctx, `
+
UPDATE comments
+
SET upvote_count = 5, downvote_count = 2, score = 3
+
WHERE uri = $1
+
`, uri)
+
if err != nil {
+
t.Fatalf("Failed to set vote counts: %v", err)
+
}
+
+
// Update the comment
+
updateEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "update",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafyupdated",
+
Record: map[string]interface{}{
+
"content": "EDITED: Updated comment content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, updateEvent)
+
if err != nil {
+
t.Fatalf("Failed to update comment: %v", err)
+
}
+
+
// Verify content updated
+
comment, err := commentRepo.GetByURI(ctx, uri)
+
if err != nil {
+
t.Fatalf("Failed to get updated comment: %v", err)
+
}
+
+
if comment.Content != "EDITED: Updated comment content" {
+
t.Errorf("Expected updated content, got %s", comment.Content)
+
}
+
+
// Verify CID updated
+
if comment.CID != "bafyupdated" {
+
t.Errorf("Expected CID to be updated to bafyupdated, got %s", comment.CID)
+
}
+
+
// Verify vote counts preserved
+
if comment.UpvoteCount != 5 {
+
t.Errorf("Expected upvote_count preserved at 5, got %d", comment.UpvoteCount)
+
}
+
+
if comment.DownvoteCount != 2 {
+
t.Errorf("Expected downvote_count preserved at 2, got %d", comment.DownvoteCount)
+
}
+
+
if comment.Score != 3 {
+
t.Errorf("Expected score preserved at 3, got %d", comment.Score)
+
}
+
})
+
}
+
+
func TestCommentConsumer_DeleteComment(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
// Setup test data
+
testUser := createTestUser(t, db, "deleter.test", "did:plc:deleter123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "deletecommunity", "owner4.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Delete Test", 0, time.Now())
+
+
t.Run("Delete comment decrements parent count", func(t *testing.T) {
+
rkey := generateTID()
+
uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
+
+
// Create comment
+
createEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafydelete",
+
Record: map[string]interface{}{
+
"content": "Comment to be deleted",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, createEvent)
+
if err != nil {
+
t.Fatalf("Failed to create comment: %v", err)
+
}
+
+
// Get initial post comment count
+
var initialCount int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&initialCount)
+
if err != nil {
+
t.Fatalf("Failed to get initial comment count: %v", err)
+
}
+
+
// Delete comment
+
deleteEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "delete",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, deleteEvent)
+
if err != nil {
+
t.Fatalf("Failed to delete comment: %v", err)
+
}
+
+
// Verify soft delete
+
comment, err := commentRepo.GetByURI(ctx, uri)
+
if err != nil {
+
t.Fatalf("Failed to get deleted comment: %v", err)
+
}
+
+
if comment.DeletedAt == nil {
+
t.Error("Expected deleted_at to be set, got nil")
+
}
+
+
// Verify post comment count decremented
+
var finalCount int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&finalCount)
+
if err != nil {
+
t.Fatalf("Failed to get final comment count: %v", err)
+
}
+
+
if finalCount != initialCount-1 {
+
t.Errorf("Expected comment count to decrease by 1. Initial: %d, Final: %d", initialCount, finalCount)
+
}
+
})
+
+
t.Run("Delete is idempotent", func(t *testing.T) {
+
rkey := generateTID()
+
+
// Create comment
+
createEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafyidempdelete",
+
Record: map[string]interface{}{
+
"content": "Idempotent delete test",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, createEvent)
+
if err != nil {
+
t.Fatalf("Failed to create comment: %v", err)
+
}
+
+
// First delete
+
deleteEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "delete",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, deleteEvent)
+
if err != nil {
+
t.Fatalf("First delete failed: %v", err)
+
}
+
+
// Get count after first delete
+
var countAfterFirstDelete int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterFirstDelete)
+
if err != nil {
+
t.Fatalf("Failed to get count after first delete: %v", err)
+
}
+
+
// Second delete (idempotent)
+
err = consumer.HandleEvent(ctx, deleteEvent)
+
if err != nil {
+
t.Fatalf("Second delete should be idempotent: %v", err)
+
}
+
+
// Verify count didn't change
+
var countAfterSecondDelete int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", testPostURI).Scan(&countAfterSecondDelete)
+
if err != nil {
+
t.Fatalf("Failed to get count after second delete: %v", err)
+
}
+
+
if countAfterSecondDelete != countAfterFirstDelete {
+
t.Errorf("Count should not change on duplicate delete. After first: %d, After second: %d", countAfterFirstDelete, countAfterSecondDelete)
+
}
+
})
+
}
+
+
func TestCommentConsumer_SecurityValidation(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
testUser := createTestUser(t, db, "security.test", "did:plc:security123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "seccommunity", "owner5.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
testPostURI := createTestPost(t, db, testCommunity, testUser.DID, "Security Test", 0, time.Now())
+
+
t.Run("Reject comment with empty content", func(t *testing.T) {
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: generateTID(),
+
CID: "bafyinvalid",
+
Record: map[string]interface{}{
+
"content": "", // Empty content
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event)
+
if err == nil {
+
t.Error("Expected error for empty content, got nil")
+
}
+
})
+
+
t.Run("Reject comment with invalid root reference", func(t *testing.T) {
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: generateTID(),
+
CID: "bafyinvalid2",
+
Record: map[string]interface{}{
+
"content": "Valid content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": "", // Missing URI
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event)
+
if err == nil {
+
t.Error("Expected error for invalid root reference, got nil")
+
}
+
})
+
+
t.Run("Reject comment with invalid parent reference", func(t *testing.T) {
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: generateTID(),
+
CID: "bafyinvalid3",
+
Record: map[string]interface{}{
+
"content": "Valid content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "", // Missing CID
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event)
+
if err == nil {
+
t.Error("Expected error for invalid parent reference, got nil")
+
}
+
})
+
+
t.Run("Reject comment with invalid DID format", func(t *testing.T) {
+
event := &jetstream.JetstreamEvent{
+
Did: "invalid-did-format", // Bad DID
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: generateTID(),
+
CID: "bafyinvalid4",
+
Record: map[string]interface{}{
+
"content": "Valid content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event)
+
if err == nil {
+
t.Error("Expected error for invalid DID format, got nil")
+
}
+
})
+
+
t.Run("Reject comment exceeding max content length", func(t *testing.T) {
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: generateTID(),
+
CID: "bafytoobig",
+
Record: map[string]interface{}{
+
"content": string(make([]byte, 30001)), // Exceeds 30000 byte limit
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event)
+
if err == nil {
+
t.Error("Expected error for oversized content, got nil")
+
}
+
if err != nil && !contains(err.Error(), "exceeds maximum length") {
+
t.Errorf("Expected 'exceeds maximum length' error, got: %v", err)
+
}
+
})
+
+
t.Run("Reject comment with malformed parent URI", func(t *testing.T) {
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: generateTID(),
+
CID: "bafymalformed",
+
Record: map[string]interface{}{
+
"content": "Valid content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": "at://malformed", // Invalid: missing collection/rkey
+
"cid": "bafyparent",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event)
+
if err == nil {
+
t.Error("Expected error for malformed AT-URI, got nil")
+
}
+
if err != nil && !contains(err.Error(), "invalid parent URI") {
+
t.Errorf("Expected 'invalid parent URI' error, got: %v", err)
+
}
+
})
+
+
t.Run("Reject comment with malformed root URI", func(t *testing.T) {
+
event := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: generateTID(),
+
CID: "bafymalformed2",
+
Record: map[string]interface{}{
+
"content": "Valid content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": "at://did:plc:test123", // Invalid: missing collection/rkey
+
"cid": "bafyroot",
+
},
+
"parent": map[string]interface{}{
+
"uri": testPostURI,
+
"cid": "bafyparent",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, event)
+
if err == nil {
+
t.Error("Expected error for malformed AT-URI, got nil")
+
}
+
if err != nil && !contains(err.Error(), "invalid root URI") {
+
t.Errorf("Expected 'invalid root URI' error, got: %v", err)
+
}
+
})
+
}
+
+
func TestCommentRepository_Queries(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
+
// Clean up any existing test data from previous runs
+
_, err := db.ExecContext(ctx, "DELETE FROM comments WHERE commenter_did LIKE 'did:plc:%'")
+
if err != nil {
+
t.Fatalf("Failed to clean up test comments: %v", err)
+
}
+
+
testUser := createTestUser(t, db, "query.test", "did:plc:query123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "querycommunity", "owner6.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
postURI := createTestPost(t, db, testCommunity, testUser.DID, "Query Test", 0, time.Now())
+
+
// Create a comment tree
+
// Post
+
// |- Comment 1
+
// |- Comment 2
+
// |- Comment 3
+
// |- Comment 4
+
+
comment1 := &comments.Comment{
+
URI: fmt.Sprintf("at://%s/social.coves.feed.comment/1", testUser.DID),
+
CID: "bafyc1",
+
RKey: "1",
+
CommenterDID: testUser.DID,
+
RootURI: postURI,
+
RootCID: "bafypost",
+
ParentURI: postURI,
+
ParentCID: "bafypost",
+
Content: "Comment 1",
+
Langs: []string{},
+
CreatedAt: time.Now(),
+
}
+
+
comment2 := &comments.Comment{
+
URI: fmt.Sprintf("at://%s/social.coves.feed.comment/2", testUser.DID),
+
CID: "bafyc2",
+
RKey: "2",
+
CommenterDID: testUser.DID,
+
RootURI: postURI,
+
RootCID: "bafypost",
+
ParentURI: comment1.URI,
+
ParentCID: "bafyc1",
+
Content: "Comment 2 (reply to 1)",
+
Langs: []string{},
+
CreatedAt: time.Now().Add(1 * time.Second),
+
}
+
+
comment3 := &comments.Comment{
+
URI: fmt.Sprintf("at://%s/social.coves.feed.comment/3", testUser.DID),
+
CID: "bafyc3",
+
RKey: "3",
+
CommenterDID: testUser.DID,
+
RootURI: postURI,
+
RootCID: "bafypost",
+
ParentURI: comment1.URI,
+
ParentCID: "bafyc1",
+
Content: "Comment 3 (reply to 1)",
+
Langs: []string{},
+
CreatedAt: time.Now().Add(2 * time.Second),
+
}
+
+
comment4 := &comments.Comment{
+
URI: fmt.Sprintf("at://%s/social.coves.feed.comment/4", testUser.DID),
+
CID: "bafyc4",
+
RKey: "4",
+
CommenterDID: testUser.DID,
+
RootURI: postURI,
+
RootCID: "bafypost",
+
ParentURI: postURI,
+
ParentCID: "bafypost",
+
Content: "Comment 4",
+
Langs: []string{},
+
CreatedAt: time.Now().Add(3 * time.Second),
+
}
+
+
// Create all comments
+
for i, c := range []*comments.Comment{comment1, comment2, comment3, comment4} {
+
if err := commentRepo.Create(ctx, c); err != nil {
+
t.Fatalf("Failed to create comment %d: %v", i+1, err)
+
}
+
t.Logf("Created comment %d: %s", i+1, c.URI)
+
}
+
+
// Verify comments were created
+
verifyCount, err := commentRepo.CountByParent(ctx, postURI)
+
if err != nil {
+
t.Fatalf("Failed to count comments: %v", err)
+
}
+
t.Logf("Direct replies to post after creation: %d", verifyCount)
+
+
t.Run("ListByRoot returns all comments in thread", func(t *testing.T) {
+
comments, err := commentRepo.ListByRoot(ctx, postURI, 100, 0)
+
if err != nil {
+
t.Fatalf("Failed to list by root: %v", err)
+
}
+
+
if len(comments) != 4 {
+
t.Errorf("Expected 4 comments, got %d", len(comments))
+
}
+
})
+
+
t.Run("ListByParent returns direct replies", func(t *testing.T) {
+
// Direct replies to post
+
postReplies, err := commentRepo.ListByParent(ctx, postURI, 100, 0)
+
if err != nil {
+
t.Fatalf("Failed to list post replies: %v", err)
+
}
+
+
if len(postReplies) != 2 {
+
t.Errorf("Expected 2 direct replies to post, got %d", len(postReplies))
+
}
+
+
// Direct replies to comment1
+
comment1Replies, err := commentRepo.ListByParent(ctx, comment1.URI, 100, 0)
+
if err != nil {
+
t.Fatalf("Failed to list comment1 replies: %v", err)
+
}
+
+
if len(comment1Replies) != 2 {
+
t.Errorf("Expected 2 direct replies to comment1, got %d", len(comment1Replies))
+
}
+
})
+
+
t.Run("CountByParent returns correct counts", func(t *testing.T) {
+
postCount, err := commentRepo.CountByParent(ctx, postURI)
+
if err != nil {
+
t.Fatalf("Failed to count post replies: %v", err)
+
}
+
+
if postCount != 2 {
+
t.Errorf("Expected 2 direct replies to post, got %d", postCount)
+
}
+
+
comment1Count, err := commentRepo.CountByParent(ctx, comment1.URI)
+
if err != nil {
+
t.Fatalf("Failed to count comment1 replies: %v", err)
+
}
+
+
if comment1Count != 2 {
+
t.Errorf("Expected 2 direct replies to comment1, got %d", comment1Count)
+
}
+
})
+
+
t.Run("ListByCommenter returns user's comments", func(t *testing.T) {
+
userComments, err := commentRepo.ListByCommenter(ctx, testUser.DID, 100, 0)
+
if err != nil {
+
t.Fatalf("Failed to list by commenter: %v", err)
+
}
+
+
if len(userComments) != 4 {
+
t.Errorf("Expected 4 comments by user, got %d", len(userComments))
+
}
+
})
+
}
+
+
// TestCommentConsumer_OutOfOrderReconciliation tests that parent counts are
+
// correctly reconciled when child comments arrive before their parent
+
func TestCommentConsumer_OutOfOrderReconciliation(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
testUser := createTestUser(t, db, "outoforder.test", "did:plc:outoforder123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "ooo-community", "owner7.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
postURI := createTestPost(t, db, testCommunity, testUser.DID, "OOO Test Post", 0, time.Now())
+
+
t.Run("Child arrives before parent - count reconciled", func(t *testing.T) {
+
// Scenario: User A creates comment C1 on post
+
// User B creates reply C2 to C1
+
// Jetstream delivers C2 before C1 (different repos)
+
// When C1 finally arrives, its reply_count should be 1, not 0
+
+
parentRkey := generateTID()
+
parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey)
+
+
childRkey := generateTID()
+
childURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, childRkey)
+
+
// Step 1: Index child FIRST (before parent exists)
+
childEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "child-rev",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: childRkey,
+
CID: "bafychild",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "This is a reply to a comment that doesn't exist yet!",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": parentURI, // Points to parent that doesn't exist yet
+
"cid": "bafyparent",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, childEvent)
+
if err != nil {
+
t.Fatalf("Failed to handle child event: %v", err)
+
}
+
+
// Verify child was indexed
+
childComment, err := commentRepo.GetByURI(ctx, childURI)
+
if err != nil {
+
t.Fatalf("Child comment not indexed: %v", err)
+
}
+
if childComment.ParentURI != parentURI {
+
t.Errorf("Expected child parent_uri %s, got %s", parentURI, childComment.ParentURI)
+
}
+
+
// Step 2: Now index parent (arrives late due to Jetstream ordering)
+
parentEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "parent-rev",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: parentRkey,
+
CID: "bafyparent",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "This is the parent comment arriving late",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, parentEvent)
+
if err != nil {
+
t.Fatalf("Failed to handle parent event: %v", err)
+
}
+
+
// Step 3: Verify parent was indexed with CORRECT reply_count
+
parentComment, err := commentRepo.GetByURI(ctx, parentURI)
+
if err != nil {
+
t.Fatalf("Parent comment not indexed: %v", err)
+
}
+
+
// THIS IS THE KEY TEST: Parent should have reply_count = 1 due to reconciliation
+
if parentComment.ReplyCount != 1 {
+
t.Errorf("Expected parent reply_count to be 1 (reconciled), got %d", parentComment.ReplyCount)
+
t.Logf("This indicates out-of-order reconciliation failed!")
+
}
+
+
// Verify via query as well
+
count, err := commentRepo.CountByParent(ctx, parentURI)
+
if err != nil {
+
t.Fatalf("Failed to count parent replies: %v", err)
+
}
+
if count != 1 {
+
t.Errorf("Expected 1 reply to parent, got %d", count)
+
}
+
})
+
+
t.Run("Multiple children arrive before parent", func(t *testing.T) {
+
parentRkey := generateTID()
+
parentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, parentRkey)
+
+
// Index 3 children before parent
+
for i := 1; i <= 3; i++ {
+
childRkey := generateTID()
+
childEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: fmt.Sprintf("child-%d-rev", i),
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: childRkey,
+
CID: fmt.Sprintf("bafychild%d", i),
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": fmt.Sprintf("Reply %d before parent", i),
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": parentURI,
+
"cid": "bafyparent2",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, childEvent)
+
if err != nil {
+
t.Fatalf("Failed to handle child %d event: %v", i, err)
+
}
+
}
+
+
// Now index parent
+
parentEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "parent2-rev",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: parentRkey,
+
CID: "bafyparent2",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Parent with 3 pre-existing children",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, parentEvent)
+
if err != nil {
+
t.Fatalf("Failed to handle parent event: %v", err)
+
}
+
+
// Verify parent has reply_count = 3
+
parentComment, err := commentRepo.GetByURI(ctx, parentURI)
+
if err != nil {
+
t.Fatalf("Parent comment not indexed: %v", err)
+
}
+
+
if parentComment.ReplyCount != 3 {
+
t.Errorf("Expected parent reply_count to be 3 (reconciled), got %d", parentComment.ReplyCount)
+
}
+
})
+
}
+
+
// TestCommentConsumer_Resurrection tests that soft-deleted comments can be recreated
+
// In atProto, deleted records' rkeys become available for reuse
+
func TestCommentConsumer_Resurrection(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
testUser := createTestUser(t, db, "resurrect.test", "did:plc:resurrect123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "resurrect-community", "owner8.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
postURI := createTestPost(t, db, testCommunity, testUser.DID, "Resurrection Test", 0, time.Now())
+
+
rkey := generateTID()
+
commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
+
+
t.Run("Recreate deleted comment with same rkey", func(t *testing.T) {
+
// Step 1: Create initial comment
+
createEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v1",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafyoriginal",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Original comment content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, createEvent)
+
if err != nil {
+
t.Fatalf("Failed to create initial comment: %v", err)
+
}
+
+
// Verify comment exists
+
comment, err := commentRepo.GetByURI(ctx, commentURI)
+
if err != nil {
+
t.Fatalf("Comment not found after creation: %v", err)
+
}
+
if comment.Content != "Original comment content" {
+
t.Errorf("Expected content 'Original comment content', got '%s'", comment.Content)
+
}
+
if comment.DeletedAt != nil {
+
t.Errorf("Expected deleted_at to be nil, got %v", comment.DeletedAt)
+
}
+
+
// Step 2: Delete the comment
+
deleteEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v2",
+
Operation: "delete",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, deleteEvent)
+
if err != nil {
+
t.Fatalf("Failed to delete comment: %v", err)
+
}
+
+
// Verify comment is soft-deleted
+
comment, err = commentRepo.GetByURI(ctx, commentURI)
+
if err != nil {
+
t.Fatalf("Comment not found after deletion: %v", err)
+
}
+
if comment.DeletedAt == nil {
+
t.Error("Expected deleted_at to be set, got nil")
+
}
+
+
// Step 3: Recreate comment with same rkey (resurrection)
+
// In atProto, this is a valid operation - user can reuse the rkey
+
recreateEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v3",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey, // Same rkey!
+
CID: "bafyresurrected",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Resurrected comment with new content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI,
+
"cid": "bafypost",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, recreateEvent)
+
if err != nil {
+
t.Fatalf("Failed to resurrect comment: %v", err)
+
}
+
+
// Step 4: Verify comment is resurrected with new content
+
comment, err = commentRepo.GetByURI(ctx, commentURI)
+
if err != nil {
+
t.Fatalf("Comment not found after resurrection: %v", err)
+
}
+
+
if comment.DeletedAt != nil {
+
t.Errorf("Expected deleted_at to be NULL after resurrection, got %v", comment.DeletedAt)
+
}
+
if comment.Content != "Resurrected comment with new content" {
+
t.Errorf("Expected resurrected content, got '%s'", comment.Content)
+
}
+
if comment.CID != "bafyresurrected" {
+
t.Errorf("Expected CID 'bafyresurrected', got '%s'", comment.CID)
+
}
+
+
// Verify parent count was restored (post should have comment_count = 1)
+
var postCommentCount int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&postCommentCount)
+
if err != nil {
+
t.Fatalf("Failed to check post comment count: %v", err)
+
}
+
if postCommentCount != 1 {
+
t.Errorf("Expected post comment_count to be 1 after resurrection, got %d", postCommentCount)
+
}
+
})
+
+
t.Run("Recreate deleted comment with DIFFERENT parent", func(t *testing.T) {
+
// Create two posts
+
post1URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now())
+
post2URI := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now())
+
+
rkey2 := generateTID()
+
commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2)
+
+
// Step 1: Create comment on Post 1
+
createEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v1",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey2,
+
CID: "bafyv1",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Original on Post 1",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": post1URI,
+
"cid": "bafypost1",
+
},
+
"parent": map[string]interface{}{
+
"uri": post1URI,
+
"cid": "bafypost1",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, createEvent)
+
if err != nil {
+
t.Fatalf("Failed to create comment on Post 1: %v", err)
+
}
+
+
// Verify Post 1 has comment_count = 1
+
var post1Count int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
+
if err != nil {
+
t.Fatalf("Failed to check post 1 count: %v", err)
+
}
+
if post1Count != 1 {
+
t.Errorf("Expected Post 1 comment_count = 1, got %d", post1Count)
+
}
+
+
// Step 2: Delete comment
+
deleteEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v2",
+
Operation: "delete",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey2,
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, deleteEvent)
+
if err != nil {
+
t.Fatalf("Failed to delete comment: %v", err)
+
}
+
+
// Verify Post 1 count decremented to 0
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
+
if err != nil {
+
t.Fatalf("Failed to check post 1 count after delete: %v", err)
+
}
+
if post1Count != 0 {
+
t.Errorf("Expected Post 1 comment_count = 0 after delete, got %d", post1Count)
+
}
+
+
// Step 3: Recreate comment with same rkey but on Post 2 (different parent!)
+
recreateEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v3",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey2, // Same rkey!
+
CID: "bafyv3",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "New comment on Post 2",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": post2URI, // Different root!
+
"cid": "bafypost2",
+
},
+
"parent": map[string]interface{}{
+
"uri": post2URI, // Different parent!
+
"cid": "bafypost2",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, recreateEvent)
+
if err != nil {
+
t.Fatalf("Failed to resurrect comment on Post 2: %v", err)
+
}
+
+
// Step 4: Verify threading references updated correctly
+
comment, err := commentRepo.GetByURI(ctx, commentURI2)
+
if err != nil {
+
t.Fatalf("Failed to get resurrected comment: %v", err)
+
}
+
+
// THIS IS THE CRITICAL TEST: Threading refs must point to Post 2, not Post 1
+
if comment.ParentURI != post2URI {
+
t.Errorf("Expected parent URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.ParentURI)
+
}
+
if comment.RootURI != post2URI {
+
t.Errorf("Expected root URI to be %s (Post 2), got %s (STALE!)", post2URI, comment.RootURI)
+
}
+
if comment.ParentCID != "bafypost2" {
+
t.Errorf("Expected parent CID 'bafypost2', got '%s'", comment.ParentCID)
+
}
+
+
// Verify counts are correct
+
var post2Count int
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post2URI).Scan(&post2Count)
+
if err != nil {
+
t.Fatalf("Failed to check post 2 count: %v", err)
+
}
+
if post2Count != 1 {
+
t.Errorf("Expected Post 2 comment_count = 1, got %d", post2Count)
+
}
+
+
// Verify Post 1 count still 0 (not incremented by resurrection on Post 2)
+
err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", post1URI).Scan(&post1Count)
+
if err != nil {
+
t.Fatalf("Failed to check post 1 count after resurrection: %v", err)
+
}
+
if post1Count != 0 {
+
t.Errorf("Expected Post 1 comment_count = 0 (unchanged), got %d", post1Count)
+
}
+
})
+
}
+
+
// TestCommentConsumer_ThreadingImmutability tests that UPDATE events cannot change threading refs
+
func TestCommentConsumer_ThreadingImmutability(t *testing.T) {
+
db := setupTestDB(t)
+
defer func() {
+
if err := db.Close(); err != nil {
+
t.Logf("Failed to close database: %v", err)
+
}
+
}()
+
+
ctx := context.Background()
+
commentRepo := postgres.NewCommentRepository(db)
+
consumer := jetstream.NewCommentEventConsumer(commentRepo, db)
+
+
testUser := createTestUser(t, db, "immutable.test", "did:plc:immutable123")
+
testCommunity, err := createFeedTestCommunity(db, ctx, "immutable-community", "owner9.test")
+
if err != nil {
+
t.Fatalf("Failed to create test community: %v", err)
+
}
+
postURI1 := createTestPost(t, db, testCommunity, testUser.DID, "Post 1", 0, time.Now())
+
postURI2 := createTestPost(t, db, testCommunity, testUser.DID, "Post 2", 0, time.Now())
+
+
rkey := generateTID()
+
commentURI := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey)
+
+
t.Run("Reject UPDATE that changes parent URI", func(t *testing.T) {
+
// Create comment on Post 1
+
createEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v1",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafycomment1",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Comment on Post 1",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI1,
+
"cid": "bafypost1",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI1,
+
"cid": "bafypost1",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, createEvent)
+
if err != nil {
+
t.Fatalf("Failed to create comment: %v", err)
+
}
+
+
// Attempt to update comment to move it to Post 2 (should fail)
+
updateEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v2",
+
Operation: "update",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey,
+
CID: "bafycomment2",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Trying to hijack this comment to Post 2",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI2, // Changed!
+
"cid": "bafypost2",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI2, // Changed!
+
"cid": "bafypost2",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, updateEvent)
+
if err == nil {
+
t.Error("Expected error when changing threading references, got nil")
+
}
+
if err != nil && !contains(err.Error(), "threading references cannot be changed") {
+
t.Errorf("Expected 'threading references cannot be changed' error, got: %v", err)
+
}
+
+
// Verify comment still points to Post 1
+
comment, err := commentRepo.GetByURI(ctx, commentURI)
+
if err != nil {
+
t.Fatalf("Failed to get comment: %v", err)
+
}
+
if comment.ParentURI != postURI1 {
+
t.Errorf("Expected parent URI to remain %s, got %s", postURI1, comment.ParentURI)
+
}
+
if comment.RootURI != postURI1 {
+
t.Errorf("Expected root URI to remain %s, got %s", postURI1, comment.RootURI)
+
}
+
// Content should NOT have been updated since the operation was rejected
+
if comment.Content != "Comment on Post 1" {
+
t.Errorf("Expected original content, got '%s'", comment.Content)
+
}
+
})
+
+
t.Run("Allow UPDATE that only changes content (threading unchanged)", func(t *testing.T) {
+
rkey2 := generateTID()
+
commentURI2 := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", testUser.DID, rkey2)
+
+
// Create comment
+
createEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v1",
+
Operation: "create",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey2,
+
CID: "bafycomment3",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Original content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI1,
+
"cid": "bafypost1",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI1,
+
"cid": "bafypost1",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err := consumer.HandleEvent(ctx, createEvent)
+
if err != nil {
+
t.Fatalf("Failed to create comment: %v", err)
+
}
+
+
// Update content only (threading unchanged - should succeed)
+
updateEvent := &jetstream.JetstreamEvent{
+
Did: testUser.DID,
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "v2",
+
Operation: "update",
+
Collection: "social.coves.feed.comment",
+
RKey: rkey2,
+
CID: "bafycomment4",
+
Record: map[string]interface{}{
+
"$type": "social.coves.feed.comment",
+
"content": "Updated content",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": postURI1, // Same
+
"cid": "bafypost1",
+
},
+
"parent": map[string]interface{}{
+
"uri": postURI1, // Same
+
"cid": "bafypost1",
+
},
+
},
+
"createdAt": time.Now().Format(time.RFC3339),
+
},
+
},
+
}
+
+
err = consumer.HandleEvent(ctx, updateEvent)
+
if err != nil {
+
t.Fatalf("Expected update to succeed when threading unchanged, got error: %v", err)
+
}
+
+
// Verify content was updated
+
comment, err := commentRepo.GetByURI(ctx, commentURI2)
+
if err != nil {
+
t.Fatalf("Failed to get comment: %v", err)
+
}
+
if comment.Content != "Updated content" {
+
t.Errorf("Expected updated content, got '%s'", comment.Content)
+
}
+
// Threading should remain unchanged
+
if comment.ParentURI != postURI1 {
+
t.Errorf("Expected parent URI %s, got %s", postURI1, comment.ParentURI)
+
}
+
})
+
}