A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/comments" 6 "context" 7 "database/sql" 8 "encoding/json" 9 "fmt" 10 "log" 11 "strings" 12 "time" 13 14 "github.com/lib/pq" 15) 16 17// Constants for comment validation and processing 18const ( 19 // CommentCollection is the lexicon collection identifier for comments 20 CommentCollection = "social.coves.community.comment" 21 22 // ATProtoScheme is the URI scheme for atProto AT-URIs 23 ATProtoScheme = "at://" 24 25 // MaxCommentContentBytes is the maximum allowed size for comment content 26 // Per lexicon: max 3000 graphemes, ~30000 bytes 27 MaxCommentContentBytes = 30000 28) 29 30// CommentEventConsumer consumes comment-related events from Jetstream 31// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment 32type CommentEventConsumer struct { 33 commentRepo comments.Repository 34 db *sql.DB // Direct DB access for atomic count updates 35} 36 37// NewCommentEventConsumer creates a new Jetstream consumer for comment events 38func NewCommentEventConsumer( 39 commentRepo comments.Repository, 40 db *sql.DB, 41) *CommentEventConsumer { 42 return &CommentEventConsumer{ 43 commentRepo: commentRepo, 44 db: db, 45 } 46} 47 48// HandleEvent processes a Jetstream event for comment records 49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 50 // We only care about commit events for comment records 51 if event.Kind != "commit" || event.Commit == nil { 52 return nil 53 } 54 55 commit := event.Commit 56 57 // Handle comment record operations 58 if commit.Collection == CommentCollection { 59 switch commit.Operation { 60 case "create": 61 return c.createComment(ctx, event.Did, commit) 62 case "update": 63 return c.updateComment(ctx, event.Did, commit) 64 case "delete": 65 return c.deleteComment(ctx, event.Did, commit) 66 } 67 } 68 69 // Silently ignore other operations and collections 70 return nil 71} 72 73// createComment indexes a new comment from the firehose and updates parent counts 74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 75 if commit.Record == nil { 76 return fmt.Errorf("comment create event missing record data") 77 } 78 79 // Parse the comment record 80 commentRecord, err := parseCommentRecord(commit.Record) 81 if err != nil { 82 return fmt.Errorf("failed to parse comment record: %w", err) 83 } 84 85 // SECURITY: Validate this is a legitimate comment event 86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 88 return err 89 } 90 91 // Build AT-URI for this comment 92 // Format: at://commenter_did/social.coves.community.comment/rkey 93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 94 95 // Parse timestamp from record 96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 97 if err != nil { 98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 99 createdAt = time.Now() 100 } 101 102 // Serialize optional JSON fields 103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 104 105 // Build comment entity 106 comment := &comments.Comment{ 107 URI: uri, 108 CID: commit.CID, 109 RKey: commit.RKey, 110 CommenterDID: repoDID, // Comment comes from user's repository 111 RootURI: commentRecord.Reply.Root.URI, 112 RootCID: commentRecord.Reply.Root.CID, 113 ParentURI: commentRecord.Reply.Parent.URI, 114 ParentCID: commentRecord.Reply.Parent.CID, 115 Content: commentRecord.Content, 116 ContentFacets: facetsJSON, 117 Embed: embedJSON, 118 ContentLabels: labelsJSON, 119 Langs: commentRecord.Langs, 120 CreatedAt: createdAt, 121 IndexedAt: time.Now(), 122 } 123 124 // Atomically: Index comment + Update parent counts 125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 126 return fmt.Errorf("failed to index comment and update counts: %w", err) 127 } 128 129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 130 return nil 131} 132 133// updateComment updates an existing comment's content fields 134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 135 if commit.Record == nil { 136 return fmt.Errorf("comment update event missing record data") 137 } 138 139 // Parse the updated comment record 140 commentRecord, err := parseCommentRecord(commit.Record) 141 if err != nil { 142 return fmt.Errorf("failed to parse comment record: %w", err) 143 } 144 145 // SECURITY: Validate this is a legitimate update 146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 148 return err 149 } 150 151 // Build AT-URI for the comment being updated 152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 153 154 // Fetch existing comment to validate threading references are immutable 155 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 156 if err != nil { 157 if err == comments.ErrCommentNotFound { 158 // Comment doesn't exist yet - might arrive out of order 159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 160 return nil 161 } 162 return fmt.Errorf("failed to get existing comment for validation: %w", err) 163 } 164 165 // SECURITY: Threading references are IMMUTABLE after creation 166 // Reject updates that attempt to change root/parent (prevents thread hijacking) 167 if existingComment.RootURI != commentRecord.Reply.Root.URI || 168 existingComment.RootCID != commentRecord.Reply.Root.CID || 169 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 170 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 172 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID) 173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 176 return fmt.Errorf("comment threading references cannot be changed after creation") 177 } 178 179 // Serialize optional JSON fields 180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 181 182 // Build comment update entity (preserves vote counts and created_at) 183 comment := &comments.Comment{ 184 URI: uri, 185 CID: commit.CID, 186 Content: commentRecord.Content, 187 ContentFacets: facetsJSON, 188 Embed: embedJSON, 189 ContentLabels: labelsJSON, 190 Langs: commentRecord.Langs, 191 } 192 193 // Update the comment in repository 194 if err := c.commentRepo.Update(ctx, comment); err != nil { 195 return fmt.Errorf("failed to update comment: %w", err) 196 } 197 198 log.Printf("✓ Updated comment: %s", uri) 199 return nil 200} 201 202// deleteComment soft-deletes a comment and updates parent counts 203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 204 // Build AT-URI for the comment being deleted 205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 206 207 // Get existing comment to know its parent (for decrementing the right counter) 208 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 209 if err != nil { 210 if err == comments.ErrCommentNotFound { 211 // Idempotent: Comment already deleted or never existed 212 log.Printf("Comment already deleted or not found: %s", uri) 213 return nil 214 } 215 return fmt.Errorf("failed to get existing comment: %w", err) 216 } 217 218 // Atomically: Soft-delete comment + Update parent counts 219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil { 220 return fmt.Errorf("failed to delete comment and update counts: %w", err) 221 } 222 223 log.Printf("✓ Deleted comment: %s", uri) 224 return nil 225} 226 227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 229 tx, err := c.db.BeginTx(ctx, nil) 230 if err != nil { 231 return fmt.Errorf("failed to begin transaction: %w", err) 232 } 233 defer func() { 234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 235 log.Printf("Failed to rollback transaction: %v", rollbackErr) 236 } 237 }() 238 239 // 1. Check if comment exists and handle resurrection case 240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 242 var existingID int64 243 var existingDeletedAt *time.Time 244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 246 247 var commentID int64 248 249 if checkErr == nil { 250 // Comment exists 251 if existingDeletedAt == nil { 252 // Not deleted - this is an idempotent replay, skip gracefully 253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 254 if commitErr := tx.Commit(); commitErr != nil { 255 return fmt.Errorf("failed to commit transaction: %w", commitErr) 256 } 257 return nil 258 } 259 260 // Comment was soft-deleted, now being recreated (resurrection) 261 // This is a NEW record with same rkey - update ALL fields including threading refs 262 // User may have deleted old comment and created a new one on a different parent/root 263 log.Printf("Resurrecting previously deleted comment: %s", comment.URI) 264 commentID = existingID 265 266 resurrectQuery := ` 267 UPDATE comments 268 SET 269 cid = $1, 270 commenter_did = $2, 271 root_uri = $3, 272 root_cid = $4, 273 parent_uri = $5, 274 parent_cid = $6, 275 content = $7, 276 content_facets = $8, 277 embed = $9, 278 content_labels = $10, 279 langs = $11, 280 created_at = $12, 281 indexed_at = $13, 282 deleted_at = NULL, 283 reply_count = 0 284 WHERE id = $14 285 ` 286 287 _, err = tx.ExecContext( 288 ctx, resurrectQuery, 289 comment.CID, 290 comment.CommenterDID, 291 comment.RootURI, 292 comment.RootCID, 293 comment.ParentURI, 294 comment.ParentCID, 295 comment.Content, 296 comment.ContentFacets, 297 comment.Embed, 298 comment.ContentLabels, 299 pq.Array(comment.Langs), 300 comment.CreatedAt, 301 time.Now(), 302 commentID, 303 ) 304 if err != nil { 305 return fmt.Errorf("failed to resurrect comment: %w", err) 306 } 307 308 } else if checkErr == sql.ErrNoRows { 309 // Comment doesn't exist - insert new comment 310 insertQuery := ` 311 INSERT INTO comments ( 312 uri, cid, rkey, commenter_did, 313 root_uri, root_cid, parent_uri, parent_cid, 314 content, content_facets, embed, content_labels, langs, 315 created_at, indexed_at 316 ) VALUES ( 317 $1, $2, $3, $4, 318 $5, $6, $7, $8, 319 $9, $10, $11, $12, $13, 320 $14, $15 321 ) 322 RETURNING id 323 ` 324 325 err = tx.QueryRowContext( 326 ctx, insertQuery, 327 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 328 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 329 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 330 comment.CreatedAt, time.Now(), 331 ).Scan(&commentID) 332 if err != nil { 333 return fmt.Errorf("failed to insert comment: %w", err) 334 } 335 336 } else { 337 // Unexpected error checking for existing comment 338 return fmt.Errorf("failed to check for existing comment: %w", checkErr) 339 } 340 341 // 1.5. Reconcile reply_count for this newly inserted comment 342 // In case any replies arrived out-of-order before this parent was indexed 343 reconcileQuery := ` 344 UPDATE comments 345 SET reply_count = ( 346 SELECT COUNT(*) 347 FROM comments c 348 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 349 ) 350 WHERE id = $2 351 ` 352 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 353 if reconcileErr != nil { 354 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 355 // Continue anyway - this is a best-effort reconciliation 356 } 357 358 // 2. Update parent counts atomically 359 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 360 // Parse collection from parent URI to determine target table 361 // 362 // FIXME(P1): Post comment_count reconciliation not implemented 363 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering), 364 // the post update below returns 0 rows and we only log a warning. Later, when the post 365 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing 366 // comments. This causes posts to have permanently stale comment_count values. 367 // 368 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments 369 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri 370 // matches the post URI and set comment_count accordingly. 371 // 372 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation 373 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 374 375 var updateQuery string 376 switch collection { 377 case "social.coves.community.post": 378 // Comment on post - update posts.comment_count 379 updateQuery = ` 380 UPDATE posts 381 SET comment_count = comment_count + 1 382 WHERE uri = $1 AND deleted_at IS NULL 383 ` 384 385 case "social.coves.community.comment": 386 // Reply to comment - update comments.reply_count 387 updateQuery = ` 388 UPDATE comments 389 SET reply_count = reply_count + 1 390 WHERE uri = $1 AND deleted_at IS NULL 391 ` 392 393 default: 394 // Unknown or unsupported parent collection 395 // Comment is still indexed, we just don't update parent counts 396 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection) 397 if commitErr := tx.Commit(); commitErr != nil { 398 return fmt.Errorf("failed to commit transaction: %w", commitErr) 399 } 400 return nil 401 } 402 403 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI) 404 if err != nil { 405 return fmt.Errorf("failed to update parent count: %w", err) 406 } 407 408 rowsAffected, err := result.RowsAffected() 409 if err != nil { 410 return fmt.Errorf("failed to check update result: %w", err) 411 } 412 413 // If parent not found, that's OK (parent might not be indexed yet) 414 if rowsAffected == 0 { 415 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 416 } 417 418 // Commit transaction 419 if err := tx.Commit(); err != nil { 420 return fmt.Errorf("failed to commit transaction: %w", err) 421 } 422 423 return nil 424} 425 426// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 427func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 428 tx, err := c.db.BeginTx(ctx, nil) 429 if err != nil { 430 return fmt.Errorf("failed to begin transaction: %w", err) 431 } 432 defer func() { 433 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 434 log.Printf("Failed to rollback transaction: %v", rollbackErr) 435 } 436 }() 437 438 // 1. Soft-delete the comment (idempotent) 439 deleteQuery := ` 440 UPDATE comments 441 SET deleted_at = $2 442 WHERE uri = $1 AND deleted_at IS NULL 443 ` 444 445 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now()) 446 if err != nil { 447 return fmt.Errorf("failed to delete comment: %w", err) 448 } 449 450 rowsAffected, err := result.RowsAffected() 451 if err != nil { 452 return fmt.Errorf("failed to check delete result: %w", err) 453 } 454 455 // Idempotent: If no rows affected, comment already deleted 456 if rowsAffected == 0 { 457 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 458 if commitErr := tx.Commit(); commitErr != nil { 459 return fmt.Errorf("failed to commit transaction: %w", commitErr) 460 } 461 return nil 462 } 463 464 // 2. Decrement parent counts atomically 465 // Parent could be a post or comment - parse collection to determine target table 466 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 467 468 var updateQuery string 469 switch collection { 470 case "social.coves.community.post": 471 // Comment on post - decrement posts.comment_count 472 updateQuery = ` 473 UPDATE posts 474 SET comment_count = GREATEST(0, comment_count - 1) 475 WHERE uri = $1 AND deleted_at IS NULL 476 ` 477 478 case "social.coves.community.comment": 479 // Reply to comment - decrement comments.reply_count 480 updateQuery = ` 481 UPDATE comments 482 SET reply_count = GREATEST(0, reply_count - 1) 483 WHERE uri = $1 AND deleted_at IS NULL 484 ` 485 486 default: 487 // Unknown or unsupported parent collection 488 // Comment is still deleted, we just don't update parent counts 489 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection) 490 if commitErr := tx.Commit(); commitErr != nil { 491 return fmt.Errorf("failed to commit transaction: %w", commitErr) 492 } 493 return nil 494 } 495 496 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI) 497 if err != nil { 498 return fmt.Errorf("failed to update parent count: %w", err) 499 } 500 501 rowsAffected, err = result.RowsAffected() 502 if err != nil { 503 return fmt.Errorf("failed to check update result: %w", err) 504 } 505 506 // If parent not found, that's OK (parent might be deleted) 507 if rowsAffected == 0 { 508 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 509 } 510 511 // Commit transaction 512 if err := tx.Commit(); err != nil { 513 return fmt.Errorf("failed to commit transaction: %w", err) 514 } 515 516 return nil 517} 518 519// validateCommentEvent performs security validation on comment events 520func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error { 521 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 522 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 523 // 524 // We do NOT check if the user exists in AppView because: 525 // 1. Comment events may arrive before user events in Jetstream (race condition) 526 // 2. The comment came from the user's PDS repository (authenticated by PDS) 527 // 3. The database FK constraint was removed to allow out-of-order indexing 528 // 4. Orphaned comments (from never-indexed users) are harmless 529 // 530 // Security is maintained because: 531 // - Comment must come from user's own PDS repository (verified by atProto) 532 // - Fake DIDs will fail PDS authentication 533 534 // Validate DID format (basic sanity check) 535 if !strings.HasPrefix(repoDID, "did:") { 536 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 537 } 538 539 // Validate content is not empty (required per lexicon) 540 if comment.Content == "" { 541 return fmt.Errorf("comment content is required") 542 } 543 544 // Validate content length (defensive check - PDS should enforce this) 545 // Per lexicon: max 3000 graphemes, ~30000 bytes 546 // We check bytes as a simple defensive measure 547 if len(comment.Content) > MaxCommentContentBytes { 548 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 549 } 550 551 // Validate reply references exist 552 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 553 return fmt.Errorf("invalid root reference: must have both URI and CID") 554 } 555 556 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 557 return fmt.Errorf("invalid parent reference: must have both URI and CID") 558 } 559 560 // Validate AT-URI structure for root and parent 561 if err := validateATURI(comment.Reply.Root.URI); err != nil { 562 return fmt.Errorf("invalid root URI: %w", err) 563 } 564 565 if err := validateATURI(comment.Reply.Parent.URI); err != nil { 566 return fmt.Errorf("invalid parent URI: %w", err) 567 } 568 569 return nil 570} 571 572// validateATURI performs basic structure validation on AT-URIs 573// Format: at://did:method:id/collection/rkey 574// This is defensive validation - we trust PDS but catch obviously malformed URIs 575func validateATURI(uri string) error { 576 if !strings.HasPrefix(uri, ATProtoScheme) { 577 return fmt.Errorf("must start with %s", ATProtoScheme) 578 } 579 580 // Remove at:// prefix and split by / 581 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 582 parts := strings.Split(withoutScheme, "/") 583 584 // Must have at least 3 parts: did, collection, rkey 585 if len(parts) < 3 { 586 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 587 } 588 589 // First part should be a DID 590 if !strings.HasPrefix(parts[0], "did:") { 591 return fmt.Errorf("repository identifier must be a DID") 592 } 593 594 // Collection and rkey should not be empty 595 if parts[1] == "" || parts[2] == "" { 596 return fmt.Errorf("collection and rkey cannot be empty") 597 } 598 599 return nil 600} 601 602// CommentRecordFromJetstream represents a comment record as received from Jetstream 603// Matches social.coves.community.comment lexicon 604type CommentRecordFromJetstream struct { 605 Labels interface{} `json:"labels,omitempty"` 606 Embed map[string]interface{} `json:"embed,omitempty"` 607 Reply ReplyRefFromJetstream `json:"reply"` 608 Type string `json:"$type"` 609 Content string `json:"content"` 610 CreatedAt string `json:"createdAt"` 611 Facets []interface{} `json:"facets,omitempty"` 612 Langs []string `json:"langs,omitempty"` 613} 614 615// ReplyRefFromJetstream represents the threading structure 616type ReplyRefFromJetstream struct { 617 Root StrongRefFromJetstream `json:"root"` 618 Parent StrongRefFromJetstream `json:"parent"` 619} 620 621// parseCommentRecord parses a comment record from Jetstream event data 622func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 623 // Marshal to JSON and back for proper type conversion 624 recordJSON, err := json.Marshal(record) 625 if err != nil { 626 return nil, fmt.Errorf("failed to marshal record: %w", err) 627 } 628 629 var comment CommentRecordFromJetstream 630 if err := json.Unmarshal(recordJSON, &comment); err != nil { 631 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 632 } 633 634 // Validate required fields 635 if comment.Content == "" { 636 return nil, fmt.Errorf("comment record missing content field") 637 } 638 639 if comment.CreatedAt == "" { 640 return nil, fmt.Errorf("comment record missing createdAt field") 641 } 642 643 return &comment, nil 644} 645 646// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 647// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 648func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 649 // Serialize facets if present 650 if len(commentRecord.Facets) > 0 { 651 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 652 facetsStr := string(facetsBytes) 653 facetsJSON = &facetsStr 654 } 655 } 656 657 // Serialize embed if present 658 if len(commentRecord.Embed) > 0 { 659 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 660 embedStr := string(embedBytes) 661 embedJSON = &embedStr 662 } 663 } 664 665 // Serialize labels if present 666 if commentRecord.Labels != nil { 667 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 668 labelsStr := string(labelsBytes) 669 labelsJSON = &labelsStr 670 } 671 } 672 673 return facetsJSON, embedJSON, labelsJSON 674}