A community-based topic aggregation platform built on atproto.
1package postgres 2 3import ( 4 "Coves/internal/core/comments" 5 "context" 6 "database/sql" 7 "encoding/base64" 8 "fmt" 9 "log" 10 "strings" 11 12 "github.com/lib/pq" 13) 14 15type postgresCommentRepo struct { 16 db *sql.DB 17} 18 19// NewCommentRepository creates a new PostgreSQL comment repository 20func NewCommentRepository(db *sql.DB) comments.Repository { 21 return &postgresCommentRepo{db: db} 22} 23 24// Create inserts a new comment into the comments table 25// Called by Jetstream consumer after comment is created on PDS 26// Idempotent: Returns success if comment already exists (for Jetstream replays) 27func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error { 28 query := ` 29 INSERT INTO comments ( 30 uri, cid, rkey, commenter_did, 31 root_uri, root_cid, parent_uri, parent_cid, 32 content, content_facets, embed, content_labels, langs, 33 created_at, indexed_at 34 ) VALUES ( 35 $1, $2, $3, $4, 36 $5, $6, $7, $8, 37 $9, $10, $11, $12, $13, 38 $14, NOW() 39 ) 40 ON CONFLICT (uri) DO NOTHING 41 RETURNING id, indexed_at 42 ` 43 44 err := r.db.QueryRowContext( 45 ctx, query, 46 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 47 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 48 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 49 comment.CreatedAt, 50 ).Scan(&comment.ID, &comment.IndexedAt) 51 52 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 53 if err == sql.ErrNoRows { 54 return nil // Comment already exists, no error for idempotency 55 } 56 57 if err != nil { 58 // Check for unique constraint violation 59 if strings.Contains(err.Error(), "duplicate key") { 60 return comments.ErrCommentAlreadyExists 61 } 62 63 return fmt.Errorf("failed to insert comment: %w", err) 64 } 65 66 return nil 67} 68 69// Update modifies an existing comment's content fields 70// Called by Jetstream consumer after comment is 
updated on PDS 71// Preserves vote counts and created_at timestamp 72func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error { 73 query := ` 74 UPDATE comments 75 SET 76 cid = $1, 77 content = $2, 78 content_facets = $3, 79 embed = $4, 80 content_labels = $5, 81 langs = $6 82 WHERE uri = $7 AND deleted_at IS NULL 83 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count 84 ` 85 86 err := r.db.QueryRowContext( 87 ctx, query, 88 comment.CID, 89 comment.Content, 90 comment.ContentFacets, 91 comment.Embed, 92 comment.ContentLabels, 93 pq.Array(comment.Langs), 94 comment.URI, 95 ).Scan( 96 &comment.ID, 97 &comment.IndexedAt, 98 &comment.CreatedAt, 99 &comment.UpvoteCount, 100 &comment.DownvoteCount, 101 &comment.Score, 102 &comment.ReplyCount, 103 ) 104 105 if err == sql.ErrNoRows { 106 return comments.ErrCommentNotFound 107 } 108 if err != nil { 109 return fmt.Errorf("failed to update comment: %w", err) 110 } 111 112 return nil 113} 114 115// GetByURI retrieves a comment by its AT-URI 116// Used by Jetstream consumer for UPDATE/DELETE operations 117func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) { 118 query := ` 119 SELECT 120 id, uri, cid, rkey, commenter_did, 121 root_uri, root_cid, parent_uri, parent_cid, 122 content, content_facets, embed, content_labels, langs, 123 created_at, indexed_at, deleted_at, 124 upvote_count, downvote_count, score, reply_count 125 FROM comments 126 WHERE uri = $1 127 ` 128 129 var comment comments.Comment 130 var langs pq.StringArray 131 132 err := r.db.QueryRowContext(ctx, query, uri).Scan( 133 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 134 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 135 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 136 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 137 
&comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 138 ) 139 140 if err == sql.ErrNoRows { 141 return nil, comments.ErrCommentNotFound 142 } 143 if err != nil { 144 return nil, fmt.Errorf("failed to get comment by URI: %w", err) 145 } 146 147 comment.Langs = langs 148 149 return &comment, nil 150} 151 152// Delete soft-deletes a comment (sets deleted_at) 153// Called by Jetstream consumer after comment is deleted from PDS 154// Idempotent: Returns success if comment already deleted 155func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error { 156 query := ` 157 UPDATE comments 158 SET deleted_at = NOW() 159 WHERE uri = $1 AND deleted_at IS NULL 160 ` 161 162 result, err := r.db.ExecContext(ctx, query, uri) 163 if err != nil { 164 return fmt.Errorf("failed to delete comment: %w", err) 165 } 166 167 rowsAffected, err := result.RowsAffected() 168 if err != nil { 169 return fmt.Errorf("failed to check delete result: %w", err) 170 } 171 172 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays) 173 if rowsAffected == 0 { 174 return nil 175 } 176 177 return nil 178} 179 180// ListByRoot retrieves all active comments in a thread (flat) 181// Used for fetching entire comment threads on posts 182func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) { 183 query := ` 184 SELECT 185 id, uri, cid, rkey, commenter_did, 186 root_uri, root_cid, parent_uri, parent_cid, 187 content, content_facets, embed, content_labels, langs, 188 created_at, indexed_at, deleted_at, 189 upvote_count, downvote_count, score, reply_count 190 FROM comments 191 WHERE root_uri = $1 AND deleted_at IS NULL 192 ORDER BY created_at ASC 193 LIMIT $2 OFFSET $3 194 ` 195 196 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset) 197 if err != nil { 198 return nil, fmt.Errorf("failed to list comments by root: %w", err) 199 } 200 defer func() { 201 
if err := rows.Close(); err != nil { 202 log.Printf("Failed to close rows: %v", err) 203 } 204 }() 205 206 var result []*comments.Comment 207 for rows.Next() { 208 var comment comments.Comment 209 var langs pq.StringArray 210 211 err := rows.Scan( 212 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 213 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 214 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 215 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 216 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 217 ) 218 if err != nil { 219 return nil, fmt.Errorf("failed to scan comment: %w", err) 220 } 221 222 comment.Langs = langs 223 result = append(result, &comment) 224 } 225 226 if err = rows.Err(); err != nil { 227 return nil, fmt.Errorf("error iterating comments: %w", err) 228 } 229 230 return result, nil 231} 232 233// ListByParent retrieves direct replies to a post or comment 234// Used for building nested/threaded comment views 235func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) { 236 query := ` 237 SELECT 238 id, uri, cid, rkey, commenter_did, 239 root_uri, root_cid, parent_uri, parent_cid, 240 content, content_facets, embed, content_labels, langs, 241 created_at, indexed_at, deleted_at, 242 upvote_count, downvote_count, score, reply_count 243 FROM comments 244 WHERE parent_uri = $1 AND deleted_at IS NULL 245 ORDER BY created_at ASC 246 LIMIT $2 OFFSET $3 247 ` 248 249 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset) 250 if err != nil { 251 return nil, fmt.Errorf("failed to list comments by parent: %w", err) 252 } 253 defer func() { 254 if err := rows.Close(); err != nil { 255 log.Printf("Failed to close rows: %v", err) 256 } 257 }() 258 259 var result []*comments.Comment 260 for rows.Next() { 261 var comment 
comments.Comment 262 var langs pq.StringArray 263 264 err := rows.Scan( 265 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 266 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 267 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 268 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 269 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 270 ) 271 if err != nil { 272 return nil, fmt.Errorf("failed to scan comment: %w", err) 273 } 274 275 comment.Langs = langs 276 result = append(result, &comment) 277 } 278 279 if err = rows.Err(); err != nil { 280 return nil, fmt.Errorf("error iterating comments: %w", err) 281 } 282 283 return result, nil 284} 285 286// CountByParent counts direct replies to a post or comment 287// Used for showing reply counts in threading UI 288func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) { 289 query := ` 290 SELECT COUNT(*) 291 FROM comments 292 WHERE parent_uri = $1 AND deleted_at IS NULL 293 ` 294 295 var count int 296 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count) 297 if err != nil { 298 return 0, fmt.Errorf("failed to count comments by parent: %w", err) 299 } 300 301 return count, nil 302} 303 304// ListByCommenter retrieves all active comments by a specific user 305// Future: Used for user comment history 306func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) { 307 query := ` 308 SELECT 309 id, uri, cid, rkey, commenter_did, 310 root_uri, root_cid, parent_uri, parent_cid, 311 content, content_facets, embed, content_labels, langs, 312 created_at, indexed_at, deleted_at, 313 upvote_count, downvote_count, score, reply_count 314 FROM comments 315 WHERE commenter_did = $1 AND deleted_at IS NULL 316 ORDER BY created_at DESC 317 LIMIT $2 OFFSET $3 318 ` 319 320 
rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset) 321 if err != nil { 322 return nil, fmt.Errorf("failed to list comments by commenter: %w", err) 323 } 324 defer func() { 325 if err := rows.Close(); err != nil { 326 log.Printf("Failed to close rows: %v", err) 327 } 328 }() 329 330 var result []*comments.Comment 331 for rows.Next() { 332 var comment comments.Comment 333 var langs pq.StringArray 334 335 err := rows.Scan( 336 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 337 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 338 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 339 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 340 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 341 ) 342 if err != nil { 343 return nil, fmt.Errorf("failed to scan comment: %w", err) 344 } 345 346 comment.Langs = langs 347 result = append(result, &comment) 348 } 349 350 if err = rows.Err(); err != nil { 351 return nil, fmt.Errorf("error iterating comments: %w", err) 352 } 353 354 return result, nil 355} 356 357// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination 358// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at) 359// Uses cursor-based pagination with composite keys for consistent ordering 360// Hydrates author info (handle, display_name, avatar) via JOIN with users table 361func (r *postgresCommentRepo) ListByParentWithHotRank( 362 ctx context.Context, 363 parentURI string, 364 sort string, 365 timeframe string, 366 limit int, 367 cursor *string, 368) ([]*comments.Comment, *string, error) { 369 // Build ORDER BY clause and time filter based on sort type 370 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe) 371 372 // Parse cursor for pagination 373 cursorFilter, cursorValues, err := 
r.parseCommentCursor(cursor, sort) 374 if err != nil { 375 return nil, nil, fmt.Errorf("invalid cursor: %w", err) 376 } 377 378 // Build SELECT clause - compute hot_rank for "hot" sort 379 // Hot rank formula (Lemmy algorithm): 380 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8) 381 // 382 // This formula: 383 // - Gives logarithmic weight to score (prevents high-score dominance) 384 // - Decays over time with power 1.8 (faster than linear, slower than quadratic) 385 // - Uses hours as time unit (3600 seconds) 386 // - Adds constants to prevent division by zero and ensure positive values 387 var selectClause string 388 if sort == "hot" { 389 selectClause = ` 390 SELECT 391 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 392 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 393 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 394 c.created_at, c.indexed_at, c.deleted_at, 395 c.upvote_count, c.downvote_count, c.score, c.reply_count, 396 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 397 COALESCE(u.handle, c.commenter_did) as author_handle 398 FROM comments c` 399 } else { 400 selectClause = ` 401 SELECT 402 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 403 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 404 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 405 c.created_at, c.indexed_at, c.deleted_at, 406 c.upvote_count, c.downvote_count, c.score, c.reply_count, 407 NULL::numeric as hot_rank, 408 COALESCE(u.handle, c.commenter_did) as author_handle 409 FROM comments c` 410 } 411 412 // Build complete query with JOINs and filters 413 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 414 query := fmt.Sprintf(` 415 %s 416 LEFT JOIN users u ON c.commenter_did = u.did 417 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 418 %s 419 %s 420 ORDER BY %s 421 LIMIT $2 422 
`, selectClause, timeFilter, cursorFilter, orderBy) 423 424 // Prepare query arguments 425 args := []interface{}{parentURI, limit + 1} // +1 to detect next page 426 args = append(args, cursorValues...) 427 428 // Execute query 429 rows, err := r.db.QueryContext(ctx, query, args...) 430 if err != nil { 431 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err) 432 } 433 defer func() { 434 if err := rows.Close(); err != nil { 435 log.Printf("Failed to close rows: %v", err) 436 } 437 }() 438 439 // Scan results 440 var result []*comments.Comment 441 var hotRanks []float64 442 for rows.Next() { 443 var comment comments.Comment 444 var langs pq.StringArray 445 var hotRank sql.NullFloat64 446 var authorHandle string 447 448 err := rows.Scan( 449 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 450 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 451 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 452 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 453 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 454 &hotRank, &authorHandle, 455 ) 456 if err != nil { 457 return nil, nil, fmt.Errorf("failed to scan comment: %w", err) 458 } 459 460 comment.Langs = langs 461 comment.CommenterHandle = authorHandle 462 463 // Store hot_rank for cursor building 464 hotRankValue := 0.0 465 if hotRank.Valid { 466 hotRankValue = hotRank.Float64 467 } 468 hotRanks = append(hotRanks, hotRankValue) 469 470 result = append(result, &comment) 471 } 472 473 if err = rows.Err(); err != nil { 474 return nil, nil, fmt.Errorf("error iterating comments: %w", err) 475 } 476 477 // Handle pagination cursor 478 var nextCursor *string 479 if len(result) > limit && limit > 0 { 480 result = result[:limit] 481 hotRanks = hotRanks[:limit] 482 lastComment := result[len(result)-1] 483 lastHotRank := hotRanks[len(hotRanks)-1] 484 cursorStr := 
r.buildCommentCursor(lastComment, sort, lastHotRank) 485 nextCursor = &cursorStr 486 } 487 488 return result, nextCursor, nil 489} 490 491// buildCommentSortClause returns the ORDER BY SQL and optional time filter 492func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) { 493 var orderBy string 494 switch sort { 495 case "hot": 496 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC 497 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 498 case "top": 499 // Score DESC, then created_at DESC, then uri DESC 500 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC` 501 case "new": 502 // Created at DESC, then uri DESC 503 orderBy = `c.created_at DESC, c.uri DESC` 504 default: 505 // Default to hot 506 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 507 } 508 509 // Add time filter for "top" sort 510 var timeFilter string 511 if sort == "top" { 512 timeFilter = r.buildCommentTimeFilter(timeframe) 513 } 514 515 return orderBy, timeFilter 516} 517 518// buildCommentTimeFilter returns SQL filter for timeframe 519func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string { 520 if timeframe == "" || timeframe == "all" { 521 return "" 522 } 523 524 var interval string 525 switch timeframe { 526 case "hour": 527 interval = "1 hour" 528 case "day": 529 interval = "1 day" 530 case "week": 531 interval = "7 days" 532 case "month": 533 interval = "30 days" 534 case "year": 535 interval = "1 year" 536 default: 537 return "" 538 } 539 540 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval) 541} 542 543// parseCommentCursor decodes pagination cursor for comments 544func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) { 545 if cursor == nil || *cursor == "" { 546 return "", nil, nil 547 } 548 549 // Validate cursor size to prevent DoS via massive base64 strings 550 const 
maxCursorSize = 1024 551 if len(*cursor) > maxCursorSize { 552 return "", nil, fmt.Errorf("cursor too large: maximum %d bytes", maxCursorSize) 553 } 554 555 // Decode base64 cursor 556 decoded, err := base64.URLEncoding.DecodeString(*cursor) 557 if err != nil { 558 return "", nil, fmt.Errorf("invalid cursor encoding") 559 } 560 561 // Parse cursor based on sort type using | delimiter 562 // Format: hotRank|score|createdAt|uri (for hot) 563 // score|createdAt|uri (for top) 564 // createdAt|uri (for new) 565 parts := strings.Split(string(decoded), "|") 566 567 switch sort { 568 case "new": 569 // Cursor format: createdAt|uri 570 if len(parts) != 2 { 571 return "", nil, fmt.Errorf("invalid cursor format for new sort") 572 } 573 574 createdAt := parts[0] 575 uri := parts[1] 576 577 // Validate AT-URI format 578 if !strings.HasPrefix(uri, "at://") { 579 return "", nil, fmt.Errorf("invalid cursor URI") 580 } 581 582 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))` 583 return filter, []interface{}{createdAt, uri}, nil 584 585 case "top": 586 // Cursor format: score|createdAt|uri 587 if len(parts) != 3 { 588 return "", nil, fmt.Errorf("invalid cursor format for top sort") 589 } 590 591 scoreStr := parts[0] 592 createdAt := parts[1] 593 uri := parts[2] 594 595 // Parse score as integer 596 score := 0 597 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 598 return "", nil, fmt.Errorf("invalid cursor score") 599 } 600 601 // Validate AT-URI format 602 if !strings.HasPrefix(uri, "at://") { 603 return "", nil, fmt.Errorf("invalid cursor URI") 604 } 605 606 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))` 607 return filter, []interface{}{score, createdAt, uri}, nil 608 609 case "hot": 610 // Cursor format: hotRank|score|createdAt|uri 611 if len(parts) != 4 { 612 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 613 } 614 615 hotRankStr := parts[0] 616 
scoreStr := parts[1] 617 createdAt := parts[2] 618 uri := parts[3] 619 620 // Parse hot_rank as float 621 hotRank := 0.0 622 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 623 return "", nil, fmt.Errorf("invalid cursor hot rank") 624 } 625 626 // Parse score as integer 627 score := 0 628 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 629 return "", nil, fmt.Errorf("invalid cursor score") 630 } 631 632 // Validate AT-URI format 633 if !strings.HasPrefix(uri, "at://") { 634 return "", nil, fmt.Errorf("invalid cursor URI") 635 } 636 637 // Use computed hot_rank expression in comparison 638 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)` 639 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`, 640 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr) 641 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil 642 643 default: 644 return "", nil, nil 645 } 646} 647 648// buildCommentCursor creates pagination cursor from last comment 649func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank float64) string { 650 var cursorStr string 651 const delimiter = "|" 652 653 switch sort { 654 case "new": 655 // Format: createdAt|uri 656 cursorStr = fmt.Sprintf("%s%s%s", 657 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 658 delimiter, 659 comment.URI) 660 661 case "top": 662 // Format: score|createdAt|uri 663 cursorStr = fmt.Sprintf("%d%s%s%s%s", 664 comment.Score, 665 delimiter, 666 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 667 delimiter, 668 comment.URI) 669 670 case "hot": 671 // Format: hotRank|score|createdAt|uri 672 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s", 673 hotRank, 674 delimiter, 675 comment.Score, 676 delimiter, 677 
comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 678 delimiter, 679 comment.URI) 680 681 default: 682 cursorStr = comment.URI 683 } 684 685 return base64.URLEncoding.EncodeToString([]byte(cursorStr)) 686} 687 688// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query 689// Returns map[uri]*Comment for efficient lookups without N+1 queries 690func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) { 691 if len(uris) == 0 { 692 return make(map[string]*comments.Comment), nil 693 } 694 695 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 696 // COALESCE falls back to DID when handle is NULL (user not yet in users table) 697 query := ` 698 SELECT 699 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 700 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 701 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 702 c.created_at, c.indexed_at, c.deleted_at, 703 c.upvote_count, c.downvote_count, c.score, c.reply_count, 704 COALESCE(u.handle, c.commenter_did) as author_handle 705 FROM comments c 706 LEFT JOIN users u ON c.commenter_did = u.did 707 WHERE c.uri = ANY($1) AND c.deleted_at IS NULL 708 ` 709 710 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris)) 711 if err != nil { 712 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err) 713 } 714 defer func() { 715 if err := rows.Close(); err != nil { 716 log.Printf("Failed to close rows: %v", err) 717 } 718 }() 719 720 result := make(map[string]*comments.Comment) 721 for rows.Next() { 722 var comment comments.Comment 723 var langs pq.StringArray 724 var authorHandle string 725 726 err := rows.Scan( 727 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 728 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 729 &comment.Content, &comment.ContentFacets, &comment.Embed, 
&comment.ContentLabels, &langs, 730 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 731 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 732 &authorHandle, 733 ) 734 if err != nil { 735 return nil, fmt.Errorf("failed to scan comment: %w", err) 736 } 737 738 comment.Langs = langs 739 result[comment.URI] = &comment 740 } 741 742 if err = rows.Err(); err != nil { 743 return nil, fmt.Errorf("error iterating comments: %w", err) 744 } 745 746 return result, nil 747} 748 749// ListByParentsBatch retrieves direct replies to multiple parents in a single query 750// Groups results by parent URI to prevent N+1 queries when loading nested replies 751// Uses window functions to limit results per parent efficiently 752func (r *postgresCommentRepo) ListByParentsBatch( 753 ctx context.Context, 754 parentURIs []string, 755 sort string, 756 limitPerParent int, 757) (map[string][]*comments.Comment, error) { 758 if len(parentURIs) == 0 { 759 return make(map[string][]*comments.Comment), nil 760 } 761 762 // Build ORDER BY clause based on sort type 763 // windowOrderBy must inline expressions (can't use SELECT aliases in window functions) 764 var windowOrderBy string 765 var selectClause string 766 switch sort { 767 case "hot": 768 selectClause = ` 769 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 770 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 771 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 772 c.created_at, c.indexed_at, c.deleted_at, 773 c.upvote_count, c.downvote_count, c.score, c.reply_count, 774 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 775 COALESCE(u.handle, c.commenter_did) as author_handle` 776 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY 777 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score 
DESC, c.created_at DESC` 778 case "top": 779 selectClause = ` 780 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 781 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 782 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 783 c.created_at, c.indexed_at, c.deleted_at, 784 c.upvote_count, c.downvote_count, c.score, c.reply_count, 785 NULL::numeric as hot_rank, 786 COALESCE(u.handle, c.commenter_did) as author_handle` 787 windowOrderBy = `c.score DESC, c.created_at DESC` 788 case "new": 789 selectClause = ` 790 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 791 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 792 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 793 c.created_at, c.indexed_at, c.deleted_at, 794 c.upvote_count, c.downvote_count, c.score, c.reply_count, 795 NULL::numeric as hot_rank, 796 COALESCE(u.handle, c.commenter_did) as author_handle` 797 windowOrderBy = `c.created_at DESC` 798 default: 799 // Default to hot 800 selectClause = ` 801 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 802 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 803 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 804 c.created_at, c.indexed_at, c.deleted_at, 805 c.upvote_count, c.downvote_count, c.score, c.reply_count, 806 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 807 COALESCE(u.handle, c.commenter_did) as author_handle` 808 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY 809 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC` 810 } 811 812 // Use window function to limit results per parent 813 // This is more efficient than LIMIT in a subquery per parent 814 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events) 815 query := fmt.Sprintf(` 816 WITH 
ranked_comments AS ( 817 SELECT 818 %s, 819 ROW_NUMBER() OVER ( 820 PARTITION BY c.parent_uri 821 ORDER BY %s 822 ) as rn 823 FROM comments c 824 LEFT JOIN users u ON c.commenter_did = u.did 825 WHERE c.parent_uri = ANY($1) AND c.deleted_at IS NULL 826 ) 827 SELECT 828 id, uri, cid, rkey, commenter_did, 829 root_uri, root_cid, parent_uri, parent_cid, 830 content, content_facets, embed, content_labels, langs, 831 created_at, indexed_at, deleted_at, 832 upvote_count, downvote_count, score, reply_count, 833 hot_rank, author_handle 834 FROM ranked_comments 835 WHERE rn <= $2 836 ORDER BY parent_uri, rn 837 `, selectClause, windowOrderBy) 838 839 rows, err := r.db.QueryContext(ctx, query, pq.Array(parentURIs), limitPerParent) 840 if err != nil { 841 return nil, fmt.Errorf("failed to batch query comments by parents: %w", err) 842 } 843 defer func() { 844 if err := rows.Close(); err != nil { 845 log.Printf("Failed to close rows: %v", err) 846 } 847 }() 848 849 // Group results by parent URI 850 result := make(map[string][]*comments.Comment) 851 for rows.Next() { 852 var comment comments.Comment 853 var langs pq.StringArray 854 var hotRank sql.NullFloat64 855 var authorHandle string 856 857 err := rows.Scan( 858 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 859 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 860 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 861 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 862 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 863 &hotRank, &authorHandle, 864 ) 865 if err != nil { 866 return nil, fmt.Errorf("failed to scan comment: %w", err) 867 } 868 869 comment.Langs = langs 870 comment.CommenterHandle = authorHandle 871 872 // Group by parent URI 873 result[comment.ParentURI] = append(result[comment.ParentURI], &comment) 874 } 875 876 if err = rows.Err(); err != nil { 877 return nil, 
fmt.Errorf("error iterating comments: %w", err) 878 } 879 880 return result, nil 881} 882 883// GetVoteStateForComments retrieves the viewer's votes on a batch of comments 884// Returns map[commentURI]*Vote for efficient lookups 885// Note: This implementation is prepared for when the votes table indexing is implemented 886// Currently returns an empty map as votes may not be fully indexed yet 887func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) { 888 if len(commentURIs) == 0 || viewerDID == "" { 889 return make(map[string]interface{}), nil 890 } 891 892 // Query votes table for viewer's votes on these comments 893 // Note: This assumes votes table exists and is being indexed 894 // If votes table doesn't exist yet, this query will fail gracefully 895 query := ` 896 SELECT subject_uri, direction, uri 897 FROM votes 898 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL 899 ` 900 901 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs)) 902 if err != nil { 903 // If votes table doesn't exist yet, return empty map instead of error 904 // This allows the API to work before votes indexing is fully implemented 905 if strings.Contains(err.Error(), "does not exist") { 906 return make(map[string]interface{}), nil 907 } 908 return nil, fmt.Errorf("failed to get vote state for comments: %w", err) 909 } 910 defer func() { 911 if err := rows.Close(); err != nil { 912 log.Printf("Failed to close rows: %v", err) 913 } 914 }() 915 916 // Build result map with vote information 917 result := make(map[string]interface{}) 918 for rows.Next() { 919 var subjectURI, direction, uri string 920 921 err := rows.Scan(&subjectURI, &direction, &uri) 922 if err != nil { 923 return nil, fmt.Errorf("failed to scan vote: %w", err) 924 } 925 926 // Store vote info as a simple map (can be enhanced later with proper Vote struct) 927 result[subjectURI] = 
map[string]interface{}{ 928 "direction": direction, 929 "uri": uri, 930 } 931 } 932 933 if err = rows.Err(); err != nil { 934 return nil, fmt.Errorf("error iterating votes: %w", err) 935 } 936 937 return result, nil 938}