A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/utils"
5 "Coves/internal/core/users"
6 "Coves/internal/core/votes"
7 "context"
8 "database/sql"
9 "fmt"
10 "log"
11 "strings"
12 "time"
13)
14
15// VoteEventConsumer consumes vote-related events from Jetstream
16// Handles CREATE and DELETE operations for social.coves.feed.vote
17type VoteEventConsumer struct {
18 voteRepo votes.Repository
19 userService users.UserService
20 db *sql.DB // Direct DB access for atomic vote count updates
21}
22
23// NewVoteEventConsumer creates a new Jetstream consumer for vote events
24func NewVoteEventConsumer(
25 voteRepo votes.Repository,
26 userService users.UserService,
27 db *sql.DB,
28) *VoteEventConsumer {
29 return &VoteEventConsumer{
30 voteRepo: voteRepo,
31 userService: userService,
32 db: db,
33 }
34}
35
36// HandleEvent processes a Jetstream event for vote records
37func (c *VoteEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
38 // We only care about commit events for vote records
39 if event.Kind != "commit" || event.Commit == nil {
40 return nil
41 }
42
43 commit := event.Commit
44
45 // Handle vote record operations
46 if commit.Collection == "social.coves.feed.vote" {
47 switch commit.Operation {
48 case "create":
49 return c.createVote(ctx, event.Did, commit)
50 case "delete":
51 return c.deleteVote(ctx, event.Did, commit)
52 }
53 }
54
55 // Silently ignore other operations and collections
56 return nil
57}
58
59// createVote indexes a new vote from the firehose and updates post counts
60func (c *VoteEventConsumer) createVote(ctx context.Context, repoDID string, commit *CommitEvent) error {
61 if commit.Record == nil {
62 return fmt.Errorf("vote create event missing record data")
63 }
64
65 // Parse the vote record
66 voteRecord, err := parseVoteRecord(commit.Record)
67 if err != nil {
68 return fmt.Errorf("failed to parse vote record: %w", err)
69 }
70
71 // SECURITY: Validate this is a legitimate vote event
72 if err := c.validateVoteEvent(ctx, repoDID, voteRecord); err != nil {
73 log.Printf("🚨 SECURITY: Rejecting vote event: %v", err)
74 return err
75 }
76
77 // Build AT-URI for this vote
78 // Format: at://voter_did/social.coves.feed.vote/rkey
79 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey)
80
81 // Parse timestamp from record
82 createdAt, err := time.Parse(time.RFC3339, voteRecord.CreatedAt)
83 if err != nil {
84 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
85 createdAt = time.Now()
86 }
87
88 // Build vote entity
89 vote := &votes.Vote{
90 URI: uri,
91 CID: commit.CID,
92 RKey: commit.RKey,
93 VoterDID: repoDID, // Vote comes from user's repository
94 SubjectURI: voteRecord.Subject.URI,
95 SubjectCID: voteRecord.Subject.CID,
96 Direction: voteRecord.Direction,
97 CreatedAt: createdAt,
98 IndexedAt: time.Now(),
99 }
100
101 // Atomically: Index vote + Update post counts
102 if err := c.indexVoteAndUpdateCounts(ctx, vote); err != nil {
103 return fmt.Errorf("failed to index vote and update counts: %w", err)
104 }
105
106 log.Printf("✓ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI)
107 return nil
108}
109
110// deleteVote soft-deletes a vote and updates post counts
111func (c *VoteEventConsumer) deleteVote(ctx context.Context, repoDID string, commit *CommitEvent) error {
112 // Build AT-URI for the vote being deleted
113 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey)
114
115 // Get existing vote to know its direction (for decrementing the right counter)
116 existingVote, err := c.voteRepo.GetByURI(ctx, uri)
117 if err != nil {
118 if err == votes.ErrVoteNotFound {
119 // Idempotent: Vote already deleted or never existed
120 log.Printf("Vote already deleted or not found: %s", uri)
121 return nil
122 }
123 return fmt.Errorf("failed to get existing vote: %w", err)
124 }
125
126 // Atomically: Soft-delete vote + Update post counts
127 if err := c.deleteVoteAndUpdateCounts(ctx, existingVote); err != nil {
128 return fmt.Errorf("failed to delete vote and update counts: %w", err)
129 }
130
131 log.Printf("✓ Deleted vote: %s (%s on %s)", uri, existingVote.Direction, existingVote.SubjectURI)
132 return nil
133}
134
135// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts
136func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error {
137 tx, err := c.db.BeginTx(ctx, nil)
138 if err != nil {
139 return fmt.Errorf("failed to begin transaction: %w", err)
140 }
141 defer func() {
142 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
143 log.Printf("Failed to rollback transaction: %v", rollbackErr)
144 }
145 }()
146
147 // 1. Index the vote (idempotent with ON CONFLICT DO NOTHING)
148 query := `
149 INSERT INTO votes (
150 uri, cid, rkey, voter_did,
151 subject_uri, subject_cid, direction,
152 created_at, indexed_at
153 ) VALUES (
154 $1, $2, $3, $4,
155 $5, $6, $7,
156 $8, NOW()
157 )
158 ON CONFLICT (uri) DO NOTHING
159 RETURNING id
160 `
161
162 var voteID int64
163 err = tx.QueryRowContext(
164 ctx, query,
165 vote.URI, vote.CID, vote.RKey, vote.VoterDID,
166 vote.SubjectURI, vote.SubjectCID, vote.Direction,
167 vote.CreatedAt,
168 ).Scan(&voteID)
169
170 // If no rows returned, vote already exists (idempotent - OK for Jetstream replays)
171 if err == sql.ErrNoRows {
172 log.Printf("Vote already indexed: %s (idempotent)", vote.URI)
173 if commitErr := tx.Commit(); commitErr != nil {
174 return fmt.Errorf("failed to commit transaction: %w", commitErr)
175 }
176 return nil
177 }
178
179 if err != nil {
180 return fmt.Errorf("failed to insert vote: %w", err)
181 }
182
183 // 2. Update vote counts on the subject (post or comment)
184 // Parse collection from subject URI to determine target table
185 collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
186
187 var updateQuery string
188 switch collection {
189 case "social.coves.community.post":
190 // Vote on post - update posts table
191 if vote.Direction == "up" {
192 updateQuery = `
193 UPDATE posts
194 SET upvote_count = upvote_count + 1,
195 score = upvote_count + 1 - downvote_count
196 WHERE uri = $1 AND deleted_at IS NULL
197 `
198 } else { // "down"
199 updateQuery = `
200 UPDATE posts
201 SET downvote_count = downvote_count + 1,
202 score = upvote_count - (downvote_count + 1)
203 WHERE uri = $1 AND deleted_at IS NULL
204 `
205 }
206
207 case "social.coves.community.comment":
208 // Vote on comment - update comments table
209 if vote.Direction == "up" {
210 updateQuery = `
211 UPDATE comments
212 SET upvote_count = upvote_count + 1,
213 score = upvote_count + 1 - downvote_count
214 WHERE uri = $1 AND deleted_at IS NULL
215 `
216 } else { // "down"
217 updateQuery = `
218 UPDATE comments
219 SET downvote_count = downvote_count + 1,
220 score = upvote_count - (downvote_count + 1)
221 WHERE uri = $1 AND deleted_at IS NULL
222 `
223 }
224
225 default:
226 // Unknown or unsupported collection
227 // Vote is still indexed in votes table, we just don't update denormalized counts
228 log.Printf("Vote subject has unsupported collection: %s (vote indexed, counts not updated)", collection)
229 if commitErr := tx.Commit(); commitErr != nil {
230 return fmt.Errorf("failed to commit transaction: %w", commitErr)
231 }
232 return nil
233 }
234
235 result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
236 if err != nil {
237 return fmt.Errorf("failed to update vote counts: %w", err)
238 }
239
240 rowsAffected, err := result.RowsAffected()
241 if err != nil {
242 return fmt.Errorf("failed to check update result: %w", err)
243 }
244
245 // If subject doesn't exist or is deleted, that's OK (vote still indexed)
246 if rowsAffected == 0 {
247 log.Printf("Warning: Vote subject not found or deleted: %s (vote indexed anyway)", vote.SubjectURI)
248 }
249
250 // Commit transaction
251 if err := tx.Commit(); err != nil {
252 return fmt.Errorf("failed to commit transaction: %w", err)
253 }
254
255 return nil
256}
257
258// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts
259func (c *VoteEventConsumer) deleteVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error {
260 tx, err := c.db.BeginTx(ctx, nil)
261 if err != nil {
262 return fmt.Errorf("failed to begin transaction: %w", err)
263 }
264 defer func() {
265 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
266 log.Printf("Failed to rollback transaction: %v", rollbackErr)
267 }
268 }()
269
270 // 1. Soft-delete the vote (idempotent)
271 deleteQuery := `
272 UPDATE votes
273 SET deleted_at = NOW()
274 WHERE uri = $1 AND deleted_at IS NULL
275 `
276
277 result, err := tx.ExecContext(ctx, deleteQuery, vote.URI)
278 if err != nil {
279 return fmt.Errorf("failed to delete vote: %w", err)
280 }
281
282 rowsAffected, err := result.RowsAffected()
283 if err != nil {
284 return fmt.Errorf("failed to check delete result: %w", err)
285 }
286
287 // Idempotent: If no rows affected, vote already deleted
288 if rowsAffected == 0 {
289 log.Printf("Vote already deleted: %s (idempotent)", vote.URI)
290 if commitErr := tx.Commit(); commitErr != nil {
291 return fmt.Errorf("failed to commit transaction: %w", commitErr)
292 }
293 return nil
294 }
295
296 // 2. Decrement vote counts on the subject (post or comment)
297 // Parse collection from subject URI to determine target table
298 collection := utils.ExtractCollectionFromURI(vote.SubjectURI)
299
300 var updateQuery string
301 switch collection {
302 case "social.coves.community.post":
303 // Vote on post - update posts table
304 if vote.Direction == "up" {
305 updateQuery = `
306 UPDATE posts
307 SET upvote_count = GREATEST(0, upvote_count - 1),
308 score = GREATEST(0, upvote_count - 1) - downvote_count
309 WHERE uri = $1 AND deleted_at IS NULL
310 `
311 } else { // "down"
312 updateQuery = `
313 UPDATE posts
314 SET downvote_count = GREATEST(0, downvote_count - 1),
315 score = upvote_count - GREATEST(0, downvote_count - 1)
316 WHERE uri = $1 AND deleted_at IS NULL
317 `
318 }
319
320 case "social.coves.community.comment":
321 // Vote on comment - update comments table
322 if vote.Direction == "up" {
323 updateQuery = `
324 UPDATE comments
325 SET upvote_count = GREATEST(0, upvote_count - 1),
326 score = GREATEST(0, upvote_count - 1) - downvote_count
327 WHERE uri = $1 AND deleted_at IS NULL
328 `
329 } else { // "down"
330 updateQuery = `
331 UPDATE comments
332 SET downvote_count = GREATEST(0, downvote_count - 1),
333 score = upvote_count - GREATEST(0, downvote_count - 1)
334 WHERE uri = $1 AND deleted_at IS NULL
335 `
336 }
337
338 default:
339 // Unknown or unsupported collection
340 // Vote is still deleted, we just don't update denormalized counts
341 log.Printf("Vote subject has unsupported collection: %s (vote deleted, counts not updated)", collection)
342 if commitErr := tx.Commit(); commitErr != nil {
343 return fmt.Errorf("failed to commit transaction: %w", commitErr)
344 }
345 return nil
346 }
347
348 result, err = tx.ExecContext(ctx, updateQuery, vote.SubjectURI)
349 if err != nil {
350 return fmt.Errorf("failed to update vote counts: %w", err)
351 }
352
353 rowsAffected, err = result.RowsAffected()
354 if err != nil {
355 return fmt.Errorf("failed to check update result: %w", err)
356 }
357
358 // If subject doesn't exist or is deleted, that's OK (vote still deleted)
359 if rowsAffected == 0 {
360 log.Printf("Warning: Vote subject not found or deleted: %s (vote deleted anyway)", vote.SubjectURI)
361 }
362
363 // Commit transaction
364 if err := tx.Commit(); err != nil {
365 return fmt.Errorf("failed to commit transaction: %w", err)
366 }
367
368 return nil
369}
370
371// validateVoteEvent performs security validation on vote events
372func (c *VoteEventConsumer) validateVoteEvent(ctx context.Context, repoDID string, vote *VoteRecordFromJetstream) error {
373 // SECURITY: Votes MUST come from user repositories (repo owner = voter DID)
374 // The repository owner (repoDID) IS the voter - votes are stored in user repos.
375 //
376 // We do NOT check if the user exists in AppView because:
377 // 1. Vote events may arrive before user events in Jetstream (race condition)
378 // 2. The vote came from the user's PDS repository (authenticated by PDS)
379 // 3. The database FK constraint was removed to allow out-of-order indexing
380 // 4. Orphaned votes (from never-indexed users) are harmless
381 //
382 // Security is maintained because:
383 // - Vote must come from user's own PDS repository (verified by atProto)
384 // - Communities cannot create votes in their repos (different collection)
385 // - Fake DIDs will fail PDS authentication
386
387 // Validate DID format (basic sanity check)
388 if !strings.HasPrefix(repoDID, "did:") {
389 return fmt.Errorf("invalid voter DID format: %s", repoDID)
390 }
391
392 // Validate vote direction
393 if vote.Direction != "up" && vote.Direction != "down" {
394 return fmt.Errorf("invalid vote direction: %s (must be 'up' or 'down')", vote.Direction)
395 }
396
397 // Validate subject has both URI and CID (strong reference)
398 if vote.Subject.URI == "" || vote.Subject.CID == "" {
399 return fmt.Errorf("invalid subject: must have both URI and CID (strong reference)")
400 }
401
402 return nil
403}
404
405// VoteRecordFromJetstream represents a vote record as received from Jetstream
406type VoteRecordFromJetstream struct {
407 Subject StrongRefFromJetstream `json:"subject"`
408 Direction string `json:"direction"`
409 CreatedAt string `json:"createdAt"`
410}
411
412// StrongRefFromJetstream represents a strong reference (URI + CID)
413type StrongRefFromJetstream struct {
414 URI string `json:"uri"`
415 CID string `json:"cid"`
416}
417
418// parseVoteRecord parses a vote record from Jetstream event data
419func parseVoteRecord(record map[string]interface{}) (*VoteRecordFromJetstream, error) {
420 // Extract subject (strong reference)
421 subjectMap, ok := record["subject"].(map[string]interface{})
422 if !ok {
423 return nil, fmt.Errorf("missing or invalid subject field")
424 }
425
426 subjectURI, _ := subjectMap["uri"].(string)
427 subjectCID, _ := subjectMap["cid"].(string)
428
429 // Extract direction
430 direction, _ := record["direction"].(string)
431
432 // Extract createdAt
433 createdAt, _ := record["createdAt"].(string)
434
435 return &VoteRecordFromJetstream{
436 Subject: StrongRefFromJetstream{
437 URI: subjectURI,
438 CID: subjectCID,
439 },
440 Direction: direction,
441 CreatedAt: createdAt,
442 }, nil
443}