A community-based topic aggregation platform built on atproto.
1package postgres 2 3import ( 4 "Coves/internal/core/posts" 5 "crypto/hmac" 6 "crypto/sha256" 7 "database/sql" 8 "encoding/base64" 9 "encoding/hex" 10 "encoding/json" 11 "fmt" 12 "strconv" 13 "strings" 14 "time" 15) 16 17// feedRepoBase contains shared logic for timeline and discover feed repositories 18// This eliminates ~85% code duplication and ensures bug fixes apply to both feeds 19// 20// DATABASE INDEXES REQUIRED: 21// The feed queries rely on these indexes (created in migration 011_create_posts_table.sql): 22// 23// 1. idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL 24// - Used by: Both timeline and discover for "new" sort 25// - Covers: Community filtering + chronological ordering + soft delete filter 26// 27// 2. idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL 28// - Used by: Both timeline and discover for "top" sort 29// - Covers: Community filtering + score ordering + tie-breaking + soft delete filter 30// 31// 3. idx_subscriptions_user_community ON community_subscriptions(user_did, community_did) 32// - Used by: Timeline feed (JOIN with subscriptions) 33// - Covers: User subscription lookup 34// 35// 4. 
Hot sort uses computed expression: (score / POWER(age_hours + 2, 1.5)) 36// - Cannot be indexed directly (computed at query time) 37// - Uses idx_posts_community_created for base ordering 38// - Performance: ~10-20ms for timeline, ~8-15ms for discover (acceptable for alpha) 39// 40// PERFORMANCE NOTES: 41// - All queries use single execution (no N+1) 42// - JOINs are minimal (3 for timeline, 2 for discover) 43// - Partial indexes (WHERE deleted_at IS NULL) eliminate soft-deleted posts efficiently 44// - Cursor pagination is stable (no offset drift) 45// - Limit+1 pattern checks for next page without extra query 46type feedRepoBase struct { 47 db *sql.DB 48 hotRankExpression string 49 sortClauses map[string]string 50 cursorSecret string // HMAC secret for cursor integrity protection 51} 52 53// newFeedRepoBase creates a new base repository with shared feed logic 54func newFeedRepoBase(db *sql.DB, hotRankExpr string, sortClauses map[string]string, cursorSecret string) *feedRepoBase { 55 return &feedRepoBase{ 56 db: db, 57 hotRankExpression: hotRankExpr, 58 sortClauses: sortClauses, 59 cursorSecret: cursorSecret, 60 } 61} 62 63// buildSortClause returns the ORDER BY SQL and optional time filter 64// Uses whitelist map to prevent SQL injection via dynamic ORDER BY 65func (r *feedRepoBase) buildSortClause(sort, timeframe string) (string, string) { 66 // Use whitelist map for ORDER BY clause (defense-in-depth against SQL injection) 67 orderBy := r.sortClauses[sort] 68 if orderBy == "" { 69 orderBy = r.sortClauses["hot"] // safe default 70 } 71 72 // Add time filter for "top" sort 73 var timeFilter string 74 if sort == "top" { 75 timeFilter = r.buildTimeFilter(timeframe) 76 } 77 78 return orderBy, timeFilter 79} 80 81// buildTimeFilter returns SQL filter for timeframe 82func (r *feedRepoBase) buildTimeFilter(timeframe string) string { 83 if timeframe == "" || timeframe == "all" { 84 return "" 85 } 86 87 var interval string 88 switch timeframe { 89 case "hour": 90 interval 
= "1 hour" 91 case "day": 92 interval = "1 day" 93 case "week": 94 interval = "1 week" 95 case "month": 96 interval = "1 month" 97 case "year": 98 interval = "1 year" 99 default: 100 return "" 101 } 102 103 return fmt.Sprintf("AND p.created_at > NOW() - INTERVAL '%s'", interval) 104} 105 106// parseCursor decodes and validates pagination cursor 107// paramOffset is the starting parameter number for cursor values ($2 for discover, $3 for timeline) 108func (r *feedRepoBase) parseCursor(cursor *string, sort string, paramOffset int) (string, []interface{}, error) { 109 if cursor == nil || *cursor == "" { 110 return "", nil, nil 111 } 112 113 // Decode base64 cursor 114 decoded, err := base64.StdEncoding.DecodeString(*cursor) 115 if err != nil { 116 return "", nil, fmt.Errorf("invalid cursor encoding") 117 } 118 119 // Parse cursor: payload::signature 120 parts := strings.Split(string(decoded), "::") 121 if len(parts) < 2 { 122 return "", nil, fmt.Errorf("invalid cursor format") 123 } 124 125 // Verify HMAC signature 126 signatureHex := parts[len(parts)-1] 127 payload := strings.Join(parts[:len(parts)-1], "::") 128 129 expectedMAC := hmac.New(sha256.New, []byte(r.cursorSecret)) 130 expectedMAC.Write([]byte(payload)) 131 expectedSignature := hex.EncodeToString(expectedMAC.Sum(nil)) 132 133 if !hmac.Equal([]byte(signatureHex), []byte(expectedSignature)) { 134 return "", nil, fmt.Errorf("invalid cursor signature") 135 } 136 137 // Parse payload based on sort type 138 payloadParts := strings.Split(payload, "::") 139 140 switch sort { 141 case "new": 142 // Cursor format: timestamp::uri 143 if len(payloadParts) != 2 { 144 return "", nil, fmt.Errorf("invalid cursor format") 145 } 146 147 createdAt := payloadParts[0] 148 uri := payloadParts[1] 149 150 // Validate timestamp format 151 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 152 return "", nil, fmt.Errorf("invalid cursor timestamp") 153 } 154 155 // Validate URI format (must be AT-URI) 156 if 
!strings.HasPrefix(uri, "at://") { 157 return "", nil, fmt.Errorf("invalid cursor URI") 158 } 159 160 filter := fmt.Sprintf(`AND (p.created_at < $%d OR (p.created_at = $%d AND p.uri < $%d))`, 161 paramOffset, paramOffset, paramOffset+1) 162 return filter, []interface{}{createdAt, uri}, nil 163 164 case "top": 165 // Cursor format: score::timestamp::uri 166 if len(payloadParts) != 3 { 167 return "", nil, fmt.Errorf("invalid cursor format for %s sort", sort) 168 } 169 170 scoreStr := payloadParts[0] 171 createdAt := payloadParts[1] 172 uri := payloadParts[2] 173 174 // Validate score is numeric 175 score := 0 176 if _, err := fmt.Sscanf(scoreStr, "%d", &score); err != nil { 177 return "", nil, fmt.Errorf("invalid cursor score") 178 } 179 180 // Validate timestamp format 181 if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 182 return "", nil, fmt.Errorf("invalid cursor timestamp") 183 } 184 185 // Validate URI format (must be AT-URI) 186 if !strings.HasPrefix(uri, "at://") { 187 return "", nil, fmt.Errorf("invalid cursor URI") 188 } 189 190 filter := fmt.Sprintf(`AND (p.score < $%d OR (p.score = $%d AND p.created_at < $%d) OR (p.score = $%d AND p.created_at = $%d AND p.uri < $%d))`, 191 paramOffset, paramOffset, paramOffset+1, paramOffset, paramOffset+1, paramOffset+2) 192 return filter, []interface{}{score, createdAt, uri}, nil 193 194 case "hot": 195 // Cursor format: hot_rank::timestamp::uri 196 // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs 197 if len(payloadParts) != 3 { 198 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 199 } 200 201 hotRankStr := payloadParts[0] 202 createdAt := payloadParts[1] 203 uri := payloadParts[2] 204 205 // Validate hot_rank is numeric (float) 206 hotRank := 0.0 207 if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 208 return "", nil, fmt.Errorf("invalid cursor hot rank") 209 } 210 211 // Validate timestamp format 212 if _, err := 
time.Parse(time.RFC3339Nano, createdAt); err != nil { 213 return "", nil, fmt.Errorf("invalid cursor timestamp") 214 } 215 216 // Validate URI format (must be AT-URI) 217 if !strings.HasPrefix(uri, "at://") { 218 return "", nil, fmt.Errorf("invalid cursor URI") 219 } 220 221 // CRITICAL: Compare against the computed hot_rank expression, not p.score 222 filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`, 223 r.hotRankExpression, paramOffset, 224 r.hotRankExpression, paramOffset, paramOffset+1, 225 r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2, 226 paramOffset+3) 227 return filter, []interface{}{hotRank, createdAt, uri, uri}, nil 228 229 default: 230 return "", nil, nil 231 } 232} 233 234// buildCursor creates HMAC-signed pagination cursor from last post 235// SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation 236func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string { 237 var payload string 238 // Use :: as delimiter following Bluesky convention 239 const delimiter = "::" 240 241 switch sort { 242 case "new": 243 // Format: timestamp::uri 244 payload = fmt.Sprintf("%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 245 246 case "top": 247 // Format: score::timestamp::uri 248 score := 0 249 if post.Stats != nil { 250 score = post.Stats.Score 251 } 252 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 253 254 case "hot": 255 // Format: hot_rank::timestamp::uri 256 // CRITICAL: Use computed hot_rank with full precision 257 hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64) 258 payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 259 260 default: 261 payload = post.URI 262 } 263 264 // Sign the payload with HMAC-SHA256 265 mac := 
hmac.New(sha256.New, []byte(r.cursorSecret)) 266 mac.Write([]byte(payload)) 267 signature := hex.EncodeToString(mac.Sum(nil)) 268 269 // Append signature to payload 270 signed := payload + delimiter + signature 271 272 return base64.StdEncoding.EncodeToString([]byte(signed)) 273} 274 275// scanFeedPost scans a database row into a PostView 276// This is the shared scanning logic used by both timeline and discover feeds 277func (r *feedRepoBase) scanFeedPost(rows *sql.Rows) (*posts.PostView, float64, error) { 278 var ( 279 postView posts.PostView 280 authorView posts.AuthorView 281 communityRef posts.CommunityRef 282 title, content sql.NullString 283 facets, embed sql.NullString 284 labelsJSON sql.NullString 285 editedAt sql.NullTime 286 communityAvatar sql.NullString 287 hotRank sql.NullFloat64 288 ) 289 290 err := rows.Scan( 291 &postView.URI, &postView.CID, &postView.RKey, 292 &authorView.DID, &authorView.Handle, 293 &communityRef.DID, &communityRef.Name, &communityAvatar, 294 &title, &content, &facets, &embed, &labelsJSON, 295 &postView.CreatedAt, &editedAt, &postView.IndexedAt, 296 &postView.UpvoteCount, &postView.DownvoteCount, &postView.Score, &postView.CommentCount, 297 &hotRank, 298 ) 299 if err != nil { 300 return nil, 0, err 301 } 302 303 // Build author view 304 postView.Author = &authorView 305 306 // Build community ref 307 communityRef.Avatar = nullStringPtr(communityAvatar) 308 postView.Community = &communityRef 309 310 // Set optional fields 311 postView.Title = nullStringPtr(title) 312 postView.Text = nullStringPtr(content) 313 314 // Parse facets JSON 315 if facets.Valid { 316 var facetArray []interface{} 317 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 318 postView.TextFacets = facetArray 319 } 320 } 321 322 // Parse embed JSON 323 if embed.Valid { 324 var embedData interface{} 325 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 326 postView.Embed = embedData 327 } 328 } 329 330 // Build stats 
331 postView.Stats = &posts.PostStats{ 332 Upvotes: postView.UpvoteCount, 333 Downvotes: postView.DownvoteCount, 334 Score: postView.Score, 335 CommentCount: postView.CommentCount, 336 } 337 338 // Build the record (required by lexicon) 339 record := map[string]interface{}{ 340 "$type": "social.coves.community.post", 341 "community": communityRef.DID, 342 "author": authorView.DID, 343 "createdAt": postView.CreatedAt.Format(time.RFC3339), 344 } 345 346 // Add optional fields to record if present 347 if title.Valid { 348 record["title"] = title.String 349 } 350 if content.Valid { 351 record["content"] = content.String 352 } 353 if facets.Valid { 354 var facetArray []interface{} 355 if err := json.Unmarshal([]byte(facets.String), &facetArray); err == nil { 356 record["facets"] = facetArray 357 } 358 } 359 if embed.Valid { 360 var embedData interface{} 361 if err := json.Unmarshal([]byte(embed.String), &embedData); err == nil { 362 record["embed"] = embedData 363 } 364 } 365 if labelsJSON.Valid { 366 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 367 // Deserialize and include in record 368 var selfLabels posts.SelfLabels 369 if err := json.Unmarshal([]byte(labelsJSON.String), &selfLabels); err == nil { 370 record["labels"] = selfLabels 371 } 372 } 373 374 postView.Record = record 375 376 // Return the computed hot_rank (0.0 if NULL for non-hot sorts) 377 hotRankValue := 0.0 378 if hotRank.Valid { 379 hotRankValue = hotRank.Float64 380 } 381 382 return &postView, hotRankValue, nil 383}