A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/comments"
5 "context"
6 "database/sql"
7 "encoding/base64"
8 "fmt"
9 "log"
10 "strings"
11
12 "github.com/lib/pq"
13)
14
15type postgresCommentRepo struct {
16 db *sql.DB
17}
18
19// NewCommentRepository creates a new PostgreSQL comment repository
20func NewCommentRepository(db *sql.DB) comments.Repository {
21 return &postgresCommentRepo{db: db}
22}
23
24// Create inserts a new comment into the comments table
25// Called by Jetstream consumer after comment is created on PDS
26// Idempotent: Returns success if comment already exists (for Jetstream replays)
27func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error {
28 query := `
29 INSERT INTO comments (
30 uri, cid, rkey, commenter_did,
31 root_uri, root_cid, parent_uri, parent_cid,
32 content, content_facets, embed, content_labels, langs,
33 created_at, indexed_at
34 ) VALUES (
35 $1, $2, $3, $4,
36 $5, $6, $7, $8,
37 $9, $10, $11, $12, $13,
38 $14, NOW()
39 )
40 ON CONFLICT (uri) DO NOTHING
41 RETURNING id, indexed_at
42 `
43
44 err := r.db.QueryRowContext(
45 ctx, query,
46 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
47 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
48 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
49 comment.CreatedAt,
50 ).Scan(&comment.ID, &comment.IndexedAt)
51
52 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
53 if err == sql.ErrNoRows {
54 return nil // Comment already exists, no error for idempotency
55 }
56
57 if err != nil {
58 // Check for unique constraint violation
59 if strings.Contains(err.Error(), "duplicate key") {
60 return comments.ErrCommentAlreadyExists
61 }
62
63 return fmt.Errorf("failed to insert comment: %w", err)
64 }
65
66 return nil
67}
68
69// Update modifies an existing comment's content fields
70// Called by Jetstream consumer after comment is updated on PDS
71// Preserves vote counts and created_at timestamp
72func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error {
73 query := `
74 UPDATE comments
75 SET
76 cid = $1,
77 content = $2,
78 content_facets = $3,
79 embed = $4,
80 content_labels = $5,
81 langs = $6
82 WHERE uri = $7 AND deleted_at IS NULL
83 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count
84 `
85
86 err := r.db.QueryRowContext(
87 ctx, query,
88 comment.CID,
89 comment.Content,
90 comment.ContentFacets,
91 comment.Embed,
92 comment.ContentLabels,
93 pq.Array(comment.Langs),
94 comment.URI,
95 ).Scan(
96 &comment.ID,
97 &comment.IndexedAt,
98 &comment.CreatedAt,
99 &comment.UpvoteCount,
100 &comment.DownvoteCount,
101 &comment.Score,
102 &comment.ReplyCount,
103 )
104
105 if err == sql.ErrNoRows {
106 return comments.ErrCommentNotFound
107 }
108 if err != nil {
109 return fmt.Errorf("failed to update comment: %w", err)
110 }
111
112 return nil
113}
114
115// GetByURI retrieves a comment by its AT-URI
116// Used by Jetstream consumer for UPDATE/DELETE operations
117func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) {
118 query := `
119 SELECT
120 id, uri, cid, rkey, commenter_did,
121 root_uri, root_cid, parent_uri, parent_cid,
122 content, content_facets, embed, content_labels, langs,
123 created_at, indexed_at, deleted_at,
124 upvote_count, downvote_count, score, reply_count
125 FROM comments
126 WHERE uri = $1
127 `
128
129 var comment comments.Comment
130 var langs pq.StringArray
131
132 err := r.db.QueryRowContext(ctx, query, uri).Scan(
133 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
134 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
135 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
136 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
137 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
138 )
139
140 if err == sql.ErrNoRows {
141 return nil, comments.ErrCommentNotFound
142 }
143 if err != nil {
144 return nil, fmt.Errorf("failed to get comment by URI: %w", err)
145 }
146
147 comment.Langs = langs
148
149 return &comment, nil
150}
151
152// Delete soft-deletes a comment (sets deleted_at)
153// Called by Jetstream consumer after comment is deleted from PDS
154// Idempotent: Returns success if comment already deleted
155func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
156 query := `
157 UPDATE comments
158 SET deleted_at = NOW()
159 WHERE uri = $1 AND deleted_at IS NULL
160 `
161
162 result, err := r.db.ExecContext(ctx, query, uri)
163 if err != nil {
164 return fmt.Errorf("failed to delete comment: %w", err)
165 }
166
167 rowsAffected, err := result.RowsAffected()
168 if err != nil {
169 return fmt.Errorf("failed to check delete result: %w", err)
170 }
171
172 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays)
173 if rowsAffected == 0 {
174 return nil
175 }
176
177 return nil
178}
179
180// ListByRoot retrieves all active comments in a thread (flat)
181// Used for fetching entire comment threads on posts
182func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
183 query := `
184 SELECT
185 id, uri, cid, rkey, commenter_did,
186 root_uri, root_cid, parent_uri, parent_cid,
187 content, content_facets, embed, content_labels, langs,
188 created_at, indexed_at, deleted_at,
189 upvote_count, downvote_count, score, reply_count
190 FROM comments
191 WHERE root_uri = $1 AND deleted_at IS NULL
192 ORDER BY created_at ASC
193 LIMIT $2 OFFSET $3
194 `
195
196 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset)
197 if err != nil {
198 return nil, fmt.Errorf("failed to list comments by root: %w", err)
199 }
200 defer func() {
201 if err := rows.Close(); err != nil {
202 log.Printf("Failed to close rows: %v", err)
203 }
204 }()
205
206 var result []*comments.Comment
207 for rows.Next() {
208 var comment comments.Comment
209 var langs pq.StringArray
210
211 err := rows.Scan(
212 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
213 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
214 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
215 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
216 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
217 )
218 if err != nil {
219 return nil, fmt.Errorf("failed to scan comment: %w", err)
220 }
221
222 comment.Langs = langs
223 result = append(result, &comment)
224 }
225
226 if err = rows.Err(); err != nil {
227 return nil, fmt.Errorf("error iterating comments: %w", err)
228 }
229
230 return result, nil
231}
232
233// ListByParent retrieves direct replies to a post or comment
234// Used for building nested/threaded comment views
235func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
236 query := `
237 SELECT
238 id, uri, cid, rkey, commenter_did,
239 root_uri, root_cid, parent_uri, parent_cid,
240 content, content_facets, embed, content_labels, langs,
241 created_at, indexed_at, deleted_at,
242 upvote_count, downvote_count, score, reply_count
243 FROM comments
244 WHERE parent_uri = $1 AND deleted_at IS NULL
245 ORDER BY created_at ASC
246 LIMIT $2 OFFSET $3
247 `
248
249 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset)
250 if err != nil {
251 return nil, fmt.Errorf("failed to list comments by parent: %w", err)
252 }
253 defer func() {
254 if err := rows.Close(); err != nil {
255 log.Printf("Failed to close rows: %v", err)
256 }
257 }()
258
259 var result []*comments.Comment
260 for rows.Next() {
261 var comment comments.Comment
262 var langs pq.StringArray
263
264 err := rows.Scan(
265 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
266 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
267 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
268 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
269 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
270 )
271 if err != nil {
272 return nil, fmt.Errorf("failed to scan comment: %w", err)
273 }
274
275 comment.Langs = langs
276 result = append(result, &comment)
277 }
278
279 if err = rows.Err(); err != nil {
280 return nil, fmt.Errorf("error iterating comments: %w", err)
281 }
282
283 return result, nil
284}
285
286// CountByParent counts direct replies to a post or comment
287// Used for showing reply counts in threading UI
288func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) {
289 query := `
290 SELECT COUNT(*)
291 FROM comments
292 WHERE parent_uri = $1 AND deleted_at IS NULL
293 `
294
295 var count int
296 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count)
297 if err != nil {
298 return 0, fmt.Errorf("failed to count comments by parent: %w", err)
299 }
300
301 return count, nil
302}
303
304// ListByCommenter retrieves all active comments by a specific user
305// Future: Used for user comment history
306func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
307 query := `
308 SELECT
309 id, uri, cid, rkey, commenter_did,
310 root_uri, root_cid, parent_uri, parent_cid,
311 content, content_facets, embed, content_labels, langs,
312 created_at, indexed_at, deleted_at,
313 upvote_count, downvote_count, score, reply_count
314 FROM comments
315 WHERE commenter_did = $1 AND deleted_at IS NULL
316 ORDER BY created_at DESC
317 LIMIT $2 OFFSET $3
318 `
319
320 rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset)
321 if err != nil {
322 return nil, fmt.Errorf("failed to list comments by commenter: %w", err)
323 }
324 defer func() {
325 if err := rows.Close(); err != nil {
326 log.Printf("Failed to close rows: %v", err)
327 }
328 }()
329
330 var result []*comments.Comment
331 for rows.Next() {
332 var comment comments.Comment
333 var langs pq.StringArray
334
335 err := rows.Scan(
336 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
337 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
338 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
339 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
340 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
341 )
342 if err != nil {
343 return nil, fmt.Errorf("failed to scan comment: %w", err)
344 }
345
346 comment.Langs = langs
347 result = append(result, &comment)
348 }
349
350 if err = rows.Err(); err != nil {
351 return nil, fmt.Errorf("error iterating comments: %w", err)
352 }
353
354 return result, nil
355}
356// ListByParentWithHotRank retrieves direct replies to a post or comment with sorting and pagination
357// Supports three sort modes: hot (Lemmy algorithm), top (by score + timeframe), and new (by created_at)
358// Uses cursor-based pagination with composite keys for consistent ordering
359// Hydrates author info (handle, display_name, avatar) via JOIN with users table
360func (r *postgresCommentRepo) ListByParentWithHotRank(
361 ctx context.Context,
362 parentURI string,
363 sort string,
364 timeframe string,
365 limit int,
366 cursor *string,
367) ([]*comments.Comment, *string, error) {
368 // Build ORDER BY clause and time filter based on sort type
369 orderBy, timeFilter := r.buildCommentSortClause(sort, timeframe)
370
371 // Parse cursor for pagination
372 cursorFilter, cursorValues, err := r.parseCommentCursor(cursor, sort)
373 if err != nil {
374 return nil, nil, fmt.Errorf("invalid cursor: %w", err)
375 }
376
377 // Build SELECT clause - compute hot_rank for "hot" sort
378 // Hot rank formula (Lemmy algorithm):
379 // log(greatest(2, score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600) + 2), 1.8)
380 //
381 // This formula:
382 // - Gives logarithmic weight to score (prevents high-score dominance)
383 // - Decays over time with power 1.8 (faster than linear, slower than quadratic)
384 // - Uses hours as time unit (3600 seconds)
385 // - Adds constants to prevent division by zero and ensure positive values
386 var selectClause string
387 if sort == "hot" {
388 selectClause = `
389 SELECT
390 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
391 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
392 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
393 c.created_at, c.indexed_at, c.deleted_at,
394 c.upvote_count, c.downvote_count, c.score, c.reply_count,
395 log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8) as hot_rank,
396 u.handle as author_handle
397 FROM comments c`
398 } else {
399 selectClause = `
400 SELECT
401 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
402 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
403 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
404 c.created_at, c.indexed_at, c.deleted_at,
405 c.upvote_count, c.downvote_count, c.score, c.reply_count,
406 NULL::numeric as hot_rank,
407 u.handle as author_handle
408 FROM comments c`
409 }
410
411 // Build complete query with JOINs and filters
412 query := fmt.Sprintf(`
413 %s
414 INNER JOIN users u ON c.commenter_did = u.did
415 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
416 %s
417 %s
418 ORDER BY %s
419 LIMIT $2
420 `, selectClause, timeFilter, cursorFilter, orderBy)
421
422 // Prepare query arguments
423 args := []interface{}{parentURI, limit + 1} // +1 to detect next page
424 args = append(args, cursorValues...)
425
426 // Execute query
427 rows, err := r.db.QueryContext(ctx, query, args...)
428 if err != nil {
429 return nil, nil, fmt.Errorf("failed to query comments with hot rank: %w", err)
430 }
431 defer func() {
432 if err := rows.Close(); err != nil {
433 log.Printf("Failed to close rows: %v", err)
434 }
435 }()
436
437 // Scan results
438 var result []*comments.Comment
439 var hotRanks []float64
440 for rows.Next() {
441 var comment comments.Comment
442 var langs pq.StringArray
443 var hotRank sql.NullFloat64
444 var authorHandle string
445
446 err := rows.Scan(
447 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
448 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
449 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
450 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
451 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
452 &hotRank, &authorHandle,
453 )
454 if err != nil {
455 return nil, nil, fmt.Errorf("failed to scan comment: %w", err)
456 }
457
458 comment.Langs = langs
459 comment.CommenterHandle = authorHandle
460
461 // Store hot_rank for cursor building
462 hotRankValue := 0.0
463 if hotRank.Valid {
464 hotRankValue = hotRank.Float64
465 }
466 hotRanks = append(hotRanks, hotRankValue)
467
468 result = append(result, &comment)
469 }
470
471 if err = rows.Err(); err != nil {
472 return nil, nil, fmt.Errorf("error iterating comments: %w", err)
473 }
474
475 // Handle pagination cursor
476 var nextCursor *string
477 if len(result) > limit && limit > 0 {
478 result = result[:limit]
479 hotRanks = hotRanks[:limit]
480 lastComment := result[len(result)-1]
481 lastHotRank := hotRanks[len(hotRanks)-1]
482 cursorStr := r.buildCommentCursor(lastComment, sort, lastHotRank)
483 nextCursor = &cursorStr
484 }
485
486 return result, nextCursor, nil
487}
488
489// buildCommentSortClause returns the ORDER BY SQL and optional time filter
490func (r *postgresCommentRepo) buildCommentSortClause(sort, timeframe string) (string, string) {
491 var orderBy string
492 switch sort {
493 case "hot":
494 // Hot rank DESC, then score DESC as tiebreaker, then created_at DESC, then uri DESC
495 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
496 case "top":
497 // Score DESC, then created_at DESC, then uri DESC
498 orderBy = `c.score DESC, c.created_at DESC, c.uri DESC`
499 case "new":
500 // Created at DESC, then uri DESC
501 orderBy = `c.created_at DESC, c.uri DESC`
502 default:
503 // Default to hot
504 orderBy = `hot_rank DESC, c.score DESC, c.created_at DESC, c.uri DESC`
505 }
506
507 // Add time filter for "top" sort
508 var timeFilter string
509 if sort == "top" {
510 timeFilter = r.buildCommentTimeFilter(timeframe)
511 }
512
513 return orderBy, timeFilter
514}
515
516// buildCommentTimeFilter returns SQL filter for timeframe
517func (r *postgresCommentRepo) buildCommentTimeFilter(timeframe string) string {
518 if timeframe == "" || timeframe == "all" {
519 return ""
520 }
521
522 var interval string
523 switch timeframe {
524 case "hour":
525 interval = "1 hour"
526 case "day":
527 interval = "1 day"
528 case "week":
529 interval = "7 days"
530 case "month":
531 interval = "30 days"
532 case "year":
533 interval = "1 year"
534 default:
535 return ""
536 }
537
538 return fmt.Sprintf("AND c.created_at >= NOW() - INTERVAL '%s'", interval)
539}
540
541// parseCommentCursor decodes pagination cursor for comments
542func (r *postgresCommentRepo) parseCommentCursor(cursor *string, sort string) (string, []interface{}, error) {
543 if cursor == nil || *cursor == "" {
544 return "", nil, nil
545 }
546
547 // Decode base64 cursor
548 decoded, err := base64.URLEncoding.DecodeString(*cursor)
549 if err != nil {
550 return "", nil, fmt.Errorf("invalid cursor encoding")
551 }
552
553 // Parse cursor based on sort type using | delimiter
554 // Format: hotRank|score|createdAt|uri (for hot)
555 // score|createdAt|uri (for top)
556 // createdAt|uri (for new)
557 parts := strings.Split(string(decoded), "|")
558
559 switch sort {
560 case "new":
561 // Cursor format: createdAt|uri
562 if len(parts) != 2 {
563 return "", nil, fmt.Errorf("invalid cursor format for new sort")
564 }
565
566 createdAt := parts[0]
567 uri := parts[1]
568
569 // Validate AT-URI format
570 if !strings.HasPrefix(uri, "at://") {
571 return "", nil, fmt.Errorf("invalid cursor URI")
572 }
573
574 filter := `AND (c.created_at < $3 OR (c.created_at = $3 AND c.uri < $4))`
575 return filter, []interface{}{createdAt, uri}, nil
576
577 case "top":
578 // Cursor format: score|createdAt|uri
579 if len(parts) != 3 {
580 return "", nil, fmt.Errorf("invalid cursor format for top sort")
581 }
582
583 scoreStr := parts[0]
584 createdAt := parts[1]
585 uri := parts[2]
586
587 // Parse score as integer
588 score := 0
589 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
590 return "", nil, fmt.Errorf("invalid cursor score")
591 }
592
593 // Validate AT-URI format
594 if !strings.HasPrefix(uri, "at://") {
595 return "", nil, fmt.Errorf("invalid cursor URI")
596 }
597
598 filter := `AND (c.score < $3 OR (c.score = $3 AND c.created_at < $4) OR (c.score = $3 AND c.created_at = $4 AND c.uri < $5))`
599 return filter, []interface{}{score, createdAt, uri}, nil
600
601 case "hot":
602 // Cursor format: hotRank|score|createdAt|uri
603 if len(parts) != 4 {
604 return "", nil, fmt.Errorf("invalid cursor format for hot sort")
605 }
606
607 hotRankStr := parts[0]
608 scoreStr := parts[1]
609 createdAt := parts[2]
610 uri := parts[3]
611
612 // Parse hot_rank as float
613 hotRank := 0.0
614 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil {
615 return "", nil, fmt.Errorf("invalid cursor hot rank")
616 }
617
618 // Parse score as integer
619 score := 0
620 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
621 return "", nil, fmt.Errorf("invalid cursor score")
622 }
623
624 // Validate AT-URI format
625 if !strings.HasPrefix(uri, "at://") {
626 return "", nil, fmt.Errorf("invalid cursor URI")
627 }
628
629 // Use computed hot_rank expression in comparison
630 hotRankExpr := `log(greatest(2, c.score + 2)) / power(((EXTRACT(EPOCH FROM (NOW() - c.created_at)) / 3600) + 2), 1.8)`
631 filter := fmt.Sprintf(`AND ((%s < $3 OR (%s = $3 AND c.score < $4) OR (%s = $3 AND c.score = $4 AND c.created_at < $5) OR (%s = $3 AND c.score = $4 AND c.created_at = $5 AND c.uri < $6)) AND c.uri != $7)`,
632 hotRankExpr, hotRankExpr, hotRankExpr, hotRankExpr)
633 return filter, []interface{}{hotRank, score, createdAt, uri, uri}, nil
634
635 default:
636 return "", nil, nil
637 }
638}
639
640// buildCommentCursor creates pagination cursor from last comment
641func (r *postgresCommentRepo) buildCommentCursor(comment *comments.Comment, sort string, hotRank float64) string {
642 var cursorStr string
643 const delimiter = "|"
644
645 switch sort {
646 case "new":
647 // Format: createdAt|uri
648 cursorStr = fmt.Sprintf("%s%s%s",
649 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
650 delimiter,
651 comment.URI)
652
653 case "top":
654 // Format: score|createdAt|uri
655 cursorStr = fmt.Sprintf("%d%s%s%s%s",
656 comment.Score,
657 delimiter,
658 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
659 delimiter,
660 comment.URI)
661
662 case "hot":
663 // Format: hotRank|score|createdAt|uri
664 cursorStr = fmt.Sprintf("%f%s%d%s%s%s%s",
665 hotRank,
666 delimiter,
667 comment.Score,
668 delimiter,
669 comment.CreatedAt.Format("2006-01-02T15:04:05.999999999Z07:00"),
670 delimiter,
671 comment.URI)
672
673 default:
674 cursorStr = comment.URI
675 }
676
677 return base64.URLEncoding.EncodeToString([]byte(cursorStr))
678}
679
680// GetByURIsBatch retrieves multiple comments by their AT-URIs in a single query
681// Returns map[uri]*Comment for efficient lookups without N+1 queries
682func (r *postgresCommentRepo) GetByURIsBatch(ctx context.Context, uris []string) (map[string]*comments.Comment, error) {
683 if len(uris) == 0 {
684 return make(map[string]*comments.Comment), nil
685 }
686
687 query := `
688 SELECT
689 c.id, c.uri, c.cid, c.rkey, c.commenter_did,
690 c.root_uri, c.root_cid, c.parent_uri, c.parent_cid,
691 c.content, c.content_facets, c.embed, c.content_labels, c.langs,
692 c.created_at, c.indexed_at, c.deleted_at,
693 c.upvote_count, c.downvote_count, c.score, c.reply_count,
694 u.handle as author_handle
695 FROM comments c
696 INNER JOIN users u ON c.commenter_did = u.did
697 WHERE c.uri = ANY($1) AND c.deleted_at IS NULL
698 `
699
700 rows, err := r.db.QueryContext(ctx, query, pq.Array(uris))
701 if err != nil {
702 return nil, fmt.Errorf("failed to batch get comments by URIs: %w", err)
703 }
704 defer func() {
705 if err := rows.Close(); err != nil {
706 log.Printf("Failed to close rows: %v", err)
707 }
708 }()
709
710 result := make(map[string]*comments.Comment)
711 for rows.Next() {
712 var comment comments.Comment
713 var langs pq.StringArray
714 var authorHandle string
715
716 err := rows.Scan(
717 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
718 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
719 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
720 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
721 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
722 &authorHandle,
723 )
724 if err != nil {
725 return nil, fmt.Errorf("failed to scan comment: %w", err)
726 }
727
728 comment.Langs = langs
729 result[comment.URI] = &comment
730 }
731
732 if err = rows.Err(); err != nil {
733 return nil, fmt.Errorf("error iterating comments: %w", err)
734 }
735
736 return result, nil
737}
738
739// GetVoteStateForComments retrieves the viewer's votes on a batch of comments
740// Returns map[commentURI]*Vote for efficient lookups
741// Note: This implementation is prepared for when the votes table indexing is implemented
742// Currently returns an empty map as votes may not be fully indexed yet
743func (r *postgresCommentRepo) GetVoteStateForComments(ctx context.Context, viewerDID string, commentURIs []string) (map[string]interface{}, error) {
744 if len(commentURIs) == 0 || viewerDID == "" {
745 return make(map[string]interface{}), nil
746 }
747
748 // Query votes table for viewer's votes on these comments
749 // Note: This assumes votes table exists and is being indexed
750 // If votes table doesn't exist yet, this query will fail gracefully
751 query := `
752 SELECT subject_uri, direction, uri, cid, created_at
753 FROM votes
754 WHERE voter_did = $1 AND subject_uri = ANY($2) AND deleted_at IS NULL
755 `
756
757 rows, err := r.db.QueryContext(ctx, query, viewerDID, pq.Array(commentURIs))
758 if err != nil {
759 // If votes table doesn't exist yet, return empty map instead of error
760 // This allows the API to work before votes indexing is fully implemented
761 if strings.Contains(err.Error(), "does not exist") {
762 return make(map[string]interface{}), nil
763 }
764 return nil, fmt.Errorf("failed to get vote state for comments: %w", err)
765 }
766 defer func() {
767 if err := rows.Close(); err != nil {
768 log.Printf("Failed to close rows: %v", err)
769 }
770 }()
771
772 // Build result map with vote information
773 result := make(map[string]interface{})
774 for rows.Next() {
775 var subjectURI, direction, uri, cid string
776 var createdAt sql.NullTime
777
778 err := rows.Scan(&subjectURI, &direction, &uri, &cid, &createdAt)
779 if err != nil {
780 return nil, fmt.Errorf("failed to scan vote: %w", err)
781 }
782
783 // Store vote info as a simple map (can be enhanced later with proper Vote struct)
784 result[subjectURI] = map[string]interface{}{
785 "direction": direction,
786 "uri": uri,
787 "cid": cid,
788 "createdAt": createdAt.Time,
789 }
790 }
791
792 if err = rows.Err(); err != nil {
793 return nil, fmt.Errorf("error iterating votes: %w", err)
794 }
795
796 return result, nil
797}