A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "crypto/hmac" 5 "crypto/sha256" 6 "database/sql" 7 "encoding/base64" 8 "encoding/hex" 9 "encoding/json" 10 "fmt" 11 "strconv" 12 "strings" 13 "time" 14 15 "Coves/internal/core/posts" 16) 17 18// feedRepoBase contains shared logic for timeline and discover feed repositories 19// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds 20// 21// DATABASE INDEXES REQUIRED: 22// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql): 23// 24// 1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL 25// - Used by: Both timeline and discover for "new" sort 26// - Covers: Community filtering + chronological ordering + soft delete filter 27// 28// 2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL 29// - Used by: Both timeline and discover for "top" sort 30// - Covers: Community filtering + score ordering + tie-breaking + soft delete filter 31// 32// 3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did) 33// - Used by: Timeline feed (JOIN with subscriptions) 34// - Covers: User subscription lookup 35// 36// 4. 
Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5)) 37// - Cannot be indexed directly (computed at query time) 38// - Uses idx_posts_community_created for base ordering 39// - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha) 40// 41// PERFORMANCE NOTES: 42// - All queries use single execution (no N+1) 43// - JOINs are minimal (3 for timeline, 2 for discover) 44// - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently 45// - Cursor pagination is stable (no offset drift) 46// - Limit+1 pattern checks for next page without extra query 47type feedRepoBase struct { 48 db *sql.DB 49 hotRankExpression string 50 sortClauses map[string]string 51 cursorSecret string // HMAC secret for cursor integrity protection 52} 53 54// newFeedRepoBase creates a new base repository with shared feed logic 55func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase { 56 return &feedRepoBase{ 57 db: db, 58 hotRankExpression: hotRankExpr, 59 sortClauses: sortClauses, 60 cursorSecret: cursorSecret, 61 } 62} 63 64// buildSortClause returns the ORDER BY SQL and optional time filter 65// Uses whitelist map to prevent SQL injection via dynamic ORDER BY 66func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) { 67 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection) 68 orderBy := r.sortClauses[sort] 69 if orderBy == "" { 70 orderBy = r.sortClauses["hot"] // safe default 71 } 72 73 // Add time filter for "top" sort 74 var timeFilter string 75 if sort == "top" { 76 timeFilter = r.buildTimeFilter(timeframe) 77 } 78 79 return orderBy, timeFilter 80} 81 82// buildTimeFilter returns SQL filter for timeframe 83func (r *feedRepoBase) buildTimeFilter(timeframe string) string { 84 if timeframe == "" || timeframe == "all" { 85 return "" 86 } 87 88 var interval string 89 switch timeframe { 90 case "hour": 91 interval 
= "1 hour" 92 case "day": 93 interval = "1 day" 94 case "week": 95 interval = "1 week" 96 case "month": 97 interval = "1 month" 98 case "year": 99 interval = "1 year" 100 default: 101 return "" 102 } 103 104 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval) 105} 106 107// parseCursor decodes and validates pagination cursor 108// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline) 109func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) { 110 if cursor == nil || *cursor == "" { 111 return "", nil, nil 112 } 113 114 // Decode base64 cursor 115 decoded, err := base64.StdEncoding.DecodeString(*cursor) 116 if err != nil { 117 return "", nil, fmt.Errorf("invalid cursor encoding") 118 } 119 120 // Parse cursor: payload::signature 121 parts := strings.Split(string(decoded), "::") 122 if len(parts) < 2 { 123 return "", nil, fmt.Errorf("invalid cursor format") 124 } 125 126 // Verify HMAC signature 127 signatureHex := parts[len(parts)-1] 128 payload := strings.Join(parts[:len(parts)-1], "::") 129 130 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret)) 131 expectedMAC.Write([]byte(payload)) 132 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil)) 133 134 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) { 135 return "", nil, fmt.Errorf("invalid cursor signature") 136 } 137 138 // Parse payload based on sort type 139 payloadParts := strings.Split(payload, "::") 140 141 switch sort { 142 case "new": 143 // Cursor format: timestamp::uri 144 if len(payloadParts) != 2 { 145 return "", nil, fmt.Errorf("invalid cursor format") 146 } 147 148 createdAt := payloadParts[0] 149 uri := payloadParts[1] 150 151 // Validate timestamp format 152 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 153 return "", nil, fmt.Errorf("invalid cursor timestamp") 154 } 155 156 // Validate URI format (must be AT-URI) 157 if 
!strings.HasPrefix(uri, "at://") { 158 return "", nil, fmt.Errorf("invalid cursor URI") 159 } 160 161 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`, 162 paramOffset, paramOffset, paramOffset+1) 163 return filter, []interface{}{createdAt, uri}, nil 164 165 case "top": 166 // Cursor format: score::timestamp::uri 167 if len(payloadParts) != 3 { 168 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort) 169 } 170 171 scoreStr := payloadParts[0] 172 createdAt := payloadParts[1] 173 uri := payloadParts[2] 174 175 // Validate score is numeric 176 score := 0 177 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 178 return "", nil, fmt.Errorf("invalid cursor score") 179 } 180 181 // Validate timestamp format 182 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 183 return "", nil, fmt.Errorf("invalid cursor timestamp") 184 } 185 186 // Validate URI format (must be AT-URI) 187 if !strings.HasPrefix(uri, "at://") { 188 return "", nil, fmt.Errorf("invalid cursor URI") 189 } 190 191 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`, 192 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2) 193 return filter, []interface{}{score, createdAt, uri}, nil 194 195 case "hot": 196 // Cursor format: hot_rank::timestamp::uri 197 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs 198 if len(payloadParts) != 3 { 199 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 200 } 201 202 hotRankStr := payloadParts[0] 203 createdAt := payloadParts[1] 204 uri := payloadParts[2] 205 206 // Validate hot_rank is numeric (float) 207 hotRank := 0.0 208 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 209 return "", nil, fmt.Errorf("invalid cursor hot rank") 210 } 211 212 // Validate timestamp format 213 if _, err := 
time.Parse(time.RFC3339Nano, createdAt); err != nil { 214 return "", nil, fmt.Errorf("invalid cursor timestamp") 215 } 216 217 // Validate URI format (must be AT-URI) 218 if !strings.HasPrefix(uri, "at://") { 219 return "", nil, fmt.Errorf("invalid cursor URI") 220 } 221 222 // CRITICAL: Compare against the computed hot_rank expression, not p.score 223 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`, 224 r.hotRankExpression, paramOffset, 225 r.hotRankExpression, paramOffset, paramOffset+1, 226 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2, 227 paramOffset+3) 228 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil 229 230 default: 231 return "", nil, nil 232 } 233} 234 235// buildCursor creates HMAC-signed pagination cursor from last post 236// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation 237func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string { 238 var payload string 239 // Use :: as delimiter following Bluesky convention 240 const delimiter = "::" 241 242 switch sort { 243 case "new": 244 // Format: timestamp::uri 245 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 246 247 case "top": 248 // Format: score::timestamp::uri 249 score := 0 250 if post.Stats != nil { 251 score = post.Stats.Score 252 } 253 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 254 255 case "hot": 256 // Format: hot_rank::timestamp::uri 257 // CRITICAL: Use computed hot_rank with full precision 258 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64) 259 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 260 261 default: 262 payload = post.URI 263 } 264 265 // Sign the payload with HMAC-SHA256 266 mac := 
hmac.New(sha256.New, []byte(r.cursorSecret)) 267 mac.Write([]byte(payload)) 268 signature := hex.EncodeToString(mac.Sum(nil)) 269 270 // Append signature to payload 271 signed := payload + delimiter + signature 272 273 return base64.StdEncoding.EncodeToString([]byte(signed)) 274} 275 276// scanFeedPost scans a database row into a PostView 277// This is the shared scanning logic used by both timeline and discover feeds 278func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) { 279 var ( 280 postView posts.PostView 281 authorView posts.AuthorView 282 communityRef posts.CommunityRef 283 title, content sql.NullString 284 facets, embed sql.NullString 285 labelsJSON sql.NullString 286 editedAt sql.NullTime 287 communityAvatar sql.NullString 288 hotRank sql.NullFloat64 289 ) 290 291 err := rows.Scan( 292 &postView.URI, &postView.CID, &postView.RKey, 293 &authorView.DID, &authorView.Handle, 294 &communityRef.DID, &communityRef.Name, &communityAvatar, 295 &title, &content, &facets, &embed, &labelsJSON, 296 &postView.CreatedAt, &editedAt, &postView.IndexedAt, 297 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount, 298 &hotRank, 299 ) 300 if err != nil { 301 return nil, 0, err 302 } 303 304 // Build author view 305 postView.Author = &authorView 306 307 // Build community ref 308 communityRef.Avatar = nullStringPtr(communityAvatar) 309 postView.Community = &communityRef 310 311 // Set optional fields 312 postView.Title = nullStringPtr(title) 313 postView.Text = nullStringPtr(content) 314 315 // Parse facets JSON 316 if facets.Valid { 317 var facetArray []interface{} 318 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 319 postView.TextFacets = facetArray 320 } 321 } 322 323 // Parse embed JSON 324 if embed.Valid { 325 var embedData interface{} 326 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 327 postView.Embed = embedData 328 } 329 } 330 331 // Build stats 
332 postView.Stats = &posts.PostStats{ 333 Upvotes: postView.UpvoteCount, 334 Downvotes: postView.DownvoteCount, 335 Score: postView.Score, 336 CommentCount: postView.CommentCount, 337 } 338 339 // Build the record (required by lexicon) 340 record := map[string]interface{}{ 341 "$type": "social.coves.community.post", 342 "community": communityRef.DID, 343 "author": authorView.DID, 344 "createdAt": postView.CreatedAt.Format(time.RFC3339), 345 } 346 347 // Add optional fields to record if present 348 if title.Valid { 349 record["title"] = title.String 350 } 351 if content.Valid { 352 record["content"] = content.String 353 } 354 if facets.Valid { 355 var facetArray []interface{} 356 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 357 record["facets"] = facetArray 358 } 359 } 360 if embed.Valid { 361 var embedData interface{} 362 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 363 record["embed"] = embedData 364 } 365 } 366 if labelsJSON.Valid { 367 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 368 // Deserialize and include in record 369 var selfLabels posts.SelfLabels 370 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil { 371 record["labels"] = selfLabels 372 } 373 } 374 375 postView.Record = record 376 377 // Return the computed hot_rank (0.0 if NULL for non-hot sorts) 378 hotRankValue := 0.0 379 if hotRank.Valid { 380 hotRankValue = hotRank.Float64 381 } 382 383 return &postView, hotRankValue, nil 384}