A community based topic aggregation platform built on atproto
at main 17 kB view raw
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/users" 6 "Coves/internal/core/votes" 7 "context" 8 "database/sql" 9 "fmt" 10 "log" 11 "strings" 12 "time" 13) 14 15// VoteEventConsumer consumes vote-related events from Jetstream 16// Handles CREATE and DELETE operations for social.coves.feed.vote 17type VoteEventConsumer struct { 18 voteRepo votes.Repository 19 userService users.UserService 20 db *sql.DB // Direct DB access for atomic vote count updates 21} 22 23// NewVoteEventConsumer creates a new Jetstream consumer for vote events 24func NewVoteEventConsumer( 25 voteRepo votes.Repository, 26 userService users.UserService, 27 db *sql.DB, 28) *VoteEventConsumer { 29 return &VoteEventConsumer{ 30 voteRepo: voteRepo, 31 userService: userService, 32 db: db, 33 } 34} 35 36// HandleEvent processes a Jetstream event for vote records 37func (c *VoteEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 38 // We only care about commit events for vote records 39 if event.Kind != "commit" || event.Commit == nil { 40 return nil 41 } 42 43 commit := event.Commit 44 45 // Handle vote record operations 46 if commit.Collection == "social.coves.feed.vote" { 47 switch commit.Operation { 48 case "create": 49 return c.createVote(ctx, event.Did, commit) 50 case "delete": 51 return c.deleteVote(ctx, event.Did, commit) 52 } 53 } 54 55 // Silently ignore other operations and collections 56 return nil 57} 58 59// createVote indexes a new vote from the firehose and updates post counts 60func (c *VoteEventConsumer) createVote(ctx context.Context, repoDID string, commit *CommitEvent) error { 61 if commit.Record == nil { 62 return fmt.Errorf("vote create event missing record data") 63 } 64 65 // Parse the vote record 66 voteRecord, err := parseVoteRecord(commit.Record) 67 if err != nil { 68 return fmt.Errorf("failed to parse vote record: %w", err) 69 } 70 71 // SECURITY: Validate this is a legitimate vote event 72 if err := c.validateVoteEvent(ctx, repoDID, voteRecord); err != nil { 73 log.Printf("🚨 SECURITY: Rejecting vote event: %v", err) 74 return err 75 } 76 77 // Build AT-URI for this vote 78 // Format: at://voter_did/social.coves.feed.vote/rkey 79 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey) 80 81 // Parse timestamp from record 82 createdAt, err := time.Parse(time.RFC3339, voteRecord.CreatedAt) 83 if err != nil { 84 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 85 createdAt = time.Now() 86 } 87 88 // Build vote entity 89 vote := &votes.Vote{ 90 URI: uri, 91 CID: commit.CID, 92 RKey: commit.RKey, 93 VoterDID: repoDID, // Vote comes from user's repository 94 SubjectURI: voteRecord.Subject.URI, 95 SubjectCID: voteRecord.Subject.CID, 96 Direction: voteRecord.Direction, 97 CreatedAt: createdAt, 98 IndexedAt: time.Now(), 99 } 100 101 // Atomically: Index vote + Update post counts 102 wasNew, err := c.indexVoteAndUpdateCounts(ctx, vote) 103 if err != nil { 104 return fmt.Errorf("failed to index vote and update counts: %w", err) 105 } 106 107 if wasNew { 108 log.Printf("✓ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI) 109 } 110 return nil 111} 112 113// deleteVote soft-deletes a vote and updates post counts 114func (c *VoteEventConsumer) deleteVote(ctx context.Context, repoDID string, commit *CommitEvent) error { 115 // Build AT-URI for the vote being deleted 116 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey) 117 118 // Get existing vote to know its direction (for decrementing the right counter) 119 existingVote, err := c.voteRepo.GetByURI(ctx, uri) 120 if err != nil { 121 if err == votes.ErrVoteNotFound { 122 // Idempotent: Vote already deleted or never existed 123 log.Printf("Vote already deleted or not found: %s", uri) 124 return nil 125 } 126 return fmt.Errorf("failed to get existing vote: %w", err) 127 } 128 129 // Atomically: Soft-delete vote + Update post counts 130 if err := c.deleteVoteAndUpdateCounts(ctx, existingVote); err != nil { 131 return fmt.Errorf("failed to delete vote and update counts: %w", err) 132 } 133 134 log.Printf("✓ Deleted vote: %s (%s on %s)", uri, existingVote.Direction, existingVote.SubjectURI) 135 return nil 136} 137 138// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts 139// Returns (true, nil) if vote was newly inserted, (false, nil) if already existed (idempotent) 140func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) (bool, error) { 141 tx, err := c.db.BeginTx(ctx, nil) 142 if err != nil { 143 return false, fmt.Errorf("failed to begin transaction: %w", err) 144 } 145 defer func() { 146 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 147 log.Printf("Failed to rollback transaction: %v", rollbackErr) 148 } 149 }() 150 151 // 1. Check for existing active vote with different URI (stale record) 152 // This handles cases where: 153 // - User voted on another client and we missed the delete event 154 // - Vote was reindexed but user created a new vote with different rkey 155 // - Any other state mismatch between PDS and AppView 156 var existingDirection sql.NullString 157 checkQuery := ` 158 SELECT direction FROM votes 159 WHERE voter_did = $1 160 AND subject_uri = $2 161 AND deleted_at IS NULL 162 AND uri != $3 163 LIMIT 1 164 ` 165 if err := tx.QueryRowContext(ctx, checkQuery, vote.VoterDID, vote.SubjectURI, vote.URI).Scan(&existingDirection); err != nil && err != sql.ErrNoRows { 166 return false, fmt.Errorf("failed to check existing vote: %w", err) 167 } 168 169 // If there's a stale vote, soft-delete it and adjust counts 170 if existingDirection.Valid { 171 softDeleteQuery := ` 172 UPDATE votes 173 SET deleted_at = NOW() 174 WHERE voter_did = $1 175 AND subject_uri = $2 176 AND deleted_at IS NULL 177 AND uri != $3 178 ` 179 if _, err := tx.ExecContext(ctx, softDeleteQuery, vote.VoterDID, vote.SubjectURI, vote.URI); err != nil { 180 return false, fmt.Errorf("failed to soft-delete existing votes: %w", err) 181 } 182 183 // Decrement the old vote's count (will be re-incremented below if same direction) 184 collection := utils.ExtractCollectionFromURI(vote.SubjectURI) 185 var decrementQuery string 186 if existingDirection.String == "up" { 187 if collection == "social.coves.community.post" { 188 decrementQuery = `UPDATE posts SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL` 189 } else if collection == "social.coves.community.comment" { 190 decrementQuery = `UPDATE comments SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL` 191 } 192 } else { 193 if collection == "social.coves.community.post" { 194 decrementQuery = `UPDATE posts SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL` 195 } else if collection == "social.coves.community.comment" { 196 decrementQuery = `UPDATE comments SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL` 197 } 198 } 199 if decrementQuery != "" { 200 if _, err := tx.ExecContext(ctx, decrementQuery, vote.SubjectURI); err != nil { 201 return false, fmt.Errorf("failed to decrement old vote count: %w", err) 202 } 203 } 204 log.Printf("Cleaned up stale vote for %s on %s (was %s)", vote.VoterDID, vote.SubjectURI, existingDirection.String) 205 } 206 207 // 2. Index the vote (idempotent with ON CONFLICT DO NOTHING) 208 query := ` 209 INSERT INTO votes ( 210 uri, cid, rkey, voter_did, 211 subject_uri, subject_cid, direction, 212 created_at, indexed_at 213 ) VALUES ( 214 $1, $2, $3, $4, 215 $5, $6, $7, 216 $8, NOW() 217 ) 218 ON CONFLICT (uri) DO NOTHING 219 RETURNING id 220 ` 221 222 var voteID int64 223 err = tx.QueryRowContext( 224 ctx, query, 225 vote.URI, vote.CID, vote.RKey, vote.VoterDID, 226 vote.SubjectURI, vote.SubjectCID, vote.Direction, 227 vote.CreatedAt, 228 ).Scan(&voteID) 229 230 // If no rows returned, vote already exists (idempotent - OK for Jetstream replays) 231 if err == sql.ErrNoRows { 232 // Silently handle idempotent case - no log needed for replayed events 233 if commitErr := tx.Commit(); commitErr != nil { 234 return false, fmt.Errorf("failed to commit transaction: %w", commitErr) 235 } 236 return false, nil // Vote already existed 237 } 238 239 if err != nil { 240 return false, fmt.Errorf("failed to insert vote: %w", err) 241 } 242 243 // 3. Update vote counts on the subject (post or comment) 244 // Parse collection from subject URI to determine target table 245 collection := utils.ExtractCollectionFromURI(vote.SubjectURI) 246 247 var updateQuery string 248 switch collection { 249 case "social.coves.community.post": 250 // Vote on post - update posts table 251 if vote.Direction == "up" { 252 updateQuery = ` 253 UPDATE posts 254 SET upvote_count = upvote_count + 1, 255 score = upvote_count + 1 - downvote_count 256 WHERE uri = $1 AND deleted_at IS NULL 257 ` 258 } else { // "down" 259 updateQuery = ` 260 UPDATE posts 261 SET downvote_count = downvote_count + 1, 262 score = upvote_count - (downvote_count + 1) 263 WHERE uri = $1 AND deleted_at IS NULL 264 ` 265 } 266 267 case "social.coves.community.comment": 268 // Vote on comment - update comments table 269 if vote.Direction == "up" { 270 updateQuery = ` 271 UPDATE comments 272 SET upvote_count = upvote_count + 1, 273 score = upvote_count + 1 - downvote_count 274 WHERE uri = $1 AND deleted_at IS NULL 275 ` 276 } else { // "down" 277 updateQuery = ` 278 UPDATE comments 279 SET downvote_count = downvote_count + 1, 280 score = upvote_count - (downvote_count + 1) 281 WHERE uri = $1 AND deleted_at IS NULL 282 ` 283 } 284 285 default: 286 // Unknown or unsupported collection 287 // Vote is still indexed in votes table, we just don't update denormalized counts 288 log.Printf("Vote subject has unsupported collection: %s (vote indexed, counts not updated)", collection) 289 if commitErr := tx.Commit(); commitErr != nil { 290 return false, fmt.Errorf("failed to commit transaction: %w", commitErr) 291 } 292 return true, nil // Vote was newly indexed 293 } 294 295 result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI) 296 if err != nil { 297 return false, fmt.Errorf("failed to update vote counts: %w", err) 298 } 299 300 rowsAffected, err := result.RowsAffected() 301 if err != nil { 302 return false, fmt.Errorf("failed to check update result: %w", err) 303 } 304 305 // If subject doesn't exist or is deleted, that's OK (vote still indexed) 306 if rowsAffected == 0 { 307 log.Printf("Warning: Vote subject not found or deleted: %s (vote indexed anyway)", vote.SubjectURI) 308 } 309 310 // Commit transaction 311 if err := tx.Commit(); err != nil { 312 return false, fmt.Errorf("failed to commit transaction: %w", err) 313 } 314 315 return true, nil // Vote was newly indexed 316} 317 318// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts 319func (c *VoteEventConsumer) deleteVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error { 320 tx, err := c.db.BeginTx(ctx, nil) 321 if err != nil { 322 return fmt.Errorf("failed to begin transaction: %w", err) 323 } 324 defer func() { 325 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 326 log.Printf("Failed to rollback transaction: %v", rollbackErr) 327 } 328 }() 329 330 // 1. Soft-delete the vote (idempotent) 331 deleteQuery := ` 332 UPDATE votes 333 SET deleted_at = NOW() 334 WHERE uri = $1 AND deleted_at IS NULL 335 ` 336 337 result, err := tx.ExecContext(ctx, deleteQuery, vote.URI) 338 if err != nil { 339 return fmt.Errorf("failed to delete vote: %w", err) 340 } 341 342 rowsAffected, err := result.RowsAffected() 343 if err != nil { 344 return fmt.Errorf("failed to check delete result: %w", err) 345 } 346 347 // Idempotent: If no rows affected, vote already deleted 348 if rowsAffected == 0 { 349 log.Printf("Vote already deleted: %s (idempotent)", vote.URI) 350 if commitErr := tx.Commit(); commitErr != nil { 351 return fmt.Errorf("failed to commit transaction: %w", commitErr) 352 } 353 return nil 354 } 355 356 // 2. Decrement vote counts on the subject (post or comment) 357 // Parse collection from subject URI to determine target table 358 collection := utils.ExtractCollectionFromURI(vote.SubjectURI) 359 360 var updateQuery string 361 switch collection { 362 case "social.coves.community.post": 363 // Vote on post - update posts table 364 if vote.Direction == "up" { 365 updateQuery = ` 366 UPDATE posts 367 SET upvote_count = GREATEST(0, upvote_count - 1), 368 score = GREATEST(0, upvote_count - 1) - downvote_count 369 WHERE uri = $1 AND deleted_at IS NULL 370 ` 371 } else { // "down" 372 updateQuery = ` 373 UPDATE posts 374 SET downvote_count = GREATEST(0, downvote_count - 1), 375 score = upvote_count - GREATEST(0, downvote_count - 1) 376 WHERE uri = $1 AND deleted_at IS NULL 377 ` 378 } 379 380 case "social.coves.community.comment": 381 // Vote on comment - update comments table 382 if vote.Direction == "up" { 383 updateQuery = ` 384 UPDATE comments 385 SET upvote_count = GREATEST(0, upvote_count - 1), 386 score = GREATEST(0, upvote_count - 1) - downvote_count 387 WHERE uri = $1 AND deleted_at IS NULL 388 ` 389 } else { // "down" 390 updateQuery = ` 391 UPDATE comments 392 SET downvote_count = GREATEST(0, downvote_count - 1), 393 score = upvote_count - GREATEST(0, downvote_count - 1) 394 WHERE uri = $1 AND deleted_at IS NULL 395 ` 396 } 397 398 default: 399 // Unknown or unsupported collection 400 // Vote is still deleted, we just don't update denormalized counts 401 log.Printf("Vote subject has unsupported collection: %s (vote deleted, counts not updated)", collection) 402 if commitErr := tx.Commit(); commitErr != nil { 403 return fmt.Errorf("failed to commit transaction: %w", commitErr) 404 } 405 return nil 406 } 407 408 result, err = tx.ExecContext(ctx, updateQuery, vote.SubjectURI) 409 if err != nil { 410 return fmt.Errorf("failed to update vote counts: %w", err) 411 } 412 413 rowsAffected, err = result.RowsAffected() 414 if err != nil { 415 return fmt.Errorf("failed to check update result: %w", err) 416 } 417 418 // If subject doesn't exist or is deleted, that's OK (vote still deleted) 419 if rowsAffected == 0 { 420 log.Printf("Warning: Vote subject not found or deleted: %s (vote deleted anyway)", vote.SubjectURI) 421 } 422 423 // Commit transaction 424 if err := tx.Commit(); err != nil { 425 return fmt.Errorf("failed to commit transaction: %w", err) 426 } 427 428 return nil 429} 430 431// validateVoteEvent performs security validation on vote events 432func (c *VoteEventConsumer) validateVoteEvent(ctx context.Context, repoDID string, vote *VoteRecordFromJetstream) error { 433 // SECURITY: Votes MUST come from user repositories (repo owner = voter DID) 434 // The repository owner (repoDID) IS the voter - votes are stored in user repos. 435 // 436 // We do NOT check if the user exists in AppView because: 437 // 1. Vote events may arrive before user events in Jetstream (race condition) 438 // 2. The vote came from the user's PDS repository (authenticated by PDS) 439 // 3. The database FK constraint was removed to allow out-of-order indexing 440 // 4. Orphaned votes (from never-indexed users) are harmless 441 // 442 // Security is maintained because: 443 // - Vote must come from user's own PDS repository (verified by atProto) 444 // - Communities cannot create votes in their repos (different collection) 445 // - Fake DIDs will fail PDS authentication 446 447 // Validate DID format (basic sanity check) 448 if !strings.HasPrefix(repoDID, "did:") { 449 return fmt.Errorf("invalid voter DID format: %s", repoDID) 450 } 451 452 // Validate vote direction 453 if vote.Direction != "up" && vote.Direction != "down" { 454 return fmt.Errorf("invalid vote direction: %s (must be 'up' or 'down')", vote.Direction) 455 } 456 457 // Validate subject has both URI and CID (strong reference) 458 if vote.Subject.URI == "" || vote.Subject.CID == "" { 459 return fmt.Errorf("invalid subject: must have both URI and CID (strong reference)") 460 } 461 462 return nil 463} 464 465// VoteRecordFromJetstream represents a vote record as received from Jetstream 466type VoteRecordFromJetstream struct { 467 Subject StrongRefFromJetstream `json:"subject"` 468 Direction string `json:"direction"` 469 CreatedAt string `json:"createdAt"` 470} 471 472// StrongRefFromJetstream represents a strong reference (URI + CID) 473type StrongRefFromJetstream struct { 474 URI string `json:"uri"` 475 CID string `json:"cid"` 476} 477 478// parseVoteRecord parses a vote record from Jetstream event data 479func parseVoteRecord(record map[string]interface{}) (*VoteRecordFromJetstream, error) { 480 // Extract subject (strong reference) 481 subjectMap, ok := record["subject"].(map[string]interface{}) 482 if !ok { 483 return nil, fmt.Errorf("missing or invalid subject field") 484 } 485 486 subjectURI, _ := subjectMap["uri"].(string) 487 subjectCID, _ := subjectMap["cid"].(string) 488 489 // Extract direction 490 direction, _ := record["direction"].(string) 491 492 // Extract createdAt 493 createdAt, _ := record["createdAt"].(string) 494 495 return &VoteRecordFromJetstream{ 496 Subject: StrongRefFromJetstream{ 497 URI: subjectURI, 498 CID: subjectCID, 499 }, 500 Direction: direction, 501 CreatedAt: createdAt, 502 }, nil 503}