A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log" 9 "strings" 10 "time" 11 12 "Coves/internal/atproto/utils" 13 "Coves/internal/core/comments" 14 15 "github.com/lib/pq" 16) 17 18// Constants for comment validation and processing 19const ( 20 // CommentCollection is the lexicon collection identifier for comments 21 CommentCollection = "social.coves.community.comment" 22 23 // ATProtoScheme is the URI scheme for atProto AT-URIs 24 ATProtoScheme = "at://" 25 26 // MaxCommentContentBytes is the maximum allowed size for comment content 27 // Per lexicon: max 3000 graphemes, ~30000 bytes 28 MaxCommentContentBytes = 30000 29) 30 31// CommentEventConsumer consumes comment-related events from Jetstream 32// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment 33type CommentEventConsumer struct { 34 commentRepo comments.Repository 35 db *sql.DB // Direct DB access for atomic count updates 36} 37 38// NewCommentEventConsumer creates a new Jetstream consumer for comment events 39func NewCommentEventConsumer( 40 commentRepo comments.Repository, 41 db *sql.DB, 42) *CommentEventConsumer { 43 return &CommentEventConsumer{ 44 commentRepo: commentRepo, 45 db: db, 46 } 47} 48 49// HandleEvent processes a Jetstream event for comment records 50func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 51 // We only care about commit events for comment records 52 if event.Kind != "commit" || event.Commit == nil { 53 return nil 54 } 55 56 commit := event.Commit 57 58 // Handle comment record operations 59 if commit.Collection == CommentCollection { 60 switch commit.Operation { 61 case "create": 62 return c.createComment(ctx, event.Did, commit) 63 case "update": 64 return c.updateComment(ctx, event.Did, commit) 65 case "delete": 66 return c.deleteComment(ctx, event.Did, commit) 67 } 68 } 69 70 // Silently ignore other operations and collections 71 return nil 72} 73 74// createComment indexes a new comment from the firehose and updates parent counts 75func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 76 if commit.Record == nil { 77 return fmt.Errorf("comment create event missing record data") 78 } 79 80 // Parse the comment record 81 commentRecord, err := parseCommentRecord(commit.Record) 82 if err != nil { 83 return fmt.Errorf("failed to parse comment record: %w", err) 84 } 85 86 // SECURITY: Validate this is a legitimate comment event 87 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 88 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 89 return err 90 } 91 92 // Build AT-URI for this comment 93 // Format: at://commenter_did/social.coves.community.comment/rkey 94 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 95 96 // Parse timestamp from record 97 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 98 if err != nil { 99 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 100 createdAt = time.Now() 101 } 102 103 // Serialize optional JSON fields 104 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 105 106 // Build comment entity 107 comment := &comments.Comment{ 108 URI: uri, 109 CID: commit.CID, 110 RKey: commit.RKey, 111 CommenterDID: repoDID, // Comment comes from user's repository 112 RootURI: commentRecord.Reply.Root.URI, 113 RootCID: commentRecord.Reply.Root.CID, 114 ParentURI: commentRecord.Reply.Parent.URI, 115 ParentCID: commentRecord.Reply.Parent.CID, 116 Content: commentRecord.Content, 117 ContentFacets: facetsJSON, 118 Embed: embedJSON, 119 ContentLabels: labelsJSON, 120 Langs: commentRecord.Langs, 121 CreatedAt: createdAt, 122 IndexedAt: time.Now(), 123 } 124 125 // Atomically: Index comment + Update parent counts 126 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 127 return fmt.Errorf("failed to index comment and update counts: %w", err) 128 } 129 130 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 131 return nil 132} 133 134// updateComment updates an existing comment's content fields 135func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 136 if commit.Record == nil { 137 return fmt.Errorf("comment update event missing record data") 138 } 139 140 // Parse the updated comment record 141 commentRecord, err := parseCommentRecord(commit.Record) 142 if err != nil { 143 return fmt.Errorf("failed to parse comment record: %w", err) 144 } 145 146 // SECURITY: Validate this is a legitimate update 147 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 148 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 149 return err 150 } 151 152 // Build AT-URI for the comment being updated 153 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 154 155 // Fetch existing comment to validate threading references are immutable 156 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 157 if err != nil { 158 if err == comments.ErrCommentNotFound { 159 // Comment doesn't exist yet - might arrive out of order 160 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 161 return nil 162 } 163 return fmt.Errorf("failed to get existing comment for validation: %w", err) 164 } 165 166 // SECURITY: Threading references are IMMUTABLE after creation 167 // Reject updates that attempt to change root/parent (prevents thread hijacking) 168 if existingComment.RootURI != commentRecord.Reply.Root.URI || 169 existingComment.RootCID != commentRecord.Reply.Root.CID || 170 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 171 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 172 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 173 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID) 174 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 175 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 176 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 177 return fmt.Errorf("comment threading references cannot be changed after creation") 178 } 179 180 // Serialize optional JSON fields 181 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 182 183 // Build comment update entity (preserves vote counts and created_at) 184 comment := &comments.Comment{ 185 URI: uri, 186 CID: commit.CID, 187 Content: commentRecord.Content, 188 ContentFacets: facetsJSON, 189 Embed: embedJSON, 190 ContentLabels: labelsJSON, 191 Langs: commentRecord.Langs, 192 } 193 194 // Update the comment in repository 195 if err := c.commentRepo.Update(ctx, comment); err != nil { 196 return fmt.Errorf("failed to update comment: %w", err) 197 } 198 199 log.Printf("✓ Updated comment: %s", uri) 200 return nil 201} 202 203// deleteComment soft-deletes a comment and updates parent counts 204func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 205 // Build AT-URI for the comment being deleted 206 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 207 208 // Get existing comment to know its parent (for decrementing the right counter) 209 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 210 if err != nil { 211 if err == comments.ErrCommentNotFound { 212 // Idempotent: Comment already deleted or never existed 213 log.Printf("Comment already deleted or not found: %s", uri) 214 return nil 215 } 216 return fmt.Errorf("failed to get existing comment: %w", err) 217 } 218 219 // Atomically: Soft-delete comment + Update parent counts 220 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil { 221 return fmt.Errorf("failed to delete comment and update counts: %w", err) 222 } 223 224 log.Printf("✓ Deleted comment: %s", uri) 225 return nil 226} 227 228// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 229func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 230 tx, err := c.db.BeginTx(ctx, nil) 231 if err != nil { 232 return fmt.Errorf("failed to begin transaction: %w", err) 233 } 234 defer func() { 235 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 236 log.Printf("Failed to rollback transaction: %v", rollbackErr) 237 } 238 }() 239 240 // 1. Check if comment exists and handle resurrection case 241 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 242 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 243 var existingID int64 244 var existingDeletedAt *time.Time 245 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 246 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 247 248 var commentID int64 249 250 if checkErr == nil { 251 // Comment exists 252 if existingDeletedAt == nil { 253 // Not deleted - this is an idempotent replay, skip gracefully 254 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 255 if commitErr := tx.Commit(); commitErr != nil { 256 return fmt.Errorf("failed to commit transaction: %w", commitErr) 257 } 258 return nil 259 } 260 261 // Comment was soft-deleted, now being recreated (resurrection) 262 // This is a NEW record with same rkey - update ALL fields including threading refs 263 // User may have deleted old comment and created a new one on a different parent/root 264 log.Printf("Resurrecting previously deleted comment: %s", comment.URI) 265 commentID = existingID 266 267 resurrectQuery := ` 268 UPDATE comments 269 SET 270 cid = $1, 271 commenter_did = $2, 272 root_uri = $3, 273 root_cid = $4, 274 parent_uri = $5, 275 parent_cid = $6, 276 content = $7, 277 content_facets = $8, 278 embed = $9, 279 content_labels = $10, 280 langs = $11, 281 created_at = $12, 282 indexed_at = $13, 283 deleted_at = NULL, 284 reply_count = 0 285 WHERE id = $14 286 ` 287 288 _, err = tx.ExecContext( 289 ctx, resurrectQuery, 290 comment.CID, 291 comment.CommenterDID, 292 comment.RootURI, 293 comment.RootCID, 294 comment.ParentURI, 295 comment.ParentCID, 296 comment.Content, 297 comment.ContentFacets, 298 comment.Embed, 299 comment.ContentLabels, 300 pq.Array(comment.Langs), 301 comment.CreatedAt, 302 time.Now(), 303 commentID, 304 ) 305 if err != nil { 306 return fmt.Errorf("failed to resurrect comment: %w", err) 307 } 308 309 } else if checkErr == sql.ErrNoRows { 310 // Comment doesn't exist - insert new comment 311 insertQuery := ` 312 INSERT INTO comments ( 313 uri, cid, rkey, commenter_did, 314 root_uri, root_cid, parent_uri, parent_cid, 315 content, content_facets, embed, content_labels, langs, 316 created_at, indexed_at 317 ) VALUES ( 318 $1, $2, $3, $4, 319 $5, $6, $7, $8, 320 $9, $10, $11, $12, $13, 321 $14, $15 322 ) 323 RETURNING id 324 ` 325 326 err = tx.QueryRowContext( 327 ctx, insertQuery, 328 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 329 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 330 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 331 comment.CreatedAt, time.Now(), 332 ).Scan(&commentID) 333 if err != nil { 334 return fmt.Errorf("failed to insert comment: %w", err) 335 } 336 337 } else { 338 // Unexpected error checking for existing comment 339 return fmt.Errorf("failed to check for existing comment: %w", checkErr) 340 } 341 342 // 1.5. Reconcile reply_count for this newly inserted comment 343 // In case any replies arrived out-of-order before this parent was indexed 344 reconcileQuery := ` 345 UPDATE comments 346 SET reply_count = ( 347 SELECT COUNT(*) 348 FROM comments c 349 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 350 ) 351 WHERE id = $2 352 ` 353 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 354 if reconcileErr != nil { 355 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 356 // Continue anyway - this is a best-effort reconciliation 357 } 358 359 // 2. Update parent counts atomically 360 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 361 // Parse collection from parent URI to determine target table 362 // 363 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226 364 // When a comment arrives before its parent post, the post update below returns 0 rows 365 // and we log a warning. Later, when the post is indexed, the post consumer reconciles 366 // comment_count by counting all pre-existing comments. This ensures accurate counts 367 // despite out-of-order Jetstream event delivery. 368 // 369 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go 370 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 371 372 var updateQuery string 373 switch collection { 374 case "social.coves.community.post": 375 // Comment on post - update posts.comment_count 376 updateQuery = ` 377 UPDATE posts 378 SET comment_count = comment_count + 1 379 WHERE uri = $1 AND deleted_at IS NULL 380 ` 381 382 case "social.coves.community.comment": 383 // Reply to comment - update comments.reply_count 384 updateQuery = ` 385 UPDATE comments 386 SET reply_count = reply_count + 1 387 WHERE uri = $1 AND deleted_at IS NULL 388 ` 389 390 default: 391 // Unknown or unsupported parent collection 392 // Comment is still indexed, we just don't update parent counts 393 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection) 394 if commitErr := tx.Commit(); commitErr != nil { 395 return fmt.Errorf("failed to commit transaction: %w", commitErr) 396 } 397 return nil 398 } 399 400 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI) 401 if err != nil { 402 return fmt.Errorf("failed to update parent count: %w", err) 403 } 404 405 rowsAffected, err := result.RowsAffected() 406 if err != nil { 407 return fmt.Errorf("failed to check update result: %w", err) 408 } 409 410 // If parent not found, that's OK (parent might not be indexed yet) 411 if rowsAffected == 0 { 412 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 413 } 414 415 // Commit transaction 416 if err := tx.Commit(); err != nil { 417 return fmt.Errorf("failed to commit transaction: %w", err) 418 } 419 420 return nil 421} 422 423// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 424func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 425 tx, err := c.db.BeginTx(ctx, nil) 426 if err != nil { 427 return fmt.Errorf("failed to begin transaction: %w", err) 428 } 429 defer func() { 430 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 431 log.Printf("Failed to rollback transaction: %v", rollbackErr) 432 } 433 }() 434 435 // 1. Soft-delete the comment (idempotent) 436 deleteQuery := ` 437 UPDATE comments 438 SET deleted_at = $2 439 WHERE uri = $1 AND deleted_at IS NULL 440 ` 441 442 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now()) 443 if err != nil { 444 return fmt.Errorf("failed to delete comment: %w", err) 445 } 446 447 rowsAffected, err := result.RowsAffected() 448 if err != nil { 449 return fmt.Errorf("failed to check delete result: %w", err) 450 } 451 452 // Idempotent: If no rows affected, comment already deleted 453 if rowsAffected == 0 { 454 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 455 if commitErr := tx.Commit(); commitErr != nil { 456 return fmt.Errorf("failed to commit transaction: %w", commitErr) 457 } 458 return nil 459 } 460 461 // 2. Decrement parent counts atomically 462 // Parent could be a post or comment - parse collection to determine target table 463 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 464 465 var updateQuery string 466 switch collection { 467 case "social.coves.community.post": 468 // Comment on post - decrement posts.comment_count 469 updateQuery = ` 470 UPDATE posts 471 SET comment_count = GREATEST(0, comment_count - 1) 472 WHERE uri = $1 AND deleted_at IS NULL 473 ` 474 475 case "social.coves.community.comment": 476 // Reply to comment - decrement comments.reply_count 477 updateQuery = ` 478 UPDATE comments 479 SET reply_count = GREATEST(0, reply_count - 1) 480 WHERE uri = $1 AND deleted_at IS NULL 481 ` 482 483 default: 484 // Unknown or unsupported parent collection 485 // Comment is still deleted, we just don't update parent counts 486 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection) 487 if commitErr := tx.Commit(); commitErr != nil { 488 return fmt.Errorf("failed to commit transaction: %w", commitErr) 489 } 490 return nil 491 } 492 493 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI) 494 if err != nil { 495 return fmt.Errorf("failed to update parent count: %w", err) 496 } 497 498 rowsAffected, err = result.RowsAffected() 499 if err != nil { 500 return fmt.Errorf("failed to check update result: %w", err) 501 } 502 503 // If parent not found, that's OK (parent might be deleted) 504 if rowsAffected == 0 { 505 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 506 } 507 508 // Commit transaction 509 if err := tx.Commit(); err != nil { 510 return fmt.Errorf("failed to commit transaction: %w", err) 511 } 512 513 return nil 514} 515 516// validateCommentEvent performs security validation on comment events 517func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error { 518 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 519 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 520 // 521 // We do NOT check if the user exists in AppView because: 522 // 1. Comment events may arrive before user events in Jetstream (race condition) 523 // 2. The comment came from the user's PDS repository (authenticated by PDS) 524 // 3. The database FK constraint was removed to allow out-of-order indexing 525 // 4. Orphaned comments (from never-indexed users) are harmless 526 // 527 // Security is maintained because: 528 // - Comment must come from user's own PDS repository (verified by atProto) 529 // - Fake DIDs will fail PDS authentication 530 531 // Validate DID format (basic sanity check) 532 if !strings.HasPrefix(repoDID, "did:") { 533 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 534 } 535 536 // Validate content is not empty (required per lexicon) 537 if comment.Content == "" { 538 return fmt.Errorf("comment content is required") 539 } 540 541 // Validate content length (defensive check - PDS should enforce this) 542 // Per lexicon: max 3000 graphemes, ~30000 bytes 543 // We check bytes as a simple defensive measure 544 if len(comment.Content) > MaxCommentContentBytes { 545 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 546 } 547 548 // Validate reply references exist 549 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 550 return fmt.Errorf("invalid root reference: must have both URI and CID") 551 } 552 553 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 554 return fmt.Errorf("invalid parent reference: must have both URI and CID") 555 } 556 557 // Validate AT-URI structure for root and parent 558 if err := validateATURI(comment.Reply.Root.URI); err != nil { 559 return fmt.Errorf("invalid root URI: %w", err) 560 } 561 562 if err := validateATURI(comment.Reply.Parent.URI); err != nil { 563 return fmt.Errorf("invalid parent URI: %w", err) 564 } 565 566 return nil 567} 568 569// validateATURI performs basic structure validation on AT-URIs 570// Format: at://did:method:id/collection/rkey 571// This is defensive validation - we trust PDS but catch obviously malformed URIs 572func validateATURI(uri string) error { 573 if !strings.HasPrefix(uri, ATProtoScheme) { 574 return fmt.Errorf("must start with %s", ATProtoScheme) 575 } 576 577 // Remove at:// prefix and split by / 578 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 579 parts := strings.Split(withoutScheme, "/") 580 581 // Must have at least 3 parts: did, collection, rkey 582 if len(parts) < 3 { 583 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 584 } 585 586 // First part should be a DID 587 if !strings.HasPrefix(parts[0], "did:") { 588 return fmt.Errorf("repository identifier must be a DID") 589 } 590 591 // Collection and rkey should not be empty 592 if parts[1] == "" || parts[2] == "" { 593 return fmt.Errorf("collection and rkey cannot be empty") 594 } 595 596 return nil 597} 598 599// CommentRecordFromJetstream represents a comment record as received from Jetstream 600// Matches social.coves.community.comment lexicon 601type CommentRecordFromJetstream struct { 602 Labels interface{} `json:"labels,omitempty"` 603 Embed map[string]interface{} `json:"embed,omitempty"` 604 Reply ReplyRefFromJetstream `json:"reply"` 605 Type string `json:"$type"` 606 Content string `json:"content"` 607 CreatedAt string `json:"createdAt"` 608 Facets []interface{} `json:"facets,omitempty"` 609 Langs []string `json:"langs,omitempty"` 610} 611 612// ReplyRefFromJetstream represents the threading structure 613type ReplyRefFromJetstream struct { 614 Root StrongRefFromJetstream `json:"root"` 615 Parent StrongRefFromJetstream `json:"parent"` 616} 617 618// parseCommentRecord parses a comment record from Jetstream event data 619func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 620 // Marshal to JSON and back for proper type conversion 621 recordJSON, err := json.Marshal(record) 622 if err != nil { 623 return nil, fmt.Errorf("failed to marshal record: %w", err) 624 } 625 626 var comment CommentRecordFromJetstream 627 if err := json.Unmarshal(recordJSON, &comment); err != nil { 628 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 629 } 630 631 // Validate required fields 632 if comment.Content == "" { 633 return nil, fmt.Errorf("comment record missing content field") 634 } 635 636 if comment.CreatedAt == "" { 637 return nil, fmt.Errorf("comment record missing createdAt field") 638 } 639 640 return &comment, nil 641} 642 643// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 644// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 645func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 646 // Serialize facets if present 647 if len(commentRecord.Facets) > 0 { 648 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 649 facetsStr := string(facetsBytes) 650 facetsJSON = &facetsStr 651 } 652 } 653 654 // Serialize embed if present 655 if len(commentRecord.Embed) > 0 { 656 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 657 embedStr := string(embedBytes) 658 embedJSON = &embedStr 659 } 660 } 661 662 // Serialize labels if present 663 if commentRecord.Labels != nil { 664 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 665 labelsStr := string(labelsBytes) 666 labelsJSON = &labelsStr 667 } 668 } 669 670 return facetsJSON, embedJSON, labelsJSON 671}