A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "crypto/hmac" 5 "crypto/sha256" 6 "database/sql" 7 "encoding/base64" 8 "encoding/hex" 9 "encoding/json" 10 "fmt" 11 "strconv" 12 "strings" 13 "time" 14 15 "Coves/internal/core/posts" 16) 17 18// feedRepoBase contains shared logic for timeline and discover feed repositories 19// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds 20// 21// DATABASE INDEXES REQUIRED: 22// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql): 23// 24// 1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL 25// - Used by: Both timeline and discover for "new" sort 26// - Covers: Community filtering + chronological ordering + soft delete filter 27// 28// 2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL 29// - Used by: Both timeline and discover for "top" sort 30// - Covers: Community filtering + score ordering + tie-breaking + soft delete filter 31// 32// 3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did) 33// - Used by: Timeline feed (JOIN with subscriptions) 34// - Covers: User subscription lookup 35// 36// 4. 
Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5)) 37// - Cannot be indexed directly (computed at query time) 38// - Uses idx_posts_community_created for base ordering 39// - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha) 40// 41// PERFORMANCE NOTES: 42// - All queries use single execution (no N+1) 43// - JOINs are minimal (3 for timeline, 2 for discover) 44// - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently 45// - Cursor pagination is stable (no offset drift) 46// - Limit+1 pattern checks for next page without extra query 47type feedRepoBase struct { 48 db *sql.DB 49 hotRankExpression string 50 sortClauses map[string]string 51 cursorSecret string // HMAC secret for cursor integrity protection 52} 53 54// newFeedRepoBase creates a new base repository with shared feed logic 55func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase { 56 return &feedRepoBase{ 57 db: db, 58 hotRankExpression: hotRankExpr, 59 sortClauses: sortClauses, 60 cursorSecret: cursorSecret, 61 } 62} 63 64// buildSortClause returns the ORDER BY SQL and optional time filter 65// Uses whitelist map to prevent SQL injection via dynamic ORDER BY 66func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) { 67 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection) 68 orderBy := r.sortClauses[sort] 69 if orderBy == "" { 70 orderBy = r.sortClauses["hot"] // safe default 71 } 72 73 // Add time filter for "top" sort 74 var timeFilter string 75 if sort == "top" { 76 timeFilter = r.buildTimeFilter(timeframe) 77 } 78 79 return orderBy, timeFilter 80} 81 82// buildTimeFilter returns SQL filter for timeframe 83func (r *feedRepoBase) buildTimeFilter(timeframe string) string { 84 if timeframe == "" || timeframe == "all" { 85 return "" 86 } 87 88 var interval string 89 switch timeframe { 90 case "hour": 91 interval 
= "1 hour" 92 case "day": 93 interval = "1 day" 94 case "week": 95 interval = "1 week" 96 case "month": 97 interval = "1 month" 98 case "year": 99 interval = "1 year" 100 default: 101 return "" 102 } 103 104 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval) 105} 106 107// parseCursor decodes and validates pagination cursor 108// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline) 109func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) { 110 if cursor == nil || *cursor == "" { 111 return "", nil, nil 112 } 113 114 // Decode base64 cursor 115 decoded, err := base64.StdEncoding.DecodeString(*cursor) 116 if err != nil { 117 return "", nil, fmt.Errorf("invalid cursor encoding") 118 } 119 120 // Parse cursor: payload::signature 121 parts := strings.Split(string(decoded), "::") 122 if len(parts) < 2 { 123 return "", nil, fmt.Errorf("invalid cursor format") 124 } 125 126 // Verify HMAC signature 127 signatureHex := parts[len(parts)-1] 128 payload := strings.Join(parts[:len(parts)-1], "::") 129 130 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret)) 131 expectedMAC.Write([]byte(payload)) 132 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil)) 133 134 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) { 135 return "", nil, fmt.Errorf("invalid cursor signature") 136 } 137 138 // Parse payload based on sort type 139 payloadParts := strings.Split(payload, "::") 140 141 switch sort { 142 case "new": 143 // Cursor format: timestamp::uri 144 if len(payloadParts) != 2 { 145 return "", nil, fmt.Errorf("invalid cursor format") 146 } 147 148 createdAt := payloadParts[0] 149 uri := payloadParts[1] 150 151 // Validate timestamp format 152 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 153 return "", nil, fmt.Errorf("invalid cursor timestamp") 154 } 155 156 // Validate URI format (must be AT-URI) 157 if 
!strings.HasPrefix(uri, "at://") { 158 return "", nil, fmt.Errorf("invalid cursor URI") 159 } 160 161 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`, 162 paramOffset, paramOffset, paramOffset+1) 163 return filter, []interface{}{createdAt, uri}, nil 164 165 case "top": 166 // Cursor format: score::timestamp::uri 167 if len(payloadParts) != 3 { 168 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort) 169 } 170 171 scoreStr := payloadParts[0] 172 createdAt := payloadParts[1] 173 uri := payloadParts[2] 174 175 // Validate score is numeric 176 score := 0 177 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 178 return "", nil, fmt.Errorf("invalid cursor score") 179 } 180 181 // Validate timestamp format 182 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 183 return "", nil, fmt.Errorf("invalid cursor timestamp") 184 } 185 186 // Validate URI format (must be AT-URI) 187 if !strings.HasPrefix(uri, "at://") { 188 return "", nil, fmt.Errorf("invalid cursor URI") 189 } 190 191 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`, 192 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2) 193 return filter, []interface{}{score, createdAt, uri}, nil 194 195 case "hot": 196 // Cursor format: hot_rank::timestamp::uri 197 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs 198 if len(payloadParts) != 3 { 199 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 200 } 201 202 hotRankStr := payloadParts[0] 203 createdAt := payloadParts[1] 204 uri := payloadParts[2] 205 206 // Validate hot_rank is numeric (float) 207 hotRank := 0.0 208 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 209 return "", nil, fmt.Errorf("invalid cursor hot rank") 210 } 211 212 // Validate timestamp format 213 if _, err := 
time.Parse(time.RFC3339Nano, createdAt); err != nil { 214 return "", nil, fmt.Errorf("invalid cursor timestamp") 215 } 216 217 // Validate URI format (must be AT-URI) 218 if !strings.HasPrefix(uri, "at://") { 219 return "", nil, fmt.Errorf("invalid cursor URI") 220 } 221 222 // CRITICAL: Compare against the computed hot_rank expression, not p.score 223 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`, 224 r.hotRankExpression, paramOffset, 225 r.hotRankExpression, paramOffset, paramOffset+1, 226 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2, 227 paramOffset+3) 228 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil 229 230 default: 231 return "", nil, nil 232 } 233} 234 235// buildCursor creates HMAC-signed pagination cursor from last post 236// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation 237func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string { 238 var payload string 239 // Use :: as delimiter following Bluesky convention 240 const delimiter = "::" 241 242 switch sort { 243 case "new": 244 // Format: timestamp::uri 245 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 246 247 case "top": 248 // Format: score::timestamp::uri 249 score := 0 250 if post.Stats != nil { 251 score = post.Stats.Score 252 } 253 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 254 255 case "hot": 256 // Format: hot_rank::timestamp::uri 257 // CRITICAL: Use computed hot_rank with full precision 258 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64) 259 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 260 261 default: 262 payload = post.URI 263 } 264 265 // Sign the payload with HMAC-SHA256 266 mac := 
hmac.New(sha256.New, []byte(r.cursorSecret)) 267 mac.Write([]byte(payload)) 268 signature := hex.EncodeToString(mac.Sum(nil)) 269 270 // Append signature to payload 271 signed := payload + delimiter + signature 272 273 return base64.StdEncoding.EncodeToString([]byte(signed)) 274} 275 276// scanFeedPost scans a database row into a PostView 277// This is the shared scanning logic used by both timeline and discover feeds 278func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) { 279 var ( 280 postView posts.PostView 281 authorView posts.AuthorView 282 communityRef posts.CommunityRef 283 title, content sql.NullString 284 facets, embed sql.NullString 285 labelsJSON sql.NullString 286 editedAt sql.NullTime 287 communityHandle sql.NullString 288 communityAvatar sql.NullString 289 hotRank sql.NullFloat64 290 ) 291 292 err := rows.Scan( 293 &postView.URI, &postView.CID, &postView.RKey, 294 &authorView.DID, &authorView.Handle, 295 &communityRef.DID, &communityHandle, &communityRef.Name, &communityAvatar, 296 &title, &content, &facets, &embed, &labelsJSON, 297 &postView.CreatedAt, &editedAt, &postView.IndexedAt, 298 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount, 299 &hotRank, 300 ) 301 if err != nil { 302 return nil, 0, err 303 } 304 305 // Build author view 306 postView.Author = &authorView 307 308 // Build community ref 309 if communityHandle.Valid { 310 communityRef.Handle = communityHandle.String 311 } 312 communityRef.Avatar = nullStringPtr(communityAvatar) 313 postView.Community = &communityRef 314 315 // Set optional fields 316 postView.Title = nullStringPtr(title) 317 postView.Text = nullStringPtr(content) 318 319 // Parse facets JSON 320 if facets.Valid { 321 var facetArray []interface{} 322 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 323 postView.TextFacets = facetArray 324 } 325 } 326 327 // Parse embed JSON 328 if embed.Valid { 329 var embedData interface{} 330 
if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 331 postView.Embed = embedData 332 } 333 } 334 335 // Build stats 336 postView.Stats = &posts.PostStats{ 337 Upvotes: postView.UpvoteCount, 338 Downvotes: postView.DownvoteCount, 339 Score: postView.Score, 340 CommentCount: postView.CommentCount, 341 } 342 343 // Build the record (required by lexicon) 344 record := map[string]interface{}{ 345 "$type": "social.coves.community.post", 346 "community": communityRef.DID, 347 "author": authorView.DID, 348 "createdAt": postView.CreatedAt.Format(time.RFC3339), 349 } 350 351 // Add optional fields to record if present 352 if title.Valid { 353 record["title"] = title.String 354 } 355 if content.Valid { 356 record["content"] = content.String 357 } 358 if facets.Valid { 359 var facetArray []interface{} 360 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 361 record["facets"] = facetArray 362 } 363 } 364 if embed.Valid { 365 var embedData interface{} 366 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 367 record["embed"] = embedData 368 } 369 } 370 if labelsJSON.Valid { 371 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 372 // Deserialize and include in record 373 var selfLabels posts.SelfLabels 374 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil { 375 record["labels"] = selfLabels 376 } 377 } 378 379 postView.Record = record 380 381 // Return the computed hot_rank (0.0 if NULL for non-hot sorts) 382 hotRankValue := 0.0 383 if hotRank.Valid { 384 hotRankValue = hotRank.Float64 385 } 386 387 return &postView, hotRankValue, nil 388} 389 390// nullStringPtr converts sql.NullString to *string 391// Helper function used by feed scanning logic across all feed types 392func nullStringPtr(ns sql.NullString) *string { 393 if !ns.Valid { 394 return nil 395 } 396 return &ns.String 397}