A community based topic aggregation platform built on atproto

feat(votes): add vote service with PDS-first architecture

Implements vote service that queries PDS directly for existing votes,
avoiding eventual consistency issues with the AppView database.

Key features:
- CreateVote with toggle behavior (same direction = delete)
- DeleteVote for explicit vote removal
- getVoteFromPDS: Paginates through all vote records to handle >100 votes
- Proper auth error handling (401/403 -> ErrNotAuthorized)

Architecture:
- Queries PDS via com.atproto.repo.listRecords (source of truth)
- Writes via com.atproto.repo.createRecord/deleteRecord
- AppView indexes from Jetstream for aggregate counts only

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+478
internal
+69
internal/core/votes/service.go
···
+
package votes
+
+
import (
+
"context"
+
+
oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth"
+
)
+
+
// Service defines the business logic interface for vote operations
+
// Implements write-forward pattern: validates requests, then forwards to user's PDS
+
//
+
// Architecture:
+
// - Service validates input and checks authorization
+
// - Queries user's PDS directly via com.atproto.repo.listRecords to check existing votes
+
// (avoids eventual consistency issues with AppView database)
+
// - Creates/deletes vote records via com.atproto.repo.createRecord/deleteRecord
+
// - AppView indexes resulting records from Jetstream firehose for aggregate counts
+
type Service interface {
+
// CreateVote creates a new vote or toggles off an existing vote
+
// Returns URI and CID of created vote, or empty strings if toggled off
+
//
+
// Validation:
+
// - Direction must be "up" or "down" (returns ErrInvalidDirection)
+
// - Subject URI must be valid AT-URI (returns ErrInvalidSubject)
+
// - Subject must exist (returns ErrSubjectNotFound)
+
//
+
// Behavior:
+
// - If no vote exists: creates new vote with given direction
+
// - If vote exists with same direction: deletes vote (toggle off)
+
// - If vote exists with different direction: updates to new direction
+
CreateVote(ctx context.Context, session *oauthlib.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error)
+
+
// DeleteVote removes a vote on the specified subject
+
//
+
// Validation:
+
// - Subject URI must be valid AT-URI (returns ErrInvalidSubject)
+
// - Vote must exist (returns ErrVoteNotFound)
+
//
+
// Behavior:
+
// - Deletes the user's vote record from their PDS
+
// - AppView will soft-delete via Jetstream consumer
+
DeleteVote(ctx context.Context, session *oauthlib.ClientSessionData, req DeleteVoteRequest) error
+
}
+
+
// CreateVoteRequest contains the parameters for creating a vote
+
type CreateVoteRequest struct {
+
// Subject is the post or comment being voted on
+
Subject StrongRef `json:"subject"`
+
+
// Direction is either "up" or "down"
+
Direction string `json:"direction"`
+
}
+
+
// CreateVoteResponse contains the result of creating a vote
+
type CreateVoteResponse struct {
+
// URI is the AT-URI of the created vote record
+
// Empty string if vote was toggled off (deleted)
+
URI string `json:"uri"`
+
+
// CID is the content identifier of the created vote record
+
// Empty string if vote was toggled off (deleted)
+
CID string `json:"cid"`
+
}
+
+
// DeleteVoteRequest contains the parameters for deleting a vote
+
type DeleteVoteRequest struct {
+
// Subject is the post or comment whose vote should be removed
+
Subject StrongRef `json:"subject"`
+
}
+409
internal/core/votes/service_impl.go
···
+
package votes
+
+
import (
+
"bytes"
+
"context"
+
"encoding/json"
+
"fmt"
+
"io"
+
"log/slog"
+
"net/http"
+
"strings"
+
"time"
+
+
"github.com/bluesky-social/indigo/atproto/auth/oauth"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
+
oauthclient "Coves/internal/atproto/oauth"
+
)
+
+
// voteService implements the Service interface for vote operations
+
type voteService struct {
+
repo Repository
+
oauthClient *oauthclient.OAuthClient
+
oauthStore oauth.ClientAuthStore
+
logger *slog.Logger
+
}
+
+
// NewService creates a new vote service instance
+
func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
+
if logger == nil {
+
logger = slog.Default()
+
}
+
return &voteService{
+
repo: repo,
+
oauthClient: oauthClient,
+
oauthStore: oauthStore,
+
logger: logger,
+
}
+
}
+
+
// CreateVote creates a new vote or toggles off an existing vote
+
// Implements the toggle behavior:
+
// - No existing vote → Create new vote with given direction
+
// - Vote exists with same direction → Delete vote (toggle off)
+
// - Vote exists with different direction → Update to new direction
+
func (s *voteService) CreateVote(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (*CreateVoteResponse, error) {
+
// Validate direction
+
if req.Direction != "up" && req.Direction != "down" {
+
return nil, ErrInvalidDirection
+
}
+
+
// Validate subject URI format
+
if req.Subject.URI == "" {
+
return nil, ErrInvalidSubject
+
}
+
if !strings.HasPrefix(req.Subject.URI, "at://") {
+
return nil, ErrInvalidSubject
+
}
+
+
// Validate subject CID is provided
+
if req.Subject.CID == "" {
+
return nil, ErrInvalidSubject
+
}
+
+
// Check for existing vote by querying PDS directly (source of truth)
+
// This avoids eventual consistency issues with the AppView database
+
existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
+
if err != nil {
+
s.logger.Error("failed to check existing vote on PDS",
+
"error", err,
+
"voter", session.AccountDID,
+
"subject", req.Subject.URI)
+
return nil, fmt.Errorf("failed to check existing vote: %w", err)
+
}
+
+
// Toggle logic
+
if existing != nil {
+
// Vote exists - check if same direction
+
if existing.Direction == req.Direction {
+
// Same direction - toggle off (delete)
+
if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
+
s.logger.Error("failed to delete vote on PDS",
+
"error", err,
+
"voter", session.AccountDID,
+
"rkey", existing.RKey)
+
return nil, fmt.Errorf("failed to delete vote: %w", err)
+
}
+
+
s.logger.Info("vote toggled off",
+
"voter", session.AccountDID,
+
"subject", req.Subject.URI,
+
"direction", req.Direction)
+
+
// Return empty response to indicate deletion
+
return &CreateVoteResponse{
+
URI: "",
+
CID: "",
+
}, nil
+
}
+
+
// Different direction - delete old vote first, then create new one
+
if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
+
s.logger.Error("failed to delete existing vote on PDS",
+
"error", err,
+
"voter", session.AccountDID,
+
"rkey", existing.RKey)
+
return nil, fmt.Errorf("failed to delete existing vote: %w", err)
+
}
+
+
s.logger.Info("deleted existing vote before creating new direction",
+
"voter", session.AccountDID,
+
"subject", req.Subject.URI,
+
"old_direction", existing.Direction,
+
"new_direction", req.Direction)
+
}
+
+
// Create new vote
+
uri, cid, err := s.createVoteRecord(ctx, session, req)
+
if err != nil {
+
s.logger.Error("failed to create vote on PDS",
+
"error", err,
+
"voter", session.AccountDID,
+
"subject", req.Subject.URI,
+
"direction", req.Direction)
+
return nil, fmt.Errorf("failed to create vote: %w", err)
+
}
+
+
s.logger.Info("vote created",
+
"voter", session.AccountDID,
+
"subject", req.Subject.URI,
+
"direction", req.Direction,
+
"uri", uri,
+
"cid", cid)
+
+
return &CreateVoteResponse{
+
URI: uri,
+
CID: cid,
+
}, nil
+
}
+
+
// DeleteVote removes a vote on the specified subject
+
func (s *voteService) DeleteVote(ctx context.Context, session *oauth.ClientSessionData, req DeleteVoteRequest) error {
+
// Validate subject URI format
+
if req.Subject.URI == "" {
+
return ErrInvalidSubject
+
}
+
if !strings.HasPrefix(req.Subject.URI, "at://") {
+
return ErrInvalidSubject
+
}
+
+
// Find existing vote by querying PDS directly (source of truth)
+
// This avoids eventual consistency issues with the AppView database
+
existing, err := s.getVoteFromPDS(ctx, session, req.Subject.URI)
+
if err != nil {
+
s.logger.Error("failed to find vote on PDS",
+
"error", err,
+
"voter", session.AccountDID,
+
"subject", req.Subject.URI)
+
return fmt.Errorf("failed to find vote: %w", err)
+
}
+
if existing == nil {
+
return ErrVoteNotFound
+
}
+
+
// Delete the vote record from user's PDS
+
if err := s.deleteVoteRecord(ctx, session, existing.RKey); err != nil {
+
s.logger.Error("failed to delete vote on PDS",
+
"error", err,
+
"voter", session.AccountDID,
+
"rkey", existing.RKey)
+
return fmt.Errorf("failed to delete vote: %w", err)
+
}
+
+
s.logger.Info("vote deleted",
+
"voter", session.AccountDID,
+
"subject", req.Subject.URI,
+
"uri", existing.URI)
+
+
return nil
+
}
+
+
// createVoteRecord writes a vote record to the user's PDS
+
func (s *voteService) createVoteRecord(ctx context.Context, session *oauth.ClientSessionData, req CreateVoteRequest) (string, string, error) {
+
// Generate TID for the record key
+
tid := syntax.NewTIDNow(0)
+
+
// Build vote record following the lexicon schema
+
record := VoteRecord{
+
Type: "social.coves.feed.vote",
+
Subject: StrongRef{
+
URI: req.Subject.URI,
+
CID: req.Subject.CID,
+
},
+
Direction: req.Direction,
+
CreatedAt: time.Now().UTC().Format(time.RFC3339),
+
}
+
+
// Call com.atproto.repo.createRecord on the user's PDS
+
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(session.HostURL, "/"))
+
+
payload := map[string]interface{}{
+
"repo": session.AccountDID.String(),
+
"collection": "social.coves.feed.vote",
+
"rkey": tid.String(),
+
"record": record,
+
}
+
+
uri, cid, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
+
if err != nil {
+
return "", "", err
+
}
+
+
return uri, cid, nil
+
}
+
+
// getVoteFromPDS queries the user's PDS directly to find an existing vote for a subject.
+
// This avoids eventual consistency issues with the AppView database populated by Jetstream.
+
// Paginates through all vote records to handle users with >100 votes.
+
// Returns the vote record with rkey, or nil if no vote exists for the subject.
+
func (s *voteService) getVoteFromPDS(ctx context.Context, session *oauth.ClientSessionData, subjectURI string) (*existingVote, error) {
+
baseURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=social.coves.feed.vote&limit=100",
+
strings.TrimSuffix(session.HostURL, "/"),
+
session.AccountDID.String())
+
+
client := &http.Client{Timeout: 10 * time.Second}
+
cursor := ""
+
+
// Paginate through all vote records
+
for {
+
endpoint := baseURL
+
if cursor != "" {
+
endpoint = fmt.Sprintf("%s&cursor=%s", baseURL, cursor)
+
}
+
+
req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
+
if err != nil {
+
return nil, fmt.Errorf("failed to create request: %w", err)
+
}
+
req.Header.Set("Authorization", "Bearer "+session.AccessToken)
+
+
resp, err := client.Do(req)
+
if err != nil {
+
return nil, fmt.Errorf("failed to call PDS: %w", err)
+
}
+
+
body, err := io.ReadAll(resp.Body)
+
closeErr := resp.Body.Close()
+
if closeErr != nil {
+
s.logger.Warn("failed to close response body", "error", closeErr)
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to read response: %w", err)
+
}
+
+
// Handle auth errors - map to ErrNotAuthorized per lexicon
+
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
+
s.logger.Warn("PDS auth failure",
+
"status", resp.StatusCode,
+
"did", session.AccountDID)
+
return nil, ErrNotAuthorized
+
}
+
+
if resp.StatusCode != http.StatusOK {
+
return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
+
}
+
+
// Parse the listRecords response
+
var result struct {
+
Records []struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
Value struct {
+
Type string `json:"$type"`
+
Subject struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
} `json:"subject"`
+
Direction string `json:"direction"`
+
CreatedAt string `json:"createdAt"`
+
} `json:"value"`
+
} `json:"records"`
+
Cursor string `json:"cursor"`
+
}
+
+
if err := json.Unmarshal(body, &result); err != nil {
+
return nil, fmt.Errorf("failed to parse PDS response: %w", err)
+
}
+
+
// Search for the vote matching our subject in this page
+
for _, rec := range result.Records {
+
if rec.Value.Subject.URI == subjectURI {
+
// Extract rkey from the URI (at://did/collection/rkey)
+
parts := strings.Split(rec.URI, "/")
+
if len(parts) < 5 {
+
continue
+
}
+
rkey := parts[len(parts)-1]
+
+
return &existingVote{
+
URI: rec.URI,
+
CID: rec.CID,
+
RKey: rkey,
+
Direction: rec.Value.Direction,
+
}, nil
+
}
+
}
+
+
// Check if there are more pages
+
if result.Cursor == "" {
+
break // No more pages
+
}
+
cursor = result.Cursor
+
}
+
+
// No vote found for this subject after checking all pages
+
return nil, nil
+
}
+
+
// existingVote represents a vote record found on the PDS
+
type existingVote struct {
+
URI string
+
CID string
+
RKey string
+
Direction string
+
}
+
+
// deleteVoteRecord removes a vote record from the user's PDS
+
func (s *voteService) deleteVoteRecord(ctx context.Context, session *oauth.ClientSessionData, rkey string) error {
+
// Call com.atproto.repo.deleteRecord on the user's PDS
+
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(session.HostURL, "/"))
+
+
payload := map[string]interface{}{
+
"repo": session.AccountDID.String(),
+
"collection": "social.coves.feed.vote",
+
"rkey": rkey,
+
}
+
+
_, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, session.AccessToken)
+
return err
+
}
+
+
// callPDSWithAuth makes an authenticated HTTP call to the PDS
+
// Returns URI and CID from the response (for create/update operations)
+
func (s *voteService) callPDSWithAuth(ctx context.Context, method, endpoint string, payload map[string]interface{}, accessToken string) (string, string, error) {
+
jsonData, err := json.Marshal(payload)
+
if err != nil {
+
return "", "", fmt.Errorf("failed to marshal payload: %w", err)
+
}
+
+
req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
+
if err != nil {
+
return "", "", fmt.Errorf("failed to create request: %w", err)
+
}
+
req.Header.Set("Content-Type", "application/json")
+
+
// Add OAuth bearer token for authentication
+
if accessToken != "" {
+
req.Header.Set("Authorization", "Bearer "+accessToken)
+
}
+
+
// Set reasonable timeout for PDS operations
+
timeout := 10 * time.Second
+
if strings.Contains(endpoint, "createRecord") || strings.Contains(endpoint, "putRecord") {
+
timeout = 15 * time.Second // Slightly longer for write operations
+
}
+
+
client := &http.Client{Timeout: timeout}
+
resp, err := client.Do(req)
+
if err != nil {
+
return "", "", fmt.Errorf("failed to call PDS: %w", err)
+
}
+
defer func() {
+
if closeErr := resp.Body.Close(); closeErr != nil {
+
s.logger.Warn("failed to close response body", "error", closeErr)
+
}
+
}()
+
+
body, err := io.ReadAll(resp.Body)
+
if err != nil {
+
return "", "", fmt.Errorf("failed to read response: %w", err)
+
}
+
+
// Handle auth errors - map to ErrNotAuthorized per lexicon
+
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
+
s.logger.Warn("PDS auth failure during write operation",
+
"status", resp.StatusCode,
+
"endpoint", endpoint)
+
return "", "", ErrNotAuthorized
+
}
+
+
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+
return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
+
}
+
+
// Parse response to extract URI and CID (for create/update operations)
+
var result struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
if err := json.Unmarshal(body, &result); err != nil {
+
// For delete operations, there might not be a response body with URI/CID
+
if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
+
return "", "", nil
+
}
+
return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
+
}
+
+
return result.URI, result.CID, nil
+
}