A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log" 9 "strings" 10 "time" 11 12 "Coves/internal/atproto/utils" 13 "Coves/internal/core/comments" 14 15 "github.com/lib/pq" 16) 17 18// Constants for comment validation and processing 19const ( 20 // CommentCollection is the lexicon collection identifier for comments 21 CommentCollection = "social.coves.feed.comment" 22 23 // ATProtoScheme is the URI scheme for atProto AT-URIs 24 ATProtoScheme = "at://" 25 26 // MaxCommentContentBytes is the maximum allowed size for comment content 27 // Per lexicon: max 3000 graphemes, ~30000 bytes 28 MaxCommentContentBytes = 30000 29) 30 31// CommentEventConsumer consumes comment-related events from Jetstream 32// Handles CREATE, UPDATE, and DELETE operations for social.coves.feed.comment 33type CommentEventConsumer struct { 34 commentRepo comments.Repository 35 db *sql.DB // Direct DB access for atomic count updates 36} 37 38// NewCommentEventConsumer creates a new Jetstream consumer for comment events 39func NewCommentEventConsumer( 40 commentRepo comments.Repository, 41 db *sql.DB, 42) *CommentEventConsumer { 43 return &CommentEventConsumer{ 44 commentRepo: commentRepo, 45 db: db, 46 } 47} 48 49// HandleEvent processes a Jetstream event for comment records 50func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 51 // We only care about commit events for comment records 52 if event.Kind != "commit" || event.Commit == nil { 53 return nil 54 } 55 56 commit := event.Commit 57 58 // Handle comment record operations 59 if commit.Collection == CommentCollection { 60 switch commit.Operation { 61 case "create": 62 return c.createComment(ctx, event.Did, commit) 63 case "update": 64 return c.updateComment(ctx, event.Did, commit) 65 case "delete": 66 return c.deleteComment(ctx, event.Did, commit) 67 } 68 } 69 70 // Silently ignore other operations and collections 71 return nil 72} 73 74// createComment indexes 
a new comment from the firehose and updates parent counts 75func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 76 if commit.Record == nil { 77 return fmt.Errorf("comment create event missing record data") 78 } 79 80 // Parse the comment record 81 commentRecord, err := parseCommentRecord(commit.Record) 82 if err != nil { 83 return fmt.Errorf("failed to parse comment record: %w", err) 84 } 85 86 // SECURITY: Validate this is a legitimate comment event 87 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 88 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 89 return err 90 } 91 92 // Build AT-URI for this comment 93 // Format: at://commenter_did/social.coves.feed.comment/rkey 94 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 95 96 // Parse timestamp from record 97 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 98 if err != nil { 99 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 100 createdAt = time.Now() 101 } 102 103 // Serialize optional JSON fields 104 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 105 106 // Build comment entity 107 comment := &comments.Comment{ 108 URI: uri, 109 CID: commit.CID, 110 RKey: commit.RKey, 111 CommenterDID: repoDID, // Comment comes from user's repository 112 RootURI: commentRecord.Reply.Root.URI, 113 RootCID: commentRecord.Reply.Root.CID, 114 ParentURI: commentRecord.Reply.Parent.URI, 115 ParentCID: commentRecord.Reply.Parent.CID, 116 Content: commentRecord.Content, 117 ContentFacets: facetsJSON, 118 Embed: embedJSON, 119 ContentLabels: labelsJSON, 120 Langs: commentRecord.Langs, 121 CreatedAt: createdAt, 122 IndexedAt: time.Now(), 123 } 124 125 // Atomically: Index comment + Update parent counts 126 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 127 return fmt.Errorf("failed to index comment and 
update counts: %w", err) 128 } 129 130 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 131 return nil 132} 133 134// updateComment updates an existing comment's content fields 135func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 136 if commit.Record == nil { 137 return fmt.Errorf("comment update event missing record data") 138 } 139 140 // Parse the updated comment record 141 commentRecord, err := parseCommentRecord(commit.Record) 142 if err != nil { 143 return fmt.Errorf("failed to parse comment record: %w", err) 144 } 145 146 // SECURITY: Validate this is a legitimate update 147 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 148 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 149 return err 150 } 151 152 // Build AT-URI for the comment being updated 153 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 154 155 // Fetch existing comment to validate threading references are immutable 156 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 157 if err != nil { 158 if err == comments.ErrCommentNotFound { 159 // Comment doesn't exist yet - might arrive out of order 160 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 161 return nil 162 } 163 return fmt.Errorf("failed to get existing comment for validation: %w", err) 164 } 165 166 // SECURITY: Threading references are IMMUTABLE after creation 167 // Reject updates that attempt to change root/parent (prevents thread hijacking) 168 if existingComment.RootURI != commentRecord.Reply.Root.URI || 169 existingComment.RootCID != commentRecord.Reply.Root.CID || 170 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 171 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 172 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 173 log.Printf(" Existing root: %s 
(CID: %s)", existingComment.RootURI, existingComment.RootCID) 174 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 175 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 176 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 177 return fmt.Errorf("comment threading references cannot be changed after creation") 178 } 179 180 // Serialize optional JSON fields 181 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 182 183 // Build comment update entity (preserves vote counts and created_at) 184 comment := &comments.Comment{ 185 URI: uri, 186 CID: commit.CID, 187 Content: commentRecord.Content, 188 ContentFacets: facetsJSON, 189 Embed: embedJSON, 190 ContentLabels: labelsJSON, 191 Langs: commentRecord.Langs, 192 } 193 194 // Update the comment in repository 195 if err := c.commentRepo.Update(ctx, comment); err != nil { 196 return fmt.Errorf("failed to update comment: %w", err) 197 } 198 199 log.Printf("✓ Updated comment: %s", uri) 200 return nil 201} 202 203// deleteComment soft-deletes a comment and updates parent counts 204func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 205 // Build AT-URI for the comment being deleted 206 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 207 208 // Get existing comment to know its parent (for decrementing the right counter) 209 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 210 if err != nil { 211 if err == comments.ErrCommentNotFound { 212 // Idempotent: Comment already deleted or never existed 213 log.Printf("Comment already deleted or not found: %s", uri) 214 return nil 215 } 216 return fmt.Errorf("failed to get existing comment: %w", err) 217 } 218 219 // Atomically: Soft-delete comment + Update parent counts 220 if err := 
c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil { 221 return fmt.Errorf("failed to delete comment and update counts: %w", err) 222 } 223 224 log.Printf("✓ Deleted comment: %s", uri) 225 return nil 226} 227 228// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 229func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 230 tx, err := c.db.BeginTx(ctx, nil) 231 if err != nil { 232 return fmt.Errorf("failed to begin transaction: %w", err) 233 } 234 defer func() { 235 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 236 log.Printf("Failed to rollback transaction: %v", rollbackErr) 237 } 238 }() 239 240 // 1. Check if comment exists and handle resurrection case 241 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 242 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 243 var existingID int64 244 var existingDeletedAt *time.Time 245 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 246 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 247 248 var commentID int64 249 250 if checkErr == nil { 251 // Comment exists 252 if existingDeletedAt == nil { 253 // Not deleted - this is an idempotent replay, skip gracefully 254 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 255 if commitErr := tx.Commit(); commitErr != nil { 256 return fmt.Errorf("failed to commit transaction: %w", commitErr) 257 } 258 return nil 259 } 260 261 // Comment was soft-deleted, now being recreated (resurrection) 262 // This is a NEW record with same rkey - update ALL fields including threading refs 263 // User may have deleted old comment and created a new one on a different parent/root 264 log.Printf("Resurrecting previously deleted comment: %s", comment.URI) 265 commentID = existingID 
266 267 resurrectQuery := ` 268 UPDATE comments 269 SET 270 cid = $1, 271 commenter_did = $2, 272 root_uri = $3, 273 root_cid = $4, 274 parent_uri = $5, 275 parent_cid = $6, 276 content = $7, 277 content_facets = $8, 278 embed = $9, 279 content_labels = $10, 280 langs = $11, 281 created_at = $12, 282 indexed_at = $13, 283 deleted_at = NULL, 284 reply_count = 0 285 WHERE id = $14 286 ` 287 288 _, err = tx.ExecContext( 289 ctx, resurrectQuery, 290 comment.CID, 291 comment.CommenterDID, 292 comment.RootURI, 293 comment.RootCID, 294 comment.ParentURI, 295 comment.ParentCID, 296 comment.Content, 297 comment.ContentFacets, 298 comment.Embed, 299 comment.ContentLabels, 300 pq.Array(comment.Langs), 301 comment.CreatedAt, 302 time.Now(), 303 commentID, 304 ) 305 if err != nil { 306 return fmt.Errorf("failed to resurrect comment: %w", err) 307 } 308 309 } else if checkErr == sql.ErrNoRows { 310 // Comment doesn't exist - insert new comment 311 insertQuery := ` 312 INSERT INTO comments ( 313 uri, cid, rkey, commenter_did, 314 root_uri, root_cid, parent_uri, parent_cid, 315 content, content_facets, embed, content_labels, langs, 316 created_at, indexed_at 317 ) VALUES ( 318 $1, $2, $3, $4, 319 $5, $6, $7, $8, 320 $9, $10, $11, $12, $13, 321 $14, $15 322 ) 323 RETURNING id 324 ` 325 326 err = tx.QueryRowContext( 327 ctx, insertQuery, 328 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 329 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 330 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 331 comment.CreatedAt, time.Now(), 332 ).Scan(&commentID) 333 if err != nil { 334 return fmt.Errorf("failed to insert comment: %w", err) 335 } 336 337 } else { 338 // Unexpected error checking for existing comment 339 return fmt.Errorf("failed to check for existing comment: %w", checkErr) 340 } 341 342 // 1.5. 
Reconcile reply_count for this newly inserted comment 343 // In case any replies arrived out-of-order before this parent was indexed 344 reconcileQuery := ` 345 UPDATE comments 346 SET reply_count = ( 347 SELECT COUNT(*) 348 FROM comments c 349 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 350 ) 351 WHERE id = $2 352 ` 353 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 354 if reconcileErr != nil { 355 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 356 // Continue anyway - this is a best-effort reconciliation 357 } 358 359 // 2. Update parent counts atomically 360 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 361 // Parse collection from parent URI to determine target table 362 // 363 // FIXME(P1): Post comment_count reconciliation not implemented 364 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering), 365 // the post update below returns 0 rows and we only log a warning. Later, when the post 366 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing 367 // comments. This causes posts to have permanently stale comment_count values. 368 // 369 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments 370 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri 371 // matches the post URI and set comment_count accordingly. 
372 // 373 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation 374 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 375 376 var updateQuery string 377 switch collection { 378 case "social.coves.community.post": 379 // Comment on post - update posts.comment_count 380 updateQuery = ` 381 UPDATE posts 382 SET comment_count = comment_count + 1 383 WHERE uri = $1 AND deleted_at IS NULL 384 ` 385 386 case "social.coves.feed.comment": 387 // Reply to comment - update comments.reply_count 388 updateQuery = ` 389 UPDATE comments 390 SET reply_count = reply_count + 1 391 WHERE uri = $1 AND deleted_at IS NULL 392 ` 393 394 default: 395 // Unknown or unsupported parent collection 396 // Comment is still indexed, we just don't update parent counts 397 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection) 398 if commitErr := tx.Commit(); commitErr != nil { 399 return fmt.Errorf("failed to commit transaction: %w", commitErr) 400 } 401 return nil 402 } 403 404 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI) 405 if err != nil { 406 return fmt.Errorf("failed to update parent count: %w", err) 407 } 408 409 rowsAffected, err := result.RowsAffected() 410 if err != nil { 411 return fmt.Errorf("failed to check update result: %w", err) 412 } 413 414 // If parent not found, that's OK (parent might not be indexed yet) 415 if rowsAffected == 0 { 416 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 417 } 418 419 // Commit transaction 420 if err := tx.Commit(); err != nil { 421 return fmt.Errorf("failed to commit transaction: %w", err) 422 } 423 424 return nil 425} 426 427// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 428func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 429 tx, err := c.db.BeginTx(ctx, nil) 
430 if err != nil { 431 return fmt.Errorf("failed to begin transaction: %w", err) 432 } 433 defer func() { 434 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 435 log.Printf("Failed to rollback transaction: %v", rollbackErr) 436 } 437 }() 438 439 // 1. Soft-delete the comment (idempotent) 440 deleteQuery := ` 441 UPDATE comments 442 SET deleted_at = $2 443 WHERE uri = $1 AND deleted_at IS NULL 444 ` 445 446 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now()) 447 if err != nil { 448 return fmt.Errorf("failed to delete comment: %w", err) 449 } 450 451 rowsAffected, err := result.RowsAffected() 452 if err != nil { 453 return fmt.Errorf("failed to check delete result: %w", err) 454 } 455 456 // Idempotent: If no rows affected, comment already deleted 457 if rowsAffected == 0 { 458 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 459 if commitErr := tx.Commit(); commitErr != nil { 460 return fmt.Errorf("failed to commit transaction: %w", commitErr) 461 } 462 return nil 463 } 464 465 // 2. 
Decrement parent counts atomically 466 // Parent could be a post or comment - parse collection to determine target table 467 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 468 469 var updateQuery string 470 switch collection { 471 case "social.coves.community.post": 472 // Comment on post - decrement posts.comment_count 473 updateQuery = ` 474 UPDATE posts 475 SET comment_count = GREATEST(0, comment_count - 1) 476 WHERE uri = $1 AND deleted_at IS NULL 477 ` 478 479 case "social.coves.feed.comment": 480 // Reply to comment - decrement comments.reply_count 481 updateQuery = ` 482 UPDATE comments 483 SET reply_count = GREATEST(0, reply_count - 1) 484 WHERE uri = $1 AND deleted_at IS NULL 485 ` 486 487 default: 488 // Unknown or unsupported parent collection 489 // Comment is still deleted, we just don't update parent counts 490 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection) 491 if commitErr := tx.Commit(); commitErr != nil { 492 return fmt.Errorf("failed to commit transaction: %w", commitErr) 493 } 494 return nil 495 } 496 497 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI) 498 if err != nil { 499 return fmt.Errorf("failed to update parent count: %w", err) 500 } 501 502 rowsAffected, err = result.RowsAffected() 503 if err != nil { 504 return fmt.Errorf("failed to check update result: %w", err) 505 } 506 507 // If parent not found, that's OK (parent might be deleted) 508 if rowsAffected == 0 { 509 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 510 } 511 512 // Commit transaction 513 if err := tx.Commit(); err != nil { 514 return fmt.Errorf("failed to commit transaction: %w", err) 515 } 516 517 return nil 518} 519 520// validateCommentEvent performs security validation on comment events 521func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) 
error { 522 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 523 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 524 // 525 // We do NOT check if the user exists in AppView because: 526 // 1. Comment events may arrive before user events in Jetstream (race condition) 527 // 2. The comment came from the user's PDS repository (authenticated by PDS) 528 // 3. The database FK constraint was removed to allow out-of-order indexing 529 // 4. Orphaned comments (from never-indexed users) are harmless 530 // 531 // Security is maintained because: 532 // - Comment must come from user's own PDS repository (verified by atProto) 533 // - Fake DIDs will fail PDS authentication 534 535 // Validate DID format (basic sanity check) 536 if !strings.HasPrefix(repoDID, "did:") { 537 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 538 } 539 540 // Validate content is not empty (required per lexicon) 541 if comment.Content == "" { 542 return fmt.Errorf("comment content is required") 543 } 544 545 // Validate content length (defensive check - PDS should enforce this) 546 // Per lexicon: max 3000 graphemes, ~30000 bytes 547 // We check bytes as a simple defensive measure 548 if len(comment.Content) > MaxCommentContentBytes { 549 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 550 } 551 552 // Validate reply references exist 553 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 554 return fmt.Errorf("invalid root reference: must have both URI and CID") 555 } 556 557 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 558 return fmt.Errorf("invalid parent reference: must have both URI and CID") 559 } 560 561 // Validate AT-URI structure for root and parent 562 if err := validateATURI(comment.Reply.Root.URI); err != nil { 563 return fmt.Errorf("invalid root URI: %w", err) 564 } 565 566 if 
err := validateATURI(comment.Reply.Parent.URI); err != nil { 567 return fmt.Errorf("invalid parent URI: %w", err) 568 } 569 570 return nil 571} 572 573// validateATURI performs basic structure validation on AT-URIs 574// Format: at://did:method:id/collection/rkey 575// This is defensive validation - we trust PDS but catch obviously malformed URIs 576func validateATURI(uri string) error { 577 if !strings.HasPrefix(uri, ATProtoScheme) { 578 return fmt.Errorf("must start with %s", ATProtoScheme) 579 } 580 581 // Remove at:// prefix and split by / 582 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 583 parts := strings.Split(withoutScheme, "/") 584 585 // Must have at least 3 parts: did, collection, rkey 586 if len(parts) < 3 { 587 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 588 } 589 590 // First part should be a DID 591 if !strings.HasPrefix(parts[0], "did:") { 592 return fmt.Errorf("repository identifier must be a DID") 593 } 594 595 // Collection and rkey should not be empty 596 if parts[1] == "" || parts[2] == "" { 597 return fmt.Errorf("collection and rkey cannot be empty") 598 } 599 600 return nil 601} 602 603// CommentRecordFromJetstream represents a comment record as received from Jetstream 604// Matches social.coves.feed.comment lexicon 605type CommentRecordFromJetstream struct { 606 Labels interface{} `json:"labels,omitempty"` 607 Embed map[string]interface{} `json:"embed,omitempty"` 608 Reply ReplyRefFromJetstream `json:"reply"` 609 Type string `json:"$type"` 610 Content string `json:"content"` 611 CreatedAt string `json:"createdAt"` 612 Facets []interface{} `json:"facets,omitempty"` 613 Langs []string `json:"langs,omitempty"` 614} 615 616// ReplyRefFromJetstream represents the threading structure 617type ReplyRefFromJetstream struct { 618 Root StrongRefFromJetstream `json:"root"` 619 Parent StrongRefFromJetstream `json:"parent"` 620} 621 622// parseCommentRecord parses a comment record from Jetstream event data 
623func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 624 // Marshal to JSON and back for proper type conversion 625 recordJSON, err := json.Marshal(record) 626 if err != nil { 627 return nil, fmt.Errorf("failed to marshal record: %w", err) 628 } 629 630 var comment CommentRecordFromJetstream 631 if err := json.Unmarshal(recordJSON, &comment); err != nil { 632 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 633 } 634 635 // Validate required fields 636 if comment.Content == "" { 637 return nil, fmt.Errorf("comment record missing content field") 638 } 639 640 if comment.CreatedAt == "" { 641 return nil, fmt.Errorf("comment record missing createdAt field") 642 } 643 644 return &comment, nil 645} 646 647// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 648// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 649func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 650 // Serialize facets if present 651 if len(commentRecord.Facets) > 0 { 652 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 653 facetsStr := string(facetsBytes) 654 facetsJSON = &facetsStr 655 } 656 } 657 658 // Serialize embed if present 659 if len(commentRecord.Embed) > 0 { 660 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 661 embedStr := string(embedBytes) 662 embedJSON = &embedStr 663 } 664 } 665 666 // Serialize labels if present 667 if commentRecord.Labels != nil { 668 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 669 labelsStr := string(labelsBytes) 670 labelsJSON = &labelsStr 671 } 672 } 673 674 return facetsJSON, embedJSON, labelsJSON 675}