A community based topic aggregation platform built on atproto

refactor(votes): remove subject validation for atproto compatibility

Remove SubjectValidator and SubjectNotFound error - votes on non-existent
or deleted subjects are now allowed. This aligns with atproto's design:

- User's PDS accepts the vote regardless (they own their repo)
- Jetstream emits the event regardless
- AppView consumer correctly handles orphaned votes by only updating
counts for non-deleted subjects

Benefits:
- Reduced latency (no extra DB queries per vote)
- No race conditions (subject could be deleted between validation and PDS write)
- No eventual consistency issues (subject might not be indexed yet)
- Simpler code and fewer failure modes

Changes:
- Remove SubjectValidator interface and CompositeSubjectValidator
- Remove ErrSubjectNotFound from errors and lexicon
- Update NewService signature to remove validator parameter
- Update tests to remove SubjectNotFound test cases

šŸ¤– Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+118 -177
.beads
cmd
server
internal
tests
integration
+3
.beads/beads.left.jsonl
···
···
+
{"id":"Coves-95q","content_hash":"8ec99d598f067780436b985f9ad57f0fa19632026981038df4f65f192186620b","title":"Add comprehensive API documentation","description":"","status":"open","priority":2,"issue_type":"task","created_at":"2025-11-17T20:30:34.835721854-08:00","updated_at":"2025-11-17T20:30:34.835721854-08:00","source_repo":".","dependencies":[{"issue_id":"Coves-95q","depends_on_id":"Coves-e16","type":"blocks","created_at":"2025-11-17T20:30:46.273899399-08:00","created_by":"daemon"}]}
+
{"id":"Coves-e16","content_hash":"7c5d0fc8f0e7f626be3dad62af0e8412467330bad01a244e5a7e52ac5afff1c1","title":"Complete post creation and moderation features","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:12.885991306-08:00","updated_at":"2025-11-17T20:30:12.885991306-08:00","source_repo":"."}
+
{"id":"Coves-fce","content_hash":"26b3e16b99f827316ee0d741cc959464bd0c813446c95aef8105c7fd1e6b09ff","title":"Implement aggregator feed federation","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:21.453326012-08:00","updated_at":"2025-11-17T20:30:21.453326012-08:00","source_repo":"."}
+1
.beads/beads.left.meta.json
···
···
+
{"version":"0.23.1","timestamp":"2025-12-02T18:25:24.009187871-08:00","commit":"00d7d8d"}
+5 -28
cmd/server/main.go
···
commentRepo := postgresRepo.NewCommentRepository(db)
log.Println("āœ… Comment repository initialized (Jetstream indexing only)")
-
// Initialize subject validator for votes (checks posts and comments exist)
-
subjectValidator := votes.NewCompositeSubjectValidator(
-
// Post existence checker
-
func(ctx context.Context, uri string) (bool, error) {
-
_, err := postRepo.GetByURI(ctx, uri)
-
if err != nil {
-
if err == posts.ErrNotFound {
-
return false, nil
-
}
-
return false, err
-
}
-
return true, nil
-
},
-
// Comment existence checker
-
func(ctx context.Context, uri string) (bool, error) {
-
_, err := commentRepo.GetByURI(ctx, uri)
-
if err != nil {
-
if err == comments.ErrCommentNotFound {
-
return false, nil
-
}
-
return false, err
-
}
-
return true, nil
-
},
-
)
-
// Initialize vote service (for XRPC API endpoints)
-
voteService := votes.NewService(voteRepo, subjectValidator, oauthClient, oauthStore, nil)
-
log.Println("āœ… Vote service initialized (with OAuth authentication and subject validation)")
// Initialize comment service (for query API)
// Requires user and community repos for proper author/community hydration per lexicon
···
commentRepo := postgresRepo.NewCommentRepository(db)
log.Println("āœ… Comment repository initialized (Jetstream indexing only)")
// Initialize vote service (for XRPC API endpoints)
+
// Note: We don't validate subject existence - the vote goes to the user's PDS regardless.
+
// The Jetstream consumer handles orphaned votes correctly by only updating counts for
+
// non-deleted subjects. This avoids race conditions and eventual consistency issues.
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
+
log.Println("āœ… Vote service initialized (with OAuth authentication)")
// Initialize comment service (for query API)
// Requires user and community repos for proper author/community hydration per lexicon
+2 -8
internal/api/handlers/vote/create_vote_test.go
···
func TestCreateVoteHandler_ServiceError(t *testing.T) {
tests := []struct {
name string
-
serviceError error
expectedStatus int
-
expectedError string
}{
-
{
-
name: "subject not found",
-
serviceError: votes.ErrSubjectNotFound,
-
expectedStatus: http.StatusNotFound,
-
expectedError: "SubjectNotFound", // Per lexicon: social.coves.feed.vote.create#SubjectNotFound
-
},
{
name: "invalid direction",
serviceError: votes.ErrInvalidDirection,
···
func TestCreateVoteHandler_ServiceError(t *testing.T) {
tests := []struct {
+
serviceError error
name string
+
expectedError string
expectedStatus int
}{
{
name: "invalid direction",
serviceError: votes.ErrInvalidDirection,
+2 -8
internal/api/handlers/vote/delete_vote_test.go
···
func TestDeleteVoteHandler_ServiceError(t *testing.T) {
tests := []struct {
-
name string
serviceError error
-
expectedStatus int
expectedError string
}{
{
name: "vote not found",
serviceError: votes.ErrVoteNotFound,
expectedStatus: http.StatusNotFound,
expectedError: "VoteNotFound", // Per lexicon: social.coves.feed.vote.delete#VoteNotFound
-
},
-
{
-
name: "subject not found",
-
serviceError: votes.ErrSubjectNotFound,
-
expectedStatus: http.StatusNotFound,
-
expectedError: "SubjectNotFound", // Per lexicon: social.coves.feed.vote.create#SubjectNotFound
},
{
name: "invalid subject",
···
func TestDeleteVoteHandler_ServiceError(t *testing.T) {
tests := []struct {
serviceError error
+
name string
expectedError string
+
expectedStatus int
}{
{
name: "vote not found",
serviceError: votes.ErrVoteNotFound,
expectedStatus: http.StatusNotFound,
expectedError: "VoteNotFound", // Per lexicon: social.coves.feed.vote.delete#VoteNotFound
},
{
name: "invalid subject",
-3
internal/api/handlers/vote/errors.go
···
case errors.Is(err, votes.ErrVoteNotFound):
// Matches: social.coves.feed.vote.delete#VoteNotFound
writeError(w, http.StatusNotFound, "VoteNotFound", "No vote found for this subject")
-
case errors.Is(err, votes.ErrSubjectNotFound):
-
// Matches: social.coves.feed.vote.create#SubjectNotFound
-
writeError(w, http.StatusNotFound, "SubjectNotFound", "The subject post or comment was not found")
case errors.Is(err, votes.ErrInvalidDirection):
writeError(w, http.StatusBadRequest, "InvalidRequest", "Vote direction must be 'up' or 'down'")
case errors.Is(err, votes.ErrInvalidSubject):
···
case errors.Is(err, votes.ErrVoteNotFound):
// Matches: social.coves.feed.vote.delete#VoteNotFound
writeError(w, http.StatusNotFound, "VoteNotFound", "No vote found for this subject")
case errors.Is(err, votes.ErrInvalidDirection):
writeError(w, http.StatusBadRequest, "InvalidRequest", "Vote direction must be 'up' or 'down'")
case errors.Is(err, votes.ErrInvalidSubject):
-4
internal/atproto/lexicon/social/coves/feed/vote/create.json
···
},
"errors": [
{
-
"name": "SubjectNotFound",
-
"description": "The subject post or comment was not found"
-
},
-
{
"name": "NotAuthorized",
"description": "User is not authorized to vote on this content"
},
···
},
"errors": [
{
"name": "NotAuthorized",
"description": "User is not authorized to vote on this content"
},
+4 -4
internal/atproto/oauth/handlers_security.go
···
// - Android: Verified via /.well-known/assetlinks.json
var allowedMobileRedirectURIs = map[string]bool{
// Custom scheme per atproto spec (reverse-domain of coves.social)
-
"social.coves:/callback": true,
-
"social.coves://callback": true, // Some platforms add double slash
-
"social.coves:/oauth/callback": true, // Alternative path
-
"social.coves://oauth/callback": true,
// Universal Links - cryptographically bound to app (preferred for security)
"https://coves.social/app/oauth/callback": true,
}
···
// - Android: Verified via /.well-known/assetlinks.json
var allowedMobileRedirectURIs = map[string]bool{
// Custom scheme per atproto spec (reverse-domain of coves.social)
+
"social.coves:/callback": true,
+
"social.coves://callback": true, // Some platforms add double slash
+
"social.coves:/oauth/callback": true, // Alternative path
+
"social.coves://oauth/callback": true,
// Universal Links - cryptographically bound to app (preferred for security)
"https://coves.social/app/oauth/callback": true,
}
-3
internal/core/votes/errors.go
···
// ErrVoteNotFound indicates the requested vote doesn't exist
ErrVoteNotFound = errors.New("vote not found")
-
// ErrSubjectNotFound indicates the post/comment being voted on doesn't exist
-
ErrSubjectNotFound = errors.New("subject not found")
-
// ErrInvalidDirection indicates the vote direction is not "up" or "down"
ErrInvalidDirection = errors.New("invalid vote direction: must be 'up' or 'down'")
···
// ErrVoteNotFound indicates the requested vote doesn't exist
ErrVoteNotFound = errors.New("vote not found")
// ErrInvalidDirection indicates the vote direction is not "up" or "down"
ErrInvalidDirection = errors.New("invalid vote direction: must be 'up' or 'down'")
+10 -6
internal/core/votes/service.go
···
// Implements write-forward pattern: validates requests, then forwards to user's PDS
//
// Architecture:
-
// - Service validates input and checks authorization
-
// - Queries user's PDS directly via com.atproto.repo.listRecords to check existing votes
-
// (avoids eventual consistency issues with AppView database)
-
// - Creates/deletes vote records via com.atproto.repo.createRecord/deleteRecord
-
// - AppView indexes resulting records from Jetstream firehose for aggregate counts
type Service interface {
// CreateVote creates a new vote or toggles off an existing vote
// Returns URI and CID of created vote, or empty strings if toggled off
···
// Validation:
// - Direction must be "up" or "down" (returns ErrInvalidDirection)
// - Subject URI must be valid AT-URI (returns ErrInvalidSubject)
-
// - Subject must exist (returns ErrSubjectNotFound)
//
// Behavior:
// - If no vote exists: creates new vote with given direction
···
// Implements write-forward pattern: validates requests, then forwards to user's PDS
//
// Architecture:
+
// - Service validates input and checks authorization
+
// - Queries user's PDS directly via com.atproto.repo.listRecords to check existing votes
+
// (avoids eventual consistency issues with AppView database)
+
// - Creates/deletes vote records via com.atproto.repo.createRecord/deleteRecord
+
// - AppView indexes resulting records from Jetstream firehose for aggregate counts
type Service interface {
// CreateVote creates a new vote or toggles off an existing vote
// Returns URI and CID of created vote, or empty strings if toggled off
···
// Validation:
// - Direction must be "up" or "down" (returns ErrInvalidDirection)
// - Subject URI must be valid AT-URI (returns ErrInvalidSubject)
+
// - Subject CID must be provided (returns ErrInvalidSubject)
+
//
+
// Note: Subject existence is NOT validated. Votes on non-existent or deleted
+
// subjects are allowed - the Jetstream consumer handles orphaned votes correctly
+
// by only updating counts for non-deleted subjects.
//
// Behavior:
// - If no vote exists: creates new vote with given direction
+14 -27
internal/core/votes/service_impl.go
···
// voteService implements the Service interface for vote operations
type voteService struct {
-
repo Repository
-
subjectValidator SubjectValidator
-
oauthClient *oauthclient.OAuthClient
-
oauthStore oauth.ClientAuthStore
-
logger *slog.Logger
}
// NewService creates a new vote service instance
-
// subjectValidator can be nil to skip subject existence checks (not recommended for production)
-
func NewService(repo Repository, subjectValidator SubjectValidator, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
if logger == nil {
logger = slog.Default()
}
return &voteService{
-
repo: repo,
-
subjectValidator: subjectValidator,
-
oauthClient: oauthClient,
-
oauthStore: oauthStore,
-
logger: logger,
}
}
···
return nil, ErrInvalidSubject
}
-
// Validate subject exists in AppView (post or comment)
-
// This prevents creating votes on non-existent content
-
if s.subjectValidator != nil {
-
exists, err := s.subjectValidator.SubjectExists(ctx, req.Subject.URI)
-
if err != nil {
-
s.logger.Error("failed to validate subject existence",
-
"error", err,
-
"subject", req.Subject.URI)
-
return nil, fmt.Errorf("failed to validate subject: %w", err)
-
}
-
if !exists {
-
return nil, ErrSubjectNotFound
-
}
-
}
// Check for existing vote by querying PDS directly (source of truth)
// This avoids eventual consistency issues with the AppView database
···
// Parse the listRecords response
var result struct {
Records []struct {
URI string `json:"uri"`
CID string `json:"cid"`
···
CreatedAt string `json:"createdAt"`
} `json:"value"`
} `json:"records"`
-
Cursor string `json:"cursor"`
}
if err := json.Unmarshal(body, &result); err != nil {
···
// voteService implements the Service interface for vote operations
type voteService struct {
+
repo Repository
+
oauthClient *oauthclient.OAuthClient
+
oauthStore oauth.ClientAuthStore
+
logger *slog.Logger
}
// NewService creates a new vote service instance
+
func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
if logger == nil {
logger = slog.Default()
}
return &voteService{
+
repo: repo,
+
oauthClient: oauthClient,
+
oauthStore: oauthStore,
+
logger: logger,
}
}
···
return nil, ErrInvalidSubject
}
+
// Note: We intentionally don't validate subject existence here.
+
// The vote record goes to the user's PDS regardless. The Jetstream consumer
+
// handles orphaned votes correctly by only updating counts for non-deleted subjects.
+
// This avoids race conditions and eventual consistency issues.
// Check for existing vote by querying PDS directly (source of truth)
// This avoids eventual consistency issues with the AppView database
···
// Parse the listRecords response
var result struct {
+
Cursor string `json:"cursor"`
Records []struct {
URI string `json:"uri"`
CID string `json:"cid"`
···
CreatedAt string `json:"createdAt"`
} `json:"value"`
} `json:"records"`
}
if err := json.Unmarshal(body, &result); err != nil {
-50
internal/core/votes/subject_validator.go
···
-
package votes
-
-
import (
-
"context"
-
"strings"
-
)
-
-
// SubjectExistsFunc is a function type that checks if a subject exists
-
type SubjectExistsFunc func(ctx context.Context, uri string) (bool, error)
-
-
// CompositeSubjectValidator validates subjects by checking both posts and comments
-
type CompositeSubjectValidator struct {
-
postExists SubjectExistsFunc
-
commentExists SubjectExistsFunc
-
}
-
-
// NewCompositeSubjectValidator creates a validator that checks both posts and comments
-
// Pass nil for either function to skip validation for that type
-
func NewCompositeSubjectValidator(postExists, commentExists SubjectExistsFunc) *CompositeSubjectValidator {
-
return &CompositeSubjectValidator{
-
postExists: postExists,
-
commentExists: commentExists,
-
}
-
}
-
-
// SubjectExists checks if a post or comment exists at the given URI
-
// Determines type from the collection in the URI (e.g., social.coves.feed.post vs social.coves.feed.comment)
-
func (v *CompositeSubjectValidator) SubjectExists(ctx context.Context, uri string) (bool, error) {
-
// Parse collection from AT-URI: at://did/collection/rkey
-
// Example: at://did:plc:xxx/social.coves.feed.post/abc123
-
if strings.Contains(uri, "/social.coves.feed.post/") {
-
if v.postExists != nil {
-
return v.postExists(ctx, uri)
-
}
-
// If no post checker, assume exists (for testing)
-
return true, nil
-
}
-
-
if strings.Contains(uri, "/social.coves.feed.comment/") {
-
if v.commentExists != nil {
-
return v.commentExists(ctx, uri)
-
}
-
// If no comment checker, assume exists (for testing)
-
return true, nil
-
}
-
-
// Unknown collection type - could be from another app
-
// For now, allow voting on unknown types (future-proofing)
-
return true, nil
-
}
···
-9
internal/core/votes/vote.go
···
package votes
import (
-
"context"
"time"
)
-
-
// SubjectValidator validates that vote subjects (posts/comments) exist
-
// This prevents creating votes on non-existent content
-
type SubjectValidator interface {
-
// SubjectExists checks if a post or comment exists at the given URI
-
// Returns true if found, false if not found
-
SubjectExists(ctx context.Context, uri string) (bool, error)
-
}
// Vote represents a vote in the AppView database
// Votes are indexed from the firehose after being written to user repositories
···
package votes
import (
"time"
)
// Vote represents a vote in the AppView database
// Votes are indexed from the firehose after being written to user repositories
+77 -27
tests/integration/vote_e2e_test.go
···
oauthClient := SetupOAuthTestClient(t, oauthStore)
// Setup services
-
voteService := votes.NewService(voteRepo, nil, oauthClient, oauthStore, nil)
// Create test user on PDS
testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix())
···
CID string `json:"cid"`
}
-
if err := json.NewDecoder(resp.Body).Decode(&voteResp); err != nil {
-
t.Fatalf("Failed to decode vote response: %v", err)
}
t.Logf("āœ… XRPC response received:")
···
oauthStore := SetupOAuthTestStore(t, db)
oauthClient := SetupOAuthTestClient(t, oauthStore)
-
voteService := votes.NewService(voteRepo, nil, oauthClient, oauthStore, nil)
// Create test user
testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix())
···
URI string `json:"uri"`
CID string `json:"cid"`
}
-
json.NewDecoder(resp.Body).Decode(&firstVoteResp)
-
resp.Body.Close()
t.Logf("āœ… First vote created: %s", firstVoteResp.URI)
···
},
},
}
-
voteConsumer.HandleEvent(ctx, &voteEvent)
// Second upvote (same direction) - should toggle off (delete)
t.Logf("\nšŸ“ Creating second upvote (toggle off)...")
···
if err != nil {
t.Fatalf("Failed to toggle vote: %v", err)
}
-
defer resp2.Body.Close()
if resp2.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp2.Body)
···
RKey: rkey,
},
}
-
voteConsumer.HandleEvent(ctx, &deleteEvent)
// Verify vote was removed from AppView
t.Logf("\nšŸ” Verifying vote removed from AppView...")
···
oauthStore := SetupOAuthTestStore(t, db)
oauthClient := SetupOAuthTestClient(t, oauthStore)
-
voteService := votes.NewService(voteRepo, nil, oauthClient, oauthStore, nil)
// Create test user
testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix())
···
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token)
-
resp, _ := http.DefaultClient.Do(req)
var upvoteResp struct {
URI string `json:"uri"`
CID string `json:"cid"`
}
-
json.NewDecoder(resp.Body).Decode(&upvoteResp)
-
resp.Body.Close()
// Index upvote
rkey := utils.ExtractRKeyFromURI(upvoteResp.URI)
···
},
},
}
-
voteConsumer.HandleEvent(ctx, &upvoteEvent)
t.Logf("āœ… Upvote created and indexed")
···
req2.Header.Set("Content-Type", "application/json")
req2.Header.Set("Authorization", "Bearer "+token)
-
resp2, _ := http.DefaultClient.Do(req2)
var downvoteResp struct {
URI string `json:"uri"`
CID string `json:"cid"`
}
-
json.NewDecoder(resp2.Body).Decode(&downvoteResp)
-
resp2.Body.Close()
// Simulate Jetstream UPDATE event (PDS updates the existing record)
t.Logf("\nšŸ”„ Simulating Jetstream UPDATE event...")
···
},
},
}
-
voteConsumer.HandleEvent(ctx, &updateEvent)
// Verify vote direction changed in AppView
t.Logf("\nšŸ” Verifying vote direction changed in AppView...")
···
oauthStore := SetupOAuthTestStore(t, db)
oauthClient := SetupOAuthTestClient(t, oauthStore)
-
voteService := votes.NewService(voteRepo, nil, oauthClient, oauthStore, nil)
// Create test user
testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix())
···
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token)
-
resp, _ := http.DefaultClient.Do(req)
var voteResp struct {
URI string `json:"uri"`
CID string `json:"cid"`
}
-
json.NewDecoder(resp.Body).Decode(&voteResp)
-
resp.Body.Close()
// Index vote
rkey := utils.ExtractRKeyFromURI(voteResp.URI)
···
},
},
}
-
voteConsumer.HandleEvent(ctx, &voteEvent)
t.Logf("āœ… Vote created and indexed")
···
deleteHttpReq.Header.Set("Content-Type", "application/json")
deleteHttpReq.Header.Set("Authorization", "Bearer "+token)
-
deleteResp, _ := http.DefaultClient.Do(deleteHttpReq)
-
defer deleteResp.Body.Close()
if deleteResp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(deleteResp.Body)
···
// Per lexicon, delete returns empty object {}
var deleteRespBody map[string]interface{}
-
json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody)
if len(deleteRespBody) != 0 {
t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody)
···
RKey: rkey,
},
}
-
voteConsumer.HandleEvent(ctx, &deleteEvent)
// Verify vote removed from AppView
t.Logf("\nšŸ” Verifying vote removed from AppView...")
···
oauthClient := SetupOAuthTestClient(t, oauthStore)
// Setup services
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
// Create test user on PDS
testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix())
···
CID string `json:"cid"`
}
+
if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
+
t.Fatalf("Failed to decode vote response: %v", decodeErr)
}
t.Logf("āœ… XRPC response received:")
···
oauthStore := SetupOAuthTestStore(t, db)
oauthClient := SetupOAuthTestClient(t, oauthStore)
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
// Create test user
testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix())
···
URI string `json:"uri"`
CID string `json:"cid"`
}
+
if decodeErr := json.NewDecoder(resp.Body).Decode(&firstVoteResp); decodeErr != nil {
+
t.Fatalf("Failed to decode first vote response: %v", decodeErr)
+
}
+
if closeErr := resp.Body.Close(); closeErr != nil {
+
t.Logf("Failed to close response body: %v", closeErr)
+
}
t.Logf("āœ… First vote created: %s", firstVoteResp.URI)
···
},
},
}
+
if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
+
t.Fatalf("Failed to handle first vote event: %v", handleErr)
+
}
// Second upvote (same direction) - should toggle off (delete)
t.Logf("\nšŸ“ Creating second upvote (toggle off)...")
···
if err != nil {
t.Fatalf("Failed to toggle vote: %v", err)
}
+
defer func() {
+
if closeErr := resp2.Body.Close(); closeErr != nil {
+
t.Logf("Failed to close response body: %v", closeErr)
+
}
+
}()
if resp2.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp2.Body)
···
RKey: rkey,
},
}
+
if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
+
t.Fatalf("Failed to handle delete event: %v", handleErr)
+
}
// Verify vote was removed from AppView
t.Logf("\nšŸ” Verifying vote removed from AppView...")
···
oauthStore := SetupOAuthTestStore(t, db)
oauthClient := SetupOAuthTestClient(t, oauthStore)
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
// Create test user
testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix())
···
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token)
+
resp, err := http.DefaultClient.Do(req)
+
if err != nil {
+
t.Fatalf("Failed to create upvote: %v", err)
+
}
var upvoteResp struct {
URI string `json:"uri"`
CID string `json:"cid"`
}
+
if decodeErr := json.NewDecoder(resp.Body).Decode(&upvoteResp); decodeErr != nil {
+
t.Fatalf("Failed to decode upvote response: %v", decodeErr)
+
}
+
if closeErr := resp.Body.Close(); closeErr != nil {
+
t.Logf("Failed to close response body: %v", closeErr)
+
}
// Index upvote
rkey := utils.ExtractRKeyFromURI(upvoteResp.URI)
···
},
},
}
+
if handleErr := voteConsumer.HandleEvent(ctx, &upvoteEvent); handleErr != nil {
+
t.Fatalf("Failed to handle upvote event: %v", handleErr)
+
}
t.Logf("āœ… Upvote created and indexed")
···
req2.Header.Set("Content-Type", "application/json")
req2.Header.Set("Authorization", "Bearer "+token)
+
resp2, err := http.DefaultClient.Do(req2)
+
if err != nil {
+
t.Fatalf("Failed to create downvote: %v", err)
+
}
var downvoteResp struct {
URI string `json:"uri"`
CID string `json:"cid"`
}
+
if decodeErr := json.NewDecoder(resp2.Body).Decode(&downvoteResp); decodeErr != nil {
+
t.Fatalf("Failed to decode downvote response: %v", decodeErr)
+
}
+
if closeErr := resp2.Body.Close(); closeErr != nil {
+
t.Logf("Failed to close response body: %v", closeErr)
+
}
// Simulate Jetstream UPDATE event (PDS updates the existing record)
t.Logf("\nšŸ”„ Simulating Jetstream UPDATE event...")
···
},
},
}
+
if handleErr := voteConsumer.HandleEvent(ctx, &updateEvent); handleErr != nil {
+
t.Fatalf("Failed to handle update event: %v", handleErr)
+
}
// Verify vote direction changed in AppView
t.Logf("\nšŸ” Verifying vote direction changed in AppView...")
···
oauthStore := SetupOAuthTestStore(t, db)
oauthClient := SetupOAuthTestClient(t, oauthStore)
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
// Create test user
testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix())
···
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token)
+
resp, err := http.DefaultClient.Do(req)
+
if err != nil {
+
t.Fatalf("Failed to create vote: %v", err)
+
}
var voteResp struct {
URI string `json:"uri"`
CID string `json:"cid"`
}
+
if decodeErr := json.NewDecoder(resp.Body).Decode(&voteResp); decodeErr != nil {
+
t.Fatalf("Failed to decode vote response: %v", decodeErr)
+
}
+
if closeErr := resp.Body.Close(); closeErr != nil {
+
t.Logf("Failed to close response body: %v", closeErr)
+
}
// Index vote
rkey := utils.ExtractRKeyFromURI(voteResp.URI)
···
},
},
}
+
if handleErr := voteConsumer.HandleEvent(ctx, &voteEvent); handleErr != nil {
+
t.Fatalf("Failed to handle vote event: %v", handleErr)
+
}
t.Logf("āœ… Vote created and indexed")
···
deleteHttpReq.Header.Set("Content-Type", "application/json")
deleteHttpReq.Header.Set("Authorization", "Bearer "+token)
+
deleteResp, err := http.DefaultClient.Do(deleteHttpReq)
+
if err != nil {
+
t.Fatalf("Failed to delete vote: %v", err)
+
}
+
defer func() {
+
if closeErr := deleteResp.Body.Close(); closeErr != nil {
+
t.Logf("Failed to close response body: %v", closeErr)
+
}
+
}()
if deleteResp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(deleteResp.Body)
···
// Per lexicon, delete returns empty object {}
var deleteRespBody map[string]interface{}
+
if decodeErr := json.NewDecoder(deleteResp.Body).Decode(&deleteRespBody); decodeErr != nil {
+
t.Fatalf("Failed to decode delete response: %v", decodeErr)
+
}
if len(deleteRespBody) != 0 {
t.Errorf("Expected empty object per lexicon, got %v", deleteRespBody)
···
RKey: rkey,
},
}
+
if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
+
t.Fatalf("Failed to handle delete event: %v", handleErr)
+
}
// Verify vote removed from AppView
t.Logf("\nšŸ” Verifying vote removed from AppView...")