A community based topic aggregation platform built on atproto

Compare changes

Choose any two refs to compare.

Changed files
+90 -77
.beads
cmd
server
internal
api
atproto
lexicon
social
coves
feed
oauth
core
db
postgres
tests
+16
tests/integration/helpers.go
···
// AddSession adds a session to the store
func (m *MockOAuthStore) AddSession(did, sessionID, accessToken string) {
+
m.AddSessionWithPDS(did, sessionID, accessToken, getTestPDSURL())
+
}
+
+
// AddSessionWithPDS adds a session to the store with a specific PDS URL
+
func (m *MockOAuthStore) AddSessionWithPDS(did, sessionID, accessToken, pdsURL string) {
key := did + ":" + sessionID
parsedDID, _ := syntax.ParseDID(did)
m.sessions[key] = &oauthlib.ClientSessionData{
AccountDID: parsedDID,
SessionID: sessionID,
AccessToken: accessToken,
+
HostURL: pdsURL,
}
}
···
e.store.AddSession(did, sessionID, "access-token-"+did)
return token
}
+
+
// AddUserWithPDSToken registers a user with their real PDS access token
+
// Use this for E2E tests that need to write to the real PDS
+
func (e *E2EOAuthMiddleware) AddUserWithPDSToken(did, pdsAccessToken, pdsURL string) string {
+
token := "test-token-" + did
+
sessionID := "session-" + did
+
e.unsealer.AddSession(token, did, sessionID)
+
e.store.AddSessionWithPDS(did, sessionID, pdsAccessToken, pdsURL)
+
return token
+
}
+3
.beads/beads.left.jsonl
···
+
{"id":"Coves-95q","content_hash":"8ec99d598f067780436b985f9ad57f0fa19632026981038df4f65f192186620b","title":"Add comprehensive API documentation","description":"","status":"open","priority":2,"issue_type":"task","created_at":"2025-11-17T20:30:34.835721854-08:00","updated_at":"2025-11-17T20:30:34.835721854-08:00","source_repo":".","dependencies":[{"issue_id":"Coves-95q","depends_on_id":"Coves-e16","type":"blocks","created_at":"2025-11-17T20:30:46.273899399-08:00","created_by":"daemon"}]}
+
{"id":"Coves-e16","content_hash":"7c5d0fc8f0e7f626be3dad62af0e8412467330bad01a244e5a7e52ac5afff1c1","title":"Complete post creation and moderation features","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:12.885991306-08:00","updated_at":"2025-11-17T20:30:12.885991306-08:00","source_repo":"."}
+
{"id":"Coves-fce","content_hash":"26b3e16b99f827316ee0d741cc959464bd0c813446c95aef8105c7fd1e6b09ff","title":"Implement aggregator feed federation","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:21.453326012-08:00","updated_at":"2025-11-17T20:30:21.453326012-08:00","source_repo":"."}
+1
.beads/beads.left.meta.json
···
+
{"version":"0.23.1","timestamp":"2025-12-02T18:25:24.009187871-08:00","commit":"00d7d8d"}
+5 -28
cmd/server/main.go
···
commentRepo := postgresRepo.NewCommentRepository(db)
log.Println("โœ… Comment repository initialized (Jetstream indexing only)")
-
// Initialize subject validator for votes (checks posts and comments exist)
-
subjectValidator := votes.NewCompositeSubjectValidator(
-
// Post existence checker
-
func(ctx context.Context, uri string) (bool, error) {
-
_, err := postRepo.GetByURI(ctx, uri)
-
if err != nil {
-
if err == posts.ErrNotFound {
-
return false, nil
-
}
-
return false, err
-
}
-
return true, nil
-
},
-
// Comment existence checker
-
func(ctx context.Context, uri string) (bool, error) {
-
_, err := commentRepo.GetByURI(ctx, uri)
-
if err != nil {
-
if err == comments.ErrCommentNotFound {
-
return false, nil
-
}
-
return false, err
-
}
-
return true, nil
-
},
-
)
-
// Initialize vote service (for XRPC API endpoints)
-
voteService := votes.NewService(voteRepo, subjectValidator, oauthClient, oauthStore, nil)
-
log.Println("โœ… Vote service initialized (with OAuth authentication and subject validation)")
+
// Note: We don't validate subject existence - the vote goes to the user's PDS regardless.
+
// The Jetstream consumer handles orphaned votes correctly by only updating counts for
+
// non-deleted subjects. This avoids race conditions and eventual consistency issues.
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
+
log.Println("โœ… Vote service initialized (with OAuth authentication)")
// Initialize comment service (for query API)
// Requires user and community repos for proper author/community hydration per lexicon
+2 -8
internal/api/handlers/vote/create_vote_test.go
···
func TestCreateVoteHandler_ServiceError(t *testing.T) {
tests := []struct {
-
name string
serviceError error
-
expectedStatus int
+
name string
expectedError string
+
expectedStatus int
}{
-
{
-
name: "subject not found",
-
serviceError: votes.ErrSubjectNotFound,
-
expectedStatus: http.StatusNotFound,
-
expectedError: "SubjectNotFound", // Per lexicon: social.coves.feed.vote.create#SubjectNotFound
-
},
{
name: "invalid direction",
serviceError: votes.ErrInvalidDirection,
+2 -8
internal/api/handlers/vote/delete_vote_test.go
···
func TestDeleteVoteHandler_ServiceError(t *testing.T) {
tests := []struct {
-
name string
serviceError error
-
expectedStatus int
+
name string
expectedError string
+
expectedStatus int
}{
{
name: "vote not found",
···
expectedStatus: http.StatusNotFound,
expectedError: "VoteNotFound", // Per lexicon: social.coves.feed.vote.delete#VoteNotFound
},
-
{
-
name: "subject not found",
-
serviceError: votes.ErrSubjectNotFound,
-
expectedStatus: http.StatusNotFound,
-
expectedError: "SubjectNotFound", // Per lexicon: social.coves.feed.vote.create#SubjectNotFound
-
},
{
name: "invalid subject",
serviceError: votes.ErrInvalidSubject,
-4
internal/atproto/lexicon/social/coves/feed/vote/create.json
···
}
},
"errors": [
-
{
-
"name": "SubjectNotFound",
-
"description": "The subject post or comment was not found"
-
},
{
"name": "NotAuthorized",
"description": "User is not authorized to vote on this content"
+4 -4
internal/atproto/oauth/handlers_security.go
···
// - Android: Verified via /.well-known/assetlinks.json
var allowedMobileRedirectURIs = map[string]bool{
// Custom scheme per atproto spec (reverse-domain of coves.social)
-
"social.coves:/callback": true,
-
"social.coves://callback": true, // Some platforms add double slash
-
"social.coves:/oauth/callback": true, // Alternative path
-
"social.coves://oauth/callback": true,
+
"social.coves:/callback": true,
+
"social.coves://callback": true, // Some platforms add double slash
+
"social.coves:/oauth/callback": true, // Alternative path
+
"social.coves://oauth/callback": true,
// Universal Links - cryptographically bound to app (preferred for security)
"https://coves.social/app/oauth/callback": true,
}
-3
internal/core/votes/errors.go
···
// ErrVoteNotFound indicates the requested vote doesn't exist
ErrVoteNotFound = errors.New("vote not found")
-
// ErrSubjectNotFound indicates the post/comment being voted on doesn't exist
-
ErrSubjectNotFound = errors.New("subject not found")
-
// ErrInvalidDirection indicates the vote direction is not "up" or "down"
ErrInvalidDirection = errors.New("invalid vote direction: must be 'up' or 'down'")
+10 -6
internal/core/votes/service.go
···
// Implements write-forward pattern: validates requests, then forwards to user's PDS
//
// Architecture:
-
// - Service validates input and checks authorization
-
// - Queries user's PDS directly via com.atproto.repo.listRecords to check existing votes
-
// (avoids eventual consistency issues with AppView database)
-
// - Creates/deletes vote records via com.atproto.repo.createRecord/deleteRecord
-
// - AppView indexes resulting records from Jetstream firehose for aggregate counts
+
// - Service validates input and checks authorization
+
// - Queries user's PDS directly via com.atproto.repo.listRecords to check existing votes
+
// (avoids eventual consistency issues with AppView database)
+
// - Creates/deletes vote records via com.atproto.repo.createRecord/deleteRecord
+
// - AppView indexes resulting records from Jetstream firehose for aggregate counts
type Service interface {
// CreateVote creates a new vote or toggles off an existing vote
// Returns URI and CID of created vote, or empty strings if toggled off
···
// Validation:
// - Direction must be "up" or "down" (returns ErrInvalidDirection)
// - Subject URI must be valid AT-URI (returns ErrInvalidSubject)
-
// - Subject must exist (returns ErrSubjectNotFound)
+
// - Subject CID must be provided (returns ErrInvalidSubject)
+
//
+
// Note: Subject existence is NOT validated. Votes on non-existent or deleted
+
// subjects are allowed - the Jetstream consumer handles orphaned votes correctly
+
// by only updating counts for non-deleted subjects.
//
// Behavior:
// - If no vote exists: creates new vote with given direction
+3 -2
internal/db/postgres/vote_repo.go
···
return nil
}
-
// GetByURI retrieves a vote by its AT-URI
+
// GetByURI retrieves an active vote by its AT-URI
// Used by Jetstream consumer for DELETE operations
+
// Returns ErrVoteNotFound for soft-deleted votes
func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) {
query := `
SELECT
···
subject_uri, subject_cid, direction,
created_at, indexed_at, deleted_at
FROM votes
-
WHERE uri = $1
+
WHERE uri = $1 AND deleted_at IS NULL
`
var vote votes.Vote
+44 -14
tests/integration/vote_e2e_test.go
···
t.Logf("Failed to close response body: %v", closeErr)
}
-
// Simulate Jetstream UPDATE event (PDS updates the existing record)
-
t.Logf("\n๐Ÿ”„ Simulating Jetstream UPDATE event...")
-
updateEvent := jetstream.JetstreamEvent{
+
// The service flow for direction change is:
+
// 1. DELETE old vote on PDS
+
// 2. CREATE new vote with NEW rkey on PDS
+
// So we simulate DELETE + CREATE events (not UPDATE)
+
+
// Simulate Jetstream DELETE event for old vote
+
t.Logf("\n๐Ÿ”„ Simulating Jetstream DELETE event for old upvote...")
+
deleteEvent := jetstream.JetstreamEvent{
+
Did: userDID,
+
TimeUS: time.Now().UnixMicro(),
+
Kind: "commit",
+
Commit: &jetstream.CommitEvent{
+
Rev: "test-vote-rev-delete",
+
Operation: "delete",
+
Collection: "social.coves.feed.vote",
+
RKey: rkey, // Old upvote rkey
+
},
+
}
+
if handleErr := voteConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
+
t.Fatalf("Failed to handle delete event: %v", handleErr)
+
}
+
+
// Simulate Jetstream CREATE event for new downvote
+
t.Logf("\n๐Ÿ”„ Simulating Jetstream CREATE event for new downvote...")
+
newRkey := utils.ExtractRKeyFromURI(downvoteResp.URI)
+
createEvent := jetstream.JetstreamEvent{
Did: userDID,
TimeUS: time.Now().UnixMicro(),
Kind: "commit",
Commit: &jetstream.CommitEvent{
Rev: "test-vote-rev-down",
-
Operation: "update",
+
Operation: "create",
Collection: "social.coves.feed.vote",
-
RKey: rkey, // Same rkey as before
+
RKey: newRkey, // NEW rkey from downvote response
CID: downvoteResp.CID,
Record: map[string]interface{}{
"$type": "social.coves.feed.vote",
···
"uri": postURI,
"cid": postCID,
},
-
"direction": "down", // Changed direction
+
"direction": "down",
"createdAt": time.Now().Format(time.RFC3339),
},
},
}
-
if handleErr := voteConsumer.HandleEvent(ctx, &updateEvent); handleErr != nil {
-
t.Fatalf("Failed to handle update event: %v", handleErr)
+
if handleErr := voteConsumer.HandleEvent(ctx, &createEvent); handleErr != nil {
+
t.Fatalf("Failed to handle create event: %v", handleErr)
+
}
+
+
// Verify old upvote was deleted
+
t.Logf("\n๐Ÿ” Verifying old upvote was deleted...")
+
_, err = voteRepo.GetByURI(ctx, upvoteResp.URI)
+
if err == nil {
+
t.Error("Expected old upvote to be deleted, but it still exists")
}
-
// Verify vote direction changed in AppView
-
t.Logf("\n๐Ÿ” Verifying vote direction changed in AppView...")
-
updatedVote, err := voteRepo.GetByURI(ctx, upvoteResp.URI)
+
// Verify new downvote was indexed
+
t.Logf("\n๐Ÿ” Verifying new downvote indexed in AppView...")
+
newVote, err := voteRepo.GetByURI(ctx, downvoteResp.URI)
if err != nil {
-
t.Fatalf("Vote not found after update: %v", err)
+
t.Fatalf("New downvote not found: %v", err)
}
-
if updatedVote.Direction != "down" {
-
t.Errorf("Expected direction 'down', got %s", updatedVote.Direction)
+
if newVote.Direction != "down" {
+
t.Errorf("Expected direction 'down', got %s", newVote.Direction)
}
// Verify post counts updated