A community based topic aggregation platform built on atproto

Compare changes

Choose any two refs to compare.

Changed files
+359 -182
.beads
cmd
server
internal
api
atproto
lexicon
social
coves
feed
oauth
core
db
postgres
static
tests
integration
+1 -8
Caddyfile
···
file_server
}
-
# Serve OAuth callback page
-
handle /oauth/callback {
-
root * /srv
-
rewrite * /oauth/callback.html
-
file_server
-
}
-
-
# Proxy all other requests to AppView
handle {
reverse_proxy appview:8080 {
# Health check
···
file_server
}
+
# Proxy all requests to AppView
handle {
reverse_proxy appview:8080 {
# Health check
-97
static/oauth/callback.html
···
-
<!DOCTYPE html>
-
<html>
-
<head>
-
<meta charset="utf-8">
-
<meta name="viewport" content="width=device-width, initial-scale=1">
-
<meta http-equiv="Content-Security-Policy" content="default-src 'self'; script-src 'unsafe-inline'; style-src 'unsafe-inline'">
-
<title>Authorization Successful - Coves</title>
-
<style>
-
body {
-
font-family: system-ui, -apple-system, sans-serif;
-
display: flex;
-
align-items: center;
-
justify-content: center;
-
min-height: 100vh;
-
margin: 0;
-
background: #f5f5f5;
-
}
-
.container {
-
text-align: center;
-
padding: 2rem;
-
background: white;
-
border-radius: 8px;
-
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
-
max-width: 400px;
-
}
-
.success { color: #22c55e; font-size: 3rem; margin-bottom: 1rem; }
-
h1 { margin: 0 0 0.5rem; color: #1f2937; font-size: 1.5rem; }
-
p { color: #6b7280; margin: 0.5rem 0; }
-
a {
-
display: inline-block;
-
margin-top: 1rem;
-
padding: 0.75rem 1.5rem;
-
background: #3b82f6;
-
color: white;
-
text-decoration: none;
-
border-radius: 6px;
-
font-weight: 500;
-
}
-
a:hover { background: #2563eb; }
-
</style>
-
</head>
-
<body>
-
<div class="container">
-
<div class="success">โœ“</div>
-
<h1>Authorization Successful!</h1>
-
<p id="status">Returning to Coves...</p>
-
<a href="#" id="manualLink">Open Coves</a>
-
</div>
-
<script>
-
(function() {
-
// Parse and sanitize query params - only allow expected OAuth parameters
-
const urlParams = new URLSearchParams(window.location.search);
-
const safeParams = new URLSearchParams();
-
-
// Whitelist only expected OAuth callback parameters
-
const code = urlParams.get('code');
-
const state = urlParams.get('state');
-
const error = urlParams.get('error');
-
const errorDescription = urlParams.get('error_description');
-
const iss = urlParams.get('iss');
-
-
if (code) safeParams.set('code', code);
-
if (state) safeParams.set('state', state);
-
if (error) safeParams.set('error', error);
-
if (errorDescription) safeParams.set('error_description', errorDescription);
-
if (iss) safeParams.set('iss', iss);
-
-
const sanitizedQuery = safeParams.toString() ? '?' + safeParams.toString() : '';
-
-
const userAgent = navigator.userAgent || '';
-
const isAndroid = /Android/i.test(userAgent);
-
-
// Build deep link based on platform
-
let deepLink;
-
if (isAndroid) {
-
// Android: Intent URL format
-
const pathAndQuery = '/oauth/callback' + sanitizedQuery;
-
deepLink = 'intent:/' + pathAndQuery + '#Intent;scheme=social.coves;package=social.coves;end';
-
} else {
-
// iOS: Custom scheme
-
deepLink = 'social.coves:/oauth/callback' + sanitizedQuery;
-
}
-
-
// Update manual link
-
document.getElementById('manualLink').href = deepLink;
-
-
// Attempt automatic redirect
-
window.location.href = deepLink;
-
-
// Update status after 2 seconds if redirect didn't work
-
setTimeout(function() {
-
document.getElementById('status').textContent = 'Click the button above to continue';
-
}, 2000);
-
})();
-
</script>
-
</body>
-
</html>
···
+6 -5
internal/api/routes/oauth.go
···
// Use login limiter since callback completes the authentication flow
r.With(corsMiddleware(allowedOrigins), loginLimiter.Middleware).Get("/oauth/callback", handler.HandleCallback)
-
// Mobile Universal Link callback route
-
// This route is used for iOS Universal Links and Android App Links
-
// Path must match the path in .well-known/apple-app-site-association
-
// Uses the same handler as web callback - the system routes it to the mobile app
-
r.With(loginLimiter.Middleware).Get("/app/oauth/callback", handler.HandleCallback)
// Session management - dedicated rate limits
r.With(logoutLimiter.Middleware).Post("/oauth/logout", handler.HandleLogout)
···
// Use login limiter since callback completes the authentication flow
r.With(corsMiddleware(allowedOrigins), loginLimiter.Middleware).Get("/oauth/callback", handler.HandleCallback)
+
// Mobile Universal Link callback route (fallback when app doesn't intercept)
+
// This route exists for iOS Universal Links and Android App Links.
+
// When properly configured, the mobile OS intercepts this URL and opens the app
+
// BEFORE the request reaches the server. If this handler is reached, it means
+
// Universal Links failed to intercept.
+
r.With(loginLimiter.Middleware).Get("/app/oauth/callback", handler.HandleMobileDeepLinkFallback)
// Session management - dedicated rate limits
r.With(logoutLimiter.Middleware).Post("/oauth/logout", handler.HandleLogout)
+11
static/.well-known/apple-app-site-association
···
···
+
{
+
"applinks": {
+
"apps": [],
+
"details": [
+
{
+
"appID": "TEAM_ID.social.coves",
+
"paths": ["/app/oauth/callback"]
+
}
+
]
+
}
+
}
+10
static/.well-known/assetlinks.json
···
···
+
[{
+
"relation": ["delegate_permission/common.handle_all_urls"],
+
"target": {
+
"namespace": "android_app",
+
"package_name": "social.coves",
+
"sha256_cert_fingerprints": [
+
"0B:D8:8C:99:66:25:E5:CD:06:54:80:88:01:6F:B7:38:B9:F4:5B:41:71:F7:95:C8:68:94:87:AD:EA:9F:D9:ED"
+
]
+
}
+
}]
+16 -9
internal/atproto/oauth/handlers_test.go
···
}
// TestIsMobileRedirectURI tests mobile redirect URI validation with EXACT URI matching
-
// Only Universal Links (HTTPS) are allowed - custom schemes are blocked for security
func TestIsMobileRedirectURI(t *testing.T) {
tests := []struct {
uri string
expected bool
}{
-
{"https://coves.social/app/oauth/callback", true}, // Universal Link - allowed
-
{"coves-app://oauth/callback", false}, // Custom scheme - blocked (insecure)
-
{"coves://oauth/callback", false}, // Custom scheme - blocked (insecure)
-
{"coves-app://callback", false}, // Custom scheme - blocked
-
{"coves://oauth", false}, // Custom scheme - blocked
-
{"myapp://oauth", false}, // Not in allowlist
-
{"https://example.com", false}, // Wrong domain
-
{"http://localhost", false}, // HTTP not allowed
{"", false},
{"not-a-uri", false},
}
···
}
// TestIsMobileRedirectURI tests mobile redirect URI validation with EXACT URI matching
+
// Per atproto spec, custom schemes must match client_id hostname in reverse-domain order
func TestIsMobileRedirectURI(t *testing.T) {
tests := []struct {
uri string
expected bool
}{
+
// Custom scheme per atproto spec (reverse domain of coves.social)
+
{"social.coves:/callback", true},
+
{"social.coves://callback", true},
+
{"social.coves:/oauth/callback", true},
+
{"social.coves://oauth/callback", true},
+
// Universal Link - allowed (strongest security)
+
{"https://coves.social/app/oauth/callback", true},
+
// Wrong custom schemes - not reverse-domain of coves.social
+
{"coves-app://oauth/callback", false},
+
{"coves://oauth/callback", false},
+
{"coves.social://callback", false}, // Not reversed
+
{"myapp://oauth", false},
+
// Wrong domain/scheme
+
{"https://example.com", false},
+
{"http://localhost", false},
{"", false},
{"not-a-uri", false},
}
+41
internal/atproto/lexicon/social/coves/feed/vote/delete.json
···
···
+
{
+
"lexicon": 1,
+
"id": "social.coves.feed.vote.delete",
+
"defs": {
+
"main": {
+
"type": "procedure",
+
"description": "Delete a vote on a post or comment",
+
"input": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"required": ["subject"],
+
"properties": {
+
"subject": {
+
"type": "ref",
+
"ref": "com.atproto.repo.strongRef",
+
"description": "Strong reference to the post or comment to remove the vote from"
+
}
+
}
+
}
+
},
+
"output": {
+
"encoding": "application/json",
+
"schema": {
+
"type": "object",
+
"properties": {}
+
}
+
},
+
"errors": [
+
{
+
"name": "VoteNotFound",
+
"description": "No vote found for this subject"
+
},
+
{
+
"name": "NotAuthorized",
+
"description": "User is not authorized to delete this vote"
+
}
+
]
+
}
+
}
+
}
+115
internal/api/handlers/vote/create_vote.go
···
···
+
package vote
+
+
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/votes"
+
"encoding/json"
+
"log"
+
"net/http"
+
)
+
+
// CreateVoteHandler handles vote creation
+
type CreateVoteHandler struct {
+
service votes.Service
+
}
+
+
// NewCreateVoteHandler creates a new create vote handler
+
func NewCreateVoteHandler(service votes.Service) *CreateVoteHandler {
+
return &CreateVoteHandler{
+
service: service,
+
}
+
}
+
+
// CreateVoteInput represents the request body for creating a vote
+
type CreateVoteInput struct {
+
Subject struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
} `json:"subject"`
+
Direction string `json:"direction"`
+
}
+
+
// CreateVoteOutput represents the response body for creating a vote
+
type CreateVoteOutput struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
}
+
+
// HandleCreateVote creates a vote on a post or comment
+
// POST /xrpc/social.coves.vote.create
+
//
+
// Request body: { "subject": { "uri": "at://...", "cid": "..." }, "direction": "up" }
+
// Response: { "uri": "at://...", "cid": "..." }
+
//
+
// Behavior:
+
// - If no vote exists: creates new vote with given direction
+
// - If vote exists with same direction: deletes vote (toggle off)
+
// - If vote exists with different direction: updates to new direction
+
func (h *CreateVoteHandler) HandleCreateVote(w http.ResponseWriter, r *http.Request) {
+
if r.Method != http.MethodPost {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// Parse request body
+
var input CreateVoteInput
+
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "Invalid request body")
+
return
+
}
+
+
// Validate required fields
+
if input.Subject.URI == "" {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "subject.uri is required")
+
return
+
}
+
if input.Subject.CID == "" {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "subject.cid is required")
+
return
+
}
+
if input.Direction == "" {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "direction is required")
+
return
+
}
+
+
// Validate direction
+
if input.Direction != "up" && input.Direction != "down" {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "direction must be 'up' or 'down'")
+
return
+
}
+
+
// Get OAuth session from context (injected by auth middleware)
+
session := middleware.GetOAuthSession(r)
+
if session == nil {
+
writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required")
+
return
+
}
+
+
// Create vote request
+
req := votes.CreateVoteRequest{
+
Subject: votes.StrongRef{
+
URI: input.Subject.URI,
+
CID: input.Subject.CID,
+
},
+
Direction: input.Direction,
+
}
+
+
// Call service to create vote
+
response, err := h.service.CreateVote(r.Context(), session, req)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// Return success response
+
output := CreateVoteOutput{
+
URI: response.URI,
+
CID: response.CID,
+
}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(output); err != nil {
+
log.Printf("Failed to encode response: %v", err)
+
}
+
}
+93
internal/api/handlers/vote/delete_vote.go
···
···
+
package vote
+
+
import (
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/votes"
+
"encoding/json"
+
"log"
+
"net/http"
+
)
+
+
// DeleteVoteHandler handles vote deletion
+
type DeleteVoteHandler struct {
+
service votes.Service
+
}
+
+
// NewDeleteVoteHandler creates a new delete vote handler
+
func NewDeleteVoteHandler(service votes.Service) *DeleteVoteHandler {
+
return &DeleteVoteHandler{
+
service: service,
+
}
+
}
+
+
// DeleteVoteInput represents the request body for deleting a vote
+
type DeleteVoteInput struct {
+
Subject struct {
+
URI string `json:"uri"`
+
CID string `json:"cid"`
+
} `json:"subject"`
+
}
+
+
// DeleteVoteOutput represents the response body for deleting a vote
+
// Per lexicon: output is an empty object
+
type DeleteVoteOutput struct{}
+
+
// HandleDeleteVote removes a vote from a post or comment
+
// POST /xrpc/social.coves.vote.delete
+
//
+
// Request body: { "subject": { "uri": "at://...", "cid": "..." } }
+
// Response: { "success": true }
+
func (h *DeleteVoteHandler) HandleDeleteVote(w http.ResponseWriter, r *http.Request) {
+
if r.Method != http.MethodPost {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// Parse request body
+
var input DeleteVoteInput
+
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "Invalid request body")
+
return
+
}
+
+
// Validate required fields
+
if input.Subject.URI == "" {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "subject.uri is required")
+
return
+
}
+
if input.Subject.CID == "" {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "subject.cid is required")
+
return
+
}
+
+
// Get OAuth session from context (injected by auth middleware)
+
session := middleware.GetOAuthSession(r)
+
if session == nil {
+
writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required")
+
return
+
}
+
+
// Create delete vote request
+
req := votes.DeleteVoteRequest{
+
Subject: votes.StrongRef{
+
URI: input.Subject.URI,
+
CID: input.Subject.CID,
+
},
+
}
+
+
// Call service to delete vote
+
err := h.service.DeleteVote(r.Context(), session, req)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// Return success response (empty object per lexicon)
+
output := DeleteVoteOutput{}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(output); err != nil {
+
log.Printf("Failed to encode response: %v", err)
+
}
+
}
+24
internal/api/routes/vote.go
···
···
+
package routes
+
+
import (
+
"Coves/internal/api/handlers/vote"
+
"Coves/internal/api/middleware"
+
"Coves/internal/core/votes"
+
+
"github.com/go-chi/chi/v5"
+
)
+
+
// RegisterVoteRoutes registers vote-related XRPC endpoints on the router
+
// Implements social.coves.feed.vote.* lexicon endpoints
+
func RegisterVoteRoutes(r chi.Router, voteService votes.Service, authMiddleware *middleware.OAuthAuthMiddleware) {
+
// Initialize handlers
+
createHandler := vote.NewCreateVoteHandler(voteService)
+
deleteHandler := vote.NewDeleteVoteHandler(voteService)
+
+
// Procedure endpoints (POST) - require authentication
+
// social.coves.feed.vote.create - create or update a vote on a post/comment
+
r.With(authMiddleware.RequireAuth).Post("/xrpc/social.coves.feed.vote.create", createHandler.HandleCreateVote)
+
+
// social.coves.feed.vote.delete - delete a vote from a post/comment
+
r.With(authMiddleware.RequireAuth).Post("/xrpc/social.coves.feed.vote.delete", deleteHandler.HandleDeleteVote)
+
}
+16
tests/integration/helpers.go
···
// AddSession adds a session to the store
func (m *MockOAuthStore) AddSession(did, sessionID, accessToken string) {
key := did + ":" + sessionID
parsedDID, _ := syntax.ParseDID(did)
m.sessions[key] = &oauthlib.ClientSessionData{
AccountDID: parsedDID,
SessionID: sessionID,
AccessToken: accessToken,
}
}
···
e.store.AddSession(did, sessionID, "access-token-"+did)
return token
}
···
// AddSession adds a session to the store
func (m *MockOAuthStore) AddSession(did, sessionID, accessToken string) {
+
m.AddSessionWithPDS(did, sessionID, accessToken, getTestPDSURL())
+
}
+
+
// AddSessionWithPDS adds a session to the store with a specific PDS URL
+
func (m *MockOAuthStore) AddSessionWithPDS(did, sessionID, accessToken, pdsURL string) {
key := did + ":" + sessionID
parsedDID, _ := syntax.ParseDID(did)
m.sessions[key] = &oauthlib.ClientSessionData{
AccountDID: parsedDID,
SessionID: sessionID,
AccessToken: accessToken,
+
HostURL: pdsURL,
}
}
···
e.store.AddSession(did, sessionID, "access-token-"+did)
return token
}
+
+
// AddUserWithPDSToken registers a user with their real PDS access token
+
// Use this for E2E tests that need to write to the real PDS
+
func (e *E2EOAuthMiddleware) AddUserWithPDSToken(did, pdsAccessToken, pdsURL string) string {
+
token := "test-token-" + did
+
sessionID := "session-" + did
+
e.unsealer.AddSession(token, did, sessionID)
+
e.store.AddSessionWithPDS(did, sessionID, pdsAccessToken, pdsURL)
+
return token
+
}
+3
.beads/beads.left.jsonl
···
···
+
{"id":"Coves-95q","content_hash":"8ec99d598f067780436b985f9ad57f0fa19632026981038df4f65f192186620b","title":"Add comprehensive API documentation","description":"","status":"open","priority":2,"issue_type":"task","created_at":"2025-11-17T20:30:34.835721854-08:00","updated_at":"2025-11-17T20:30:34.835721854-08:00","source_repo":".","dependencies":[{"issue_id":"Coves-95q","depends_on_id":"Coves-e16","type":"blocks","created_at":"2025-11-17T20:30:46.273899399-08:00","created_by":"daemon"}]}
+
{"id":"Coves-e16","content_hash":"7c5d0fc8f0e7f626be3dad62af0e8412467330bad01a244e5a7e52ac5afff1c1","title":"Complete post creation and moderation features","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:12.885991306-08:00","updated_at":"2025-11-17T20:30:12.885991306-08:00","source_repo":"."}
+
{"id":"Coves-fce","content_hash":"26b3e16b99f827316ee0d741cc959464bd0c813446c95aef8105c7fd1e6b09ff","title":"Implement aggregator feed federation","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:21.453326012-08:00","updated_at":"2025-11-17T20:30:21.453326012-08:00","source_repo":"."}
+1
.beads/beads.left.meta.json
···
···
+
{"version":"0.23.1","timestamp":"2025-12-02T18:25:24.009187871-08:00","commit":"00d7d8d"}
+5 -28
cmd/server/main.go
···
commentRepo := postgresRepo.NewCommentRepository(db)
log.Println("โœ… Comment repository initialized (Jetstream indexing only)")
-
// Initialize subject validator for votes (checks posts and comments exist)
-
subjectValidator := votes.NewCompositeSubjectValidator(
-
// Post existence checker
-
func(ctx context.Context, uri string) (bool, error) {
-
_, err := postRepo.GetByURI(ctx, uri)
-
if err != nil {
-
if err == posts.ErrNotFound {
-
return false, nil
-
}
-
return false, err
-
}
-
return true, nil
-
},
-
// Comment existence checker
-
func(ctx context.Context, uri string) (bool, error) {
-
_, err := commentRepo.GetByURI(ctx, uri)
-
if err != nil {
-
if err == comments.ErrCommentNotFound {
-
return false, nil
-
}
-
return false, err
-
}
-
return true, nil
-
},
-
)
-
// Initialize vote service (for XRPC API endpoints)
-
voteService := votes.NewService(voteRepo, subjectValidator, oauthClient, oauthStore, nil)
-
log.Println("โœ… Vote service initialized (with OAuth authentication and subject validation)")
// Initialize comment service (for query API)
// Requires user and community repos for proper author/community hydration per lexicon
···
commentRepo := postgresRepo.NewCommentRepository(db)
log.Println("โœ… Comment repository initialized (Jetstream indexing only)")
// Initialize vote service (for XRPC API endpoints)
+
// Note: We don't validate subject existence - the vote goes to the user's PDS regardless.
+
// The Jetstream consumer handles orphaned votes correctly by only updating counts for
+
// non-deleted subjects. This avoids race conditions and eventual consistency issues.
+
voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil)
+
log.Println("โœ… Vote service initialized (with OAuth authentication)")
// Initialize comment service (for query API)
// Requires user and community repos for proper author/community hydration per lexicon
-3
internal/api/handlers/vote/errors.go
···
case errors.Is(err, votes.ErrVoteNotFound):
// Matches: social.coves.feed.vote.delete#VoteNotFound
writeError(w, http.StatusNotFound, "VoteNotFound", "No vote found for this subject")
-
case errors.Is(err, votes.ErrSubjectNotFound):
-
// Matches: social.coves.feed.vote.create#SubjectNotFound
-
writeError(w, http.StatusNotFound, "SubjectNotFound", "The subject post or comment was not found")
case errors.Is(err, votes.ErrInvalidDirection):
writeError(w, http.StatusBadRequest, "InvalidRequest", "Vote direction must be 'up' or 'down'")
case errors.Is(err, votes.ErrInvalidSubject):
···
case errors.Is(err, votes.ErrVoteNotFound):
// Matches: social.coves.feed.vote.delete#VoteNotFound
writeError(w, http.StatusNotFound, "VoteNotFound", "No vote found for this subject")
case errors.Is(err, votes.ErrInvalidDirection):
writeError(w, http.StatusBadRequest, "InvalidRequest", "Vote direction must be 'up' or 'down'")
case errors.Is(err, votes.ErrInvalidSubject):
-3
internal/core/votes/errors.go
···
// ErrVoteNotFound indicates the requested vote doesn't exist
ErrVoteNotFound = errors.New("vote not found")
-
// ErrSubjectNotFound indicates the post/comment being voted on doesn't exist
-
ErrSubjectNotFound = errors.New("subject not found")
-
// ErrInvalidDirection indicates the vote direction is not "up" or "down"
ErrInvalidDirection = errors.New("invalid vote direction: must be 'up' or 'down'")
···
// ErrVoteNotFound indicates the requested vote doesn't exist
ErrVoteNotFound = errors.New("vote not found")
// ErrInvalidDirection indicates the vote direction is not "up" or "down"
ErrInvalidDirection = errors.New("invalid vote direction: must be 'up' or 'down'")
+14 -27
internal/core/votes/service_impl.go
···
// voteService implements the Service interface for vote operations
type voteService struct {
-
repo Repository
-
subjectValidator SubjectValidator
-
oauthClient *oauthclient.OAuthClient
-
oauthStore oauth.ClientAuthStore
-
logger *slog.Logger
}
// NewService creates a new vote service instance
-
// subjectValidator can be nil to skip subject existence checks (not recommended for production)
-
func NewService(repo Repository, subjectValidator SubjectValidator, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
if logger == nil {
logger = slog.Default()
}
return &voteService{
-
repo: repo,
-
subjectValidator: subjectValidator,
-
oauthClient: oauthClient,
-
oauthStore: oauthStore,
-
logger: logger,
}
}
···
return nil, ErrInvalidSubject
}
-
// Validate subject exists in AppView (post or comment)
-
// This prevents creating votes on non-existent content
-
if s.subjectValidator != nil {
-
exists, err := s.subjectValidator.SubjectExists(ctx, req.Subject.URI)
-
if err != nil {
-
s.logger.Error("failed to validate subject existence",
-
"error", err,
-
"subject", req.Subject.URI)
-
return nil, fmt.Errorf("failed to validate subject: %w", err)
-
}
-
if !exists {
-
return nil, ErrSubjectNotFound
-
}
-
}
// Check for existing vote by querying PDS directly (source of truth)
// This avoids eventual consistency issues with the AppView database
···
// Parse the listRecords response
var result struct {
Records []struct {
URI string `json:"uri"`
CID string `json:"cid"`
···
CreatedAt string `json:"createdAt"`
} `json:"value"`
} `json:"records"`
-
Cursor string `json:"cursor"`
}
if err := json.Unmarshal(body, &result); err != nil {
···
// voteService implements the Service interface for vote operations
type voteService struct {
+
repo Repository
+
oauthClient *oauthclient.OAuthClient
+
oauthStore oauth.ClientAuthStore
+
logger *slog.Logger
}
// NewService creates a new vote service instance
+
func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {
if logger == nil {
logger = slog.Default()
}
return &voteService{
+
repo: repo,
+
oauthClient: oauthClient,
+
oauthStore: oauthStore,
+
logger: logger,
}
}
···
return nil, ErrInvalidSubject
}
+
// Note: We intentionally don't validate subject existence here.
+
// The vote record goes to the user's PDS regardless. The Jetstream consumer
+
// handles orphaned votes correctly by only updating counts for non-deleted subjects.
+
// This avoids race conditions and eventual consistency issues.
// Check for existing vote by querying PDS directly (source of truth)
// This avoids eventual consistency issues with the AppView database
···
// Parse the listRecords response
var result struct {
+
Cursor string `json:"cursor"`
Records []struct {
URI string `json:"uri"`
CID string `json:"cid"`
···
CreatedAt string `json:"createdAt"`
} `json:"value"`
} `json:"records"`
}
if err := json.Unmarshal(body, &result); err != nil {
+3 -2
internal/db/postgres/vote_repo.go
···
return nil
}
-
// GetByURI retrieves a vote by its AT-URI
// Used by Jetstream consumer for DELETE operations
func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) {
query := `
SELECT
···
subject_uri, subject_cid, direction,
created_at, indexed_at, deleted_at
FROM votes
-
WHERE uri = $1
`
var vote votes.Vote
···
return nil
}
+
// GetByURI retrieves an active vote by its AT-URI
// Used by Jetstream consumer for DELETE operations
+
// Returns ErrVoteNotFound for soft-deleted votes
func (r *postgresVoteRepo) GetByURI(ctx context.Context, uri string) (*votes.Vote, error) {
query := `
SELECT
···
subject_uri, subject_cid, direction,
created_at, indexed_at, deleted_at
FROM votes
+
WHERE uri = $1 AND deleted_at IS NULL
`
var vote votes.Vote