A community-based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/comments"
5 "context"
6 "database/sql"
7 "encoding/base64"
8 "fmt"
9 "log"
10 "strings"
11
12 "github.com/lib/pq"
13)
14
15type postgresCommentRepo struct {
16 db *sql.DB
17}
18
19// NewCommentRepository creates a new PostgreSQL comment repository
20func NewCommentRepository(db *sql.DB) comments.Repository {
21 return &postgresCommentRepo{db: db}
22}
23
24// Create inserts a new comment into the comments table
25// Called by Jetstream consumer after comment is created on PDS
26// Idempotent: Returns success if comment already exists (for Jetstream replays)
27func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error {
28 query := `
29 INSERT INTO comments (
30 uri, cid, rkey, commenter_did,
31 root_uri, root_cid, parent_uri, parent_cid,
32 content, content_facets, embed, content_labels, langs,
33 created_at, indexed_at
34 ) VALUES (
35 $1, $2, $3, $4,
36 $5, $6, $7, $8,
37 $9, $10, $11, $12, $13,
38 $14, NOW()
39 )
40 ON CONFLICT (uri) DO NOTHING
41 RETURNING id, indexed_at
42 `
43
44 err := r.db.QueryRowContext(
45 ctx, query,
46 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
47 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
48 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
49 comment.CreatedAt,
50 ).Scan(&comment.ID, &comment.IndexedAt)
51
52 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
53 if err == sql.ErrNoRows {
54 return nil // Comment already exists, no error for idempotency
55 }
56
57 if err != nil {
58 // Check for unique constraint violation
59 if strings.Contains(err.Error(), "duplicate key") {
60 return comments.ErrCommentAlreadyExists
61 }
62
63 return fmt.Errorf("failed to insert comment: %w", err)
64 }
65
66 return nil
67}
68
69// Update modifies an existing comment's content fields
70// Called by Jetstream consumer after comment is updated on PDS
71// Preserves vote counts and created_at timestamp
72func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error {
73 query := `
74 UPDATE comments
75 SET
76 cid = $1,
77 content = $2,
78 content_facets = $3,
79 embed = $4,
80 content_labels = $5,
81 langs = $6
82 WHERE uri = $7 AND deleted_at IS NULL
83 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count
84 `
85
86 err := r.db.QueryRowContext(
87 ctx, query,
88 comment.CID,
89 comment.Content,
90 comment.ContentFacets,
91 comment.Embed,
92 comment.ContentLabels,
93 pq.Array(comment.Langs),
94 comment.URI,
95 ).Scan(
96 &comment.ID,
97 &comment.IndexedAt,
98 &comment.CreatedAt,
99 &comment.UpvoteCount,
100 &comment.DownvoteCount,
101 &comment.Score,
102 &comment.ReplyCount,
103 )
104
105 if err == sql.ErrNoRows {
106 return comments.ErrCommentNotFound
107 }
108 if err != nil {
109 return fmt.Errorf("failed to update comment: %w", err)
110 }
111
112 return nil
113}
114
115// GetByURI retrieves a comment by its AT-URI
116// Used by Jetstream consumer for UPDATE/DELETE operations
117func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) {
118 query := `
119 SELECT
120 id, uri, cid, rkey, commenter_did,
121 root_uri, root_cid, parent_uri, parent_cid,
122 content, content_facets, embed, content_labels, langs,
123 created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
124 upvote_count, downvote_count, score, reply_count
125 FROM comments
126 WHERE uri = $1
127 `
128
129 var comment comments.Comment
130 var langs pq.StringArray
131
132 err := r.db.QueryRowContext(ctx, query, uri).Scan(
133 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
134 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
135 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
136 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
137 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
138 )
139
140 if err == sql.ErrNoRows {
141 return nil, comments.ErrCommentNotFound
142 }
143 if err != nil {
144 return nil, fmt.Errorf("failed to get comment by URI: %w", err)
145 }
146
147 comment.Langs = langs
148
149 return &comment, nil
150}
151
152// Delete soft-deletes a comment (sets deleted_at)
153// Called by Jetstream consumer after comment is deleted from PDS
154// Idempotent: Returns success if comment already deleted
155// Deprecated: Use SoftDeleteWithReason for new code to preserve thread structure
156func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
157 query := `
158 UPDATE comments
159 SET deleted_at = NOW()
160 WHERE uri = $1 AND deleted_at IS NULL
161 `
162
163 result, err := r.db.ExecContext(ctx, query, uri)
164 if err != nil {
165 return fmt.Errorf("failed to delete comment: %w", err)
166 }
167
168 rowsAffected, err := result.RowsAffected()
169 if err != nil {
170 return fmt.Errorf("failed to check delete result: %w", err)
171 }
172
173 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays)
174 if rowsAffected == 0 {
175 return nil
176 }
177
178 return nil
179}
180
181// SoftDeleteWithReason performs a soft delete that blanks content but preserves thread structure
182// This allows deleted comments to appear as "[deleted]" placeholders in thread views
183// Idempotent: Returns success if comment already deleted
184// Validates that reason is a known deletion reason constant
185func (r *postgresCommentRepo) SoftDeleteWithReason(ctx context.Context, uri, reason, deletedByDID string) error {
186 // Validate deletion reason
187 if reason != comments.DeletionReasonAuthor && reason != comments.DeletionReasonModerator {
188 return fmt.Errorf("invalid deletion reason: %s", reason)
189 }
190
191 _, err := r.SoftDeleteWithReasonTx(ctx, nil, uri, reason, deletedByDID)
192 return err
193}
194
195// SoftDeleteWithReasonTx performs a soft delete within an optional transaction
196// If tx is nil, executes directly against the database
197// Returns rows affected count for callers that need to check idempotency
198// This method is used by both the repository and the Jetstream consumer
199func (r *postgresCommentRepo) SoftDeleteWithReasonTx(ctx context.Context, tx *sql.Tx, uri, reason, deletedByDID string) (int64, error) {
200 query := `
201 UPDATE comments
202 SET
203 content = '',
204 content_facets = NULL,
205 embed = NULL,
206 content_labels = NULL,
207 deleted_at = NOW(),
208 deletion_reason = $2,
209 deleted_by = $3
210 WHERE uri = $1 AND deleted_at IS NULL
211 `
212
213 var result sql.Result
214 var err error
215
216 if tx != nil {
217 result, err = tx.ExecContext(ctx, query, uri, reason, deletedByDID)
218 } else {
219 result, err = r.db.ExecContext(ctx, query, uri, reason, deletedByDID)
220 }
221
222 if err != nil {
223 return 0, fmt.Errorf("failed to soft delete comment: %w", err)
224 }
225
226 rowsAffected, err := result.RowsAffected()
227 if err != nil {
228 return 0, fmt.Errorf("failed to check delete result: %w", err)
229 }
230
231 return rowsAffected, nil
232}
233
234// ListByRoot retrieves all comments in a thread (flat), including deleted ones
235// Used for fetching entire comment threads on posts
236// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
237func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
238 query := `
239 SELECT
240 id, uri, cid, rkey, commenter_did,
241 root_uri, root_cid, parent_uri, parent_cid,
242 content, content_facets, embed, content_labels, langs,
243 created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
244 upvote_count, downvote_count, score, reply_count
245 FROM comments
246 WHERE root_uri = $1
247 ORDER BY created_at ASC
248 LIMIT $2 OFFSET $3
249 `
250
251 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset)
252 if err != nil {
253 return nil, fmt.Errorf("failed to list comments by root: %w", err)
254 }
255 defer func() {
256 if err := rows.Close(); err != nil {
257 log.Printf("Failed to close rows: %v", err)
258 }
259 }()
260
261 var result []*comments.Comment
262 for rows.Next() {
263 var comment comments.Comment
264 var langs pq.StringArray
265
266 err := rows.Scan(
267 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
268 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
269 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
270 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
271 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
272 )
273 if err != nil {
274 return nil, fmt.Errorf("failed to scan comment: %w", err)
275 }
276
277 comment.Langs = langs
278 result = append(result, &comment)
279 }
280
281 if err = rows.Err(); err != nil {
282 return nil, fmt.Errorf("error iterating comments: %w", err)
283 }
284
285 return result, nil
286}
287
288// ListByParent retrieves direct replies to a post or comment, including deleted ones
289// Used for building nested/threaded comment views
290// Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
291func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
292 query := `
293 SELECT
294 id, uri, cid, rkey, commenter_did,
295 root_uri, root_cid, parent_uri, parent_cid,
296 content, content_facets, embed, content_labels, langs,
297 created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
298 upvote_count, downvote_count, score, reply_count
299 FROM comments
300 WHERE parent_uri = $1
301 ORDER BY created_at ASC
302 LIMIT $2 OFFSET $3
303 `
304
305 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset)
306 if err != nil {
307 return nil, fmt.Errorf("failed to list comments by parent: %w", err)
308 }
309 defer func() {
310 if err := rows.Close(); err != nil {
311 log.Printf("Failed to close rows: %v", err)
312 }
313 }()
314
315 var result []*comments.Comment
316 for rows.Next() {
317 var comment comments.Comment
318 var langs pq.StringArray
319
320 err := rows.Scan(
321 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
322 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
323 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
324 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
325 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
326 )
327 if err != nil {
328 return nil, fmt.Errorf("failed to scan comment: %w", err)
329 }
330
331 comment.Langs = langs
332 result = append(result, &comment)
333 }
334
335 if err = rows.Err(); err != nil {
336 return nil, fmt.Errorf("error iterating comments: %w", err)
337 }
338
339 return result, nil
340}
341
342// CountByParent counts direct replies to a post or comment
343// Used for showing reply counts in threading UI
344func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) {
345 query := `
346 SELECT COUNT(*)
347 FROM comments
348 WHERE parent_uri = $1 AND deleted_at IS NULL
349 `
350
351 var count int
352 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count)
353 if err != nil {
354 return 0, fmt.Errorf("failed to count comments by parent: %w", err)
355 }
356
357 return count, nil
358}
359
360// ListByCommenter retrieves all active comments by a specific user
361// Used for user comment history - filters out deleted comments
362func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
363 query := `
364 SELECT
365 id, uri, cid, rkey, commenter_did,
366 root_uri, root_cid, parent_uri, parent_cid,
367 content, content_facets, embed, content_labels, langs,
368 created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
369 upvote_count, downvote_count, score, reply_count
370 FROM comments
371 WHERE commenter_did = $1 AND deleted_at IS NULL
372 ORDER BY created_at DESC
373 LIMIT $2 OFFSET $3
374 `
375
376 rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset)
377 if err != nil {
378 return nil, fmt.Errorf("failed to list comments by commenter: %w", err)
379 }
380 defer func() {
381 if err := rows.Close(); err != nil {
382 log.Printf("Failed to close rows: %v", err)
383 }
384 }()
385
386 var result []*comments.Comment
387 for rows.Next() {
388 var comment comments.Comment
389 var langs pq.StringArray
390
391 err := rows.Scan(
392 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
393 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
394 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
395 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
396 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
397 )
398 if err != nil {
399 return nil, fmt.Errorf("failed to scan comment: %w", err)
400 }
401
402 comment.Langs = langs
403 result = append(result, &comment)
404 }
405
406 if err = rows.Err(); err != nil {
407 return nil, fmt.Errorf("error iterating comments: %w", err)
408 }
409
410 return result, nil
411}
412
413// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination
414// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at)
415// Uses cursor-based pagination with composite keys for consistent ordering
416// Hydrates author info (handle, display_name, avatar) via JOIN with users table
417func (r *postgresCommentRepo) ListByParentWithHotRank(
418 ctx context.Context,
419 parentURI string,
420 sort string,
421 timeframe string,
422 limit int,
423 cursor *string,
424) ([]*comments.Comment, *string, error) {
425 // Build ORDER BY clause and time filter based on sort type
426 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe)
427
428 // Parse cursor for pagination
429 cursorFilter, cursorValues, err := r.parseCommentCursor(cursor, sort)
430 if err != nil {
431 return nil, nil, fmt.Errorf("invalid cursor: %w", err)
432 }
433
434 // Build SELECT clause - compute hot_rank for "hot" sort
435 // Hot rank formula (Lemmy algorithm):
436 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8)
437 //
438 // This formula:
439 // - Gives logarithmic weight to score (prevents high-score dominance)
440 // - Decays over time with power 1.8 (faster than linear, slower than quadratic)
441 // - Uses hours as time unit (3600 seconds)
442 // - Adds constants to prevent division by zero and ensure positive values
443 var selectClause string
444 if sort == "hot" {
445 selectClause = `
446 SELECT
447 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
448 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
449 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
450 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
451 c.upvote_count, c.downvote_count, c.score, c.reply_count,
452 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
453 COALESCE(u.handle, c.commenter_did) as author_handle
454 FROM comments c`
455 } else {
456 selectClause = `
457 SELECT
458 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
459 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
460 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
461 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
462 c.upvote_count, c.downvote_count, c.score, c.reply_count,
463 NULL::numeric as hot_rank,
464 COALESCE(u.handle, c.commenter_did) as author_handle
465 FROM comments c`
466 }
467
468 // Build complete query with JOINs and filters
469 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
470 // Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
471 query := fmt.Sprintf(`
472 %s
473 LEFT JOIN users u ON c.commenter_did = u.did
474 WHERE c.parent_uri = $1
475 %s
476 %s
477 ORDER BY %s
478 LIMIT $2
479 `, selectClause, timeFilter, cursorFilter, orderBy)
480
481 // Prepare query arguments
482 args := []interface{}{parentURI, limit + 1} // +1 to detect next page
483 args = append(args, cursorValues...)
484
485 // Execute query
486 rows, err := r.db.QueryContext(ctx, query, args...)
487 if err != nil {
488 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err)
489 }
490 defer func() {
491 if err := rows.Close(); err != nil {
492 log.Printf("Failed to close rows: %v", err)
493 }
494 }()
495
496 // Scan results
497 var result []*comments.Comment
498 var hotRanks []float64
499 for rows.Next() {
500 var comment comments.Comment
501 var langs pq.StringArray
502 var hotRank sql.NullFloat64
503 var authorHandle string
504
505 err := rows.Scan(
506 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
507 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
508 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
509 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
510 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
511 &hotRank, &authorHandle,
512 )
513 if err != nil {
514 return nil, nil, fmt.Errorf("failed to scan comment: %w", err)
515 }
516
517 comment.Langs = langs
518 comment.CommenterHandle = authorHandle
519
520 // Store hot_rank for cursor building
521 hotRankValue := 0.0
522 if hotRank.Valid {
523 hotRankValue = hotRank.Float64
524 }
525 hotRanks = append(hotRanks, hotRankValue)
526
527 result = append(result, &comment)
528 }
529
530 if err = rows.Err(); err != nil {
531 return nil, nil, fmt.Errorf("error iterating comments: %w", err)
532 }
533
534 // Handle pagination cursor
535 var nextCursor *string
536 if len(result) > limit && limit > 0 {
537 result = result[:limit]
538 hotRanks = hotRanks[:limit]
539 lastComment := result[len(result)-1]
540 lastHotRank := hotRanks[len(hotRanks)-1]
541 cursorStr := r.buildCommentCursor(lastComment, sort, lastHotRank)
542 nextCursor = &cursorStr
543 }
544
545 return result, nextCursor, nil
546}
547
548// buildCommentSortClause returns the ORDER BY SQL and optional time filter
549func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) {
550 var orderBy string
551 switch sort {
552 case "hot":
553 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC
554 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
555 case "top":
556 // Score DESC, then created_at DESC, then uri DESC
557 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC`
558 case "new":
559 // Created at DESC, then uri DESC
560 orderBy = `c.created_at DESC, c.uri DESC`
561 default:
562 // Default to hot
563 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
564 }
565
566 // Add time filter for "top" sort
567 var timeFilter string
568 if sort == "top" {
569 timeFilter = r.buildCommentTimeFilter(timeframe)
570 }
571
572 return orderBy, timeFilter
573}
574
575// buildCommentTimeFilter returns SQL filter for timeframe
576func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string {
577 if timeframe == "" || timeframe == "all" {
578 return ""
579 }
580
581 var interval string
582 switch timeframe {
583 case "hour":
584 interval = "1 hour"
585 case "day":
586 interval = "1 day"
587 case "week":
588 interval = "7 days"
589 case "month":
590 interval = "30 days"
591 case "year":
592 interval = "1 year"
593 default:
594 return ""
595 }
596
597 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval)
598}
599
600// parseCommentCursor decodes pagination cursor for comments
601func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) {
602 if cursor == nil || *cursor == "" {
603 return "", nil, nil
604 }
605
606 // Validate cursor size to prevent DoS via massive base64 strings
607 const maxCursorSize = 1024
608 if len(*cursor) > maxCursorSize {
609 return "", nil, fmt.Errorf("cursor too large: maximum %d bytes", maxCursorSize)
610 }
611
612 // Decode base64 cursor
613 decoded, err := base64.URLEncoding.DecodeString(*cursor)
614 if err != nil {
615 return "", nil, fmt.Errorf("invalid cursor encoding")
616 }
617
618 // Parse cursor based on sort type using | delimiter
619 // Format: hotRank|score|createdAt|uri (for hot)
620 // score|createdAt|uri (for top)
621 // createdAt|uri (for new)
622 parts := strings.Split(string(decoded), "|")
623
624 switch sort {
625 case "new":
626 // Cursor format: createdAt|uri
627 if len(parts) != 2 {
628 return "", nil, fmt.Errorf("invalid cursor format for new sort")
629 }
630
631 createdAt := parts[0]
632 uri := parts[1]
633
634 // Validate AT-URI format
635 if !strings.HasPrefix(uri, "at://") {
636 return "", nil, fmt.Errorf("invalid cursor URI")
637 }
638
639 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))`
640 return filter, []interface{}{createdAt, uri}, nil
641
642 case "top":
643 // Cursor format: score|createdAt|uri
644 if len(parts) != 3 {
645 return "", nil, fmt.Errorf("invalid cursor format for top sort")
646 }
647
648 scoreStr := parts[0]
649 createdAt := parts[1]
650 uri := parts[2]
651
652 // Parse score as integer
653 score := 0
654 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
655 return "", nil, fmt.Errorf("invalid cursor score")
656 }
657
658 // Validate AT-URI format
659 if !strings.HasPrefix(uri, "at://") {
660 return "", nil, fmt.Errorf("invalid cursor URI")
661 }
662
663 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))`
664 return filter, []interface{}{score, createdAt, uri}, nil
665
666 case "hot":
667 // Cursor format: hotRank|score|createdAt|uri
668 if len(parts) != 4 {
669 return "", nil, fmt.Errorf("invalid cursor format for hot sort")
670 }
671
672 hotRankStr := parts[0]
673 scoreStr := parts[1]
674 createdAt := parts[2]
675 uri := parts[3]
676
677 // Parse hot_rank as float
678 hotRank := 0.0
679 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil {
680 return "", nil, fmt.Errorf("invalid cursor hot rank")
681 }
682
683 // Parse score as integer
684 score := 0
685 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
686 return "", nil, fmt.Errorf("invalid cursor score")
687 }
688
689 // Validate AT-URI format
690 if !strings.HasPrefix(uri, "at://") {
691 return "", nil, fmt.Errorf("invalid cursor URI")
692 }
693
694 // Use computed hot_rank expression in comparison
695 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)`
696 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`,
697 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr)
698 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil
699
700 default:
701 return "", nil, nil
702 }
703}
704
705// buildCommentCursor creates pagination cursor from last comment
706func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank float64) string {
707 var cursorStr string
708 const delimiter = "|"
709
710 switch sort {
711 case "new":
712 // Format: createdAt|uri
713 cursorStr = fmt.Sprintf("%s%s%s",
714 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
715 delimiter,
716 comment.URI)
717
718 case "top":
719 // Format: score|createdAt|uri
720 cursorStr = fmt.Sprintf("%d%s%s%s%s",
721 comment.Score,
722 delimiter,
723 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
724 delimiter,
725 comment.URI)
726
727 case "hot":
728 // Format: hotRank|score|createdAt|uri
729 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s",
730 hotRank,
731 delimiter,
732 comment.Score,
733 delimiter,
734 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
735 delimiter,
736 comment.URI)
737
738 default:
739 cursorStr = comment.URI
740 }
741
742 return base64.URLEncoding.EncodeToString([]byte(cursorStr))
743}
744
745// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query
746// Returns map[uri]*Comment for efficient lookups without N+1 queries
747// Includes deleted comments to preserve thread structure
748func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) {
749 if len(uris) == 0 {
750 return make(map[string]*comments.Comment), nil
751 }
752
753 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
754 // COALESCE falls back to DID when handle is NULL (user not yet in users table)
755 // Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
756 query := `
757 SELECT
758 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
759 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
760 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
761 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
762 c.upvote_count, c.downvote_count, c.score, c.reply_count,
763 COALESCE(u.handle, c.commenter_did) as author_handle
764 FROM comments c
765 LEFT JOIN users u ON c.commenter_did = u.did
766 WHERE c.uri = ANY($1)
767 `
768
769 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris))
770 if err != nil {
771 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err)
772 }
773 defer func() {
774 if err := rows.Close(); err != nil {
775 log.Printf("Failed to close rows: %v", err)
776 }
777 }()
778
779 result := make(map[string]*comments.Comment)
780 for rows.Next() {
781 var comment comments.Comment
782 var langs pq.StringArray
783 var authorHandle string
784
785 err := rows.Scan(
786 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
787 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
788 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
789 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
790 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
791 &authorHandle,
792 )
793 if err != nil {
794 return nil, fmt.Errorf("failed to scan comment: %w", err)
795 }
796
797 comment.Langs = langs
798 result[comment.URI] = &comment
799 }
800
801 if err = rows.Err(); err != nil {
802 return nil, fmt.Errorf("error iterating comments: %w", err)
803 }
804
805 return result, nil
806}
807
808// ListByParentsBatch retrieves direct replies to multiple parents in a single query
809// Groups results by parent URI to prevent N+1 queries when loading nested replies
810// Uses window functions to limit results per parent efficiently
811func (r *postgresCommentRepo) ListByParentsBatch(
812 ctx context.Context,
813 parentURIs []string,
814 sort string,
815 limitPerParent int,
816) (map[string][]*comments.Comment, error) {
817 if len(parentURIs) == 0 {
818 return make(map[string][]*comments.Comment), nil
819 }
820
821 // Build ORDER BY clause based on sort type
822 // windowOrderBy must inline expressions (can't use SELECT aliases in window functions)
823 var windowOrderBy string
824 var selectClause string
825 switch sort {
826 case "hot":
827 selectClause = `
828 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
829 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
830 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
831 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
832 c.upvote_count, c.downvote_count, c.score, c.reply_count,
833 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
834 COALESCE(u.handle, c.commenter_did) as author_handle`
835 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY
836 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC`
837 case "top":
838 selectClause = `
839 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
840 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
841 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
842 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
843 c.upvote_count, c.downvote_count, c.score, c.reply_count,
844 NULL::numeric as hot_rank,
845 COALESCE(u.handle, c.commenter_did) as author_handle`
846 windowOrderBy = `c.score DESC, c.created_at DESC`
847 case "new":
848 selectClause = `
849 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
850 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
851 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
852 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
853 c.upvote_count, c.downvote_count, c.score, c.reply_count,
854 NULL::numeric as hot_rank,
855 COALESCE(u.handle, c.commenter_did) as author_handle`
856 windowOrderBy = `c.created_at DESC`
857 default:
858 // Default to hot
859 selectClause = `
860 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
861 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
862 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
863 c.created_at, c.indexed_at, c.deleted_at, c.deletion_reason, c.deleted_by,
864 c.upvote_count, c.downvote_count, c.score, c.reply_count,
865 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
866 COALESCE(u.handle, c.commenter_did) as author_handle`
867 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY
868 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC`
869 }
870
871 // Use window function to limit results per parent
872 // This is more efficient than LIMIT in a subquery per parent
873 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
874 // Includes deleted comments to preserve thread structure (shown as "[deleted]" placeholders)
875 query := fmt.Sprintf(`
876 WITH ranked_comments AS (
877 SELECT
878 %s,
879 ROW_NUMBER() OVER (
880 PARTITION BY c.parent_uri
881 ORDER BY %s
882 ) as rn
883 FROM comments c
884 LEFT JOIN users u ON c.commenter_did = u.did
885 WHERE c.parent_uri = ANY($1)
886 )
887 SELECT
888 id, uri, cid, rkey, commenter_did,
889 root_uri, root_cid, parent_uri, parent_cid,
890 content, content_facets, embed, content_labels, langs,
891 created_at, indexed_at, deleted_at, deletion_reason, deleted_by,
892 upvote_count, downvote_count, score, reply_count,
893 hot_rank, author_handle
894 FROM ranked_comments
895 WHERE rn <= $2
896 ORDER BY parent_uri, rn
897 `, selectClause, windowOrderBy)
898
899 rows, err := r.db.QueryContext(ctx, query, pq.Array(parentURIs), limitPerParent)
900 if err != nil {
901 return nil, fmt.Errorf("failed to batch query comments by parents: %w", err)
902 }
903 defer func() {
904 if err := rows.Close(); err != nil {
905 log.Printf("Failed to close rows: %v", err)
906 }
907 }()
908
909 // Group results by parent URI
910 result := make(map[string][]*comments.Comment)
911 for rows.Next() {
912 var comment comments.Comment
913 var langs pq.StringArray
914 var hotRank sql.NullFloat64
915 var authorHandle string
916
917 err := rows.Scan(
918 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
919 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
920 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
921 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, &comment.DeletionReason, &comment.DeletedBy,
922 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
923 &hotRank, &authorHandle,
924 )
925 if err != nil {
926 return nil, fmt.Errorf("failed to scan comment: %w", err)
927 }
928
929 comment.Langs = langs
930 comment.CommenterHandle = authorHandle
931
932 // Group by parent URI
933 result[comment.ParentURI] = append(result[comment.ParentURI], &comment)
934 }
935
936 if err = rows.Err(); err != nil {
937 return nil, fmt.Errorf("error iterating comments: %w", err)
938 }
939
940 return result, nil
941}
942
943// GetVoteStateForComments retrieves the viewer's votes on a batch of comments
944// Returns map[commentURI]*Vote for efficient lookups
945// Note: This implementation is prepared for when the votes table indexing is implemented
946// Currently returns an empty map as votes may not be fully indexed yet
947func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) {
948 if len(commentURIs) == 0 || viewerDID == "" {
949 return make(map[string]interface{}), nil
950 }
951
952 // Query votes table for viewer's votes on these comments
953 // Note: This assumes votes table exists and is being indexed
954 // If votes table doesn't exist yet, this query will fail gracefully
955 query := `
956 SELECT subject_uri, direction, uri
957 FROM votes
958 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL
959 `
960
961 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs))
962 if err != nil {
963 // If votes table doesn't exist yet, return empty map instead of error
964 // This allows the API to work before votes indexing is fully implemented
965 if strings.Contains(err.Error(), "does not exist") {
966 return make(map[string]interface{}), nil
967 }
968 return nil, fmt.Errorf("failed to get vote state for comments: %w", err)
969 }
970 defer func() {
971 if err := rows.Close(); err != nil {
972 log.Printf("Failed to close rows: %v", err)
973 }
974 }()
975
976 // Build result map with vote information
977 result := make(map[string]interface{})
978 for rows.Next() {
979 var subjectURI, direction, uri string
980
981 err := rows.Scan(&subjectURI, &direction, &uri)
982 if err != nil {
983 return nil, fmt.Errorf("failed to scan vote: %w", err)
984 }
985
986 // Store vote info as a simple map (can be enhanced later with proper Vote struct)
987 result[subjectURI] = map[string]interface{}{
988 "direction": direction,
989 "uri": uri,
990 }
991 }
992
993 if err = rows.Err(); err != nil {
994 return nil, fmt.Errorf("error iterating votes: %w", err)
995 }
996
997 return result, nil
998}