A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/comments"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "log"
10 "strings"
11 "time"
12
13 "github.com/lib/pq"
14)
15
16// Constants for comment validation and processing
17const (
18 // CommentCollection is the lexicon collection identifier for comments
19 CommentCollection = "social.coves.feed.comment"
20
21 // ATProtoScheme is the URI scheme for atProto AT-URIs
22 ATProtoScheme = "at://"
23
24 // MaxCommentContentBytes is the maximum allowed size for comment content
25 // Per lexicon: max 3000 graphemes, ~30000 bytes
26 MaxCommentContentBytes = 30000
27)
28
29// CommentEventConsumer consumes comment-related events from Jetstream
30// Handles CREATE, UPDATE, and DELETE operations for social.coves.feed.comment
31type CommentEventConsumer struct {
32 commentRepo comments.Repository
33 db *sql.DB // Direct DB access for atomic count updates
34}
35
36// NewCommentEventConsumer creates a new Jetstream consumer for comment events
37func NewCommentEventConsumer(
38 commentRepo comments.Repository,
39 db *sql.DB,
40) *CommentEventConsumer {
41 return &CommentEventConsumer{
42 commentRepo: commentRepo,
43 db: db,
44 }
45}
46
47// HandleEvent processes a Jetstream event for comment records
48func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
49 // We only care about commit events for comment records
50 if event.Kind != "commit" || event.Commit == nil {
51 return nil
52 }
53
54 commit := event.Commit
55
56 // Handle comment record operations
57 if commit.Collection == CommentCollection {
58 switch commit.Operation {
59 case "create":
60 return c.createComment(ctx, event.Did, commit)
61 case "update":
62 return c.updateComment(ctx, event.Did, commit)
63 case "delete":
64 return c.deleteComment(ctx, event.Did, commit)
65 }
66 }
67
68 // Silently ignore other operations and collections
69 return nil
70}
71
72// createComment indexes a new comment from the firehose and updates parent counts
73func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
74 if commit.Record == nil {
75 return fmt.Errorf("comment create event missing record data")
76 }
77
78 // Parse the comment record
79 commentRecord, err := parseCommentRecord(commit.Record)
80 if err != nil {
81 return fmt.Errorf("failed to parse comment record: %w", err)
82 }
83
84 // SECURITY: Validate this is a legitimate comment event
85 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
86 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
87 return err
88 }
89
90 // Build AT-URI for this comment
91 // Format: at://commenter_did/social.coves.feed.comment/rkey
92 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
93
94 // Parse timestamp from record
95 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
96 if err != nil {
97 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
98 createdAt = time.Now()
99 }
100
101 // Serialize optional JSON fields
102 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
103
104 // Build comment entity
105 comment := &comments.Comment{
106 URI: uri,
107 CID: commit.CID,
108 RKey: commit.RKey,
109 CommenterDID: repoDID, // Comment comes from user's repository
110 RootURI: commentRecord.Reply.Root.URI,
111 RootCID: commentRecord.Reply.Root.CID,
112 ParentURI: commentRecord.Reply.Parent.URI,
113 ParentCID: commentRecord.Reply.Parent.CID,
114 Content: commentRecord.Content,
115 ContentFacets: facetsJSON,
116 Embed: embedJSON,
117 ContentLabels: labelsJSON,
118 Langs: commentRecord.Langs,
119 CreatedAt: createdAt,
120 IndexedAt: time.Now(),
121 }
122
123 // Atomically: Index comment + Update parent counts
124 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
125 return fmt.Errorf("failed to index comment and update counts: %w", err)
126 }
127
128 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
129 return nil
130}
131
132// updateComment updates an existing comment's content fields
133func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
134 if commit.Record == nil {
135 return fmt.Errorf("comment update event missing record data")
136 }
137
138 // Parse the updated comment record
139 commentRecord, err := parseCommentRecord(commit.Record)
140 if err != nil {
141 return fmt.Errorf("failed to parse comment record: %w", err)
142 }
143
144 // SECURITY: Validate this is a legitimate update
145 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
146 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
147 return err
148 }
149
150 // Build AT-URI for the comment being updated
151 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
152
153 // Fetch existing comment to validate threading references are immutable
154 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
155 if err != nil {
156 if err == comments.ErrCommentNotFound {
157 // Comment doesn't exist yet - might arrive out of order
158 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
159 return nil
160 }
161 return fmt.Errorf("failed to get existing comment for validation: %w", err)
162 }
163
164 // SECURITY: Threading references are IMMUTABLE after creation
165 // Reject updates that attempt to change root/parent (prevents thread hijacking)
166 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
167 existingComment.RootCID != commentRecord.Reply.Root.CID ||
168 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
169 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
170 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
171 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
172 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
173 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
174 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
175 return fmt.Errorf("comment threading references cannot be changed after creation")
176 }
177
178 // Serialize optional JSON fields
179 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
180
181 // Build comment update entity (preserves vote counts and created_at)
182 comment := &comments.Comment{
183 URI: uri,
184 CID: commit.CID,
185 Content: commentRecord.Content,
186 ContentFacets: facetsJSON,
187 Embed: embedJSON,
188 ContentLabels: labelsJSON,
189 Langs: commentRecord.Langs,
190 }
191
192 // Update the comment in repository
193 if err := c.commentRepo.Update(ctx, comment); err != nil {
194 return fmt.Errorf("failed to update comment: %w", err)
195 }
196
197 log.Printf("✓ Updated comment: %s", uri)
198 return nil
199}
200
201// deleteComment soft-deletes a comment and updates parent counts
202func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
203 // Build AT-URI for the comment being deleted
204 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
205
206 // Get existing comment to know its parent (for decrementing the right counter)
207 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
208 if err != nil {
209 if err == comments.ErrCommentNotFound {
210 // Idempotent: Comment already deleted or never existed
211 log.Printf("Comment already deleted or not found: %s", uri)
212 return nil
213 }
214 return fmt.Errorf("failed to get existing comment: %w", err)
215 }
216
217 // Atomically: Soft-delete comment + Update parent counts
218 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
219 return fmt.Errorf("failed to delete comment and update counts: %w", err)
220 }
221
222 log.Printf("✓ Deleted comment: %s", uri)
223 return nil
224}
225
226// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
227func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
228 tx, err := c.db.BeginTx(ctx, nil)
229 if err != nil {
230 return fmt.Errorf("failed to begin transaction: %w", err)
231 }
232 defer func() {
233 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
234 log.Printf("Failed to rollback transaction: %v", rollbackErr)
235 }
236 }()
237
238 // 1. Check if comment exists and handle resurrection case
239 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
240 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
241 var existingID int64
242 var existingDeletedAt *time.Time
243 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
244 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
245
246 var commentID int64
247
248 if checkErr == nil {
249 // Comment exists
250 if existingDeletedAt == nil {
251 // Not deleted - this is an idempotent replay, skip gracefully
252 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
253 if commitErr := tx.Commit(); commitErr != nil {
254 return fmt.Errorf("failed to commit transaction: %w", commitErr)
255 }
256 return nil
257 }
258
259 // Comment was soft-deleted, now being recreated (resurrection)
260 // This is a NEW record with same rkey - update ALL fields including threading refs
261 // User may have deleted old comment and created a new one on a different parent/root
262 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
263 commentID = existingID
264
265 resurrectQuery := `
266 UPDATE comments
267 SET
268 cid = $1,
269 commenter_did = $2,
270 root_uri = $3,
271 root_cid = $4,
272 parent_uri = $5,
273 parent_cid = $6,
274 content = $7,
275 content_facets = $8,
276 embed = $9,
277 content_labels = $10,
278 langs = $11,
279 created_at = $12,
280 indexed_at = $13,
281 deleted_at = NULL,
282 reply_count = 0
283 WHERE id = $14
284 `
285
286 _, err = tx.ExecContext(
287 ctx, resurrectQuery,
288 comment.CID,
289 comment.CommenterDID,
290 comment.RootURI,
291 comment.RootCID,
292 comment.ParentURI,
293 comment.ParentCID,
294 comment.Content,
295 comment.ContentFacets,
296 comment.Embed,
297 comment.ContentLabels,
298 pq.Array(comment.Langs),
299 comment.CreatedAt,
300 time.Now(),
301 commentID,
302 )
303 if err != nil {
304 return fmt.Errorf("failed to resurrect comment: %w", err)
305 }
306
307 } else if checkErr == sql.ErrNoRows {
308 // Comment doesn't exist - insert new comment
309 insertQuery := `
310 INSERT INTO comments (
311 uri, cid, rkey, commenter_did,
312 root_uri, root_cid, parent_uri, parent_cid,
313 content, content_facets, embed, content_labels, langs,
314 created_at, indexed_at
315 ) VALUES (
316 $1, $2, $3, $4,
317 $5, $6, $7, $8,
318 $9, $10, $11, $12, $13,
319 $14, $15
320 )
321 RETURNING id
322 `
323
324 err = tx.QueryRowContext(
325 ctx, insertQuery,
326 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
327 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
328 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
329 comment.CreatedAt, time.Now(),
330 ).Scan(&commentID)
331 if err != nil {
332 return fmt.Errorf("failed to insert comment: %w", err)
333 }
334
335 } else {
336 // Unexpected error checking for existing comment
337 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
338 }
339
340 // 1.5. Reconcile reply_count for this newly inserted comment
341 // In case any replies arrived out-of-order before this parent was indexed
342 reconcileQuery := `
343 UPDATE comments
344 SET reply_count = (
345 SELECT COUNT(*)
346 FROM comments c
347 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
348 )
349 WHERE id = $2
350 `
351 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
352 if reconcileErr != nil {
353 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
354 // Continue anyway - this is a best-effort reconciliation
355 }
356
357 // 2. Update parent counts atomically
358 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
359 // Try posts table first
360 //
361 // FIXME(P1): Post comment_count reconciliation not implemented
362 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering),
363 // the post update below returns 0 rows and we only log a warning. Later, when the post
364 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing
365 // comments. This causes posts to have permanently stale comment_count values.
366 //
367 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments
368 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri
369 // matches the post URI and set comment_count accordingly.
370 //
371 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation
372 updatePostQuery := `
373 UPDATE posts
374 SET comment_count = comment_count + 1
375 WHERE uri = $1 AND deleted_at IS NULL
376 `
377
378 result, err := tx.ExecContext(ctx, updatePostQuery, comment.ParentURI)
379 if err != nil {
380 return fmt.Errorf("failed to update post comment count: %w", err)
381 }
382
383 rowsAffected, err := result.RowsAffected()
384 if err != nil {
385 return fmt.Errorf("failed to check update result: %w", err)
386 }
387
388 // If no post was updated, parent is probably a comment
389 if rowsAffected == 0 {
390 updateCommentQuery := `
391 UPDATE comments
392 SET reply_count = reply_count + 1
393 WHERE uri = $1 AND deleted_at IS NULL
394 `
395
396 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI)
397 if err != nil {
398 return fmt.Errorf("failed to update comment reply count: %w", err)
399 }
400
401 rowsAffected, err := result.RowsAffected()
402 if err != nil {
403 return fmt.Errorf("failed to check update result: %w", err)
404 }
405
406 // If neither post nor comment was found, that's OK (parent might not be indexed yet)
407 if rowsAffected == 0 {
408 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
409 }
410 }
411
412 // Commit transaction
413 if err := tx.Commit(); err != nil {
414 return fmt.Errorf("failed to commit transaction: %w", err)
415 }
416
417 return nil
418}
419
420// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
421func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
422 tx, err := c.db.BeginTx(ctx, nil)
423 if err != nil {
424 return fmt.Errorf("failed to begin transaction: %w", err)
425 }
426 defer func() {
427 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
428 log.Printf("Failed to rollback transaction: %v", rollbackErr)
429 }
430 }()
431
432 // 1. Soft-delete the comment (idempotent)
433 deleteQuery := `
434 UPDATE comments
435 SET deleted_at = $2
436 WHERE uri = $1 AND deleted_at IS NULL
437 `
438
439 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now())
440 if err != nil {
441 return fmt.Errorf("failed to delete comment: %w", err)
442 }
443
444 rowsAffected, err := result.RowsAffected()
445 if err != nil {
446 return fmt.Errorf("failed to check delete result: %w", err)
447 }
448
449 // Idempotent: If no rows affected, comment already deleted
450 if rowsAffected == 0 {
451 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
452 if commitErr := tx.Commit(); commitErr != nil {
453 return fmt.Errorf("failed to commit transaction: %w", commitErr)
454 }
455 return nil
456 }
457
458 // 2. Decrement parent counts atomically
459 // Parent could be a post or comment - try both (use GREATEST to prevent negative)
460 updatePostQuery := `
461 UPDATE posts
462 SET comment_count = GREATEST(0, comment_count - 1)
463 WHERE uri = $1 AND deleted_at IS NULL
464 `
465
466 result, err = tx.ExecContext(ctx, updatePostQuery, comment.ParentURI)
467 if err != nil {
468 return fmt.Errorf("failed to update post comment count: %w", err)
469 }
470
471 rowsAffected, err = result.RowsAffected()
472 if err != nil {
473 return fmt.Errorf("failed to check update result: %w", err)
474 }
475
476 // If no post was updated, parent is probably a comment
477 if rowsAffected == 0 {
478 updateCommentQuery := `
479 UPDATE comments
480 SET reply_count = GREATEST(0, reply_count - 1)
481 WHERE uri = $1 AND deleted_at IS NULL
482 `
483
484 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI)
485 if err != nil {
486 return fmt.Errorf("failed to update comment reply count: %w", err)
487 }
488
489 rowsAffected, err := result.RowsAffected()
490 if err != nil {
491 return fmt.Errorf("failed to check update result: %w", err)
492 }
493
494 // If neither was found, that's OK (parent might be deleted)
495 if rowsAffected == 0 {
496 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
497 }
498 }
499
500 // Commit transaction
501 if err := tx.Commit(); err != nil {
502 return fmt.Errorf("failed to commit transaction: %w", err)
503 }
504
505 return nil
506}
507
508// validateCommentEvent performs security validation on comment events
509func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
510 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
511 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
512 //
513 // We do NOT check if the user exists in AppView because:
514 // 1. Comment events may arrive before user events in Jetstream (race condition)
515 // 2. The comment came from the user's PDS repository (authenticated by PDS)
516 // 3. The database FK constraint was removed to allow out-of-order indexing
517 // 4. Orphaned comments (from never-indexed users) are harmless
518 //
519 // Security is maintained because:
520 // - Comment must come from user's own PDS repository (verified by atProto)
521 // - Fake DIDs will fail PDS authentication
522
523 // Validate DID format (basic sanity check)
524 if !strings.HasPrefix(repoDID, "did:") {
525 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
526 }
527
528 // Validate content is not empty (required per lexicon)
529 if comment.Content == "" {
530 return fmt.Errorf("comment content is required")
531 }
532
533 // Validate content length (defensive check - PDS should enforce this)
534 // Per lexicon: max 3000 graphemes, ~30000 bytes
535 // We check bytes as a simple defensive measure
536 if len(comment.Content) > MaxCommentContentBytes {
537 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
538 }
539
540 // Validate reply references exist
541 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
542 return fmt.Errorf("invalid root reference: must have both URI and CID")
543 }
544
545 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
546 return fmt.Errorf("invalid parent reference: must have both URI and CID")
547 }
548
549 // Validate AT-URI structure for root and parent
550 if err := validateATURI(comment.Reply.Root.URI); err != nil {
551 return fmt.Errorf("invalid root URI: %w", err)
552 }
553
554 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
555 return fmt.Errorf("invalid parent URI: %w", err)
556 }
557
558 return nil
559}
560
561// validateATURI performs basic structure validation on AT-URIs
562// Format: at://did:method:id/collection/rkey
563// This is defensive validation - we trust PDS but catch obviously malformed URIs
564func validateATURI(uri string) error {
565 if !strings.HasPrefix(uri, ATProtoScheme) {
566 return fmt.Errorf("must start with %s", ATProtoScheme)
567 }
568
569 // Remove at:// prefix and split by /
570 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
571 parts := strings.Split(withoutScheme, "/")
572
573 // Must have at least 3 parts: did, collection, rkey
574 if len(parts) < 3 {
575 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
576 }
577
578 // First part should be a DID
579 if !strings.HasPrefix(parts[0], "did:") {
580 return fmt.Errorf("repository identifier must be a DID")
581 }
582
583 // Collection and rkey should not be empty
584 if parts[1] == "" || parts[2] == "" {
585 return fmt.Errorf("collection and rkey cannot be empty")
586 }
587
588 return nil
589}
590
591// CommentRecordFromJetstream represents a comment record as received from Jetstream
592// Matches social.coves.feed.comment lexicon
593type CommentRecordFromJetstream struct {
594 Labels interface{} `json:"labels,omitempty"`
595 Embed map[string]interface{} `json:"embed,omitempty"`
596 Reply ReplyRefFromJetstream `json:"reply"`
597 Type string `json:"$type"`
598 Content string `json:"content"`
599 CreatedAt string `json:"createdAt"`
600 Facets []interface{} `json:"facets,omitempty"`
601 Langs []string `json:"langs,omitempty"`
602}
603
604// ReplyRefFromJetstream represents the threading structure
605type ReplyRefFromJetstream struct {
606 Root StrongRefFromJetstream `json:"root"`
607 Parent StrongRefFromJetstream `json:"parent"`
608}
609
610// parseCommentRecord parses a comment record from Jetstream event data
611func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
612 // Marshal to JSON and back for proper type conversion
613 recordJSON, err := json.Marshal(record)
614 if err != nil {
615 return nil, fmt.Errorf("failed to marshal record: %w", err)
616 }
617
618 var comment CommentRecordFromJetstream
619 if err := json.Unmarshal(recordJSON, &comment); err != nil {
620 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
621 }
622
623 // Validate required fields
624 if comment.Content == "" {
625 return nil, fmt.Errorf("comment record missing content field")
626 }
627
628 if comment.CreatedAt == "" {
629 return nil, fmt.Errorf("comment record missing createdAt field")
630 }
631
632 return &comment, nil
633}
634
635// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
636// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
637func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
638 // Serialize facets if present
639 if len(commentRecord.Facets) > 0 {
640 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
641 facetsStr := string(facetsBytes)
642 facetsJSON = &facetsStr
643 }
644 }
645
646 // Serialize embed if present
647 if len(commentRecord.Embed) > 0 {
648 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
649 embedStr := string(embedBytes)
650 embedJSON = &embedStr
651 }
652 }
653
654 // Serialize labels if present
655 if commentRecord.Labels != nil {
656 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
657 labelsStr := string(labelsBytes)
658 labelsJSON = &labelsStr
659 }
660 }
661
662 return facetsJSON, embedJSON, labelsJSON
663}