A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/comments" 6 "context" 7 "database/sql" 8 "encoding/json" 9 "fmt" 10 "log" 11 "strings" 12 "time" 13 14 "github.com/lib/pq" 15) 16 17// Constants for comment validation and processing 18const ( 19 // CommentCollection is the lexicon collection identifier for comments 20 CommentCollection = "social.coves.community.comment" 21 22 // ATProtoScheme is the URI scheme for atProto AT-URIs 23 ATProtoScheme = "at://" 24 25 // MaxCommentContentBytes is the maximum allowed size for comment content 26 // Per lexicon: max 3000 graphemes, ~30000 bytes 27 MaxCommentContentBytes = 30000 28) 29 30// CommentEventConsumer consumes comment-related events from Jetstream 31// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment 32type CommentEventConsumer struct { 33 commentRepo comments.Repository 34 db *sql.DB // Direct DB access for atomic count updates 35} 36 37// NewCommentEventConsumer creates a new Jetstream consumer for comment events 38func NewCommentEventConsumer( 39 commentRepo comments.Repository, 40 db *sql.DB, 41) *CommentEventConsumer { 42 return &CommentEventConsumer{ 43 commentRepo: commentRepo, 44 db: db, 45 } 46} 47 48// HandleEvent processes a Jetstream event for comment records 49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 50 // We only care about commit events for comment records 51 if event.Kind != "commit" || event.Commit == nil { 52 return nil 53 } 54 55 commit := event.Commit 56 57 // Handle comment record operations 58 if commit.Collection == CommentCollection { 59 switch commit.Operation { 60 case "create": 61 return c.createComment(ctx, event.Did, commit) 62 case "update": 63 return c.updateComment(ctx, event.Did, commit) 64 case "delete": 65 return c.deleteComment(ctx, event.Did, commit) 66 } 67 } 68 69 // Silently ignore other operations and collections 70 return nil 71} 72 73// createComment 
indexes a new comment from the firehose and updates parent counts 74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 75 if commit.Record == nil { 76 return fmt.Errorf("comment create event missing record data") 77 } 78 79 // Parse the comment record 80 commentRecord, err := parseCommentRecord(commit.Record) 81 if err != nil { 82 return fmt.Errorf("failed to parse comment record: %w", err) 83 } 84 85 // SECURITY: Validate this is a legitimate comment event 86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 88 return err 89 } 90 91 // Build AT-URI for this comment 92 // Format: at://commenter_did/social.coves.community.comment/rkey 93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 94 95 // Parse timestamp from record 96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 97 if err != nil { 98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 99 createdAt = time.Now() 100 } 101 102 // Serialize optional JSON fields 103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 104 105 // Build comment entity 106 comment := &comments.Comment{ 107 URI: uri, 108 CID: commit.CID, 109 RKey: commit.RKey, 110 CommenterDID: repoDID, // Comment comes from user's repository 111 RootURI: commentRecord.Reply.Root.URI, 112 RootCID: commentRecord.Reply.Root.CID, 113 ParentURI: commentRecord.Reply.Parent.URI, 114 ParentCID: commentRecord.Reply.Parent.CID, 115 Content: commentRecord.Content, 116 ContentFacets: facetsJSON, 117 Embed: embedJSON, 118 ContentLabels: labelsJSON, 119 Langs: commentRecord.Langs, 120 CreatedAt: createdAt, 121 IndexedAt: time.Now(), 122 } 123 124 // Atomically: Index comment + Update parent counts 125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 126 return fmt.Errorf("failed to 
index comment and update counts: %w", err) 127 } 128 129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 130 return nil 131} 132 133// updateComment updates an existing comment's content fields 134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 135 if commit.Record == nil { 136 return fmt.Errorf("comment update event missing record data") 137 } 138 139 // Parse the updated comment record 140 commentRecord, err := parseCommentRecord(commit.Record) 141 if err != nil { 142 return fmt.Errorf("failed to parse comment record: %w", err) 143 } 144 145 // SECURITY: Validate this is a legitimate update 146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 148 return err 149 } 150 151 // Build AT-URI for the comment being updated 152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 153 154 // Fetch existing comment to validate threading references are immutable 155 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 156 if err != nil { 157 if err == comments.ErrCommentNotFound { 158 // Comment doesn't exist yet - might arrive out of order 159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 160 return nil 161 } 162 return fmt.Errorf("failed to get existing comment for validation: %w", err) 163 } 164 165 // SECURITY: Threading references are IMMUTABLE after creation 166 // Reject updates that attempt to change root/parent (prevents thread hijacking) 167 if existingComment.RootURI != commentRecord.Reply.Root.URI || 168 existingComment.RootCID != commentRecord.Reply.Root.CID || 169 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 170 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 172 
log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID) 173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 176 return fmt.Errorf("comment threading references cannot be changed after creation") 177 } 178 179 // Serialize optional JSON fields 180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 181 182 // Build comment update entity (preserves vote counts and created_at) 183 comment := &comments.Comment{ 184 URI: uri, 185 CID: commit.CID, 186 Content: commentRecord.Content, 187 ContentFacets: facetsJSON, 188 Embed: embedJSON, 189 ContentLabels: labelsJSON, 190 Langs: commentRecord.Langs, 191 } 192 193 // Update the comment in repository 194 if err := c.commentRepo.Update(ctx, comment); err != nil { 195 return fmt.Errorf("failed to update comment: %w", err) 196 } 197 198 log.Printf("✓ Updated comment: %s", uri) 199 return nil 200} 201 202// deleteComment soft-deletes a comment and updates parent counts 203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 204 // Build AT-URI for the comment being deleted 205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 206 207 // Get existing comment to know its parent (for decrementing the right counter) 208 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 209 if err != nil { 210 if err == comments.ErrCommentNotFound { 211 // Idempotent: Comment already deleted or never existed 212 log.Printf("Comment already deleted or not found: %s", uri) 213 return nil 214 } 215 return fmt.Errorf("failed to get existing comment: %w", err) 216 } 217 218 // Atomically: Soft-delete comment + Update parent counts 
219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil { 220 return fmt.Errorf("failed to delete comment and update counts: %w", err) 221 } 222 223 log.Printf("✓ Deleted comment: %s", uri) 224 return nil 225} 226 227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 229 tx, err := c.db.BeginTx(ctx, nil) 230 if err != nil { 231 return fmt.Errorf("failed to begin transaction: %w", err) 232 } 233 defer func() { 234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 235 log.Printf("Failed to rollback transaction: %v", rollbackErr) 236 } 237 }() 238 239 // 1. Check if comment exists and handle resurrection case 240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 242 var existingID int64 243 var existingDeletedAt *time.Time 244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 246 247 var commentID int64 248 249 if checkErr == nil { 250 // Comment exists 251 if existingDeletedAt == nil { 252 // Not deleted - this is an idempotent replay, skip gracefully 253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 254 if commitErr := tx.Commit(); commitErr != nil { 255 return fmt.Errorf("failed to commit transaction: %w", commitErr) 256 } 257 return nil 258 } 259 260 // Comment was soft-deleted, now being recreated (resurrection) 261 // This is a NEW record with same rkey - update ALL fields including threading refs 262 // User may have deleted old comment and created a new one on a different parent/root 263 // Clear deletion metadata to restore the comment 264 log.Printf("Resurrecting 
previously deleted comment: %s", comment.URI) 265 commentID = existingID 266 267 resurrectQuery := ` 268 UPDATE comments 269 SET 270 cid = $1, 271 commenter_did = $2, 272 root_uri = $3, 273 root_cid = $4, 274 parent_uri = $5, 275 parent_cid = $6, 276 content = $7, 277 content_facets = $8, 278 embed = $9, 279 content_labels = $10, 280 langs = $11, 281 created_at = $12, 282 indexed_at = $13, 283 deleted_at = NULL, 284 deletion_reason = NULL, 285 deleted_by = NULL, 286 reply_count = 0 287 WHERE id = $14 288 ` 289 290 _, err = tx.ExecContext( 291 ctx, resurrectQuery, 292 comment.CID, 293 comment.CommenterDID, 294 comment.RootURI, 295 comment.RootCID, 296 comment.ParentURI, 297 comment.ParentCID, 298 comment.Content, 299 comment.ContentFacets, 300 comment.Embed, 301 comment.ContentLabels, 302 pq.Array(comment.Langs), 303 comment.CreatedAt, 304 time.Now(), 305 commentID, 306 ) 307 if err != nil { 308 return fmt.Errorf("failed to resurrect comment: %w", err) 309 } 310 311 } else if checkErr == sql.ErrNoRows { 312 // Comment doesn't exist - insert new comment 313 insertQuery := ` 314 INSERT INTO comments ( 315 uri, cid, rkey, commenter_did, 316 root_uri, root_cid, parent_uri, parent_cid, 317 content, content_facets, embed, content_labels, langs, 318 created_at, indexed_at 319 ) VALUES ( 320 $1, $2, $3, $4, 321 $5, $6, $7, $8, 322 $9, $10, $11, $12, $13, 323 $14, $15 324 ) 325 RETURNING id 326 ` 327 328 err = tx.QueryRowContext( 329 ctx, insertQuery, 330 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 331 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 332 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 333 comment.CreatedAt, time.Now(), 334 ).Scan(&commentID) 335 if err != nil { 336 return fmt.Errorf("failed to insert comment: %w", err) 337 } 338 339 } else { 340 // Unexpected error checking for existing comment 341 return fmt.Errorf("failed to check for existing comment: %w", 
checkErr) 342 } 343 344 // 1.5. Reconcile reply_count for this newly inserted comment 345 // In case any replies arrived out-of-order before this parent was indexed 346 reconcileQuery := ` 347 UPDATE comments 348 SET reply_count = ( 349 SELECT COUNT(*) 350 FROM comments c 351 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 352 ) 353 WHERE id = $2 354 ` 355 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 356 if reconcileErr != nil { 357 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 358 // Continue anyway - this is a best-effort reconciliation 359 } 360 361 // 2. Update parent counts atomically 362 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 363 // Parse collection from parent URI to determine target table 364 // 365 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226 366 // When a comment arrives before its parent post, the post update below returns 0 rows 367 // and we log a warning. Later, when the post is indexed, the post consumer reconciles 368 // comment_count by counting all pre-existing comments. This ensures accurate counts 369 // despite out-of-order Jetstream event delivery. 
370 // 371 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go 372 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 373 374 var updateQuery string 375 switch collection { 376 case "social.coves.community.post": 377 // Comment on post - update posts.comment_count 378 updateQuery = ` 379 UPDATE posts 380 SET comment_count = comment_count + 1 381 WHERE uri = $1 AND deleted_at IS NULL 382 ` 383 384 case "social.coves.community.comment": 385 // Reply to comment - update comments.reply_count 386 updateQuery = ` 387 UPDATE comments 388 SET reply_count = reply_count + 1 389 WHERE uri = $1 AND deleted_at IS NULL 390 ` 391 392 default: 393 // Unknown or unsupported parent collection 394 // Comment is still indexed, we just don't update parent counts 395 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection) 396 if commitErr := tx.Commit(); commitErr != nil { 397 return fmt.Errorf("failed to commit transaction: %w", commitErr) 398 } 399 return nil 400 } 401 402 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI) 403 if err != nil { 404 return fmt.Errorf("failed to update parent count: %w", err) 405 } 406 407 rowsAffected, err := result.RowsAffected() 408 if err != nil { 409 return fmt.Errorf("failed to check update result: %w", err) 410 } 411 412 // If parent not found, that's OK (parent might not be indexed yet) 413 if rowsAffected == 0 { 414 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 415 } 416 417 // Commit transaction 418 if err := tx.Commit(); err != nil { 419 return fmt.Errorf("failed to commit transaction: %w", err) 420 } 421 422 return nil 423} 424 425// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 426// Blanks content to preserve thread structure while respecting user privacy 427// The comment remains in the database but is shown as "[deleted]" 
in thread views 428func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 429 tx, err := c.db.BeginTx(ctx, nil) 430 if err != nil { 431 return fmt.Errorf("failed to begin transaction: %w", err) 432 } 433 defer func() { 434 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 435 log.Printf("Failed to rollback transaction: %v", rollbackErr) 436 } 437 }() 438 439 // 1. Soft-delete the comment: blank content but preserve structure 440 // DELETE event from Jetstream = author deleted their own comment 441 // Content is blanked to respect user privacy while preserving thread structure 442 // Use the repository's transaction-aware method for DRY 443 repoTx, ok := c.commentRepo.(comments.RepositoryTx) 444 if !ok { 445 return fmt.Errorf("comment repository does not support transactional operations") 446 } 447 448 rowsAffected, err := repoTx.SoftDeleteWithReasonTx(ctx, tx, comment.URI, comments.DeletionReasonAuthor, comment.CommenterDID) 449 if err != nil { 450 return fmt.Errorf("failed to delete comment: %w", err) 451 } 452 453 // Idempotent: If no rows affected, comment already deleted 454 if rowsAffected == 0 { 455 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 456 if commitErr := tx.Commit(); commitErr != nil { 457 return fmt.Errorf("failed to commit transaction: %w", commitErr) 458 } 459 return nil 460 } 461 462 // 2. 
Decrement parent counts atomically 463 // Parent could be a post or comment - parse collection to determine target table 464 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 465 466 var updateQuery string 467 var result sql.Result 468 switch collection { 469 case "social.coves.community.post": 470 // Comment on post - decrement posts.comment_count 471 updateQuery = ` 472 UPDATE posts 473 SET comment_count = GREATEST(0, comment_count - 1) 474 WHERE uri = $1 AND deleted_at IS NULL 475 ` 476 477 case "social.coves.community.comment": 478 // Reply to comment - decrement comments.reply_count 479 updateQuery = ` 480 UPDATE comments 481 SET reply_count = GREATEST(0, reply_count - 1) 482 WHERE uri = $1 AND deleted_at IS NULL 483 ` 484 485 default: 486 // Unknown or unsupported parent collection 487 // Comment is still deleted, we just don't update parent counts 488 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection) 489 if commitErr := tx.Commit(); commitErr != nil { 490 return fmt.Errorf("failed to commit transaction: %w", commitErr) 491 } 492 return nil 493 } 494 495 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI) 496 if err != nil { 497 return fmt.Errorf("failed to update parent count: %w", err) 498 } 499 500 rowsAffected, err = result.RowsAffected() 501 if err != nil { 502 return fmt.Errorf("failed to check update result: %w", err) 503 } 504 505 // If parent not found, that's OK (parent might be deleted) 506 if rowsAffected == 0 { 507 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 508 } 509 510 // Commit transaction 511 if err := tx.Commit(); err != nil { 512 return fmt.Errorf("failed to commit transaction: %w", err) 513 } 514 515 return nil 516} 517 518// validateCommentEvent performs security validation on comment events 519func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment 
*CommentRecordFromJetstream) error { 520 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 521 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 522 // 523 // We do NOT check if the user exists in AppView because: 524 // 1. Comment events may arrive before user events in Jetstream (race condition) 525 // 2. The comment came from the user's PDS repository (authenticated by PDS) 526 // 3. The database FK constraint was removed to allow out-of-order indexing 527 // 4. Orphaned comments (from never-indexed users) are harmless 528 // 529 // Security is maintained because: 530 // - Comment must come from user's own PDS repository (verified by atProto) 531 // - Fake DIDs will fail PDS authentication 532 533 // Validate DID format (basic sanity check) 534 if !strings.HasPrefix(repoDID, "did:") { 535 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 536 } 537 538 // Validate content is not empty (required per lexicon) 539 if comment.Content == "" { 540 return fmt.Errorf("comment content is required") 541 } 542 543 // Validate content length (defensive check - PDS should enforce this) 544 // Per lexicon: max 3000 graphemes, ~30000 bytes 545 // We check bytes as a simple defensive measure 546 if len(comment.Content) > MaxCommentContentBytes { 547 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 548 } 549 550 // Validate reply references exist 551 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 552 return fmt.Errorf("invalid root reference: must have both URI and CID") 553 } 554 555 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 556 return fmt.Errorf("invalid parent reference: must have both URI and CID") 557 } 558 559 // Validate AT-URI structure for root and parent 560 if err := validateATURI(comment.Reply.Root.URI); err != nil { 561 return fmt.Errorf("invalid root URI: 
%w", err) 562 } 563 564 if err := validateATURI(comment.Reply.Parent.URI); err != nil { 565 return fmt.Errorf("invalid parent URI: %w", err) 566 } 567 568 return nil 569} 570 571// validateATURI performs basic structure validation on AT-URIs 572// Format: at://did:method:id/collection/rkey 573// This is defensive validation - we trust PDS but catch obviously malformed URIs 574func validateATURI(uri string) error { 575 if !strings.HasPrefix(uri, ATProtoScheme) { 576 return fmt.Errorf("must start with %s", ATProtoScheme) 577 } 578 579 // Remove at:// prefix and split by / 580 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 581 parts := strings.Split(withoutScheme, "/") 582 583 // Must have at least 3 parts: did, collection, rkey 584 if len(parts) < 3 { 585 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 586 } 587 588 // First part should be a DID 589 if !strings.HasPrefix(parts[0], "did:") { 590 return fmt.Errorf("repository identifier must be a DID") 591 } 592 593 // Collection and rkey should not be empty 594 if parts[1] == "" || parts[2] == "" { 595 return fmt.Errorf("collection and rkey cannot be empty") 596 } 597 598 return nil 599} 600 601// CommentRecordFromJetstream represents a comment record as received from Jetstream 602// Matches social.coves.community.comment lexicon 603type CommentRecordFromJetstream struct { 604 Labels interface{} `json:"labels,omitempty"` 605 Embed map[string]interface{} `json:"embed,omitempty"` 606 Reply ReplyRefFromJetstream `json:"reply"` 607 Type string `json:"$type"` 608 Content string `json:"content"` 609 CreatedAt string `json:"createdAt"` 610 Facets []interface{} `json:"facets,omitempty"` 611 Langs []string `json:"langs,omitempty"` 612} 613 614// ReplyRefFromJetstream represents the threading structure 615type ReplyRefFromJetstream struct { 616 Root StrongRefFromJetstream `json:"root"` 617 Parent StrongRefFromJetstream `json:"parent"` 618} 619 620// parseCommentRecord parses a comment record 
from Jetstream event data 621func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 622 // Marshal to JSON and back for proper type conversion 623 recordJSON, err := json.Marshal(record) 624 if err != nil { 625 return nil, fmt.Errorf("failed to marshal record: %w", err) 626 } 627 628 var comment CommentRecordFromJetstream 629 if err := json.Unmarshal(recordJSON, &comment); err != nil { 630 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 631 } 632 633 // Validate required fields 634 if comment.Content == "" { 635 return nil, fmt.Errorf("comment record missing content field") 636 } 637 638 if comment.CreatedAt == "" { 639 return nil, fmt.Errorf("comment record missing createdAt field") 640 } 641 642 return &comment, nil 643} 644 645// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 646// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 647func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 648 // Serialize facets if present 649 if len(commentRecord.Facets) > 0 { 650 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 651 facetsStr := string(facetsBytes) 652 facetsJSON = &facetsStr 653 } 654 } 655 656 // Serialize embed if present 657 if len(commentRecord.Embed) > 0 { 658 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 659 embedStr := string(embedBytes) 660 embedJSON = &embedStr 661 } 662 } 663 664 // Serialize labels if present 665 if commentRecord.Labels != nil { 666 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 667 labelsStr := string(labelsBytes) 668 labelsJSON = &labelsStr 669 } 670 } 671 672 return facetsJSON, embedJSON, labelsJSON 673}