A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/timeline"
5 "context"
6 "database/sql"
7 "fmt"
8)
9
10type postgresTimelineRepo struct {
11 *feedRepoBase
12}
13
14// sortClauses maps sort types to safe SQL ORDER BY clauses
15// This whitelist prevents SQL injection via dynamic ORDER BY construction
16var timelineSortClauses = map[string]string{
17 "hot": `(p.score / POWER(EXTRACT(EPOCH FROM (NOW() - p.created_at))/3600 + 2, 1.5)) DESC, p.created_at DESC, p.uri DESC`,
18 "top": `p.score DESC, p.created_at DESC, p.uri DESC`,
19 "new": `p.created_at DESC, p.uri DESC`,
20}
21
22// hotRankExpression is the SQL expression for computing the hot rank
23// NOTE: Uses NOW() which means hot_rank changes over time - this is expected behavior
24const timelineHotRankExpression = `(p.score / POWER(EXTRACT(EPOCH FROM (NOW() - p.created_at))/3600 + 2, 1.5))`
25
26// NewTimelineRepository creates a new PostgreSQL timeline repository
27func NewTimelineRepository(db *sql.DB, cursorSecret string) timeline.Repository {
28 return &postgresTimelineRepo{
29 feedRepoBase: newFeedRepoBase(db, timelineHotRankExpression, timelineSortClauses, cursorSecret),
30 }
31}
32
33// GetTimeline retrieves posts from all communities the user subscribes to
34// Single query with JOINs for optimal performance
35func (r *postgresTimelineRepo) GetTimeline(ctx context.Context, req timeline.GetTimelineRequest) ([]*timeline.FeedViewPost, *string, error) {
36 // Build ORDER BY clause based on sort type
37 orderBy, timeFilter := r.buildSortClause(req.Sort, req.Timeframe)
38
39 // Build cursor filter for pagination
40 // Timeline uses $3+ for cursor params (after $1=userDID and $2=limit)
41 cursorFilter, cursorValues, err := r.feedRepoBase.parseCursor(req.Cursor, req.Sort, 3)
42 if err != nil {
43 return nil, nil, timeline.ErrInvalidCursor
44 }
45
46 // Build the main query
47 // For hot sort, we need to compute and return the hot_rank for cursor building
48 var selectClause string
49 if req.Sort == "hot" {
50 selectClause = fmt.Sprintf(`
51 SELECT
52 p.uri, p.cid, p.rkey,
53 p.author_did, u.handle as author_handle,
54 p.community_did, c.handle as community_handle, c.name as community_name, c.avatar_cid as community_avatar, c.pds_url as community_pds_url,
55 p.title, p.content, p.content_facets, p.embed, p.content_labels,
56 p.created_at, p.edited_at, p.indexed_at,
57 p.upvote_count, p.downvote_count, p.score, p.comment_count,
58 %s as hot_rank
59 FROM posts p`, timelineHotRankExpression)
60 } else {
61 selectClause = `
62 SELECT
63 p.uri, p.cid, p.rkey,
64 p.author_did, u.handle as author_handle,
65 p.community_did, c.handle as community_handle, c.name as community_name, c.avatar_cid as community_avatar, c.pds_url as community_pds_url,
66 p.title, p.content, p.content_facets, p.embed, p.content_labels,
67 p.created_at, p.edited_at, p.indexed_at,
68 p.upvote_count, p.downvote_count, p.score, p.comment_count,
69 NULL::numeric as hot_rank
70 FROM posts p`
71 }
72
73 // Join with community_subscriptions to get posts from subscribed communities
74 query := fmt.Sprintf(`
75 %s
76 INNER JOIN users u ON p.author_did = u.did
77 INNER JOIN communities c ON p.community_did = c.did
78 INNER JOIN community_subscriptions cs ON p.community_did = cs.community_did
79 WHERE cs.user_did = $1
80 AND p.deleted_at IS NULL
81 %s
82 %s
83 ORDER BY %s
84 LIMIT $2
85 `, selectClause, timeFilter, cursorFilter, orderBy)
86
87 // Prepare query arguments
88 args := []interface{}{req.UserDID, req.Limit + 1} // +1 to check for next page
89 args = append(args, cursorValues...)
90
91 // Execute query
92 rows, err := r.db.QueryContext(ctx, query, args...)
93 if err != nil {
94 return nil, nil, fmt.Errorf("failed to query timeline: %w", err)
95 }
96 defer func() {
97 if err := rows.Close(); err != nil {
98 // Log close errors (non-fatal but worth noting)
99 fmt.Printf("Warning: failed to close rows: %v\n", err)
100 }
101 }()
102
103 // Scan results
104 var feedPosts []*timeline.FeedViewPost
105 var hotRanks []float64 // Store hot ranks for cursor building
106 for rows.Next() {
107 postView, hotRank, err := r.feedRepoBase.scanFeedPost(rows)
108 if err != nil {
109 return nil, nil, fmt.Errorf("failed to scan timeline post: %w", err)
110 }
111 feedPosts = append(feedPosts, &timeline.FeedViewPost{Post: postView})
112 hotRanks = append(hotRanks, hotRank)
113 }
114
115 if err := rows.Err(); err != nil {
116 return nil, nil, fmt.Errorf("error iterating timeline results: %w", err)
117 }
118
119 // Handle pagination cursor
120 var cursor *string
121 if len(feedPosts) > req.Limit && req.Limit > 0 {
122 feedPosts = feedPosts[:req.Limit]
123 hotRanks = hotRanks[:req.Limit]
124 lastPost := feedPosts[len(feedPosts)-1].Post
125 lastHotRank := hotRanks[len(hotRanks)-1]
126 cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank)
127 cursor = &cursorStr
128 }
129
130 return feedPosts, cursor, nil
131}