A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/comments" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "log" 10 "strings" 11 "time" 12 13 "github.com/lib/pq" 14) 15 16// Constants for comment validation and processing 17const ( 18 // CommentCollection is the lexicon collection identifier for comments 19 CommentCollection = "social.coves.feed.comment" 20 21 // ATProtoScheme is the URI scheme for atProto AT-URIs 22 ATProtoScheme = "at://" 23 24 // MaxCommentContentBytes is the maximum allowed size for comment content 25 // Per lexicon: max 3000 graphemes, ~30000 bytes 26 MaxCommentContentBytes = 30000 27) 28 29// CommentEventConsumer consumes comment-related events from Jetstream 30// Handles CREATE, UPDATE, and DELETE operations for social.coves.feed.comment 31type CommentEventConsumer struct { 32 commentRepo comments.Repository 33 db *sql.DB // Direct DB access for atomic count updates 34} 35 36// NewCommentEventConsumer creates a new Jetstream consumer for comment events 37func NewCommentEventConsumer( 38 commentRepo comments.Repository, 39 db *sql.DB, 40) *CommentEventConsumer { 41 return &CommentEventConsumer{ 42 commentRepo: commentRepo, 43 db: db, 44 } 45} 46 47// HandleEvent processes a Jetstream event for comment records 48func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 49 // We only care about commit events for comment records 50 if event.Kind != "commit" || event.Commit == nil { 51 return nil 52 } 53 54 commit := event.Commit 55 56 // Handle comment record operations 57 if commit.Collection == CommentCollection { 58 switch commit.Operation { 59 case "create": 60 return c.createComment(ctx, event.Did, commit) 61 case "update": 62 return c.updateComment(ctx, event.Did, commit) 63 case "delete": 64 return c.deleteComment(ctx, event.Did, commit) 65 } 66 } 67 68 // Silently ignore other operations and collections 69 return nil 70} 71 72// createComment indexes a new comment from the firehose and 
updates parent counts 73func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 74 if commit.Record == nil { 75 return fmt.Errorf("comment create event missing record data") 76 } 77 78 // Parse the comment record 79 commentRecord, err := parseCommentRecord(commit.Record) 80 if err != nil { 81 return fmt.Errorf("failed to parse comment record: %w", err) 82 } 83 84 // SECURITY: Validate this is a legitimate comment event 85 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 86 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 87 return err 88 } 89 90 // Build AT-URI for this comment 91 // Format: at://commenter_did/social.coves.feed.comment/rkey 92 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 93 94 // Parse timestamp from record 95 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 96 if err != nil { 97 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 98 createdAt = time.Now() 99 } 100 101 // Serialize optional JSON fields 102 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 103 104 // Build comment entity 105 comment := &comments.Comment{ 106 URI: uri, 107 CID: commit.CID, 108 RKey: commit.RKey, 109 CommenterDID: repoDID, // Comment comes from user's repository 110 RootURI: commentRecord.Reply.Root.URI, 111 RootCID: commentRecord.Reply.Root.CID, 112 ParentURI: commentRecord.Reply.Parent.URI, 113 ParentCID: commentRecord.Reply.Parent.CID, 114 Content: commentRecord.Content, 115 ContentFacets: facetsJSON, 116 Embed: embedJSON, 117 ContentLabels: labelsJSON, 118 Langs: commentRecord.Langs, 119 CreatedAt: createdAt, 120 IndexedAt: time.Now(), 121 } 122 123 // Atomically: Index comment + Update parent counts 124 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 125 return fmt.Errorf("failed to index comment and update counts: %w", err) 126 } 127 128 
log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 129 return nil 130} 131 132// updateComment updates an existing comment's content fields 133func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 134 if commit.Record == nil { 135 return fmt.Errorf("comment update event missing record data") 136 } 137 138 // Parse the updated comment record 139 commentRecord, err := parseCommentRecord(commit.Record) 140 if err != nil { 141 return fmt.Errorf("failed to parse comment record: %w", err) 142 } 143 144 // SECURITY: Validate this is a legitimate update 145 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 146 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 147 return err 148 } 149 150 // Build AT-URI for the comment being updated 151 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 152 153 // Fetch existing comment to validate threading references are immutable 154 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 155 if err != nil { 156 if err == comments.ErrCommentNotFound { 157 // Comment doesn't exist yet - might arrive out of order 158 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 159 return nil 160 } 161 return fmt.Errorf("failed to get existing comment for validation: %w", err) 162 } 163 164 // SECURITY: Threading references are IMMUTABLE after creation 165 // Reject updates that attempt to change root/parent (prevents thread hijacking) 166 if existingComment.RootURI != commentRecord.Reply.Root.URI || 167 existingComment.RootCID != commentRecord.Reply.Root.CID || 168 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 169 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 170 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 171 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, 
existingComment.RootCID) 172 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 173 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 174 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 175 return fmt.Errorf("comment threading references cannot be changed after creation") 176 } 177 178 // Serialize optional JSON fields 179 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 180 181 // Build comment update entity (preserves vote counts and created_at) 182 comment := &comments.Comment{ 183 URI: uri, 184 CID: commit.CID, 185 Content: commentRecord.Content, 186 ContentFacets: facetsJSON, 187 Embed: embedJSON, 188 ContentLabels: labelsJSON, 189 Langs: commentRecord.Langs, 190 } 191 192 // Update the comment in repository 193 if err := c.commentRepo.Update(ctx, comment); err != nil { 194 return fmt.Errorf("failed to update comment: %w", err) 195 } 196 197 log.Printf("✓ Updated comment: %s", uri) 198 return nil 199} 200 201// deleteComment soft-deletes a comment and updates parent counts 202func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 203 // Build AT-URI for the comment being deleted 204 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 205 206 // Get existing comment to know its parent (for decrementing the right counter) 207 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 208 if err != nil { 209 if err == comments.ErrCommentNotFound { 210 // Idempotent: Comment already deleted or never existed 211 log.Printf("Comment already deleted or not found: %s", uri) 212 return nil 213 } 214 return fmt.Errorf("failed to get existing comment: %w", err) 215 } 216 217 // Atomically: Soft-delete comment + Update parent counts 218 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err 
!= nil { 219 return fmt.Errorf("failed to delete comment and update counts: %w", err) 220 } 221 222 log.Printf("✓ Deleted comment: %s", uri) 223 return nil 224} 225 226// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 227func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 228 tx, err := c.db.BeginTx(ctx, nil) 229 if err != nil { 230 return fmt.Errorf("failed to begin transaction: %w", err) 231 } 232 defer func() { 233 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 234 log.Printf("Failed to rollback transaction: %v", rollbackErr) 235 } 236 }() 237 238 // 1. Check if comment exists and handle resurrection case 239 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 240 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 241 var existingID int64 242 var existingDeletedAt *time.Time 243 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 244 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 245 246 var commentID int64 247 248 if checkErr == nil { 249 // Comment exists 250 if existingDeletedAt == nil { 251 // Not deleted - this is an idempotent replay, skip gracefully 252 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 253 if commitErr := tx.Commit(); commitErr != nil { 254 return fmt.Errorf("failed to commit transaction: %w", commitErr) 255 } 256 return nil 257 } 258 259 // Comment was soft-deleted, now being recreated (resurrection) 260 // This is a NEW record with same rkey - update ALL fields including threading refs 261 // User may have deleted old comment and created a new one on a different parent/root 262 log.Printf("Resurrecting previously deleted comment: %s", comment.URI) 263 commentID = existingID 264 265 resurrectQuery := ` 266 UPDATE comments 267 SET 268 
cid = $1, 269 commenter_did = $2, 270 root_uri = $3, 271 root_cid = $4, 272 parent_uri = $5, 273 parent_cid = $6, 274 content = $7, 275 content_facets = $8, 276 embed = $9, 277 content_labels = $10, 278 langs = $11, 279 created_at = $12, 280 indexed_at = $13, 281 deleted_at = NULL, 282 reply_count = 0 283 WHERE id = $14 284 ` 285 286 _, err = tx.ExecContext( 287 ctx, resurrectQuery, 288 comment.CID, 289 comment.CommenterDID, 290 comment.RootURI, 291 comment.RootCID, 292 comment.ParentURI, 293 comment.ParentCID, 294 comment.Content, 295 comment.ContentFacets, 296 comment.Embed, 297 comment.ContentLabels, 298 pq.Array(comment.Langs), 299 comment.CreatedAt, 300 time.Now(), 301 commentID, 302 ) 303 304 if err != nil { 305 return fmt.Errorf("failed to resurrect comment: %w", err) 306 } 307 308 } else if checkErr == sql.ErrNoRows { 309 // Comment doesn't exist - insert new comment 310 insertQuery := ` 311 INSERT INTO comments ( 312 uri, cid, rkey, commenter_did, 313 root_uri, root_cid, parent_uri, parent_cid, 314 content, content_facets, embed, content_labels, langs, 315 created_at, indexed_at 316 ) VALUES ( 317 $1, $2, $3, $4, 318 $5, $6, $7, $8, 319 $9, $10, $11, $12, $13, 320 $14, $15 321 ) 322 RETURNING id 323 ` 324 325 err = tx.QueryRowContext( 326 ctx, insertQuery, 327 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 328 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 329 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 330 comment.CreatedAt, time.Now(), 331 ).Scan(&commentID) 332 333 if err != nil { 334 return fmt.Errorf("failed to insert comment: %w", err) 335 } 336 337 } else { 338 // Unexpected error checking for existing comment 339 return fmt.Errorf("failed to check for existing comment: %w", checkErr) 340 } 341 342 // 1.5. 
Reconcile reply_count for this newly inserted comment 343 // In case any replies arrived out-of-order before this parent was indexed 344 reconcileQuery := ` 345 UPDATE comments 346 SET reply_count = ( 347 SELECT COUNT(*) 348 FROM comments c 349 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 350 ) 351 WHERE id = $2 352 ` 353 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 354 if reconcileErr != nil { 355 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 356 // Continue anyway - this is a best-effort reconciliation 357 } 358 359 // 2. Update parent counts atomically 360 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 361 // Try posts table first 362 // 363 // FIXME(P1): Post comment_count reconciliation not implemented 364 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering), 365 // the post update below returns 0 rows and we only log a warning. Later, when the post 366 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing 367 // comments. This causes posts to have permanently stale comment_count values. 368 // 369 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments 370 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri 371 // matches the post URI and set comment_count accordingly. 
372 // 373 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation 374 updatePostQuery := ` 375 UPDATE posts 376 SET comment_count = comment_count + 1 377 WHERE uri = $1 AND deleted_at IS NULL 378 ` 379 380 result, err := tx.ExecContext(ctx, updatePostQuery, comment.ParentURI) 381 if err != nil { 382 return fmt.Errorf("failed to update post comment count: %w", err) 383 } 384 385 rowsAffected, err := result.RowsAffected() 386 if err != nil { 387 return fmt.Errorf("failed to check update result: %w", err) 388 } 389 390 // If no post was updated, parent is probably a comment 391 if rowsAffected == 0 { 392 updateCommentQuery := ` 393 UPDATE comments 394 SET reply_count = reply_count + 1 395 WHERE uri = $1 AND deleted_at IS NULL 396 ` 397 398 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI) 399 if err != nil { 400 return fmt.Errorf("failed to update comment reply count: %w", err) 401 } 402 403 rowsAffected, err := result.RowsAffected() 404 if err != nil { 405 return fmt.Errorf("failed to check update result: %w", err) 406 } 407 408 // If neither post nor comment was found, that's OK (parent might not be indexed yet) 409 if rowsAffected == 0 { 410 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 411 } 412 } 413 414 // Commit transaction 415 if err := tx.Commit(); err != nil { 416 return fmt.Errorf("failed to commit transaction: %w", err) 417 } 418 419 return nil 420} 421 422// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 423func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 424 tx, err := c.db.BeginTx(ctx, nil) 425 if err != nil { 426 return fmt.Errorf("failed to begin transaction: %w", err) 427 } 428 defer func() { 429 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 430 log.Printf("Failed to rollback transaction: %v", 
rollbackErr) 431 } 432 }() 433 434 // 1. Soft-delete the comment (idempotent) 435 deleteQuery := ` 436 UPDATE comments 437 SET deleted_at = $2 438 WHERE uri = $1 AND deleted_at IS NULL 439 ` 440 441 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now()) 442 if err != nil { 443 return fmt.Errorf("failed to delete comment: %w", err) 444 } 445 446 rowsAffected, err := result.RowsAffected() 447 if err != nil { 448 return fmt.Errorf("failed to check delete result: %w", err) 449 } 450 451 // Idempotent: If no rows affected, comment already deleted 452 if rowsAffected == 0 { 453 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 454 if commitErr := tx.Commit(); commitErr != nil { 455 return fmt.Errorf("failed to commit transaction: %w", commitErr) 456 } 457 return nil 458 } 459 460 // 2. Decrement parent counts atomically 461 // Parent could be a post or comment - try both (use GREATEST to prevent negative) 462 updatePostQuery := ` 463 UPDATE posts 464 SET comment_count = GREATEST(0, comment_count - 1) 465 WHERE uri = $1 AND deleted_at IS NULL 466 ` 467 468 result, err = tx.ExecContext(ctx, updatePostQuery, comment.ParentURI) 469 if err != nil { 470 return fmt.Errorf("failed to update post comment count: %w", err) 471 } 472 473 rowsAffected, err = result.RowsAffected() 474 if err != nil { 475 return fmt.Errorf("failed to check update result: %w", err) 476 } 477 478 // If no post was updated, parent is probably a comment 479 if rowsAffected == 0 { 480 updateCommentQuery := ` 481 UPDATE comments 482 SET reply_count = GREATEST(0, reply_count - 1) 483 WHERE uri = $1 AND deleted_at IS NULL 484 ` 485 486 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI) 487 if err != nil { 488 return fmt.Errorf("failed to update comment reply count: %w", err) 489 } 490 491 rowsAffected, err := result.RowsAffected() 492 if err != nil { 493 return fmt.Errorf("failed to check update result: %w", err) 494 } 495 496 // If neither was found, 
that's OK (parent might be deleted) 497 if rowsAffected == 0 { 498 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 499 } 500 } 501 502 // Commit transaction 503 if err := tx.Commit(); err != nil { 504 return fmt.Errorf("failed to commit transaction: %w", err) 505 } 506 507 return nil 508} 509 510// validateCommentEvent performs security validation on comment events 511func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error { 512 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 513 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 514 // 515 // We do NOT check if the user exists in AppView because: 516 // 1. Comment events may arrive before user events in Jetstream (race condition) 517 // 2. The comment came from the user's PDS repository (authenticated by PDS) 518 // 3. The database FK constraint was removed to allow out-of-order indexing 519 // 4. 
Orphaned comments (from never-indexed users) are harmless 520 // 521 // Security is maintained because: 522 // - Comment must come from user's own PDS repository (verified by atProto) 523 // - Fake DIDs will fail PDS authentication 524 525 // Validate DID format (basic sanity check) 526 if !strings.HasPrefix(repoDID, "did:") { 527 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 528 } 529 530 // Validate content is not empty (required per lexicon) 531 if comment.Content == "" { 532 return fmt.Errorf("comment content is required") 533 } 534 535 // Validate content length (defensive check - PDS should enforce this) 536 // Per lexicon: max 3000 graphemes, ~30000 bytes 537 // We check bytes as a simple defensive measure 538 if len(comment.Content) > MaxCommentContentBytes { 539 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 540 } 541 542 // Validate reply references exist 543 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 544 return fmt.Errorf("invalid root reference: must have both URI and CID") 545 } 546 547 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 548 return fmt.Errorf("invalid parent reference: must have both URI and CID") 549 } 550 551 // Validate AT-URI structure for root and parent 552 if err := validateATURI(comment.Reply.Root.URI); err != nil { 553 return fmt.Errorf("invalid root URI: %w", err) 554 } 555 556 if err := validateATURI(comment.Reply.Parent.URI); err != nil { 557 return fmt.Errorf("invalid parent URI: %w", err) 558 } 559 560 return nil 561} 562 563// validateATURI performs basic structure validation on AT-URIs 564// Format: at://did:method:id/collection/rkey 565// This is defensive validation - we trust PDS but catch obviously malformed URIs 566func validateATURI(uri string) error { 567 if !strings.HasPrefix(uri, ATProtoScheme) { 568 return fmt.Errorf("must start with %s", ATProtoScheme) 569 } 570 571 // 
Remove at:// prefix and split by / 572 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 573 parts := strings.Split(withoutScheme, "/") 574 575 // Must have at least 3 parts: did, collection, rkey 576 if len(parts) < 3 { 577 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 578 } 579 580 // First part should be a DID 581 if !strings.HasPrefix(parts[0], "did:") { 582 return fmt.Errorf("repository identifier must be a DID") 583 } 584 585 // Collection and rkey should not be empty 586 if parts[1] == "" || parts[2] == "" { 587 return fmt.Errorf("collection and rkey cannot be empty") 588 } 589 590 return nil 591} 592 593// CommentRecordFromJetstream represents a comment record as received from Jetstream 594// Matches social.coves.feed.comment lexicon 595type CommentRecordFromJetstream struct { 596 Type string `json:"$type"` 597 Reply ReplyRefFromJetstream `json:"reply"` 598 Content string `json:"content"` 599 Facets []interface{} `json:"facets,omitempty"` 600 Embed map[string]interface{} `json:"embed,omitempty"` 601 Langs []string `json:"langs,omitempty"` 602 Labels interface{} `json:"labels,omitempty"` 603 CreatedAt string `json:"createdAt"` 604} 605 606// ReplyRefFromJetstream represents the threading structure 607type ReplyRefFromJetstream struct { 608 Root StrongRefFromJetstream `json:"root"` 609 Parent StrongRefFromJetstream `json:"parent"` 610} 611 612// parseCommentRecord parses a comment record from Jetstream event data 613func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 614 // Marshal to JSON and back for proper type conversion 615 recordJSON, err := json.Marshal(record) 616 if err != nil { 617 return nil, fmt.Errorf("failed to marshal record: %w", err) 618 } 619 620 var comment CommentRecordFromJetstream 621 if err := json.Unmarshal(recordJSON, &comment); err != nil { 622 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 623 } 624 625 // Validate required fields 626 
if comment.Content == "" { 627 return nil, fmt.Errorf("comment record missing content field") 628 } 629 630 if comment.CreatedAt == "" { 631 return nil, fmt.Errorf("comment record missing createdAt field") 632 } 633 634 return &comment, nil 635} 636 637// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 638// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 639func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 640 // Serialize facets if present 641 if commentRecord.Facets != nil && len(commentRecord.Facets) > 0 { 642 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 643 facetsStr := string(facetsBytes) 644 facetsJSON = &facetsStr 645 } 646 } 647 648 // Serialize embed if present 649 if commentRecord.Embed != nil && len(commentRecord.Embed) > 0 { 650 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 651 embedStr := string(embedBytes) 652 embedJSON = &embedStr 653 } 654 } 655 656 // Serialize labels if present 657 if commentRecord.Labels != nil { 658 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 659 labelsStr := string(labelsBytes) 660 labelsJSON = &labelsStr 661 } 662 } 663 664 return facetsJSON, embedJSON, labelsJSON 665}