A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "crypto/hmac"
5 "crypto/sha256"
6 "database/sql"
7 "encoding/base64"
8 "encoding/hex"
9 "encoding/json"
10 "fmt"
11 "strconv"
12 "strings"
13 "time"
14
15 "Coves/internal/core/posts"
16)
17
18// feedRepoBase contains shared logic for timeline and discover feed repositories
19// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds
20//
21// DATABASE INDEXES REQUIRED:
22// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql):
23//
24// 1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL
25// - Used by: Both timeline and discover for "new" sort
26// - Covers: Community filtering + chronological ordering + soft delete filter
27//
28// 2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL
29// - Used by: Both timeline and discover for "top" sort
30// - Covers: Community filtering + score ordering + tie-breaking + soft delete filter
31//
32// 3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did)
33// - Used by: Timeline feed (JOIN with subscriptions)
34// - Covers: User subscription lookup
35//
36// 4. Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5))
37// - Cannot be indexed directly (computed at query time)
38// - Uses idx_posts_community_created for base ordering
39// - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha)
40//
41// PERFORMANCE NOTES:
42// - All queries use single execution (no N+1)
43// - JOINs are minimal (3 for timeline, 2 for discover)
44// - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently
45// - Cursor pagination is stable (no offset drift)
46// - Limit+1 pattern checks for next page without extra query
47type feedRepoBase struct {
48 db *sql.DB
49 hotRankExpression string
50 sortClauses map[string]string
51 cursorSecret string // HMAC secret for cursor integrity protection
52}
53
54// newFeedRepoBase creates a new base repository with shared feed logic
55func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase {
56 return &feedRepoBase{
57 db: db,
58 hotRankExpression: hotRankExpr,
59 sortClauses: sortClauses,
60 cursorSecret: cursorSecret,
61 }
62}
63
64// buildSortClause returns the ORDER BY SQL and optional time filter
65// Uses whitelist map to prevent SQL injection via dynamic ORDER BY
66func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) {
67 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection)
68 orderBy := r.sortClauses[sort]
69 if orderBy == "" {
70 orderBy = r.sortClauses["hot"] // safe default
71 }
72
73 // Add time filter for "top" sort
74 var timeFilter string
75 if sort == "top" {
76 timeFilter = r.buildTimeFilter(timeframe)
77 }
78
79 return orderBy, timeFilter
80}
81
82// buildTimeFilter returns SQL filter for timeframe
83func (r *feedRepoBase) buildTimeFilter(timeframe string) string {
84 if timeframe == "" || timeframe == "all" {
85 return ""
86 }
87
88 var interval string
89 switch timeframe {
90 case "hour":
91 interval = "1 hour"
92 case "day":
93 interval = "1 day"
94 case "week":
95 interval = "1 week"
96 case "month":
97 interval = "1 month"
98 case "year":
99 interval = "1 year"
100 default:
101 return ""
102 }
103
104 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval)
105}
106
107// parseCursor decodes and validates pagination cursor
108// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline)
109func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) {
110 if cursor == nil || *cursor == "" {
111 return "", nil, nil
112 }
113
114 // Decode base64 cursor
115 decoded, err := base64.StdEncoding.DecodeString(*cursor)
116 if err != nil {
117 return "", nil, fmt.Errorf("invalid cursor encoding")
118 }
119
120 // Parse cursor: payload::signature
121 parts := strings.Split(string(decoded), "::")
122 if len(parts) < 2 {
123 return "", nil, fmt.Errorf("invalid cursor format")
124 }
125
126 // Verify HMAC signature
127 signatureHex := parts[len(parts)-1]
128 payload := strings.Join(parts[:len(parts)-1], "::")
129
130 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret))
131 expectedMAC.Write([]byte(payload))
132 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil))
133
134 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) {
135 return "", nil, fmt.Errorf("invalid cursor signature")
136 }
137
138 // Parse payload based on sort type
139 payloadParts := strings.Split(payload, "::")
140
141 switch sort {
142 case "new":
143 // Cursor format: timestamp::uri
144 if len(payloadParts) != 2 {
145 return "", nil, fmt.Errorf("invalid cursor format")
146 }
147
148 createdAt := payloadParts[0]
149 uri := payloadParts[1]
150
151 // Validate timestamp format
152 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
153 return "", nil, fmt.Errorf("invalid cursor timestamp")
154 }
155
156 // Validate URI format (must be AT-URI)
157 if !strings.HasPrefix(uri, "at://") {
158 return "", nil, fmt.Errorf("invalid cursor URI")
159 }
160
161 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`,
162 paramOffset, paramOffset, paramOffset+1)
163 return filter, []interface{}{createdAt, uri}, nil
164
165 case "top":
166 // Cursor format: score::timestamp::uri
167 if len(payloadParts) != 3 {
168 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort)
169 }
170
171 scoreStr := payloadParts[0]
172 createdAt := payloadParts[1]
173 uri := payloadParts[2]
174
175 // Validate score is numeric
176 score := 0
177 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
178 return "", nil, fmt.Errorf("invalid cursor score")
179 }
180
181 // Validate timestamp format
182 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
183 return "", nil, fmt.Errorf("invalid cursor timestamp")
184 }
185
186 // Validate URI format (must be AT-URI)
187 if !strings.HasPrefix(uri, "at://") {
188 return "", nil, fmt.Errorf("invalid cursor URI")
189 }
190
191 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`,
192 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2)
193 return filter, []interface{}{score, createdAt, uri}, nil
194
195 case "hot":
196 // Cursor format: hot_rank::timestamp::uri
197 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs
198 if len(payloadParts) != 3 {
199 return "", nil, fmt.Errorf("invalid cursor format for hot sort")
200 }
201
202 hotRankStr := payloadParts[0]
203 createdAt := payloadParts[1]
204 uri := payloadParts[2]
205
206 // Validate hot_rank is numeric (float)
207 hotRank := 0.0
208 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil {
209 return "", nil, fmt.Errorf("invalid cursor hot rank")
210 }
211
212 // Validate timestamp format
213 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
214 return "", nil, fmt.Errorf("invalid cursor timestamp")
215 }
216
217 // Validate URI format (must be AT-URI)
218 if !strings.HasPrefix(uri, "at://") {
219 return "", nil, fmt.Errorf("invalid cursor URI")
220 }
221
222 // CRITICAL: Compare against the computed hot_rank expression, not p.score
223 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`,
224 r.hotRankExpression, paramOffset,
225 r.hotRankExpression, paramOffset, paramOffset+1,
226 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2,
227 paramOffset+3)
228 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil
229
230 default:
231 return "", nil, nil
232 }
233}
234
235// buildCursor creates HMAC-signed pagination cursor from last post
236// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation
237func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string {
238 var payload string
239 // Use :: as delimiter following Bluesky convention
240 const delimiter = "::"
241
242 switch sort {
243 case "new":
244 // Format: timestamp::uri
245 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
246
247 case "top":
248 // Format: score::timestamp::uri
249 score := 0
250 if post.Stats != nil {
251 score = post.Stats.Score
252 }
253 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
254
255 case "hot":
256 // Format: hot_rank::timestamp::uri
257 // CRITICAL: Use computed hot_rank with full precision
258 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64)
259 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
260
261 default:
262 payload = post.URI
263 }
264
265 // Sign the payload with HMAC-SHA256
266 mac := hmac.New(sha256.New, []byte(r.cursorSecret))
267 mac.Write([]byte(payload))
268 signature := hex.EncodeToString(mac.Sum(nil))
269
270 // Append signature to payload
271 signed := payload + delimiter + signature
272
273 return base64.StdEncoding.EncodeToString([]byte(signed))
274}
275
276// scanFeedPost scans a database row into a PostView
277// This is the shared scanning logic used by both timeline and discover feeds
278func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) {
279 var (
280 postView posts.PostView
281 authorView posts.AuthorView
282 communityRef posts.CommunityRef
283 title, content sql.NullString
284 facets, embed sql.NullString
285 labelsJSON sql.NullString
286 editedAt sql.NullTime
287 communityHandle sql.NullString
288 communityAvatar sql.NullString
289 hotRank sql.NullFloat64
290 )
291
292 err := rows.Scan(
293 &postView.URI, &postView.CID, &postView.RKey,
294 &authorView.DID, &authorView.Handle,
295 &communityRef.DID, &communityHandle, &communityRef.Name, &communityAvatar,
296 &title, &content, &facets, &embed, &labelsJSON,
297 &postView.CreatedAt, &editedAt, &postView.IndexedAt,
298 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount,
299 &hotRank,
300 )
301 if err != nil {
302 return nil, 0, err
303 }
304
305 // Build author view
306 postView.Author = &authorView
307
308 // Build community ref
309 if communityHandle.Valid {
310 communityRef.Handle = communityHandle.String
311 }
312 communityRef.Avatar = nullStringPtr(communityAvatar)
313 postView.Community = &communityRef
314
315 // Set optional fields
316 postView.Title = nullStringPtr(title)
317 postView.Text = nullStringPtr(content)
318
319 // Parse facets JSON
320 if facets.Valid {
321 var facetArray []interface{}
322 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil {
323 postView.TextFacets = facetArray
324 }
325 }
326
327 // Parse embed JSON
328 if embed.Valid {
329 var embedData interface{}
330 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil {
331 postView.Embed = embedData
332 }
333 }
334
335 // Build stats
336 postView.Stats = &posts.PostStats{
337 Upvotes: postView.UpvoteCount,
338 Downvotes: postView.DownvoteCount,
339 Score: postView.Score,
340 CommentCount: postView.CommentCount,
341 }
342
343 // Build the record (required by lexicon)
344 record := map[string]interface{}{
345 "$type": "social.coves.community.post",
346 "community": communityRef.DID,
347 "author": authorView.DID,
348 "createdAt": postView.CreatedAt.Format(time.RFC3339),
349 }
350
351 // Add optional fields to record if present
352 if title.Valid {
353 record["title"] = title.String
354 }
355 if content.Valid {
356 record["content"] = content.String
357 }
358 if facets.Valid {
359 var facetArray []interface{}
360 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil {
361 record["facets"] = facetArray
362 }
363 }
364 if embed.Valid {
365 var embedData interface{}
366 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil {
367 record["embed"] = embedData
368 }
369 }
370 if labelsJSON.Valid {
371 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure
372 // Deserialize and include in record
373 var selfLabels posts.SelfLabels
374 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil {
375 record["labels"] = selfLabels
376 }
377 }
378
379 postView.Record = record
380
381 // Return the computed hot_rank (0.0 if NULL for non-hot sorts)
382 hotRankValue := 0.0
383 if hotRank.Valid {
384 hotRankValue = hotRank.Float64
385 }
386
387 return &postView, hotRankValue, nil
388}
389
390// nullStringPtr converts sql.NullString to *string
391// Helper function used by feed scanning logic across all feed types
392func nullStringPtr(ns sql.NullString) *string {
393 if !ns.Valid {
394 return nil
395 }
396 return &ns.String
397}