A community based topic aggregation platform built on atproto

Compare changes

Choose any two refs to compare.

+125
internal/atproto/pds/factory.go
···
+
package pds
+
+
import (
+
"context"
+
"fmt"
+
"net/http"
+
+
"github.com/bluesky-social/indigo/atproto/atclient"
+
"github.com/bluesky-social/indigo/atproto/auth/oauth"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
)
+
+
// NewFromOAuthSession creates a PDS client from an OAuth session.
+
// This uses DPoP authentication - the correct method for OAuth tokens.
+
//
+
// The oauthClient is used to resume the session and get a properly configured
+
// APIClient that handles DPoP proof generation and nonce rotation automatically.
+
func NewFromOAuthSession(ctx context.Context, oauthClient *oauth.ClientApp, sessionData *oauth.ClientSessionData) (Client, error) {
+
if oauthClient == nil {
+
return nil, fmt.Errorf("oauthClient is required")
+
}
+
if sessionData == nil {
+
return nil, fmt.Errorf("sessionData is required")
+
}
+
+
// ResumeSession reconstructs the OAuth session with DPoP key
+
// and returns a ClientSession that can generate authenticated requests
+
sess, err := oauthClient.ResumeSession(ctx, sessionData.AccountDID, sessionData.SessionID)
+
if err != nil {
+
return nil, fmt.Errorf("failed to resume OAuth session: %w", err)
+
}
+
+
// APIClient() returns an *atclient.APIClient configured with DPoP auth
+
apiClient := sess.APIClient()
+
+
return &client{
+
apiClient: apiClient,
+
did: sessionData.AccountDID.String(),
+
host: sessionData.HostURL,
+
}, nil
+
}
+
+
// NewFromPasswordAuth creates a PDS client using password authentication.
+
// This uses Bearer token authentication from com.atproto.server.createSession.
+
//
+
// Primarily used for:
+
// - E2E tests with local PDS
+
// - Development/debugging tools
+
// - Non-OAuth clients
+
//
+
// Note: This establishes a new session with the PDS. For repeated calls,
+
// consider using NewFromAccessToken if you already have a valid access token.
+
func NewFromPasswordAuth(ctx context.Context, host, handle, password string) (Client, error) {
+
if host == "" {
+
return nil, fmt.Errorf("host is required")
+
}
+
if handle == "" {
+
return nil, fmt.Errorf("handle is required")
+
}
+
if password == "" {
+
return nil, fmt.Errorf("password is required")
+
}
+
+
// LoginWithPasswordHost creates a session and returns an authenticated APIClient
+
// This handles the createSession call and Bearer token setup
+
apiClient, err := atclient.LoginWithPasswordHost(ctx, host, handle, password, "", nil)
+
if err != nil {
+
return nil, fmt.Errorf("failed to login with password: %w", err)
+
}
+
+
// Get DID from the authenticated client
+
did := ""
+
if apiClient.AccountDID != nil {
+
did = apiClient.AccountDID.String()
+
}
+
+
return &client{
+
apiClient: apiClient,
+
did: did,
+
host: host,
+
}, nil
+
}
+
+
// NewFromAccessToken creates a PDS client from an existing access token.
+
// This is useful when you already have a valid Bearer token (e.g., from createSession)
+
// and don't want to re-authenticate.
+
//
+
// WARNING: This creates a client with Bearer auth only. Do NOT use this with
+
// OAuth access tokens - those require DPoP proofs. Use NewFromOAuthSession instead.
+
func NewFromAccessToken(host, did, accessToken string) (Client, error) {
+
if host == "" {
+
return nil, fmt.Errorf("host is required")
+
}
+
if did == "" {
+
return nil, fmt.Errorf("did is required")
+
}
+
if accessToken == "" {
+
return nil, fmt.Errorf("accessToken is required")
+
}
+
+
// Create APIClient with Bearer auth
+
apiClient := atclient.NewAPIClient(host)
+
apiClient.Auth = &bearerAuth{token: accessToken}
+
+
return &client{
+
apiClient: apiClient,
+
did: did,
+
host: host,
+
}, nil
+
}
+
+
// bearerAuth implements atclient.AuthMethod for simple Bearer token auth.
+
// This is used for password-based sessions where DPoP is not required.
+
type bearerAuth struct {
+
token string
+
}
+
+
// Ensure bearerAuth implements atclient.AuthMethod.
+
var _ atclient.AuthMethod = (*bearerAuth)(nil)
+
+
// DoWithAuth adds the Bearer token to the request and executes it.
+
func (b *bearerAuth) DoWithAuth(c *http.Client, req *http.Request, _ syntax.NSID) (*http.Response, error) {
+
req.Header.Set("Authorization", "Bearer "+b.token)
+
return c.Do(req)
+
}
+18
tests/integration/helpers.go
···
import (
"Coves/internal/api/middleware"
"Coves/internal/atproto/oauth"
+
"Coves/internal/atproto/pds"
"Coves/internal/core/users"
+
"Coves/internal/core/votes"
"bytes"
"context"
"database/sql"
···
e.store.AddSessionWithPDS(did, sessionID, pdsAccessToken, pdsURL)
return token
}
+
+
// PasswordAuthPDSClientFactory creates a PDSClientFactory that uses password-based Bearer auth.
+
// This is for E2E tests that use createSession instead of OAuth.
+
// The factory extracts the access token and host URL from the session data.
+
func PasswordAuthPDSClientFactory() votes.PDSClientFactory {
+
return func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
+
if session.AccessToken == "" {
+
return nil, fmt.Errorf("session has no access token")
+
}
+
if session.HostURL == "" {
+
return nil, fmt.Errorf("session has no host URL")
+
}
+
+
return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
+
}
+
}
+267
cmd/reindex-votes/main.go
···
+
// cmd/reindex-votes/main.go
+
// Quick tool to reindex votes from PDS to AppView database
+
package main
+
+
import (
+
"context"
+
"database/sql"
+
"encoding/json"
+
"fmt"
+
"log"
+
"net/http"
+
"net/url"
+
"os"
+
"strings"
+
"time"
+
+
_ "github.com/lib/pq"
+
)
+
+
type ListRecordsResponse struct {
+
Records []Record `json:"records"`
+
Cursor string `json:"cursor"`
+
}
+
+
type Record struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
Value map[string]interface{} `json:"value"`
+
}
+
+
func main() {
+
// Get config from env
+
dbURL := os.Getenv("DATABASE_URL")
+
if dbURL == "" {
+
dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
+
}
+
pdsURL := os.Getenv("PDS_URL")
+
if pdsURL == "" {
+
pdsURL = "http://localhost:3001"
+
}
+
+
log.Printf("Connecting to database...")
+
db, err := sql.Open("postgres", dbURL)
+
if err != nil {
+
log.Fatalf("Failed to connect to database: %v", err)
+
}
+
defer db.Close()
+
+
ctx := context.Background()
+
+
// Get all accounts directly from the PDS
+
log.Printf("Fetching accounts from PDS (%s)...", pdsURL)
+
dids, err := fetchAllAccountsFromPDS(pdsURL)
+
if err != nil {
+
log.Fatalf("Failed to fetch accounts from PDS: %v", err)
+
}
+
log.Printf("Found %d accounts on PDS to check for votes", len(dids))
+
+
// Reset vote counts first
+
log.Printf("Resetting all vote counts...")
+
if _, err := db.ExecContext(ctx, "DELETE FROM votes"); err != nil {
+
log.Fatalf("Failed to clear votes table: %v", err)
+
}
+
if _, err := db.ExecContext(ctx, "UPDATE posts SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil {
+
log.Fatalf("Failed to reset post vote counts: %v", err)
+
}
+
if _, err := db.ExecContext(ctx, "UPDATE comments SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil {
+
log.Fatalf("Failed to reset comment vote counts: %v", err)
+
}
+
+
// For each user, fetch their votes from PDS
+
totalVotes := 0
+
for _, did := range dids {
+
votes, err := fetchVotesFromPDS(pdsURL, did)
+
if err != nil {
+
log.Printf("Warning: failed to fetch votes for %s: %v", did, err)
+
continue
+
}
+
+
if len(votes) == 0 {
+
continue
+
}
+
+
log.Printf("Found %d votes for %s", len(votes), did)
+
+
// Index each vote
+
for _, vote := range votes {
+
if err := indexVote(ctx, db, did, vote); err != nil {
+
log.Printf("Warning: failed to index vote %s: %v", vote.URI, err)
+
continue
+
}
+
totalVotes++
+
}
+
}
+
+
log.Printf("โœ“ Reindexed %d votes from PDS", totalVotes)
+
}
+
+
// fetchAllAccountsFromPDS queries the PDS sync API to get all repo DIDs
+
func fetchAllAccountsFromPDS(pdsURL string) ([]string, error) {
+
// Use com.atproto.sync.listRepos to get all repos on this PDS
+
var allDIDs []string
+
cursor := ""
+
+
for {
+
reqURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=100", pdsURL)
+
if cursor != "" {
+
reqURL += "&cursor=" + url.QueryEscape(cursor)
+
}
+
+
resp, err := http.Get(reqURL)
+
if err != nil {
+
return nil, fmt.Errorf("HTTP request failed: %w", err)
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != 200 {
+
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+
}
+
+
var result struct {
+
Repos []struct {
+
DID string `json:"did"`
+
} `json:"repos"`
+
Cursor string `json:"cursor"`
+
}
+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+
return nil, fmt.Errorf("failed to decode response: %w", err)
+
}
+
+
for _, repo := range result.Repos {
+
allDIDs = append(allDIDs, repo.DID)
+
}
+
+
if result.Cursor == "" {
+
break
+
}
+
cursor = result.Cursor
+
}
+
+
return allDIDs, nil
+
}
+
+
func fetchVotesFromPDS(pdsURL, did string) ([]Record, error) {
+
var allRecords []Record
+
cursor := ""
+
collection := "social.coves.feed.vote"
+
+
for {
+
reqURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100",
+
pdsURL, url.QueryEscape(did), url.QueryEscape(collection))
+
if cursor != "" {
+
reqURL += "&cursor=" + url.QueryEscape(cursor)
+
}
+
+
resp, err := http.Get(reqURL)
+
if err != nil {
+
return nil, fmt.Errorf("HTTP request failed: %w", err)
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode == 400 {
+
// User doesn't exist on this PDS or has no records - that's OK
+
return nil, nil
+
}
+
if resp.StatusCode != 200 {
+
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+
}
+
+
var result ListRecordsResponse
+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+
return nil, fmt.Errorf("failed to decode response: %w", err)
+
}
+
+
allRecords = append(allRecords, result.Records...)
+
+
if result.Cursor == "" {
+
break
+
}
+
cursor = result.Cursor
+
}
+
+
return allRecords, nil
+
}
+
+
func indexVote(ctx context.Context, db *sql.DB, voterDID string, record Record) error {
+
// Extract vote data from record
+
subject, ok := record.Value["subject"].(map[string]interface{})
+
if !ok {
+
return fmt.Errorf("missing subject")
+
}
+
subjectURI, _ := subject["uri"].(string)
+
subjectCID, _ := subject["cid"].(string)
+
direction, _ := record.Value["direction"].(string)
+
createdAtStr, _ := record.Value["createdAt"].(string)
+
+
if subjectURI == "" || direction == "" {
+
return fmt.Errorf("invalid vote record: missing required fields")
+
}
+
+
// Parse created_at
+
createdAt, err := time.Parse(time.RFC3339, createdAtStr)
+
if err != nil {
+
createdAt = time.Now()
+
}
+
+
// Extract rkey from URI (at://did/collection/rkey)
+
parts := strings.Split(record.URI, "/")
+
if len(parts) < 5 {
+
return fmt.Errorf("invalid URI format: %s", record.URI)
+
}
+
rkey := parts[len(parts)-1]
+
+
// Start transaction
+
tx, err := db.BeginTx(ctx, nil)
+
if err != nil {
+
return fmt.Errorf("failed to begin transaction: %w", err)
+
}
+
defer tx.Rollback()
+
+
// Insert vote
+
_, err = tx.ExecContext(ctx, `
+
INSERT INTO votes (uri, cid, rkey, voter_did, subject_uri, subject_cid, direction, created_at, indexed_at)
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
+
ON CONFLICT (uri) DO NOTHING
+
`, record.URI, record.CID, rkey, voterDID, subjectURI, subjectCID, direction, createdAt)
+
if err != nil {
+
return fmt.Errorf("failed to insert vote: %w", err)
+
}
+
+
// Update post/comment counts
+
collection := extractCollectionFromURI(subjectURI)
+
var updateQuery string
+
+
switch collection {
+
case "social.coves.community.post":
+
if direction == "up" {
+
updateQuery = `UPDATE posts SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
+
} else {
+
updateQuery = `UPDATE posts SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL`
+
}
+
case "social.coves.community.comment":
+
if direction == "up" {
+
updateQuery = `UPDATE comments SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
+
} else {
+
updateQuery = `UPDATE comments SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL`
+
}
+
default:
+
// Unknown collection, just index the vote
+
return tx.Commit()
+
}
+
+
if _, err := tx.ExecContext(ctx, updateQuery, subjectURI); err != nil {
+
return fmt.Errorf("failed to update vote counts: %w", err)
+
}
+
+
return tx.Commit()
+
}
+
+
func extractCollectionFromURI(uri string) string {
+
// at://did:plc:xxx/social.coves.community.post/rkey
+
parts := strings.Split(uri, "/")
+
if len(parts) >= 4 {
+
return parts[3]
+
}
+
return ""
+
}
+7 -5
internal/api/routes/communityFeed.go
···
import (
"Coves/internal/api/handlers/communityFeed"
+
"Coves/internal/api/middleware"
"Coves/internal/core/communityFeeds"
+
"Coves/internal/core/votes"
"github.com/go-chi/chi/v5"
)
···
func RegisterCommunityFeedRoutes(
r chi.Router,
feedService communityFeeds.Service,
+
voteService votes.Service,
+
authMiddleware *middleware.OAuthAuthMiddleware,
) {
// Create handlers
-
getCommunityHandler := communityFeed.NewGetCommunityHandler(feedService)
+
getCommunityHandler := communityFeed.NewGetCommunityHandler(feedService, voteService)
// GET /xrpc/social.coves.communityFeed.getCommunity
-
// Public endpoint - basic community sorting only for Alpha
-
// TODO(feed-generator): Add OptionalAuth middleware when implementing viewer-specific state
-
// (blocks, upvotes, saves, etc.) in feed generator skeleton
-
r.Get("/xrpc/social.coves.communityFeed.getCommunity", getCommunityHandler.HandleGetCommunity)
+
// Public endpoint with optional auth for viewer-specific state (vote state)
+
r.With(authMiddleware.OptionalAuth).Get("/xrpc/social.coves.communityFeed.getCommunity", getCommunityHandler.HandleGetCommunity)
}
+3 -1
internal/api/routes/timeline.go
···
"Coves/internal/api/handlers/timeline"
"Coves/internal/api/middleware"
timelineCore "Coves/internal/core/timeline"
+
"Coves/internal/core/votes"
"github.com/go-chi/chi/v5"
)
···
func RegisterTimelineRoutes(
r chi.Router,
timelineService timelineCore.Service,
+
voteService votes.Service,
authMiddleware *middleware.OAuthAuthMiddleware,
) {
// Create handlers
-
getTimelineHandler := timeline.NewGetTimelineHandler(timelineService)
+
getTimelineHandler := timeline.NewGetTimelineHandler(timelineService, voteService)
// GET /xrpc/social.coves.feed.getTimeline
// Requires authentication - user must be logged in to see their timeline
+221
internal/core/votes/cache.go
···
+
package votes
+
+
import (
+
"context"
+
"fmt"
+
"log/slog"
+
"strings"
+
"sync"
+
"time"
+
+
"Coves/internal/atproto/pds"
+
)
+
+
// CachedVote represents a vote stored in the cache
+
type CachedVote struct {
+
Direction string // "up" or "down"
+
URI string // vote record URI (at://did/collection/rkey)
+
RKey string // record key
+
}
+
+
// VoteCache provides an in-memory cache of user votes fetched from their PDS.
+
// This avoids eventual consistency issues with the AppView database.
+
type VoteCache struct {
+
mu sync.RWMutex
+
votes map[string]map[string]*CachedVote // userDID -> subjectURI -> vote
+
expiry map[string]time.Time // userDID -> expiry time
+
ttl time.Duration
+
logger *slog.Logger
+
}
+
+
// NewVoteCache creates a new vote cache with the specified TTL
+
func NewVoteCache(ttl time.Duration, logger *slog.Logger) *VoteCache {
+
if logger == nil {
+
logger = slog.Default()
+
}
+
return &VoteCache{
+
votes: make(map[string]map[string]*CachedVote),
+
expiry: make(map[string]time.Time),
+
ttl: ttl,
+
logger: logger,
+
}
+
}
+
+
// GetVotesForUser returns all cached votes for a user.
+
// Returns nil if cache is empty or expired for this user.
+
func (c *VoteCache) GetVotesForUser(userDID string) map[string]*CachedVote {
+
c.mu.RLock()
+
defer c.mu.RUnlock()
+
+
// Check if cache exists and is not expired
+
expiry, exists := c.expiry[userDID]
+
if !exists || time.Now().After(expiry) {
+
return nil
+
}
+
+
return c.votes[userDID]
+
}
+
+
// GetVote returns the cached vote for a specific subject, or nil if not found/expired
+
func (c *VoteCache) GetVote(userDID, subjectURI string) *CachedVote {
+
votes := c.GetVotesForUser(userDID)
+
if votes == nil {
+
return nil
+
}
+
return votes[subjectURI]
+
}
+
+
// IsCached returns true if the user's votes are cached and not expired
+
func (c *VoteCache) IsCached(userDID string) bool {
+
c.mu.RLock()
+
defer c.mu.RUnlock()
+
+
expiry, exists := c.expiry[userDID]
+
return exists && time.Now().Before(expiry)
+
}
+
+
// SetVotesForUser replaces all cached votes for a user
+
func (c *VoteCache) SetVotesForUser(userDID string, votes map[string]*CachedVote) {
+
c.mu.Lock()
+
defer c.mu.Unlock()
+
+
c.votes[userDID] = votes
+
c.expiry[userDID] = time.Now().Add(c.ttl)
+
+
c.logger.Debug("vote cache updated",
+
"user", userDID,
+
"vote_count", len(votes),
+
"expires_at", c.expiry[userDID])
+
}
+
+
// SetVote adds or updates a single vote in the cache
+
func (c *VoteCache) SetVote(userDID, subjectURI string, vote *CachedVote) {
+
c.mu.Lock()
+
defer c.mu.Unlock()
+
+
if c.votes[userDID] == nil {
+
c.votes[userDID] = make(map[string]*CachedVote)
+
}
+
+
c.votes[userDID][subjectURI] = vote
+
+
// Always extend expiry on vote action - active users keep their cache fresh
+
c.expiry[userDID] = time.Now().Add(c.ttl)
+
+
c.logger.Debug("vote cached",
+
"user", userDID,
+
"subject", subjectURI,
+
"direction", vote.Direction)
+
}
+
+
// RemoveVote removes a vote from the cache (for toggle-off)
+
func (c *VoteCache) RemoveVote(userDID, subjectURI string) {
+
c.mu.Lock()
+
defer c.mu.Unlock()
+
+
if c.votes[userDID] != nil {
+
delete(c.votes[userDID], subjectURI)
+
+
// Extend expiry on vote action - active users keep their cache fresh
+
c.expiry[userDID] = time.Now().Add(c.ttl)
+
+
c.logger.Debug("vote removed from cache",
+
"user", userDID,
+
"subject", subjectURI)
+
}
+
}
+
+
// Invalidate removes all cached votes for a user
+
func (c *VoteCache) Invalidate(userDID string) {
+
c.mu.Lock()
+
defer c.mu.Unlock()
+
+
delete(c.votes, userDID)
+
delete(c.expiry, userDID)
+
+
c.logger.Debug("vote cache invalidated", "user", userDID)
+
}
+
+
// FetchAndCacheFromPDS fetches all votes from the user's PDS and caches them.
+
// This should be called on first authenticated request or when cache is expired.
+
func (c *VoteCache) FetchAndCacheFromPDS(ctx context.Context, pdsClient pds.Client) error {
+
userDID := pdsClient.DID()
+
+
c.logger.Debug("fetching votes from PDS",
+
"user", userDID,
+
"pds", pdsClient.HostURL())
+
+
votes, err := c.fetchAllVotesFromPDS(ctx, pdsClient)
+
if err != nil {
+
return fmt.Errorf("failed to fetch votes from PDS: %w", err)
+
}
+
+
c.SetVotesForUser(userDID, votes)
+
+
c.logger.Info("vote cache populated from PDS",
+
"user", userDID,
+
"vote_count", len(votes))
+
+
return nil
+
}
+
+
// fetchAllVotesFromPDS paginates through all vote records on the user's PDS
+
func (c *VoteCache) fetchAllVotesFromPDS(ctx context.Context, pdsClient pds.Client) (map[string]*CachedVote, error) {
+
votes := make(map[string]*CachedVote)
+
cursor := ""
+
const pageSize = 100
+
const collection = "social.coves.feed.vote"
+
+
for {
+
result, err := pdsClient.ListRecords(ctx, collection, pageSize, cursor)
+
if err != nil {
+
if pds.IsAuthError(err) {
+
return nil, ErrNotAuthorized
+
}
+
return nil, fmt.Errorf("listRecords failed: %w", err)
+
}
+
+
for _, rec := range result.Records {
+
// Extract subject from record value
+
subject, ok := rec.Value["subject"].(map[string]any)
+
if !ok {
+
continue
+
}
+
+
subjectURI, ok := subject["uri"].(string)
+
if !ok || subjectURI == "" {
+
continue
+
}
+
+
direction, _ := rec.Value["direction"].(string)
+
if direction == "" {
+
continue
+
}
+
+
// Extract rkey from URI
+
rkey := extractRKeyFromURI(rec.URI)
+
+
votes[subjectURI] = &CachedVote{
+
Direction: direction,
+
URI: rec.URI,
+
RKey: rkey,
+
}
+
}
+
+
if result.Cursor == "" {
+
break
+
}
+
cursor = result.Cursor
+
}
+
+
return votes, nil
+
}
+
+
// extractRKeyFromURI extracts the rkey from an AT-URI (at://did/collection/rkey)
+
func extractRKeyFromURI(uri string) string {
+
parts := strings.Split(uri, "/")
+
if len(parts) >= 5 {
+
return parts[len(parts)-1]
+
}
+
return ""
+
}
+14
internal/core/votes/service.go
···
// - Deletes the user's vote record from their PDS
// - AppView will soft-delete via Jetstream consumer
DeleteVote(ctx context.Context, session *oauthlib.ClientSessionData, req DeleteVoteRequest) error
+
+
// EnsureCachePopulated fetches the user's votes from their PDS if not already cached.
+
// This should be called before rendering feeds to ensure vote state is available.
+
// If cache is already populated and not expired, this is a no-op.
+
EnsureCachePopulated(ctx context.Context, session *oauthlib.ClientSessionData) error
+
+
// GetViewerVote returns the viewer's vote for a specific subject, or nil if not voted.
+
// Returns from cache if available, otherwise returns nil (caller should ensure cache is populated).
+
GetViewerVote(userDID, subjectURI string) *CachedVote
+
+
// GetViewerVotesForSubjects returns the viewer's votes for multiple subjects.
+
// Returns a map of subjectURI -> CachedVote for subjects the user has voted on.
+
// This is efficient for batch lookups when rendering feeds.
+
GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*CachedVote
}
// CreateVoteRequest contains the parameters for creating a vote
+76 -16
internal/atproto/jetstream/vote_consumer.go
···
}
// Atomically: Index vote + Update post counts
-
if err := c.indexVoteAndUpdateCounts(ctx, vote); err != nil {
+
wasNew, err := c.indexVoteAndUpdateCounts(ctx, vote)
+
if err != nil {
return fmt.Errorf("failed to index vote and update counts: %w", err)
}
-
log.Printf("โœ“ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI)
+
if wasNew {
+
log.Printf("โœ“ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI)
+
}
return nil
}
···
}
// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts
-
func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error {
+
// Returns (true, nil) if vote was newly inserted, (false, nil) if already existed (idempotent)
+
func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) (bool, error) {
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
-
return fmt.Errorf("failed to begin transaction: %w", err)
+
return false, fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
···
}
}()
-
// 1. Index the vote (idempotent with ON CONFLICT DO NOTHING)
+
// 1. Check for existing active vote with different URI (stale record)
+
// This handles cases where:
+
// - User voted on another client and we missed the delete event
+
// - Vote was reindexed but user created a new vote with different rkey
+
// - Any other state mismatch between PDS and AppView
+
var existingDirection sql.NullString
+
checkQuery := `
+
SELECT direction FROM votes
+
WHERE voter_did = $1
+
AND subject_uri = $2
+
AND deleted_at IS NULL
+
AND uri != $3
+
LIMIT 1
+
`
+
if err := tx.QueryRowContext(ctx, checkQuery, vote.VoterDID, vote.SubjectURI, vote.URI).Scan(&existingDirection); err != nil && err != sql.ErrNoRows {
+
return false, fmt.Errorf("failed to check existing vote: %w", err)
+
}
+
+
// If there's a stale vote, soft-delete it and adjust counts
+
if existingDirection.Valid {
+
softDeleteQuery := `
+
UPDATE votes
+
SET deleted_at = NOW()
+
WHERE voter_did = $1
+
AND subject_uri = $2
+
AND deleted_at IS NULL
+
AND uri != $3
+
`
+
if _, err := tx.ExecContext(ctx, softDeleteQuery, vote.VoterDID, vote.SubjectURI, vote.URI); err != nil {
+
return false, fmt.Errorf("failed to soft-delete existing votes: %w", err)
+
}
+
+
// Decrement the old vote's count (will be re-incremented below if same direction)
+
collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
+
var decrementQuery string
+
if existingDirection.String == "up" {
+
if collection == "social.coves.community.post" {
+
decrementQuery = `UPDATE posts SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
+
} else if collection == "social.coves.community.comment" {
+
decrementQuery = `UPDATE comments SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
+
}
+
} else {
+
if collection == "social.coves.community.post" {
+
decrementQuery = `UPDATE posts SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`
+
} else if collection == "social.coves.community.comment" {
+
decrementQuery = `UPDATE comments SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`
+
}
+
}
+
if decrementQuery != "" {
+
if _, err := tx.ExecContext(ctx, decrementQuery, vote.SubjectURI); err != nil {
+
return false, fmt.Errorf("failed to decrement old vote count: %w", err)
+
}
+
}
+
log.Printf("Cleaned up stale vote for %s on %s (was %s)", vote.VoterDID, vote.SubjectURI, existingDirection.String)
+
}
+
+
// 2. Index the vote (idempotent with ON CONFLICT DO NOTHING)
query := `
INSERT INTO votes (
uri, cid, rkey, voter_did,
···
// If no rows returned, vote already exists (idempotent - OK for Jetstream replays)
if err == sql.ErrNoRows {
-
log.Printf("Vote already indexed: %s (idempotent)", vote.URI)
+
// Silently handle idempotent case - no log needed for replayed events
if commitErr := tx.Commit(); commitErr != nil {
-
return fmt.Errorf("failed to commit transaction: %w", commitErr)
+
return false, fmt.Errorf("failed to commit transaction: %w", commitErr)
}
-
return nil
+
return false, nil // Vote already existed
}
if err != nil {
-
return fmt.Errorf("failed to insert vote: %w", err)
+
return false, fmt.Errorf("failed to insert vote: %w", err)
}
-
// 2. Update vote counts on the subject (post or comment)
+
// 3. Update vote counts on the subject (post or comment)
// Parse collection from subject URI to determine target table
collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
···
// Vote is still indexed in votes table, we just don't update denormalized counts
log.Printf("Vote subject has unsupported collection: %s (vote indexed, counts not updated)", collection)
if commitErr := tx.Commit(); commitErr != nil {
-
return fmt.Errorf("failed to commit transaction: %w", commitErr)
+
return false, fmt.Errorf("failed to commit transaction: %w", commitErr)
}
-
return nil
+
return true, nil // Vote was newly indexed
}
result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
if err != nil {
-
return fmt.Errorf("failed to update vote counts: %w", err)
+
return false, fmt.Errorf("failed to update vote counts: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
-
return fmt.Errorf("failed to check update result: %w", err)
+
return false, fmt.Errorf("failed to check update result: %w", err)
}
// If subject doesn't exist or is deleted, that's OK (vote still indexed)
···
// Commit transaction
if err := tx.Commit(); err != nil {
-
return fmt.Errorf("failed to commit transaction: %w", err)
+
return false, fmt.Errorf("failed to commit transaction: %w", err)
}
-
return nil
+
return true, nil // Vote was newly indexed
}
// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts
+109
internal/atproto/lexicon/social/coves/community/comment/create.json
···
+
{
+
"lexicon": 1,
+
"id": "social.coves.community.comment.create",
+
"defs": {
+
"main": {
+
"type": "procedure",
+
"description": "Create a comment on a post or another comment. Comments support nested threading, rich text, embeds, and self-labeling.",
+
"input": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"required": ["reply", "content"],
+
"properties": {
+
"reply": {
+
"type": "object",
+
"description": "References for maintaining thread structure. Root always points to the original post, parent points to the immediate parent (post or comment).",
+
"required": ["root", "parent"],
+
"properties": {
+
"root": {
+
"type": "ref",
+
"ref": "com.atproto.repo.strongRef",
+
"description": "Strong reference to the original post that started the thread"
+
},
+
"parent": {
+
"type": "ref",
+
"ref": "com.atproto.repo.strongRef",
+
"description": "Strong reference to the immediate parent (post or comment) being replied to"
+
}
+
}
+
},
+
"content": {
+
"type": "string",
+
"maxGraphemes": 10000,
+
"maxLength": 100000,
+
"description": "Comment text content"
+
},
+
"facets": {
+
"type": "array",
+
"description": "Annotations for rich text (mentions, links, etc.)",
+
"items": {
+
"type": "ref",
+
"ref": "social.coves.richtext.facet"
+
}
+
},
+
"embed": {
+
"type": "union",
+
"description": "Embedded media or quoted posts",
+
"refs": [
+
"social.coves.embed.images",
+
"social.coves.embed.post"
+
]
+
},
+
"langs": {
+
"type": "array",
+
"description": "Languages used in the comment content (ISO 639-1)",
+
"maxLength": 3,
+
"items": {
+
"type": "string",
+
"format": "language"
+
}
+
},
+
"labels": {
+
"type": "ref",
+
"ref": "com.atproto.label.defs#selfLabels",
+
"description": "Self-applied content labels"
+
}
+
}
+
}
+
},
+
"output": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"required": ["uri", "cid"],
+
"properties": {
+
"uri": {
+
"type": "string",
+
"format": "at-uri",
+
"description": "AT-URI of the created comment"
+
},
+
"cid": {
+
"type": "string",
+
"format": "cid",
+
"description": "CID of the created comment record"
+
}
+
}
+
}
+
},
+
"errors": [
+
{
+
"name": "InvalidReply",
+
"description": "The reply reference is invalid, malformed, or refers to non-existent content"
+
},
+
{
+
"name": "ContentTooLong",
+
"description": "Comment content exceeds maximum length constraints"
+
},
+
{
+
"name": "ContentEmpty",
+
"description": "Comment content is empty or contains only whitespace"
+
},
+
{
+
"name": "NotAuthorized",
+
"description": "User is not authorized to create comments on this content"
+
}
+
]
+
}
+
}
+
}
+41
internal/atproto/lexicon/social/coves/community/comment/delete.json
···
+
{
+
"lexicon": 1,
+
"id": "social.coves.community.comment.delete",
+
"defs": {
+
"main": {
+
"type": "procedure",
+
"description": "Delete a comment. Only the comment author can delete their own comments.",
+
"input": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"required": ["uri"],
+
"properties": {
+
"uri": {
+
"type": "string",
+
"format": "at-uri",
+
"description": "AT-URI of the comment to delete"
+
}
+
}
+
}
+
},
+
"output": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"properties": {}
+
}
+
},
+
"errors": [
+
{
+
"name": "CommentNotFound",
+
"description": "Comment with the specified URI does not exist"
+
},
+
{
+
"name": "NotAuthorized",
+
"description": "User is not authorized to delete this comment (not the author)"
+
}
+
]
+
}
+
}
+
}
+97
internal/atproto/lexicon/social/coves/community/comment/update.json
···
+
{
+
"lexicon": 1,
+
"id": "social.coves.community.comment.update",
+
"defs": {
+
"main": {
+
"type": "procedure",
+
"description": "Update an existing comment's content, facets, embed, languages, or labels. Threading references (reply.root and reply.parent) are immutable and cannot be changed.",
+
"input": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"required": ["uri", "content"],
+
"properties": {
+
"uri": {
+
"type": "string",
+
"format": "at-uri",
+
"description": "AT-URI of the comment to update"
+
},
+
"content": {
+
"type": "string",
+
"maxGraphemes": 10000,
+
"maxLength": 100000,
+
"description": "Updated comment text content"
+
},
+
"facets": {
+
"type": "array",
+
"description": "Updated annotations for rich text (mentions, links, etc.)",
+
"items": {
+
"type": "ref",
+
"ref": "social.coves.richtext.facet"
+
}
+
},
+
"embed": {
+
"type": "union",
+
"description": "Updated embedded media or quoted posts",
+
"refs": [
+
"social.coves.embed.images",
+
"social.coves.embed.post"
+
]
+
},
+
"langs": {
+
"type": "array",
+
"description": "Updated languages used in the comment content (ISO 639-1)",
+
"maxLength": 3,
+
"items": {
+
"type": "string",
+
"format": "language"
+
}
+
},
+
"labels": {
+
"type": "ref",
+
"ref": "com.atproto.label.defs#selfLabels",
+
"description": "Updated self-applied content labels"
+
}
+
}
+
}
+
},
+
"output": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"required": ["uri", "cid"],
+
"properties": {
+
"uri": {
+
"type": "string",
+
"format": "at-uri",
+
"description": "AT-URI of the updated comment (unchanged from input)"
+
},
+
"cid": {
+
"type": "string",
+
"format": "cid",
+
"description": "New CID of the updated comment record"
+
}
+
}
+
}
+
},
+
"errors": [
+
{
+
"name": "CommentNotFound",
+
"description": "Comment with the specified URI does not exist"
+
},
+
{
+
"name": "ContentTooLong",
+
"description": "Updated comment content exceeds maximum length constraints"
+
},
+
{
+
"name": "ContentEmpty",
+
"description": "Updated comment content is empty or contains only whitespace"
+
},
+
{
+
"name": "NotAuthorized",
+
"description": "User is not authorized to update this comment (not the author)"
+
}
+
]
+
}
+
}
+
}
+38
internal/core/comments/types.go
···
+
package comments
+
+
// CreateCommentRequest contains parameters for creating a comment
+
type CreateCommentRequest struct {
+
Reply ReplyRef `json:"reply"`
+
Content string `json:"content"`
+
Facets []interface{} `json:"facets,omitempty"`
+
Embed interface{} `json:"embed,omitempty"`
+
Langs []string `json:"langs,omitempty"`
+
Labels *SelfLabels `json:"labels,omitempty"`
+
}
+
+
// CreateCommentResponse contains the result of creating a comment
+
type CreateCommentResponse struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
+
// UpdateCommentRequest contains parameters for updating a comment
+
type UpdateCommentRequest struct {
+
URI string `json:"uri"`
+
Content string `json:"content"`
+
Facets []interface{} `json:"facets,omitempty"`
+
Embed interface{} `json:"embed,omitempty"`
+
Langs []string `json:"langs,omitempty"`
+
Labels *SelfLabels `json:"labels,omitempty"`
+
}
+
+
// UpdateCommentResponse contains the result of updating a comment
+
type UpdateCommentResponse struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
+
// DeleteCommentRequest contains parameters for deleting a comment
+
type DeleteCommentRequest struct {
+
URI string `json:"uri"`
+
}
+130
internal/api/handlers/comments/create_comment.go
···
+
package comments
+
+
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/comments"
+
"encoding/json"
+
"log"
+
"net/http"
+
)
+
+
// CreateCommentHandler handles comment creation requests
+
type CreateCommentHandler struct {
+
service comments.Service
+
}
+
+
// NewCreateCommentHandler creates a new handler for creating comments
+
func NewCreateCommentHandler(service comments.Service) *CreateCommentHandler {
+
return &CreateCommentHandler{
+
service: service,
+
}
+
}
+
+
// CreateCommentInput matches the lexicon input schema for social.coves.community.comment.create
+
type CreateCommentInput struct {
+
Reply struct {
+
Root struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
} `json:"root"`
+
Parent struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
} `json:"parent"`
+
} `json:"reply"`
+
Content string `json:"content"`
+
Facets []interface{} `json:"facets,omitempty"`
+
Embed interface{} `json:"embed,omitempty"`
+
Langs []string `json:"langs,omitempty"`
+
Labels interface{} `json:"labels,omitempty"`
+
}
+
+
// CreateCommentOutput matches the lexicon output schema
+
type CreateCommentOutput struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
+
// HandleCreate handles comment creation requests
+
// POST /xrpc/social.coves.community.comment.create
+
//
+
// Request body: { "reply": { "root": {...}, "parent": {...} }, "content": "..." }
+
// Response: { "uri": "at://...", "cid": "..." }
+
func (h *CreateCommentHandler) HandleCreate(w http.ResponseWriter, r *http.Request) {
+
// 1. Check method is POST
+
if r.Method != http.MethodPost {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// 2. Limit request body size to prevent DoS attacks (100KB should be plenty for comments)
+
r.Body = http.MaxBytesReader(w, r.Body, 100*1024)
+
+
// 3. Parse JSON body into CreateCommentInput
+
var input CreateCommentInput
+
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "Invalid request body")
+
return
+
}
+
+
// 4. Get OAuth session from context (injected by auth middleware)
+
session := middleware.GetOAuthSession(r)
+
if session == nil {
+
writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required")
+
return
+
}
+
+
// 5. Convert labels interface{} to *comments.SelfLabels if provided
+
var labels *comments.SelfLabels
+
if input.Labels != nil {
+
labelsJSON, err := json.Marshal(input.Labels)
+
if err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidLabels", "Invalid labels format")
+
return
+
}
+
var selfLabels comments.SelfLabels
+
if err := json.Unmarshal(labelsJSON, &selfLabels); err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidLabels", "Invalid labels structure")
+
return
+
}
+
labels = &selfLabels
+
}
+
+
// 6. Convert input to CreateCommentRequest
+
req := comments.CreateCommentRequest{
+
Reply: comments.ReplyRef{
+
Root: comments.StrongRef{
+
URI: input.Reply.Root.URI,
+
CID: input.Reply.Root.CID,
+
},
+
Parent: comments.StrongRef{
+
URI: input.Reply.Parent.URI,
+
CID: input.Reply.Parent.CID,
+
},
+
},
+
Content: input.Content,
+
Facets: input.Facets,
+
Embed: input.Embed,
+
Langs: input.Langs,
+
Labels: labels,
+
}
+
+
// 7. Call service to create comment
+
response, err := h.service.CreateComment(r.Context(), session, req)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// 8. Return JSON response with URI and CID
+
output := CreateCommentOutput{
+
URI: response.URI,
+
CID: response.CID,
+
}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(output); err != nil {
+
log.Printf("Failed to encode response: %v", err)
+
}
+
}
+80
internal/api/handlers/comments/delete_comment.go
···
+
package comments
+
+
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/comments"
+
"encoding/json"
+
"log"
+
"net/http"
+
)
+
+
// DeleteCommentHandler handles comment deletion requests
+
type DeleteCommentHandler struct {
+
service comments.Service
+
}
+
+
// NewDeleteCommentHandler creates a new handler for deleting comments
+
func NewDeleteCommentHandler(service comments.Service) *DeleteCommentHandler {
+
return &DeleteCommentHandler{
+
service: service,
+
}
+
}
+
+
// DeleteCommentInput matches the lexicon input schema for social.coves.community.comment.delete
+
type DeleteCommentInput struct {
+
URI string `json:"uri"`
+
}
+
+
// DeleteCommentOutput is empty per lexicon specification
+
type DeleteCommentOutput struct{}
+
+
// HandleDelete handles comment deletion requests
+
// POST /xrpc/social.coves.community.comment.delete
+
//
+
// Request body: { "uri": "at://..." }
+
// Response: {}
+
func (h *DeleteCommentHandler) HandleDelete(w http.ResponseWriter, r *http.Request) {
+
// 1. Check method is POST
+
if r.Method != http.MethodPost {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// 2. Limit request body size to prevent DoS attacks (100KB should be plenty for comments)
+
r.Body = http.MaxBytesReader(w, r.Body, 100*1024)
+
+
// 3. Parse JSON body into DeleteCommentInput
+
var input DeleteCommentInput
+
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "Invalid request body")
+
return
+
}
+
+
// 4. Get OAuth session from context (injected by auth middleware)
+
session := middleware.GetOAuthSession(r)
+
if session == nil {
+
writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required")
+
return
+
}
+
+
// 5. Convert input to DeleteCommentRequest
+
req := comments.DeleteCommentRequest{
+
URI: input.URI,
+
}
+
+
// 6. Call service to delete comment
+
err := h.service.DeleteComment(r.Context(), session, req)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// 7. Return empty JSON object per lexicon specification
+
output := DeleteCommentOutput{}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(output); err != nil {
+
log.Printf("Failed to encode response: %v", err)
+
}
+
}
+34 -2
internal/api/handlers/comments/errors.go
···
import (
"Coves/internal/core/comments"
"encoding/json"
+
"errors"
"log"
"net/http"
)
···
func handleServiceError(w http.ResponseWriter, err error) {
switch {
case comments.IsNotFound(err):
-
writeError(w, http.StatusNotFound, "NotFound", err.Error())
+
// Map specific not found errors to appropriate messages
+
switch {
+
case errors.Is(err, comments.ErrCommentNotFound):
+
writeError(w, http.StatusNotFound, "CommentNotFound", "Comment not found")
+
case errors.Is(err, comments.ErrParentNotFound):
+
writeError(w, http.StatusNotFound, "ParentNotFound", "Parent post or comment not found")
+
case errors.Is(err, comments.ErrRootNotFound):
+
writeError(w, http.StatusNotFound, "RootNotFound", "Root post not found")
+
default:
+
writeError(w, http.StatusNotFound, "NotFound", err.Error())
+
}
case comments.IsValidationError(err):
-
writeError(w, http.StatusBadRequest, "InvalidRequest", err.Error())
+
// Map specific validation errors to appropriate messages
+
switch {
+
case errors.Is(err, comments.ErrInvalidReply):
+
writeError(w, http.StatusBadRequest, "InvalidReply", "The reply reference is invalid or malformed")
+
case errors.Is(err, comments.ErrContentTooLong):
+
writeError(w, http.StatusBadRequest, "ContentTooLong", "Comment content exceeds 10000 graphemes")
+
case errors.Is(err, comments.ErrContentEmpty):
+
writeError(w, http.StatusBadRequest, "ContentEmpty", "Comment content is required")
+
default:
+
writeError(w, http.StatusBadRequest, "InvalidRequest", err.Error())
+
}
+
+
case errors.Is(err, comments.ErrNotAuthorized):
+
writeError(w, http.StatusForbidden, "NotAuthorized", "User is not authorized to perform this action")
+
+
case errors.Is(err, comments.ErrBanned):
+
writeError(w, http.StatusForbidden, "Banned", "User is banned from this community")
+
+
// NOTE: IsConflict case removed - the PDS handles duplicate detection via CreateRecord,
+
// so ErrCommentAlreadyExists is never returned from the service layer. If the PDS rejects
+
// a duplicate record, it returns an auth/validation error which is handled by other cases.
+
// Keeping this code would be dead code that never executes.
default:
// Don't leak internal error details to clients
+112
internal/api/handlers/comments/update_comment.go
···
+
package comments
+
+
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/comments"
+
"encoding/json"
+
"log"
+
"net/http"
+
)
+
+
// UpdateCommentHandler handles comment update requests
+
type UpdateCommentHandler struct {
+
service comments.Service
+
}
+
+
// NewUpdateCommentHandler creates a new handler for updating comments
+
func NewUpdateCommentHandler(service comments.Service) *UpdateCommentHandler {
+
return &UpdateCommentHandler{
+
service: service,
+
}
+
}
+
+
// UpdateCommentInput matches the lexicon input schema for social.coves.community.comment.update
+
type UpdateCommentInput struct {
+
URI string `json:"uri"`
+
Content string `json:"content"`
+
Facets []interface{} `json:"facets,omitempty"`
+
Embed interface{} `json:"embed,omitempty"`
+
Langs []string `json:"langs,omitempty"`
+
Labels interface{} `json:"labels,omitempty"`
+
}
+
+
// UpdateCommentOutput matches the lexicon output schema
+
type UpdateCommentOutput struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
+
// HandleUpdate handles comment update requests
+
// POST /xrpc/social.coves.community.comment.update
+
//
+
// Request body: { "uri": "at://...", "content": "..." }
+
// Response: { "uri": "at://...", "cid": "..." }
+
func (h *UpdateCommentHandler) HandleUpdate(w http.ResponseWriter, r *http.Request) {
+
// 1. Check method is POST
+
if r.Method != http.MethodPost {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// 2. Limit request body size to prevent DoS attacks (100KB should be plenty for comments)
+
r.Body = http.MaxBytesReader(w, r.Body, 100*1024)
+
+
// 3. Parse JSON body into UpdateCommentInput
+
var input UpdateCommentInput
+
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "Invalid request body")
+
return
+
}
+
+
// 4. Get OAuth session from context (injected by auth middleware)
+
session := middleware.GetOAuthSession(r)
+
if session == nil {
+
writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required")
+
return
+
}
+
+
// 5. Convert labels interface{} to *comments.SelfLabels if provided
+
var labels *comments.SelfLabels
+
if input.Labels != nil {
+
labelsJSON, err := json.Marshal(input.Labels)
+
if err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidLabels", "Invalid labels format")
+
return
+
}
+
var selfLabels comments.SelfLabels
+
if err := json.Unmarshal(labelsJSON, &selfLabels); err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidLabels", "Invalid labels structure")
+
return
+
}
+
labels = &selfLabels
+
}
+
+
// 6. Convert input to UpdateCommentRequest
+
req := comments.UpdateCommentRequest{
+
URI: input.URI,
+
Content: input.Content,
+
Facets: input.Facets,
+
Embed: input.Embed,
+
Langs: input.Langs,
+
Labels: labels,
+
}
+
+
// 7. Call service to update comment
+
response, err := h.service.UpdateComment(r.Context(), session, req)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// 8. Return JSON response with URI and CID
+
output := UpdateCommentOutput{
+
URI: response.URI,
+
CID: response.CID,
+
}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(output); err != nil {
+
log.Printf("Failed to encode response: %v", err)
+
}
+
}
+35
internal/api/routes/comment.go
···
+
package routes
+
+
import (
+
"Coves/internal/api/handlers/comments"
+
"Coves/internal/api/middleware"
+
commentsCore "Coves/internal/core/comments"
+
+
"github.com/go-chi/chi/v5"
+
)
+
+
// RegisterCommentRoutes registers comment-related XRPC endpoints on the router
+
// Implements social.coves.community.comment.* lexicon endpoints
+
// All write operations (create, update, delete) require authentication
+
func RegisterCommentRoutes(r chi.Router, service commentsCore.Service, authMiddleware *middleware.OAuthAuthMiddleware) {
+
// Initialize handlers
+
createHandler := comments.NewCreateCommentHandler(service)
+
updateHandler := comments.NewUpdateCommentHandler(service)
+
deleteHandler := comments.NewDeleteCommentHandler(service)
+
+
// Procedure endpoints (POST) - require authentication
+
// social.coves.community.comment.create - create a new comment on a post or another comment
+
r.With(authMiddleware.RequireAuth).Post(
+
"/xrpc/social.coves.community.comment.create",
+
createHandler.HandleCreate)
+
+
// social.coves.community.comment.update - update an existing comment's content
+
r.With(authMiddleware.RequireAuth).Post(
+
"/xrpc/social.coves.community.comment.update",
+
updateHandler.HandleUpdate)
+
+
// social.coves.community.comment.delete - soft delete a comment
+
r.With(authMiddleware.RequireAuth).Post(
+
"/xrpc/social.coves.community.comment.delete",
+
deleteHandler.HandleDelete)
+
}
+4 -2
tests/integration/comment_query_test.go
···
postRepo := postgres.NewPostRepository(db)
userRepo := postgres.NewUserRepository(db)
communityRepo := postgres.NewCommunityRepository(db)
-
return comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
+
// Use factory constructor with nil factory - these tests only use the read path (GetComments)
+
return comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
}
// Helper: createTestCommentWithScore creates a comment with specific vote counts
···
postRepo := postgres.NewPostRepository(db)
userRepo := postgres.NewUserRepository(db)
communityRepo := postgres.NewCommunityRepository(db)
-
service := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
+
// Use factory constructor with nil factory - these tests only use the read path (GetComments)
+
service := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
return &testCommentServiceAdapter{service: service}
}
+6 -3
tests/integration/comment_vote_test.go
···
}
// Query comments with viewer authentication
-
commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
+
// Use factory constructor with nil factory - this test only uses the read path (GetComments)
+
commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{
PostURI: testPostURI,
Sort: "new",
···
}
// Query with authentication but no vote
-
commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
+
// Use factory constructor with nil factory - this test only uses the read path (GetComments)
+
commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{
PostURI: testPostURI,
Sort: "new",
···
t.Run("Unauthenticated request has no viewer state", func(t *testing.T) {
// Query without authentication
-
commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
+
// Use factory constructor with nil factory - this test only uses the read path (GetComments)
+
commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{
PostURI: testPostURI,
Sort: "new",
+2 -1
tests/integration/concurrent_scenarios_test.go
···
}
// Verify all comments are retrievable via service
-
commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
+
// Use factory constructor with nil factory - this test only uses the read path (GetComments)
+
commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
response, err := commentService.GetComments(ctx, &comments.GetCommentsRequest{
PostURI: postURI,
Sort: "new",
+1 -1
go.mod
···
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
-
github.com/rivo/uniseg v0.1.0 // indirect
+
github.com/rivo/uniseg v0.4.7 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
+2
go.sum
···
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
+
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
+
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+66
internal/db/migrations/021_add_comment_deletion_metadata.sql
···
+
-- +goose Up
+
-- Add deletion reason tracking to preserve thread structure while respecting privacy
+
-- When comments are deleted, we blank content but keep the record for threading
+
+
-- Create enum type for deletion reasons
+
CREATE TYPE deletion_reason AS ENUM ('author', 'moderator');
+
+
-- Add new columns to comments table
+
ALTER TABLE comments ADD COLUMN deletion_reason deletion_reason;
+
ALTER TABLE comments ADD COLUMN deleted_by TEXT;
+
+
-- Add comments for new columns
+
COMMENT ON COLUMN comments.deletion_reason IS 'Reason for deletion: author (user deleted), moderator (community mod removed)';
+
COMMENT ON COLUMN comments.deleted_by IS 'DID of the actor who performed the deletion';
+
+
-- Backfill existing deleted comments as author-deleted
+
-- This handles existing soft-deleted comments gracefully
+
UPDATE comments
+
SET deletion_reason = 'author',
+
deleted_by = commenter_did
+
WHERE deleted_at IS NOT NULL AND deletion_reason IS NULL;
+
+
-- Modify existing indexes to NOT filter deleted_at IS NULL
+
-- This allows deleted comments to appear in thread queries for structure preservation
+
-- Note: We drop and recreate to change the partial index condition
+
+
-- Drop old partial indexes that exclude deleted comments
+
DROP INDEX IF EXISTS idx_comments_root;
+
DROP INDEX IF EXISTS idx_comments_parent;
+
DROP INDEX IF EXISTS idx_comments_parent_score;
+
DROP INDEX IF EXISTS idx_comments_uri_active;
+
+
-- Recreate indexes without the deleted_at filter (include all comments for threading)
+
CREATE INDEX idx_comments_root ON comments(root_uri, created_at DESC);
+
CREATE INDEX idx_comments_parent ON comments(parent_uri, created_at DESC);
+
CREATE INDEX idx_comments_parent_score ON comments(parent_uri, score DESC, created_at DESC);
+
CREATE INDEX idx_comments_uri_lookup ON comments(uri);
+
+
-- Add index for querying by deletion_reason (for moderation dashboard)
+
CREATE INDEX idx_comments_deleted_reason ON comments(deletion_reason, deleted_at DESC)
+
WHERE deleted_at IS NOT NULL;
+
+
-- Add index for querying by deleted_by (for moderation audit/filtering)
+
CREATE INDEX idx_comments_deleted_by ON comments(deleted_by, deleted_at DESC)
+
WHERE deleted_at IS NOT NULL;
+
+
-- +goose Down
+
-- Remove deletion metadata columns and restore original indexes
+
+
DROP INDEX IF EXISTS idx_comments_deleted_by;
+
DROP INDEX IF EXISTS idx_comments_deleted_reason;
+
DROP INDEX IF EXISTS idx_comments_uri_lookup;
+
DROP INDEX IF EXISTS idx_comments_parent_score;
+
DROP INDEX IF EXISTS idx_comments_parent;
+
DROP INDEX IF EXISTS idx_comments_root;
+
+
-- Restore original partial indexes (excluding deleted comments)
+
CREATE INDEX idx_comments_root ON comments(root_uri, created_at DESC) WHERE deleted_at IS NULL;
+
CREATE INDEX idx_comments_parent ON comments(parent_uri, created_at DESC) WHERE deleted_at IS NULL;
+
CREATE INDEX idx_comments_parent_score ON comments(parent_uri, score DESC, created_at DESC) WHERE deleted_at IS NULL;
+
CREATE INDEX idx_comments_uri_active ON comments(uri) WHERE deleted_at IS NULL;
+
+
ALTER TABLE comments DROP COLUMN IF EXISTS deleted_by;
+
ALTER TABLE comments DROP COLUMN IF EXISTS deletion_reason;
+
+
DROP TYPE IF EXISTS deletion_reason;
+17 -13
internal/core/comments/view_models.go
···
// CommentView represents the full view of a comment with all metadata
// Matches social.coves.community.comment.getComments#commentView lexicon
// Used in thread views and get endpoints
+
// For deleted comments, IsDeleted=true and content-related fields are empty/nil
type CommentView struct {
-
Embed interface{} `json:"embed,omitempty"`
-
Record interface{} `json:"record"`
-
Viewer *CommentViewerState `json:"viewer,omitempty"`
-
Author *posts.AuthorView `json:"author"`
-
Post *CommentRef `json:"post"`
-
Parent *CommentRef `json:"parent,omitempty"`
-
Stats *CommentStats `json:"stats"`
-
Content string `json:"content"`
-
CreatedAt string `json:"createdAt"`
-
IndexedAt string `json:"indexedAt"`
-
URI string `json:"uri"`
-
CID string `json:"cid"`
-
ContentFacets []interface{} `json:"contentFacets,omitempty"`
+
Embed interface{} `json:"embed,omitempty"`
+
Record interface{} `json:"record"`
+
Viewer *CommentViewerState `json:"viewer,omitempty"`
+
Author *posts.AuthorView `json:"author"`
+
Post *CommentRef `json:"post"`
+
Parent *CommentRef `json:"parent,omitempty"`
+
Stats *CommentStats `json:"stats"`
+
Content string `json:"content"`
+
CreatedAt string `json:"createdAt"`
+
IndexedAt string `json:"indexedAt"`
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
ContentFacets []interface{} `json:"contentFacets,omitempty"`
+
IsDeleted bool `json:"isDeleted,omitempty"`
+
DeletionReason *string `json:"deletionReason,omitempty"`
+
DeletedAt *string `json:"deletedAt,omitempty"`
}
// ThreadViewComment represents a comment with its nested replies
+23 -1
internal/core/comments/interfaces.go
···
package comments
-
import "context"
+
import (
+
"context"
+
"database/sql"
+
)
// Repository defines the data access interface for comments
// Used by Jetstream consumer to index comments from firehose
···
// Delete soft-deletes a comment (sets deleted_at)
// Called by Jetstream consumer after comment is deleted from PDS
+
// Deprecated: Use SoftDeleteWithReason for new code to preserve thread structure
Delete(ctx context.Context, uri string) error
+
// SoftDeleteWithReason performs a soft delete that blanks content but preserves thread structure
+
// This allows deleted comments to appear as "[deleted]" placeholders in thread views
+
// reason: "author" (user deleted) or "moderator" (mod removed)
+
// deletedByDID: DID of the actor who performed the deletion
+
SoftDeleteWithReason(ctx context.Context, uri, reason, deletedByDID string) error
+
// ListByRoot retrieves all comments in a thread (flat)
// Used for fetching entire comment threads on posts
ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*Comment, error)
···
limitPerParent int,
) (map[string][]*Comment, error)
}
+
+
// RepositoryTx provides transaction-aware operations for consumers that need atomicity
+
// Used by Jetstream consumer to perform atomic delete + count updates
+
// Implementations that support transactions should also implement this interface
+
type RepositoryTx interface {
+
// SoftDeleteWithReasonTx performs a soft delete within a transaction
+
// If tx is nil, executes directly against the database
+
// Returns rows affected count for callers that need to check idempotency
+
// reason: must be DeletionReasonAuthor or DeletionReasonModerator
+
// deletedByDID: DID of the actor who performed the deletion
+
SoftDeleteWithReasonTx(ctx context.Context, tx *sql.Tx, uri, reason, deletedByDID string) (int64, error)
+
}
+87 -27
internal/db/postgres/comment_repo.go
···
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
WHERE uri = $1
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
···
// Delete soft-deletes a comment (sets deleted_at)
// Called by Jetstream consumer after comment is deleted from PDS
// Idempotent: Returns success if comment already deleted
+
// Deprecated: Use SoftDeleteWithReason for new code to preserve thread structure
func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
query := `
UPDATE comments
···
return nil
}
-
// ListByRoot retrieves all active comments in a thread (flat)
+
// SoftDeleteWithReason performs a soft delete that blanks content but preserves thread structure
+
// This allows deleted comments to appear as "[deleted]" placeholders in thread views
+
// Idempotent: Returns success if comment already deleted
+
// Validates that reason is a known deletion reason constant
+
func (r *postgresCommentRepo) SoftDeleteWithReason(ctx context.Context, uri, reason, deletedByDID string) error {
+
// Validate deletion reason
+
if reason != comments.DeletionReasonAuthor && reason != comments.DeletionReasonModerator {
+
return fmt.Errorf("invalid deletion reason: %s", reason)
+
}
+
+
_, err := r.SoftDeleteWithReasonTx(ctx, nil, uri, reason, deletedByDID)
+
return err
+
}
+
+
// SoftDeleteWithReasonTx performs a soft delete within an optional transaction
+
// If tx is nil, executes directly against the database
+
// Returns rows affected count for callers that need to check idempotency
+
// This method is used by both the repository and the Jetstream consumer
+
func (r *postgresCommentRepo) SoftDeleteWithReasonTx(ctx context.Context, tx *sql.Tx, uri, reason, deletedByDID string) (int64, error) {
+
query := `
+
UPDATE comments
+
SET
+
content = '',
+
content_facets = NULL,
+
embed = NULL,
+
content_labels = NULL,
+
deleted_at = NOW(),
+
deletion_reason = $2,
+
deleted_by = $3
+
WHERE uri = $1 AND deleted_at IS NULL
+
`
+
+
var result sql.Result
+
var err error
+
+
if tx != nil {
+
result, err = tx.ExecContext(ctx, query, uri, reason, deletedByDID)
+
} else {
+
result, err = r.db.ExecContext(ctx, query, uri, reason, deletedByDID)
+
}
+
+
if err != nil {
+
return 0, fmt.Errorf("failed to soft delete comment: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return 0, fmt.Errorf("failed to check delete result: %w", err)
+
}
+
+
return rowsAffected, nil
+
}
+
+
// ListByRoot retrieves all comments in a thread (flat), including deleted ones
// Used for fetching entire comment threads on posts
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
query := `
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
-
WHERE root_uri = $1 AND deleted_at IS NULL
+
WHERE root_uri = $1
ORDER BY created_at ASC
LIMIT $2 OFFSET $3
`
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
if err != nil {
···
return result, nil
}
-
// ListByParent retrieves direct replies to a post or comment
+
// ListByParent retrieves direct replies to a post or comment, including deleted ones
// Used for building nested/threaded comment views
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
query := `
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
-
WHERE parent_uri = $1 AND deleted_at IS NULL
+
WHERE parent_uri = $1
ORDER BY created_at ASC
LIMIT $2 OFFSET $3
`
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
if err != nil {
···
}
// ListByCommenter retrieves all active comments by a specific user
-
// Future: Used for user comment history
+
// Used for user comment history - filters out deleted comments
func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
query := `
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count
FROM comments
WHERE commenter_did = $1 AND deleted_at IS NULL
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
)
if err != nil {
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
NULL::numeric as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle
···
// Build complete query with JOINs and filters
// LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
query := fmt.Sprintf(`
%s
LEFT JOIN users u ON c.commenter_did = u.did
-
WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
+
WHERE c.parent_uri = $1
%s
%s
ORDER BY %s
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
&hotRank, &authorHandle,
)
···
// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query
// Returns map[uri]*Comment for efficient lookups without N+1 queries
+
// Includes deleted comments to preserve thread structure
func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) {
if len(uris) == 0 {
return make(map[string]*comments.Comment), nil
···
// LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
// COALESCE falls back to DID when handle is NULL (user not yet in users table)
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
query := `
SELECT
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
COALESCE(u.handle, c.commenter_did) as author_handle
FROM comments c
LEFT JOIN users u ON c.commenter_did = u.did
-
WHERE c.uri = ANY($1) AND c.deleted_at IS NULL
+
WHERE c.uri = ANY($1)
`
rows, err := r.db.QueryContext(ctx, query, pq.Array(uris))
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
&authorHandle,
)
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
NULL::numeric as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
NULL::numeric as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
c.id, c.uri, c.cid, c.rkey, c.commenter_did,
c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
c.content, c.content_facets, c.embed, c.content_labels, c.langs,
-
c.created_at, c.indexed_at, c.deleted_at,
+
c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
c.upvote_count, c.downvote_count, c.score, c.reply_count,
log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
COALESCE(u.handle, c.commenter_did) as author_handle`
···
// Use window function to limit results per parent
// This is more efficient than LIMIT in a subquery per parent
// LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
+
// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
query := fmt.Sprintf(`
WITH ranked_comments AS (
SELECT
···
) as rn
FROM comments c
LEFT JOIN users u ON c.commenter_did = u.did
-
WHERE c.parent_uri = ANY($1) AND c.deleted_at IS NULL
+
WHERE c.parent_uri = ANY($1)
)
SELECT
id, uri, cid, rkey, commenter_did,
root_uri, root_cid, parent_uri, parent_cid,
content, content_facets, embed, content_labels, langs,
-
created_at, indexed_at, deleted_at,
+
created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
upvote_count, downvote_count, score, reply_count,
hot_rank, author_handle
FROM ranked_comments
···
&comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
&comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
&comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
-
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
+
&comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
&hotRank, &authorHandle,
)
+5 -6
internal/core/comments/comment_service.go
···
CreatedAt: createdAt, // Preserve original timestamp
}
-
// Update the record on PDS (putRecord)
-
// Note: This creates a new CID even though the URI stays the same
-
// TODO: Use PutRecord instead of CreateRecord for proper update semantics with optimistic locking.
-
// PutRecord should accept the existing CID (existingRecord.CID) to ensure concurrent updates are detected.
-
// However, PutRecord is not yet implemented in internal/atproto/pds/client.go.
-
uri, cid, err := pdsClient.CreateRecord(ctx, commentCollection, rkey, updatedRecord)
+
// Update the record on PDS with optimistic locking via swapRecord CID
+
uri, cid, err := pdsClient.PutRecord(ctx, commentCollection, rkey, updatedRecord, existingRecord.CID)
if err != nil {
s.logger.Error("failed to update comment on PDS",
"error", err,
···
if pds.IsAuthError(err) {
return nil, ErrNotAuthorized
}
+
if errors.Is(err, pds.ErrConflict) {
+
return nil, ErrConcurrentModification
+
}
return nil, fmt.Errorf("failed to update comment: %w", err)
}
+2 -2
cmd/server/main.go
···
routes.RegisterTimelineRoutes(r, timelineService, voteService, authMiddleware)
log.Println("Timeline XRPC endpoints registered (requires authentication, includes viewer vote state)")
-
routes.RegisterDiscoverRoutes(r, discoverService)
-
log.Println("Discover XRPC endpoints registered (public, no auth required)")
+
routes.RegisterDiscoverRoutes(r, discoverService, voteService, authMiddleware)
+
log.Println("Discover XRPC endpoints registered (public with optional auth for viewer vote state)")
routes.RegisterAggregatorRoutes(r, aggregatorService, userService, identityResolver)
log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)")
+73
internal/api/handlers/common/viewer_state.go
···
+
package common
+
+
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/posts"
+
"Coves/internal/core/votes"
+
"context"
+
"log"
+
"net/http"
+
)
+
+
// FeedPostProvider is implemented by any feed post wrapper that contains a PostView.
+
// This allows the helper to work with different feed post types (discover, timeline, communityFeed).
+
type FeedPostProvider interface {
+
GetPost() *posts.PostView
+
}
+
+
// PopulateViewerVoteState enriches feed posts with the authenticated user's vote state.
+
// This is a no-op if voteService is nil or the request is unauthenticated.
+
//
+
// Parameters:
+
// - ctx: Request context for PDS calls
+
// - r: HTTP request (used to extract OAuth session)
+
// - voteService: Vote service for cache lookup (may be nil)
+
// - feedPosts: Posts to enrich with viewer state (must implement FeedPostProvider)
+
//
+
// The function logs but does not fail on errors - viewer state is optional enrichment.
+
func PopulateViewerVoteState[T FeedPostProvider](
+
ctx context.Context,
+
r *http.Request,
+
voteService votes.Service,
+
feedPosts []T,
+
) {
+
if voteService == nil {
+
return
+
}
+
+
session := middleware.GetOAuthSession(r)
+
if session == nil {
+
return
+
}
+
+
userDID := middleware.GetUserDID(r)
+
+
// Ensure vote cache is populated from PDS
+
if err := voteService.EnsureCachePopulated(ctx, session); err != nil {
+
log.Printf("Warning: failed to populate vote cache: %v", err)
+
return
+
}
+
+
// Collect post URIs to batch lookup
+
postURIs := make([]string, 0, len(feedPosts))
+
for _, feedPost := range feedPosts {
+
if post := feedPost.GetPost(); post != nil {
+
postURIs = append(postURIs, post.URI)
+
}
+
}
+
+
// Get viewer votes for all posts
+
viewerVotes := voteService.GetViewerVotesForSubjects(userDID, postURIs)
+
+
// Populate viewer state on each post
+
for _, feedPost := range feedPosts {
+
if post := feedPost.GetPost(); post != nil {
+
if vote, exists := viewerVotes[post.URI]; exists {
+
post.Viewer = &posts.ViewerState{
+
Vote: &vote.Direction,
+
VoteURI: &vote.URI,
+
}
+
}
+
}
+
}
+
}
+11 -4
internal/api/handlers/discover/get_discover.go
···
package discover
import (
+
"Coves/internal/api/handlers/common"
"Coves/internal/core/discover"
"Coves/internal/core/posts"
+
"Coves/internal/core/votes"
"encoding/json"
"log"
"net/http"
···
// GetDiscoverHandler handles discover feed retrieval
type GetDiscoverHandler struct {
-
service discover.Service
+
service discover.Service
+
voteService votes.Service
}
// NewGetDiscoverHandler creates a new discover handler
-
func NewGetDiscoverHandler(service discover.Service) *GetDiscoverHandler {
+
func NewGetDiscoverHandler(service discover.Service, voteService votes.Service) *GetDiscoverHandler {
return &GetDiscoverHandler{
-
service: service,
+
service: service,
+
voteService: voteService,
}
}
// HandleGetDiscover retrieves posts from all communities (public feed)
// GET /xrpc/social.coves.feed.getDiscover?sort=hot&limit=15&cursor=...
-
// Public endpoint - no authentication required
+
// Public endpoint with optional auth - if authenticated, includes viewer vote state
func (h *GetDiscoverHandler) HandleGetDiscover(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
···
return
}
+
// Populate viewer vote state if authenticated
+
common.PopulateViewerVoteState(r.Context(), r, h.voteService, response.Feed)
+
// Transform blob refs to URLs for all posts
for _, feedPost := range response.Feed {
if feedPost.Post != nil {
+9 -4
internal/api/routes/discover.go
···
import (
"Coves/internal/api/handlers/discover"
+
"Coves/internal/api/middleware"
discoverCore "Coves/internal/core/discover"
+
"Coves/internal/core/votes"
"github.com/go-chi/chi/v5"
)
···
// RegisterDiscoverRoutes registers discover-related XRPC endpoints
//
// SECURITY & RATE LIMITING:
-
// - Discover feed is PUBLIC (no authentication required)
+
// - Discover feed is PUBLIC (works without authentication)
+
// - Optional auth: if authenticated, includes viewer vote state on posts
// - Protected by global rate limiter: 100 requests/minute per IP (main.go:84)
// - Query timeout enforced via context (prevents long-running queries)
// - Result limit capped at 50 posts per request (validated in service layer)
···
func RegisterDiscoverRoutes(
r chi.Router,
discoverService discoverCore.Service,
+
voteService votes.Service,
+
authMiddleware *middleware.OAuthAuthMiddleware,
) {
// Create handlers
-
getDiscoverHandler := discover.NewGetDiscoverHandler(discoverService)
+
getDiscoverHandler := discover.NewGetDiscoverHandler(discoverService, voteService)
// GET /xrpc/social.coves.feed.getDiscover
-
// Public endpoint - no authentication required
+
// Public endpoint with optional auth for viewer-specific state (vote state)
// Shows posts from ALL communities (not personalized)
// Rate limited: 100 req/min per IP via global middleware
-
r.Get("/xrpc/social.coves.feed.getDiscover", getDiscoverHandler.HandleGetDiscover)
+
r.With(authMiddleware.OptionalAuth).Get("/xrpc/social.coves.feed.getDiscover", getDiscoverHandler.HandleGetDiscover)
}
+5
internal/core/communityFeeds/types.go
···
Reply *ReplyRef `json:"reply,omitempty"` // Reply context
}
+
// GetPost returns the underlying PostView for viewer state enrichment
+
func (f *FeedViewPost) GetPost() *posts.PostView {
+
return f.Post
+
}
+
// FeedReason is a union type for feed context
// Can be reasonRepost or reasonPin
type FeedReason struct {
+5
internal/core/discover/types.go
···
Reply *ReplyRef `json:"reply,omitempty"`
}
+
// GetPost returns the underlying PostView for viewer state enrichment
+
func (f *FeedViewPost) GetPost() *posts.PostView {
+
return f.Post
+
}
+
// FeedReason is a union type for feed context
type FeedReason struct {
Repost *ReasonRepost `json:"-"`
+5
internal/core/timeline/types.go
···
Reply *ReplyRef `json:"reply,omitempty"` // Reply context
}
+
// GetPost returns the underlying PostView for viewer state enrichment
+
func (f *FeedViewPost) GetPost() *posts.PostView {
+
return f.Post
+
}
+
// FeedReason is a union type for feed context
// Future: Can be reasonRepost or reasonCommunity
type FeedReason struct {
+193 -5
tests/integration/discover_test.go
···
import (
"Coves/internal/api/handlers/discover"
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/votes"
"Coves/internal/db/postgres"
"context"
"encoding/json"
···
discoverCore "Coves/internal/core/discover"
+
oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth"
+
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
+
// mockVoteService implements votes.Service for testing viewer vote state
+
type mockVoteService struct {
+
cachedVotes map[string]*votes.CachedVote // userDID:subjectURI -> vote
+
}
+
+
func newMockVoteService() *mockVoteService {
+
return &mockVoteService{
+
cachedVotes: make(map[string]*votes.CachedVote),
+
}
+
}
+
+
func (m *mockVoteService) AddVote(userDID, subjectURI, direction, voteURI string) {
+
key := userDID + ":" + subjectURI
+
m.cachedVotes[key] = &votes.CachedVote{
+
Direction: direction,
+
URI: voteURI,
+
}
+
}
+
+
func (m *mockVoteService) CreateVote(_ context.Context, _ *oauthlib.ClientSessionData, _ votes.CreateVoteRequest) (*votes.CreateVoteResponse, error) {
+
return &votes.CreateVoteResponse{}, nil
+
}
+
+
func (m *mockVoteService) DeleteVote(_ context.Context, _ *oauthlib.ClientSessionData, _ votes.DeleteVoteRequest) error {
+
return nil
+
}
+
+
func (m *mockVoteService) EnsureCachePopulated(_ context.Context, _ *oauthlib.ClientSessionData) error {
+
return nil // Mock always succeeds - votes pre-populated via AddVote
+
}
+
+
func (m *mockVoteService) GetViewerVote(userDID, subjectURI string) *votes.CachedVote {
+
key := userDID + ":" + subjectURI
+
return m.cachedVotes[key]
+
}
+
+
func (m *mockVoteService) GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*votes.CachedVote {
+
result := make(map[string]*votes.CachedVote)
+
for _, uri := range subjectURIs {
+
key := userDID + ":" + uri
+
if vote, exists := m.cachedVotes[key]; exists {
+
result[uri] = vote
+
}
+
}
+
return result
+
}
+
// TestGetDiscover_ShowsAllCommunities tests discover feed shows posts from ALL communities
func TestGetDiscover_ShowsAllCommunities(t *testing.T) {
if testing.Short() {
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil) // nil vote service - tests don't need vote state
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil) // nil vote service - tests don't need vote state
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
discoverService := discoverCore.NewDiscoverService(discoverRepo)
-
handler := discover.NewGetDiscoverHandler(discoverService)
+
handler := discover.NewGetDiscoverHandler(discoverService, nil)
t.Run("Limit exceeds maximum", func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=100", nil)
···
assert.Contains(t, errorResp["message"], "limit")
})
}
+
+
// TestGetDiscover_ViewerVoteState tests that authenticated users see their vote state on posts
+
func TestGetDiscover_ViewerVoteState(t *testing.T) {
+
if testing.Short() {
+
t.Skip("Skipping integration test in short mode")
+
}
+
+
db := setupTestDB(t)
+
t.Cleanup(func() { _ = db.Close() })
+
+
ctx := context.Background()
+
testID := time.Now().UnixNano()
+
+
// Create community and posts
+
communityDID, err := createFeedTestCommunity(db, ctx, fmt.Sprintf("votes-%d", testID), fmt.Sprintf("alice-%d.test", testID))
+
require.NoError(t, err)
+
+
post1URI := createTestPost(t, db, communityDID, "did:plc:author1", "Post with upvote", 10, time.Now().Add(-1*time.Hour))
+
post2URI := createTestPost(t, db, communityDID, "did:plc:author2", "Post with downvote", 5, time.Now().Add(-2*time.Hour))
+
_ = createTestPost(t, db, communityDID, "did:plc:author3", "Post without vote", 3, time.Now().Add(-3*time.Hour))
+
+
// Setup mock vote service with pre-populated votes
+
viewerDID := "did:plc:viewer123"
+
mockVotes := newMockVoteService()
+
mockVotes.AddVote(viewerDID, post1URI, "up", "at://"+viewerDID+"/social.coves.vote/vote1")
+
mockVotes.AddVote(viewerDID, post2URI, "down", "at://"+viewerDID+"/social.coves.vote/vote2")
+
+
// Setup handler with mock vote service
+
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
+
discoverService := discoverCore.NewDiscoverService(discoverRepo)
+
handler := discover.NewGetDiscoverHandler(discoverService, mockVotes)
+
+
// Create request with authenticated user context
+
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=50", nil)
+
+
// Inject OAuth session into context (simulates OptionalAuth middleware)
+
did, _ := syntax.ParseDID(viewerDID)
+
session := &oauthlib.ClientSessionData{
+
AccountDID: did,
+
AccessToken: "test_token",
+
}
+
reqCtx := context.WithValue(req.Context(), middleware.UserDIDKey, viewerDID)
+
reqCtx = context.WithValue(reqCtx, middleware.OAuthSessionKey, session)
+
req = req.WithContext(reqCtx)
+
+
rec := httptest.NewRecorder()
+
handler.HandleGetDiscover(rec, req)
+
+
// Assertions
+
assert.Equal(t, http.StatusOK, rec.Code)
+
+
var response discoverCore.DiscoverResponse
+
err = json.Unmarshal(rec.Body.Bytes(), &response)
+
require.NoError(t, err)
+
+
// Find our test posts and verify vote state
+
var foundPost1, foundPost2, foundPost3 bool
+
for _, feedPost := range response.Feed {
+
switch feedPost.Post.URI {
+
case post1URI:
+
foundPost1 = true
+
require.NotNil(t, feedPost.Post.Viewer, "Post1 should have viewer state")
+
require.NotNil(t, feedPost.Post.Viewer.Vote, "Post1 should have vote direction")
+
assert.Equal(t, "up", *feedPost.Post.Viewer.Vote, "Post1 should show upvote")
+
require.NotNil(t, feedPost.Post.Viewer.VoteURI, "Post1 should have vote URI")
+
assert.Contains(t, *feedPost.Post.Viewer.VoteURI, "vote1", "Post1 should have correct vote URI")
+
+
case post2URI:
+
foundPost2 = true
+
require.NotNil(t, feedPost.Post.Viewer, "Post2 should have viewer state")
+
require.NotNil(t, feedPost.Post.Viewer.Vote, "Post2 should have vote direction")
+
assert.Equal(t, "down", *feedPost.Post.Viewer.Vote, "Post2 should show downvote")
+
require.NotNil(t, feedPost.Post.Viewer.VoteURI, "Post2 should have vote URI")
+
+
default:
+
// Posts without votes should have nil Viewer or nil Vote
+
if feedPost.Post.Viewer != nil && feedPost.Post.Viewer.Vote != nil {
+
// This post has a vote from our viewer - it's not post3
+
continue
+
}
+
foundPost3 = true
+
}
+
}
+
+
assert.True(t, foundPost1, "Should find post1 with upvote")
+
assert.True(t, foundPost2, "Should find post2 with downvote")
+
assert.True(t, foundPost3, "Should find post3 without vote")
+
}
+
+
// TestGetDiscover_NoViewerStateWithoutAuth tests that unauthenticated users don't get viewer state
+
func TestGetDiscover_NoViewerStateWithoutAuth(t *testing.T) {
+
if testing.Short() {
+
t.Skip("Skipping integration test in short mode")
+
}
+
+
db := setupTestDB(t)
+
t.Cleanup(func() { _ = db.Close() })
+
+
ctx := context.Background()
+
testID := time.Now().UnixNano()
+
+
// Create community and post
+
communityDID, err := createFeedTestCommunity(db, ctx, fmt.Sprintf("noauth-%d", testID), fmt.Sprintf("alice-%d.test", testID))
+
require.NoError(t, err)
+
+
postURI := createTestPost(t, db, communityDID, "did:plc:author", "Some post", 10, time.Now())
+
+
// Setup mock vote service with a vote (but request will be unauthenticated)
+
mockVotes := newMockVoteService()
+
mockVotes.AddVote("did:plc:someuser", postURI, "up", "at://did:plc:someuser/social.coves.vote/vote1")
+
+
// Setup handler with mock vote service
+
discoverRepo := postgres.NewDiscoverRepository(db, "test-cursor-secret")
+
discoverService := discoverCore.NewDiscoverService(discoverRepo)
+
handler := discover.NewGetDiscoverHandler(discoverService, mockVotes)
+
+
// Create request WITHOUT auth context
+
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=50", nil)
+
rec := httptest.NewRecorder()
+
handler.HandleGetDiscover(rec, req)
+
+
// Should succeed
+
assert.Equal(t, http.StatusOK, rec.Code)
+
+
var response discoverCore.DiscoverResponse
+
err = json.Unmarshal(rec.Body.Bytes(), &response)
+
require.NoError(t, err)
+
+
// Find our post and verify NO viewer state (unauthenticated)
+
for _, feedPost := range response.Feed {
+
if feedPost.Post.URI == postURI {
+
assert.Nil(t, feedPost.Post.Viewer, "Unauthenticated request should not have viewer state")
+
return
+
}
+
}
+
t.Fatal("Test post not found in response")
+
}
+11 -11
tests/integration/feed_test.go
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data: community, users, and posts
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data with many posts
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Request feed for non-existent community
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.communityFeed.getCommunity?community=did:plc:nonexistent&sort=hot&limit=10", nil)
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test community
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Create community with no posts
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test community
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
···
nil,
)
feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
-
handler := communityFeed.NewGetCommunityHandler(feedService)
+
handler := communityFeed.NewGetCommunityHandler(feedService, nil)
// Setup test data
ctx := context.Background()
+7 -7
tests/integration/timeline_test.go
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
// Request timeline WITHOUT auth context
req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getTimeline?sort=new&limit=10", nil)
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
···
// Setup services
timelineRepo := postgres.NewTimelineRepository(db, "test-cursor-secret")
timelineService := timelineCore.NewTimelineService(timelineRepo)
-
handler := timeline.NewGetTimelineHandler(timelineService)
+
handler := timeline.NewGetTimelineHandler(timelineService, nil)
ctx := context.Background()
testID := time.Now().UnixNano()
+1 -1
tests/integration/user_journey_e2e_test.go
···
r := chi.NewRouter()
routes.RegisterCommunityRoutes(r, communityService, e2eAuth.OAuthAuthMiddleware, nil) // nil = allow all community creators
routes.RegisterPostRoutes(r, postService, e2eAuth.OAuthAuthMiddleware)
-
routes.RegisterTimelineRoutes(r, timelineService, e2eAuth.OAuthAuthMiddleware)
+
routes.RegisterTimelineRoutes(r, timelineService, nil, e2eAuth.OAuthAuthMiddleware)
httpServer := httptest.NewServer(r)
defer httpServer.Close()