A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/users" 6 "Coves/internal/core/votes" 7 "context" 8 "database/sql" 9 "fmt" 10 "log" 11 "strings" 12 "time" 13) 14 15// VoteEventConsumer consumes vote-related events from Jetstream 16// Handles CREATE and DELETE operations for social.coves.feed.vote 17type VoteEventConsumer struct { 18 voteRepo votes.Repository 19 userService users.UserService 20 db *sql.DB // Direct DB access for atomic vote count updates 21} 22 23// NewVoteEventConsumer creates a new Jetstream consumer for vote events 24func NewVoteEventConsumer( 25 voteRepo votes.Repository, 26 userService users.UserService, 27 db *sql.DB, 28) *VoteEventConsumer { 29 return &VoteEventConsumer{ 30 voteRepo: voteRepo, 31 userService: userService, 32 db: db, 33 } 34} 35 36// HandleEvent processes a Jetstream event for vote records 37func (c *VoteEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 38 // We only care about commit events for vote records 39 if event.Kind != "commit" || event.Commit == nil { 40 return nil 41 } 42 43 commit := event.Commit 44 45 // Handle vote record operations 46 if commit.Collection == "social.coves.feed.vote" { 47 switch commit.Operation { 48 case "create": 49 return c.createVote(ctx, event.Did, commit) 50 case "delete": 51 return c.deleteVote(ctx, event.Did, commit) 52 } 53 } 54 55 // Silently ignore other operations and collections 56 return nil 57} 58 59// createVote indexes a new vote from the firehose and updates post counts 60func (c *VoteEventConsumer) createVote(ctx context.Context, repoDID string, commit *CommitEvent) error { 61 if commit.Record == nil { 62 return fmt.Errorf("vote create event missing record data") 63 } 64 65 // Parse the vote record 66 voteRecord, err := parseVoteRecord(commit.Record) 67 if err != nil { 68 return fmt.Errorf("failed to parse vote record: %w", err) 69 } 70 71 // SECURITY: Validate this is a legitimate vote event 72 if err := c.validateVoteEvent(ctx, repoDID, voteRecord); err != nil { 73 log.Printf("🚨 SECURITY: Rejecting vote event: %v", err) 74 return err 75 } 76 77 // Build AT-URI for this vote 78 // Format: at://voter_did/social.coves.feed.vote/rkey 79 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey) 80 81 // Parse timestamp from record 82 createdAt, err := time.Parse(time.RFC3339, voteRecord.CreatedAt) 83 if err != nil { 84 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 85 createdAt = time.Now() 86 } 87 88 // Build vote entity 89 vote := &votes.Vote{ 90 URI: uri, 91 CID: commit.CID, 92 RKey: commit.RKey, 93 VoterDID: repoDID, // Vote comes from user's repository 94 SubjectURI: voteRecord.Subject.URI, 95 SubjectCID: voteRecord.Subject.CID, 96 Direction: voteRecord.Direction, 97 CreatedAt: createdAt, 98 IndexedAt: time.Now(), 99 } 100 101 // Atomically: Index vote + Update post counts 102 if err := c.indexVoteAndUpdateCounts(ctx, vote); err != nil { 103 return fmt.Errorf("failed to index vote and update counts: %w", err) 104 } 105 106 log.Printf("✓ Indexed vote: %s (%s on %s)", uri, vote.Direction, vote.SubjectURI) 107 return nil 108} 109 110// deleteVote soft-deletes a vote and updates post counts 111func (c *VoteEventConsumer) deleteVote(ctx context.Context, repoDID string, commit *CommitEvent) error { 112 // Build AT-URI for the vote being deleted 113 uri := fmt.Sprintf("at://%s/social.coves.feed.vote/%s", repoDID, commit.RKey) 114 115 // Get existing vote to know its direction (for decrementing the right counter) 116 existingVote, err := c.voteRepo.GetByURI(ctx, uri) 117 if err != nil { 118 if err == votes.ErrVoteNotFound { 119 // Idempotent: Vote already deleted or never existed 120 log.Printf("Vote already deleted or not found: %s", uri) 121 return nil 122 } 123 return fmt.Errorf("failed to get existing vote: %w", err) 124 } 125 126 // Atomically: Soft-delete vote + Update post counts 127 if err := c.deleteVoteAndUpdateCounts(ctx, existingVote); err != nil { 128 return fmt.Errorf("failed to delete vote and update counts: %w", err) 129 } 130 131 log.Printf("✓ Deleted vote: %s (%s on %s)", uri, existingVote.Direction, existingVote.SubjectURI) 132 return nil 133} 134 135// indexVoteAndUpdateCounts atomically indexes a vote and updates post vote counts 136func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error { 137 tx, err := c.db.BeginTx(ctx, nil) 138 if err != nil { 139 return fmt.Errorf("failed to begin transaction: %w", err) 140 } 141 defer func() { 142 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 143 log.Printf("Failed to rollback transaction: %v", rollbackErr) 144 } 145 }() 146 147 // 1. Index the vote (idempotent with ON CONFLICT DO NOTHING) 148 query := ` 149 INSERT INTO votes ( 150 uri, cid, rkey, voter_did, 151 subject_uri, subject_cid, direction, 152 created_at, indexed_at 153 ) VALUES ( 154 $1, $2, $3, $4, 155 $5, $6, $7, 156 $8, NOW() 157 ) 158 ON CONFLICT (uri) DO NOTHING 159 RETURNING id 160 ` 161 162 var voteID int64 163 err = tx.QueryRowContext( 164 ctx, query, 165 vote.URI, vote.CID, vote.RKey, vote.VoterDID, 166 vote.SubjectURI, vote.SubjectCID, vote.Direction, 167 vote.CreatedAt, 168 ).Scan(&voteID) 169 170 // If no rows returned, vote already exists (idempotent - OK for Jetstream replays) 171 if err == sql.ErrNoRows { 172 log.Printf("Vote already indexed: %s (idempotent)", vote.URI) 173 if commitErr := tx.Commit(); commitErr != nil { 174 return fmt.Errorf("failed to commit transaction: %w", commitErr) 175 } 176 return nil 177 } 178 179 if err != nil { 180 return fmt.Errorf("failed to insert vote: %w", err) 181 } 182 183 // 2. Update vote counts on the subject (post or comment) 184 // Parse collection from subject URI to determine target table 185 collection := utils.ExtractCollectionFromURI(vote.SubjectURI) 186 187 var updateQuery string 188 switch collection { 189 case "social.coves.community.post": 190 // Vote on post - update posts table 191 if vote.Direction == "up" { 192 updateQuery = ` 193 UPDATE posts 194 SET upvote_count = upvote_count + 1, 195 score = upvote_count + 1 - downvote_count 196 WHERE uri = $1 AND deleted_at IS NULL 197 ` 198 } else { // "down" 199 updateQuery = ` 200 UPDATE posts 201 SET downvote_count = downvote_count + 1, 202 score = upvote_count - (downvote_count + 1) 203 WHERE uri = $1 AND deleted_at IS NULL 204 ` 205 } 206 207 case "social.coves.community.comment": 208 // Vote on comment - update comments table 209 if vote.Direction == "up" { 210 updateQuery = ` 211 UPDATE comments 212 SET upvote_count = upvote_count + 1, 213 score = upvote_count + 1 - downvote_count 214 WHERE uri = $1 AND deleted_at IS NULL 215 ` 216 } else { // "down" 217 updateQuery = ` 218 UPDATE comments 219 SET downvote_count = downvote_count + 1, 220 score = upvote_count - (downvote_count + 1) 221 WHERE uri = $1 AND deleted_at IS NULL 222 ` 223 } 224 225 default: 226 // Unknown or unsupported collection 227 // Vote is still indexed in votes table, we just don't update denormalized counts 228 log.Printf("Vote subject has unsupported collection: %s (vote indexed, counts not updated)", collection) 229 if commitErr := tx.Commit(); commitErr != nil { 230 return fmt.Errorf("failed to commit transaction: %w", commitErr) 231 } 232 return nil 233 } 234 235 result, err := tx.ExecContext(ctx, updateQuery, vote.SubjectURI) 236 if err != nil { 237 return fmt.Errorf("failed to update vote counts: %w", err) 238 } 239 240 rowsAffected, err := result.RowsAffected() 241 if err != nil { 242 return fmt.Errorf("failed to check update result: %w", err) 243 } 244 245 // If subject doesn't exist or is deleted, that's OK (vote still indexed) 246 if rowsAffected == 0 { 247 log.Printf("Warning: Vote subject not found or deleted: %s (vote indexed anyway)", vote.SubjectURI) 248 } 249 250 // Commit transaction 251 if err := tx.Commit(); err != nil { 252 return fmt.Errorf("failed to commit transaction: %w", err) 253 } 254 255 return nil 256} 257 258// deleteVoteAndUpdateCounts atomically soft-deletes a vote and updates post vote counts 259func (c *VoteEventConsumer) deleteVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error { 260 tx, err := c.db.BeginTx(ctx, nil) 261 if err != nil { 262 return fmt.Errorf("failed to begin transaction: %w", err) 263 } 264 defer func() { 265 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 266 log.Printf("Failed to rollback transaction: %v", rollbackErr) 267 } 268 }() 269 270 // 1. Soft-delete the vote (idempotent) 271 deleteQuery := ` 272 UPDATE votes 273 SET deleted_at = NOW() 274 WHERE uri = $1 AND deleted_at IS NULL 275 ` 276 277 result, err := tx.ExecContext(ctx, deleteQuery, vote.URI) 278 if err != nil { 279 return fmt.Errorf("failed to delete vote: %w", err) 280 } 281 282 rowsAffected, err := result.RowsAffected() 283 if err != nil { 284 return fmt.Errorf("failed to check delete result: %w", err) 285 } 286 287 // Idempotent: If no rows affected, vote already deleted 288 if rowsAffected == 0 { 289 log.Printf("Vote already deleted: %s (idempotent)", vote.URI) 290 if commitErr := tx.Commit(); commitErr != nil { 291 return fmt.Errorf("failed to commit transaction: %w", commitErr) 292 } 293 return nil 294 } 295 296 // 2. Decrement vote counts on the subject (post or comment) 297 // Parse collection from subject URI to determine target table 298 collection := utils.ExtractCollectionFromURI(vote.SubjectURI) 299 300 var updateQuery string 301 switch collection { 302 case "social.coves.community.post": 303 // Vote on post - update posts table 304 if vote.Direction == "up" { 305 updateQuery = ` 306 UPDATE posts 307 SET upvote_count = GREATEST(0, upvote_count - 1), 308 score = GREATEST(0, upvote_count - 1) - downvote_count 309 WHERE uri = $1 AND deleted_at IS NULL 310 ` 311 } else { // "down" 312 updateQuery = ` 313 UPDATE posts 314 SET downvote_count = GREATEST(0, downvote_count - 1), 315 score = upvote_count - GREATEST(0, downvote_count - 1) 316 WHERE uri = $1 AND deleted_at IS NULL 317 ` 318 } 319 320 case "social.coves.community.comment": 321 // Vote on comment - update comments table 322 if vote.Direction == "up" { 323 updateQuery = ` 324 UPDATE comments 325 SET upvote_count = GREATEST(0, upvote_count - 1), 326 score = GREATEST(0, upvote_count - 1) - downvote_count 327 WHERE uri = $1 AND deleted_at IS NULL 328 ` 329 } else { // "down" 330 updateQuery = ` 331 UPDATE comments 332 SET downvote_count = GREATEST(0, downvote_count - 1), 333 score = upvote_count - GREATEST(0, downvote_count - 1) 334 WHERE uri = $1 AND deleted_at IS NULL 335 ` 336 } 337 338 default: 339 // Unknown or unsupported collection 340 // Vote is still deleted, we just don't update denormalized counts 341 log.Printf("Vote subject has unsupported collection: %s (vote deleted, counts not updated)", collection) 342 if commitErr := tx.Commit(); commitErr != nil { 343 return fmt.Errorf("failed to commit transaction: %w", commitErr) 344 } 345 return nil 346 } 347 348 result, err = tx.ExecContext(ctx, updateQuery, vote.SubjectURI) 349 if err != nil { 350 return fmt.Errorf("failed to update vote counts: %w", err) 351 } 352 353 rowsAffected, err = result.RowsAffected() 354 if err != nil { 355 return fmt.Errorf("failed to check update result: %w", err) 356 } 357 358 // If subject doesn't exist or is deleted, that's OK (vote still deleted) 359 if rowsAffected == 0 { 360 log.Printf("Warning: Vote subject not found or deleted: %s (vote deleted anyway)", vote.SubjectURI) 361 } 362 363 // Commit transaction 364 if err := tx.Commit(); err != nil { 365 return fmt.Errorf("failed to commit transaction: %w", err) 366 } 367 368 return nil 369} 370 371// validateVoteEvent performs security validation on vote events 372func (c *VoteEventConsumer) validateVoteEvent(ctx context.Context, repoDID string, vote *VoteRecordFromJetstream) error { 373 // SECURITY: Votes MUST come from user repositories (repo owner = voter DID) 374 // The repository owner (repoDID) IS the voter - votes are stored in user repos. 375 // 376 // We do NOT check if the user exists in AppView because: 377 // 1. Vote events may arrive before user events in Jetstream (race condition) 378 // 2. The vote came from the user's PDS repository (authenticated by PDS) 379 // 3. The database FK constraint was removed to allow out-of-order indexing 380 // 4. Orphaned votes (from never-indexed users) are harmless 381 // 382 // Security is maintained because: 383 // - Vote must come from user's own PDS repository (verified by atProto) 384 // - Communities cannot create votes in their repos (different collection) 385 // - Fake DIDs will fail PDS authentication 386 387 // Validate DID format (basic sanity check) 388 if !strings.HasPrefix(repoDID, "did:") { 389 return fmt.Errorf("invalid voter DID format: %s", repoDID) 390 } 391 392 // Validate vote direction 393 if vote.Direction != "up" && vote.Direction != "down" { 394 return fmt.Errorf("invalid vote direction: %s (must be 'up' or 'down')", vote.Direction) 395 } 396 397 // Validate subject has both URI and CID (strong reference) 398 if vote.Subject.URI == "" || vote.Subject.CID == "" { 399 return fmt.Errorf("invalid subject: must have both URI and CID (strong reference)") 400 } 401 402 return nil 403} 404 405// VoteRecordFromJetstream represents a vote record as received from Jetstream 406type VoteRecordFromJetstream struct { 407 Subject StrongRefFromJetstream `json:"subject"` 408 Direction string `json:"direction"` 409 CreatedAt string `json:"createdAt"` 410} 411 412// StrongRefFromJetstream represents a strong reference (URI + CID) 413type StrongRefFromJetstream struct { 414 URI string `json:"uri"` 415 CID string `json:"cid"` 416} 417 418// parseVoteRecord parses a vote record from Jetstream event data 419func parseVoteRecord(record map[string]interface{}) (*VoteRecordFromJetstream, error) { 420 // Extract subject (strong reference) 421 subjectMap, ok := record["subject"].(map[string]interface{}) 422 if !ok { 423 return nil, fmt.Errorf("missing or invalid subject field") 424 } 425 426 subjectURI, _ := subjectMap["uri"].(string) 427 subjectCID, _ := subjectMap["cid"].(string) 428 429 // Extract direction 430 direction, _ := record["direction"].(string) 431 432 // Extract createdAt 433 createdAt, _ := record["createdAt"].(string) 434 435 return &VoteRecordFromJetstream{ 436 Subject: StrongRefFromJetstream{ 437 URI: subjectURI, 438 CID: subjectCID, 439 }, 440 Direction: direction, 441 CreatedAt: createdAt, 442 }, nil 443}