A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/comments" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "log" 10 "strings" 11 "time" 12 13 "github.com/lib/pq" 14) 15 16// Constants for comment validation and processing 17const ( 18 // CommentCollection is the lexicon collection identifier for comments 19 CommentCollection = "social.coves.feed.comment" 20 21 // ATProtoScheme is the URI scheme for atProto AT-URIs 22 ATProtoScheme = "at://" 23 24 // MaxCommentContentBytes is the maximum allowed size for comment content 25 // Per lexicon: max 3000 graphemes, ~30000 bytes 26 MaxCommentContentBytes = 30000 27) 28 29// CommentEventConsumer consumes comment-related events from Jetstream 30// Handles CREATE, UPDATE, and DELETE operations for social.coves.feed.comment 31type CommentEventConsumer struct { 32 commentRepo comments.Repository 33 db *sql.DB // Direct DB access for atomic count updates 34} 35 36// NewCommentEventConsumer creates a new Jetstream consumer for comment events 37func NewCommentEventConsumer( 38 commentRepo comments.Repository, 39 db *sql.DB, 40) *CommentEventConsumer { 41 return &CommentEventConsumer{ 42 commentRepo: commentRepo, 43 db: db, 44 } 45} 46 47// HandleEvent processes a Jetstream event for comment records 48func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 49 // We only care about commit events for comment records 50 if event.Kind != "commit" || event.Commit == nil { 51 return nil 52 } 53 54 commit := event.Commit 55 56 // Handle comment record operations 57 if commit.Collection == CommentCollection { 58 switch commit.Operation { 59 case "create": 60 return c.createComment(ctx, event.Did, commit) 61 case "update": 62 return c.updateComment(ctx, event.Did, commit) 63 case "delete": 64 return c.deleteComment(ctx, event.Did, commit) 65 } 66 } 67 68 // Silently ignore other operations and collections 69 return nil 70} 71 72// createComment indexes a new comment from the firehose and 
updates parent counts 73func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 74 if commit.Record == nil { 75 return fmt.Errorf("comment create event missing record data") 76 } 77 78 // Parse the comment record 79 commentRecord, err := parseCommentRecord(commit.Record) 80 if err != nil { 81 return fmt.Errorf("failed to parse comment record: %w", err) 82 } 83 84 // SECURITY: Validate this is a legitimate comment event 85 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 86 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 87 return err 88 } 89 90 // Build AT-URI for this comment 91 // Format: at://commenter_did/social.coves.feed.comment/rkey 92 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 93 94 // Parse timestamp from record 95 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 96 if err != nil { 97 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 98 createdAt = time.Now() 99 } 100 101 // Serialize optional JSON fields 102 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 103 104 // Build comment entity 105 comment := &comments.Comment{ 106 URI: uri, 107 CID: commit.CID, 108 RKey: commit.RKey, 109 CommenterDID: repoDID, // Comment comes from user's repository 110 RootURI: commentRecord.Reply.Root.URI, 111 RootCID: commentRecord.Reply.Root.CID, 112 ParentURI: commentRecord.Reply.Parent.URI, 113 ParentCID: commentRecord.Reply.Parent.CID, 114 Content: commentRecord.Content, 115 ContentFacets: facetsJSON, 116 Embed: embedJSON, 117 ContentLabels: labelsJSON, 118 Langs: commentRecord.Langs, 119 CreatedAt: createdAt, 120 IndexedAt: time.Now(), 121 } 122 123 // Atomically: Index comment + Update parent counts 124 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 125 return fmt.Errorf("failed to index comment and update counts: %w", err) 126 } 127 128 
log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 129 return nil 130} 131 132// updateComment updates an existing comment's content fields 133func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 134 if commit.Record == nil { 135 return fmt.Errorf("comment update event missing record data") 136 } 137 138 // Parse the updated comment record 139 commentRecord, err := parseCommentRecord(commit.Record) 140 if err != nil { 141 return fmt.Errorf("failed to parse comment record: %w", err) 142 } 143 144 // SECURITY: Validate this is a legitimate update 145 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 146 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 147 return err 148 } 149 150 // Build AT-URI for the comment being updated 151 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 152 153 // Fetch existing comment to validate threading references are immutable 154 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 155 if err != nil { 156 if err == comments.ErrCommentNotFound { 157 // Comment doesn't exist yet - might arrive out of order 158 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 159 return nil 160 } 161 return fmt.Errorf("failed to get existing comment for validation: %w", err) 162 } 163 164 // SECURITY: Threading references are IMMUTABLE after creation 165 // Reject updates that attempt to change root/parent (prevents thread hijacking) 166 if existingComment.RootURI != commentRecord.Reply.Root.URI || 167 existingComment.RootCID != commentRecord.Reply.Root.CID || 168 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 169 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 170 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 171 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, 
existingComment.RootCID) 172 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 173 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 174 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 175 return fmt.Errorf("comment threading references cannot be changed after creation") 176 } 177 178 // Serialize optional JSON fields 179 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 180 181 // Build comment update entity (preserves vote counts and created_at) 182 comment := &comments.Comment{ 183 URI: uri, 184 CID: commit.CID, 185 Content: commentRecord.Content, 186 ContentFacets: facetsJSON, 187 Embed: embedJSON, 188 ContentLabels: labelsJSON, 189 Langs: commentRecord.Langs, 190 } 191 192 // Update the comment in repository 193 if err := c.commentRepo.Update(ctx, comment); err != nil { 194 return fmt.Errorf("failed to update comment: %w", err) 195 } 196 197 log.Printf("✓ Updated comment: %s", uri) 198 return nil 199} 200 201// deleteComment soft-deletes a comment and updates parent counts 202func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 203 // Build AT-URI for the comment being deleted 204 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey) 205 206 // Get existing comment to know its parent (for decrementing the right counter) 207 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 208 if err != nil { 209 if err == comments.ErrCommentNotFound { 210 // Idempotent: Comment already deleted or never existed 211 log.Printf("Comment already deleted or not found: %s", uri) 212 return nil 213 } 214 return fmt.Errorf("failed to get existing comment: %w", err) 215 } 216 217 // Atomically: Soft-delete comment + Update parent counts 218 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err 
!= nil { 219 return fmt.Errorf("failed to delete comment and update counts: %w", err) 220 } 221 222 log.Printf("✓ Deleted comment: %s", uri) 223 return nil 224} 225 226// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 227func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 228 tx, err := c.db.BeginTx(ctx, nil) 229 if err != nil { 230 return fmt.Errorf("failed to begin transaction: %w", err) 231 } 232 defer func() { 233 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 234 log.Printf("Failed to rollback transaction: %v", rollbackErr) 235 } 236 }() 237 238 // 1. Check if comment exists and handle resurrection case 239 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 240 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 241 var existingID int64 242 var existingDeletedAt *time.Time 243 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 244 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 245 246 var commentID int64 247 248 if checkErr == nil { 249 // Comment exists 250 if existingDeletedAt == nil { 251 // Not deleted - this is an idempotent replay, skip gracefully 252 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 253 if commitErr := tx.Commit(); commitErr != nil { 254 return fmt.Errorf("failed to commit transaction: %w", commitErr) 255 } 256 return nil 257 } 258 259 // Comment was soft-deleted, now being recreated (resurrection) 260 // This is a NEW record with same rkey - update ALL fields including threading refs 261 // User may have deleted old comment and created a new one on a different parent/root 262 log.Printf("Resurrecting previously deleted comment: %s", comment.URI) 263 commentID = existingID 264 265 resurrectQuery := ` 266 UPDATE comments 267 SET 268 
cid = $1, 269 commenter_did = $2, 270 root_uri = $3, 271 root_cid = $4, 272 parent_uri = $5, 273 parent_cid = $6, 274 content = $7, 275 content_facets = $8, 276 embed = $9, 277 content_labels = $10, 278 langs = $11, 279 created_at = $12, 280 indexed_at = $13, 281 deleted_at = NULL, 282 reply_count = 0 283 WHERE id = $14 284 ` 285 286 _, err = tx.ExecContext( 287 ctx, resurrectQuery, 288 comment.CID, 289 comment.CommenterDID, 290 comment.RootURI, 291 comment.RootCID, 292 comment.ParentURI, 293 comment.ParentCID, 294 comment.Content, 295 comment.ContentFacets, 296 comment.Embed, 297 comment.ContentLabels, 298 pq.Array(comment.Langs), 299 comment.CreatedAt, 300 time.Now(), 301 commentID, 302 ) 303 if err != nil { 304 return fmt.Errorf("failed to resurrect comment: %w", err) 305 } 306 307 } else if checkErr == sql.ErrNoRows { 308 // Comment doesn't exist - insert new comment 309 insertQuery := ` 310 INSERT INTO comments ( 311 uri, cid, rkey, commenter_did, 312 root_uri, root_cid, parent_uri, parent_cid, 313 content, content_facets, embed, content_labels, langs, 314 created_at, indexed_at 315 ) VALUES ( 316 $1, $2, $3, $4, 317 $5, $6, $7, $8, 318 $9, $10, $11, $12, $13, 319 $14, $15 320 ) 321 RETURNING id 322 ` 323 324 err = tx.QueryRowContext( 325 ctx, insertQuery, 326 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 327 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 328 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 329 comment.CreatedAt, time.Now(), 330 ).Scan(&commentID) 331 if err != nil { 332 return fmt.Errorf("failed to insert comment: %w", err) 333 } 334 335 } else { 336 // Unexpected error checking for existing comment 337 return fmt.Errorf("failed to check for existing comment: %w", checkErr) 338 } 339 340 // 1.5. 
Reconcile reply_count for this newly inserted comment 341 // In case any replies arrived out-of-order before this parent was indexed 342 reconcileQuery := ` 343 UPDATE comments 344 SET reply_count = ( 345 SELECT COUNT(*) 346 FROM comments c 347 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 348 ) 349 WHERE id = $2 350 ` 351 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 352 if reconcileErr != nil { 353 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 354 // Continue anyway - this is a best-effort reconciliation 355 } 356 357 // 2. Update parent counts atomically 358 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 359 // Try posts table first 360 // 361 // FIXME(P1): Post comment_count reconciliation not implemented 362 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering), 363 // the post update below returns 0 rows and we only log a warning. Later, when the post 364 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing 365 // comments. This causes posts to have permanently stale comment_count values. 366 // 367 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments 368 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri 369 // matches the post URI and set comment_count accordingly. 
370 // 371 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation 372 updatePostQuery := ` 373 UPDATE posts 374 SET comment_count = comment_count + 1 375 WHERE uri = $1 AND deleted_at IS NULL 376 ` 377 378 result, err := tx.ExecContext(ctx, updatePostQuery, comment.ParentURI) 379 if err != nil { 380 return fmt.Errorf("failed to update post comment count: %w", err) 381 } 382 383 rowsAffected, err := result.RowsAffected() 384 if err != nil { 385 return fmt.Errorf("failed to check update result: %w", err) 386 } 387 388 // If no post was updated, parent is probably a comment 389 if rowsAffected == 0 { 390 updateCommentQuery := ` 391 UPDATE comments 392 SET reply_count = reply_count + 1 393 WHERE uri = $1 AND deleted_at IS NULL 394 ` 395 396 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI) 397 if err != nil { 398 return fmt.Errorf("failed to update comment reply count: %w", err) 399 } 400 401 rowsAffected, err := result.RowsAffected() 402 if err != nil { 403 return fmt.Errorf("failed to check update result: %w", err) 404 } 405 406 // If neither post nor comment was found, that's OK (parent might not be indexed yet) 407 if rowsAffected == 0 { 408 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 409 } 410 } 411 412 // Commit transaction 413 if err := tx.Commit(); err != nil { 414 return fmt.Errorf("failed to commit transaction: %w", err) 415 } 416 417 return nil 418} 419 420// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 421func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 422 tx, err := c.db.BeginTx(ctx, nil) 423 if err != nil { 424 return fmt.Errorf("failed to begin transaction: %w", err) 425 } 426 defer func() { 427 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 428 log.Printf("Failed to rollback transaction: %v", 
rollbackErr) 429 } 430 }() 431 432 // 1. Soft-delete the comment (idempotent) 433 deleteQuery := ` 434 UPDATE comments 435 SET deleted_at = $2 436 WHERE uri = $1 AND deleted_at IS NULL 437 ` 438 439 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now()) 440 if err != nil { 441 return fmt.Errorf("failed to delete comment: %w", err) 442 } 443 444 rowsAffected, err := result.RowsAffected() 445 if err != nil { 446 return fmt.Errorf("failed to check delete result: %w", err) 447 } 448 449 // Idempotent: If no rows affected, comment already deleted 450 if rowsAffected == 0 { 451 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 452 if commitErr := tx.Commit(); commitErr != nil { 453 return fmt.Errorf("failed to commit transaction: %w", commitErr) 454 } 455 return nil 456 } 457 458 // 2. Decrement parent counts atomically 459 // Parent could be a post or comment - try both (use GREATEST to prevent negative) 460 updatePostQuery := ` 461 UPDATE posts 462 SET comment_count = GREATEST(0, comment_count - 1) 463 WHERE uri = $1 AND deleted_at IS NULL 464 ` 465 466 result, err = tx.ExecContext(ctx, updatePostQuery, comment.ParentURI) 467 if err != nil { 468 return fmt.Errorf("failed to update post comment count: %w", err) 469 } 470 471 rowsAffected, err = result.RowsAffected() 472 if err != nil { 473 return fmt.Errorf("failed to check update result: %w", err) 474 } 475 476 // If no post was updated, parent is probably a comment 477 if rowsAffected == 0 { 478 updateCommentQuery := ` 479 UPDATE comments 480 SET reply_count = GREATEST(0, reply_count - 1) 481 WHERE uri = $1 AND deleted_at IS NULL 482 ` 483 484 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI) 485 if err != nil { 486 return fmt.Errorf("failed to update comment reply count: %w", err) 487 } 488 489 rowsAffected, err := result.RowsAffected() 490 if err != nil { 491 return fmt.Errorf("failed to check update result: %w", err) 492 } 493 494 // If neither was found, 
that's OK (parent might be deleted) 495 if rowsAffected == 0 { 496 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 497 } 498 } 499 500 // Commit transaction 501 if err := tx.Commit(); err != nil { 502 return fmt.Errorf("failed to commit transaction: %w", err) 503 } 504 505 return nil 506} 507 508// validateCommentEvent performs security validation on comment events 509func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error { 510 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 511 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 512 // 513 // We do NOT check if the user exists in AppView because: 514 // 1. Comment events may arrive before user events in Jetstream (race condition) 515 // 2. The comment came from the user's PDS repository (authenticated by PDS) 516 // 3. The database FK constraint was removed to allow out-of-order indexing 517 // 4. 
Orphaned comments (from never-indexed users) are harmless 518 // 519 // Security is maintained because: 520 // - Comment must come from user's own PDS repository (verified by atProto) 521 // - Fake DIDs will fail PDS authentication 522 523 // Validate DID format (basic sanity check) 524 if !strings.HasPrefix(repoDID, "did:") { 525 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 526 } 527 528 // Validate content is not empty (required per lexicon) 529 if comment.Content == "" { 530 return fmt.Errorf("comment content is required") 531 } 532 533 // Validate content length (defensive check - PDS should enforce this) 534 // Per lexicon: max 3000 graphemes, ~30000 bytes 535 // We check bytes as a simple defensive measure 536 if len(comment.Content) > MaxCommentContentBytes { 537 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 538 } 539 540 // Validate reply references exist 541 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 542 return fmt.Errorf("invalid root reference: must have both URI and CID") 543 } 544 545 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 546 return fmt.Errorf("invalid parent reference: must have both URI and CID") 547 } 548 549 // Validate AT-URI structure for root and parent 550 if err := validateATURI(comment.Reply.Root.URI); err != nil { 551 return fmt.Errorf("invalid root URI: %w", err) 552 } 553 554 if err := validateATURI(comment.Reply.Parent.URI); err != nil { 555 return fmt.Errorf("invalid parent URI: %w", err) 556 } 557 558 return nil 559} 560 561// validateATURI performs basic structure validation on AT-URIs 562// Format: at://did:method:id/collection/rkey 563// This is defensive validation - we trust PDS but catch obviously malformed URIs 564func validateATURI(uri string) error { 565 if !strings.HasPrefix(uri, ATProtoScheme) { 566 return fmt.Errorf("must start with %s", ATProtoScheme) 567 } 568 569 // 
Remove at:// prefix and split by / 570 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 571 parts := strings.Split(withoutScheme, "/") 572 573 // Must have at least 3 parts: did, collection, rkey 574 if len(parts) < 3 { 575 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 576 } 577 578 // First part should be a DID 579 if !strings.HasPrefix(parts[0], "did:") { 580 return fmt.Errorf("repository identifier must be a DID") 581 } 582 583 // Collection and rkey should not be empty 584 if parts[1] == "" || parts[2] == "" { 585 return fmt.Errorf("collection and rkey cannot be empty") 586 } 587 588 return nil 589} 590 591// CommentRecordFromJetstream represents a comment record as received from Jetstream 592// Matches social.coves.feed.comment lexicon 593type CommentRecordFromJetstream struct { 594 Labels interface{} `json:"labels,omitempty"` 595 Embed map[string]interface{} `json:"embed,omitempty"` 596 Reply ReplyRefFromJetstream `json:"reply"` 597 Type string `json:"$type"` 598 Content string `json:"content"` 599 CreatedAt string `json:"createdAt"` 600 Facets []interface{} `json:"facets,omitempty"` 601 Langs []string `json:"langs,omitempty"` 602} 603 604// ReplyRefFromJetstream represents the threading structure 605type ReplyRefFromJetstream struct { 606 Root StrongRefFromJetstream `json:"root"` 607 Parent StrongRefFromJetstream `json:"parent"` 608} 609 610// parseCommentRecord parses a comment record from Jetstream event data 611func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 612 // Marshal to JSON and back for proper type conversion 613 recordJSON, err := json.Marshal(record) 614 if err != nil { 615 return nil, fmt.Errorf("failed to marshal record: %w", err) 616 } 617 618 var comment CommentRecordFromJetstream 619 if err := json.Unmarshal(recordJSON, &comment); err != nil { 620 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 621 } 622 623 // Validate required fields 624 
if comment.Content == "" { 625 return nil, fmt.Errorf("comment record missing content field") 626 } 627 628 if comment.CreatedAt == "" { 629 return nil, fmt.Errorf("comment record missing createdAt field") 630 } 631 632 return &comment, nil 633} 634 635// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 636// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 637func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 638 // Serialize facets if present 639 if len(commentRecord.Facets) > 0 { 640 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 641 facetsStr := string(facetsBytes) 642 facetsJSON = &facetsStr 643 } 644 } 645 646 // Serialize embed if present 647 if len(commentRecord.Embed) > 0 { 648 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 649 embedStr := string(embedBytes) 650 embedJSON = &embedStr 651 } 652 } 653 654 // Serialize labels if present 655 if commentRecord.Labels != nil { 656 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 657 labelsStr := string(labelsBytes) 658 labelsJSON = &labelsStr 659 } 660 } 661 662 return facetsJSON, embedJSON, labelsJSON 663}