A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/posts"
5 "crypto/hmac"
6 "crypto/sha256"
7 "database/sql"
8 "encoding/base64"
9 "encoding/hex"
10 "encoding/json"
11 "fmt"
12 "strconv"
13 "strings"
14 "time"
15)
16
// feedRepoBase contains shared logic for timeline and discover feed repositories.
// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds.
//
// It bundles the pieces every feed query needs: the whitelisted ORDER BY
// clauses, the SQL expression used for "hot" ranking, and the secret used to
// sign and verify pagination cursors.
//
// DATABASE INDEXES REQUIRED:
// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql):
//
//  1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL
//     - Used by: Both timeline and discover for "new" sort
//     - Covers: Community filtering + chronological ordering + soft delete filter
//
//  2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL
//     - Used by: Both timeline and discover for "top" sort
//     - Covers: Community filtering + score ordering + tie-breaking + soft delete filter
//
//  3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did)
//     - Used by: Timeline feed (JOIN with subscriptions)
//     - Covers: User subscription lookup
//
//  4. Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5))
//     - Cannot be indexed directly (computed at query time)
//     - Uses idx_posts_community_created for base ordering
//     - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha)
//
// PERFORMANCE NOTES:
//   - All queries use single execution (no N+1)
//   - JOINs are minimal (3 for timeline, 2 for discover)
//   - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently
//   - Cursor pagination is stable (no offset drift)
//   - Limit+1 pattern checks for next page without extra query
type feedRepoBase struct {
	// db is the SQL handle supplied to newFeedRepoBase.
	// NOTE(review): none of the helpers in this file touch it directly;
	// presumably the concrete feed repositories run their queries through it.
	db *sql.DB

	// hotRankExpression is the SQL expression that computes a post's hot rank.
	// parseCursor interpolates it verbatim into the cursor filter for the
	// "hot" sort, so it must be trusted, developer-authored SQL and never
	// derived from user input.
	hotRankExpression string

	// sortClauses maps a sort name to its ORDER BY clause. buildSortClause
	// falls back to the "hot" entry when the requested sort has no clause.
	sortClauses map[string]string

	// cursorSecret is the HMAC-SHA256 key used by buildCursor to sign
	// pagination cursors and by parseCursor to verify them.
	cursorSecret string
}
52
53// newFeedRepoBase creates a new base repository with shared feed logic
54func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase {
55 return &feedRepoBase{
56 db: db,
57 hotRankExpression: hotRankExpr,
58 sortClauses: sortClauses,
59 cursorSecret: cursorSecret,
60 }
61}
62
63// buildSortClause returns the ORDER BY SQL and optional time filter
64// Uses whitelist map to prevent SQL injection via dynamic ORDER BY
65func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) {
66 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection)
67 orderBy := r.sortClauses[sort]
68 if orderBy == "" {
69 orderBy = r.sortClauses["hot"] // safe default
70 }
71
72 // Add time filter for "top" sort
73 var timeFilter string
74 if sort == "top" {
75 timeFilter = r.buildTimeFilter(timeframe)
76 }
77
78 return orderBy, timeFilter
79}
80
81// buildTimeFilter returns SQL filter for timeframe
82func (r *feedRepoBase) buildTimeFilter(timeframe string) string {
83 if timeframe == "" || timeframe == "all" {
84 return ""
85 }
86
87 var interval string
88 switch timeframe {
89 case "hour":
90 interval = "1 hour"
91 case "day":
92 interval = "1 day"
93 case "week":
94 interval = "1 week"
95 case "month":
96 interval = "1 month"
97 case "year":
98 interval = "1 year"
99 default:
100 return ""
101 }
102
103 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval)
104}
105
106// parseCursor decodes and validates pagination cursor
107// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline)
108func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) {
109 if cursor == nil || *cursor == "" {
110 return "", nil, nil
111 }
112
113 // Decode base64 cursor
114 decoded, err := base64.StdEncoding.DecodeString(*cursor)
115 if err != nil {
116 return "", nil, fmt.Errorf("invalid cursor encoding")
117 }
118
119 // Parse cursor: payload::signature
120 parts := strings.Split(string(decoded), "::")
121 if len(parts) < 2 {
122 return "", nil, fmt.Errorf("invalid cursor format")
123 }
124
125 // Verify HMAC signature
126 signatureHex := parts[len(parts)-1]
127 payload := strings.Join(parts[:len(parts)-1], "::")
128
129 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret))
130 expectedMAC.Write([]byte(payload))
131 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil))
132
133 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) {
134 return "", nil, fmt.Errorf("invalid cursor signature")
135 }
136
137 // Parse payload based on sort type
138 payloadParts := strings.Split(payload, "::")
139
140 switch sort {
141 case "new":
142 // Cursor format: timestamp::uri
143 if len(payloadParts) != 2 {
144 return "", nil, fmt.Errorf("invalid cursor format")
145 }
146
147 createdAt := payloadParts[0]
148 uri := payloadParts[1]
149
150 // Validate timestamp format
151 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
152 return "", nil, fmt.Errorf("invalid cursor timestamp")
153 }
154
155 // Validate URI format (must be AT-URI)
156 if !strings.HasPrefix(uri, "at://") {
157 return "", nil, fmt.Errorf("invalid cursor URI")
158 }
159
160 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`,
161 paramOffset, paramOffset, paramOffset+1)
162 return filter, []interface{}{createdAt, uri}, nil
163
164 case "top":
165 // Cursor format: score::timestamp::uri
166 if len(payloadParts) != 3 {
167 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort)
168 }
169
170 scoreStr := payloadParts[0]
171 createdAt := payloadParts[1]
172 uri := payloadParts[2]
173
174 // Validate score is numeric
175 score := 0
176 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil {
177 return "", nil, fmt.Errorf("invalid cursor score")
178 }
179
180 // Validate timestamp format
181 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
182 return "", nil, fmt.Errorf("invalid cursor timestamp")
183 }
184
185 // Validate URI format (must be AT-URI)
186 if !strings.HasPrefix(uri, "at://") {
187 return "", nil, fmt.Errorf("invalid cursor URI")
188 }
189
190 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`,
191 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2)
192 return filter, []interface{}{score, createdAt, uri}, nil
193
194 case "hot":
195 // Cursor format: hot_rank::timestamp::uri
196 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs
197 if len(payloadParts) != 3 {
198 return "", nil, fmt.Errorf("invalid cursor format for hot sort")
199 }
200
201 hotRankStr := payloadParts[0]
202 createdAt := payloadParts[1]
203 uri := payloadParts[2]
204
205 // Validate hot_rank is numeric (float)
206 hotRank := 0.0
207 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil {
208 return "", nil, fmt.Errorf("invalid cursor hot rank")
209 }
210
211 // Validate timestamp format
212 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil {
213 return "", nil, fmt.Errorf("invalid cursor timestamp")
214 }
215
216 // Validate URI format (must be AT-URI)
217 if !strings.HasPrefix(uri, "at://") {
218 return "", nil, fmt.Errorf("invalid cursor URI")
219 }
220
221 // CRITICAL: Compare against the computed hot_rank expression, not p.score
222 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`,
223 r.hotRankExpression, paramOffset,
224 r.hotRankExpression, paramOffset, paramOffset+1,
225 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2,
226 paramOffset+3)
227 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil
228
229 default:
230 return "", nil, nil
231 }
232}
233
234// buildCursor creates HMAC-signed pagination cursor from last post
235// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation
236func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string {
237 var payload string
238 // Use :: as delimiter following Bluesky convention
239 const delimiter = "::"
240
241 switch sort {
242 case "new":
243 // Format: timestamp::uri
244 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
245
246 case "top":
247 // Format: score::timestamp::uri
248 score := 0
249 if post.Stats != nil {
250 score = post.Stats.Score
251 }
252 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
253
254 case "hot":
255 // Format: hot_rank::timestamp::uri
256 // CRITICAL: Use computed hot_rank with full precision
257 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64)
258 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI)
259
260 default:
261 payload = post.URI
262 }
263
264 // Sign the payload with HMAC-SHA256
265 mac := hmac.New(sha256.New, []byte(r.cursorSecret))
266 mac.Write([]byte(payload))
267 signature := hex.EncodeToString(mac.Sum(nil))
268
269 // Append signature to payload
270 signed := payload + delimiter + signature
271
272 return base64.StdEncoding.EncodeToString([]byte(signed))
273}
274
275// scanFeedPost scans a database row into a PostView
276// This is the shared scanning logic used by both timeline and discover feeds
277func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) {
278 var (
279 postView posts.PostView
280 authorView posts.AuthorView
281 communityRef posts.CommunityRef
282 title, content sql.NullString
283 facets, embed sql.NullString
284 labelsJSON sql.NullString
285 editedAt sql.NullTime
286 communityHandle sql.NullString
287 communityAvatar sql.NullString
288 communityPDSURL sql.NullString
289 hotRank sql.NullFloat64
290 )
291
292 err := rows.Scan(
293 &postView.URI, &postView.CID, &postView.RKey,
294 &authorView.DID, &authorView.Handle,
295 &communityRef.DID, &communityHandle, &communityRef.Name, &communityAvatar, &communityPDSURL,
296 &title, &content, &facets, &embed, &labelsJSON,
297 &postView.CreatedAt, &editedAt, &postView.IndexedAt,
298 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount,
299 &hotRank,
300 )
301 if err != nil {
302 return nil, 0, err
303 }
304
305 // Build author view
306 postView.Author = &authorView
307
308 // Build community ref
309 if communityHandle.Valid {
310 communityRef.Handle = communityHandle.String
311 }
312 communityRef.Avatar = nullStringPtr(communityAvatar)
313 if communityPDSURL.Valid {
314 communityRef.PDSURL = communityPDSURL.String
315 }
316 postView.Community = &communityRef
317
318 // Set optional fields
319 postView.Title = nullStringPtr(title)
320 postView.Text = nullStringPtr(content)
321
322 // Parse facets JSON
323 if facets.Valid {
324 var facetArray []interface{}
325 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil {
326 postView.TextFacets = facetArray
327 }
328 }
329
330 // Parse embed JSON
331 if embed.Valid {
332 var embedData interface{}
333 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil {
334 postView.Embed = embedData
335 }
336 }
337
338 // Build stats
339 postView.Stats = &posts.PostStats{
340 Upvotes: postView.UpvoteCount,
341 Downvotes: postView.DownvoteCount,
342 Score: postView.Score,
343 CommentCount: postView.CommentCount,
344 }
345
346 // Build the record (required by lexicon)
347 record := map[string]interface{}{
348 "$type": "social.coves.community.post",
349 "community": communityRef.DID,
350 "author": authorView.DID,
351 "createdAt": postView.CreatedAt.Format(time.RFC3339),
352 }
353
354 // Add optional fields to record if present
355 if title.Valid {
356 record["title"] = title.String
357 }
358 if content.Valid {
359 record["content"] = content.String
360 }
361 if facets.Valid {
362 var facetArray []interface{}
363 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil {
364 record["facets"] = facetArray
365 }
366 }
367 if embed.Valid {
368 var embedData interface{}
369 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil {
370 record["embed"] = embedData
371 }
372 }
373 if labelsJSON.Valid {
374 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure
375 // Deserialize and include in record
376 var selfLabels posts.SelfLabels
377 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil {
378 record["labels"] = selfLabels
379 }
380 }
381
382 postView.Record = record
383
384 // Return the computed hot_rank (0.0 if NULL for non-hot sorts)
385 hotRankValue := 0.0
386 if hotRank.Valid {
387 hotRankValue = hotRank.Float64
388 }
389
390 return &postView, hotRankValue, nil
391}
392
// nullStringPtr turns a nullable SQL string into an optional Go string:
// a pointer to the value when the column was non-NULL, nil otherwise.
// The returned pointer refers to a private copy, not to the caller's struct.
// Used by the feed scanning logic across all feed types.
func nullStringPtr(ns sql.NullString) *string {
	if ns.Valid {
		value := ns.String
		return &value
	}
	return nil
}