A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/comments" 5 "context" 6 "database/sql" 7 "encoding/base64" 8 "fmt" 9 "log" 10 "strings" 11 12 "github.com/lib/pq" 13) 14 15type postgresCommentRepo struct { 16 db *sql.DB 17} 18 19// NewCommentRepository creates a new PostgreSQL comment repository 20func NewCommentRepository(db *sql.DB) comments.Repository { 21 return &postgresCommentRepo{db: db} 22} 23 24// Create inserts a new comment into the comments table 25// Called by Jetstream consumer after comment is created on PDS 26// Idempotent: Returns success if comment already exists (for Jetstream replays) 27func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error { 28 query := ` 29 INSERT INTO comments ( 30 uri, cid, rkey, commenter_did, 31 root_uri, root_cid, parent_uri, parent_cid, 32 content, content_facets, embed, content_labels, langs, 33 created_at, indexed_at 34 ) VALUES ( 35 $1, $2, $3, $4, 36 $5, $6, $7, $8, 37 $9, $10, $11, $12, $13, 38 $14, NOW() 39 ) 40 ON CONFLICT (uri) DO NOTHING 41 RETURNING id, indexed_at 42 ` 43 44 err := r.db.QueryRowContext( 45 ctx, query, 46 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 47 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 48 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 49 comment.CreatedAt, 50 ).Scan(&comment.ID, &comment.IndexedAt) 51 52 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 53 if err == sql.ErrNoRows { 54 return nil // Comment already exists, no error for idempotency 55 } 56 57 if err != nil { 58 // Check for unique constraint violation 59 if strings.Contains(err.Error(), "duplicate key") { 60 return comments.ErrCommentAlreadyExists 61 } 62 63 return fmt.Errorf("failed to insert comment: %w", err) 64 } 65 66 return nil 67} 68 69// Update modifies an existing comment's content fields 70// Called by Jetstream consumer after comment is updated on PDS 71// Preserves vote counts and created_at timestamp 72func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error { 73 query := ` 74 UPDATE comments 75 SET 76 cid = $1, 77 content = $2, 78 content_facets = $3, 79 embed = $4, 80 content_labels = $5, 81 langs = $6 82 WHERE uri = $7 AND deleted_at IS NULL 83 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count 84 ` 85 86 err := r.db.QueryRowContext( 87 ctx, query, 88 comment.CID, 89 comment.Content, 90 comment.ContentFacets, 91 comment.Embed, 92 comment.ContentLabels, 93 pq.Array(comment.Langs), 94 comment.URI, 95 ).Scan( 96 &comment.ID, 97 &comment.IndexedAt, 98 &comment.CreatedAt, 99 &comment.UpvoteCount, 100 &comment.DownvoteCount, 101 &comment.Score, 102 &comment.ReplyCount, 103 ) 104 105 if err == sql.ErrNoRows { 106 return comments.ErrCommentNotFound 107 } 108 if err != nil { 109 return fmt.Errorf("failed to update comment: %w", err) 110 } 111 112 return nil 113} 114 115// GetByURI retrieves a comment by its AT-URI 116// Used by Jetstream consumer for UPDATE/DELETE operations 117func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) { 118 query := ` 119 SELECT 120 id, uri, cid, rkey, commenter_did, 121 root_uri, root_cid, parent_uri, parent_cid, 122 content, content_facets, embed, content_labels, langs, 123 created_at, indexed_at, deleted_at, 124 upvote_count, downvote_count, score, reply_count 125 FROM comments 126 WHERE uri = $1 127 ` 128 129 var comment comments.Comment 130 var langs pq.StringArray 131 132 err := r.db.QueryRowContext(ctx, query, uri).Scan( 133 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 134 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 135 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 136 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 137 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 138 ) 139 140 if err == sql.ErrNoRows { 141 return nil, comments.ErrCommentNotFound 142 } 143 if err != nil { 144 return nil, fmt.Errorf("failed to get comment by URI: %w", err) 145 } 146 147 comment.Langs = langs 148 149 return &comment, nil 150} 151 152// Delete soft-deletes a comment (sets deleted_at) 153// Called by Jetstream consumer after comment is deleted from PDS 154// Idempotent: Returns success if comment already deleted 155func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error { 156 query := ` 157 UPDATE comments 158 SET deleted_at = NOW() 159 WHERE uri = $1 AND deleted_at IS NULL 160 ` 161 162 result, err := r.db.ExecContext(ctx, query, uri) 163 if err != nil { 164 return fmt.Errorf("failed to delete comment: %w", err) 165 } 166 167 rowsAffected, err := result.RowsAffected() 168 if err != nil { 169 return fmt.Errorf("failed to check delete result: %w", err) 170 } 171 172 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays) 173 if rowsAffected == 0 { 174 return nil 175 } 176 177 return nil 178} 179 180// ListByRoot retrieves all active comments in a thread (flat) 181// Used for fetching entire comment threads on posts 182func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) { 183 query := ` 184 SELECT 185 id, uri, cid, rkey, commenter_did, 186 root_uri, root_cid, parent_uri, parent_cid, 187 content, content_facets, embed, content_labels, langs, 188 created_at, indexed_at, deleted_at, 189 upvote_count, downvote_count, score, reply_count 190 FROM comments 191 WHERE root_uri = $1 AND deleted_at IS NULL 192 ORDER BY created_at ASC 193 LIMIT $2 OFFSET $3 194 ` 195 196 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset) 197 if err != nil { 198 return nil, fmt.Errorf("failed to list comments by root: %w", err) 199 } 200 defer func() { 201 if err := rows.Close(); err != nil { 202 log.Printf("Failed to close rows: %v", err) 203 } 204 }() 205 206 var result []*comments.Comment 207 for rows.Next() { 208 var comment comments.Comment 209 var langs pq.StringArray 210 211 err := rows.Scan( 212 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 213 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 214 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 215 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 216 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 217 ) 218 if err != nil { 219 return nil, fmt.Errorf("failed to scan comment: %w", err) 220 } 221 222 comment.Langs = langs 223 result = append(result, &comment) 224 } 225 226 if err = rows.Err(); err != nil { 227 return nil, fmt.Errorf("error iterating comments: %w", err) 228 } 229 230 return result, nil 231} 232 233// ListByParent retrieves direct replies to a post or comment 234// Used for building nested/threaded comment views 235func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) { 236 query := ` 237 SELECT 238 id, uri, cid, rkey, commenter_did, 239 root_uri, root_cid, parent_uri, parent_cid, 240 content, content_facets, embed, content_labels, langs, 241 created_at, indexed_at, deleted_at, 242 upvote_count, downvote_count, score, reply_count 243 FROM comments 244 WHERE parent_uri = $1 AND deleted_at IS NULL 245 ORDER BY created_at ASC 246 LIMIT $2 OFFSET $3 247 ` 248 249 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset) 250 if err != nil { 251 return nil, fmt.Errorf("failed to list comments by parent: %w", err) 252 } 253 defer func() { 254 if err := rows.Close(); err != nil { 255 log.Printf("Failed to close rows: %v", err) 256 } 257 }() 258 259 var result []*comments.Comment 260 for rows.Next() { 261 var comment comments.Comment 262 var langs pq.StringArray 263 264 err := rows.Scan( 265 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 266 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 267 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 268 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 269 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 270 ) 271 if err != nil { 272 return nil, fmt.Errorf("failed to scan comment: %w", err) 273 } 274 275 comment.Langs = langs 276 result = append(result, &comment) 277 } 278 279 if err = rows.Err(); err != nil { 280 return nil, fmt.Errorf("error iterating comments: %w", err) 281 } 282 283 return result, nil 284} 285 286// CountByParent counts direct replies to a post or comment 287// Used for showing reply counts in threading UI 288func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) { 289 query := ` 290 SELECT COUNT(*) 291 FROM comments 292 WHERE parent_uri = $1 AND deleted_at IS NULL 293 ` 294 295 var count int 296 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count) 297 if err != nil { 298 return 0, fmt.Errorf("failed to count comments by parent: %w", err) 299 } 300 301 return count, nil 302} 303 304// ListByCommenter retrieves all active comments by a specific user 305// Future: Used for user comment history 306func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) { 307 query := ` 308 SELECT 309 id, uri, cid, rkey, commenter_did, 310 root_uri, root_cid, parent_uri, parent_cid, 311 content, content_facets, embed, content_labels, langs, 312 created_at, indexed_at, deleted_at, 313 upvote_count, downvote_count, score, reply_count 314 FROM comments 315 WHERE commenter_did = $1 AND deleted_at IS NULL 316 ORDER BY created_at DESC 317 LIMIT $2 OFFSET $3 318 ` 319 320 rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset) 321 if err != nil { 322 return nil, fmt.Errorf("failed to list comments by commenter: %w", err) 323 } 324 defer func() { 325 if err := rows.Close(); err != nil { 326 log.Printf("Failed to close rows: %v", err) 327 } 328 }() 329 330 var result []*comments.Comment 331 for rows.Next() { 332 var comment comments.Comment 333 var langs pq.StringArray 334 335 err := rows.Scan( 336 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 337 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 338 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 339 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 340 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 341 ) 342 if err != nil { 343 return nil, fmt.Errorf("failed to scan comment: %w", err) 344 } 345 346 comment.Langs = langs 347 result = append(result, &comment) 348 } 349 350 if err = rows.Err(); err != nil { 351 return nil, fmt.Errorf("error iterating comments: %w", err) 352 } 353 354 return result, nil 355} 356// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination 357// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at) 358// Uses cursor-based pagination with composite keys for consistent ordering 359// Hydrates author info (handle, display_name, avatar) via JOIN with users table 360func (r *postgresCommentRepo) ListByParentWithHotRank( 361 ctx context.Context, 362 parentURI string, 363 sort string, 364 timeframe string, 365 limit int, 366 cursor *string, 367) ([]*comments.Comment, *string, error) { 368 // Build ORDER BY clause and time filter based on sort type 369 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe) 370 371 // Parse cursor for pagination 372 cursorFilter, cursorValues, err := r.parseCommentCursor(cursor, sort) 373 if err != nil { 374 return nil, nil, fmt.Errorf("invalid cursor: %w", err) 375 } 376 377 // Build SELECT clause - compute hot_rank for "hot" sort 378 // Hot rank formula (Lemmy algorithm): 379 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8) 380 // 381 // This formula: 382 // - Gives logarithmic weight to score (prevents high-score dominance) 383 // - Decays over time with power 1.8 (faster than linear, slower than quadratic) 384 // - Uses hours as time unit (3600 seconds) 385 // - Adds constants to prevent division by zero and ensure positive values 386 var selectClause string 387 if sort == "hot" { 388 selectClause = ` 389 SELECT 390 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 391 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 392 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 393 c.created_at, c.indexed_at, c.deleted_at, 394 c.upvote_count, c.downvote_count, c.score, c.reply_count, 395 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank, 396 u.handle as author_handle 397 FROM comments c` 398 } else { 399 selectClause = ` 400 SELECT 401 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 402 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 403 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 404 c.created_at, c.indexed_at, c.deleted_at, 405 c.upvote_count, c.downvote_count, c.score, c.reply_count, 406 NULL::numeric as hot_rank, 407 u.handle as author_handle 408 FROM comments c` 409 } 410 411 // Build complete query with JOINs and filters 412 query := fmt.Sprintf(` 413 %s 414 INNER JOIN users u ON c.commenter_did = u.did 415 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 416 %s 417 %s 418 ORDER BY %s 419 LIMIT $2 420 `, selectClause, timeFilter, cursorFilter, orderBy) 421 422 // Prepare query arguments 423 args := []interface{}{parentURI, limit + 1} // +1 to detect next page 424 args = append(args, cursorValues...) 425 426 // Execute query 427 rows, err := r.db.QueryContext(ctx, query, args...) 428 if err != nil { 429 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err) 430 } 431 defer func() { 432 if err := rows.Close(); err != nil { 433 log.Printf("Failed to close rows: %v", err) 434 } 435 }() 436 437 // Scan results 438 var result []*comments.Comment 439 var hotRanks []float64 440 for rows.Next() { 441 var comment comments.Comment 442 var langs pq.StringArray 443 var hotRank sql.NullFloat64 444 var authorHandle string 445 446 err := rows.Scan( 447 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 448 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 449 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 450 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 451 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 452 &hotRank, &authorHandle, 453 ) 454 if err != nil { 455 return nil, nil, fmt.Errorf("failed to scan comment: %w", err) 456 } 457 458 comment.Langs = langs 459 comment.CommenterHandle = authorHandle 460 461 // Store hot_rank for cursor building 462 hotRankValue := 0.0 463 if hotRank.Valid { 464 hotRankValue = hotRank.Float64 465 } 466 hotRanks = append(hotRanks, hotRankValue) 467 468 result = append(result, &comment) 469 } 470 471 if err = rows.Err(); err != nil { 472 return nil, nil, fmt.Errorf("error iterating comments: %w", err) 473 } 474 475 // Handle pagination cursor 476 var nextCursor *string 477 if len(result) > limit && limit > 0 { 478 result = result[:limit] 479 hotRanks = hotRanks[:limit] 480 lastComment := result[len(result)-1] 481 lastHotRank := hotRanks[len(hotRanks)-1] 482 cursorStr := r.buildCommentCursor(lastComment, sort, lastHotRank) 483 nextCursor = &cursorStr 484 } 485 486 return result, nextCursor, nil 487} 488 489// buildCommentSortClause returns the ORDER BY SQL and optional time filter 490func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) { 491 var orderBy string 492 switch sort { 493 case "hot": 494 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC 495 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 496 case "top": 497 // Score DESC, then created_at DESC, then uri DESC 498 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC` 499 case "new": 500 // Created at DESC, then uri DESC 501 orderBy = `c.created_at DESC, c.uri DESC` 502 default: 503 // Default to hot 504 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC` 505 } 506 507 // Add time filter for "top" sort 508 var timeFilter string 509 if sort == "top" { 510 timeFilter = r.buildCommentTimeFilter(timeframe) 511 } 512 513 return orderBy, timeFilter 514} 515 516// buildCommentTimeFilter returns SQL filter for timeframe 517func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string { 518 if timeframe == "" || timeframe == "all" { 519 return "" 520 } 521 522 var interval string 523 switch timeframe { 524 case "hour": 525 interval = "1 hour" 526 case "day": 527 interval = "1 day" 528 case "week": 529 interval = "7 days" 530 case "month": 531 interval = "30 days" 532 case "year": 533 interval = "1 year" 534 default: 535 return "" 536 } 537 538 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval) 539} 540 541// parseCommentCursor decodes pagination cursor for comments 542func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) { 543 if cursor == nil || *cursor == "" { 544 return "", nil, nil 545 } 546 547 // Decode base64 cursor 548 decoded, err := base64.URLEncoding.DecodeString(*cursor) 549 if err != nil { 550 return "", nil, fmt.Errorf("invalid cursor encoding") 551 } 552 553 // Parse cursor based on sort type using | delimiter 554 // Format: hotRank|score|createdAt|uri (for hot) 555 // score|createdAt|uri (for top) 556 // createdAt|uri (for new) 557 parts := strings.Split(string(decoded), "|") 558 559 switch sort { 560 case "new": 561 // Cursor format: createdAt|uri 562 if len(parts) != 2 { 563 return "", nil, fmt.Errorf("invalid cursor format for new sort") 564 } 565 566 createdAt := parts[0] 567 uri := parts[1] 568 569 // Validate AT-URI format 570 if !strings.HasPrefix(uri, "at://") { 571 return "", nil, fmt.Errorf("invalid cursor URI") 572 } 573 574 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))` 575 return filter, []interface{}{createdAt, uri}, nil 576 577 case "top": 578 // Cursor format: score|createdAt|uri 579 if len(parts) != 3 { 580 return "", nil, fmt.Errorf("invalid cursor format for top sort") 581 } 582 583 scoreStr := parts[0] 584 createdAt := parts[1] 585 uri := parts[2] 586 587 // Parse score as integer 588 score := 0 589 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 590 return "", nil, fmt.Errorf("invalid cursor score") 591 } 592 593 // Validate AT-URI format 594 if !strings.HasPrefix(uri, "at://") { 595 return "", nil, fmt.Errorf("invalid cursor URI") 596 } 597 598 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))` 599 return filter, []interface{}{score, createdAt, uri}, nil 600 601 case "hot": 602 // Cursor format: hotRank|score|createdAt|uri 603 if len(parts) != 4 { 604 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 605 } 606 607 hotRankStr := parts[0] 608 scoreStr := parts[1] 609 createdAt := parts[2] 610 uri := parts[3] 611 612 // Parse hot_rank as float 613 hotRank := 0.0 614 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 615 return "", nil, fmt.Errorf("invalid cursor hot rank") 616 } 617 618 // Parse score as integer 619 score := 0 620 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 621 return "", nil, fmt.Errorf("invalid cursor score") 622 } 623 624 // Validate AT-URI format 625 if !strings.HasPrefix(uri, "at://") { 626 return "", nil, fmt.Errorf("invalid cursor URI") 627 } 628 629 // Use computed hot_rank expression in comparison 630 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)` 631 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`, 632 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr) 633 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil 634 635 default: 636 return "", nil, nil 637 } 638} 639 640// buildCommentCursor creates pagination cursor from last comment 641func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank float64) string { 642 var cursorStr string 643 const delimiter = "|" 644 645 switch sort { 646 case "new": 647 // Format: createdAt|uri 648 cursorStr = fmt.Sprintf("%s%s%s", 649 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 650 delimiter, 651 comment.URI) 652 653 case "top": 654 // Format: score|createdAt|uri 655 cursorStr = fmt.Sprintf("%d%s%s%s%s", 656 comment.Score, 657 delimiter, 658 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 659 delimiter, 660 comment.URI) 661 662 case "hot": 663 // Format: hotRank|score|createdAt|uri 664 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s", 665 hotRank, 666 delimiter, 667 comment.Score, 668 delimiter, 669 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"), 670 delimiter, 671 comment.URI) 672 673 default: 674 cursorStr = comment.URI 675 } 676 677 return base64.URLEncoding.EncodeToString([]byte(cursorStr)) 678} 679 680// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query 681// Returns map[uri]*Comment for efficient lookups without N+1 queries 682func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) { 683 if len(uris) == 0 { 684 return make(map[string]*comments.Comment), nil 685 } 686 687 query := ` 688 SELECT 689 c.id, c.uri, c.cid, c.rkey, c.commenter_did, 690 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid, 691 c.content, c.content_facets, c.embed, c.content_labels, c.langs, 692 c.created_at, c.indexed_at, c.deleted_at, 693 c.upvote_count, c.downvote_count, c.score, c.reply_count, 694 u.handle as author_handle 695 FROM comments c 696 INNER JOIN users u ON c.commenter_did = u.did 697 WHERE c.uri = ANY($1) AND c.deleted_at IS NULL 698 ` 699 700 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris)) 701 if err != nil { 702 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err) 703 } 704 defer func() { 705 if err := rows.Close(); err != nil { 706 log.Printf("Failed to close rows: %v", err) 707 } 708 }() 709 710 result := make(map[string]*comments.Comment) 711 for rows.Next() { 712 var comment comments.Comment 713 var langs pq.StringArray 714 var authorHandle string 715 716 err := rows.Scan( 717 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 718 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 719 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 720 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 721 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 722 &authorHandle, 723 ) 724 if err != nil { 725 return nil, fmt.Errorf("failed to scan comment: %w", err) 726 } 727 728 comment.Langs = langs 729 result[comment.URI] = &comment 730 } 731 732 if err = rows.Err(); err != nil { 733 return nil, fmt.Errorf("error iterating comments: %w", err) 734 } 735 736 return result, nil 737} 738 739// GetVoteStateForComments retrieves the viewer's votes on a batch of comments 740// Returns map[commentURI]*Vote for efficient lookups 741// Note: This implementation is prepared for when the votes table indexing is implemented 742// Currently returns an empty map as votes may not be fully indexed yet 743func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) { 744 if len(commentURIs) == 0 || viewerDID == "" { 745 return make(map[string]interface{}), nil 746 } 747 748 // Query votes table for viewer's votes on these comments 749 // Note: This assumes votes table exists and is being indexed 750 // If votes table doesn't exist yet, this query will fail gracefully 751 query := ` 752 SELECT subject_uri, direction, uri, cid, created_at 753 FROM votes 754 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL 755 ` 756 757 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs)) 758 if err != nil { 759 // If votes table doesn't exist yet, return empty map instead of error 760 // This allows the API to work before votes indexing is fully implemented 761 if strings.Contains(err.Error(), "does not exist") { 762 return make(map[string]interface{}), nil 763 } 764 return nil, fmt.Errorf("failed to get vote state for comments: %w", err) 765 } 766 defer func() { 767 if err := rows.Close(); err != nil { 768 log.Printf("Failed to close rows: %v", err) 769 } 770 }() 771 772 // Build result map with vote information 773 result := make(map[string]interface{}) 774 for rows.Next() { 775 var subjectURI, direction, uri, cid string 776 var createdAt sql.NullTime 777 778 err := rows.Scan(&subjectURI, &direction, &uri, &cid, &createdAt) 779 if err != nil { 780 return nil, fmt.Errorf("failed to scan vote: %w", err) 781 } 782 783 // Store vote info as a simple map (can be enhanced later with proper Vote struct) 784 result[subjectURI] = map[string]interface{}{ 785 "direction": direction, 786 "uri": uri, 787 "cid": cid, 788 "createdAt": createdAt.Time, 789 } 790 } 791 792 if err = rows.Err(); err != nil { 793 return nil, fmt.Errorf("error iterating votes: %w", err) 794 } 795 796 return result, nil 797}