A community-based topic aggregation platform built on atproto
at main 33 kB view raw
1package postgres 2 3import ( 4 "Coves/internal/core/comments" 5 "context" 6 "database/sql" 7 "encoding/base64" 8 "fmt" 9 "log" 10 "strings" 11 12 "github.com/lib/pq" 13) 14 15type postgresCommentRepo struct { 16 db *sql.DB 17} 18 19// NewCommentRepository creates a new PostgreSQL comment repository 20func NewCommentRepository(db *sql.DB) comments.Repository { 21 return &postgresCommentRepo{db: db} 22} 23 24// Create inserts a new comment into the comments table 25// Called by Jetstream consumer after comment is created on PDS 26// Idempotent: Returns success if comment already exists (for Jetstream replays) 27func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error { 28 query := ` 29 INSERT INTO comments ( 30 uri, cid, rkey, commenter_did, 31 root_uri, root_cid, parent_uri, parent_cid, 32 content, content_facets, embed, content_labels, langs, 33 created_at, indexed_at 34 ) VALUES ( 35 $1, $2, $3, $4, 36 $5, $6, $7, $8, 37 $9, $10, $11, $12, $13, 38 $14, NOW() 39 ) 40 ON CONFLICT (uri) DO NOTHING 41 RETURNING id, indexed_at 42 ` 43 44 err := r.db.QueryRowContext( 45 ctx, query, 46 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 47 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 48 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 49 comment.CreatedAt, 50 ).Scan(&comment.ID, &comment.IndexedAt) 51 52 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 53 if err == sql.ErrNoRows { 54 return nil // Comment already exists, no error for idempotency 55 } 56 57 if err != nil { 58 // Check for unique constraint violation 59 if strings.Contains(err.Error(), "duplicate key") { 60 return comments.ErrCommentAlreadyExists 61 } 62 63 return fmt.Errorf("failed to insert comment: %w", err) 64 } 65 66 return nil 67} 68 69// Update modifies an existing comment's content fields 70// Called by Jetstream consumer after comment is 
updated on PDS 71// Preserves vote counts and created_at timestamp 72func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error { 73 query := ` 74 UPDATE comments 75 SET 76 cid = $1, 77 content = $2, 78 content_facets = $3, 79 embed = $4, 80 content_labels = $5, 81 langs = $6 82 WHERE uri = $7 AND deleted_at IS NULL 83 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count 84 ` 85 86 err := r.db.QueryRowContext( 87 ctx, query, 88 comment.CID, 89 comment.Content, 90 comment.ContentFacets, 91 comment.Embed, 92 comment.ContentLabels, 93 pq.Array(comment.Langs), 94 comment.URI, 95 ).Scan( 96 &comment.ID, 97 &comment.IndexedAt, 98 &comment.CreatedAt, 99 &comment.UpvoteCount, 100 &comment.DownvoteCount, 101 &comment.Score, 102 &comment.ReplyCount, 103 ) 104 105 if err == sql.ErrNoRows { 106 return comments.ErrCommentNotFound 107 } 108 if err != nil { 109 return fmt.Errorf("failed to update comment: %w", err) 110 } 111 112 return nil 113} 114 115// GetByURI retrieves a comment by its AT-URI 116// Used by Jetstream consumer for UPDATE/DELETE operations 117func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) { 118 query := ` 119 SELECT 120 id, uri, cid, rkey, commenter_did, 121 root_uri, root_cid, parent_uri, parent_cid, 122 content, content_facets, embed, content_labels, langs, 123 created_at, indexed_at, deleted_at, deletion_reason, deleted_by, 124 upvote_count, downvote_count, score, reply_count 125 FROM comments 126 WHERE uri = $1 127 ` 128 129 var comment comments.Comment 130 var langs pq.StringArray 131 132 err := r.db.QueryRowContext(ctx, query, uri).Scan( 133 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 134 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 135 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 136 &comment.CreatedAt, &comment.IndexedAt, 
&comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy, 137 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 138 ) 139 140 if err == sql.ErrNoRows { 141 return nil, comments.ErrCommentNotFound 142 } 143 if err != nil { 144 return nil, fmt.Errorf("failed to get comment by URI: %w", err) 145 } 146 147 comment.Langs = langs 148 149 return &comment, nil 150} 151 152// Delete soft-deletes a comment (sets deleted_at) 153// Called by Jetstream consumer after comment is deleted from PDS 154// Idempotent: Returns success if comment already deleted 155// Deprecated: Use SoftDeleteWithReason for new code to preserve thread structure 156func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error { 157 query := ` 158 UPDATE comments 159 SET deleted_at = NOW() 160 WHERE uri = $1 AND deleted_at IS NULL 161 ` 162 163 result, err := r.db.ExecContext(ctx, query, uri) 164 if err != nil { 165 return fmt.Errorf("failed to delete comment: %w", err) 166 } 167 168 rowsAffected, err := result.RowsAffected() 169 if err != nil { 170 return fmt.Errorf("failed to check delete result: %w", err) 171 } 172 173 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays) 174 if rowsAffected == 0 { 175 return nil 176 } 177 178 return nil 179} 180 181// SoftDeleteWithReason performs a soft delete that blanks content but preserves thread structure 182// This allows deleted comments to appear as "[deleted]" placeholders in thread views 183// Idempotent: Returns success if comment already deleted 184// Validates that reason is a known deletion reason constant 185func (r *postgresCommentRepo) SoftDeleteWithReason(ctx context.Context, uri, reason, deletedByDID string) error { 186 // Validate deletion reason 187 if reason != comments.DeletionReasonAuthor && reason != comments.DeletionReasonModerator { 188 return fmt.Errorf("invalid deletion reason: %s", reason) 189 } 190 191 _, err := r.SoftDeleteWithReasonTx(ctx, nil, 
uri, reason, deletedByDID) 192 return err 193} 194 195// SoftDeleteWithReasonTx performs a soft delete within an optional transaction 196// If tx is nil, executes directly against the database 197// Returns rows affected count for callers that need to check idempotency 198// This method is used by both the repository and the Jetstream consumer 199func (r *postgresCommentRepo) SoftDeleteWithReasonTx(ctx context.Context, tx *sql.Tx, uri, reason, deletedByDID string) (int64, error) { 200 query := ` 201 UPDATE comments 202 SET 203 content = '', 204 content_facets = NULL, 205 embed = NULL, 206 content_labels = NULL, 207 deleted_at = NOW(), 208 deletion_reason = $2, 209 deleted_by = $3 210 WHERE uri = $1 AND deleted_at IS NULL 211 ` 212 213 var result sql.Result 214 var err error 215 216 if tx != nil { 217 result, err = tx.ExecContext(ctx, query, uri, reason, deletedByDID) 218 } else { 219 result, err = r.db.ExecContext(ctx, query, uri, reason, deletedByDID) 220 } 221 222 if err != nil { 223 return 0, fmt.Errorf("failed to soft delete comment: %w", err) 224 } 225 226 rowsAffected, err := result.RowsAffected() 227 if err != nil { 228 return 0, fmt.Errorf("failed to check delete result: %w", err) 229 } 230 231 return rowsAffected, nil 232} 233 234// ListByRoot retrieves all comments in a thread (flat), including deleted ones 235// Used for fetching entire comment threads on posts 236// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders) 237func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) { 238 query := ` 239 SELECT 240 id, uri, cid, rkey, commenter_did, 241 root_uri, root_cid, parent_uri, parent_cid, 242 content, content_facets, embed, content_labels, langs, 243 created_at, indexed_at, deleted_at, deletion_reason, deleted_by, 244 upvote_count, downvote_count, score, reply_count 245 FROM comments 246 WHERE root_uri = $1 247 ORDER BY created_at ASC 248 LIMIT $2 
OFFSET $3 249 ` 250 251 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset) 252 if err != nil { 253 return nil, fmt.Errorf("failed to list comments by root: %w", err) 254 } 255 defer func() { 256 if err := rows.Close(); err != nil { 257 log.Printf("Failed to close rows: %v", err) 258 } 259 }() 260 261 var result []*comments.Comment 262 for rows.Next() { 263 var comment comments.Comment 264 var langs pq.StringArray 265 266 err := rows.Scan( 267 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 268 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 269 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 270 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy, 271 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 272 ) 273 if err != nil { 274 return nil, fmt.Errorf("failed to scan comment: %w", err) 275 } 276 277 comment.Langs = langs 278 result = append(result, &comment) 279 } 280 281 if err = rows.Err(); err != nil { 282 return nil, fmt.Errorf("error iterating comments: %w", err) 283 } 284 285 return result, nil 286} 287 288// ListByParent retrieves direct replies to a post or comment, including deleted ones 289// Used for building nested/threaded comment views 290// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders) 291func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) { 292 query := ` 293 SELECT 294 id, uri, cid, rkey, commenter_did, 295 root_uri, root_cid, parent_uri, parent_cid, 296 content, content_facets, embed, content_labels, langs, 297 created_at, indexed_at, deleted_at, deletion_reason, deleted_by, 298 upvote_count, downvote_count, score, reply_count 299 FROM comments 300 WHERE parent_uri = $1 301 ORDER BY created_at ASC 302 LIMIT $2 OFFSET $3 303 ` 
304 305 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset) 306 if err != nil { 307 return nil, fmt.Errorf("failed to list comments by parent: %w", err) 308 } 309 defer func() { 310 if err := rows.Close(); err != nil { 311 log.Printf("Failed to close rows: %v", err) 312 } 313 }() 314 315 var result []*comments.Comment 316 for rows.Next() { 317 var comment comments.Comment 318 var langs pq.StringArray 319 320 err := rows.Scan( 321 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 322 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 323 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 324 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy, 325 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 326 ) 327 if err != nil { 328 return nil, fmt.Errorf("failed to scan comment: %w", err) 329 } 330 331 comment.Langs = langs 332 result = append(result, &comment) 333 } 334 335 if err = rows.Err(); err != nil { 336 return nil, fmt.Errorf("error iterating comments: %w", err) 337 } 338 339 return result, nil 340} 341 342// CountByParent counts direct replies to a post or comment 343// Used for showing reply counts in threading UI 344func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) { 345 query := ` 346 SELECT COUNT(*) 347 FROM comments 348 WHERE parent_uri = $1 AND deleted_at IS NULL 349 ` 350 351 var count int 352 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count) 353 if err != nil { 354 return 0, fmt.Errorf("failed to count comments by parent: %w", err) 355 } 356 357 return count, nil 358} 359 360// ListByCommenter retrieves all active comments by a specific user 361// Used for user comment history - filters out deleted comments 362func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, 
offset int) ([]*comments.Comment, error) { 363 query := ` 364 SELECT 365 id, uri, cid, rkey, commenter_did, 366 root_uri, root_cid, parent_uri, parent_cid, 367 content, content_facets, embed, content_labels, langs, 368 created_at, indexed_at, deleted_at, deletion_reason, deleted_by, 369 upvote_count, downvote_count, score, reply_count 370 FROM comments 371 WHERE commenter_did = $1 AND deleted_at IS NULL 372 ORDER BY created_at DESC 373 LIMIT $2 OFFSET $3 374 ` 375 376 rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset) 377 if err != nil { 378 return nil, fmt.Errorf("failed to list comments by commenter: %w", err) 379 } 380 defer func() { 381 if err := rows.Close(); err != nil { 382 log.Printf("Failed to close rows: %v", err) 383 } 384 }() 385 386 var result []*comments.Comment 387 for rows.Next() { 388 var comment comments.Comment 389 var langs pq.StringArray 390 391 err := rows.Scan( 392 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 393 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 394 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 395 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy, 396 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 397 ) 398 if err != nil { 399 return nil, fmt.Errorf("failed to scan comment: %w", err) 400 } 401 402 comment.Langs = langs 403 result = append(result, &comment) 404 } 405 406 if err = rows.Err(); err != nil { 407 return nil, fmt.Errorf("error iterating comments: %w", err) 408 } 409 410 return result, nil 411} 412 413// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination 414// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at) 415// Uses cursor-based pagination with composite keys for consistent ordering 416// Hydrates author 
info (handle, display_name, avatar) via JOIN with users table 417func (r *postgresCommentRepo) ListByParentWithHotRank( 418 ctx context.Context, 419 parentURI string, 420 sort string, 421 timeframe string, 422 limit int, 423 cursor *string, 424) ([]*comments.Comment, *string, error) { 425 // Build ORDER BY clause and time filter based on sort type 426 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe) 427 428 // Parse cursor for pagination 429 cursorFilter, cursorValues, err := r.parseCommentCursor(cursor, sort) 430 if err != nil { 431 return nil, nil, fmt.Errorf("invalid cursor: %w", err) 432 } 433 434 // Build SELECT clause - compute hot_rank for "hot" sort 435 // Hot rank formula (Lemmy algorithm): 436 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8) 437 // 438 // This formula: 439 // - Gives logarithmic weight to score (prevents high-score dominance) 440 // - Decays over time with power 1.8 (faster than linear, slower than quadratic) 441 // - Uses hours as time unit (3600 seconds) 442 // - Adds constants to prevent division by zero and ensure positive values 443 var selectClause string 444 if sort == "hot" { 445 selectClause = ` 446 SELECT 447 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 448 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 449 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 450 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by, 451 c.upvote_count, c.downvote_count, c.score, c.reply_count, 452 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 453 COALESCE(u.handle, c.commenter_did) as author_handle 454 FROM comments c` 455 } else { 456 selectClause = ` 457 SELECT 458 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 459 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 460 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 461 c.created_at, c.indexed_at, 
c.deleted_at, c.deletion_reason, c.deleted_by, 462 c.upvote_count, c.downvote_count, c.score, c.reply_count, 463 NULL::numeric as hot_rank, 464 COALESCE(u.handle, c.commenter_did) as author_handle 465 FROM comments c` 466 } 467 468 // Build complete query with JOINs and filters 469 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 470 // Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders) 471 query := fmt.Sprintf(` 472 %s 473 LEFT JOIN users u ON c.commenter_did = u.did 474 WHERE c.parent_uri = $1 475 %s 476 %s 477 ORDER BY %s 478 LIMIT $2 479 `, selectClause, timeFilter, cursorFilter, orderBy) 480 481 // Prepare query arguments 482 args := []interface{}{parentURI, limit + 1} // +1 to detect next page 483 args = append(args, cursorValues...) 484 485 // Execute query 486 rows, err := r.db.QueryContext(ctx, query, args...) 487 if err != nil { 488 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err) 489 } 490 defer func() { 491 if err := rows.Close(); err != nil { 492 log.Printf("Failed to close rows: %v", err) 493 } 494 }() 495 496 // Scan results 497 var result []*comments.Comment 498 var hotRanks []float64 499 for rows.Next() { 500 var comment comments.Comment 501 var langs pq.StringArray 502 var hotRank sql.NullFloat64 503 var authorHandle string 504 505 err := rows.Scan( 506 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 507 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 508 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 509 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy, 510 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 511 &hotRank, &authorHandle, 512 ) 513 if err != nil { 514 return nil, nil, fmt.Errorf("failed to scan comment: %w", err) 515 } 516 
517 comment.Langs = langs 518 comment.CommenterHandle = authorHandle 519 520 // Store hot_rank for cursor building 521 hotRankValue := 0.0 522 if hotRank.Valid { 523 hotRankValue = hotRank.Float64 524 } 525 hotRanks = append(hotRanks, hotRankValue) 526 527 result = append(result, &comment) 528 } 529 530 if err = rows.Err(); err != nil { 531 return nil, nil, fmt.Errorf("error iterating comments: %w", err) 532 } 533 534 // Handle pagination cursor 535 var nextCursor *string 536 if len(result) > limit && limit > 0 { 537 result = result[:limit] 538 hotRanks = hotRanks[:limit] 539 lastComment := result[len(result)-1] 540 lastHotRank := hotRanks[len(hotRanks)-1] 541 cursorStr := r.buildCommentCursor(lastComment, sort, lastHotRank) 542 nextCursor = &cursorStr 543 } 544 545 return result, nextCursor, nil 546} 547 548// buildCommentSortClause returns the ORDER BY SQL and optional time filter 549func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) { 550 var orderBy string 551 switch sort { 552 case "hot": 553 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC 554 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 555 case "top": 556 // Score DESC, then created_at DESC, then uri DESC 557 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC` 558 case "new": 559 // Created at DESC, then uri DESC 560 orderBy = `c.created_at DESC, c.uri DESC` 561 default: 562 // Default to hot 563 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 564 } 565 566 // Add time filter for "top" sort 567 var timeFilter string 568 if sort == "top" { 569 timeFilter = r.buildCommentTimeFilter(timeframe) 570 } 571 572 return orderBy, timeFilter 573} 574 575// buildCommentTimeFilter returns SQL filter for timeframe 576func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string { 577 if timeframe == "" || timeframe == "all" { 578 return "" 579 } 580 581 var interval string 
582 switch timeframe { 583 case "hour": 584 interval = "1 hour" 585 case "day": 586 interval = "1 day" 587 case "week": 588 interval = "7 days" 589 case "month": 590 interval = "30 days" 591 case "year": 592 interval = "1 year" 593 default: 594 return "" 595 } 596 597 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval) 598} 599 600// parseCommentCursor decodes pagination cursor for comments 601func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) { 602 if cursor == nil || *cursor == "" { 603 return "", nil, nil 604 } 605 606 // Validate cursor size to prevent DoS via massive base64 strings 607 const maxCursorSize = 1024 608 if len(*cursor) > maxCursorSize { 609 return "", nil, fmt.Errorf("cursor too large: maximum %d bytes", maxCursorSize) 610 } 611 612 // Decode base64 cursor 613 decoded, err := base64.URLEncoding.DecodeString(*cursor) 614 if err != nil { 615 return "", nil, fmt.Errorf("invalid cursor encoding") 616 } 617 618 // Parse cursor based on sort type using | delimiter 619 // Format: hotRank|score|createdAt|uri (for hot) 620 // score|createdAt|uri (for top) 621 // createdAt|uri (for new) 622 parts := strings.Split(string(decoded), "|") 623 624 switch sort { 625 case "new": 626 // Cursor format: createdAt|uri 627 if len(parts) != 2 { 628 return "", nil, fmt.Errorf("invalid cursor format for new sort") 629 } 630 631 createdAt := parts[0] 632 uri := parts[1] 633 634 // Validate AT-URI format 635 if !strings.HasPrefix(uri, "at://") { 636 return "", nil, fmt.Errorf("invalid cursor URI") 637 } 638 639 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))` 640 return filter, []interface{}{createdAt, uri}, nil 641 642 case "top": 643 // Cursor format: score|createdAt|uri 644 if len(parts) != 3 { 645 return "", nil, fmt.Errorf("invalid cursor format for top sort") 646 } 647 648 scoreStr := parts[0] 649 createdAt := parts[1] 650 uri := parts[2] 651 652 // Parse score as 
integer 653 score := 0 654 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 655 return "", nil, fmt.Errorf("invalid cursor score") 656 } 657 658 // Validate AT-URI format 659 if !strings.HasPrefix(uri, "at://") { 660 return "", nil, fmt.Errorf("invalid cursor URI") 661 } 662 663 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))` 664 return filter, []interface{}{score, createdAt, uri}, nil 665 666 case "hot": 667 // Cursor format: hotRank|score|createdAt|uri 668 if len(parts) != 4 { 669 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 670 } 671 672 hotRankStr := parts[0] 673 scoreStr := parts[1] 674 createdAt := parts[2] 675 uri := parts[3] 676 677 // Parse hot_rank as float 678 hotRank := 0.0 679 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 680 return "", nil, fmt.Errorf("invalid cursor hot rank") 681 } 682 683 // Parse score as integer 684 score := 0 685 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 686 return "", nil, fmt.Errorf("invalid cursor score") 687 } 688 689 // Validate AT-URI format 690 if !strings.HasPrefix(uri, "at://") { 691 return "", nil, fmt.Errorf("invalid cursor URI") 692 } 693 694 // Use computed hot_rank expression in comparison 695 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)` 696 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`, 697 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr) 698 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil 699 700 default: 701 return "", nil, nil 702 } 703} 704 705// buildCommentCursor creates pagination cursor from last comment 706func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank 
float64) string { 707 var cursorStr string 708 const delimiter = "|" 709 710 switch sort { 711 case "new": 712 // Format: createdAt|uri 713 cursorStr = fmt.Sprintf("%s%s%s", 714 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 715 delimiter, 716 comment.URI) 717 718 case "top": 719 // Format: score|createdAt|uri 720 cursorStr = fmt.Sprintf("%d%s%s%s%s", 721 comment.Score, 722 delimiter, 723 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 724 delimiter, 725 comment.URI) 726 727 case "hot": 728 // Format: hotRank|score|createdAt|uri 729 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s", 730 hotRank, 731 delimiter, 732 comment.Score, 733 delimiter, 734 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 735 delimiter, 736 comment.URI) 737 738 default: 739 cursorStr = comment.URI 740 } 741 742 return base64.URLEncoding.EncodeToString([]byte(cursorStr)) 743} 744 745// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query 746// Returns map[uri]*Comment for efficient lookups without N+1 queries 747// Includes deleted comments to preserve thread structure 748func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) { 749 if len(uris) == 0 { 750 return make(map[string]*comments.Comment), nil 751 } 752 753 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 754 // COALESCE falls back to DID when handle is NULL (user not yet in users table) 755 // Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders) 756 query := ` 757 SELECT 758 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 759 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 760 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 761 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by, 762 c.upvote_count, c.downvote_count, c.score, c.reply_count, 763 COALESCE(u.handle, 
c.commenter_did) as author_handle 764 FROM comments c 765 LEFT JOIN users u ON c.commenter_did = u.did 766 WHERE c.uri = ANY($1) 767 ` 768 769 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris)) 770 if err != nil { 771 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err) 772 } 773 defer func() { 774 if err := rows.Close(); err != nil { 775 log.Printf("Failed to close rows: %v", err) 776 } 777 }() 778 779 result := make(map[string]*comments.Comment) 780 for rows.Next() { 781 var comment comments.Comment 782 var langs pq.StringArray 783 var authorHandle string 784 785 err := rows.Scan( 786 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 787 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 788 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 789 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy, 790 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 791 &authorHandle, 792 ) 793 if err != nil { 794 return nil, fmt.Errorf("failed to scan comment: %w", err) 795 } 796 797 comment.Langs = langs 798 result[comment.URI] = &comment 799 } 800 801 if err = rows.Err(); err != nil { 802 return nil, fmt.Errorf("error iterating comments: %w", err) 803 } 804 805 return result, nil 806} 807 808// ListByParentsBatch retrieves direct replies to multiple parents in a single query 809// Groups results by parent URI to prevent N+1 queries when loading nested replies 810// Uses window functions to limit results per parent efficiently 811func (r *postgresCommentRepo) ListByParentsBatch( 812 ctx context.Context, 813 parentURIs []string, 814 sort string, 815 limitPerParent int, 816) (map[string][]*comments.Comment, error) { 817 if len(parentURIs) == 0 { 818 return make(map[string][]*comments.Comment), nil 819 } 820 821 // Build ORDER BY clause based on sort type 822 // windowOrderBy 
must inline expressions (can't use SELECT aliases in window functions) 823 var windowOrderBy string 824 var selectClause string 825 switch sort { 826 case "hot": 827 selectClause = ` 828 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 829 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 830 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 831 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by, 832 c.upvote_count, c.downvote_count, c.score, c.reply_count, 833 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 834 COALESCE(u.handle, c.commenter_did) as author_handle` 835 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY 836 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC` 837 case "top": 838 selectClause = ` 839 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 840 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 841 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 842 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by, 843 c.upvote_count, c.downvote_count, c.score, c.reply_count, 844 NULL::numeric as hot_rank, 845 COALESCE(u.handle, c.commenter_did) as author_handle` 846 windowOrderBy = `c.score DESC, c.created_at DESC` 847 case "new": 848 selectClause = ` 849 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 850 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 851 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 852 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by, 853 c.upvote_count, c.downvote_count, c.score, c.reply_count, 854 NULL::numeric as hot_rank, 855 COALESCE(u.handle, c.commenter_did) as author_handle` 856 windowOrderBy = `c.created_at DESC` 857 default: 858 // Default to hot 859 selectClause = ` 860 c.id, c.uri, c.cid, 
c.rkey, c.commenter_did, 861 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 862 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 863 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by, 864 c.upvote_count, c.downvote_count, c.score, c.reply_count, 865 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 866 COALESCE(u.handle, c.commenter_did) as author_handle` 867 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY 868 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC` 869 } 870 871 // Use window function to limit results per parent 872 // This is more efficient than LIMIT in a subquery per parent 873 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 874 // Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders) 875 query := fmt.Sprintf(` 876 WITH ranked_comments AS ( 877 SELECT 878 %s, 879 ROW_NUMBER() OVER ( 880 PARTITION BY c.parent_uri 881 ORDER BY %s 882 ) as rn 883 FROM comments c 884 LEFT JOIN users u ON c.commenter_did = u.did 885 WHERE c.parent_uri = ANY($1) 886 ) 887 SELECT 888 id, uri, cid, rkey, commenter_did, 889 root_uri, root_cid, parent_uri, parent_cid, 890 content, content_facets, embed, content_labels, langs, 891 created_at, indexed_at, deleted_at, deletion_reason, deleted_by, 892 upvote_count, downvote_count, score, reply_count, 893 hot_rank, author_handle 894 FROM ranked_comments 895 WHERE rn <= $2 896 ORDER BY parent_uri, rn 897 `, selectClause, windowOrderBy) 898 899 rows, err := r.db.QueryContext(ctx, query, pq.Array(parentURIs), limitPerParent) 900 if err != nil { 901 return nil, fmt.Errorf("failed to batch query comments by parents: %w", err) 902 } 903 defer func() { 904 if err := rows.Close(); 
err != nil { 905 log.Printf("Failed to close rows: %v", err) 906 } 907 }() 908 909 // Group results by parent URI 910 result := make(map[string][]*comments.Comment) 911 for rows.Next() { 912 var comment comments.Comment 913 var langs pq.StringArray 914 var hotRank sql.NullFloat64 915 var authorHandle string 916 917 err := rows.Scan( 918 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 919 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 920 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 921 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy, 922 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 923 &hotRank, &authorHandle, 924 ) 925 if err != nil { 926 return nil, fmt.Errorf("failed to scan comment: %w", err) 927 } 928 929 comment.Langs = langs 930 comment.CommenterHandle = authorHandle 931 932 // Group by parent URI 933 result[comment.ParentURI] = append(result[comment.ParentURI], &comment) 934 } 935 936 if err = rows.Err(); err != nil { 937 return nil, fmt.Errorf("error iterating comments: %w", err) 938 } 939 940 return result, nil 941} 942 943// GetVoteStateForComments retrieves the viewer's votes on a batch of comments 944// Returns map[commentURI]*Vote for efficient lookups 945// Note: This implementation is prepared for when the votes table indexing is implemented 946// Currently returns an empty map as votes may not be fully indexed yet 947func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) { 948 if len(commentURIs) == 0 || viewerDID == "" { 949 return make(map[string]interface{}), nil 950 } 951 952 // Query votes table for viewer's votes on these comments 953 // Note: This assumes votes table exists and is being indexed 954 // If votes table doesn't exist yet, this query will fail 
gracefully 955 query := ` 956 SELECT subject_uri, direction, uri 957 FROM votes 958 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL 959 ` 960 961 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs)) 962 if err != nil { 963 // If votes table doesn't exist yet, return empty map instead of error 964 // This allows the API to work before votes indexing is fully implemented 965 if strings.Contains(err.Error(), "does not exist") { 966 return make(map[string]interface{}), nil 967 } 968 return nil, fmt.Errorf("failed to get vote state for comments: %w", err) 969 } 970 defer func() { 971 if err := rows.Close(); err != nil { 972 log.Printf("Failed to close rows: %v", err) 973 } 974 }() 975 976 // Build result map with vote information 977 result := make(map[string]interface{}) 978 for rows.Next() { 979 var subjectURI, direction, uri string 980 981 err := rows.Scan(&subjectURI, &direction, &uri) 982 if err != nil { 983 return nil, fmt.Errorf("failed to scan vote: %w", err) 984 } 985 986 // Store vote info as a simple map (can be enhanced later with proper Vote struct) 987 result[subjectURI] = map[string]interface{}{ 988 "direction": direction, 989 "uri": uri, 990 } 991 } 992 993 if err = rows.Err(); err != nil { 994 return nil, fmt.Errorf("error iterating votes: %w", err) 995 } 996 997 return result, nil 998}