A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/posts" 5 "crypto/hmac" 6 "crypto/sha256" 7 "database/sql" 8 "encoding/base64" 9 "encoding/hex" 10 "encoding/json" 11 "fmt" 12 "strconv" 13 "strings" 14 "time" 15) 16 17// feedRepoBase contains shared logic for timeline and discover feed repositories 18// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds 19// 20// DATABASE INDEXES REQUIRED: 21// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql): 22// 23// 1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL 24// - Used by: Both timeline and discover for "new" sort 25// - Covers: Community filtering + chronological ordering + soft delete filter 26// 27// 2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL 28// - Used by: Both timeline and discover for "top" sort 29// - Covers: Community filtering + score ordering + tie-breaking + soft delete filter 30// 31// 3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did) 32// - Used by: Timeline feed (JOIN with subscriptions) 33// - Covers: User subscription lookup 34// 35// 4. Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5)) 36// - Cannot be indexed directly (computed at query time) 37// - Uses idx_posts_community_created for base ordering 38// - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha) 39// 40// PERFORMANCE NOTES: 41// - All queries use single execution (no N+1) 42// - JOINs are minimal (3 for timeline, 2 for discover) 43// - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently 44// - Cursor pagination is stable (no offset drift) 45// - Limit+1 pattern checks for next page without extra query 46type feedRepoBase struct { 47 db *sql.DB 48 hotRankExpression string 49 sortClauses map[string]string 50 cursorSecret string // HMAC secret for cursor integrity protection 51} 52 53// newFeedRepoBase creates a new base repository with shared feed logic 54func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase { 55 return &feedRepoBase{ 56 db: db, 57 hotRankExpression: hotRankExpr, 58 sortClauses: sortClauses, 59 cursorSecret: cursorSecret, 60 } 61} 62 63// buildSortClause returns the ORDER BY SQL and optional time filter 64// Uses whitelist map to prevent SQL injection via dynamic ORDER BY 65func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) { 66 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection) 67 orderBy := r.sortClauses[sort] 68 if orderBy == "" { 69 orderBy = r.sortClauses["hot"] // safe default 70 } 71 72 // Add time filter for "top" sort 73 var timeFilter string 74 if sort == "top" { 75 timeFilter = r.buildTimeFilter(timeframe) 76 } 77 78 return orderBy, timeFilter 79} 80 81// buildTimeFilter returns SQL filter for timeframe 82func (r *feedRepoBase) buildTimeFilter(timeframe string) string { 83 if timeframe == "" || timeframe == "all" { 84 return "" 85 } 86 87 var interval string 88 switch timeframe { 89 case "hour": 90 interval = "1 hour" 91 case "day": 92 interval = "1 day" 93 case "week": 94 interval = "1 week" 95 case "month": 96 interval = "1 month" 97 case "year": 98 interval = "1 year" 99 default: 100 return "" 101 } 102 103 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval) 104} 105 106// parseCursor decodes and validates pagination cursor 107// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline) 108func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) { 109 if cursor == nil || *cursor == "" { 110 return "", nil, nil 111 } 112 113 // Decode base64 cursor 114 decoded, err := base64.StdEncoding.DecodeString(*cursor) 115 if err != nil { 116 return "", nil, fmt.Errorf("invalid cursor encoding") 117 } 118 119 // Parse cursor: payload::signature 120 parts := strings.Split(string(decoded), "::") 121 if len(parts) < 2 { 122 return "", nil, fmt.Errorf("invalid cursor format") 123 } 124 125 // Verify HMAC signature 126 signatureHex := parts[len(parts)-1] 127 payload := strings.Join(parts[:len(parts)-1], "::") 128 129 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret)) 130 expectedMAC.Write([]byte(payload)) 131 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil)) 132 133 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) { 134 return "", nil, fmt.Errorf("invalid cursor signature") 135 } 136 137 // Parse payload based on sort type 138 payloadParts := strings.Split(payload, "::") 139 140 switch sort { 141 case "new": 142 // Cursor format: timestamp::uri 143 if len(payloadParts) != 2 { 144 return "", nil, fmt.Errorf("invalid cursor format") 145 } 146 147 createdAt := payloadParts[0] 148 uri := payloadParts[1] 149 150 // Validate timestamp format 151 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 152 return "", nil, fmt.Errorf("invalid cursor timestamp") 153 } 154 155 // Validate URI format (must be AT-URI) 156 if !strings.HasPrefix(uri, "at://") { 157 return "", nil, fmt.Errorf("invalid cursor URI") 158 } 159 160 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`, 161 paramOffset, paramOffset, paramOffset+1) 162 return filter, []interface{}{createdAt, uri}, nil 163 164 case "top": 165 // Cursor format: score::timestamp::uri 166 if len(payloadParts) != 3 { 167 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort) 168 } 169 170 scoreStr := payloadParts[0] 171 createdAt := payloadParts[1] 172 uri := payloadParts[2] 173 174 // Validate score is numeric 175 score := 0 176 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 177 return "", nil, fmt.Errorf("invalid cursor score") 178 } 179 180 // Validate timestamp format 181 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 182 return "", nil, fmt.Errorf("invalid cursor timestamp") 183 } 184 185 // Validate URI format (must be AT-URI) 186 if !strings.HasPrefix(uri, "at://") { 187 return "", nil, fmt.Errorf("invalid cursor URI") 188 } 189 190 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`, 191 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2) 192 return filter, []interface{}{score, createdAt, uri}, nil 193 194 case "hot": 195 // Cursor format: hot_rank::timestamp::uri 196 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs 197 if len(payloadParts) != 3 { 198 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 199 } 200 201 hotRankStr := payloadParts[0] 202 createdAt := payloadParts[1] 203 uri := payloadParts[2] 204 205 // Validate hot_rank is numeric (float) 206 hotRank := 0.0 207 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 208 return "", nil, fmt.Errorf("invalid cursor hot rank") 209 } 210 211 // Validate timestamp format 212 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 213 return "", nil, fmt.Errorf("invalid cursor timestamp") 214 } 215 216 // Validate URI format (must be AT-URI) 217 if !strings.HasPrefix(uri, "at://") { 218 return "", nil, fmt.Errorf("invalid cursor URI") 219 } 220 221 // CRITICAL: Compare against the computed hot_rank expression, not p.score 222 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`, 223 r.hotRankExpression, paramOffset, 224 r.hotRankExpression, paramOffset, paramOffset+1, 225 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2, 226 paramOffset+3) 227 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil 228 229 default: 230 return "", nil, nil 231 } 232} 233 234// buildCursor creates HMAC-signed pagination cursor from last post 235// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation 236func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string { 237 var payload string 238 // Use :: as delimiter following Bluesky convention 239 const delimiter = "::" 240 241 switch sort { 242 case "new": 243 // Format: timestamp::uri 244 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 245 246 case "top": 247 // Format: score::timestamp::uri 248 score := 0 249 if post.Stats != nil { 250 score = post.Stats.Score 251 } 252 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 253 254 case "hot": 255 // Format: hot_rank::timestamp::uri 256 // CRITICAL: Use computed hot_rank with full precision 257 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64) 258 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 259 260 default: 261 payload = post.URI 262 } 263 264 // Sign the payload with HMAC-SHA256 265 mac := hmac.New(sha256.New, []byte(r.cursorSecret)) 266 mac.Write([]byte(payload)) 267 signature := hex.EncodeToString(mac.Sum(nil)) 268 269 // Append signature to payload 270 signed := payload + delimiter + signature 271 272 return base64.StdEncoding.EncodeToString([]byte(signed)) 273} 274 275// scanFeedPost scans a database row into a PostView 276// This is the shared scanning logic used by both timeline and discover feeds 277func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) { 278 var ( 279 postView posts.PostView 280 authorView posts.AuthorView 281 communityRef posts.CommunityRef 282 title, content sql.NullString 283 facets, embed sql.NullString 284 labelsJSON sql.NullString 285 editedAt sql.NullTime 286 communityHandle sql.NullString 287 communityAvatar sql.NullString 288 hotRank sql.NullFloat64 289 ) 290 291 err := rows.Scan( 292 &postView.URI, &postView.CID, &postView.RKey, 293 &authorView.DID, &authorView.Handle, 294 &communityRef.DID, &communityHandle, &communityRef.Name, &communityAvatar, 295 &title, &content, &facets, &embed, &labelsJSON, 296 &postView.CreatedAt, &editedAt, &postView.IndexedAt, 297 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount, 298 &hotRank, 299 ) 300 if err != nil { 301 return nil, 0, err 302 } 303 304 // Build author view 305 postView.Author = &authorView 306 307 // Build community ref 308 if communityHandle.Valid { 309 communityRef.Handle = communityHandle.String 310 } 311 communityRef.Avatar = nullStringPtr(communityAvatar) 312 postView.Community = &communityRef 313 314 // Set optional fields 315 postView.Title = nullStringPtr(title) 316 postView.Text = nullStringPtr(content) 317 318 // Parse facets JSON 319 if facets.Valid { 320 var facetArray []interface{} 321 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 322 postView.TextFacets = facetArray 323 } 324 } 325 326 // Parse embed JSON 327 if embed.Valid { 328 var embedData interface{} 329 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 330 postView.Embed = embedData 331 } 332 } 333 334 // Build stats 335 postView.Stats = &posts.PostStats{ 336 Upvotes: postView.UpvoteCount, 337 Downvotes: postView.DownvoteCount, 338 Score: postView.Score, 339 CommentCount: postView.CommentCount, 340 } 341 342 // Build the record (required by lexicon) 343 record := map[string]interface{}{ 344 "$type": "social.coves.community.post", 345 "community": communityRef.DID, 346 "author": authorView.DID, 347 "createdAt": postView.CreatedAt.Format(time.RFC3339), 348 } 349 350 // Add optional fields to record if present 351 if title.Valid { 352 record["title"] = title.String 353 } 354 if content.Valid { 355 record["content"] = content.String 356 } 357 if facets.Valid { 358 var facetArray []interface{} 359 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 360 record["facets"] = facetArray 361 } 362 } 363 if embed.Valid { 364 var embedData interface{} 365 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 366 record["embed"] = embedData 367 } 368 } 369 if labelsJSON.Valid { 370 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 371 // Deserialize and include in record 372 var selfLabels posts.SelfLabels 373 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil { 374 record["labels"] = selfLabels 375 } 376 } 377 378 postView.Record = record 379 380 // Return the computed hot_rank (0.0 if NULL for non-hot sorts) 381 hotRankValue := 0.0 382 if hotRank.Valid { 383 hotRankValue = hotRank.Float64 384 } 385 386 return &postView, hotRankValue, nil 387} 388 389// nullStringPtr converts sql.NullString to *string 390// Helper function used by feed scanning logic across all feed types 391func nullStringPtr(ns sql.NullString) *string { 392 if !ns.Valid { 393 return nil 394 } 395 return &ns.String 396}