A community-based topic aggregation platform built on atproto.
1package postgres
2
import (
	"context"
	"database/sql"
	"encoding/base64"
	"errors"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"

	"Coves/internal/core/comments"

	"github.com/lib/pq"
)
15
16type postgresCommentRepo struct {
17 db *sql.DB
18}
19
20// NewCommentRepository creates a new PostgreSQL comment repository
21func NewCommentRepository(db *sql.DB) comments.Repository {
22 return &postgresCommentRepo{db: db}
23}
24
25// Create inserts a new comment into the comments table
26// Called by Jetstream consumer after comment is created on PDS
27// Idempotent: Returns success if comment already exists (for Jetstream replays)
28func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error {
29 query := `
30 INSERT INTO comments (
31 uri, cid, rkey, commenter_did,
32 root_uri, root_cid, parent_uri, parent_cid,
33 content, content_facets, embed, content_labels, langs,
34 created_at, indexed_at
35 ) VALUES (
36 $1, $2, $3, $4,
37 $5, $6, $7, $8,
38 $9, $10, $11, $12, $13,
39 $14, NOW()
40 )
41 ON CONFLICT (uri) DO NOTHING
42 RETURNING id, indexed_at
43 `
44
45 err := r.db.QueryRowContext(
46 ctx, query,
47 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
48 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
49 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
50 comment.CreatedAt,
51 ).Scan(&comment.ID, &comment.IndexedAt)
52
53 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
54 if err == sql.ErrNoRows {
55 return nil // Comment already exists, no error for idempotency
56 }
57
58 if err != nil {
59 // Check for unique constraint violation
60 if strings.Contains(err.Error(), "duplicate key") {
61 return comments.ErrCommentAlreadyExists
62 }
63
64 return fmt.Errorf("failed to insert comment: %w", err)
65 }
66
67 return nil
68}
69
70// Update modifies an existing comment's content fields
71// Called by Jetstream consumer after comment is updated on PDS
72// Preserves vote counts and created_at timestamp
73func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error {
74 query := `
75 UPDATE comments
76 SET
77 cid = $1,
78 content = $2,
79 content_facets = $3,
80 embed = $4,
81 content_labels = $5,
82 langs = $6
83 WHERE uri = $7 AND deleted_at IS NULL
84 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count
85 `
86
87 err := r.db.QueryRowContext(
88 ctx, query,
89 comment.CID,
90 comment.Content,
91 comment.ContentFacets,
92 comment.Embed,
93 comment.ContentLabels,
94 pq.Array(comment.Langs),
95 comment.URI,
96 ).Scan(
97 &comment.ID,
98 &comment.IndexedAt,
99 &comment.CreatedAt,
100 &comment.UpvoteCount,
101 &comment.DownvoteCount,
102 &comment.Score,
103 &comment.ReplyCount,
104 )
105
106 if err == sql.ErrNoRows {
107 return comments.ErrCommentNotFound
108 }
109 if err != nil {
110 return fmt.Errorf("failed to update comment: %w", err)
111 }
112
113 return nil
114}
115
116// GetByURI retrieves a comment by its AT-URI
117// Used by Jetstream consumer for UPDATE/DELETE operations
118func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) {
119 query := `
120 SELECT
121 id, uri, cid, rkey, commenter_did,
122 root_uri, root_cid, parent_uri, parent_cid,
123 content, content_facets, embed, content_labels, langs,
124 created_at, indexed_at, deleted_at,
125 upvote_count, downvote_count, score, reply_count
126 FROM comments
127 WHERE uri = $1
128 `
129
130 var comment comments.Comment
131 var langs pq.StringArray
132
133 err := r.db.QueryRowContext(ctx, query, uri).Scan(
134 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
135 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
136 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
137 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
138 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
139 )
140
141 if err == sql.ErrNoRows {
142 return nil, comments.ErrCommentNotFound
143 }
144 if err != nil {
145 return nil, fmt.Errorf("failed to get comment by URI: %w", err)
146 }
147
148 comment.Langs = langs
149
150 return &comment, nil
151}
152
153// Delete soft-deletes a comment (sets deleted_at)
154// Called by Jetstream consumer after comment is deleted from PDS
155// Idempotent: Returns success if comment already deleted
156func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
157 query := `
158 UPDATE comments
159 SET deleted_at = NOW()
160 WHERE uri = $1 AND deleted_at IS NULL
161 `
162
163 result, err := r.db.ExecContext(ctx, query, uri)
164 if err != nil {
165 return fmt.Errorf("failed to delete comment: %w", err)
166 }
167
168 rowsAffected, err := result.RowsAffected()
169 if err != nil {
170 return fmt.Errorf("failed to check delete result: %w", err)
171 }
172
173 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays)
174 if rowsAffected == 0 {
175 return nil
176 }
177
178 return nil
179}
180
181// ListByRoot retrieves all active comments in a thread (flat)
182// Used for fetching entire comment threads on posts
183func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
184 query := `
185 SELECT
186 id, uri, cid, rkey, commenter_did,
187 root_uri, root_cid, parent_uri, parent_cid,
188 content, content_facets, embed, content_labels, langs,
189 created_at, indexed_at, deleted_at,
190 upvote_count, downvote_count, score, reply_count
191 FROM comments
192 WHERE root_uri = $1 AND deleted_at IS NULL
193 ORDER BY created_at ASC
194 LIMIT $2 OFFSET $3
195 `
196
197 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset)
198 if err != nil {
199 return nil, fmt.Errorf("failed to list comments by root: %w", err)
200 }
201 defer func() {
202 if err := rows.Close(); err != nil {
203 log.Printf("Failed to close rows: %v", err)
204 }
205 }()
206
207 var result []*comments.Comment
208 for rows.Next() {
209 var comment comments.Comment
210 var langs pq.StringArray
211
212 err := rows.Scan(
213 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
214 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
215 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
216 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
217 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
218 )
219 if err != nil {
220 return nil, fmt.Errorf("failed to scan comment: %w", err)
221 }
222
223 comment.Langs = langs
224 result = append(result, &comment)
225 }
226
227 if err = rows.Err(); err != nil {
228 return nil, fmt.Errorf("error iterating comments: %w", err)
229 }
230
231 return result, nil
232}
233
234// ListByParent retrieves direct replies to a post or comment
235// Used for building nested/threaded comment views
236func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
237 query := `
238 SELECT
239 id, uri, cid, rkey, commenter_did,
240 root_uri, root_cid, parent_uri, parent_cid,
241 content, content_facets, embed, content_labels, langs,
242 created_at, indexed_at, deleted_at,
243 upvote_count, downvote_count, score, reply_count
244 FROM comments
245 WHERE parent_uri = $1 AND deleted_at IS NULL
246 ORDER BY created_at ASC
247 LIMIT $2 OFFSET $3
248 `
249
250 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset)
251 if err != nil {
252 return nil, fmt.Errorf("failed to list comments by parent: %w", err)
253 }
254 defer func() {
255 if err := rows.Close(); err != nil {
256 log.Printf("Failed to close rows: %v", err)
257 }
258 }()
259
260 var result []*comments.Comment
261 for rows.Next() {
262 var comment comments.Comment
263 var langs pq.StringArray
264
265 err := rows.Scan(
266 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
267 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
268 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
269 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
270 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
271 )
272 if err != nil {
273 return nil, fmt.Errorf("failed to scan comment: %w", err)
274 }
275
276 comment.Langs = langs
277 result = append(result, &comment)
278 }
279
280 if err = rows.Err(); err != nil {
281 return nil, fmt.Errorf("error iterating comments: %w", err)
282 }
283
284 return result, nil
285}
286
287// CountByParent counts direct replies to a post or comment
288// Used for showing reply counts in threading UI
289func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) {
290 query := `
291 SELECT COUNT(*)
292 FROM comments
293 WHERE parent_uri = $1 AND deleted_at IS NULL
294 `
295
296 var count int
297 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count)
298 if err != nil {
299 return 0, fmt.Errorf("failed to count comments by parent: %w", err)
300 }
301
302 return count, nil
303}
304
305// ListByCommenter retrieves all active comments by a specific user
306// Future: Used for user comment history
307func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
308 query := `
309 SELECT
310 id, uri, cid, rkey, commenter_did,
311 root_uri, root_cid, parent_uri, parent_cid,
312 content, content_facets, embed, content_labels, langs,
313 created_at, indexed_at, deleted_at,
314 upvote_count, downvote_count, score, reply_count
315 FROM comments
316 WHERE commenter_did = $1 AND deleted_at IS NULL
317 ORDER BY created_at DESC
318 LIMIT $2 OFFSET $3
319 `
320
321 rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset)
322 if err != nil {
323 return nil, fmt.Errorf("failed to list comments by commenter: %w", err)
324 }
325 defer func() {
326 if err := rows.Close(); err != nil {
327 log.Printf("Failed to close rows: %v", err)
328 }
329 }()
330
331 var result []*comments.Comment
332 for rows.Next() {
333 var comment comments.Comment
334 var langs pq.StringArray
335
336 err := rows.Scan(
337 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
338 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
339 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
340 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
341 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
342 )
343 if err != nil {
344 return nil, fmt.Errorf("failed to scan comment: %w", err)
345 }
346
347 comment.Langs = langs
348 result = append(result, &comment)
349 }
350
351 if err = rows.Err(); err != nil {
352 return nil, fmt.Errorf("error iterating comments: %w", err)
353 }
354
355 return result, nil
356}
357
358// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination
359// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at)
360// Uses cursor-based pagination with composite keys for consistent ordering
361// Hydrates author info (handle, display_name, avatar) via JOIN with users table
362func (r *postgresCommentRepo) ListByParentWithHotRank(
363 ctx context.Context,
364 parentURI string,
365 sort string,
366 timeframe string,
367 limit int,
368 cursor *string,
369) ([]*comments.Comment, *string, error) {
370 // Build ORDER BY clause and time filter based on sort type
371 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe)
372
373 // Parse cursor for pagination
374 cursorFilter, cursorValues, err := r.parseCommentCursor(cursor, sort)
375 if err != nil {
376 return nil, nil, fmt.Errorf("invalid cursor: %w", err)
377 }
378
379 // Build SELECT clause - compute hot_rank for "hot" sort
380 // Hot rank formula (Lemmy algorithm):
381 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8)
382 //
383 // This formula:
384 // - Gives logarithmic weight to score (prevents high-score dominance)
385 // - Decays over time with power 1.8 (faster than linear, slower than quadratic)
386 // - Uses hours as time unit (3600 seconds)
387 // - Adds constants to prevent division by zero and ensure positive values
388 var selectClause string
389 if sort == "hot" {
390 selectClause = `
391 SELECT
392 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
393 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
394 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
395 c.created_at, c.indexed_at, c.deleted_at,
396 c.upvote_count, c.downvote_count, c.score, c.reply_count,
397 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
398 COALESCE(u.handle, c.commenter_did) as author_handle
399 FROM comments c`
400 } else {
401 selectClause = `
402 SELECT
403 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
404 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
405 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
406 c.created_at, c.indexed_at, c.deleted_at,
407 c.upvote_count, c.downvote_count, c.score, c.reply_count,
408 NULL::numeric as hot_rank,
409 COALESCE(u.handle, c.commenter_did) as author_handle
410 FROM comments c`
411 }
412
413 // Build complete query with JOINs and filters
414 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
415 query := fmt.Sprintf(`
416 %s
417 LEFT JOIN users u ON c.commenter_did = u.did
418 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
419 %s
420 %s
421 ORDER BY %s
422 LIMIT $2
423 `, selectClause, timeFilter, cursorFilter, orderBy)
424
425 // Prepare query arguments
426 args := []interface{}{parentURI, limit + 1} // +1 to detect next page
427 args = append(args, cursorValues...)
428
429 // Execute query
430 rows, err := r.db.QueryContext(ctx, query, args...)
431 if err != nil {
432 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err)
433 }
434 defer func() {
435 if err := rows.Close(); err != nil {
436 log.Printf("Failed to close rows: %v", err)
437 }
438 }()
439
440 // Scan results
441 var result []*comments.Comment
442 var hotRanks []float64
443 for rows.Next() {
444 var comment comments.Comment
445 var langs pq.StringArray
446 var hotRank sql.NullFloat64
447 var authorHandle string
448
449 err := rows.Scan(
450 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
451 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
452 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
453 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
454 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
455 &hotRank, &authorHandle,
456 )
457 if err != nil {
458 return nil, nil, fmt.Errorf("failed to scan comment: %w", err)
459 }
460
461 comment.Langs = langs
462 comment.CommenterHandle = authorHandle
463
464 // Store hot_rank for cursor building
465 hotRankValue := 0.0
466 if hotRank.Valid {
467 hotRankValue = hotRank.Float64
468 }
469 hotRanks = append(hotRanks, hotRankValue)
470
471 result = append(result, &comment)
472 }
473
474 if err = rows.Err(); err != nil {
475 return nil, nil, fmt.Errorf("error iterating comments: %w", err)
476 }
477
478 // Handle pagination cursor
479 var nextCursor *string
480 if len(result) > limit && limit > 0 {
481 result = result[:limit]
482 hotRanks = hotRanks[:limit]
483 lastComment := result[len(result)-1]
484 lastHotRank := hotRanks[len(hotRanks)-1]
485 cursorStr := r.buildCommentCursor(lastComment, sort, lastHotRank)
486 nextCursor = &cursorStr
487 }
488
489 return result, nextCursor, nil
490}
491
492// buildCommentSortClause returns the ORDER BY SQL and optional time filter
493func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) {
494 var orderBy string
495 switch sort {
496 case "hot":
497 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC
498 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
499 case "top":
500 // Score DESC, then created_at DESC, then uri DESC
501 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC`
502 case "new":
503 // Created at DESC, then uri DESC
504 orderBy = `c.created_at DESC, c.uri DESC`
505 default:
506 // Default to hot
507 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
508 }
509
510 // Add time filter for "top" sort
511 var timeFilter string
512 if sort == "top" {
513 timeFilter = r.buildCommentTimeFilter(timeframe)
514 }
515
516 return orderBy, timeFilter
517}
518
519// buildCommentTimeFilter returns SQL filter for timeframe
520func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string {
521 if timeframe == "" || timeframe == "all" {
522 return ""
523 }
524
525 var interval string
526 switch timeframe {
527 case "hour":
528 interval = "1 hour"
529 case "day":
530 interval = "1 day"
531 case "week":
532 interval = "7 days"
533 case "month":
534 interval = "30 days"
535 case "year":
536 interval = "1 year"
537 default:
538 return ""
539 }
540
541 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval)
542}
543
544// parseCommentCursor decodes pagination cursor for comments
545func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) {
546 if cursor == nil || *cursor == "" {
547 return "", nil, nil
548 }
549
550 // Validate cursor size to prevent DoS via massive base64 strings
551 const maxCursorSize = 1024
552 if len(*cursor) > maxCursorSize {
553 return "", nil, fmt.Errorf("cursor too large: maximum %d bytes", maxCursorSize)
554 }
555
556 // Decode base64 cursor
557 decoded, err := base64.URLEncoding.DecodeString(*cursor)
558 if err != nil {
559 return "", nil, fmt.Errorf("invalid cursor encoding")
560 }
561
562 // Parse cursor based on sort type using | delimiter
563 // Format: hotRank|score|createdAt|uri (for hot)
564 // score|createdAt|uri (for top)
565 // createdAt|uri (for new)
566 parts := strings.Split(string(decoded), "|")
567
568 switch sort {
569 case "new":
570 // Cursor format: createdAt|uri
571 if len(parts) != 2 {
572 return "", nil, fmt.Errorf("invalid cursor format for new sort")
573 }
574
575 createdAt := parts[0]
576 uri := parts[1]
577
578 // Validate AT-URI format
579 if !strings.HasPrefix(uri, "at://") {
580 return "", nil, fmt.Errorf("invalid cursor URI")
581 }
582
583 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))`
584 return filter, []interface{}{createdAt, uri}, nil
585
586 case "top":
587 // Cursor format: score|createdAt|uri
588 if len(parts) != 3 {
589 return "", nil, fmt.Errorf("invalid cursor format for top sort")
590 }
591
592 scoreStr := parts[0]
593 createdAt := parts[1]
594 uri := parts[2]
595
596 // Parse score as integer
597 score := 0
598 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
599 return "", nil, fmt.Errorf("invalid cursor score")
600 }
601
602 // Validate AT-URI format
603 if !strings.HasPrefix(uri, "at://") {
604 return "", nil, fmt.Errorf("invalid cursor URI")
605 }
606
607 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))`
608 return filter, []interface{}{score, createdAt, uri}, nil
609
610 case "hot":
611 // Cursor format: hotRank|score|createdAt|uri
612 if len(parts) != 4 {
613 return "", nil, fmt.Errorf("invalid cursor format for hot sort")
614 }
615
616 hotRankStr := parts[0]
617 scoreStr := parts[1]
618 createdAt := parts[2]
619 uri := parts[3]
620
621 // Parse hot_rank as float
622 hotRank := 0.0
623 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil {
624 return "", nil, fmt.Errorf("invalid cursor hot rank")
625 }
626
627 // Parse score as integer
628 score := 0
629 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
630 return "", nil, fmt.Errorf("invalid cursor score")
631 }
632
633 // Validate AT-URI format
634 if !strings.HasPrefix(uri, "at://") {
635 return "", nil, fmt.Errorf("invalid cursor URI")
636 }
637
638 // Use computed hot_rank expression in comparison
639 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)`
640 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`,
641 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr)
642 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil
643
644 default:
645 return "", nil, nil
646 }
647}
648
649// buildCommentCursor creates pagination cursor from last comment
650func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank float64) string {
651 var cursorStr string
652 const delimiter = "|"
653
654 switch sort {
655 case "new":
656 // Format: createdAt|uri
657 cursorStr = fmt.Sprintf("%s%s%s",
658 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
659 delimiter,
660 comment.URI)
661
662 case "top":
663 // Format: score|createdAt|uri
664 cursorStr = fmt.Sprintf("%d%s%s%s%s",
665 comment.Score,
666 delimiter,
667 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
668 delimiter,
669 comment.URI)
670
671 case "hot":
672 // Format: hotRank|score|createdAt|uri
673 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s",
674 hotRank,
675 delimiter,
676 comment.Score,
677 delimiter,
678 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
679 delimiter,
680 comment.URI)
681
682 default:
683 cursorStr = comment.URI
684 }
685
686 return base64.URLEncoding.EncodeToString([]byte(cursorStr))
687}
688
689// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query
690// Returns map[uri]*Comment for efficient lookups without N+1 queries
691func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) {
692 if len(uris) == 0 {
693 return make(map[string]*comments.Comment), nil
694 }
695
696 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
697 // COALESCE falls back to DID when handle is NULL (user not yet in users table)
698 query := `
699 SELECT
700 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
701 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
702 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
703 c.created_at, c.indexed_at, c.deleted_at,
704 c.upvote_count, c.downvote_count, c.score, c.reply_count,
705 COALESCE(u.handle, c.commenter_did) as author_handle
706 FROM comments c
707 LEFT JOIN users u ON c.commenter_did = u.did
708 WHERE c.uri = ANY($1) AND c.deleted_at IS NULL
709 `
710
711 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris))
712 if err != nil {
713 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err)
714 }
715 defer func() {
716 if err := rows.Close(); err != nil {
717 log.Printf("Failed to close rows: %v", err)
718 }
719 }()
720
721 result := make(map[string]*comments.Comment)
722 for rows.Next() {
723 var comment comments.Comment
724 var langs pq.StringArray
725 var authorHandle string
726
727 err := rows.Scan(
728 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
729 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
730 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
731 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
732 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
733 &authorHandle,
734 )
735 if err != nil {
736 return nil, fmt.Errorf("failed to scan comment: %w", err)
737 }
738
739 comment.Langs = langs
740 result[comment.URI] = &comment
741 }
742
743 if err = rows.Err(); err != nil {
744 return nil, fmt.Errorf("error iterating comments: %w", err)
745 }
746
747 return result, nil
748}
749
750// ListByParentsBatch retrieves direct replies to multiple parents in a single query
751// Groups results by parent URI to prevent N+1 queries when loading nested replies
752// Uses window functions to limit results per parent efficiently
753func (r *postgresCommentRepo) ListByParentsBatch(
754 ctx context.Context,
755 parentURIs []string,
756 sort string,
757 limitPerParent int,
758) (map[string][]*comments.Comment, error) {
759 if len(parentURIs) == 0 {
760 return make(map[string][]*comments.Comment), nil
761 }
762
763 // Build ORDER BY clause based on sort type
764 // windowOrderBy must inline expressions (can't use SELECT aliases in window functions)
765 var windowOrderBy string
766 var selectClause string
767 switch sort {
768 case "hot":
769 selectClause = `
770 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
771 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
772 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
773 c.created_at, c.indexed_at, c.deleted_at,
774 c.upvote_count, c.downvote_count, c.score, c.reply_count,
775 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
776 COALESCE(u.handle, c.commenter_did) as author_handle`
777 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY
778 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC`
779 case "top":
780 selectClause = `
781 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
782 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
783 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
784 c.created_at, c.indexed_at, c.deleted_at,
785 c.upvote_count, c.downvote_count, c.score, c.reply_count,
786 NULL::numeric as hot_rank,
787 COALESCE(u.handle, c.commenter_did) as author_handle`
788 windowOrderBy = `c.score DESC, c.created_at DESC`
789 case "new":
790 selectClause = `
791 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
792 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
793 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
794 c.created_at, c.indexed_at, c.deleted_at,
795 c.upvote_count, c.downvote_count, c.score, c.reply_count,
796 NULL::numeric as hot_rank,
797 COALESCE(u.handle, c.commenter_did) as author_handle`
798 windowOrderBy = `c.created_at DESC`
799 default:
800 // Default to hot
801 selectClause = `
802 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
803 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
804 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
805 c.created_at, c.indexed_at, c.deleted_at,
806 c.upvote_count, c.downvote_count, c.score, c.reply_count,
807 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
808 COALESCE(u.handle, c.commenter_did) as author_handle`
809 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY
810 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC`
811 }
812
813 // Use window function to limit results per parent
814 // This is more efficient than LIMIT in a subquery per parent
815 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
816 query := fmt.Sprintf(`
817 WITH ranked_comments AS (
818 SELECT
819 %s,
820 ROW_NUMBER() OVER (
821 PARTITION BY c.parent_uri
822 ORDER BY %s
823 ) as rn
824 FROM comments c
825 LEFT JOIN users u ON c.commenter_did = u.did
826 WHERE c.parent_uri = ANY($1) AND c.deleted_at IS NULL
827 )
828 SELECT
829 id, uri, cid, rkey, commenter_did,
830 root_uri, root_cid, parent_uri, parent_cid,
831 content, content_facets, embed, content_labels, langs,
832 created_at, indexed_at, deleted_at,
833 upvote_count, downvote_count, score, reply_count,
834 hot_rank, author_handle
835 FROM ranked_comments
836 WHERE rn <= $2
837 ORDER BY parent_uri, rn
838 `, selectClause, windowOrderBy)
839
840 rows, err := r.db.QueryContext(ctx, query, pq.Array(parentURIs), limitPerParent)
841 if err != nil {
842 return nil, fmt.Errorf("failed to batch query comments by parents: %w", err)
843 }
844 defer func() {
845 if err := rows.Close(); err != nil {
846 log.Printf("Failed to close rows: %v", err)
847 }
848 }()
849
850 // Group results by parent URI
851 result := make(map[string][]*comments.Comment)
852 for rows.Next() {
853 var comment comments.Comment
854 var langs pq.StringArray
855 var hotRank sql.NullFloat64
856 var authorHandle string
857
858 err := rows.Scan(
859 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
860 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
861 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
862 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
863 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
864 &hotRank, &authorHandle,
865 )
866 if err != nil {
867 return nil, fmt.Errorf("failed to scan comment: %w", err)
868 }
869
870 comment.Langs = langs
871 comment.CommenterHandle = authorHandle
872
873 // Group by parent URI
874 result[comment.ParentURI] = append(result[comment.ParentURI], &comment)
875 }
876
877 if err = rows.Err(); err != nil {
878 return nil, fmt.Errorf("error iterating comments: %w", err)
879 }
880
881 return result, nil
882}
883
884// GetVoteStateForComments retrieves the viewer's votes on a batch of comments
885// Returns map[commentURI]*Vote for efficient lookups
886// Note: This implementation is prepared for when the votes table indexing is implemented
887// Currently returns an empty map as votes may not be fully indexed yet
888func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) {
889 if len(commentURIs) == 0 || viewerDID == "" {
890 return make(map[string]interface{}), nil
891 }
892
893 // Query votes table for viewer's votes on these comments
894 // Note: This assumes votes table exists and is being indexed
895 // If votes table doesn't exist yet, this query will fail gracefully
896 query := `
897 SELECT subject_uri, direction, uri
898 FROM votes
899 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL
900 `
901
902 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs))
903 if err != nil {
904 // If votes table doesn't exist yet, return empty map instead of error
905 // This allows the API to work before votes indexing is fully implemented
906 if strings.Contains(err.Error(), "does not exist") {
907 return make(map[string]interface{}), nil
908 }
909 return nil, fmt.Errorf("failed to get vote state for comments: %w", err)
910 }
911 defer func() {
912 if err := rows.Close(); err != nil {
913 log.Printf("Failed to close rows: %v", err)
914 }
915 }()
916
917 // Build result map with vote information
918 result := make(map[string]interface{})
919 for rows.Next() {
920 var subjectURI, direction, uri string
921
922 err := rows.Scan(&subjectURI, &direction, &uri)
923 if err != nil {
924 return nil, fmt.Errorf("failed to scan vote: %w", err)
925 }
926
927 // Store vote info as a simple map (can be enhanced later with proper Vote struct)
928 result[subjectURI] = map[string]interface{}{
929 "direction": direction,
930 "uri": uri,
931 }
932 }
933
934 if err = rows.Err(); err != nil {
935 return nil, fmt.Errorf("error iterating votes: %w", err)
936 }
937
938 return result, nil
939}