A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/users"
5 "Coves/internal/core/votes"
6 "context"
7 "database/sql"
8 "fmt"
9 "log"
10 "strings"
11 "time"
12)
13
// VoteEventConsumer consumes vote-related events from Jetstream.
// It handles CREATE and DELETE operations for social.coves.feed.vote records;
// every other operation or collection is ignored by HandleEvent.
type VoteEventConsumer struct {
	voteRepo    votes.Repository  // looks up an already-indexed vote by URI when handling deletes
	userService users.UserService // stored at construction; not called by the methods visible in this file
	db          *sql.DB           // Direct DB access for atomic vote count updates
}
21
22// NewVoteEventConsumer creates a new Jetstream consumer for vote events
23func NewVoteEventConsumer(
24 voteRepo votes.Repository,
25 userService users.UserService,
26 db *sql.DB,
27) *VoteEventConsumer {
28 return &VoteEventConsumer{
29 voteRepo: voteRepo,
30 userService: userService,
31 db: db,
32 }
33}
34
35// HandleEvent processes a Jetstream event for vote records
36func (c *VoteEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
37 // We only care about commit events for vote records
38 if event.Kind != "commit" || event.Commit == nil {
39 return nil
40 }
41
42 commit := event.Commit
43
44 // Handle vote record operations
45 if commit.Collection == "social.coves.feed.vote" {
46 switch commit.Operation {
47 case "create":
48 return c.createVote(ctx, event.Did, commit)
49 case "delete":
50 return c.deleteVote(ctx, event.Did, commit)
51 }
52 }
53
54 // Silently ignore other operations and collections
55 return nil
56}
57
58// createVote indexes a new vote from the firehose and updates post counts
59func (c *VoteEventConsumer) createVote(ctx context.Context, repoDID string, commit *CommitEvent) error {
60 if commit.Record == nil {
61 return fmt.Errorf("vote create event missing record data")
62 }
63
64 // Parse the vote record
65 voteRecord, err := parseVoteRecord(commit.Record)
66 if err != nil {
67 return fmt.Errorf("failed to parse vote record: %w", err)
68 }
69
70 // SECURITY: Validate this is a legitimate vote event
71 if err := c.validateVoteEvent(ctx, repoDID, voteRecord); err != nil {
72 log.Printf("🚨 SECURITY: Rejecting vote event: %v", err)
73 return err
74 }
75
76 // Build AT-URI for this vote
77 // Format: at://voter_did/social.coves.feed.vote/rkey
78 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey)
79
80 // Parse timestamp from record
81 createdAt, err := time.Parse(time.RFC3339, voteRecord.CreatedAt)
82 if err != nil {
83 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
84 createdAt = time.Now()
85 }
86
87 // Build vote entity
88 vote := &votes.Vote{
89 URI: uri,
90 CID: commit.CID,
91 RKey: commit.RKey,
92 VoterDID: repoDID, // Vote comes from user's repository
93 SubjectURI: voteRecord.Subject.URI,
94 SubjectCID: voteRecord.Subject.CID,
95 Direction: voteRecord.Direction,
96 CreatedAt: createdAt,
97 IndexedAt: time.Now(),
98 }
99
100 // Atomically: Index vote + Update post counts
101 if err := c.indexVoteAndUpdateCounts(ctx, vote); err != nil {
102 return fmt.Errorf("failed to index vote and update counts: %w", err)
103 }
104
105 log.Printf("✓ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI)
106 return nil
107}
108
109// deleteVote soft-deletes a vote and updates post counts
110func (c *VoteEventConsumer) deleteVote(ctx context.Context, repoDID string, commit *CommitEvent) error {
111 // Build AT-URI for the vote being deleted
112 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey)
113
114 // Get existing vote to know its direction (for decrementing the right counter)
115 existingVote, err := c.voteRepo.GetByURI(ctx, uri)
116 if err != nil {
117 if err == votes.ErrVoteNotFound {
118 // Idempotent: Vote already deleted or never existed
119 log.Printf("Vote already deleted or not found: %s", uri)
120 return nil
121 }
122 return fmt.Errorf("failed to get existing vote: %w", err)
123 }
124
125 // Atomically: Soft-delete vote + Update post counts
126 if err := c.deleteVoteAndUpdateCounts(ctx, existingVote); err != nil {
127 return fmt.Errorf("failed to delete vote and update counts: %w", err)
128 }
129
130 log.Printf("✓ Deleted vote: %s (%s on %s)", uri, existingVote.Direction, existingVote.SubjectURI)
131 return nil
132}
133
134// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts
135func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error {
136 tx, err := c.db.BeginTx(ctx, nil)
137 if err != nil {
138 return fmt.Errorf("failed to begin transaction: %w", err)
139 }
140 defer func() {
141 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
142 log.Printf("Failed to rollback transaction: %v", rollbackErr)
143 }
144 }()
145
146 // 1. Index the vote (idempotent with ON CONFLICT DO NOTHING)
147 query := `
148 INSERT INTO votes (
149 uri, cid, rkey, voter_did,
150 subject_uri, subject_cid, direction,
151 created_at, indexed_at
152 ) VALUES (
153 $1, $2, $3, $4,
154 $5, $6, $7,
155 $8, NOW()
156 )
157 ON CONFLICT (uri) DO NOTHING
158 RETURNING id
159 `
160
161 var voteID int64
162 err = tx.QueryRowContext(
163 ctx, query,
164 vote.URI, vote.CID, vote.RKey, vote.VoterDID,
165 vote.SubjectURI, vote.SubjectCID, vote.Direction,
166 vote.CreatedAt,
167 ).Scan(&voteID)
168
169 // If no rows returned, vote already exists (idempotent - OK for Jetstream replays)
170 if err == sql.ErrNoRows {
171 log.Printf("Vote already indexed: %s (idempotent)", vote.URI)
172 if commitErr := tx.Commit(); commitErr != nil {
173 return fmt.Errorf("failed to commit transaction: %w", commitErr)
174 }
175 return nil
176 }
177
178 if err != nil {
179 return fmt.Errorf("failed to insert vote: %w", err)
180 }
181
182 // 2. Update post vote counts atomically
183 // Increment upvote_count or downvote_count based on direction
184 // Also update score (upvote_count - downvote_count)
185 var updateQuery string
186 if vote.Direction == "up" {
187 updateQuery = `
188 UPDATE posts
189 SET upvote_count = upvote_count + 1,
190 score = upvote_count + 1 - downvote_count
191 WHERE uri = $1 AND deleted_at IS NULL
192 `
193 } else { // "down"
194 updateQuery = `
195 UPDATE posts
196 SET downvote_count = downvote_count + 1,
197 score = upvote_count - (downvote_count + 1)
198 WHERE uri = $1 AND deleted_at IS NULL
199 `
200 }
201
202 result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
203 if err != nil {
204 return fmt.Errorf("failed to update post counts: %w", err)
205 }
206
207 rowsAffected, err := result.RowsAffected()
208 if err != nil {
209 return fmt.Errorf("failed to check update result: %w", err)
210 }
211
212 // If post doesn't exist or is deleted, that's OK (vote still indexed)
213 // Future: We could validate post exists before indexing vote
214 if rowsAffected == 0 {
215 log.Printf("Warning: Post not found or deleted: %s (vote indexed anyway)", vote.SubjectURI)
216 }
217
218 // Commit transaction
219 if err := tx.Commit(); err != nil {
220 return fmt.Errorf("failed to commit transaction: %w", err)
221 }
222
223 return nil
224}
225
226// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts
227func (c *VoteEventConsumer) deleteVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error {
228 tx, err := c.db.BeginTx(ctx, nil)
229 if err != nil {
230 return fmt.Errorf("failed to begin transaction: %w", err)
231 }
232 defer func() {
233 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
234 log.Printf("Failed to rollback transaction: %v", rollbackErr)
235 }
236 }()
237
238 // 1. Soft-delete the vote (idempotent)
239 deleteQuery := `
240 UPDATE votes
241 SET deleted_at = NOW()
242 WHERE uri = $1 AND deleted_at IS NULL
243 `
244
245 result, err := tx.ExecContext(ctx, deleteQuery, vote.URI)
246 if err != nil {
247 return fmt.Errorf("failed to delete vote: %w", err)
248 }
249
250 rowsAffected, err := result.RowsAffected()
251 if err != nil {
252 return fmt.Errorf("failed to check delete result: %w", err)
253 }
254
255 // Idempotent: If no rows affected, vote already deleted
256 if rowsAffected == 0 {
257 log.Printf("Vote already deleted: %s (idempotent)", vote.URI)
258 if commitErr := tx.Commit(); commitErr != nil {
259 return fmt.Errorf("failed to commit transaction: %w", commitErr)
260 }
261 return nil
262 }
263
264 // 2. Decrement post vote counts atomically
265 // Decrement upvote_count or downvote_count based on direction
266 // Also update score (use GREATEST to prevent negative counts)
267 var updateQuery string
268 if vote.Direction == "up" {
269 updateQuery = `
270 UPDATE posts
271 SET upvote_count = GREATEST(0, upvote_count - 1),
272 score = GREATEST(0, upvote_count - 1) - downvote_count
273 WHERE uri = $1 AND deleted_at IS NULL
274 `
275 } else { // "down"
276 updateQuery = `
277 UPDATE posts
278 SET downvote_count = GREATEST(0, downvote_count - 1),
279 score = upvote_count - GREATEST(0, downvote_count - 1)
280 WHERE uri = $1 AND deleted_at IS NULL
281 `
282 }
283
284 result, err = tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
285 if err != nil {
286 return fmt.Errorf("failed to update post counts: %w", err)
287 }
288
289 rowsAffected, err = result.RowsAffected()
290 if err != nil {
291 return fmt.Errorf("failed to check update result: %w", err)
292 }
293
294 // If post doesn't exist or is deleted, that's OK (vote still deleted)
295 if rowsAffected == 0 {
296 log.Printf("Warning: Post not found or deleted: %s (vote deleted anyway)", vote.SubjectURI)
297 }
298
299 // Commit transaction
300 if err := tx.Commit(); err != nil {
301 return fmt.Errorf("failed to commit transaction: %w", err)
302 }
303
304 return nil
305}
306
307// validateVoteEvent performs security validation on vote events
308func (c *VoteEventConsumer) validateVoteEvent(ctx context.Context, repoDID string, vote *VoteRecordFromJetstream) error {
309 // SECURITY: Votes MUST come from user repositories (repo owner = voter DID)
310 // The repository owner (repoDID) IS the voter - votes are stored in user repos.
311 //
312 // We do NOT check if the user exists in AppView because:
313 // 1. Vote events may arrive before user events in Jetstream (race condition)
314 // 2. The vote came from the user's PDS repository (authenticated by PDS)
315 // 3. The database FK constraint was removed to allow out-of-order indexing
316 // 4. Orphaned votes (from never-indexed users) are harmless
317 //
318 // Security is maintained because:
319 // - Vote must come from user's own PDS repository (verified by atProto)
320 // - Communities cannot create votes in their repos (different collection)
321 // - Fake DIDs will fail PDS authentication
322
323 // Validate DID format (basic sanity check)
324 if !strings.HasPrefix(repoDID, "did:") {
325 return fmt.Errorf("invalid voter DID format: %s", repoDID)
326 }
327
328 // Validate vote direction
329 if vote.Direction != "up" && vote.Direction != "down" {
330 return fmt.Errorf("invalid vote direction: %s (must be 'up' or 'down')", vote.Direction)
331 }
332
333 // Validate subject has both URI and CID (strong reference)
334 if vote.Subject.URI == "" || vote.Subject.CID == "" {
335 return fmt.Errorf("invalid subject: must have both URI and CID (strong reference)")
336 }
337
338 return nil
339}
340
// VoteRecordFromJetstream represents a vote record as received from Jetstream.
// parseVoteRecord fills it from the raw record map; a field that is absent or
// not a string is left as the empty string.
type VoteRecordFromJetstream struct {
	Subject   StrongRefFromJetstream `json:"subject"`   // what is being voted on (URI + CID)
	Direction string                 `json:"direction"` // validateVoteEvent accepts only "up" or "down"
	CreatedAt string                 `json:"createdAt"` // timestamp text; createVote parses it as RFC3339
}
347
// StrongRefFromJetstream represents a strong reference (URI + CID).
// validateVoteEvent requires both fields to be non-empty for a vote subject.
type StrongRefFromJetstream struct {
	URI string `json:"uri"` // URI of the referenced record
	CID string `json:"cid"` // CID of the referenced record
}
353
354// parseVoteRecord parses a vote record from Jetstream event data
355func parseVoteRecord(record map[string]interface{}) (*VoteRecordFromJetstream, error) {
356 // Extract subject (strong reference)
357 subjectMap, ok := record["subject"].(map[string]interface{})
358 if !ok {
359 return nil, fmt.Errorf("missing or invalid subject field")
360 }
361
362 subjectURI, _ := subjectMap["uri"].(string)
363 subjectCID, _ := subjectMap["cid"].(string)
364
365 // Extract direction
366 direction, _ := record["direction"].(string)
367
368 // Extract createdAt
369 createdAt, _ := record["createdAt"].(string)
370
371 return &VoteRecordFromJetstream{
372 Subject: StrongRefFromJetstream{
373 URI: subjectURI,
374 CID: subjectCID,
375 },
376 Direction: direction,
377 CreatedAt: createdAt,
378 }, nil
379}