A community-based topic aggregation platform built on atproto
at main 12 kB view raw
1package postgres 2 3import ( 4 "Coves/internal/core/posts" 5 "crypto/hmac" 6 "crypto/sha256" 7 "database/sql" 8 "encoding/base64" 9 "encoding/hex" 10 "encoding/json" 11 "fmt" 12 "strconv" 13 "strings" 14 "time" 15) 16 17// feedRepoBase contains shared logic for timeline and discover feed repositories 18// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds 19// 20// DATABASE INDEXES REQUIRED: 21// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql): 22// 23// 1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL 24// - Used by: Both timeline and discover for "new" sort 25// - Covers: Community filtering + chronological ordering + soft delete filter 26// 27// 2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL 28// - Used by: Both timeline and discover for "top" sort 29// - Covers: Community filtering + score ordering + tie-breaking + soft delete filter 30// 31// 3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did) 32// - Used by: Timeline feed (JOIN with subscriptions) 33// - Covers: User subscription lookup 34// 35// 4. 
Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5)) 36// - Cannot be indexed directly (computed at query time) 37// - Uses idx_posts_community_created for base ordering 38// - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha) 39// 40// PERFORMANCE NOTES: 41// - All queries use single execution (no N+1) 42// - JOINs are minimal (3 for timeline, 2 for discover) 43// - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently 44// - Cursor pagination is stable (no offset drift) 45// - Limit+1 pattern checks for next page without extra query 46type feedRepoBase struct { 47 db *sql.DB 48 hotRankExpression string 49 sortClauses map[string]string 50 cursorSecret string // HMAC secret for cursor integrity protection 51} 52 53// newFeedRepoBase creates a new base repository with shared feed logic 54func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase { 55 return &feedRepoBase{ 56 db: db, 57 hotRankExpression: hotRankExpr, 58 sortClauses: sortClauses, 59 cursorSecret: cursorSecret, 60 } 61} 62 63// buildSortClause returns the ORDER BY SQL and optional time filter 64// Uses whitelist map to prevent SQL injection via dynamic ORDER BY 65func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) { 66 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection) 67 orderBy := r.sortClauses[sort] 68 if orderBy == "" { 69 orderBy = r.sortClauses["hot"] // safe default 70 } 71 72 // Add time filter for "top" sort 73 var timeFilter string 74 if sort == "top" { 75 timeFilter = r.buildTimeFilter(timeframe) 76 } 77 78 return orderBy, timeFilter 79} 80 81// buildTimeFilter returns SQL filter for timeframe 82func (r *feedRepoBase) buildTimeFilter(timeframe string) string { 83 if timeframe == "" || timeframe == "all" { 84 return "" 85 } 86 87 var interval string 88 switch timeframe { 89 case "hour": 90 interval 
= "1 hour" 91 case "day": 92 interval = "1 day" 93 case "week": 94 interval = "1 week" 95 case "month": 96 interval = "1 month" 97 case "year": 98 interval = "1 year" 99 default: 100 return "" 101 } 102 103 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval) 104} 105 106// parseCursor decodes and validates pagination cursor 107// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline) 108func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) { 109 if cursor == nil || *cursor == "" { 110 return "", nil, nil 111 } 112 113 // Decode base64 cursor 114 decoded, err := base64.StdEncoding.DecodeString(*cursor) 115 if err != nil { 116 return "", nil, fmt.Errorf("invalid cursor encoding") 117 } 118 119 // Parse cursor: payload::signature 120 parts := strings.Split(string(decoded), "::") 121 if len(parts) < 2 { 122 return "", nil, fmt.Errorf("invalid cursor format") 123 } 124 125 // Verify HMAC signature 126 signatureHex := parts[len(parts)-1] 127 payload := strings.Join(parts[:len(parts)-1], "::") 128 129 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret)) 130 expectedMAC.Write([]byte(payload)) 131 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil)) 132 133 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) { 134 return "", nil, fmt.Errorf("invalid cursor signature") 135 } 136 137 // Parse payload based on sort type 138 payloadParts := strings.Split(payload, "::") 139 140 switch sort { 141 case "new": 142 // Cursor format: timestamp::uri 143 if len(payloadParts) != 2 { 144 return "", nil, fmt.Errorf("invalid cursor format") 145 } 146 147 createdAt := payloadParts[0] 148 uri := payloadParts[1] 149 150 // Validate timestamp format 151 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 152 return "", nil, fmt.Errorf("invalid cursor timestamp") 153 } 154 155 // Validate URI format (must be AT-URI) 156 if 
!strings.HasPrefix(uri, "at://") { 157 return "", nil, fmt.Errorf("invalid cursor URI") 158 } 159 160 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`, 161 paramOffset, paramOffset, paramOffset+1) 162 return filter, []interface{}{createdAt, uri}, nil 163 164 case "top": 165 // Cursor format: score::timestamp::uri 166 if len(payloadParts) != 3 { 167 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort) 168 } 169 170 scoreStr := payloadParts[0] 171 createdAt := payloadParts[1] 172 uri := payloadParts[2] 173 174 // Validate score is numeric 175 score := 0 176 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 177 return "", nil, fmt.Errorf("invalid cursor score") 178 } 179 180 // Validate timestamp format 181 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 182 return "", nil, fmt.Errorf("invalid cursor timestamp") 183 } 184 185 // Validate URI format (must be AT-URI) 186 if !strings.HasPrefix(uri, "at://") { 187 return "", nil, fmt.Errorf("invalid cursor URI") 188 } 189 190 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`, 191 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2) 192 return filter, []interface{}{score, createdAt, uri}, nil 193 194 case "hot": 195 // Cursor format: hot_rank::timestamp::uri 196 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs 197 if len(payloadParts) != 3 { 198 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 199 } 200 201 hotRankStr := payloadParts[0] 202 createdAt := payloadParts[1] 203 uri := payloadParts[2] 204 205 // Validate hot_rank is numeric (float) 206 hotRank := 0.0 207 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 208 return "", nil, fmt.Errorf("invalid cursor hot rank") 209 } 210 211 // Validate timestamp format 212 if _, err := 
time.Parse(time.RFC3339Nano, createdAt); err != nil { 213 return "", nil, fmt.Errorf("invalid cursor timestamp") 214 } 215 216 // Validate URI format (must be AT-URI) 217 if !strings.HasPrefix(uri, "at://") { 218 return "", nil, fmt.Errorf("invalid cursor URI") 219 } 220 221 // CRITICAL: Compare against the computed hot_rank expression, not p.score 222 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`, 223 r.hotRankExpression, paramOffset, 224 r.hotRankExpression, paramOffset, paramOffset+1, 225 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2, 226 paramOffset+3) 227 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil 228 229 default: 230 return "", nil, nil 231 } 232} 233 234// buildCursor creates HMAC-signed pagination cursor from last post 235// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation 236func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string { 237 var payload string 238 // Use :: as delimiter following Bluesky convention 239 const delimiter = "::" 240 241 switch sort { 242 case "new": 243 // Format: timestamp::uri 244 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 245 246 case "top": 247 // Format: score::timestamp::uri 248 score := 0 249 if post.Stats != nil { 250 score = post.Stats.Score 251 } 252 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 253 254 case "hot": 255 // Format: hot_rank::timestamp::uri 256 // CRITICAL: Use computed hot_rank with full precision 257 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64) 258 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 259 260 default: 261 payload = post.URI 262 } 263 264 // Sign the payload with HMAC-SHA256 265 mac := 
hmac.New(sha256.New, []byte(r.cursorSecret)) 266 mac.Write([]byte(payload)) 267 signature := hex.EncodeToString(mac.Sum(nil)) 268 269 // Append signature to payload 270 signed := payload + delimiter + signature 271 272 return base64.StdEncoding.EncodeToString([]byte(signed)) 273} 274 275// scanFeedPost scans a database row into a PostView 276// This is the shared scanning logic used by both timeline and discover feeds 277func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) { 278 var ( 279 postView posts.PostView 280 authorView posts.AuthorView 281 communityRef posts.CommunityRef 282 title, content sql.NullString 283 facets, embed sql.NullString 284 labelsJSON sql.NullString 285 editedAt sql.NullTime 286 communityHandle sql.NullString 287 communityAvatar sql.NullString 288 communityPDSURL sql.NullString 289 hotRank sql.NullFloat64 290 ) 291 292 err := rows.Scan( 293 &postView.URI, &postView.CID, &postView.RKey, 294 &authorView.DID, &authorView.Handle, 295 &communityRef.DID, &communityHandle, &communityRef.Name, &communityAvatar, &communityPDSURL, 296 &title, &content, &facets, &embed, &labelsJSON, 297 &postView.CreatedAt, &editedAt, &postView.IndexedAt, 298 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount, 299 &hotRank, 300 ) 301 if err != nil { 302 return nil, 0, err 303 } 304 305 // Build author view 306 postView.Author = &authorView 307 308 // Build community ref 309 if communityHandle.Valid { 310 communityRef.Handle = communityHandle.String 311 } 312 communityRef.Avatar = nullStringPtr(communityAvatar) 313 if communityPDSURL.Valid { 314 communityRef.PDSURL = communityPDSURL.String 315 } 316 postView.Community = &communityRef 317 318 // Set optional fields 319 postView.Title = nullStringPtr(title) 320 postView.Text = nullStringPtr(content) 321 322 // Parse facets JSON 323 if facets.Valid { 324 var facetArray []interface{} 325 if err := json.Unmarshal([]byte(facets.String), &facetArray); err 
== nil { 326 postView.TextFacets = facetArray 327 } 328 } 329 330 // Parse embed JSON 331 if embed.Valid { 332 var embedData interface{} 333 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 334 postView.Embed = embedData 335 } 336 } 337 338 // Build stats 339 postView.Stats = &posts.PostStats{ 340 Upvotes: postView.UpvoteCount, 341 Downvotes: postView.DownvoteCount, 342 Score: postView.Score, 343 CommentCount: postView.CommentCount, 344 } 345 346 // Build the record (required by lexicon) 347 record := map[string]interface{}{ 348 "$type": "social.coves.community.post", 349 "community": communityRef.DID, 350 "author": authorView.DID, 351 "createdAt": postView.CreatedAt.Format(time.RFC3339), 352 } 353 354 // Add optional fields to record if present 355 if title.Valid { 356 record["title"] = title.String 357 } 358 if content.Valid { 359 record["content"] = content.String 360 } 361 if facets.Valid { 362 var facetArray []interface{} 363 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 364 record["facets"] = facetArray 365 } 366 } 367 if embed.Valid { 368 var embedData interface{} 369 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 370 record["embed"] = embedData 371 } 372 } 373 if labelsJSON.Valid { 374 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 375 // Deserialize and include in record 376 var selfLabels posts.SelfLabels 377 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil { 378 record["labels"] = selfLabels 379 } 380 } 381 382 postView.Record = record 383 384 // Return the computed hot_rank (0.0 if NULL for non-hot sorts) 385 hotRankValue := 0.0 386 if hotRank.Valid { 387 hotRankValue = hotRank.Float64 388 } 389 390 return &postView, hotRankValue, nil 391} 392 393// nullStringPtr converts sql.NullString to *string 394// Helper function used by feed scanning logic across all feed types 395func nullStringPtr(ns 
sql.NullString) *string { 396 if !ns.Valid { 397 return nil 398 } 399 return &ns.String 400}