A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"context"
	"database/sql"
	"encoding/base64"
	"errors"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"

	"Coves/internal/core/comments"
	"github.com/lib/pq"
)
14
15type postgresCommentRepo struct {
16 db *sql.DB
17}
18
19// NewCommentRepository creates a new PostgreSQL comment repository
20func NewCommentRepository(db *sql.DB) comments.Repository {
21 return &postgresCommentRepo{db: db}
22}
23
24// Create inserts a new comment into the comments table
25// Called by Jetstream consumer after comment is created on PDS
26// Idempotent: Returns success if comment already exists (for Jetstream replays)
27func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error {
28 query := `
29 INSERT INTO comments (
30 uri, cid, rkey, commenter_did,
31 root_uri, root_cid, parent_uri, parent_cid,
32 content, content_facets, embed, content_labels, langs,
33 created_at, indexed_at
34 ) VALUES (
35 $1, $2, $3, $4,
36 $5, $6, $7, $8,
37 $9, $10, $11, $12, $13,
38 $14, NOW()
39 )
40 ON CONFLICT (uri) DO NOTHING
41 RETURNING id, indexed_at
42 `
43
44 err := r.db.QueryRowContext(
45 ctx, query,
46 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
47 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
48 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
49 comment.CreatedAt,
50 ).Scan(&comment.ID, &comment.IndexedAt)
51
52 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
53 if err == sql.ErrNoRows {
54 return nil // Comment already exists, no error for idempotency
55 }
56
57 if err != nil {
58 // Check for unique constraint violation
59 if strings.Contains(err.Error(), "duplicate key") {
60 return comments.ErrCommentAlreadyExists
61 }
62
63 return fmt.Errorf("failed to insert comment: %w", err)
64 }
65
66 return nil
67}
68
69// Update modifies an existing comment's content fields
70// Called by Jetstream consumer after comment is updated on PDS
71// Preserves vote counts and created_at timestamp
72func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error {
73 query := `
74 UPDATE comments
75 SET
76 cid = $1,
77 content = $2,
78 content_facets = $3,
79 embed = $4,
80 content_labels = $5,
81 langs = $6
82 WHERE uri = $7 AND deleted_at IS NULL
83 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count
84 `
85
86 err := r.db.QueryRowContext(
87 ctx, query,
88 comment.CID,
89 comment.Content,
90 comment.ContentFacets,
91 comment.Embed,
92 comment.ContentLabels,
93 pq.Array(comment.Langs),
94 comment.URI,
95 ).Scan(
96 &comment.ID,
97 &comment.IndexedAt,
98 &comment.CreatedAt,
99 &comment.UpvoteCount,
100 &comment.DownvoteCount,
101 &comment.Score,
102 &comment.ReplyCount,
103 )
104
105 if err == sql.ErrNoRows {
106 return comments.ErrCommentNotFound
107 }
108 if err != nil {
109 return fmt.Errorf("failed to update comment: %w", err)
110 }
111
112 return nil
113}
114
115// GetByURI retrieves a comment by its AT-URI
116// Used by Jetstream consumer for UPDATE/DELETE operations
117func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) {
118 query := `
119 SELECT
120 id, uri, cid, rkey, commenter_did,
121 root_uri, root_cid, parent_uri, parent_cid,
122 content, content_facets, embed, content_labels, langs,
123 created_at, indexed_at, deleted_at,
124 upvote_count, downvote_count, score, reply_count
125 FROM comments
126 WHERE uri = $1
127 `
128
129 var comment comments.Comment
130 var langs pq.StringArray
131
132 err := r.db.QueryRowContext(ctx, query, uri).Scan(
133 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
134 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
135 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
136 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
137 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
138 )
139
140 if err == sql.ErrNoRows {
141 return nil, comments.ErrCommentNotFound
142 }
143 if err != nil {
144 return nil, fmt.Errorf("failed to get comment by URI: %w", err)
145 }
146
147 comment.Langs = langs
148
149 return &comment, nil
150}
151
152// Delete soft-deletes a comment (sets deleted_at)
153// Called by Jetstream consumer after comment is deleted from PDS
154// Idempotent: Returns success if comment already deleted
155func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
156 query := `
157 UPDATE comments
158 SET deleted_at = NOW()
159 WHERE uri = $1 AND deleted_at IS NULL
160 `
161
162 result, err := r.db.ExecContext(ctx, query, uri)
163 if err != nil {
164 return fmt.Errorf("failed to delete comment: %w", err)
165 }
166
167 rowsAffected, err := result.RowsAffected()
168 if err != nil {
169 return fmt.Errorf("failed to check delete result: %w", err)
170 }
171
172 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays)
173 if rowsAffected == 0 {
174 return nil
175 }
176
177 return nil
178}
179
180// ListByRoot retrieves all active comments in a thread (flat)
181// Used for fetching entire comment threads on posts
182func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
183 query := `
184 SELECT
185 id, uri, cid, rkey, commenter_did,
186 root_uri, root_cid, parent_uri, parent_cid,
187 content, content_facets, embed, content_labels, langs,
188 created_at, indexed_at, deleted_at,
189 upvote_count, downvote_count, score, reply_count
190 FROM comments
191 WHERE root_uri = $1 AND deleted_at IS NULL
192 ORDER BY created_at ASC
193 LIMIT $2 OFFSET $3
194 `
195
196 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset)
197 if err != nil {
198 return nil, fmt.Errorf("failed to list comments by root: %w", err)
199 }
200 defer func() {
201 if err := rows.Close(); err != nil {
202 log.Printf("Failed to close rows: %v", err)
203 }
204 }()
205
206 var result []*comments.Comment
207 for rows.Next() {
208 var comment comments.Comment
209 var langs pq.StringArray
210
211 err := rows.Scan(
212 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
213 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
214 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
215 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
216 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
217 )
218 if err != nil {
219 return nil, fmt.Errorf("failed to scan comment: %w", err)
220 }
221
222 comment.Langs = langs
223 result = append(result, &comment)
224 }
225
226 if err = rows.Err(); err != nil {
227 return nil, fmt.Errorf("error iterating comments: %w", err)
228 }
229
230 return result, nil
231}
232
233// ListByParent retrieves direct replies to a post or comment
234// Used for building nested/threaded comment views
235func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
236 query := `
237 SELECT
238 id, uri, cid, rkey, commenter_did,
239 root_uri, root_cid, parent_uri, parent_cid,
240 content, content_facets, embed, content_labels, langs,
241 created_at, indexed_at, deleted_at,
242 upvote_count, downvote_count, score, reply_count
243 FROM comments
244 WHERE parent_uri = $1 AND deleted_at IS NULL
245 ORDER BY created_at ASC
246 LIMIT $2 OFFSET $3
247 `
248
249 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset)
250 if err != nil {
251 return nil, fmt.Errorf("failed to list comments by parent: %w", err)
252 }
253 defer func() {
254 if err := rows.Close(); err != nil {
255 log.Printf("Failed to close rows: %v", err)
256 }
257 }()
258
259 var result []*comments.Comment
260 for rows.Next() {
261 var comment comments.Comment
262 var langs pq.StringArray
263
264 err := rows.Scan(
265 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
266 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
267 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
268 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
269 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
270 )
271 if err != nil {
272 return nil, fmt.Errorf("failed to scan comment: %w", err)
273 }
274
275 comment.Langs = langs
276 result = append(result, &comment)
277 }
278
279 if err = rows.Err(); err != nil {
280 return nil, fmt.Errorf("error iterating comments: %w", err)
281 }
282
283 return result, nil
284}
285
286// CountByParent counts direct replies to a post or comment
287// Used for showing reply counts in threading UI
288func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) {
289 query := `
290 SELECT COUNT(*)
291 FROM comments
292 WHERE parent_uri = $1 AND deleted_at IS NULL
293 `
294
295 var count int
296 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count)
297 if err != nil {
298 return 0, fmt.Errorf("failed to count comments by parent: %w", err)
299 }
300
301 return count, nil
302}
303
304// ListByCommenter retrieves all active comments by a specific user
305// Future: Used for user comment history
306func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
307 query := `
308 SELECT
309 id, uri, cid, rkey, commenter_did,
310 root_uri, root_cid, parent_uri, parent_cid,
311 content, content_facets, embed, content_labels, langs,
312 created_at, indexed_at, deleted_at,
313 upvote_count, downvote_count, score, reply_count
314 FROM comments
315 WHERE commenter_did = $1 AND deleted_at IS NULL
316 ORDER BY created_at DESC
317 LIMIT $2 OFFSET $3
318 `
319
320 rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset)
321 if err != nil {
322 return nil, fmt.Errorf("failed to list comments by commenter: %w", err)
323 }
324 defer func() {
325 if err := rows.Close(); err != nil {
326 log.Printf("Failed to close rows: %v", err)
327 }
328 }()
329
330 var result []*comments.Comment
331 for rows.Next() {
332 var comment comments.Comment
333 var langs pq.StringArray
334
335 err := rows.Scan(
336 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
337 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
338 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
339 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
340 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
341 )
342 if err != nil {
343 return nil, fmt.Errorf("failed to scan comment: %w", err)
344 }
345
346 comment.Langs = langs
347 result = append(result, &comment)
348 }
349
350 if err = rows.Err(); err != nil {
351 return nil, fmt.Errorf("error iterating comments: %w", err)
352 }
353
354 return result, nil
355}
356
357// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination
358// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at)
359// Uses cursor-based pagination with composite keys for consistent ordering
360// Hydrates author info (handle, display_name, avatar) via JOIN with users table
361func (r *postgresCommentRepo) ListByParentWithHotRank(
362 ctx context.Context,
363 parentURI string,
364 sort string,
365 timeframe string,
366 limit int,
367 cursor *string,
368) ([]*comments.Comment, *string, error) {
369 // Build ORDER BY clause and time filter based on sort type
370 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe)
371
372 // Parse cursor for pagination
373 cursorFilter, cursorValues, err := r.parseCommentCursor(cursor, sort)
374 if err != nil {
375 return nil, nil, fmt.Errorf("invalid cursor: %w", err)
376 }
377
378 // Build SELECT clause - compute hot_rank for "hot" sort
379 // Hot rank formula (Lemmy algorithm):
380 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8)
381 //
382 // This formula:
383 // - Gives logarithmic weight to score (prevents high-score dominance)
384 // - Decays over time with power 1.8 (faster than linear, slower than quadratic)
385 // - Uses hours as time unit (3600 seconds)
386 // - Adds constants to prevent division by zero and ensure positive values
387 var selectClause string
388 if sort == "hot" {
389 selectClause = `
390 SELECT
391 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
392 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
393 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
394 c.created_at, c.indexed_at, c.deleted_at,
395 c.upvote_count, c.downvote_count, c.score, c.reply_count,
396 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
397 COALESCE(u.handle, c.commenter_did) as author_handle
398 FROM comments c`
399 } else {
400 selectClause = `
401 SELECT
402 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
403 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
404 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
405 c.created_at, c.indexed_at, c.deleted_at,
406 c.upvote_count, c.downvote_count, c.score, c.reply_count,
407 NULL::numeric as hot_rank,
408 COALESCE(u.handle, c.commenter_did) as author_handle
409 FROM comments c`
410 }
411
412 // Build complete query with JOINs and filters
413 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
414 query := fmt.Sprintf(`
415 %s
416 LEFT JOIN users u ON c.commenter_did = u.did
417 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
418 %s
419 %s
420 ORDER BY %s
421 LIMIT $2
422 `, selectClause, timeFilter, cursorFilter, orderBy)
423
424 // Prepare query arguments
425 args := []interface{}{parentURI, limit + 1} // +1 to detect next page
426 args = append(args, cursorValues...)
427
428 // Execute query
429 rows, err := r.db.QueryContext(ctx, query, args...)
430 if err != nil {
431 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err)
432 }
433 defer func() {
434 if err := rows.Close(); err != nil {
435 log.Printf("Failed to close rows: %v", err)
436 }
437 }()
438
439 // Scan results
440 var result []*comments.Comment
441 var hotRanks []float64
442 for rows.Next() {
443 var comment comments.Comment
444 var langs pq.StringArray
445 var hotRank sql.NullFloat64
446 var authorHandle string
447
448 err := rows.Scan(
449 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
450 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
451 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
452 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
453 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
454 &hotRank, &authorHandle,
455 )
456 if err != nil {
457 return nil, nil, fmt.Errorf("failed to scan comment: %w", err)
458 }
459
460 comment.Langs = langs
461 comment.CommenterHandle = authorHandle
462
463 // Store hot_rank for cursor building
464 hotRankValue := 0.0
465 if hotRank.Valid {
466 hotRankValue = hotRank.Float64
467 }
468 hotRanks = append(hotRanks, hotRankValue)
469
470 result = append(result, &comment)
471 }
472
473 if err = rows.Err(); err != nil {
474 return nil, nil, fmt.Errorf("error iterating comments: %w", err)
475 }
476
477 // Handle pagination cursor
478 var nextCursor *string
479 if len(result) > limit && limit > 0 {
480 result = result[:limit]
481 hotRanks = hotRanks[:limit]
482 lastComment := result[len(result)-1]
483 lastHotRank := hotRanks[len(hotRanks)-1]
484 cursorStr := r.buildCommentCursor(lastComment, sort, lastHotRank)
485 nextCursor = &cursorStr
486 }
487
488 return result, nextCursor, nil
489}
490
491// buildCommentSortClause returns the ORDER BY SQL and optional time filter
492func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) {
493 var orderBy string
494 switch sort {
495 case "hot":
496 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC
497 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
498 case "top":
499 // Score DESC, then created_at DESC, then uri DESC
500 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC`
501 case "new":
502 // Created at DESC, then uri DESC
503 orderBy = `c.created_at DESC, c.uri DESC`
504 default:
505 // Default to hot
506 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
507 }
508
509 // Add time filter for "top" sort
510 var timeFilter string
511 if sort == "top" {
512 timeFilter = r.buildCommentTimeFilter(timeframe)
513 }
514
515 return orderBy, timeFilter
516}
517
518// buildCommentTimeFilter returns SQL filter for timeframe
519func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string {
520 if timeframe == "" || timeframe == "all" {
521 return ""
522 }
523
524 var interval string
525 switch timeframe {
526 case "hour":
527 interval = "1 hour"
528 case "day":
529 interval = "1 day"
530 case "week":
531 interval = "7 days"
532 case "month":
533 interval = "30 days"
534 case "year":
535 interval = "1 year"
536 default:
537 return ""
538 }
539
540 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval)
541}
542
543// parseCommentCursor decodes pagination cursor for comments
544func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) {
545 if cursor == nil || *cursor == "" {
546 return "", nil, nil
547 }
548
549 // Validate cursor size to prevent DoS via massive base64 strings
550 const maxCursorSize = 1024
551 if len(*cursor) > maxCursorSize {
552 return "", nil, fmt.Errorf("cursor too large: maximum %d bytes", maxCursorSize)
553 }
554
555 // Decode base64 cursor
556 decoded, err := base64.URLEncoding.DecodeString(*cursor)
557 if err != nil {
558 return "", nil, fmt.Errorf("invalid cursor encoding")
559 }
560
561 // Parse cursor based on sort type using | delimiter
562 // Format: hotRank|score|createdAt|uri (for hot)
563 // score|createdAt|uri (for top)
564 // createdAt|uri (for new)
565 parts := strings.Split(string(decoded), "|")
566
567 switch sort {
568 case "new":
569 // Cursor format: createdAt|uri
570 if len(parts) != 2 {
571 return "", nil, fmt.Errorf("invalid cursor format for new sort")
572 }
573
574 createdAt := parts[0]
575 uri := parts[1]
576
577 // Validate AT-URI format
578 if !strings.HasPrefix(uri, "at://") {
579 return "", nil, fmt.Errorf("invalid cursor URI")
580 }
581
582 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))`
583 return filter, []interface{}{createdAt, uri}, nil
584
585 case "top":
586 // Cursor format: score|createdAt|uri
587 if len(parts) != 3 {
588 return "", nil, fmt.Errorf("invalid cursor format for top sort")
589 }
590
591 scoreStr := parts[0]
592 createdAt := parts[1]
593 uri := parts[2]
594
595 // Parse score as integer
596 score := 0
597 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
598 return "", nil, fmt.Errorf("invalid cursor score")
599 }
600
601 // Validate AT-URI format
602 if !strings.HasPrefix(uri, "at://") {
603 return "", nil, fmt.Errorf("invalid cursor URI")
604 }
605
606 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))`
607 return filter, []interface{}{score, createdAt, uri}, nil
608
609 case "hot":
610 // Cursor format: hotRank|score|createdAt|uri
611 if len(parts) != 4 {
612 return "", nil, fmt.Errorf("invalid cursor format for hot sort")
613 }
614
615 hotRankStr := parts[0]
616 scoreStr := parts[1]
617 createdAt := parts[2]
618 uri := parts[3]
619
620 // Parse hot_rank as float
621 hotRank := 0.0
622 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil {
623 return "", nil, fmt.Errorf("invalid cursor hot rank")
624 }
625
626 // Parse score as integer
627 score := 0
628 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
629 return "", nil, fmt.Errorf("invalid cursor score")
630 }
631
632 // Validate AT-URI format
633 if !strings.HasPrefix(uri, "at://") {
634 return "", nil, fmt.Errorf("invalid cursor URI")
635 }
636
637 // Use computed hot_rank expression in comparison
638 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)`
639 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`,
640 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr)
641 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil
642
643 default:
644 return "", nil, nil
645 }
646}
647
648// buildCommentCursor creates pagination cursor from last comment
649func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank float64) string {
650 var cursorStr string
651 const delimiter = "|"
652
653 switch sort {
654 case "new":
655 // Format: createdAt|uri
656 cursorStr = fmt.Sprintf("%s%s%s",
657 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
658 delimiter,
659 comment.URI)
660
661 case "top":
662 // Format: score|createdAt|uri
663 cursorStr = fmt.Sprintf("%d%s%s%s%s",
664 comment.Score,
665 delimiter,
666 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
667 delimiter,
668 comment.URI)
669
670 case "hot":
671 // Format: hotRank|score|createdAt|uri
672 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s",
673 hotRank,
674 delimiter,
675 comment.Score,
676 delimiter,
677 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
678 delimiter,
679 comment.URI)
680
681 default:
682 cursorStr = comment.URI
683 }
684
685 return base64.URLEncoding.EncodeToString([]byte(cursorStr))
686}
687
688// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query
689// Returns map[uri]*Comment for efficient lookups without N+1 queries
690func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) {
691 if len(uris) == 0 {
692 return make(map[string]*comments.Comment), nil
693 }
694
695 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
696 // COALESCE falls back to DID when handle is NULL (user not yet in users table)
697 query := `
698 SELECT
699 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
700 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
701 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
702 c.created_at, c.indexed_at, c.deleted_at,
703 c.upvote_count, c.downvote_count, c.score, c.reply_count,
704 COALESCE(u.handle, c.commenter_did) as author_handle
705 FROM comments c
706 LEFT JOIN users u ON c.commenter_did = u.did
707 WHERE c.uri = ANY($1) AND c.deleted_at IS NULL
708 `
709
710 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris))
711 if err != nil {
712 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err)
713 }
714 defer func() {
715 if err := rows.Close(); err != nil {
716 log.Printf("Failed to close rows: %v", err)
717 }
718 }()
719
720 result := make(map[string]*comments.Comment)
721 for rows.Next() {
722 var comment comments.Comment
723 var langs pq.StringArray
724 var authorHandle string
725
726 err := rows.Scan(
727 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
728 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
729 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
730 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
731 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
732 &authorHandle,
733 )
734 if err != nil {
735 return nil, fmt.Errorf("failed to scan comment: %w", err)
736 }
737
738 comment.Langs = langs
739 result[comment.URI] = &comment
740 }
741
742 if err = rows.Err(); err != nil {
743 return nil, fmt.Errorf("error iterating comments: %w", err)
744 }
745
746 return result, nil
747}
748
749// ListByParentsBatch retrieves direct replies to multiple parents in a single query
750// Groups results by parent URI to prevent N+1 queries when loading nested replies
751// Uses window functions to limit results per parent efficiently
752func (r *postgresCommentRepo) ListByParentsBatch(
753 ctx context.Context,
754 parentURIs []string,
755 sort string,
756 limitPerParent int,
757) (map[string][]*comments.Comment, error) {
758 if len(parentURIs) == 0 {
759 return make(map[string][]*comments.Comment), nil
760 }
761
762 // Build ORDER BY clause based on sort type
763 // windowOrderBy must inline expressions (can't use SELECT aliases in window functions)
764 var windowOrderBy string
765 var selectClause string
766 switch sort {
767 case "hot":
768 selectClause = `
769 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
770 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
771 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
772 c.created_at, c.indexed_at, c.deleted_at,
773 c.upvote_count, c.downvote_count, c.score, c.reply_count,
774 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
775 COALESCE(u.handle, c.commenter_did) as author_handle`
776 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY
777 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC`
778 case "top":
779 selectClause = `
780 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
781 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
782 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
783 c.created_at, c.indexed_at, c.deleted_at,
784 c.upvote_count, c.downvote_count, c.score, c.reply_count,
785 NULL::numeric as hot_rank,
786 COALESCE(u.handle, c.commenter_did) as author_handle`
787 windowOrderBy = `c.score DESC, c.created_at DESC`
788 case "new":
789 selectClause = `
790 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
791 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
792 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
793 c.created_at, c.indexed_at, c.deleted_at,
794 c.upvote_count, c.downvote_count, c.score, c.reply_count,
795 NULL::numeric as hot_rank,
796 COALESCE(u.handle, c.commenter_did) as author_handle`
797 windowOrderBy = `c.created_at DESC`
798 default:
799 // Default to hot
800 selectClause = `
801 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
802 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
803 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
804 c.created_at, c.indexed_at, c.deleted_at,
805 c.upvote_count, c.downvote_count, c.score, c.reply_count,
806 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
807 COALESCE(u.handle, c.commenter_did) as author_handle`
808 // CRITICAL: Must inline hot_rank formula - PostgreSQL doesn't allow SELECT aliases in window ORDER BY
809 windowOrderBy = `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) DESC, c.score DESC, c.created_at DESC`
810 }
811
812 // Use window function to limit results per parent
813 // This is more efficient than LIMIT in a subquery per parent
814 // LEFT JOIN prevents data loss when user record hasn't been indexed yet (out-of-order Jetstream events)
815 query := fmt.Sprintf(`
816 WITH ranked_comments AS (
817 SELECT
818 %s,
819 ROW_NUMBER() OVER (
820 PARTITION BY c.parent_uri
821 ORDER BY %s
822 ) as rn
823 FROM comments c
824 LEFT JOIN users u ON c.commenter_did = u.did
825 WHERE c.parent_uri = ANY($1) AND c.deleted_at IS NULL
826 )
827 SELECT
828 id, uri, cid, rkey, commenter_did,
829 root_uri, root_cid, parent_uri, parent_cid,
830 content, content_facets, embed, content_labels, langs,
831 created_at, indexed_at, deleted_at,
832 upvote_count, downvote_count, score, reply_count,
833 hot_rank, author_handle
834 FROM ranked_comments
835 WHERE rn <= $2
836 ORDER BY parent_uri, rn
837 `, selectClause, windowOrderBy)
838
839 rows, err := r.db.QueryContext(ctx, query, pq.Array(parentURIs), limitPerParent)
840 if err != nil {
841 return nil, fmt.Errorf("failed to batch query comments by parents: %w", err)
842 }
843 defer func() {
844 if err := rows.Close(); err != nil {
845 log.Printf("Failed to close rows: %v", err)
846 }
847 }()
848
849 // Group results by parent URI
850 result := make(map[string][]*comments.Comment)
851 for rows.Next() {
852 var comment comments.Comment
853 var langs pq.StringArray
854 var hotRank sql.NullFloat64
855 var authorHandle string
856
857 err := rows.Scan(
858 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
859 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
860 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
861 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
862 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
863 &hotRank, &authorHandle,
864 )
865 if err != nil {
866 return nil, fmt.Errorf("failed to scan comment: %w", err)
867 }
868
869 comment.Langs = langs
870 comment.CommenterHandle = authorHandle
871
872 // Group by parent URI
873 result[comment.ParentURI] = append(result[comment.ParentURI], &comment)
874 }
875
876 if err = rows.Err(); err != nil {
877 return nil, fmt.Errorf("error iterating comments: %w", err)
878 }
879
880 return result, nil
881}
882
883// GetVoteStateForComments retrieves the viewer's votes on a batch of comments
884// Returns map[commentURI]*Vote for efficient lookups
885// Note: This implementation is prepared for when the votes table indexing is implemented
886// Currently returns an empty map as votes may not be fully indexed yet
887func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) {
888 if len(commentURIs) == 0 || viewerDID == "" {
889 return make(map[string]interface{}), nil
890 }
891
892 // Query votes table for viewer's votes on these comments
893 // Note: This assumes votes table exists and is being indexed
894 // If votes table doesn't exist yet, this query will fail gracefully
895 query := `
896 SELECT subject_uri, direction, uri
897 FROM votes
898 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL
899 `
900
901 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs))
902 if err != nil {
903 // If votes table doesn't exist yet, return empty map instead of error
904 // This allows the API to work before votes indexing is fully implemented
905 if strings.Contains(err.Error(), "does not exist") {
906 return make(map[string]interface{}), nil
907 }
908 return nil, fmt.Errorf("failed to get vote state for comments: %w", err)
909 }
910 defer func() {
911 if err := rows.Close(); err != nil {
912 log.Printf("Failed to close rows: %v", err)
913 }
914 }()
915
916 // Build result map with vote information
917 result := make(map[string]interface{})
918 for rows.Next() {
919 var subjectURI, direction, uri string
920
921 err := rows.Scan(&subjectURI, &direction, &uri)
922 if err != nil {
923 return nil, fmt.Errorf("failed to scan vote: %w", err)
924 }
925
926 // Store vote info as a simple map (can be enhanced later with proper Vote struct)
927 result[subjectURI] = map[string]interface{}{
928 "direction": direction,
929 "uri": uri,
930 }
931 }
932
933 if err = rows.Err(); err != nil {
934 return nil, fmt.Errorf("error iterating votes: %w", err)
935 }
936
937 return result, nil
938}