A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/comments" 6 "context" 7 "database/sql" 8 "encoding/json" 9 "fmt" 10 "log" 11 "strings" 12 "time" 13 14 "github.com/lib/pq" 15) 16 17// Constants for comment validation and processing 18const ( 19 // CommentCollection is the lexicon collection identifier for comments 20 CommentCollection = "social.coves.community.comment" 21 22 // ATProtoScheme is the URI scheme for atProto AT-URIs 23 ATProtoScheme = "at://" 24 25 // MaxCommentContentBytes is the maximum allowed size for comment content 26 // Per lexicon: max 3000 graphemes, ~30000 bytes 27 MaxCommentContentBytes = 30000 28) 29 30// CommentEventConsumer consumes comment-related events from Jetstream 31// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment 32type CommentEventConsumer struct { 33 commentRepo comments.Repository 34 db *sql.DB // Direct DB access for atomic count updates 35} 36 37// NewCommentEventConsumer creates a new Jetstream consumer for comment events 38func NewCommentEventConsumer( 39 commentRepo comments.Repository, 40 db *sql.DB, 41) *CommentEventConsumer { 42 return &CommentEventConsumer{ 43 commentRepo: commentRepo, 44 db: db, 45 } 46} 47 48// HandleEvent processes a Jetstream event for comment records 49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 50 // We only care about commit events for comment records 51 if event.Kind != "commit" || event.Commit == nil { 52 return nil 53 } 54 55 commit := event.Commit 56 57 // Handle comment record operations 58 if commit.Collection == CommentCollection { 59 switch commit.Operation { 60 case "create": 61 return c.createComment(ctx, event.Did, commit) 62 case "update": 63 return c.updateComment(ctx, event.Did, commit) 64 case "delete": 65 return c.deleteComment(ctx, event.Did, commit) 66 } 67 } 68 69 // Silently ignore other operations and collections 70 return nil 71} 72 73// createComment 
indexes a new comment from the firehose and updates parent counts 74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 75 if commit.Record == nil { 76 return fmt.Errorf("comment create event missing record data") 77 } 78 79 // Parse the comment record 80 commentRecord, err := parseCommentRecord(commit.Record) 81 if err != nil { 82 return fmt.Errorf("failed to parse comment record: %w", err) 83 } 84 85 // SECURITY: Validate this is a legitimate comment event 86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 88 return err 89 } 90 91 // Build AT-URI for this comment 92 // Format: at://commenter_did/social.coves.community.comment/rkey 93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 94 95 // Parse timestamp from record 96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 97 if err != nil { 98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 99 createdAt = time.Now() 100 } 101 102 // Serialize optional JSON fields 103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 104 105 // Build comment entity 106 comment := &comments.Comment{ 107 URI: uri, 108 CID: commit.CID, 109 RKey: commit.RKey, 110 CommenterDID: repoDID, // Comment comes from user's repository 111 RootURI: commentRecord.Reply.Root.URI, 112 RootCID: commentRecord.Reply.Root.CID, 113 ParentURI: commentRecord.Reply.Parent.URI, 114 ParentCID: commentRecord.Reply.Parent.CID, 115 Content: commentRecord.Content, 116 ContentFacets: facetsJSON, 117 Embed: embedJSON, 118 ContentLabels: labelsJSON, 119 Langs: commentRecord.Langs, 120 CreatedAt: createdAt, 121 IndexedAt: time.Now(), 122 } 123 124 // Atomically: Index comment + Update parent counts 125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 126 return fmt.Errorf("failed to 
index comment and update counts: %w", err) 127 } 128 129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 130 return nil 131} 132 133// updateComment updates an existing comment's content fields 134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 135 if commit.Record == nil { 136 return fmt.Errorf("comment update event missing record data") 137 } 138 139 // Parse the updated comment record 140 commentRecord, err := parseCommentRecord(commit.Record) 141 if err != nil { 142 return fmt.Errorf("failed to parse comment record: %w", err) 143 } 144 145 // SECURITY: Validate this is a legitimate update 146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 148 return err 149 } 150 151 // Build AT-URI for the comment being updated 152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 153 154 // Fetch existing comment to validate threading references are immutable 155 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 156 if err != nil { 157 if err == comments.ErrCommentNotFound { 158 // Comment doesn't exist yet - might arrive out of order 159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 160 return nil 161 } 162 return fmt.Errorf("failed to get existing comment for validation: %w", err) 163 } 164 165 // SECURITY: Threading references are IMMUTABLE after creation 166 // Reject updates that attempt to change root/parent (prevents thread hijacking) 167 if existingComment.RootURI != commentRecord.Reply.Root.URI || 168 existingComment.RootCID != commentRecord.Reply.Root.CID || 169 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 170 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 172 
log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID) 173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 176 return fmt.Errorf("comment threading references cannot be changed after creation") 177 } 178 179 // Serialize optional JSON fields 180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 181 182 // Build comment update entity (preserves vote counts and created_at) 183 comment := &comments.Comment{ 184 URI: uri, 185 CID: commit.CID, 186 Content: commentRecord.Content, 187 ContentFacets: facetsJSON, 188 Embed: embedJSON, 189 ContentLabels: labelsJSON, 190 Langs: commentRecord.Langs, 191 } 192 193 // Update the comment in repository 194 if err := c.commentRepo.Update(ctx, comment); err != nil { 195 return fmt.Errorf("failed to update comment: %w", err) 196 } 197 198 log.Printf("✓ Updated comment: %s", uri) 199 return nil 200} 201 202// deleteComment soft-deletes a comment and updates parent counts 203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 204 // Build AT-URI for the comment being deleted 205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 206 207 // Get existing comment to know its parent (for decrementing the right counter) 208 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 209 if err != nil { 210 if err == comments.ErrCommentNotFound { 211 // Idempotent: Comment already deleted or never existed 212 log.Printf("Comment already deleted or not found: %s", uri) 213 return nil 214 } 215 return fmt.Errorf("failed to get existing comment: %w", err) 216 } 217 218 // Atomically: Soft-delete comment + Update parent counts 
219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil { 220 return fmt.Errorf("failed to delete comment and update counts: %w", err) 221 } 222 223 log.Printf("✓ Deleted comment: %s", uri) 224 return nil 225} 226 227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 229 tx, err := c.db.BeginTx(ctx, nil) 230 if err != nil { 231 return fmt.Errorf("failed to begin transaction: %w", err) 232 } 233 defer func() { 234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 235 log.Printf("Failed to rollback transaction: %v", rollbackErr) 236 } 237 }() 238 239 // 1. Check if comment exists and handle resurrection case 240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 242 var existingID int64 243 var existingDeletedAt *time.Time 244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 246 247 var commentID int64 248 249 if checkErr == nil { 250 // Comment exists 251 if existingDeletedAt == nil { 252 // Not deleted - this is an idempotent replay, skip gracefully 253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 254 if commitErr := tx.Commit(); commitErr != nil { 255 return fmt.Errorf("failed to commit transaction: %w", commitErr) 256 } 257 return nil 258 } 259 260 // Comment was soft-deleted, now being recreated (resurrection) 261 // This is a NEW record with same rkey - update ALL fields including threading refs 262 // User may have deleted old comment and created a new one on a different parent/root 263 log.Printf("Resurrecting previously deleted comment: %s", comment.URI) 264 commentID = 
existingID 265 266 resurrectQuery := ` 267 UPDATE comments 268 SET 269 cid = $1, 270 commenter_did = $2, 271 root_uri = $3, 272 root_cid = $4, 273 parent_uri = $5, 274 parent_cid = $6, 275 content = $7, 276 content_facets = $8, 277 embed = $9, 278 content_labels = $10, 279 langs = $11, 280 created_at = $12, 281 indexed_at = $13, 282 deleted_at = NULL, 283 reply_count = 0 284 WHERE id = $14 285 ` 286 287 _, err = tx.ExecContext( 288 ctx, resurrectQuery, 289 comment.CID, 290 comment.CommenterDID, 291 comment.RootURI, 292 comment.RootCID, 293 comment.ParentURI, 294 comment.ParentCID, 295 comment.Content, 296 comment.ContentFacets, 297 comment.Embed, 298 comment.ContentLabels, 299 pq.Array(comment.Langs), 300 comment.CreatedAt, 301 time.Now(), 302 commentID, 303 ) 304 if err != nil { 305 return fmt.Errorf("failed to resurrect comment: %w", err) 306 } 307 308 } else if checkErr == sql.ErrNoRows { 309 // Comment doesn't exist - insert new comment 310 insertQuery := ` 311 INSERT INTO comments ( 312 uri, cid, rkey, commenter_did, 313 root_uri, root_cid, parent_uri, parent_cid, 314 content, content_facets, embed, content_labels, langs, 315 created_at, indexed_at 316 ) VALUES ( 317 $1, $2, $3, $4, 318 $5, $6, $7, $8, 319 $9, $10, $11, $12, $13, 320 $14, $15 321 ) 322 RETURNING id 323 ` 324 325 err = tx.QueryRowContext( 326 ctx, insertQuery, 327 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 328 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 329 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 330 comment.CreatedAt, time.Now(), 331 ).Scan(&commentID) 332 if err != nil { 333 return fmt.Errorf("failed to insert comment: %w", err) 334 } 335 336 } else { 337 // Unexpected error checking for existing comment 338 return fmt.Errorf("failed to check for existing comment: %w", checkErr) 339 } 340 341 // 1.5. 
Reconcile reply_count for this newly inserted comment 342 // In case any replies arrived out-of-order before this parent was indexed 343 reconcileQuery := ` 344 UPDATE comments 345 SET reply_count = ( 346 SELECT COUNT(*) 347 FROM comments c 348 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 349 ) 350 WHERE id = $2 351 ` 352 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 353 if reconcileErr != nil { 354 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 355 // Continue anyway - this is a best-effort reconciliation 356 } 357 358 // 2. Update parent counts atomically 359 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 360 // Parse collection from parent URI to determine target table 361 // 362 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226 363 // When a comment arrives before its parent post, the post update below returns 0 rows 364 // and we log a warning. Later, when the post is indexed, the post consumer reconciles 365 // comment_count by counting all pre-existing comments. This ensures accurate counts 366 // despite out-of-order Jetstream event delivery. 
367 // 368 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go 369 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 370 371 var updateQuery string 372 switch collection { 373 case "social.coves.community.post": 374 // Comment on post - update posts.comment_count 375 updateQuery = ` 376 UPDATE posts 377 SET comment_count = comment_count + 1 378 WHERE uri = $1 AND deleted_at IS NULL 379 ` 380 381 case "social.coves.community.comment": 382 // Reply to comment - update comments.reply_count 383 updateQuery = ` 384 UPDATE comments 385 SET reply_count = reply_count + 1 386 WHERE uri = $1 AND deleted_at IS NULL 387 ` 388 389 default: 390 // Unknown or unsupported parent collection 391 // Comment is still indexed, we just don't update parent counts 392 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection) 393 if commitErr := tx.Commit(); commitErr != nil { 394 return fmt.Errorf("failed to commit transaction: %w", commitErr) 395 } 396 return nil 397 } 398 399 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI) 400 if err != nil { 401 return fmt.Errorf("failed to update parent count: %w", err) 402 } 403 404 rowsAffected, err := result.RowsAffected() 405 if err != nil { 406 return fmt.Errorf("failed to check update result: %w", err) 407 } 408 409 // If parent not found, that's OK (parent might not be indexed yet) 410 if rowsAffected == 0 { 411 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 412 } 413 414 // Commit transaction 415 if err := tx.Commit(); err != nil { 416 return fmt.Errorf("failed to commit transaction: %w", err) 417 } 418 419 return nil 420} 421 422// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 423func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 424 tx, err := c.db.BeginTx(ctx, 
nil) 425 if err != nil { 426 return fmt.Errorf("failed to begin transaction: %w", err) 427 } 428 defer func() { 429 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 430 log.Printf("Failed to rollback transaction: %v", rollbackErr) 431 } 432 }() 433 434 // 1. Soft-delete the comment (idempotent) 435 deleteQuery := ` 436 UPDATE comments 437 SET deleted_at = $2 438 WHERE uri = $1 AND deleted_at IS NULL 439 ` 440 441 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now()) 442 if err != nil { 443 return fmt.Errorf("failed to delete comment: %w", err) 444 } 445 446 rowsAffected, err := result.RowsAffected() 447 if err != nil { 448 return fmt.Errorf("failed to check delete result: %w", err) 449 } 450 451 // Idempotent: If no rows affected, comment already deleted 452 if rowsAffected == 0 { 453 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 454 if commitErr := tx.Commit(); commitErr != nil { 455 return fmt.Errorf("failed to commit transaction: %w", commitErr) 456 } 457 return nil 458 } 459 460 // 2. 
Decrement parent counts atomically 461 // Parent could be a post or comment - parse collection to determine target table 462 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 463 464 var updateQuery string 465 switch collection { 466 case "social.coves.community.post": 467 // Comment on post - decrement posts.comment_count 468 updateQuery = ` 469 UPDATE posts 470 SET comment_count = GREATEST(0, comment_count - 1) 471 WHERE uri = $1 AND deleted_at IS NULL 472 ` 473 474 case "social.coves.community.comment": 475 // Reply to comment - decrement comments.reply_count 476 updateQuery = ` 477 UPDATE comments 478 SET reply_count = GREATEST(0, reply_count - 1) 479 WHERE uri = $1 AND deleted_at IS NULL 480 ` 481 482 default: 483 // Unknown or unsupported parent collection 484 // Comment is still deleted, we just don't update parent counts 485 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection) 486 if commitErr := tx.Commit(); commitErr != nil { 487 return fmt.Errorf("failed to commit transaction: %w", commitErr) 488 } 489 return nil 490 } 491 492 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI) 493 if err != nil { 494 return fmt.Errorf("failed to update parent count: %w", err) 495 } 496 497 rowsAffected, err = result.RowsAffected() 498 if err != nil { 499 return fmt.Errorf("failed to check update result: %w", err) 500 } 501 502 // If parent not found, that's OK (parent might be deleted) 503 if rowsAffected == 0 { 504 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 505 } 506 507 // Commit transaction 508 if err := tx.Commit(); err != nil { 509 return fmt.Errorf("failed to commit transaction: %w", err) 510 } 511 512 return nil 513} 514 515// validateCommentEvent performs security validation on comment events 516func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment 
*CommentRecordFromJetstream) error { 517 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 518 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 519 // 520 // We do NOT check if the user exists in AppView because: 521 // 1. Comment events may arrive before user events in Jetstream (race condition) 522 // 2. The comment came from the user's PDS repository (authenticated by PDS) 523 // 3. The database FK constraint was removed to allow out-of-order indexing 524 // 4. Orphaned comments (from never-indexed users) are harmless 525 // 526 // Security is maintained because: 527 // - Comment must come from user's own PDS repository (verified by atProto) 528 // - Fake DIDs will fail PDS authentication 529 530 // Validate DID format (basic sanity check) 531 if !strings.HasPrefix(repoDID, "did:") { 532 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 533 } 534 535 // Validate content is not empty (required per lexicon) 536 if comment.Content == "" { 537 return fmt.Errorf("comment content is required") 538 } 539 540 // Validate content length (defensive check - PDS should enforce this) 541 // Per lexicon: max 3000 graphemes, ~30000 bytes 542 // We check bytes as a simple defensive measure 543 if len(comment.Content) > MaxCommentContentBytes { 544 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 545 } 546 547 // Validate reply references exist 548 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 549 return fmt.Errorf("invalid root reference: must have both URI and CID") 550 } 551 552 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 553 return fmt.Errorf("invalid parent reference: must have both URI and CID") 554 } 555 556 // Validate AT-URI structure for root and parent 557 if err := validateATURI(comment.Reply.Root.URI); err != nil { 558 return fmt.Errorf("invalid root URI: 
%w", err) 559 } 560 561 if err := validateATURI(comment.Reply.Parent.URI); err != nil { 562 return fmt.Errorf("invalid parent URI: %w", err) 563 } 564 565 return nil 566} 567 568// validateATURI performs basic structure validation on AT-URIs 569// Format: at://did:method:id/collection/rkey 570// This is defensive validation - we trust PDS but catch obviously malformed URIs 571func validateATURI(uri string) error { 572 if !strings.HasPrefix(uri, ATProtoScheme) { 573 return fmt.Errorf("must start with %s", ATProtoScheme) 574 } 575 576 // Remove at:// prefix and split by / 577 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 578 parts := strings.Split(withoutScheme, "/") 579 580 // Must have at least 3 parts: did, collection, rkey 581 if len(parts) < 3 { 582 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 583 } 584 585 // First part should be a DID 586 if !strings.HasPrefix(parts[0], "did:") { 587 return fmt.Errorf("repository identifier must be a DID") 588 } 589 590 // Collection and rkey should not be empty 591 if parts[1] == "" || parts[2] == "" { 592 return fmt.Errorf("collection and rkey cannot be empty") 593 } 594 595 return nil 596} 597 598// CommentRecordFromJetstream represents a comment record as received from Jetstream 599// Matches social.coves.community.comment lexicon 600type CommentRecordFromJetstream struct { 601 Labels interface{} `json:"labels,omitempty"` 602 Embed map[string]interface{} `json:"embed,omitempty"` 603 Reply ReplyRefFromJetstream `json:"reply"` 604 Type string `json:"$type"` 605 Content string `json:"content"` 606 CreatedAt string `json:"createdAt"` 607 Facets []interface{} `json:"facets,omitempty"` 608 Langs []string `json:"langs,omitempty"` 609} 610 611// ReplyRefFromJetstream represents the threading structure 612type ReplyRefFromJetstream struct { 613 Root StrongRefFromJetstream `json:"root"` 614 Parent StrongRefFromJetstream `json:"parent"` 615} 616 617// parseCommentRecord parses a comment record 
from Jetstream event data 618func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 619 // Marshal to JSON and back for proper type conversion 620 recordJSON, err := json.Marshal(record) 621 if err != nil { 622 return nil, fmt.Errorf("failed to marshal record: %w", err) 623 } 624 625 var comment CommentRecordFromJetstream 626 if err := json.Unmarshal(recordJSON, &comment); err != nil { 627 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 628 } 629 630 // Validate required fields 631 if comment.Content == "" { 632 return nil, fmt.Errorf("comment record missing content field") 633 } 634 635 if comment.CreatedAt == "" { 636 return nil, fmt.Errorf("comment record missing createdAt field") 637 } 638 639 return &comment, nil 640} 641 642// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 643// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 644func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 645 // Serialize facets if present 646 if len(commentRecord.Facets) > 0 { 647 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 648 facetsStr := string(facetsBytes) 649 facetsJSON = &facetsStr 650 } 651 } 652 653 // Serialize embed if present 654 if len(commentRecord.Embed) > 0 { 655 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 656 embedStr := string(embedBytes) 657 embedJSON = &embedStr 658 } 659 } 660 661 // Serialize labels if present 662 if commentRecord.Labels != nil { 663 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 664 labelsStr := string(labelsBytes) 665 labelsJSON = &labelsStr 666 } 667 } 668 669 return facetsJSON, embedJSON, labelsJSON 670}