A community-based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "crypto/hmac"
5 "crypto/sha256"
6 "database/sql"
7 "encoding/base64"
8 "encoding/hex"
9 "encoding/json"
10 "fmt"
11 "strconv"
12 "strings"
13 "time"
14
15 "Coves/internal/core/posts"
16)
17
// feedRepoBase bundles the state and helpers that the timeline and discover
// feed repositories have in common. Keeping them here removes roughly 85% of
// the duplication between the two feeds and means a bug fix reaches both.
//
// REQUIRED DATABASE INDEXES
// (created in migration 011_create_posts_table.sql)
//
//  1. idx_posts_community_created
//     ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL
//     - "new" sort, timeline and discover
//     - community filter + chronological order + soft-delete filter
//
//  2. idx_posts_community_score
//     ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL
//     - "top" sort, timeline and discover
//     - community filter + score order + tie-break + soft-delete filter
//
//  3. idx_subscriptions_user_community
//     ON community_subscriptions(user_did, community_did)
//     - timeline feed only (JOIN against subscriptions)
//     - per-user subscription lookup
//
//  4. "hot" sort orders by the expression (score / POWER(age_hours + 2, 1.5))
//     - evaluated at query time, so it cannot be indexed directly
//     - base ordering comes from idx_posts_community_created
//     - ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha)
//
// PERFORMANCE NOTES
//   - Each feed page is a single query (no N+1).
//   - Few JOINs: 3 for timeline, 2 for discover.
//   - The partial indexes (WHERE deleted_at IS NULL) skip soft-deleted posts cheaply.
//   - Pagination is cursor based, so pages do not drift the way OFFSET pages do.
//   - Fetching limit+1 rows reveals whether a next page exists without a second query.
type feedRepoBase struct {
	// db is the connection pool the feed queries run against.
	db *sql.DB
	// hotRankExpression is the SQL expression for the "hot" rank; it is
	// spliced into the cursor filter produced for the "hot" sort.
	hotRankExpression string
	// sortClauses is the whitelist mapping a sort name to its ORDER BY clause.
	sortClauses map[string]string
	// cursorSecret is the HMAC key used to sign and verify pagination cursors.
	cursorSecret string
}
53
54// newFeedRepoBase creates a new base repository with shared feed logic
55func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase {
56 return &feedRepoBase{
57 db: db,
58 hotRankExpression: hotRankExpr,
59 sortClauses: sortClauses,
60 cursorSecret: cursorSecret,
61 }
62}
63
64// buildSortClause returns the ORDER BY SQL and optional time filter
65// Uses whitelist map to prevent SQL injection via dynamic ORDER BY
66func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) {
67 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection)
68 orderBy := r.sortClauses[sort]
69 if orderBy == "" {
70 orderBy = r.sortClauses["hot"] // safe default
71 }
72
73 // Add time filter for "top" sort
74 var timeFilter string
75 if sort == "top" {
76 timeFilter = r.buildTimeFilter(timeframe)
77 }
78
79 return orderBy, timeFilter
80}
81
82// buildTimeFilter returns SQL filter for timeframe
83func (r *feedRepoBase) buildTimeFilter(timeframe string) string {
84 if timeframe == "" || timeframe == "all" {
85 return ""
86 }
87
88 var interval string
89 switch timeframe {
90 case "hour":
91 interval = "1 hour"
92 case "day":
93 interval = "1 day"
94 case "week":
95 interval = "1 week"
96 case "month":
97 interval = "1 month"
98 case "year":
99 interval = "1 year"
100 default:
101 return ""
102 }
103
104 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval)
105}
106
107// parseCursor decodes and validates pagination cursor
108// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline)
109func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) {
110 if cursor == nil || *cursor == "" {
111 return "", nil, nil
112 }
113
114 // Decode base64 cursor
115 decoded, err := base64.StdEncoding.DecodeString(*cursor)
116 if err != nil {
117 return "", nil, fmt.Errorf("invalid cursor encoding")
118 }
119
120 // Parse cursor: payload::signature
121 parts := strings.Split(string(decoded), "::")
122 if len(parts) < 2 {
123 return "", nil, fmt.Errorf("invalid cursor format")
124 }
125
126 // Verify HMAC signature
127 signatureHex := parts[len(parts)-1]
128 payload := strings.Join(parts[:len(parts)-1], "::")
129
130 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret))
131 expectedMAC.Write([]byte(payload))
132 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil))
133
134 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) {
135 return "", nil, fmt.Errorf("invalid cursor signature")
136 }
137
138 // Parse payload based on sort type
139 payloadParts := strings.Split(payload, "::")
140
141 switch sort {
142 case "new":
143 // Cursor format: timestamp::uri
144 if len(payloadParts) != 2 {
145 return "", nil, fmt.Errorf("invalid cursor format")
146 }
147
148 createdAt := payloadParts[0]
149 uri := payloadParts[1]
150
151 // Validate timestamp format
152 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
153 return "", nil, fmt.Errorf("invalid cursor timestamp")
154 }
155
156 // Validate URI format (must be AT-URI)
157 if !strings.HasPrefix(uri, "at://") {
158 return "", nil, fmt.Errorf("invalid cursor URI")
159 }
160
161 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`,
162 paramOffset, paramOffset, paramOffset+1)
163 return filter, []interface{}{createdAt, uri}, nil
164
165 case "top":
166 // Cursor format: score::timestamp::uri
167 if len(payloadParts) != 3 {
168 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort)
169 }
170
171 scoreStr := payloadParts[0]
172 createdAt := payloadParts[1]
173 uri := payloadParts[2]
174
175 // Validate score is numeric
176 score := 0
177 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
178 return "", nil, fmt.Errorf("invalid cursor score")
179 }
180
181 // Validate timestamp format
182 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
183 return "", nil, fmt.Errorf("invalid cursor timestamp")
184 }
185
186 // Validate URI format (must be AT-URI)
187 if !strings.HasPrefix(uri, "at://") {
188 return "", nil, fmt.Errorf("invalid cursor URI")
189 }
190
191 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`,
192 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2)
193 return filter, []interface{}{score, createdAt, uri}, nil
194
195 case "hot":
196 // Cursor format: hot_rank::timestamp::uri
197 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs
198 if len(payloadParts) != 3 {
199 return "", nil, fmt.Errorf("invalid cursor format for hot sort")
200 }
201
202 hotRankStr := payloadParts[0]
203 createdAt := payloadParts[1]
204 uri := payloadParts[2]
205
206 // Validate hot_rank is numeric (float)
207 hotRank := 0.0
208 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil {
209 return "", nil, fmt.Errorf("invalid cursor hot rank")
210 }
211
212 // Validate timestamp format
213 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
214 return "", nil, fmt.Errorf("invalid cursor timestamp")
215 }
216
217 // Validate URI format (must be AT-URI)
218 if !strings.HasPrefix(uri, "at://") {
219 return "", nil, fmt.Errorf("invalid cursor URI")
220 }
221
222 // CRITICAL: Compare against the computed hot_rank expression, not p.score
223 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`,
224 r.hotRankExpression, paramOffset,
225 r.hotRankExpression, paramOffset, paramOffset+1,
226 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2,
227 paramOffset+3)
228 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil
229
230 default:
231 return "", nil, nil
232 }
233}
234
235// buildCursor creates HMAC-signed pagination cursor from last post
236// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation
237func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string {
238 var payload string
239 // Use :: as delimiter following Bluesky convention
240 const delimiter = "::"
241
242 switch sort {
243 case "new":
244 // Format: timestamp::uri
245 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
246
247 case "top":
248 // Format: score::timestamp::uri
249 score := 0
250 if post.Stats != nil {
251 score = post.Stats.Score
252 }
253 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
254
255 case "hot":
256 // Format: hot_rank::timestamp::uri
257 // CRITICAL: Use computed hot_rank with full precision
258 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64)
259 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
260
261 default:
262 payload = post.URI
263 }
264
265 // Sign the payload with HMAC-SHA256
266 mac := hmac.New(sha256.New, []byte(r.cursorSecret))
267 mac.Write([]byte(payload))
268 signature := hex.EncodeToString(mac.Sum(nil))
269
270 // Append signature to payload
271 signed := payload + delimiter + signature
272
273 return base64.StdEncoding.EncodeToString([]byte(signed))
274}
275
276// scanFeedPost scans a database row into a PostView
277// This is the shared scanning logic used by both timeline and discover feeds
278func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) {
279 var (
280 postView posts.PostView
281 authorView posts.AuthorView
282 communityRef posts.CommunityRef
283 title, content sql.NullString
284 facets, embed sql.NullString
285 labelsJSON sql.NullString
286 editedAt sql.NullTime
287 communityHandle sql.NullString
288 communityAvatar sql.NullString
289 communityPDSURL sql.NullString
290 hotRank sql.NullFloat64
291 )
292
293 err := rows.Scan(
294 &postView.URI, &postView.CID, &postView.RKey,
295 &authorView.DID, &authorView.Handle,
296 &communityRef.DID, &communityHandle, &communityRef.Name, &communityAvatar, &communityPDSURL,
297 &title, &content, &facets, &embed, &labelsJSON,
298 &postView.CreatedAt, &editedAt, &postView.IndexedAt,
299 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount,
300 &hotRank,
301 )
302 if err != nil {
303 return nil, 0, err
304 }
305
306 // Build author view
307 postView.Author = &authorView
308
309 // Build community ref
310 if communityHandle.Valid {
311 communityRef.Handle = communityHandle.String
312 }
313 communityRef.Avatar = nullStringPtr(communityAvatar)
314 if communityPDSURL.Valid {
315 communityRef.PDSURL = communityPDSURL.String
316 }
317 postView.Community = &communityRef
318
319 // Set optional fields
320 postView.Title = nullStringPtr(title)
321 postView.Text = nullStringPtr(content)
322
323 // Parse facets JSON
324 if facets.Valid {
325 var facetArray []interface{}
326 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil {
327 postView.TextFacets = facetArray
328 }
329 }
330
331 // Parse embed JSON
332 if embed.Valid {
333 var embedData interface{}
334 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil {
335 postView.Embed = embedData
336 }
337 }
338
339 // Build stats
340 postView.Stats = &posts.PostStats{
341 Upvotes: postView.UpvoteCount,
342 Downvotes: postView.DownvoteCount,
343 Score: postView.Score,
344 CommentCount: postView.CommentCount,
345 }
346
347 // Build the record (required by lexicon)
348 record := map[string]interface{}{
349 "$type": "social.coves.community.post",
350 "community": communityRef.DID,
351 "author": authorView.DID,
352 "createdAt": postView.CreatedAt.Format(time.RFC3339),
353 }
354
355 // Add optional fields to record if present
356 if title.Valid {
357 record["title"] = title.String
358 }
359 if content.Valid {
360 record["content"] = content.String
361 }
362 if facets.Valid {
363 var facetArray []interface{}
364 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil {
365 record["facets"] = facetArray
366 }
367 }
368 if embed.Valid {
369 var embedData interface{}
370 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil {
371 record["embed"] = embedData
372 }
373 }
374 if labelsJSON.Valid {
375 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure
376 // Deserialize and include in record
377 var selfLabels posts.SelfLabels
378 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil {
379 record["labels"] = selfLabels
380 }
381 }
382
383 postView.Record = record
384
385 // Return the computed hot_rank (0.0 if NULL for non-hot sorts)
386 hotRankValue := 0.0
387 if hotRank.Valid {
388 hotRankValue = hotRank.Float64
389 }
390
391 return &postView, hotRankValue, nil
392}
393
// nullStringPtr maps a nullable SQL string onto a *string: nil for SQL NULL,
// otherwise a pointer to a copy of the value (which may be the empty string).
// It is shared by the feed scanning code of every feed type.
func nullStringPtr(ns sql.NullString) *string {
	if ns.Valid {
		value := ns.String
		return &value
	}
	return nil
}