A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/timeline" 5 "context" 6 "database/sql" 7 "fmt" 8) 9 10type postgresTimelineRepo struct { 11 *feedRepoBase 12} 13 14// sortClauses maps sort types to safe SQL ORDER BY clauses 15// This whitelist prevents SQL injection via dynamic ORDER BY construction 16var timelineSortClauses = map[string]string{ 17 "hot": `(p.score / POWER(EXTRACT(EPOCH FROM (NOW() - p.created_at))/3600 + 2, 1.5)) DESC, p.created_at DESC, p.uri DESC`, 18 "top": `p.score DESC, p.created_at DESC, p.uri DESC`, 19 "new": `p.created_at DESC, p.uri DESC`, 20} 21 22// hotRankExpression is the SQL expression for computing the hot rank 23// NOTE: Uses NOW() which means hot_rank changes over time - this is expected behavior 24const timelineHotRankExpression = `(p.score / POWER(EXTRACT(EPOCH FROM (NOW() - p.created_at))/3600 + 2, 1.5))` 25 26// NewTimelineRepository creates a new PostgreSQL timeline repository 27func NewTimelineRepository(db *sql.DB, cursorSecret string) timeline.Repository { 28 return &postgresTimelineRepo{ 29 feedRepoBase: newFeedRepoBase(db, timelineHotRankExpression, timelineSortClauses, cursorSecret), 30 } 31} 32 33// GetTimeline retrieves posts from all communities the user subscribes to 34// Single query with JOINs for optimal performance 35func (r *postgresTimelineRepo) GetTimeline(ctx context.Context, req timeline.GetTimelineRequest) ([]*timeline.FeedViewPost, *string, error) { 36 // Build ORDER BY clause based on sort type 37 orderBy, timeFilter := r.buildSortClause(req.Sort, req.Timeframe) 38 39 // Build cursor filter for pagination 40 // Timeline uses $3+ for cursor params (after $1=userDID and $2=limit) 41 cursorFilter, cursorValues, err := r.feedRepoBase.parseCursor(req.Cursor, req.Sort, 3) 42 if err != nil { 43 return nil, nil, timeline.ErrInvalidCursor 44 } 45 46 // Build the main query 47 // For hot sort, we need to compute and return the hot_rank for cursor building 48 var selectClause string 49 if req.Sort == "hot" { 50 selectClause = fmt.Sprintf(` 51 SELECT 52 p.uri, p.cid, p.rkey, 53 p.author_did, u.handle as author_handle, 54 p.community_did, c.handle as community_handle, c.name as community_name, c.avatar_cid as community_avatar, c.pds_url as community_pds_url, 55 p.title, p.content, p.content_facets, p.embed, p.content_labels, 56 p.created_at, p.edited_at, p.indexed_at, 57 p.upvote_count, p.downvote_count, p.score, p.comment_count, 58 %s as hot_rank 59 FROM posts p`, timelineHotRankExpression) 60 } else { 61 selectClause = ` 62 SELECT 63 p.uri, p.cid, p.rkey, 64 p.author_did, u.handle as author_handle, 65 p.community_did, c.handle as community_handle, c.name as community_name, c.avatar_cid as community_avatar, c.pds_url as community_pds_url, 66 p.title, p.content, p.content_facets, p.embed, p.content_labels, 67 p.created_at, p.edited_at, p.indexed_at, 68 p.upvote_count, p.downvote_count, p.score, p.comment_count, 69 NULL::numeric as hot_rank 70 FROM posts p` 71 } 72 73 // Join with community_subscriptions to get posts from subscribed communities 74 query := fmt.Sprintf(` 75 %s 76 INNER JOIN users u ON p.author_did = u.did 77 INNER JOIN communities c ON p.community_did = c.did 78 INNER JOIN community_subscriptions cs ON p.community_did = cs.community_did 79 WHERE cs.user_did = $1 80 AND p.deleted_at IS NULL 81 %s 82 %s 83 ORDER BY %s 84 LIMIT $2 85 `, selectClause, timeFilter, cursorFilter, orderBy) 86 87 // Prepare query arguments 88 args := []interface{}{req.UserDID, req.Limit + 1} // +1 to check for next page 89 args = append(args, cursorValues...) 90 91 // Execute query 92 rows, err := r.db.QueryContext(ctx, query, args...) 93 if err != nil { 94 return nil, nil, fmt.Errorf("failed to query timeline: %w", err) 95 } 96 defer func() { 97 if err := rows.Close(); err != nil { 98 // Log close errors (non-fatal but worth noting) 99 fmt.Printf("Warning: failed to close rows: %v\n", err) 100 } 101 }() 102 103 // Scan results 104 var feedPosts []*timeline.FeedViewPost 105 var hotRanks []float64 // Store hot ranks for cursor building 106 for rows.Next() { 107 postView, hotRank, err := r.feedRepoBase.scanFeedPost(rows) 108 if err != nil { 109 return nil, nil, fmt.Errorf("failed to scan timeline post: %w", err) 110 } 111 feedPosts = append(feedPosts, &timeline.FeedViewPost{Post: postView}) 112 hotRanks = append(hotRanks, hotRank) 113 } 114 115 if err := rows.Err(); err != nil { 116 return nil, nil, fmt.Errorf("error iterating timeline results: %w", err) 117 } 118 119 // Handle pagination cursor 120 var cursor *string 121 if len(feedPosts) > req.Limit && req.Limit > 0 { 122 feedPosts = feedPosts[:req.Limit] 123 hotRanks = hotRanks[:req.Limit] 124 lastPost := feedPosts[len(feedPosts)-1].Post 125 lastHotRank := hotRanks[len(hotRanks)-1] 126 cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank) 127 cursor = &cursorStr 128 } 129 130 return feedPosts, cursor, nil 131}