A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/base64" 7 "fmt" 8 "log" 9 "strings" 10 11 "Coves/internal/core/comments" 12 13 "github.com/lib/pq" 14) 15 16type postgresCommentRepo struct { 17 db *sql.DB 18} 19 20// NewCommentRepository creates a new PostgreSQL comment repository 21func NewCommentRepository(db *sql.DB) comments.Repository { 22 return &postgresCommentRepo{db: db} 23} 24 25// Create inserts a new comment into the comments table 26// Called by Jetstream consumer after comment is created on PDS 27// Idempotent: Returns success if comment already exists (for Jetstream replays) 28func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error { 29 query := ` 30 INSERT INTO comments ( 31 uri, cid, rkey, commenter_did, 32 root_uri, root_cid, parent_uri, parent_cid, 33 content, content_facets, embed, content_labels, langs, 34 created_at, indexed_at 35 ) VALUES ( 36 $1, $2, $3, $4, 37 $5, $6, $7, $8, 38 $9, $10, $11, $12, $13, 39 $14, NOW() 40 ) 41 ON CONFLICT (uri) DO NOTHING 42 RETURNING id, indexed_at 43 ` 44 45 err := r.db.QueryRowContext( 46 ctx, query, 47 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 48 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 49 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 50 comment.CreatedAt, 51 ).Scan(&comment.ID, &comment.IndexedAt) 52 53 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 54 if err == sql.ErrNoRows { 55 return nil // Comment already exists, no error for idempotency 56 } 57 58 if err != nil { 59 // Check for unique constraint violation 60 if strings.Contains(err.Error(), "duplicate key") { 61 return comments.ErrCommentAlreadyExists 62 } 63 64 return fmt.Errorf("failed to insert comment: %w", err) 65 } 66 67 return nil 68} 69 70// Update modifies an existing comment's content fields 71// Called by Jetstream consumer after comment is 
updated on PDS 72// Preserves vote counts and created_at timestamp 73func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error { 74 query := ` 75 UPDATE comments 76 SET 77 cid = $1, 78 content = $2, 79 content_facets = $3, 80 embed = $4, 81 content_labels = $5, 82 langs = $6 83 WHERE uri = $7 AND deleted_at IS NULL 84 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count 85 ` 86 87 err := r.db.QueryRowContext( 88 ctx, query, 89 comment.CID, 90 comment.Content, 91 comment.ContentFacets, 92 comment.Embed, 93 comment.ContentLabels, 94 pq.Array(comment.Langs), 95 comment.URI, 96 ).Scan( 97 &comment.ID, 98 &comment.IndexedAt, 99 &comment.CreatedAt, 100 &comment.UpvoteCount, 101 &comment.DownvoteCount, 102 &comment.Score, 103 &comment.ReplyCount, 104 ) 105 106 if err == sql.ErrNoRows { 107 return comments.ErrCommentNotFound 108 } 109 if err != nil { 110 return fmt.Errorf("failed to update comment: %w", err) 111 } 112 113 return nil 114} 115 116// GetByURI retrieves a comment by its AT-URI 117// Used by Jetstream consumer for UPDATE/DELETE operations 118func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) { 119 query := ` 120 SELECT 121 id, uri, cid, rkey, commenter_did, 122 root_uri, root_cid, parent_uri, parent_cid, 123 content, content_facets, embed, content_labels, langs, 124 created_at, indexed_at, deleted_at, 125 upvote_count, downvote_count, score, reply_count 126 FROM comments 127 WHERE uri = $1 128 ` 129 130 var comment comments.Comment 131 var langs pq.StringArray 132 133 err := r.db.QueryRowContext(ctx, query, uri).Scan( 134 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 135 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 136 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 137 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 138 
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 139 ) 140 141 if err == sql.ErrNoRows { 142 return nil, comments.ErrCommentNotFound 143 } 144 if err != nil { 145 return nil, fmt.Errorf("failed to get comment by URI: %w", err) 146 } 147 148 comment.Langs = langs 149 150 return &comment, nil 151} 152 153// Delete soft-deletes a comment (sets deleted_at) 154// Called by Jetstream consumer after comment is deleted from PDS 155// Idempotent: Returns success if comment already deleted 156func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error { 157 query := ` 158 UPDATE comments 159 SET deleted_at = NOW() 160 WHERE uri = $1 AND deleted_at IS NULL 161 ` 162 163 result, err := r.db.ExecContext(ctx, query, uri) 164 if err != nil { 165 return fmt.Errorf("failed to delete comment: %w", err) 166 } 167 168 rowsAffected, err := result.RowsAffected() 169 if err != nil { 170 return fmt.Errorf("failed to check delete result: %w", err) 171 } 172 173 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays) 174 if rowsAffected == 0 { 175 return nil 176 } 177 178 return nil 179} 180 181// ListByRoot retrieves all active comments in a thread (flat) 182// Used for fetching entire comment threads on posts 183func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) { 184 query := ` 185 SELECT 186 id, uri, cid, rkey, commenter_did, 187 root_uri, root_cid, parent_uri, parent_cid, 188 content, content_facets, embed, content_labels, langs, 189 created_at, indexed_at, deleted_at, 190 upvote_count, downvote_count, score, reply_count 191 FROM comments 192 WHERE root_uri = $1 AND deleted_at IS NULL 193 ORDER BY created_at ASC 194 LIMIT $2 OFFSET $3 195 ` 196 197 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset) 198 if err != nil { 199 return nil, fmt.Errorf("failed to list comments by root: %w", err) 200 } 201 defer func() { 202 
if err := rows.Close(); err != nil { 203 log.Printf("Failed to close rows: %v", err) 204 } 205 }() 206 207 var result []*comments.Comment 208 for rows.Next() { 209 var comment comments.Comment 210 var langs pq.StringArray 211 212 err := rows.Scan( 213 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 214 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 215 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 216 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 217 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 218 ) 219 if err != nil { 220 return nil, fmt.Errorf("failed to scan comment: %w", err) 221 } 222 223 comment.Langs = langs 224 result = append(result, &comment) 225 } 226 227 if err = rows.Err(); err != nil { 228 return nil, fmt.Errorf("error iterating comments: %w", err) 229 } 230 231 return result, nil 232} 233 234// ListByParent retrieves direct replies to a post or comment 235// Used for building nested/threaded comment views 236func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) { 237 query := ` 238 SELECT 239 id, uri, cid, rkey, commenter_did, 240 root_uri, root_cid, parent_uri, parent_cid, 241 content, content_facets, embed, content_labels, langs, 242 created_at, indexed_at, deleted_at, 243 upvote_count, downvote_count, score, reply_count 244 FROM comments 245 WHERE parent_uri = $1 AND deleted_at IS NULL 246 ORDER BY created_at ASC 247 LIMIT $2 OFFSET $3 248 ` 249 250 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset) 251 if err != nil { 252 return nil, fmt.Errorf("failed to list comments by parent: %w", err) 253 } 254 defer func() { 255 if err := rows.Close(); err != nil { 256 log.Printf("Failed to close rows: %v", err) 257 } 258 }() 259 260 var result []*comments.Comment 261 for rows.Next() { 262 var comment 
comments.Comment 263 var langs pq.StringArray 264 265 err := rows.Scan( 266 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 267 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 268 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 269 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 270 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 271 ) 272 if err != nil { 273 return nil, fmt.Errorf("failed to scan comment: %w", err) 274 } 275 276 comment.Langs = langs 277 result = append(result, &comment) 278 } 279 280 if err = rows.Err(); err != nil { 281 return nil, fmt.Errorf("error iterating comments: %w", err) 282 } 283 284 return result, nil 285} 286 287// CountByParent counts direct replies to a post or comment 288// Used for showing reply counts in threading UI 289func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) { 290 query := ` 291 SELECT COUNT(*) 292 FROM comments 293 WHERE parent_uri = $1 AND deleted_at IS NULL 294 ` 295 296 var count int 297 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count) 298 if err != nil { 299 return 0, fmt.Errorf("failed to count comments by parent: %w", err) 300 } 301 302 return count, nil 303} 304 305// ListByCommenter retrieves all active comments by a specific user 306// Future: Used for user comment history 307func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) { 308 query := ` 309 SELECT 310 id, uri, cid, rkey, commenter_did, 311 root_uri, root_cid, parent_uri, parent_cid, 312 content, content_facets, embed, content_labels, langs, 313 created_at, indexed_at, deleted_at, 314 upvote_count, downvote_count, score, reply_count 315 FROM comments 316 WHERE commenter_did = $1 AND deleted_at IS NULL 317 ORDER BY created_at DESC 318 LIMIT $2 OFFSET $3 319 ` 320 321 
rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset) 322 if err != nil { 323 return nil, fmt.Errorf("failed to list comments by commenter: %w", err) 324 } 325 defer func() { 326 if err := rows.Close(); err != nil { 327 log.Printf("Failed to close rows: %v", err) 328 } 329 }() 330 331 var result []*comments.Comment 332 for rows.Next() { 333 var comment comments.Comment 334 var langs pq.StringArray 335 336 err := rows.Scan( 337 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 338 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 339 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 340 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 341 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 342 ) 343 if err != nil { 344 return nil, fmt.Errorf("failed to scan comment: %w", err) 345 } 346 347 comment.Langs = langs 348 result = append(result, &comment) 349 } 350 351 if err = rows.Err(); err != nil { 352 return nil, fmt.Errorf("error iterating comments: %w", err) 353 } 354 355 return result, nil 356} 357 358// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination 359// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at) 360// Uses cursor-based pagination with composite keys for consistent ordering 361// Hydrates author info (handle, display_name, avatar) via JOIN with users table 362func (r *postgresCommentRepo) ListByParentWithHotRank( 363 ctx context.Context, 364 parentURI string, 365 sort string, 366 timeframe string, 367 limit int, 368 cursor *string, 369) ([]*comments.Comment, *string, error) { 370 // Build ORDER BY clause and time filter based on sort type 371 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe) 372 373 // Parse cursor for pagination 374 cursorFilter, cursorValues, err := 
r.parseCommentCursor(cursor, sort) 375 if err != nil { 376 return nil, nil, fmt.Errorf("invalid cursor: %w", err) 377 } 378 379 // Build SELECT clause - compute hot_rank for "hot" sort 380 // Hot rank formula (Lemmy algorithm): 381 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8) 382 // 383 // This formula: 384 // - Gives logarithmic weight to score (prevents high-score dominance) 385 // - Decays over time with power 1.8 (faster than linear, slower than quadratic) 386 // - Uses hours as time unit (3600 seconds) 387 // - Adds constants to prevent division by zero and ensure positive values 388 var selectClause string 389 if sort == "hot" { 390 selectClause = ` 391 SELECT 392 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 393 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 394 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 395 c.created_at, c.indexed_at, c.deleted_at, 396 c.upvote_count, c.downvote_count, c.score, c.reply_count, 397 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 398 COALESCE(u.handle, c.commenter_did) as author_handle 399 FROM comments c` 400 } else { 401 selectClause = ` 402 SELECT 403 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 404 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 405 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 406 c.created_at, c.indexed_at, c.deleted_at, 407 c.upvote_count, c.downvote_count, c.score, c.reply_count, 408 NULL::numeric as hot_rank, 409 COALESCE(u.handle, c.commenter_did) as author_handle 410 FROM comments c` 411 } 412 413 // Build complete query with JOINs and filters 414 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 415 query := fmt.Sprintf(` 416 %s 417 LEFT JOIN users u ON c.commenter_did = u.did 418 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 419 %s 420 %s 421 ORDER BY %s 422 LIMIT $2 423 
`, selectClause, timeFilter, cursorFilter, orderBy) 424 425 // Prepare query arguments 426 args := []interface{}{parentURI, limit + 1} // +1 to detect next page 427 args = append(args, cursorValues...) 428 429 // Execute query 430 rows, err := r.db.QueryContext(ctx, query, args...) 431 if err != nil { 432 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err) 433 } 434 defer func() { 435 if err := rows.Close(); err != nil { 436 log.Printf("Failed to close rows: %v", err) 437 } 438 }() 439 440 // Scan results 441 var result []*comments.Comment 442 var hotRanks []float64 443 for rows.Next() { 444 var comment comments.Comment 445 var langs pq.StringArray 446 var hotRank sql.NullFloat64 447 var authorHandle string 448 449 err := rows.Scan( 450 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 451 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 452 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 453 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 454 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 455 &hotRank, &authorHandle, 456 ) 457 if err != nil { 458 return nil, nil, fmt.Errorf("failed to scan comment: %w", err) 459 } 460 461 comment.Langs = langs 462 comment.CommenterHandle = authorHandle 463 464 // Store hot_rank for cursor building 465 hotRankValue := 0.0 466 if hotRank.Valid { 467 hotRankValue = hotRank.Float64 468 } 469 hotRanks = append(hotRanks, hotRankValue) 470 471 result = append(result, &comment) 472 } 473 474 if err = rows.Err(); err != nil { 475 return nil, nil, fmt.Errorf("error iterating comments: %w", err) 476 } 477 478 // Handle pagination cursor 479 var nextCursor *string 480 if len(result) > limit && limit > 0 { 481 result = result[:limit] 482 hotRanks = hotRanks[:limit] 483 lastComment := result[len(result)-1] 484 lastHotRank := hotRanks[len(hotRanks)-1] 485 cursorStr := 
r.buildCommentCursor(lastComment, sort, lastHotRank) 486 nextCursor = &cursorStr 487 } 488 489 return result, nextCursor, nil 490} 491 492// buildCommentSortClause returns the ORDER BY SQL and optional time filter 493func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) { 494 var orderBy string 495 switch sort { 496 case "hot": 497 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC 498 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 499 case "top": 500 // Score DESC, then created_at DESC, then uri DESC 501 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC` 502 case "new": 503 // Created at DESC, then uri DESC 504 orderBy = `c.created_at DESC, c.uri DESC` 505 default: 506 // Default to hot 507 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 508 } 509 510 // Add time filter for "top" sort 511 var timeFilter string 512 if sort == "top" { 513 timeFilter = r.buildCommentTimeFilter(timeframe) 514 } 515 516 return orderBy, timeFilter 517} 518 519// buildCommentTimeFilter returns SQL filter for timeframe 520func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string { 521 if timeframe == "" || timeframe == "all" { 522 return "" 523 } 524 525 var interval string 526 switch timeframe { 527 case "hour": 528 interval = "1 hour" 529 case "day": 530 interval = "1 day" 531 case "week": 532 interval = "7 days" 533 case "month": 534 interval = "30 days" 535 case "year": 536 interval = "1 year" 537 default: 538 return "" 539 } 540 541 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval) 542} 543 544// parseCommentCursor decodes pagination cursor for comments 545func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) { 546 if cursor == nil || *cursor == "" { 547 return "", nil, nil 548 } 549 550 // Validate cursor size to prevent DoS via massive base64 strings 551 const 
maxCursorSize = 1024 552 if len(*cursor) > maxCursorSize { 553 return "", nil, fmt.Errorf("cursor too large: maximum %d bytes", maxCursorSize) 554 } 555 556 // Decode base64 cursor 557 decoded, err := base64.URLEncoding.DecodeString(*cursor) 558 if err != nil { 559 return "", nil, fmt.Errorf("invalid cursor encoding") 560 } 561 562 // Parse cursor based on sort type using | delimiter 563 // Format: hotRank|score|createdAt|uri (for hot) 564 // score|createdAt|uri (for top) 565 // createdAt|uri (for new) 566 parts := strings.Split(string(decoded), "|") 567 568 switch sort { 569 case "new": 570 // Cursor format: createdAt|uri 571 if len(parts) != 2 { 572 return "", nil, fmt.Errorf("invalid cursor format for new sort") 573 } 574 575 createdAt := parts[0] 576 uri := parts[1] 577 578 // Validate AT-URI format 579 if !strings.HasPrefix(uri, "at://") { 580 return "", nil, fmt.Errorf("invalid cursor URI") 581 } 582 583 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))` 584 return filter, []interface{}{createdAt, uri}, nil 585 586 case "top": 587 // Cursor format: score|createdAt|uri 588 if len(parts) != 3 { 589 return "", nil, fmt.Errorf("invalid cursor format for top sort") 590 } 591 592 scoreStr := parts[0] 593 createdAt := parts[1] 594 uri := parts[2] 595 596 // Parse score as integer 597 score := 0 598 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 599 return "", nil, fmt.Errorf("invalid cursor score") 600 } 601 602 // Validate AT-URI format 603 if !strings.HasPrefix(uri, "at://") { 604 return "", nil, fmt.Errorf("invalid cursor URI") 605 } 606 607 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))` 608 return filter, []interface{}{score, createdAt, uri}, nil 609 610 case "hot": 611 // Cursor format: hotRank|score|createdAt|uri 612 if len(parts) != 4 { 613 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 614 } 615 616 hotRankStr := parts[0] 617 
scoreStr := parts[1] 618 createdAt := parts[2] 619 uri := parts[3] 620 621 // Parse hot_rank as float 622 hotRank := 0.0 623 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 624 return "", nil, fmt.Errorf("invalid cursor hot rank") 625 } 626 627 // Parse score as integer 628 score := 0 629 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 630 return "", nil, fmt.Errorf("invalid cursor score") 631 } 632 633 // Validate AT-URI format 634 if !strings.HasPrefix(uri, "at://") { 635 return "", nil, fmt.Errorf("invalid cursor URI") 636 } 637 638 // Use computed hot_rank expression in comparison 639 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)` 640 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`, 641 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr) 642 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil 643 644 default: 645 return "", nil, nil 646 } 647} 648 649// buildCommentCursor creates pagination cursor from last comment 650func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank float64) string { 651 var cursorStr string 652 const delimiter = "|" 653 654 switch sort { 655 case "new": 656 // Format: createdAt|uri 657 cursorStr = fmt.Sprintf("%s%s%s", 658 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 659 delimiter, 660 comment.URI) 661 662 case "top": 663 // Format: score|createdAt|uri 664 cursorStr = fmt.Sprintf("%d%s%s%s%s", 665 comment.Score, 666 delimiter, 667 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 668 delimiter, 669 comment.URI) 670 671 case "hot": 672 // Format: hotRank|score|createdAt|uri 673 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s", 674 hotRank, 675 delimiter, 676 comment.Score, 677 delimiter, 678 
comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 679 delimiter, 680 comment.URI) 681 682 default: 683 cursorStr = comment.URI 684 } 685 686 return base64.URLEncoding.EncodeToString([]byte(cursorStr)) 687} 688 689// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query 690// Returns map[uri]*Comment for efficient lookups without N+1 queries 691func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) { 692 if len(uris) == 0 { 693 return make(map[string]*comments.Comment), nil 694 } 695 696 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 697 // COALESCE falls back to DID when handle is NULL (user not yet in users table) 698 query := ` 699 SELECT 700 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 701 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 702 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 703 c.created_at, c.indexed_at, c.deleted_at, 704 c.upvote_count, c.downvote_count, c.score, c.reply_count, 705 COALESCE(u.handle, c.commenter_did) as author_handle 706 FROM comments c 707 LEFT JOIN users u ON c.commenter_did = u.did 708 WHERE c.uri = ANY($1) AND c.deleted_at IS NULL 709 ` 710 711 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris)) 712 if err != nil { 713 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err) 714 } 715 defer func() { 716 if err := rows.Close(); err != nil { 717 log.Printf("Failed to close rows: %v", err) 718 } 719 }() 720 721 result := make(map[string]*comments.Comment) 722 for rows.Next() { 723 var comment comments.Comment 724 var langs pq.StringArray 725 var authorHandle string 726 727 err := rows.Scan( 728 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 729 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 730 &comment.Content, &comment.ContentFacets, &comment.Embed, 
&comment.ContentLabels, &langs, 731 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 732 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 733 &authorHandle, 734 ) 735 if err != nil { 736 return nil, fmt.Errorf("failed to scan comment: %w", err) 737 } 738 739 comment.Langs = langs 740 result[comment.URI] = &comment 741 } 742 743 if err = rows.Err(); err != nil { 744 return nil, fmt.Errorf("error iterating comments: %w", err) 745 } 746 747 return result, nil 748} 749 750// ListByParentsBatch retrieves direct replies to multiple parents in a single query 751// Groups results by parent URI to prevent N+1 queries when loading nested replies 752// Uses window functions to limit results per parent efficiently 753func (r *postgresCommentRepo) ListByParentsBatch( 754 ctx context.Context, 755 parentURIs []string, 756 sort string, 757 limitPerParent int, 758) (map[string][]*comments.Comment, error) { 759 if len(parentURIs) == 0 { 760 return make(map[string][]*comments.Comment), nil 761 } 762 763 // Build ORDER BY clause based on sort type 764 // windowOrderBy must inline expressions (can't use SELECT aliases in window functions) 765 var windowOrderBy string 766 var selectClause string 767 switch sort { 768 case "hot": 769 selectClause = ` 770 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 771 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 772 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 773 c.created_at, c.indexed_at, c.deleted_at, 774 c.upvote_count, c.downvote_count, c.score, c.reply_count, 775 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 776 COALESCE(u.handle, c.commenter_did) as author_handle` 777 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY 778 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score 
DESC, c.created_at DESC` 779 case "top": 780 selectClause = ` 781 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 782 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 783 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 784 c.created_at, c.indexed_at, c.deleted_at, 785 c.upvote_count, c.downvote_count, c.score, c.reply_count, 786 NULL::numeric as hot_rank, 787 COALESCE(u.handle, c.commenter_did) as author_handle` 788 windowOrderBy = `c.score DESC, c.created_at DESC` 789 case "new": 790 selectClause = ` 791 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 792 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 793 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 794 c.created_at, c.indexed_at, c.deleted_at, 795 c.upvote_count, c.downvote_count, c.score, c.reply_count, 796 NULL::numeric as hot_rank, 797 COALESCE(u.handle, c.commenter_did) as author_handle` 798 windowOrderBy = `c.created_at DESC` 799 default: 800 // Default to hot 801 selectClause = ` 802 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 803 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 804 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 805 c.created_at, c.indexed_at, c.deleted_at, 806 c.upvote_count, c.downvote_count, c.score, c.reply_count, 807 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 808 COALESCE(u.handle, c.commenter_did) as author_handle` 809 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY 810 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC` 811 } 812 813 // Use window function to limit results per parent 814 // This is more efficient than LIMIT in a subquery per parent 815 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 816 query := fmt.Sprintf(` 817 WITH 
ranked_comments AS ( 818 SELECT 819 %s, 820 ROW_NUMBER() OVER ( 821 PARTITION BY c.parent_uri 822 ORDER BY %s 823 ) as rn 824 FROM comments c 825 LEFT JOIN users u ON c.commenter_did = u.did 826 WHERE c.parent_uri = ANY($1) AND c.deleted_at IS NULL 827 ) 828 SELECT 829 id, uri, cid, rkey, commenter_did, 830 root_uri, root_cid, parent_uri, parent_cid, 831 content, content_facets, embed, content_labels, langs, 832 created_at, indexed_at, deleted_at, 833 upvote_count, downvote_count, score, reply_count, 834 hot_rank, author_handle 835 FROM ranked_comments 836 WHERE rn <= $2 837 ORDER BY parent_uri, rn 838 `, selectClause, windowOrderBy) 839 840 rows, err := r.db.QueryContext(ctx, query, pq.Array(parentURIs), limitPerParent) 841 if err != nil { 842 return nil, fmt.Errorf("failed to batch query comments by parents: %w", err) 843 } 844 defer func() { 845 if err := rows.Close(); err != nil { 846 log.Printf("Failed to close rows: %v", err) 847 } 848 }() 849 850 // Group results by parent URI 851 result := make(map[string][]*comments.Comment) 852 for rows.Next() { 853 var comment comments.Comment 854 var langs pq.StringArray 855 var hotRank sql.NullFloat64 856 var authorHandle string 857 858 err := rows.Scan( 859 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 860 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 861 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 862 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 863 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 864 &hotRank, &authorHandle, 865 ) 866 if err != nil { 867 return nil, fmt.Errorf("failed to scan comment: %w", err) 868 } 869 870 comment.Langs = langs 871 comment.CommenterHandle = authorHandle 872 873 // Group by parent URI 874 result[comment.ParentURI] = append(result[comment.ParentURI], &comment) 875 } 876 877 if err = rows.Err(); err != nil { 878 return nil, 
fmt.Errorf("error iterating comments: %w", err) 879 } 880 881 return result, nil 882} 883 884// GetVoteStateForComments retrieves the viewer's votes on a batch of comments 885// Returns map[commentURI]*Vote for efficient lookups 886// Note: This implementation is prepared for when the votes table indexing is implemented 887// Currently returns an empty map as votes may not be fully indexed yet 888func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) { 889 if len(commentURIs) == 0 || viewerDID == "" { 890 return make(map[string]interface{}), nil 891 } 892 893 // Query votes table for viewer's votes on these comments 894 // Note: This assumes votes table exists and is being indexed 895 // If votes table doesn't exist yet, this query will fail gracefully 896 query := ` 897 SELECT subject_uri, direction, uri 898 FROM votes 899 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL 900 ` 901 902 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs)) 903 if err != nil { 904 // If votes table doesn't exist yet, return empty map instead of error 905 // This allows the API to work before votes indexing is fully implemented 906 if strings.Contains(err.Error(), "does not exist") { 907 return make(map[string]interface{}), nil 908 } 909 return nil, fmt.Errorf("failed to get vote state for comments: %w", err) 910 } 911 defer func() { 912 if err := rows.Close(); err != nil { 913 log.Printf("Failed to close rows: %v", err) 914 } 915 }() 916 917 // Build result map with vote information 918 result := make(map[string]interface{}) 919 for rows.Next() { 920 var subjectURI, direction, uri string 921 922 err := rows.Scan(&subjectURI, &direction, &uri) 923 if err != nil { 924 return nil, fmt.Errorf("failed to scan vote: %w", err) 925 } 926 927 // Store vote info as a simple map (can be enhanced later with proper Vote struct) 928 result[subjectURI] = 
map[string]interface{}{ 929 "direction": direction, 930 "uri": uri, 931 } 932 } 933 934 if err = rows.Err(); err != nil { 935 return nil, fmt.Errorf("error iterating votes: %w", err) 936 } 937 938 return result, nil 939}