A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 8 "Coves/internal/core/timeline" 9) 10 11type postgresTimelineRepo struct { 12 *feedRepoBase 13} 14 15// sortClauses maps sort types to safe SQL ORDER BY clauses 16// This whitelist prevents SQL injection via dynamic ORDER BY construction 17var timelineSortClauses = map[string]string{ 18 "hot": `(p.score / POWER(EXTRACT(EPOCH FROM (NOW() - p.created_at))/3600 + 2, 1.5)) DESC, p.created_at DESC, p.uri DESC`, 19 "top": `p.score DESC, p.created_at DESC, p.uri DESC`, 20 "new": `p.created_at DESC, p.uri DESC`, 21} 22 23// hotRankExpression is the SQL expression for computing the hot rank 24// NOTE: Uses NOW() which means hot_rank changes over time - this is expected behavior 25const timelineHotRankExpression = `(p.score / POWER(EXTRACT(EPOCH FROM (NOW() - p.created_at))/3600 + 2, 1.5))` 26 27// NewTimelineRepository creates a new PostgreSQL timeline repository 28func NewTimelineRepository(db *sql.DB, cursorSecret string) timeline.Repository { 29 return &postgresTimelineRepo{ 30 feedRepoBase: newFeedRepoBase(db, timelineHotRankExpression, timelineSortClauses, cursorSecret), 31 } 32} 33 34// GetTimeline retrieves posts from all communities the user subscribes to 35// Single query with JOINs for optimal performance 36func (r *postgresTimelineRepo) GetTimeline(ctx context.Context, req timeline.GetTimelineRequest) ([]*timeline.FeedViewPost, *string, error) { 37 // Build ORDER BY clause based on sort type 38 orderBy, timeFilter := r.buildSortClause(req.Sort, req.Timeframe) 39 40 // Build cursor filter for pagination 41 // Timeline uses $3+ for cursor params (after $1=userDID and $2=limit) 42 cursorFilter, cursorValues, err := r.feedRepoBase.parseCursor(req.Cursor, req.Sort, 3) 43 if err != nil { 44 return nil, nil, timeline.ErrInvalidCursor 45 } 46 47 // Build the main query 48 // For hot sort, we need to compute and return the hot_rank for cursor building 49 var selectClause string 50 if req.Sort == "hot" { 
51 selectClause = fmt.Sprintf(` 52 SELECT 53 p.uri, p.cid, p.rkey, 54 p.author_did, u.handle as author_handle, 55 p.community_did, c.handle as community_handle, c.name as community_name, c.avatar_cid as community_avatar, 56 p.title, p.content, p.content_facets, p.embed, p.content_labels, 57 p.created_at, p.edited_at, p.indexed_at, 58 p.upvote_count, p.downvote_count, p.score, p.comment_count, 59 %s as hot_rank 60 FROM posts p`, timelineHotRankExpression) 61 } else { 62 selectClause = ` 63 SELECT 64 p.uri, p.cid, p.rkey, 65 p.author_did, u.handle as author_handle, 66 p.community_did, c.handle as community_handle, c.name as community_name, c.avatar_cid as community_avatar, 67 p.title, p.content, p.content_facets, p.embed, p.content_labels, 68 p.created_at, p.edited_at, p.indexed_at, 69 p.upvote_count, p.downvote_count, p.score, p.comment_count, 70 NULL::numeric as hot_rank 71 FROM posts p` 72 } 73 74 // Join with community_subscriptions to get posts from subscribed communities 75 query := fmt.Sprintf(` 76 %s 77 INNER JOIN users u ON p.author_did = u.did 78 INNER JOIN communities c ON p.community_did = c.did 79 INNER JOIN community_subscriptions cs ON p.community_did = cs.community_did 80 WHERE cs.user_did = $1 81 AND p.deleted_at IS NULL 82 %s 83 %s 84 ORDER BY %s 85 LIMIT $2 86 `, selectClause, timeFilter, cursorFilter, orderBy) 87 88 // Prepare query arguments 89 args := []interface{}{req.UserDID, req.Limit + 1} // +1 to check for next page 90 args = append(args, cursorValues...) 91 92 // Execute query 93 rows, err := r.db.QueryContext(ctx, query, args...) 
94 if err != nil { 95 return nil, nil, fmt.Errorf("failed to query timeline: %w", err) 96 } 97 defer func() { 98 if err := rows.Close(); err != nil { 99 // Log close errors (non-fatal but worth noting) 100 fmt.Printf("Warning: failed to close rows: %v\n", err) 101 } 102 }() 103 104 // Scan results 105 var feedPosts []*timeline.FeedViewPost 106 var hotRanks []float64 // Store hot ranks for cursor building 107 for rows.Next() { 108 postView, hotRank, err := r.feedRepoBase.scanFeedPost(rows) 109 if err != nil { 110 return nil, nil, fmt.Errorf("failed to scan timeline post: %w", err) 111 } 112 feedPosts = append(feedPosts, &timeline.FeedViewPost{Post: postView}) 113 hotRanks = append(hotRanks, hotRank) 114 } 115 116 if err := rows.Err(); err != nil { 117 return nil, nil, fmt.Errorf("error iterating timeline results: %w", err) 118 } 119 120 // Handle pagination cursor 121 var cursor *string 122 if len(feedPosts) > req.Limit && req.Limit > 0 { 123 feedPosts = feedPosts[:req.Limit] 124 hotRanks = hotRanks[:req.Limit] 125 lastPost := feedPosts[len(feedPosts)-1].Post 126 lastHotRank := hotRanks[len(hotRanks)-1] 127 cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank) 128 cursor = &cursorStr 129 } 130 131 return feedPosts, cursor, nil 132}