A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/users" 5 "Coves/internal/core/votes" 6 "context" 7 "database/sql" 8 "fmt" 9 "log" 10 "strings" 11 "time" 12) 13 14// VoteEventConsumer consumes vote-related events from Jetstream 15// Handles CREATE and DELETE operations for social.coves.feed.vote 16type VoteEventConsumer struct { 17 voteRepo votes.Repository 18 userService users.UserService 19 db *sql.DB // Direct DB access for atomic vote count updates 20} 21 22// NewVoteEventConsumer creates a new Jetstream consumer for vote events 23func NewVoteEventConsumer( 24 voteRepo votes.Repository, 25 userService users.UserService, 26 db *sql.DB, 27) *VoteEventConsumer { 28 return &VoteEventConsumer{ 29 voteRepo: voteRepo, 30 userService: userService, 31 db: db, 32 } 33} 34 35// HandleEvent processes a Jetstream event for vote records 36func (c *VoteEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 37 // We only care about commit events for vote records 38 if event.Kind != "commit" || event.Commit == nil { 39 return nil 40 } 41 42 commit := event.Commit 43 44 // Handle vote record operations 45 if commit.Collection == "social.coves.feed.vote" { 46 switch commit.Operation { 47 case "create": 48 return c.createVote(ctx, event.Did, commit) 49 case "delete": 50 return c.deleteVote(ctx, event.Did, commit) 51 } 52 } 53 54 // Silently ignore other operations and collections 55 return nil 56} 57 58// createVote indexes a new vote from the firehose and updates post counts 59func (c *VoteEventConsumer) createVote(ctx context.Context, repoDID string, commit *CommitEvent) error { 60 if commit.Record == nil { 61 return fmt.Errorf("vote create event missing record data") 62 } 63 64 // Parse the vote record 65 voteRecord, err := parseVoteRecord(commit.Record) 66 if err != nil { 67 return fmt.Errorf("failed to parse vote record: %w", err) 68 } 69 70 // SECURITY: Validate this is a legitimate vote event 71 if err := c.validateVoteEvent(ctx, repoDID, 
voteRecord); err != nil { 72 log.Printf("🚨 SECURITY: Rejecting vote event: %v", err) 73 return err 74 } 75 76 // Build AT-URI for this vote 77 // Format: at://voter_did/social.coves.feed.vote/rkey 78 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey) 79 80 // Parse timestamp from record 81 createdAt, err := time.Parse(time.RFC3339, voteRecord.CreatedAt) 82 if err != nil { 83 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 84 createdAt = time.Now() 85 } 86 87 // Build vote entity 88 vote := &votes.Vote{ 89 URI: uri, 90 CID: commit.CID, 91 RKey: commit.RKey, 92 VoterDID: repoDID, // Vote comes from user's repository 93 SubjectURI: voteRecord.Subject.URI, 94 SubjectCID: voteRecord.Subject.CID, 95 Direction: voteRecord.Direction, 96 CreatedAt: createdAt, 97 IndexedAt: time.Now(), 98 } 99 100 // Atomically: Index vote + Update post counts 101 if err := c.indexVoteAndUpdateCounts(ctx, vote); err != nil { 102 return fmt.Errorf("failed to index vote and update counts: %w", err) 103 } 104 105 log.Printf("✓ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI) 106 return nil 107} 108 109// deleteVote soft-deletes a vote and updates post counts 110func (c *VoteEventConsumer) deleteVote(ctx context.Context, repoDID string, commit *CommitEvent) error { 111 // Build AT-URI for the vote being deleted 112 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey) 113 114 // Get existing vote to know its direction (for decrementing the right counter) 115 existingVote, err := c.voteRepo.GetByURI(ctx, uri) 116 if err != nil { 117 if err == votes.ErrVoteNotFound { 118 // Idempotent: Vote already deleted or never existed 119 log.Printf("Vote already deleted or not found: %s", uri) 120 return nil 121 } 122 return fmt.Errorf("failed to get existing vote: %w", err) 123 } 124 125 // Atomically: Soft-delete vote + Update post counts 126 if err := c.deleteVoteAndUpdateCounts(ctx, 
existingVote); err != nil { 127 return fmt.Errorf("failed to delete vote and update counts: %w", err) 128 } 129 130 log.Printf("✓ Deleted vote: %s (%s on %s)", uri, existingVote.Direction, existingVote.SubjectURI) 131 return nil 132} 133 134// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts 135func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error { 136 tx, err := c.db.BeginTx(ctx, nil) 137 if err != nil { 138 return fmt.Errorf("failed to begin transaction: %w", err) 139 } 140 defer func() { 141 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 142 log.Printf("Failed to rollback transaction: %v", rollbackErr) 143 } 144 }() 145 146 // 1. Index the vote (idempotent with ON CONFLICT DO NOTHING) 147 query := ` 148 INSERT INTO votes ( 149 uri, cid, rkey, voter_did, 150 subject_uri, subject_cid, direction, 151 created_at, indexed_at 152 ) VALUES ( 153 $1, $2, $3, $4, 154 $5, $6, $7, 155 $8, NOW() 156 ) 157 ON CONFLICT (uri) DO NOTHING 158 RETURNING id 159 ` 160 161 var voteID int64 162 err = tx.QueryRowContext( 163 ctx, query, 164 vote.URI, vote.CID, vote.RKey, vote.VoterDID, 165 vote.SubjectURI, vote.SubjectCID, vote.Direction, 166 vote.CreatedAt, 167 ).Scan(&voteID) 168 169 // If no rows returned, vote already exists (idempotent - OK for Jetstream replays) 170 if err == sql.ErrNoRows { 171 log.Printf("Vote already indexed: %s (idempotent)", vote.URI) 172 if commitErr := tx.Commit(); commitErr != nil { 173 return fmt.Errorf("failed to commit transaction: %w", commitErr) 174 } 175 return nil 176 } 177 178 if err != nil { 179 return fmt.Errorf("failed to insert vote: %w", err) 180 } 181 182 // 2. 
Update post vote counts atomically 183 // Increment upvote_count or downvote_count based on direction 184 // Also update score (upvote_count - downvote_count) 185 var updateQuery string 186 if vote.Direction == "up" { 187 updateQuery = ` 188 UPDATE posts 189 SET upvote_count = upvote_count + 1, 190 score = upvote_count + 1 - downvote_count 191 WHERE uri = $1 AND deleted_at IS NULL 192 ` 193 } else { // "down" 194 updateQuery = ` 195 UPDATE posts 196 SET downvote_count = downvote_count + 1, 197 score = upvote_count - (downvote_count + 1) 198 WHERE uri = $1 AND deleted_at IS NULL 199 ` 200 } 201 202 result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI) 203 if err != nil { 204 return fmt.Errorf("failed to update post counts: %w", err) 205 } 206 207 rowsAffected, err := result.RowsAffected() 208 if err != nil { 209 return fmt.Errorf("failed to check update result: %w", err) 210 } 211 212 // If post doesn't exist or is deleted, that's OK (vote still indexed) 213 // Future: We could validate post exists before indexing vote 214 if rowsAffected == 0 { 215 log.Printf("Warning: Post not found or deleted: %s (vote indexed anyway)", vote.SubjectURI) 216 } 217 218 // Commit transaction 219 if err := tx.Commit(); err != nil { 220 return fmt.Errorf("failed to commit transaction: %w", err) 221 } 222 223 return nil 224} 225 226// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts 227func (c *VoteEventConsumer) deleteVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error { 228 tx, err := c.db.BeginTx(ctx, nil) 229 if err != nil { 230 return fmt.Errorf("failed to begin transaction: %w", err) 231 } 232 defer func() { 233 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 234 log.Printf("Failed to rollback transaction: %v", rollbackErr) 235 } 236 }() 237 238 // 1. 
Soft-delete the vote (idempotent) 239 deleteQuery := ` 240 UPDATE votes 241 SET deleted_at = NOW() 242 WHERE uri = $1 AND deleted_at IS NULL 243 ` 244 245 result, err := tx.ExecContext(ctx, deleteQuery, vote.URI) 246 if err != nil { 247 return fmt.Errorf("failed to delete vote: %w", err) 248 } 249 250 rowsAffected, err := result.RowsAffected() 251 if err != nil { 252 return fmt.Errorf("failed to check delete result: %w", err) 253 } 254 255 // Idempotent: If no rows affected, vote already deleted 256 if rowsAffected == 0 { 257 log.Printf("Vote already deleted: %s (idempotent)", vote.URI) 258 if commitErr := tx.Commit(); commitErr != nil { 259 return fmt.Errorf("failed to commit transaction: %w", commitErr) 260 } 261 return nil 262 } 263 264 // 2. Decrement post vote counts atomically 265 // Decrement upvote_count or downvote_count based on direction 266 // Also update score (use GREATEST to prevent negative counts) 267 var updateQuery string 268 if vote.Direction == "up" { 269 updateQuery = ` 270 UPDATE posts 271 SET upvote_count = GREATEST(0, upvote_count - 1), 272 score = GREATEST(0, upvote_count - 1) - downvote_count 273 WHERE uri = $1 AND deleted_at IS NULL 274 ` 275 } else { // "down" 276 updateQuery = ` 277 UPDATE posts 278 SET downvote_count = GREATEST(0, downvote_count - 1), 279 score = upvote_count - GREATEST(0, downvote_count - 1) 280 WHERE uri = $1 AND deleted_at IS NULL 281 ` 282 } 283 284 result, err = tx.ExecContext(ctx, updateQuery, vote.SubjectURI) 285 if err != nil { 286 return fmt.Errorf("failed to update post counts: %w", err) 287 } 288 289 rowsAffected, err = result.RowsAffected() 290 if err != nil { 291 return fmt.Errorf("failed to check update result: %w", err) 292 } 293 294 // If post doesn't exist or is deleted, that's OK (vote still deleted) 295 if rowsAffected == 0 { 296 log.Printf("Warning: Post not found or deleted: %s (vote deleted anyway)", vote.SubjectURI) 297 } 298 299 // Commit transaction 300 if err := tx.Commit(); err != nil { 301 
return fmt.Errorf("failed to commit transaction: %w", err) 302 } 303 304 return nil 305} 306 307// validateVoteEvent performs security validation on vote events 308func (c *VoteEventConsumer) validateVoteEvent(ctx context.Context, repoDID string, vote *VoteRecordFromJetstream) error { 309 // SECURITY: Votes MUST come from user repositories (repo owner = voter DID) 310 // The repository owner (repoDID) IS the voter - votes are stored in user repos. 311 // 312 // We do NOT check if the user exists in AppView because: 313 // 1. Vote events may arrive before user events in Jetstream (race condition) 314 // 2. The vote came from the user's PDS repository (authenticated by PDS) 315 // 3. The database FK constraint was removed to allow out-of-order indexing 316 // 4. Orphaned votes (from never-indexed users) are harmless 317 // 318 // Security is maintained because: 319 // - Vote must come from user's own PDS repository (verified by atProto) 320 // - Communities cannot create votes in their repos (different collection) 321 // - Fake DIDs will fail PDS authentication 322 323 // Validate DID format (basic sanity check) 324 if !strings.HasPrefix(repoDID, "did:") { 325 return fmt.Errorf("invalid voter DID format: %s", repoDID) 326 } 327 328 // Validate vote direction 329 if vote.Direction != "up" && vote.Direction != "down" { 330 return fmt.Errorf("invalid vote direction: %s (must be 'up' or 'down')", vote.Direction) 331 } 332 333 // Validate subject has both URI and CID (strong reference) 334 if vote.Subject.URI == "" || vote.Subject.CID == "" { 335 return fmt.Errorf("invalid subject: must have both URI and CID (strong reference)") 336 } 337 338 return nil 339} 340 341// VoteRecordFromJetstream represents a vote record as received from Jetstream 342type VoteRecordFromJetstream struct { 343 Subject StrongRefFromJetstream `json:"subject"` 344 Direction string `json:"direction"` 345 CreatedAt string `json:"createdAt"` 346} 347 348// StrongRefFromJetstream represents a strong 
reference (URI + CID) 349type StrongRefFromJetstream struct { 350 URI string `json:"uri"` 351 CID string `json:"cid"` 352} 353 354// parseVoteRecord parses a vote record from Jetstream event data 355func parseVoteRecord(record map[string]interface{}) (*VoteRecordFromJetstream, error) { 356 // Extract subject (strong reference) 357 subjectMap, ok := record["subject"].(map[string]interface{}) 358 if !ok { 359 return nil, fmt.Errorf("missing or invalid subject field") 360 } 361 362 subjectURI, _ := subjectMap["uri"].(string) 363 subjectCID, _ := subjectMap["cid"].(string) 364 365 // Extract direction 366 direction, _ := record["direction"].(string) 367 368 // Extract createdAt 369 createdAt, _ := record["createdAt"].(string) 370 371 return &VoteRecordFromJetstream{ 372 Subject: StrongRefFromJetstream{ 373 URI: subjectURI, 374 CID: subjectCID, 375 }, 376 Direction: direction, 377 CreatedAt: createdAt, 378 }, nil 379}