A community based topic aggregation platform built on atproto

Compare changes

Choose any two refs to compare.

Changed files
+917 -260
cmd
server
docs
internal
api
handlers
common
communityFeed
discover
timeline
routes
atproto
core
db
postgres
tests
+87 -27
internal/db/postgres/comment_repo.go
···
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
WHERE uri = $1
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
···
// Delete soft-deletes a comment (sets deleted_at)
// Called by Jetstream consumer after comment is deleted from PDS
// Idempotent: Returns success if comment already deleted
+
// Deprecated: Use SoftDeleteWithReason for new code to preserve thread structure
func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
query := `
UPDATE comments
···
return nil
}
-
// ListByRoot retrieves all active comments in a thread (flat)
+
// SoftDeleteWithReason performs a soft delete that blanks content but preserves thread structure
+
// This allows deleted comments to appear as "[deleted]" placeholders in thread views
+
// Idempotent: Returns success if comment already deleted
+
// Validates that reason is a known deletion reason constant
+
func (r *postgresCommentRepo) SoftDeleteWithReason(ctx context.Context, uri, reason, deletedByDID string) error {
+
// Validate deletion reason
+
if reason != comments.DeletionReasonAuthor && reason != comments.DeletionReasonModerator {
+
return fmt.Errorf("invalid deletion reason: %s", reason)
+
}
+
+
_, err := r.SoftDeleteWithReasonTx(ctx, nil, uri, reason, deletedByDID)
+
return err
+
}
+
+
// SoftDeleteWithReasonTx performs a soft delete within an optional transaction
+
// If tx is nil, executes directly against the database
+
// Returns rows affected count for callers that need to check idempotency
+
// This method is used by both the repository and the Jetstream consumer
+
func (r *postgresCommentRepo) SoftDeleteWithReasonTx(ctx context.Context, tx *sql.Tx, uri, reason, deletedByDID string) (int64, error) {
+
query := `
+
UPDATE comments
+
SET
+
content = '',
+
content_facets = NULL,
+
embed = NULL,
+
content_labels = NULL,
+
deleted_at = NOW(),
+
deletion_reason = $2,
+
deleted_by = $3
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
var result sql.Result
+
var err error
+
+
if tx != nil {
+
result, err = tx.ExecContext(ctx, query, uri, reason, deletedByDID)
+
} else {
+
result, err = r.db.ExecContext(ctx, query, uri, reason, deletedByDID)
+
}
+
+
if err != nil {
+
return 0, fmt.Errorf("failed to soft delete comment: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return 0, fmt.Errorf("failed to check delete result: %w", err)
+
}
+
+
return rowsAffected, nil
+
}
+
+
// ListByRoot retrieves all comments in a thread (flat), including deleted ones
// Used for fetching entire comment threads on posts
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
query := `
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
-
WHERE root_uri = $1 AND deleted_at IS NULL
+
WHERE root_uri = $1
ORDER BY created_at ASC
LIMIT $2 OFFSET $3
`
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
if err != nil {
···
return result, nil
}
-
// ListByParent retrieves direct replies to a post or comment
+
// ListByParent retrieves direct replies to a post or comment, including deleted ones
// Used for building nested/threaded comment views
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
query := `
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
-
WHERE parent_uri = $1 AND deleted_at IS NULL
+
WHERE parent_uri = $1
ORDER BY created_at ASC
LIMIT $2 OFFSET $3
`
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
if err != nil {
···
}
// ListByCommenter retrieves all active comments by a specific user
-
// Future: Used for user comment history
+
// Used for user comment history - filters out deleted comments
func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
query := `
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
WHERE commenter_did = $1 AND deleted_at IS NULL
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
if err != nil {
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
NULL::numeric as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle
···
// Build complete query with JOINs and filters
// LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
query := fmt.Sprintf(`
%s
LEFT JOIN users u ON c.commenter_did = u.did
-
WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
+
WHERE c.parent_uri = $1
%s
%s
ORDER BY %s
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
&hotRank, &authorHandle,
)
···
// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query
// Returns map[uri]*Comment for efficient lookups without N+1 queries
+
// Includes deleted comments to preserve thread structure
func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) {
if len(uris) == 0 {
return make(map[string]*comments.Comment), nil
···
// LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
// COALESCE falls back to DID when handle is NULL (user not yet in users table)
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
query := `
SELECT
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
COALESCE(u.handle, c.commenter_did) as author_handle
FROM comments c
LEFT JOIN users u ON c.commenter_did = u.did
-
WHERE c.uri = ANY($1) AND c.deleted_at IS NULL
+
WHERE c.uri = ANY($1)
`
rows, err := r.db.QueryContext(ctx, query, pq.Array(uris))
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
&authorHandle,
)
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
NULL::numeric as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
NULL::numeric as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
// Use window function to limit results per parent
// This is more efficient than LIMIT in a subquery per parent
// LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
query := fmt.Sprintf(`
WITH ranked_comments AS (
SELECT
···
) as rn
FROM comments c
LEFT JOIN users u ON c.commenter_did = u.did
-
WHERE c.parent_uri = ANY($1) AND c.deleted_at IS NULL
+
WHERE c.parent_uri = ANY($1)
)
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count,
hot_rank, author_handle
FROM ranked_comments
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
&hotRank, &authorHandle,
)
+44 -4
internal/core/comments/comment_service_test.go
···
"Coves/internal/core/posts"
"Coves/internal/core/users"
"context"
+
"database/sql"
"errors"
"testing"
"time"
···
return nil
}
+
func (m *mockCommentRepo) SoftDeleteWithReason(ctx context.Context, uri, reason, deletedByDID string) error {
+
// Validate deletion reason
+
if reason != DeletionReasonAuthor && reason != DeletionReasonModerator {
+
return errors.New("invalid deletion reason: " + reason)
+
}
+
_, err := m.SoftDeleteWithReasonTx(ctx, nil, uri, reason, deletedByDID)
+
return err
+
}
+
+
// SoftDeleteWithReasonTx implements RepositoryTx interface for transactional deletes
+
func (m *mockCommentRepo) SoftDeleteWithReasonTx(ctx context.Context, tx *sql.Tx, uri, reason, deletedByDID string) (int64, error) {
+
if c, ok := m.comments[uri]; ok {
+
if c.DeletedAt != nil {
+
// Already deleted - idempotent
+
return 0, nil
+
}
+
now := time.Now()
+
c.DeletedAt = &now
+
c.DeletionReason = &reason
+
c.DeletedBy = &deletedByDID
+
c.Content = ""
+
return 1, nil
+
}
+
return 0, nil
+
}
+
func (m *mockCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*Comment, error) {
return nil, nil
}
···
assert.Len(t, result, 0)
}
-
func TestCommentService_buildThreadViews_SkipsDeletedComments(t *testing.T) {
+
func TestCommentService_buildThreadViews_IncludesDeletedCommentsAsPlaceholders(t *testing.T) {
// Setup
commentRepo := newMockCommentRepo()
userRepo := newMockUserRepo()
···
postURI := "at://did:plc:post123/app.bsky.feed.post/test"
deletedAt := time.Now()
+
deletionReason := DeletionReasonAuthor
// Create a deleted comment
deletedComment := createTestComment("at://did:plc:commenter123/comment/1", "did:plc:commenter123", "commenter.test", postURI, postURI, 0)
deletedComment.DeletedAt = &deletedAt
+
deletedComment.DeletionReason = &deletionReason
+
deletedComment.Content = "" // Content is blanked on deletion
// Create a normal comment
normalComment := createTestComment("at://did:plc:commenter123/comment/2", "did:plc:commenter123", "commenter.test", postURI, postURI, 0)
···
// Execute
result := service.buildThreadViews(context.Background(), []*Comment{deletedComment, normalComment}, 10, "hot", nil)
-
// Verify - should only include non-deleted comment
-
assert.Len(t, result, 1)
-
assert.Equal(t, normalComment.URI, result[0].Comment.URI)
+
// Verify - both comments should be included to preserve thread structure
+
assert.Len(t, result, 2)
+
+
// First comment should be the deleted one with placeholder info
+
assert.Equal(t, deletedComment.URI, result[0].Comment.URI)
+
assert.True(t, result[0].Comment.IsDeleted)
+
assert.Equal(t, DeletionReasonAuthor, *result[0].Comment.DeletionReason)
+
assert.Empty(t, result[0].Comment.Content)
+
+
// Second comment should be the normal one
+
assert.Equal(t, normalComment.URI, result[1].Comment.URI)
+
assert.False(t, result[1].Comment.IsDeleted)
+
assert.Nil(t, result[1].Comment.DeletionReason)
}
func TestCommentService_buildThreadViews_WithNestedReplies(t *testing.T) {
-124
docs/PRD_BACKLOG.md
···
## ๐Ÿ”ต P3: Technical Debt
-
### Implement PutRecord in PDS Client
-
**Added:** 2025-12-04 | **Effort:** 2-3 hours | **Priority:** Technical Debt
-
**Status:** ๐Ÿ“‹ TODO
-
-
**Problem:**
-
The PDS client (`internal/atproto/pds/client.go`) only has `CreateRecord` but lacks `PutRecord`. This means updates use `CreateRecord` with an existing rkey, which:
-
1. Loses optimistic locking (no CID swap check)
-
2. Is semantically incorrect (creates vs updates)
-
3. Could cause race conditions on concurrent updates
-
-
**atProto Best Practice:**
-
- `com.atproto.repo.putRecord` should be used for updates
-
- Accepts `swapRecord` (expected CID) for optimistic locking
-
- Returns conflict error if CID doesn't match (concurrent modification detected)
-
-
**Solution:**
-
Add `PutRecord` method to the PDS client interface:
-
-
```go
-
// Client interface addition
-
type Client interface {
-
// ... existing methods ...
-
-
// PutRecord creates or updates a record with optional optimistic locking.
-
// If swapRecord is provided, the operation fails if the current CID doesn't match.
-
PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (uri string, cid string, err error)
-
}
-
-
// Implementation
-
func (c *client) PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (string, string, error) {
-
payload := map[string]any{
-
"repo": c.did,
-
"collection": collection,
-
"rkey": rkey,
-
"record": record,
-
}
-
-
// Optional: optimistic locking via CID swap check
-
if swapRecord != "" {
-
payload["swapRecord"] = swapRecord
-
}
-
-
var result struct {
-
URI string `json:"uri"`
-
CID string `json:"cid"`
-
}
-
-
err := c.apiClient.Post(ctx, syntax.NSID("com.atproto.repo.putRecord"), payload, &result)
-
if err != nil {
-
return "", "", wrapAPIError(err, "putRecord")
-
}
-
-
return result.URI, result.CID, nil
-
}
-
```
-
-
**Error Handling:**
-
Add new error type for conflict detection:
-
```go
-
var ErrConflict = errors.New("record was modified by another operation")
-
```
-
-
Map HTTP 409 in `wrapAPIError`:
-
```go
-
case 409:
-
return fmt.Errorf("%s: %w: %s", operation, ErrConflict, apiErr.Message)
-
```
-
-
**Files to Modify:**
-
- `internal/atproto/pds/client.go` - Add `PutRecord` method and interface
-
- `internal/atproto/pds/errors.go` - Add `ErrConflict` error type
-
-
**Testing:**
-
- Unit test: Verify payload includes `swapRecord` when provided
-
- Integration test: Concurrent updates detect conflict
-
- Integration test: Update without `swapRecord` still works (backwards compatible)
-
-
**Blocked By:** Nothing
-
**Blocks:** "Migrate UpdateComment to use PutRecord"
-
-
---
-
-
### Migrate UpdateComment to Use PutRecord
-
**Added:** 2025-12-04 | **Effort:** 1 hour | **Priority:** Technical Debt
-
**Status:** ๐Ÿ“‹ TODO (Blocked)
-
**Blocked By:** "Implement PutRecord in PDS Client"
-
-
**Problem:**
-
`UpdateComment` in `internal/core/comments/comment_service.go` uses `CreateRecord` for updates instead of `PutRecord`. This lacks optimistic locking and is semantically incorrect.
-
-
**Current Code (lines 687-690):**
-
```go
-
// TODO: Use PutRecord instead of CreateRecord for proper update semantics with optimistic locking.
-
// PutRecord should accept the existing CID (existingRecord.CID) to ensure concurrent updates are detected.
-
// However, PutRecord is not yet implemented in internal/atproto/pds/client.go.
-
uri, cid, err := pdsClient.CreateRecord(ctx, commentCollection, rkey, updatedRecord)
-
```
-
-
**Solution:**
-
Once `PutRecord` is implemented in the PDS client, update to:
-
```go
-
// Use PutRecord with optimistic locking via existing CID
-
uri, cid, err := pdsClient.PutRecord(ctx, commentCollection, rkey, updatedRecord, existingRecord.CID)
-
if err != nil {
-
if errors.Is(err, pds.ErrConflict) {
-
// Record was modified by another operation - return appropriate error
-
return nil, fmt.Errorf("comment was modified, please refresh and try again: %w", err)
-
}
-
// ... existing error handling
-
}
-
```
-
-
**Files to Modify:**
-
- `internal/core/comments/comment_service.go` - UpdateComment method
-
- `internal/core/comments/errors.go` - Add `ErrConcurrentModification` if needed
-
-
**Testing:**
-
- Unit test: Verify `PutRecord` is called with correct CID
-
- Integration test: Simulate concurrent update, verify conflict handling
-
-
**Impact:** Proper optimistic locking prevents lost updates from race conditions
-
-
---
-
### Consolidate Environment Variable Validation
**Added:** 2025-10-11 | **Effort:** 2-3 hours
+33
internal/atproto/pds/client.go
···
// GetRecord retrieves a single record by collection and rkey.
GetRecord(ctx context.Context, collection string, rkey string) (*RecordResponse, error)
+
// PutRecord creates or updates a record with optional optimistic locking.
+
// If swapRecord CID is provided, the operation fails if the current CID doesn't match.
+
PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (uri string, cid string, err error)
+
// DID returns the authenticated user's DID.
DID() string
···
return fmt.Errorf("%s: %w: %s", operation, ErrForbidden, apiErr.Message)
case 404:
return fmt.Errorf("%s: %w: %s", operation, ErrNotFound, apiErr.Message)
+
case 409:
+
return fmt.Errorf("%s: %w: %s", operation, ErrConflict, apiErr.Message)
}
}
···
Value: result.Value,
}, nil
}
+
+
// PutRecord creates or updates a record with optional optimistic locking.
+
func (c *client) PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (string, string, error) {
+
payload := map[string]any{
+
"repo": c.did,
+
"collection": collection,
+
"rkey": rkey,
+
"record": record,
+
}
+
+
// Optional: optimistic locking via CID swap check
+
if swapRecord != "" {
+
payload["swapRecord"] = swapRecord
+
}
+
+
var result struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
+
err := c.apiClient.Post(ctx, syntax.NSID("com.atproto.repo.putRecord"), payload, &result)
+
if err != nil {
+
return "", "", wrapAPIError(err, "putRecord")
+
}
+
+
return result.URI, result.CID, nil
+
}
+231
internal/atproto/pds/client_test.go
···
operation: "createRecord",
wantTyped: ErrBadRequest,
},
+
{
+
name: "409 maps to ErrConflict",
+
err: &atclient.APIError{StatusCode: 409, Name: "InvalidSwap", Message: "Record CID mismatch"},
+
operation: "putRecord",
+
wantTyped: ErrConflict,
+
},
{
name: "500 wraps without typed error",
err: &atclient.APIError{StatusCode: 500, Name: "InternalError", Message: "Server error"},
···
})
+
+
// TestClient_PutRecord tests the PutRecord method with a mock server.
+
func TestClient_PutRecord(t *testing.T) {
+
tests := []struct {
+
name string
+
collection string
+
rkey string
+
record map[string]any
+
swapRecord string
+
serverResponse map[string]any
+
serverStatus int
+
wantURI string
+
wantCID string
+
wantErr bool
+
}{
+
{
+
name: "successful put with swapRecord",
+
collection: "social.coves.comment",
+
rkey: "3kjzl5kcb2s2v",
+
record: map[string]any{
+
"$type": "social.coves.comment",
+
"content": "Updated comment content",
+
},
+
swapRecord: "bafyreigbtj4x7ip5legnfznufuopl4sg4knzc2cof6duas4b3q2fy6swua",
+
serverResponse: map[string]any{
+
"uri": "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v",
+
"cid": "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5",
+
},
+
serverStatus: http.StatusOK,
+
wantURI: "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v",
+
wantCID: "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5",
+
wantErr: false,
+
},
+
{
+
name: "successful put without swapRecord",
+
collection: "social.coves.comment",
+
rkey: "3kjzl5kcb2s2v",
+
record: map[string]any{
+
"$type": "social.coves.comment",
+
"content": "Updated comment",
+
},
+
swapRecord: "",
+
serverResponse: map[string]any{
+
"uri": "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v",
+
"cid": "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5",
+
},
+
serverStatus: http.StatusOK,
+
wantURI: "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v",
+
wantCID: "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5",
+
wantErr: false,
+
},
+
{
+
name: "conflict error (409)",
+
collection: "social.coves.comment",
+
rkey: "test",
+
record: map[string]any{"$type": "social.coves.comment"},
+
swapRecord: "bafyreigbtj4x7ip5legnfznufuopl4sg4knzc2cof6duas4b3q2fy6swua",
+
serverResponse: map[string]any{
+
"error": "InvalidSwap",
+
"message": "Record CID does not match",
+
},
+
serverStatus: http.StatusConflict,
+
wantErr: true,
+
},
+
{
+
name: "server error",
+
collection: "social.coves.comment",
+
rkey: "test",
+
record: map[string]any{"$type": "social.coves.comment"},
+
swapRecord: "",
+
serverResponse: map[string]any{
+
"error": "InvalidRequest",
+
"message": "Invalid record",
+
},
+
serverStatus: http.StatusBadRequest,
+
wantErr: true,
+
},
+
}
+
+
for _, tt := range tests {
+
t.Run(tt.name, func(t *testing.T) {
+
// Create mock server
+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+
// Verify method
+
if r.Method != http.MethodPost {
+
t.Errorf("expected POST request, got %s", r.Method)
+
}
+
+
// Verify path
+
expectedPath := "/xrpc/com.atproto.repo.putRecord"
+
if r.URL.Path != expectedPath {
+
t.Errorf("path = %q, want %q", r.URL.Path, expectedPath)
+
}
+
+
// Verify request body
+
var payload map[string]any
+
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
+
t.Fatalf("failed to decode request body: %v", err)
+
}
+
+
// Check required fields
+
if payload["collection"] != tt.collection {
+
t.Errorf("collection = %v, want %v", payload["collection"], tt.collection)
+
}
+
if payload["rkey"] != tt.rkey {
+
t.Errorf("rkey = %v, want %v", payload["rkey"], tt.rkey)
+
}
+
+
// Check swapRecord inclusion
+
if tt.swapRecord != "" {
+
if payload["swapRecord"] != tt.swapRecord {
+
t.Errorf("swapRecord = %v, want %v", payload["swapRecord"], tt.swapRecord)
+
}
+
} else {
+
if _, exists := payload["swapRecord"]; exists {
+
t.Error("swapRecord should not be included when empty")
+
}
+
}
+
+
// Send response
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(tt.serverStatus)
+
json.NewEncoder(w).Encode(tt.serverResponse)
+
}))
+
defer server.Close()
+
+
// Create client
+
apiClient := atclient.NewAPIClient(server.URL)
+
apiClient.Auth = &bearerAuth{token: "test-token"}
+
+
c := &client{
+
apiClient: apiClient,
+
did: "did:plc:test",
+
host: server.URL,
+
}
+
+
// Execute PutRecord
+
ctx := context.Background()
+
uri, cid, err := c.PutRecord(ctx, tt.collection, tt.rkey, tt.record, tt.swapRecord)
+
+
if tt.wantErr {
+
if err == nil {
+
t.Fatal("expected error, got nil")
+
}
+
return
+
}
+
+
if err != nil {
+
t.Fatalf("unexpected error: %v", err)
+
}
+
+
if uri != tt.wantURI {
+
t.Errorf("uri = %q, want %q", uri, tt.wantURI)
+
}
+
+
if cid != tt.wantCID {
+
t.Errorf("cid = %q, want %q", cid, tt.wantCID)
+
}
+
})
+
}
+
}
+
+
// TestClient_TypedErrors_PutRecord tests that PutRecord returns typed errors.
+
func TestClient_TypedErrors_PutRecord(t *testing.T) {
+
tests := []struct {
+
name string
+
serverStatus int
+
wantErr error
+
}{
+
{
+
name: "401 returns ErrUnauthorized",
+
serverStatus: http.StatusUnauthorized,
+
wantErr: ErrUnauthorized,
+
},
+
{
+
name: "403 returns ErrForbidden",
+
serverStatus: http.StatusForbidden,
+
wantErr: ErrForbidden,
+
},
+
{
+
name: "409 returns ErrConflict",
+
serverStatus: http.StatusConflict,
+
wantErr: ErrConflict,
+
},
+
{
+
name: "400 returns ErrBadRequest",
+
serverStatus: http.StatusBadRequest,
+
wantErr: ErrBadRequest,
+
},
+
}
+
+
for _, tt := range tests {
+
t.Run(tt.name, func(t *testing.T) {
+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(tt.serverStatus)
+
json.NewEncoder(w).Encode(map[string]any{
+
"error": "TestError",
+
"message": "Test error message",
+
})
+
}))
+
defer server.Close()
+
+
apiClient := atclient.NewAPIClient(server.URL)
+
apiClient.Auth = &bearerAuth{token: "test-token"}
+
+
c := &client{
+
apiClient: apiClient,
+
did: "did:plc:test",
+
host: server.URL,
+
}
+
+
ctx := context.Background()
+
_, _, err := c.PutRecord(ctx, "test.collection", "rkey", map[string]any{}, "")
+
+
if err == nil {
+
t.Fatal("expected error, got nil")
+
}
+
+
if !errors.Is(err, tt.wantErr) {
+
t.Errorf("expected errors.Is(%v, %v) to be true", err, tt.wantErr)
+
}
+
})
+
}
+
}
+3
internal/atproto/pds/errors.go
···
// ErrBadRequest indicates the request was malformed or invalid (HTTP 400).
ErrBadRequest = errors.New("bad request")
+
+
// ErrConflict indicates the record was modified by another operation (HTTP 409).
+
ErrConflict = errors.New("record was modified by another operation")
)
// IsAuthError returns true if the error is an authentication/authorization error.
+17
internal/core/comments/comment_write_service_test.go
···
createError error // Error to return on CreateRecord
getError error // Error to return on GetRecord
deleteError error // Error to return on DeleteRecord
+
putError error // Error to return on PutRecord
did string // DID of the authenticated user
hostURL string // PDS host URL
}
···
return &pds.ListRecordsResponse{}, nil
}
+
func (m *mockPDSClient) PutRecord(ctx context.Context, collection, rkey string, record any, swapRecord string) (string, string, error) {
+
if m.putError != nil {
+
return "", "", m.putError
+
}
+
+
// Store record (same logic as CreateRecord)
+
if m.records[collection] == nil {
+
m.records[collection] = make(map[string]interface{})
+
}
+
m.records[collection][rkey] = record
+
+
uri := fmt.Sprintf("at://%s/%s/%s", m.did, collection, rkey)
+
cid := fmt.Sprintf("bafytest%d", time.Now().UnixNano())
+
return uri, cid, nil
+
}
+
// mockPDSClientFactory creates mock PDS clients for testing
type mockPDSClientFactory struct {
client *mockPDSClient
+5 -1
internal/core/comments/errors.go
···
// ErrCommentAlreadyExists indicates a comment with this URI already exists
ErrCommentAlreadyExists = errors.New("comment already exists")
+
+
// ErrConcurrentModification indicates the comment was modified since it was loaded
+
ErrConcurrentModification = errors.New("comment was modified by another operation")
)
// IsNotFound checks if an error is a "not found" error
···
// IsConflict checks if an error is a conflict/already exists error
func IsConflict(err error) bool {
-
return errors.Is(err, ErrCommentAlreadyExists)
+
return errors.Is(err, ErrCommentAlreadyExists) ||
+
errors.Is(err, ErrConcurrentModification)
}
// IsValidationError checks if an error is a validation error
+169
tests/integration/comment_write_test.go
···
func parseTestDID(did string) (syntax.DID, error) {
return syntax.ParseDID(did)
}
+
+
// TestCommentWrite_ConcurrentModificationDetection tests that PutRecord's swapRecord
+
// CID validation correctly detects concurrent modifications.
+
// This verifies the optimistic locking mechanism that prevents lost updates.
+
func TestCommentWrite_ConcurrentModificationDetection(t *testing.T) {
+
if testing.Short() {
+
t.Skip("Skipping E2E test in short mode")
+
}
+
+
db := setupTestDB(t)
+
defer func() { _ = db.Close() }()
+
+
ctx := context.Background()
+
pdsURL := getTestPDSURL()
+
+
// Setup repositories and service
+
commentRepo := postgres.NewCommentRepository(db)
+
+
commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
+
if session.AccessToken == "" {
+
return nil, fmt.Errorf("session has no access token")
+
}
+
if session.HostURL == "" {
+
return nil, fmt.Errorf("session has no host URL")
+
}
+
return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
+
}
+
+
commentService := comments.NewCommentServiceWithPDSFactory(
+
commentRepo,
+
nil,
+
nil,
+
nil,
+
nil,
+
commentPDSFactory,
+
)
+
+
// Create test user
+
testUserHandle := fmt.Sprintf("concurrency-%d.local.coves.dev", time.Now().Unix())
+
testUserEmail := fmt.Sprintf("concurrency-%d@test.local", time.Now().Unix())
+
testUserPassword := "test-password-123"
+
+
pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
+
if err != nil {
+
t.Skipf("PDS not available: %v", err)
+
}
+
+
// Setup OAuth
+
mockStore := NewMockOAuthStore()
+
mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL)
+
+
parsedDID, _ := parseTestDID(userDID)
+
session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID)
+
+
// Step 1: Create a comment
+
t.Logf("\n๐Ÿ“ Step 1: Creating initial comment...")
+
createReq := comments.CreateCommentRequest{
+
Reply: comments.ReplyRef{
+
Root: comments.StrongRef{
+
URI: "at://did:plc:test/social.coves.community.post/test123",
+
CID: "bafypost",
+
},
+
Parent: comments.StrongRef{
+
URI: "at://did:plc:test/social.coves.community.post/test123",
+
CID: "bafypost",
+
},
+
},
+
Content: "Original content for concurrency test",
+
Langs: []string{"en"},
+
}
+
+
createResp, err := commentService.CreateComment(ctx, session, createReq)
+
if err != nil {
+
t.Fatalf("Failed to create comment: %v", err)
+
}
+
t.Logf("โœ… Comment created: URI=%s, CID=%s", createResp.URI, createResp.CID)
+
originalCID := createResp.CID
+
+
// Step 2: Update the comment (this changes the CID)
+
t.Logf("\n๐Ÿ“ Step 2: Updating comment (this changes CID)...")
+
updateReq := comments.UpdateCommentRequest{
+
URI: createResp.URI,
+
Content: "Updated content - CID has changed",
+
}
+
+
updateResp, err := commentService.UpdateComment(ctx, session, updateReq)
+
if err != nil {
+
t.Fatalf("Failed to update comment: %v", err)
+
}
+
t.Logf("โœ… Comment updated: New CID=%s", updateResp.CID)
+
newCID := updateResp.CID
+
+
// Verify CIDs are different
+
if originalCID == newCID {
+
t.Fatalf("CIDs should be different after update: original=%s, new=%s", originalCID, newCID)
+
}
+
+
// Step 3: Simulate concurrent modification detection using direct PDS client
+
// Create a PDS client and attempt to update with the stale (original) CID
+
t.Logf("\n๐Ÿ” Step 3: Testing concurrent modification detection with stale CID...")
+
+
pdsClient, err := pds.NewFromAccessToken(pdsURL, userDID, pdsAccessToken)
+
if err != nil {
+
t.Fatalf("Failed to create PDS client: %v", err)
+
}
+
+
rkey := utils.ExtractRKeyFromURI(createResp.URI)
+
+
// Try to update with the ORIGINAL (now stale) CID - this should fail with 409
+
staleRecord := map[string]interface{}{
+
"$type": "social.coves.community.comment",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": "at://did:plc:test/social.coves.community.post/test123",
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": "at://did:plc:test/social.coves.community.post/test123",
+
"cid": "bafypost",
+
},
+
},
+
"content": "This update should fail - using stale CID",
+
"createdAt": time.Now().UTC().Format(time.RFC3339),
+
}
+
+
_, _, err = pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, staleRecord, originalCID)
+
+
// Verify we get ErrConflict
+
if err == nil {
+
t.Fatal("Expected ErrConflict when updating with stale CID, got nil")
+
}
+
+
if !errors.Is(err, pds.ErrConflict) {
+
t.Errorf("Expected pds.ErrConflict, got: %v", err)
+
}
+
+
t.Logf("โœ… Correctly detected concurrent modification!")
+
t.Logf(" Error: %v", err)
+
+
// Step 4: Verify that updating with the correct CID succeeds
+
t.Logf("\n๐Ÿ“ Step 4: Verifying update with correct CID succeeds...")
+
correctRecord := map[string]interface{}{
+
"$type": "social.coves.community.comment",
+
"reply": map[string]interface{}{
+
"root": map[string]interface{}{
+
"uri": "at://did:plc:test/social.coves.community.post/test123",
+
"cid": "bafypost",
+
},
+
"parent": map[string]interface{}{
+
"uri": "at://did:plc:test/social.coves.community.post/test123",
+
"cid": "bafypost",
+
},
+
},
+
"content": "This update should succeed - using correct CID",
+
"createdAt": time.Now().UTC().Format(time.RFC3339),
+
}
+
+
_, finalCID, err := pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, correctRecord, newCID)
+
if err != nil {
+
t.Fatalf("Update with correct CID should succeed, got: %v", err)
+
}
+
+
t.Logf("โœ… Update with correct CID succeeded: New CID=%s", finalCID)
+
+
t.Logf("\nโœ… CONCURRENT MODIFICATION DETECTION TEST COMPLETE:")
+
t.Logf(" โœ“ PutRecord with stale CID correctly returns ErrConflict")
+
t.Logf(" โœ“ PutRecord with correct CID succeeds")
+
t.Logf(" โœ“ Optimistic locking prevents lost updates")
+
}
+2 -2
cmd/server/main.go
···
routes.RegisterTimelineRoutes(r, timelineService, voteService, authMiddleware)
log.Println("Timeline XRPC endpoints registered (requires authentication, includes viewer vote state)")
-
routes.RegisterDiscoverRoutes(r, discoverService)
-
log.Println("Discover XRPC endpoints registered (public, no auth required)")
+
routes.RegisterDiscoverRoutes(r, discoverService, voteService, authMiddleware)
+
log.Println("Discover XRPC endpoints registered (public with optional auth for viewer vote state)")
routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver)
log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)")
+73
internal/api/handlers/common/viewer_state.go
···
+
package common
+
+
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/posts"
+
"Coves/internal/core/votes"
+
"context"
+
"log"
+
"net/http"
+
)
+
+
// FeedPostProvider is implemented by any feed post wrapper that contains a PostView.
+
// This allows the helper to work with different feed post types (discover, timeline, communityFeed).
+
type FeedPostProvider interface {
+
GetPost() *posts.PostView
+
}
+
+
// PopulateViewerVoteState enriches feed posts with the authenticated user's vote state.
+
// This is a no-op if voteService is nil or the request is unauthenticated.
+
//
+
// Parameters:
+
// - ctx: Request context for PDS calls
+
// - r: HTTP request (used to extract OAuth session)
+
// - voteService: Vote service for cache lookup (may be nil)
+
// - feedPosts: Posts to enrich with viewer state (must implement FeedPostProvider)
+
//
+
// The function logs but does not fail on errors - viewer state is optional enrichment.
+
func PopulateViewerVoteState[T FeedPostProvider](
+
ctx context.Context,
+
r *http.Request,
+
voteService votes.Service,
+
feedPosts []T,
+
) {
+
if voteService == nil {
+
return
+
}
+
+
session := middleware.GetOAuthSession(r)
+
if session == nil {
+
return
+
}
+
+
userDID := middleware.GetUserDID(r)
+
+
// Ensure vote cache is populated from PDS
+
if err := voteService.EnsureCachePopulated(ctx, session); err != nil {
+
log.Printf("Warning: failed to populate vote cache: %v", err)
+
return
+
}
+
+
// Collect post URIs to batch lookup
+
postURIs := make([]string, 0, len(feedPosts))
+
for _, feedPost := range feedPosts {
+
if post := feedPost.GetPost(); post != nil {
+
postURIs = append(postURIs, post.URI)
+
}
+
}
+
+
// Get viewer votes for all posts
+
viewerVotes := voteService.GetViewerVotesForSubjects(userDID, postURIs)
+
+
// Populate viewer state on each post
+
for _, feedPost := range feedPosts {
+
if post := feedPost.GetPost(); post != nil {
+
if vote, exists := viewerVotes[post.URI]; exists {
+
post.Viewer = &posts.ViewerState{
+
Vote: &vote.Direction,
+
VoteURI: &vote.URI,
+
}
+
}
+
}
+
}
+
}
+3 -36
internal/api/handlers/communityFeed/get_community.go
···
package communityFeed
import (
-
"Coves/internal/api/middleware"
+
"Coves/internal/api/handlers/common"
"Coves/internal/core/communityFeeds"
"Coves/internal/core/posts"
"Coves/internal/core/votes"
···
return
}
-
// Populate viewer vote state if authenticated and vote service available
-
if h.voteService != nil {
-
session := middleware.GetOAuthSession(r)
-
if session != nil {
-
userDID := middleware.GetUserDID(r)
-
// Ensure vote cache is populated from PDS
-
if err := h.voteService.EnsureCachePopulated(r.Context(), session); err != nil {
-
// Log but don't fail - viewer state is optional
-
log.Printf("Warning: failed to populate vote cache: %v", err)
-
} else {
-
// Collect post URIs to batch lookup
-
postURIs := make([]string, 0, len(response.Feed))
-
for _, feedPost := range response.Feed {
-
if feedPost.Post != nil {
-
postURIs = append(postURIs, feedPost.Post.URI)
-
}
-
}
-
-
// Get viewer votes for all posts
-
viewerVotes := h.voteService.GetViewerVotesForSubjects(userDID, postURIs)
-
-
// Populate viewer state on each post
-
for _, feedPost := range response.Feed {
-
if feedPost.Post != nil {
-
if vote, exists := viewerVotes[feedPost.Post.URI]; exists {
-
feedPost.Post.Viewer = &posts.ViewerState{
-
Vote: &vote.Direction,
-
VoteURI: &vote.URI,
-
}
-
}
-
}
-
}
-
}
-
}
-
}
+
// Populate viewer vote state if authenticated
+
common.PopulateViewerVoteState(r.Context(), r, h.voteService, response.Feed)
// Transform blob refs to URLs for all posts
for _, feedPost := range response.Feed {
+11 -4
internal/api/handlers/discover/get_discover.go
···
package discover
import (
+
"Coves/internal/api/handlers/common"
"Coves/internal/core/discover"
"Coves/internal/core/posts"
+
"Coves/internal/core/votes"
"encoding/json"
"log"
"net/http"
···
// GetDiscoverHandler handles discover feed retrieval
type GetDiscoverHandler struct {
-
service discover.Service
+
service discover.Service
+
voteService votes.Service
}
// NewGetDiscoverHandler creates a new discover handler
-
func NewGetDiscoverHandler(service discover.Service) *GetDiscoverHandler {
+
func NewGetDiscoverHandler(service discover.Service, voteService votes.Service) *GetDiscoverHandler {
return &GetDiscoverHandler{
-
service: service,
+
service: service,
+
voteService: voteService,
}
}
// HandleGetDiscover retrieves posts from all communities (public feed)
// GET /xrpc/social.coves.feed.getDiscover?sort=hot&limit=15&cursor=...
-
// Public endpoint - no authentication required
+
// Public endpoint with optional auth - if authenticated, includes viewer vote state
func (h *GetDiscoverHandler) HandleGetDiscover(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
···
return
}
+
// Populate viewer vote state if authenticated
+
common.PopulateViewerVoteState(r.Context(), r, h.voteService, response.Feed)
+
// Transform blob refs to URLs for all posts
for _, feedPost := range response.Feed {
if feedPost.Post != nil {
+3 -34
internal/api/handlers/timeline/get_timeline.go
···
package timeline
import (
+
"Coves/internal/api/handlers/common"
"Coves/internal/api/middleware"
"Coves/internal/core/posts"
"Coves/internal/core/timeline"
···
return
}
-
// Populate viewer vote state if authenticated and vote service available
-
if h.voteService != nil {
-
session := middleware.GetOAuthSession(r)
-
if session != nil {
-
// Ensure vote cache is populated from PDS
-
if err := h.voteService.EnsureCachePopulated(r.Context(), session); err != nil {
-
// Log but don't fail - viewer state is optional
-
log.Printf("Warning: failed to populate vote cache: %v", err)
-
} else {
-
// Collect post URIs to batch lookup
-
postURIs := make([]string, 0, len(response.Feed))
-
for _, feedPost := range response.Feed {
-
if feedPost.Post != nil {
-
postURIs = append(postURIs, feedPost.Post.URI)
-
}
-
}
-
-
// Get viewer votes for all posts
-
viewerVotes := h.voteService.GetViewerVotesForSubjects(userDID, postURIs)
-
-
// Populate viewer state on each post
-
for _, feedPost := range response.Feed {
-
if feedPost.Post != nil {
-
if vote, exists := viewerVotes[feedPost.Post.URI]; exists {
-
feedPost.Post.Viewer = &posts.ViewerState{
-
Vote: &vote.Direction,
-
VoteURI: &vote.URI,
-
}
-
}
-
}
-
}
-
}
-
}
-
}
+
// Populate viewer vote state if authenticated
+
common.PopulateViewerVoteState(r.Context(), r, h.voteService, response.Feed)
// Transform blob refs to URLs for all posts
for _, feedPost := range response.Feed {
+9 -4
internal/api/routes/discover.go
···
import (
"Coves/internal/api/handlers/discover"
+
"Coves/internal/api/middleware"
discoverCore "Coves/internal/core/discover"
+
"Coves/internal/core/votes"
"github.com/go-chi/chi/v5"
)
···
// RegisterDiscoverRoutes registers discover-related XRPC endpoints
//
// SECURITY & RATE LIMITING:
-
// - Discover feed is PUBLIC (no authentication required)
+
// - Discover feed is PUBLIC (works without authentication)
+
// - Optional auth: if authenticated, includes viewer vote state on posts
// - Protected by global rate limiter: 100 requests/minute per IP (main.go:84)
// - Query timeout enforced via context (prevents long-running queries)
// - Result limit capped at 50 posts per request (validated in service layer)
···
func RegisterDiscoverRoutes(
r chi.Router,
discoverService discoverCore.Service,
+
voteService votes.Service,
+
authMiddleware *middleware.OAuthAuthMiddleware,
) {
// Create handlers
-
getDiscoverHandler := discover.NewGetDiscoverHandler(discoverService)
+
getDiscoverHandler := discover.NewGetDiscoverHandler(discoverService, voteService)
// GET /xrpc/social.coves.feed.getDiscover
-
// Public endpoint - no authentication required
+
// Public endpoint with optional auth for viewer-specific state (vote state)
// Shows posts from ALL communities (not personalized)
// Rate limited: 100 req/min per IP via global middleware
-
r.Get("/xrpc/social.coves.feed.getDiscover", getDiscoverHandler.HandleGetDiscover)
+
r.With(authMiddleware.OptionalAuth).Get("/xrpc/social.coves.feed.getDiscover", getDiscoverHandler.HandleGetDiscover)
}
+5
internal/core/communityFeeds/types.go
···
Reply *ReplyRef `json:"reply,omitempty"` // Reply context
}
+
// GetPost returns the underlying PostView for viewer state enrichment
+
func (f *FeedViewPost) GetPost() *posts.PostView {
+
return f.Post
+
}
+
// FeedReason is a union type for feed context
// Can be reasonRepost or reasonPin
type FeedReason struct {
+5
internal/core/discover/types.go
···
Reply *ReplyRef `json:"reply,omitempty"`
}
+
// GetPost returns the underlying PostView for viewer state enrichment
+
func (f *FeedViewPost) GetPost() *posts.PostView {
+
return f.Post
+
}
+
// FeedReason is a union type for feed context
type FeedReason struct {
Repost *ReasonRepost `json:"-"`
+5
internal/core/timeline/types.go
···
Reply *ReplyRef `json:"reply,omitempty"` // Reply context
}
+
// GetPost returns the underlying PostView for viewer state enrichment
+
func (f *FeedViewPost) GetPost() *posts.PostView {
+
return f.Post
+
}
+
// FeedReason is a union type for feed context
// Future: Can be reasonRepost or reasonCommunity
type FeedReason struct {
+193 -5
tests/integration/discover_test.go
···
import (
"Coves/internal/api/handlers/discover"
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/votes"
"Coves/internal/db/postgres"
"context"
"encoding/json"
···
discoverCore "Coves/internal/core/discover"
+
oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth"
+
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
+
// mockVoteService implements votes.Service for testing viewer vote state
+
type mockVoteService struct {
+
cachedVotes map[string]*votes.CachedVote // userDID:subjectURI -> vote
+
}
+
+
func newMockVoteService() *mockVoteService {
+
return &mockVoteService{
+
cachedVotes: make(map[string]*votes.CachedVote),
+
}
+
}
+
+
func (m *mockVoteService) AddVote(userDID, subjectURI, direction, voteURI string) {
+
key := userDID + ":" + subjectURI
+
m.cachedVotes[key] = &votes.CachedVote{
+
Direction: direction,
+
URI: voteURI,
+
}
+
}
+
+
func (m *mockVoteService) CreateVote(_ context.Context, _ *oauthlib.ClientSessionData, _ votes.CreateVoteRequest) (*votes.CreateVoteResponse, error) {
+
return &votes.CreateVoteResponse{}, nil
+
}
+
+
func (m *mockVoteService) DeleteVote(_ context.Context, _ *oauthlib.ClientSessionData, _ votes.DeleteVoteRequest) error {
+
return nil
+
}
+
+
func (m *mockVoteService) EnsureCachePopulated(_ context.Context, _ *oauthlib.ClientSessionData) error {
+
return nil // Mock always succeeds - votes pre-populated via AddVote
+
}
+
+
func (m *mockVoteService) GetViewerVote(userDID, subjectURI string) *votes.CachedVote {
+
key := userDID + ":" + subjectURI
+
return m.cachedVotes[key]
+
}
+
+
func (m *mockVoteService) GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*votes.CachedVote {
+
result := make(map[string]*votes.CachedVote)
+
for _, uri := range subjectURIs {
+
key := userDID + ":" + uri
+
if vote, exists := m.cachedVotes[key]; exists {
+
result[uri] = vote
+
}
+
}
+
return result
+
}
+
// TestGetDiscover_ShowsAllCommunities tests discover feed shows posts from ALL communities
func TestGetDiscover_ShowsAllCommunities(t *testing.T) {
if testing.Short() {
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil) // nil vote service - tests don't need vote state
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil) // nil vote service - tests don't need vote state
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil)
t.Run("Limit exceeds maximum", func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=100", nil)
···
assert.Contains(t, errorResp["message"], "limit")
})
}
+
+
// TestGetDiscover_ViewerVoteState tests that authenticated users see their vote state on posts
+
func TestGetDiscover_ViewerVoteState(t *testing.T) {
+
if testing.Short() {
+
t.Skip("Skipping integration test in short mode")
+
}
+
+
db := setupTestDB(t)
+
t.Cleanup(func() { _ = db.Close() })
+
+
ctx := context.Background()
+
testID := time.Now().UnixNano()
+
+
// Create community and posts
+
communityDID, err := createFeedTestCommunity(db, ctx, fmt.Sprintf("votes-%d", testID), fmt.Sprintf("alice-%d.test", testID))
+
require.NoError(t, err)
+
+
post1URI := createTestPost(t, db, communityDID, "did:plc:author1", "Post with upvote", 10, time.Now().Add(-1*time.Hour))
+
post2URI := createTestPost(t, db, communityDID, "did:plc:author2", "Post with downvote", 5, time.Now().Add(-2*time.Hour))
+
_ = createTestPost(t, db, communityDID, "did:plc:author3", "Post without vote", 3, time.Now().Add(-3*time.Hour))
+
+
// Setup mock vote service with pre-populated votes
+
viewerDID := "did:plc:viewer123"
+
mockVotes := newMockVoteService()
+
mockVotes.AddVote(viewerDID, post1URI, "up", "at://"+viewerDID+"/social.coves.vote/vote1")
+
mockVotes.AddVote(viewerDID, post2URI, "down", "at://"+viewerDID+"/social.coves.vote/vote2")
+
+
// Setup handler with mock vote service
+
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
+
discoverService := discoverCore.NewDiscoverService(discoverRepo)
+
handler := discover.NewGetDiscoverHandler(discoverService, mockVotes)
+
+
// Create request with authenticated user context
+
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=50", nil)
+
+
// Inject OAuth session into context (simulates OptionalAuth middleware)
+
did, _ := syntax.ParseDID(viewerDID)
+
session := &oauthlib.ClientSessionData{
+
AccountDID: did,
+
AccessToken: "test_token",
+
}
+
reqCtx := context.WithValue(req.Context(), middleware.UserDIDKey, viewerDID)
+
reqCtx = context.WithValue(reqCtx, middleware.OAuthSessionKey, session)
+
req = req.WithContext(reqCtx)
+
+
rec := httptest.NewRecorder()
+
handler.HandleGetDiscover(rec, req)
+
+
// Assertions
+
assert.Equal(t, http.StatusOK, rec.Code)
+
+
var response discoverCore.DiscoverResponse
+
err = json.Unmarshal(rec.Body.Bytes(), &response)
+
require.NoError(t, err)
+
+
// Find our test posts and verify vote state
+
var foundPost1, foundPost2, foundPost3 bool
+
for _, feedPost := range response.Feed {
+
switch feedPost.Post.URI {
+
case post1URI:
+
foundPost1 = true
+
require.NotNil(t, feedPost.Post.Viewer, "Post1 should have viewer state")
+
require.NotNil(t, feedPost.Post.Viewer.Vote, "Post1 should have vote direction")
+
assert.Equal(t, "up", *feedPost.Post.Viewer.Vote, "Post1 should show upvote")
+
require.NotNil(t, feedPost.Post.Viewer.VoteURI, "Post1 should have vote URI")
+
assert.Contains(t, *feedPost.Post.Viewer.VoteURI, "vote1", "Post1 should have correct vote URI")
+
+
case post2URI:
+
foundPost2 = true
+
require.NotNil(t, feedPost.Post.Viewer, "Post2 should have viewer state")
+
require.NotNil(t, feedPost.Post.Viewer.Vote, "Post2 should have vote direction")
+
assert.Equal(t, "down", *feedPost.Post.Viewer.Vote, "Post2 should show downvote")
+
require.NotNil(t, feedPost.Post.Viewer.VoteURI, "Post2 should have vote URI")
+
+
default:
+
// Posts without votes should have nil Viewer or nil Vote
+
if feedPost.Post.Viewer != nil && feedPost.Post.Viewer.Vote != nil {
+
// This post has a vote from our viewer - it's not post3
+
continue
+
}
+
foundPost3 = true
+
}
+
}
+
+
assert.True(t, foundPost1, "Should find post1 with upvote")
+
assert.True(t, foundPost2, "Should find post2 with downvote")
+
assert.True(t, foundPost3, "Should find post3 without vote")
+
}
+
+
// TestGetDiscover_NoViewerStateWithoutAuth tests that unauthenticated users don't get viewer state
+
func TestGetDiscover_NoViewerStateWithoutAuth(t *testing.T) {
+
if testing.Short() {
+
t.Skip("Skipping integration test in short mode")
+
}
+
+
db := setupTestDB(t)
+
t.Cleanup(func() { _ = db.Close() })
+
+
ctx := context.Background()
+
testID := time.Now().UnixNano()
+
+
// Create community and post
+
communityDID, err := createFeedTestCommunity(db, ctx, fmt.Sprintf("noauth-%d", testID), fmt.Sprintf("alice-%d.test", testID))
+
require.NoError(t, err)
+
+
postURI := createTestPost(t, db, communityDID, "did:plc:author", "Some post", 10, time.Now())
+
+
// Setup mock vote service with a vote (but request will be unauthenticated)
+
mockVotes := newMockVoteService()
+
mockVotes.AddVote("did:plc:someuser", postURI, "up", "at://did:plc:someuser/social.coves.vote/vote1")
+
+
// Setup handler with mock vote service
+
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
+
discoverService := discoverCore.NewDiscoverService(discoverRepo)
+
handler := discover.NewGetDiscoverHandler(discoverService, mockVotes)
+
+
// Create request WITHOUT auth context
+
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=50", nil)
+
rec := httptest.NewRecorder()
+
handler.HandleGetDiscover(rec, req)
+
+
// Should succeed
+
assert.Equal(t, http.StatusOK, rec.Code)
+
+
var response discoverCore.DiscoverResponse
+
err = json.Unmarshal(rec.Body.Bytes(), &response)
+
require.NoError(t, err)
+
+
// Find our post and verify NO viewer state (unauthenticated)
+
for _, feedPost := range response.Feed {
+
if feedPost.Post.URI == postURI {
+
assert.Nil(t, feedPost.Post.Viewer, "Unauthenticated request should not have viewer state")
+
return
+
}
+
}
+
t.Fatal("Test post not found in response")
+
}
+11 -11
tests/integration/feed_test.go
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data: community, users, and posts
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data with many posts
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Request feed for non-existent community
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.communityFeed.getCommunity?community=did:plc:nonexistent&sort=hot&limit=10", nil)
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test community
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Create community with no posts
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test community
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
+7 -7
tests/integration/timeline_test.go
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
// Request timeline WITHOUT auth context
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getTimeline?sort=new&limit=10", nil)
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
+1 -1
tests/integration/user_journey_e2e_test.go
···
r := chi.NewRouter()
routes.RegisterCommunityRoutes(r, communityService, e2eAuth.OAuthAuthMiddleware, nil) // nil = allow all community creators
routes.RegisterPostRoutes(r, postService, e2eAuth.OAuthAuthMiddleware)
-
routes.RegisterTimelineRoutes(r, timelineService, e2eAuth.OAuthAuthMiddleware)
+
routes.RegisterTimelineRoutes(r, timelineService, nil, e2eAuth.OAuthAuthMiddleware)
httpServer := httptest.NewServer(r)
defer httpServer.Close()