A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/comments"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "log"
10 "strings"
11 "time"
12
13 "github.com/lib/pq"
14)
15
16// Constants for comment validation and processing
17const (
18 // CommentCollection is the lexicon collection identifier for comments
19 CommentCollection = "social.coves.feed.comment"
20
21 // ATProtoScheme is the URI scheme for atProto AT-URIs
22 ATProtoScheme = "at://"
23
24 // MaxCommentContentBytes is the maximum allowed size for comment content
25 // Per lexicon: max 3000 graphemes, ~30000 bytes
26 MaxCommentContentBytes = 30000
27)
28
29// CommentEventConsumer consumes comment-related events from Jetstream
30// Handles CREATE, UPDATE, and DELETE operations for social.coves.feed.comment
31type CommentEventConsumer struct {
32 commentRepo comments.Repository
33 db *sql.DB // Direct DB access for atomic count updates
34}
35
36// NewCommentEventConsumer creates a new Jetstream consumer for comment events
37func NewCommentEventConsumer(
38 commentRepo comments.Repository,
39 db *sql.DB,
40) *CommentEventConsumer {
41 return &CommentEventConsumer{
42 commentRepo: commentRepo,
43 db: db,
44 }
45}
46
47// HandleEvent processes a Jetstream event for comment records
48func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
49 // We only care about commit events for comment records
50 if event.Kind != "commit" || event.Commit == nil {
51 return nil
52 }
53
54 commit := event.Commit
55
56 // Handle comment record operations
57 if commit.Collection == CommentCollection {
58 switch commit.Operation {
59 case "create":
60 return c.createComment(ctx, event.Did, commit)
61 case "update":
62 return c.updateComment(ctx, event.Did, commit)
63 case "delete":
64 return c.deleteComment(ctx, event.Did, commit)
65 }
66 }
67
68 // Silently ignore other operations and collections
69 return nil
70}
71
72// createComment indexes a new comment from the firehose and updates parent counts
73func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
74 if commit.Record == nil {
75 return fmt.Errorf("comment create event missing record data")
76 }
77
78 // Parse the comment record
79 commentRecord, err := parseCommentRecord(commit.Record)
80 if err != nil {
81 return fmt.Errorf("failed to parse comment record: %w", err)
82 }
83
84 // SECURITY: Validate this is a legitimate comment event
85 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
86 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
87 return err
88 }
89
90 // Build AT-URI for this comment
91 // Format: at://commenter_did/social.coves.feed.comment/rkey
92 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
93
94 // Parse timestamp from record
95 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
96 if err != nil {
97 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
98 createdAt = time.Now()
99 }
100
101 // Serialize optional JSON fields
102 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
103
104 // Build comment entity
105 comment := &comments.Comment{
106 URI: uri,
107 CID: commit.CID,
108 RKey: commit.RKey,
109 CommenterDID: repoDID, // Comment comes from user's repository
110 RootURI: commentRecord.Reply.Root.URI,
111 RootCID: commentRecord.Reply.Root.CID,
112 ParentURI: commentRecord.Reply.Parent.URI,
113 ParentCID: commentRecord.Reply.Parent.CID,
114 Content: commentRecord.Content,
115 ContentFacets: facetsJSON,
116 Embed: embedJSON,
117 ContentLabels: labelsJSON,
118 Langs: commentRecord.Langs,
119 CreatedAt: createdAt,
120 IndexedAt: time.Now(),
121 }
122
123 // Atomically: Index comment + Update parent counts
124 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
125 return fmt.Errorf("failed to index comment and update counts: %w", err)
126 }
127
128 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
129 return nil
130}
131
132// updateComment updates an existing comment's content fields
133func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
134 if commit.Record == nil {
135 return fmt.Errorf("comment update event missing record data")
136 }
137
138 // Parse the updated comment record
139 commentRecord, err := parseCommentRecord(commit.Record)
140 if err != nil {
141 return fmt.Errorf("failed to parse comment record: %w", err)
142 }
143
144 // SECURITY: Validate this is a legitimate update
145 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
146 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
147 return err
148 }
149
150 // Build AT-URI for the comment being updated
151 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
152
153 // Fetch existing comment to validate threading references are immutable
154 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
155 if err != nil {
156 if err == comments.ErrCommentNotFound {
157 // Comment doesn't exist yet - might arrive out of order
158 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
159 return nil
160 }
161 return fmt.Errorf("failed to get existing comment for validation: %w", err)
162 }
163
164 // SECURITY: Threading references are IMMUTABLE after creation
165 // Reject updates that attempt to change root/parent (prevents thread hijacking)
166 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
167 existingComment.RootCID != commentRecord.Reply.Root.CID ||
168 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
169 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
170 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
171 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
172 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
173 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
174 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
175 return fmt.Errorf("comment threading references cannot be changed after creation")
176 }
177
178 // Serialize optional JSON fields
179 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
180
181 // Build comment update entity (preserves vote counts and created_at)
182 comment := &comments.Comment{
183 URI: uri,
184 CID: commit.CID,
185 Content: commentRecord.Content,
186 ContentFacets: facetsJSON,
187 Embed: embedJSON,
188 ContentLabels: labelsJSON,
189 Langs: commentRecord.Langs,
190 }
191
192 // Update the comment in repository
193 if err := c.commentRepo.Update(ctx, comment); err != nil {
194 return fmt.Errorf("failed to update comment: %w", err)
195 }
196
197 log.Printf("✓ Updated comment: %s", uri)
198 return nil
199}
200
201// deleteComment soft-deletes a comment and updates parent counts
202func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
203 // Build AT-URI for the comment being deleted
204 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
205
206 // Get existing comment to know its parent (for decrementing the right counter)
207 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
208 if err != nil {
209 if err == comments.ErrCommentNotFound {
210 // Idempotent: Comment already deleted or never existed
211 log.Printf("Comment already deleted or not found: %s", uri)
212 return nil
213 }
214 return fmt.Errorf("failed to get existing comment: %w", err)
215 }
216
217 // Atomically: Soft-delete comment + Update parent counts
218 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
219 return fmt.Errorf("failed to delete comment and update counts: %w", err)
220 }
221
222 log.Printf("✓ Deleted comment: %s", uri)
223 return nil
224}
225
226// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
227func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
228 tx, err := c.db.BeginTx(ctx, nil)
229 if err != nil {
230 return fmt.Errorf("failed to begin transaction: %w", err)
231 }
232 defer func() {
233 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
234 log.Printf("Failed to rollback transaction: %v", rollbackErr)
235 }
236 }()
237
238 // 1. Check if comment exists and handle resurrection case
239 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
240 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
241 var existingID int64
242 var existingDeletedAt *time.Time
243 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
244 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
245
246 var commentID int64
247
248 if checkErr == nil {
249 // Comment exists
250 if existingDeletedAt == nil {
251 // Not deleted - this is an idempotent replay, skip gracefully
252 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
253 if commitErr := tx.Commit(); commitErr != nil {
254 return fmt.Errorf("failed to commit transaction: %w", commitErr)
255 }
256 return nil
257 }
258
259 // Comment was soft-deleted, now being recreated (resurrection)
260 // This is a NEW record with same rkey - update ALL fields including threading refs
261 // User may have deleted old comment and created a new one on a different parent/root
262 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
263 commentID = existingID
264
265 resurrectQuery := `
266 UPDATE comments
267 SET
268 cid = $1,
269 commenter_did = $2,
270 root_uri = $3,
271 root_cid = $4,
272 parent_uri = $5,
273 parent_cid = $6,
274 content = $7,
275 content_facets = $8,
276 embed = $9,
277 content_labels = $10,
278 langs = $11,
279 created_at = $12,
280 indexed_at = $13,
281 deleted_at = NULL,
282 reply_count = 0
283 WHERE id = $14
284 `
285
286 _, err = tx.ExecContext(
287 ctx, resurrectQuery,
288 comment.CID,
289 comment.CommenterDID,
290 comment.RootURI,
291 comment.RootCID,
292 comment.ParentURI,
293 comment.ParentCID,
294 comment.Content,
295 comment.ContentFacets,
296 comment.Embed,
297 comment.ContentLabels,
298 pq.Array(comment.Langs),
299 comment.CreatedAt,
300 time.Now(),
301 commentID,
302 )
303
304 if err != nil {
305 return fmt.Errorf("failed to resurrect comment: %w", err)
306 }
307
308 } else if checkErr == sql.ErrNoRows {
309 // Comment doesn't exist - insert new comment
310 insertQuery := `
311 INSERT INTO comments (
312 uri, cid, rkey, commenter_did,
313 root_uri, root_cid, parent_uri, parent_cid,
314 content, content_facets, embed, content_labels, langs,
315 created_at, indexed_at
316 ) VALUES (
317 $1, $2, $3, $4,
318 $5, $6, $7, $8,
319 $9, $10, $11, $12, $13,
320 $14, $15
321 )
322 RETURNING id
323 `
324
325 err = tx.QueryRowContext(
326 ctx, insertQuery,
327 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
328 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
329 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
330 comment.CreatedAt, time.Now(),
331 ).Scan(&commentID)
332
333 if err != nil {
334 return fmt.Errorf("failed to insert comment: %w", err)
335 }
336
337 } else {
338 // Unexpected error checking for existing comment
339 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
340 }
341
342 // 1.5. Reconcile reply_count for this newly inserted comment
343 // In case any replies arrived out-of-order before this parent was indexed
344 reconcileQuery := `
345 UPDATE comments
346 SET reply_count = (
347 SELECT COUNT(*)
348 FROM comments c
349 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
350 )
351 WHERE id = $2
352 `
353 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
354 if reconcileErr != nil {
355 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
356 // Continue anyway - this is a best-effort reconciliation
357 }
358
359 // 2. Update parent counts atomically
360 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
361 // Try posts table first
362 //
363 // FIXME(P1): Post comment_count reconciliation not implemented
364 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering),
365 // the post update below returns 0 rows and we only log a warning. Later, when the post
366 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing
367 // comments. This causes posts to have permanently stale comment_count values.
368 //
369 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments
370 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri
371 // matches the post URI and set comment_count accordingly.
372 //
373 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation
374 updatePostQuery := `
375 UPDATE posts
376 SET comment_count = comment_count + 1
377 WHERE uri = $1 AND deleted_at IS NULL
378 `
379
380 result, err := tx.ExecContext(ctx, updatePostQuery, comment.ParentURI)
381 if err != nil {
382 return fmt.Errorf("failed to update post comment count: %w", err)
383 }
384
385 rowsAffected, err := result.RowsAffected()
386 if err != nil {
387 return fmt.Errorf("failed to check update result: %w", err)
388 }
389
390 // If no post was updated, parent is probably a comment
391 if rowsAffected == 0 {
392 updateCommentQuery := `
393 UPDATE comments
394 SET reply_count = reply_count + 1
395 WHERE uri = $1 AND deleted_at IS NULL
396 `
397
398 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI)
399 if err != nil {
400 return fmt.Errorf("failed to update comment reply count: %w", err)
401 }
402
403 rowsAffected, err := result.RowsAffected()
404 if err != nil {
405 return fmt.Errorf("failed to check update result: %w", err)
406 }
407
408 // If neither post nor comment was found, that's OK (parent might not be indexed yet)
409 if rowsAffected == 0 {
410 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
411 }
412 }
413
414 // Commit transaction
415 if err := tx.Commit(); err != nil {
416 return fmt.Errorf("failed to commit transaction: %w", err)
417 }
418
419 return nil
420}
421
422// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
423func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
424 tx, err := c.db.BeginTx(ctx, nil)
425 if err != nil {
426 return fmt.Errorf("failed to begin transaction: %w", err)
427 }
428 defer func() {
429 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
430 log.Printf("Failed to rollback transaction: %v", rollbackErr)
431 }
432 }()
433
434 // 1. Soft-delete the comment (idempotent)
435 deleteQuery := `
436 UPDATE comments
437 SET deleted_at = $2
438 WHERE uri = $1 AND deleted_at IS NULL
439 `
440
441 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now())
442 if err != nil {
443 return fmt.Errorf("failed to delete comment: %w", err)
444 }
445
446 rowsAffected, err := result.RowsAffected()
447 if err != nil {
448 return fmt.Errorf("failed to check delete result: %w", err)
449 }
450
451 // Idempotent: If no rows affected, comment already deleted
452 if rowsAffected == 0 {
453 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
454 if commitErr := tx.Commit(); commitErr != nil {
455 return fmt.Errorf("failed to commit transaction: %w", commitErr)
456 }
457 return nil
458 }
459
460 // 2. Decrement parent counts atomically
461 // Parent could be a post or comment - try both (use GREATEST to prevent negative)
462 updatePostQuery := `
463 UPDATE posts
464 SET comment_count = GREATEST(0, comment_count - 1)
465 WHERE uri = $1 AND deleted_at IS NULL
466 `
467
468 result, err = tx.ExecContext(ctx, updatePostQuery, comment.ParentURI)
469 if err != nil {
470 return fmt.Errorf("failed to update post comment count: %w", err)
471 }
472
473 rowsAffected, err = result.RowsAffected()
474 if err != nil {
475 return fmt.Errorf("failed to check update result: %w", err)
476 }
477
478 // If no post was updated, parent is probably a comment
479 if rowsAffected == 0 {
480 updateCommentQuery := `
481 UPDATE comments
482 SET reply_count = GREATEST(0, reply_count - 1)
483 WHERE uri = $1 AND deleted_at IS NULL
484 `
485
486 result, err := tx.ExecContext(ctx, updateCommentQuery, comment.ParentURI)
487 if err != nil {
488 return fmt.Errorf("failed to update comment reply count: %w", err)
489 }
490
491 rowsAffected, err := result.RowsAffected()
492 if err != nil {
493 return fmt.Errorf("failed to check update result: %w", err)
494 }
495
496 // If neither was found, that's OK (parent might be deleted)
497 if rowsAffected == 0 {
498 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
499 }
500 }
501
502 // Commit transaction
503 if err := tx.Commit(); err != nil {
504 return fmt.Errorf("failed to commit transaction: %w", err)
505 }
506
507 return nil
508}
509
510// validateCommentEvent performs security validation on comment events
511func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
512 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
513 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
514 //
515 // We do NOT check if the user exists in AppView because:
516 // 1. Comment events may arrive before user events in Jetstream (race condition)
517 // 2. The comment came from the user's PDS repository (authenticated by PDS)
518 // 3. The database FK constraint was removed to allow out-of-order indexing
519 // 4. Orphaned comments (from never-indexed users) are harmless
520 //
521 // Security is maintained because:
522 // - Comment must come from user's own PDS repository (verified by atProto)
523 // - Fake DIDs will fail PDS authentication
524
525 // Validate DID format (basic sanity check)
526 if !strings.HasPrefix(repoDID, "did:") {
527 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
528 }
529
530 // Validate content is not empty (required per lexicon)
531 if comment.Content == "" {
532 return fmt.Errorf("comment content is required")
533 }
534
535 // Validate content length (defensive check - PDS should enforce this)
536 // Per lexicon: max 3000 graphemes, ~30000 bytes
537 // We check bytes as a simple defensive measure
538 if len(comment.Content) > MaxCommentContentBytes {
539 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
540 }
541
542 // Validate reply references exist
543 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
544 return fmt.Errorf("invalid root reference: must have both URI and CID")
545 }
546
547 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
548 return fmt.Errorf("invalid parent reference: must have both URI and CID")
549 }
550
551 // Validate AT-URI structure for root and parent
552 if err := validateATURI(comment.Reply.Root.URI); err != nil {
553 return fmt.Errorf("invalid root URI: %w", err)
554 }
555
556 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
557 return fmt.Errorf("invalid parent URI: %w", err)
558 }
559
560 return nil
561}
562
563// validateATURI performs basic structure validation on AT-URIs
564// Format: at://did:method:id/collection/rkey
565// This is defensive validation - we trust PDS but catch obviously malformed URIs
566func validateATURI(uri string) error {
567 if !strings.HasPrefix(uri, ATProtoScheme) {
568 return fmt.Errorf("must start with %s", ATProtoScheme)
569 }
570
571 // Remove at:// prefix and split by /
572 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
573 parts := strings.Split(withoutScheme, "/")
574
575 // Must have at least 3 parts: did, collection, rkey
576 if len(parts) < 3 {
577 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
578 }
579
580 // First part should be a DID
581 if !strings.HasPrefix(parts[0], "did:") {
582 return fmt.Errorf("repository identifier must be a DID")
583 }
584
585 // Collection and rkey should not be empty
586 if parts[1] == "" || parts[2] == "" {
587 return fmt.Errorf("collection and rkey cannot be empty")
588 }
589
590 return nil
591}
592
593// CommentRecordFromJetstream represents a comment record as received from Jetstream
594// Matches social.coves.feed.comment lexicon
595type CommentRecordFromJetstream struct {
596 Type string `json:"$type"`
597 Reply ReplyRefFromJetstream `json:"reply"`
598 Content string `json:"content"`
599 Facets []interface{} `json:"facets,omitempty"`
600 Embed map[string]interface{} `json:"embed,omitempty"`
601 Langs []string `json:"langs,omitempty"`
602 Labels interface{} `json:"labels,omitempty"`
603 CreatedAt string `json:"createdAt"`
604}
605
606// ReplyRefFromJetstream represents the threading structure
607type ReplyRefFromJetstream struct {
608 Root StrongRefFromJetstream `json:"root"`
609 Parent StrongRefFromJetstream `json:"parent"`
610}
611
612// parseCommentRecord parses a comment record from Jetstream event data
613func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
614 // Marshal to JSON and back for proper type conversion
615 recordJSON, err := json.Marshal(record)
616 if err != nil {
617 return nil, fmt.Errorf("failed to marshal record: %w", err)
618 }
619
620 var comment CommentRecordFromJetstream
621 if err := json.Unmarshal(recordJSON, &comment); err != nil {
622 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
623 }
624
625 // Validate required fields
626 if comment.Content == "" {
627 return nil, fmt.Errorf("comment record missing content field")
628 }
629
630 if comment.CreatedAt == "" {
631 return nil, fmt.Errorf("comment record missing createdAt field")
632 }
633
634 return &comment, nil
635}
636
637// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
638// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
639func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
640 // Serialize facets if present
641 if commentRecord.Facets != nil && len(commentRecord.Facets) > 0 {
642 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
643 facetsStr := string(facetsBytes)
644 facetsJSON = &facetsStr
645 }
646 }
647
648 // Serialize embed if present
649 if commentRecord.Embed != nil && len(commentRecord.Embed) > 0 {
650 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
651 embedStr := string(embedBytes)
652 embedJSON = &embedStr
653 }
654 }
655
656 // Serialize labels if present
657 if commentRecord.Labels != nil {
658 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
659 labelsStr := string(labelsBytes)
660 labelsJSON = &labelsStr
661 }
662 }
663
664 return facetsJSON, embedJSON, labelsJSON
665}