A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "crypto/hmac" 5 "crypto/sha256" 6 "database/sql" 7 "encoding/base64" 8 "encoding/hex" 9 "encoding/json" 10 "fmt" 11 "strconv" 12 "strings" 13 "time" 14 15 "Coves/internal/core/posts" 16) 17 18// feedRepoBase contains shared logic for timeline and discover feed repositories 19// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds 20// 21// DATABASE INDEXES REQUIRED: 22// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql): 23// 24// 1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL 25// - Used by: Both timeline and discover for "new" sort 26// - Covers: Community filtering + chronological ordering + soft delete filter 27// 28// 2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL 29// - Used by: Both timeline and discover for "top" sort 30// - Covers: Community filtering + score ordering + tie-breaking + soft delete filter 31// 32// 3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did) 33// - Used by: Timeline feed (JOIN with subscriptions) 34// - Covers: User subscription lookup 35// 36// 4. 
Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5)) 37// - Cannot be indexed directly (computed at query time) 38// - Uses idx_posts_community_created for base ordering 39// - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha) 40// 41// PERFORMANCE NOTES: 42// - All queries use single execution (no N+1) 43// - JOINs are minimal (3 for timeline, 2 for discover) 44// - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently 45// - Cursor pagination is stable (no offset drift) 46// - Limit+1 pattern checks for next page without extra query 47type feedRepoBase struct { 48 db *sql.DB 49 hotRankExpression string 50 sortClauses map[string]string 51 cursorSecret string // HMAC secret for cursor integrity protection 52} 53 54// newFeedRepoBase creates a new base repository with shared feed logic 55func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase { 56 return &feedRepoBase{ 57 db: db, 58 hotRankExpression: hotRankExpr, 59 sortClauses: sortClauses, 60 cursorSecret: cursorSecret, 61 } 62} 63 64// buildSortClause returns the ORDER BY SQL and optional time filter 65// Uses whitelist map to prevent SQL injection via dynamic ORDER BY 66func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) { 67 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection) 68 orderBy := r.sortClauses[sort] 69 if orderBy == "" { 70 orderBy = r.sortClauses["hot"] // safe default 71 } 72 73 // Add time filter for "top" sort 74 var timeFilter string 75 if sort == "top" { 76 timeFilter = r.buildTimeFilter(timeframe) 77 } 78 79 return orderBy, timeFilter 80} 81 82// buildTimeFilter returns SQL filter for timeframe 83func (r *feedRepoBase) buildTimeFilter(timeframe string) string { 84 if timeframe == "" || timeframe == "all" { 85 return "" 86 } 87 88 var interval string 89 switch timeframe { 90 case "hour": 91 interval 
= "1 hour" 92 case "day": 93 interval = "1 day" 94 case "week": 95 interval = "1 week" 96 case "month": 97 interval = "1 month" 98 case "year": 99 interval = "1 year" 100 default: 101 return "" 102 } 103 104 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval) 105} 106 107// parseCursor decodes and validates pagination cursor 108// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline) 109func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) { 110 if cursor == nil || *cursor == "" { 111 return "", nil, nil 112 } 113 114 // Decode base64 cursor 115 decoded, err := base64.StdEncoding.DecodeString(*cursor) 116 if err != nil { 117 return "", nil, fmt.Errorf("invalid cursor encoding") 118 } 119 120 // Parse cursor: payload::signature 121 parts := strings.Split(string(decoded), "::") 122 if len(parts) < 2 { 123 return "", nil, fmt.Errorf("invalid cursor format") 124 } 125 126 // Verify HMAC signature 127 signatureHex := parts[len(parts)-1] 128 payload := strings.Join(parts[:len(parts)-1], "::") 129 130 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret)) 131 expectedMAC.Write([]byte(payload)) 132 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil)) 133 134 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) { 135 return "", nil, fmt.Errorf("invalid cursor signature") 136 } 137 138 // Parse payload based on sort type 139 payloadParts := strings.Split(payload, "::") 140 141 switch sort { 142 case "new": 143 // Cursor format: timestamp::uri 144 if len(payloadParts) != 2 { 145 return "", nil, fmt.Errorf("invalid cursor format") 146 } 147 148 createdAt := payloadParts[0] 149 uri := payloadParts[1] 150 151 // Validate timestamp format 152 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 153 return "", nil, fmt.Errorf("invalid cursor timestamp") 154 } 155 156 // Validate URI format (must be AT-URI) 157 if 
!strings.HasPrefix(uri, "at://") { 158 return "", nil, fmt.Errorf("invalid cursor URI") 159 } 160 161 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`, 162 paramOffset, paramOffset, paramOffset+1) 163 return filter, []interface{}{createdAt, uri}, nil 164 165 case "top": 166 // Cursor format: score::timestamp::uri 167 if len(payloadParts) != 3 { 168 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort) 169 } 170 171 scoreStr := payloadParts[0] 172 createdAt := payloadParts[1] 173 uri := payloadParts[2] 174 175 // Validate score is numeric 176 score := 0 177 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 178 return "", nil, fmt.Errorf("invalid cursor score") 179 } 180 181 // Validate timestamp format 182 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 183 return "", nil, fmt.Errorf("invalid cursor timestamp") 184 } 185 186 // Validate URI format (must be AT-URI) 187 if !strings.HasPrefix(uri, "at://") { 188 return "", nil, fmt.Errorf("invalid cursor URI") 189 } 190 191 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`, 192 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2) 193 return filter, []interface{}{score, createdAt, uri}, nil 194 195 case "hot": 196 // Cursor format: hot_rank::timestamp::uri 197 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs 198 if len(payloadParts) != 3 { 199 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 200 } 201 202 hotRankStr := payloadParts[0] 203 createdAt := payloadParts[1] 204 uri := payloadParts[2] 205 206 // Validate hot_rank is numeric (float) 207 hotRank := 0.0 208 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 209 return "", nil, fmt.Errorf("invalid cursor hot rank") 210 } 211 212 // Validate timestamp format 213 if _, err := 
time.Parse(time.RFC3339Nano, createdAt); err != nil { 214 return "", nil, fmt.Errorf("invalid cursor timestamp") 215 } 216 217 // Validate URI format (must be AT-URI) 218 if !strings.HasPrefix(uri, "at://") { 219 return "", nil, fmt.Errorf("invalid cursor URI") 220 } 221 222 // CRITICAL: Compare against the computed hot_rank expression, not p.score 223 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`, 224 r.hotRankExpression, paramOffset, 225 r.hotRankExpression, paramOffset, paramOffset+1, 226 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2, 227 paramOffset+3) 228 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil 229 230 default: 231 return "", nil, nil 232 } 233} 234 235// buildCursor creates HMAC-signed pagination cursor from last post 236// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation 237func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string { 238 var payload string 239 // Use :: as delimiter following Bluesky convention 240 const delimiter = "::" 241 242 switch sort { 243 case "new": 244 // Format: timestamp::uri 245 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 246 247 case "top": 248 // Format: score::timestamp::uri 249 score := 0 250 if post.Stats != nil { 251 score = post.Stats.Score 252 } 253 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 254 255 case "hot": 256 // Format: hot_rank::timestamp::uri 257 // CRITICAL: Use computed hot_rank with full precision 258 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64) 259 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 260 261 default: 262 payload = post.URI 263 } 264 265 // Sign the payload with HMAC-SHA256 266 mac := 
hmac.New(sha256.New, []byte(r.cursorSecret)) 267 mac.Write([]byte(payload)) 268 signature := hex.EncodeToString(mac.Sum(nil)) 269 270 // Append signature to payload 271 signed := payload + delimiter + signature 272 273 return base64.StdEncoding.EncodeToString([]byte(signed)) 274} 275 276// scanFeedPost scans a database row into a PostView 277// This is the shared scanning logic used by both timeline and discover feeds 278func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) { 279 var ( 280 postView posts.PostView 281 authorView posts.AuthorView 282 communityRef posts.CommunityRef 283 title, content sql.NullString 284 facets, embed sql.NullString 285 labelsJSON sql.NullString 286 editedAt sql.NullTime 287 communityHandle sql.NullString 288 communityAvatar sql.NullString 289 communityPDSURL sql.NullString 290 hotRank sql.NullFloat64 291 ) 292 293 err := rows.Scan( 294 &postView.URI, &postView.CID, &postView.RKey, 295 &authorView.DID, &authorView.Handle, 296 &communityRef.DID, &communityHandle, &communityRef.Name, &communityAvatar, &communityPDSURL, 297 &title, &content, &facets, &embed, &labelsJSON, 298 &postView.CreatedAt, &editedAt, &postView.IndexedAt, 299 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount, 300 &hotRank, 301 ) 302 if err != nil { 303 return nil, 0, err 304 } 305 306 // Build author view 307 postView.Author = &authorView 308 309 // Build community ref 310 if communityHandle.Valid { 311 communityRef.Handle = communityHandle.String 312 } 313 communityRef.Avatar = nullStringPtr(communityAvatar) 314 if communityPDSURL.Valid { 315 communityRef.PDSURL = communityPDSURL.String 316 } 317 postView.Community = &communityRef 318 319 // Set optional fields 320 postView.Title = nullStringPtr(title) 321 postView.Text = nullStringPtr(content) 322 323 // Parse facets JSON 324 if facets.Valid { 325 var facetArray []interface{} 326 if err := json.Unmarshal([]byte(facets.String), &facetArray); err 
== nil { 327 postView.TextFacets = facetArray 328 } 329 } 330 331 // Parse embed JSON 332 if embed.Valid { 333 var embedData interface{} 334 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 335 postView.Embed = embedData 336 } 337 } 338 339 // Build stats 340 postView.Stats = &posts.PostStats{ 341 Upvotes: postView.UpvoteCount, 342 Downvotes: postView.DownvoteCount, 343 Score: postView.Score, 344 CommentCount: postView.CommentCount, 345 } 346 347 // Build the record (required by lexicon) 348 record := map[string]interface{}{ 349 "$type": "social.coves.community.post", 350 "community": communityRef.DID, 351 "author": authorView.DID, 352 "createdAt": postView.CreatedAt.Format(time.RFC3339), 353 } 354 355 // Add optional fields to record if present 356 if title.Valid { 357 record["title"] = title.String 358 } 359 if content.Valid { 360 record["content"] = content.String 361 } 362 if facets.Valid { 363 var facetArray []interface{} 364 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 365 record["facets"] = facetArray 366 } 367 } 368 if embed.Valid { 369 var embedData interface{} 370 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 371 record["embed"] = embedData 372 } 373 } 374 if labelsJSON.Valid { 375 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 376 // Deserialize and include in record 377 var selfLabels posts.SelfLabels 378 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil { 379 record["labels"] = selfLabels 380 } 381 } 382 383 postView.Record = record 384 385 // Return the computed hot_rank (0.0 if NULL for non-hot sorts) 386 hotRankValue := 0.0 387 if hotRank.Valid { 388 hotRankValue = hotRank.Float64 389 } 390 391 return &postView, hotRankValue, nil 392} 393 394// nullStringPtr converts sql.NullString to *string 395// Helper function used by feed scanning logic across all feed types 396func nullStringPtr(ns 
sql.NullString) *string { 397 if !ns.Valid { 398 return nil 399 } 400 return &ns.String 401}