A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"Coves/internal/atproto/utils"
	"Coves/internal/core/users"
	"Coves/internal/core/votes"
)
14
// VoteEventConsumer consumes vote-related events from Jetstream.
// It handles CREATE and DELETE operations for social.coves.feed.vote records;
// other operations and collections are ignored by HandleEvent.
type VoteEventConsumer struct {
	voteRepo    votes.Repository  // looked up by URI when a vote delete event arrives
	userService users.UserService // stored by the constructor; not read by the methods in this section
	db          *sql.DB           // Direct DB access for atomic vote count updates
}
22
23// NewVoteEventConsumer creates a new Jetstream consumer for vote events
24func NewVoteEventConsumer(
25 voteRepo votes.Repository,
26 userService users.UserService,
27 db *sql.DB,
28) *VoteEventConsumer {
29 return &VoteEventConsumer{
30 voteRepo: voteRepo,
31 userService: userService,
32 db: db,
33 }
34}
35
36// HandleEvent processes a Jetstream event for vote records
37func (c *VoteEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
38 // We only care about commit events for vote records
39 if event.Kind != "commit" || event.Commit == nil {
40 return nil
41 }
42
43 commit := event.Commit
44
45 // Handle vote record operations
46 if commit.Collection == "social.coves.feed.vote" {
47 switch commit.Operation {
48 case "create":
49 return c.createVote(ctx, event.Did, commit)
50 case "delete":
51 return c.deleteVote(ctx, event.Did, commit)
52 }
53 }
54
55 // Silently ignore other operations and collections
56 return nil
57}
58
59// createVote indexes a new vote from the firehose and updates post counts
60func (c *VoteEventConsumer) createVote(ctx context.Context, repoDID string, commit *CommitEvent) error {
61 if commit.Record == nil {
62 return fmt.Errorf("vote create event missing record data")
63 }
64
65 // Parse the vote record
66 voteRecord, err := parseVoteRecord(commit.Record)
67 if err != nil {
68 return fmt.Errorf("failed to parse vote record: %w", err)
69 }
70
71 // SECURITY: Validate this is a legitimate vote event
72 if err := c.validateVoteEvent(ctx, repoDID, voteRecord); err != nil {
73 log.Printf("🚨 SECURITY: Rejecting vote event: %v", err)
74 return err
75 }
76
77 // Build AT-URI for this vote
78 // Format: at://voter_did/social.coves.feed.vote/rkey
79 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey)
80
81 // Parse timestamp from record
82 createdAt, err := time.Parse(time.RFC3339, voteRecord.CreatedAt)
83 if err != nil {
84 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
85 createdAt = time.Now()
86 }
87
88 // Build vote entity
89 vote := &votes.Vote{
90 URI: uri,
91 CID: commit.CID,
92 RKey: commit.RKey,
93 VoterDID: repoDID, // Vote comes from user's repository
94 SubjectURI: voteRecord.Subject.URI,
95 SubjectCID: voteRecord.Subject.CID,
96 Direction: voteRecord.Direction,
97 CreatedAt: createdAt,
98 IndexedAt: time.Now(),
99 }
100
101 // Atomically: Index vote + Update post counts
102 wasNew, err := c.indexVoteAndUpdateCounts(ctx, vote)
103 if err != nil {
104 return fmt.Errorf("failed to index vote and update counts: %w", err)
105 }
106
107 if wasNew {
108 log.Printf("✓ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI)
109 }
110 return nil
111}
112
113// deleteVote soft-deletes a vote and updates post counts
114func (c *VoteEventConsumer) deleteVote(ctx context.Context, repoDID string, commit *CommitEvent) error {
115 // Build AT-URI for the vote being deleted
116 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey)
117
118 // Get existing vote to know its direction (for decrementing the right counter)
119 existingVote, err := c.voteRepo.GetByURI(ctx, uri)
120 if err != nil {
121 if err == votes.ErrVoteNotFound {
122 // Idempotent: Vote already deleted or never existed
123 log.Printf("Vote already deleted or not found: %s", uri)
124 return nil
125 }
126 return fmt.Errorf("failed to get existing vote: %w", err)
127 }
128
129 // Atomically: Soft-delete vote + Update post counts
130 if err := c.deleteVoteAndUpdateCounts(ctx, existingVote); err != nil {
131 return fmt.Errorf("failed to delete vote and update counts: %w", err)
132 }
133
134 log.Printf("✓ Deleted vote: %s (%s on %s)", uri, existingVote.Direction, existingVote.SubjectURI)
135 return nil
136}
137
138// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts
139// Returns (true, nil) if vote was newly inserted, (false, nil) if already existed (idempotent)
140func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) (bool, error) {
141 tx, err := c.db.BeginTx(ctx, nil)
142 if err != nil {
143 return false, fmt.Errorf("failed to begin transaction: %w", err)
144 }
145 defer func() {
146 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
147 log.Printf("Failed to rollback transaction: %v", rollbackErr)
148 }
149 }()
150
151 // 1. Check for existing active vote with different URI (stale record)
152 // This handles cases where:
153 // - User voted on another client and we missed the delete event
154 // - Vote was reindexed but user created a new vote with different rkey
155 // - Any other state mismatch between PDS and AppView
156 var existingDirection sql.NullString
157 checkQuery := `
158 SELECT direction FROM votes
159 WHERE voter_did = $1
160 AND subject_uri = $2
161 AND deleted_at IS NULL
162 AND uri != $3
163 LIMIT 1
164 `
165 if err := tx.QueryRowContext(ctx, checkQuery, vote.VoterDID, vote.SubjectURI, vote.URI).Scan(&existingDirection); err != nil && err != sql.ErrNoRows {
166 return false, fmt.Errorf("failed to check existing vote: %w", err)
167 }
168
169 // If there's a stale vote, soft-delete it and adjust counts
170 if existingDirection.Valid {
171 softDeleteQuery := `
172 UPDATE votes
173 SET deleted_at = NOW()
174 WHERE voter_did = $1
175 AND subject_uri = $2
176 AND deleted_at IS NULL
177 AND uri != $3
178 `
179 if _, err := tx.ExecContext(ctx, softDeleteQuery, vote.VoterDID, vote.SubjectURI, vote.URI); err != nil {
180 return false, fmt.Errorf("failed to soft-delete existing votes: %w", err)
181 }
182
183 // Decrement the old vote's count (will be re-incremented below if same direction)
184 collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
185 var decrementQuery string
186 if existingDirection.String == "up" {
187 if collection == "social.coves.community.post" {
188 decrementQuery = `UPDATE posts SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
189 } else if collection == "social.coves.community.comment" {
190 decrementQuery = `UPDATE comments SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
191 }
192 } else {
193 if collection == "social.coves.community.post" {
194 decrementQuery = `UPDATE posts SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`
195 } else if collection == "social.coves.community.comment" {
196 decrementQuery = `UPDATE comments SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`
197 }
198 }
199 if decrementQuery != "" {
200 if _, err := tx.ExecContext(ctx, decrementQuery, vote.SubjectURI); err != nil {
201 return false, fmt.Errorf("failed to decrement old vote count: %w", err)
202 }
203 }
204 log.Printf("Cleaned up stale vote for %s on %s (was %s)", vote.VoterDID, vote.SubjectURI, existingDirection.String)
205 }
206
207 // 2. Index the vote (idempotent with ON CONFLICT DO NOTHING)
208 query := `
209 INSERT INTO votes (
210 uri, cid, rkey, voter_did,
211 subject_uri, subject_cid, direction,
212 created_at, indexed_at
213 ) VALUES (
214 $1, $2, $3, $4,
215 $5, $6, $7,
216 $8, NOW()
217 )
218 ON CONFLICT (uri) DO NOTHING
219 RETURNING id
220 `
221
222 var voteID int64
223 err = tx.QueryRowContext(
224 ctx, query,
225 vote.URI, vote.CID, vote.RKey, vote.VoterDID,
226 vote.SubjectURI, vote.SubjectCID, vote.Direction,
227 vote.CreatedAt,
228 ).Scan(&voteID)
229
230 // If no rows returned, vote already exists (idempotent - OK for Jetstream replays)
231 if err == sql.ErrNoRows {
232 // Silently handle idempotent case - no log needed for replayed events
233 if commitErr := tx.Commit(); commitErr != nil {
234 return false, fmt.Errorf("failed to commit transaction: %w", commitErr)
235 }
236 return false, nil // Vote already existed
237 }
238
239 if err != nil {
240 return false, fmt.Errorf("failed to insert vote: %w", err)
241 }
242
243 // 3. Update vote counts on the subject (post or comment)
244 // Parse collection from subject URI to determine target table
245 collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
246
247 var updateQuery string
248 switch collection {
249 case "social.coves.community.post":
250 // Vote on post - update posts table
251 if vote.Direction == "up" {
252 updateQuery = `
253 UPDATE posts
254 SET upvote_count = upvote_count + 1,
255 score = upvote_count + 1 - downvote_count
256 WHERE uri = $1 AND deleted_at IS NULL
257 `
258 } else { // "down"
259 updateQuery = `
260 UPDATE posts
261 SET downvote_count = downvote_count + 1,
262 score = upvote_count - (downvote_count + 1)
263 WHERE uri = $1 AND deleted_at IS NULL
264 `
265 }
266
267 case "social.coves.community.comment":
268 // Vote on comment - update comments table
269 if vote.Direction == "up" {
270 updateQuery = `
271 UPDATE comments
272 SET upvote_count = upvote_count + 1,
273 score = upvote_count + 1 - downvote_count
274 WHERE uri = $1 AND deleted_at IS NULL
275 `
276 } else { // "down"
277 updateQuery = `
278 UPDATE comments
279 SET downvote_count = downvote_count + 1,
280 score = upvote_count - (downvote_count + 1)
281 WHERE uri = $1 AND deleted_at IS NULL
282 `
283 }
284
285 default:
286 // Unknown or unsupported collection
287 // Vote is still indexed in votes table, we just don't update denormalized counts
288 log.Printf("Vote subject has unsupported collection: %s (vote indexed, counts not updated)", collection)
289 if commitErr := tx.Commit(); commitErr != nil {
290 return false, fmt.Errorf("failed to commit transaction: %w", commitErr)
291 }
292 return true, nil // Vote was newly indexed
293 }
294
295 result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
296 if err != nil {
297 return false, fmt.Errorf("failed to update vote counts: %w", err)
298 }
299
300 rowsAffected, err := result.RowsAffected()
301 if err != nil {
302 return false, fmt.Errorf("failed to check update result: %w", err)
303 }
304
305 // If subject doesn't exist or is deleted, that's OK (vote still indexed)
306 if rowsAffected == 0 {
307 log.Printf("Warning: Vote subject not found or deleted: %s (vote indexed anyway)", vote.SubjectURI)
308 }
309
310 // Commit transaction
311 if err := tx.Commit(); err != nil {
312 return false, fmt.Errorf("failed to commit transaction: %w", err)
313 }
314
315 return true, nil // Vote was newly indexed
316}
317
318// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts
319func (c *VoteEventConsumer) deleteVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error {
320 tx, err := c.db.BeginTx(ctx, nil)
321 if err != nil {
322 return fmt.Errorf("failed to begin transaction: %w", err)
323 }
324 defer func() {
325 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
326 log.Printf("Failed to rollback transaction: %v", rollbackErr)
327 }
328 }()
329
330 // 1. Soft-delete the vote (idempotent)
331 deleteQuery := `
332 UPDATE votes
333 SET deleted_at = NOW()
334 WHERE uri = $1 AND deleted_at IS NULL
335 `
336
337 result, err := tx.ExecContext(ctx, deleteQuery, vote.URI)
338 if err != nil {
339 return fmt.Errorf("failed to delete vote: %w", err)
340 }
341
342 rowsAffected, err := result.RowsAffected()
343 if err != nil {
344 return fmt.Errorf("failed to check delete result: %w", err)
345 }
346
347 // Idempotent: If no rows affected, vote already deleted
348 if rowsAffected == 0 {
349 log.Printf("Vote already deleted: %s (idempotent)", vote.URI)
350 if commitErr := tx.Commit(); commitErr != nil {
351 return fmt.Errorf("failed to commit transaction: %w", commitErr)
352 }
353 return nil
354 }
355
356 // 2. Decrement vote counts on the subject (post or comment)
357 // Parse collection from subject URI to determine target table
358 collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
359
360 var updateQuery string
361 switch collection {
362 case "social.coves.community.post":
363 // Vote on post - update posts table
364 if vote.Direction == "up" {
365 updateQuery = `
366 UPDATE posts
367 SET upvote_count = GREATEST(0, upvote_count - 1),
368 score = GREATEST(0, upvote_count - 1) - downvote_count
369 WHERE uri = $1 AND deleted_at IS NULL
370 `
371 } else { // "down"
372 updateQuery = `
373 UPDATE posts
374 SET downvote_count = GREATEST(0, downvote_count - 1),
375 score = upvote_count - GREATEST(0, downvote_count - 1)
376 WHERE uri = $1 AND deleted_at IS NULL
377 `
378 }
379
380 case "social.coves.community.comment":
381 // Vote on comment - update comments table
382 if vote.Direction == "up" {
383 updateQuery = `
384 UPDATE comments
385 SET upvote_count = GREATEST(0, upvote_count - 1),
386 score = GREATEST(0, upvote_count - 1) - downvote_count
387 WHERE uri = $1 AND deleted_at IS NULL
388 `
389 } else { // "down"
390 updateQuery = `
391 UPDATE comments
392 SET downvote_count = GREATEST(0, downvote_count - 1),
393 score = upvote_count - GREATEST(0, downvote_count - 1)
394 WHERE uri = $1 AND deleted_at IS NULL
395 `
396 }
397
398 default:
399 // Unknown or unsupported collection
400 // Vote is still deleted, we just don't update denormalized counts
401 log.Printf("Vote subject has unsupported collection: %s (vote deleted, counts not updated)", collection)
402 if commitErr := tx.Commit(); commitErr != nil {
403 return fmt.Errorf("failed to commit transaction: %w", commitErr)
404 }
405 return nil
406 }
407
408 result, err = tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
409 if err != nil {
410 return fmt.Errorf("failed to update vote counts: %w", err)
411 }
412
413 rowsAffected, err = result.RowsAffected()
414 if err != nil {
415 return fmt.Errorf("failed to check update result: %w", err)
416 }
417
418 // If subject doesn't exist or is deleted, that's OK (vote still deleted)
419 if rowsAffected == 0 {
420 log.Printf("Warning: Vote subject not found or deleted: %s (vote deleted anyway)", vote.SubjectURI)
421 }
422
423 // Commit transaction
424 if err := tx.Commit(); err != nil {
425 return fmt.Errorf("failed to commit transaction: %w", err)
426 }
427
428 return nil
429}
430
431// validateVoteEvent performs security validation on vote events
432func (c *VoteEventConsumer) validateVoteEvent(ctx context.Context, repoDID string, vote *VoteRecordFromJetstream) error {
433 // SECURITY: Votes MUST come from user repositories (repo owner = voter DID)
434 // The repository owner (repoDID) IS the voter - votes are stored in user repos.
435 //
436 // We do NOT check if the user exists in AppView because:
437 // 1. Vote events may arrive before user events in Jetstream (race condition)
438 // 2. The vote came from the user's PDS repository (authenticated by PDS)
439 // 3. The database FK constraint was removed to allow out-of-order indexing
440 // 4. Orphaned votes (from never-indexed users) are harmless
441 //
442 // Security is maintained because:
443 // - Vote must come from user's own PDS repository (verified by atProto)
444 // - Communities cannot create votes in their repos (different collection)
445 // - Fake DIDs will fail PDS authentication
446
447 // Validate DID format (basic sanity check)
448 if !strings.HasPrefix(repoDID, "did:") {
449 return fmt.Errorf("invalid voter DID format: %s", repoDID)
450 }
451
452 // Validate vote direction
453 if vote.Direction != "up" && vote.Direction != "down" {
454 return fmt.Errorf("invalid vote direction: %s (must be 'up' or 'down')", vote.Direction)
455 }
456
457 // Validate subject has both URI and CID (strong reference)
458 if vote.Subject.URI == "" || vote.Subject.CID == "" {
459 return fmt.Errorf("invalid subject: must have both URI and CID (strong reference)")
460 }
461
462 return nil
463}
464
// VoteRecordFromJetstream represents a vote record as received from Jetstream.
// All fields are kept as raw strings; validation and timestamp parsing happen
// in the consumer (validateVoteEvent and createVote).
type VoteRecordFromJetstream struct {
	Subject   StrongRefFromJetstream `json:"subject"`   // the record being voted on
	Direction string                 `json:"direction"` // validated to be "up" or "down"
	CreatedAt string                 `json:"createdAt"` // parsed as RFC 3339 by createVote
}
471
// StrongRefFromJetstream represents a strong reference (URI + CID).
// validateVoteEvent rejects a vote whose subject has either field empty.
type StrongRefFromJetstream struct {
	URI string `json:"uri"` // URI of the referenced record
	CID string `json:"cid"` // CID of the referenced record
}
477
478// parseVoteRecord parses a vote record from Jetstream event data
479func parseVoteRecord(record map[string]interface{}) (*VoteRecordFromJetstream, error) {
480 // Extract subject (strong reference)
481 subjectMap, ok := record["subject"].(map[string]interface{})
482 if !ok {
483 return nil, fmt.Errorf("missing or invalid subject field")
484 }
485
486 subjectURI, _ := subjectMap["uri"].(string)
487 subjectCID, _ := subjectMap["cid"].(string)
488
489 // Extract direction
490 direction, _ := record["direction"].(string)
491
492 // Extract createdAt
493 createdAt, _ := record["createdAt"].(string)
494
495 return &VoteRecordFromJetstream{
496 Subject: StrongRefFromJetstream{
497 URI: subjectURI,
498 CID: subjectCID,
499 },
500 Direction: direction,
501 CreatedAt: createdAt,
502 }, nil
503}