A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log"
9 "strings"
10 "time"
11
12 "Coves/internal/atproto/utils"
13 "Coves/internal/core/comments"
14
15 "github.com/lib/pq"
16)
17
18// Constants for comment validation and processing
19const (
20 // CommentCollection is the lexicon collection identifier for comments
21 CommentCollection = "social.coves.community.comment"
22
23 // ATProtoScheme is the URI scheme for atProto AT-URIs
24 ATProtoScheme = "at://"
25
26 // MaxCommentContentBytes is the maximum allowed size for comment content
27 // Per lexicon: max 3000 graphemes, ~30000 bytes
28 MaxCommentContentBytes = 30000
29)
30
31// CommentEventConsumer consumes comment-related events from Jetstream
32// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment
33type CommentEventConsumer struct {
34 commentRepo comments.Repository
35 db *sql.DB // Direct DB access for atomic count updates
36}
37
38// NewCommentEventConsumer creates a new Jetstream consumer for comment events
39func NewCommentEventConsumer(
40 commentRepo comments.Repository,
41 db *sql.DB,
42) *CommentEventConsumer {
43 return &CommentEventConsumer{
44 commentRepo: commentRepo,
45 db: db,
46 }
47}
48
49// HandleEvent processes a Jetstream event for comment records
50func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
51 // We only care about commit events for comment records
52 if event.Kind != "commit" || event.Commit == nil {
53 return nil
54 }
55
56 commit := event.Commit
57
58 // Handle comment record operations
59 if commit.Collection == CommentCollection {
60 switch commit.Operation {
61 case "create":
62 return c.createComment(ctx, event.Did, commit)
63 case "update":
64 return c.updateComment(ctx, event.Did, commit)
65 case "delete":
66 return c.deleteComment(ctx, event.Did, commit)
67 }
68 }
69
70 // Silently ignore other operations and collections
71 return nil
72}
73
74// createComment indexes a new comment from the firehose and updates parent counts
75func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
76 if commit.Record == nil {
77 return fmt.Errorf("comment create event missing record data")
78 }
79
80 // Parse the comment record
81 commentRecord, err := parseCommentRecord(commit.Record)
82 if err != nil {
83 return fmt.Errorf("failed to parse comment record: %w", err)
84 }
85
86 // SECURITY: Validate this is a legitimate comment event
87 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
88 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
89 return err
90 }
91
92 // Build AT-URI for this comment
93 // Format: at://commenter_did/social.coves.community.comment/rkey
94 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
95
96 // Parse timestamp from record
97 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
98 if err != nil {
99 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
100 createdAt = time.Now()
101 }
102
103 // Serialize optional JSON fields
104 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
105
106 // Build comment entity
107 comment := &comments.Comment{
108 URI: uri,
109 CID: commit.CID,
110 RKey: commit.RKey,
111 CommenterDID: repoDID, // Comment comes from user's repository
112 RootURI: commentRecord.Reply.Root.URI,
113 RootCID: commentRecord.Reply.Root.CID,
114 ParentURI: commentRecord.Reply.Parent.URI,
115 ParentCID: commentRecord.Reply.Parent.CID,
116 Content: commentRecord.Content,
117 ContentFacets: facetsJSON,
118 Embed: embedJSON,
119 ContentLabels: labelsJSON,
120 Langs: commentRecord.Langs,
121 CreatedAt: createdAt,
122 IndexedAt: time.Now(),
123 }
124
125 // Atomically: Index comment + Update parent counts
126 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
127 return fmt.Errorf("failed to index comment and update counts: %w", err)
128 }
129
130 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
131 return nil
132}
133
134// updateComment updates an existing comment's content fields
135func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
136 if commit.Record == nil {
137 return fmt.Errorf("comment update event missing record data")
138 }
139
140 // Parse the updated comment record
141 commentRecord, err := parseCommentRecord(commit.Record)
142 if err != nil {
143 return fmt.Errorf("failed to parse comment record: %w", err)
144 }
145
146 // SECURITY: Validate this is a legitimate update
147 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
148 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
149 return err
150 }
151
152 // Build AT-URI for the comment being updated
153 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
154
155 // Fetch existing comment to validate threading references are immutable
156 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
157 if err != nil {
158 if err == comments.ErrCommentNotFound {
159 // Comment doesn't exist yet - might arrive out of order
160 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
161 return nil
162 }
163 return fmt.Errorf("failed to get existing comment for validation: %w", err)
164 }
165
166 // SECURITY: Threading references are IMMUTABLE after creation
167 // Reject updates that attempt to change root/parent (prevents thread hijacking)
168 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
169 existingComment.RootCID != commentRecord.Reply.Root.CID ||
170 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
171 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
172 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
173 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
174 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
175 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
176 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
177 return fmt.Errorf("comment threading references cannot be changed after creation")
178 }
179
180 // Serialize optional JSON fields
181 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
182
183 // Build comment update entity (preserves vote counts and created_at)
184 comment := &comments.Comment{
185 URI: uri,
186 CID: commit.CID,
187 Content: commentRecord.Content,
188 ContentFacets: facetsJSON,
189 Embed: embedJSON,
190 ContentLabels: labelsJSON,
191 Langs: commentRecord.Langs,
192 }
193
194 // Update the comment in repository
195 if err := c.commentRepo.Update(ctx, comment); err != nil {
196 return fmt.Errorf("failed to update comment: %w", err)
197 }
198
199 log.Printf("✓ Updated comment: %s", uri)
200 return nil
201}
202
203// deleteComment soft-deletes a comment and updates parent counts
204func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
205 // Build AT-URI for the comment being deleted
206 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
207
208 // Get existing comment to know its parent (for decrementing the right counter)
209 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
210 if err != nil {
211 if err == comments.ErrCommentNotFound {
212 // Idempotent: Comment already deleted or never existed
213 log.Printf("Comment already deleted or not found: %s", uri)
214 return nil
215 }
216 return fmt.Errorf("failed to get existing comment: %w", err)
217 }
218
219 // Atomically: Soft-delete comment + Update parent counts
220 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
221 return fmt.Errorf("failed to delete comment and update counts: %w", err)
222 }
223
224 log.Printf("✓ Deleted comment: %s", uri)
225 return nil
226}
227
228// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
229func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
230 tx, err := c.db.BeginTx(ctx, nil)
231 if err != nil {
232 return fmt.Errorf("failed to begin transaction: %w", err)
233 }
234 defer func() {
235 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
236 log.Printf("Failed to rollback transaction: %v", rollbackErr)
237 }
238 }()
239
240 // 1. Check if comment exists and handle resurrection case
241 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
242 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
243 var existingID int64
244 var existingDeletedAt *time.Time
245 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
246 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
247
248 var commentID int64
249
250 if checkErr == nil {
251 // Comment exists
252 if existingDeletedAt == nil {
253 // Not deleted - this is an idempotent replay, skip gracefully
254 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
255 if commitErr := tx.Commit(); commitErr != nil {
256 return fmt.Errorf("failed to commit transaction: %w", commitErr)
257 }
258 return nil
259 }
260
261 // Comment was soft-deleted, now being recreated (resurrection)
262 // This is a NEW record with same rkey - update ALL fields including threading refs
263 // User may have deleted old comment and created a new one on a different parent/root
264 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
265 commentID = existingID
266
267 resurrectQuery := `
268 UPDATE comments
269 SET
270 cid = $1,
271 commenter_did = $2,
272 root_uri = $3,
273 root_cid = $4,
274 parent_uri = $5,
275 parent_cid = $6,
276 content = $7,
277 content_facets = $8,
278 embed = $9,
279 content_labels = $10,
280 langs = $11,
281 created_at = $12,
282 indexed_at = $13,
283 deleted_at = NULL,
284 reply_count = 0
285 WHERE id = $14
286 `
287
288 _, err = tx.ExecContext(
289 ctx, resurrectQuery,
290 comment.CID,
291 comment.CommenterDID,
292 comment.RootURI,
293 comment.RootCID,
294 comment.ParentURI,
295 comment.ParentCID,
296 comment.Content,
297 comment.ContentFacets,
298 comment.Embed,
299 comment.ContentLabels,
300 pq.Array(comment.Langs),
301 comment.CreatedAt,
302 time.Now(),
303 commentID,
304 )
305 if err != nil {
306 return fmt.Errorf("failed to resurrect comment: %w", err)
307 }
308
309 } else if checkErr == sql.ErrNoRows {
310 // Comment doesn't exist - insert new comment
311 insertQuery := `
312 INSERT INTO comments (
313 uri, cid, rkey, commenter_did,
314 root_uri, root_cid, parent_uri, parent_cid,
315 content, content_facets, embed, content_labels, langs,
316 created_at, indexed_at
317 ) VALUES (
318 $1, $2, $3, $4,
319 $5, $6, $7, $8,
320 $9, $10, $11, $12, $13,
321 $14, $15
322 )
323 RETURNING id
324 `
325
326 err = tx.QueryRowContext(
327 ctx, insertQuery,
328 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
329 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
330 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
331 comment.CreatedAt, time.Now(),
332 ).Scan(&commentID)
333 if err != nil {
334 return fmt.Errorf("failed to insert comment: %w", err)
335 }
336
337 } else {
338 // Unexpected error checking for existing comment
339 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
340 }
341
342 // 1.5. Reconcile reply_count for this newly inserted comment
343 // In case any replies arrived out-of-order before this parent was indexed
344 reconcileQuery := `
345 UPDATE comments
346 SET reply_count = (
347 SELECT COUNT(*)
348 FROM comments c
349 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
350 )
351 WHERE id = $2
352 `
353 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
354 if reconcileErr != nil {
355 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
356 // Continue anyway - this is a best-effort reconciliation
357 }
358
359 // 2. Update parent counts atomically
360 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
361 // Parse collection from parent URI to determine target table
362 //
363 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226
364 // When a comment arrives before its parent post, the post update below returns 0 rows
365 // and we log a warning. Later, when the post is indexed, the post consumer reconciles
366 // comment_count by counting all pre-existing comments. This ensures accurate counts
367 // despite out-of-order Jetstream event delivery.
368 //
369 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go
370 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
371
372 var updateQuery string
373 switch collection {
374 case "social.coves.community.post":
375 // Comment on post - update posts.comment_count
376 updateQuery = `
377 UPDATE posts
378 SET comment_count = comment_count + 1
379 WHERE uri = $1 AND deleted_at IS NULL
380 `
381
382 case "social.coves.community.comment":
383 // Reply to comment - update comments.reply_count
384 updateQuery = `
385 UPDATE comments
386 SET reply_count = reply_count + 1
387 WHERE uri = $1 AND deleted_at IS NULL
388 `
389
390 default:
391 // Unknown or unsupported parent collection
392 // Comment is still indexed, we just don't update parent counts
393 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection)
394 if commitErr := tx.Commit(); commitErr != nil {
395 return fmt.Errorf("failed to commit transaction: %w", commitErr)
396 }
397 return nil
398 }
399
400 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI)
401 if err != nil {
402 return fmt.Errorf("failed to update parent count: %w", err)
403 }
404
405 rowsAffected, err := result.RowsAffected()
406 if err != nil {
407 return fmt.Errorf("failed to check update result: %w", err)
408 }
409
410 // If parent not found, that's OK (parent might not be indexed yet)
411 if rowsAffected == 0 {
412 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
413 }
414
415 // Commit transaction
416 if err := tx.Commit(); err != nil {
417 return fmt.Errorf("failed to commit transaction: %w", err)
418 }
419
420 return nil
421}
422
423// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
424func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
425 tx, err := c.db.BeginTx(ctx, nil)
426 if err != nil {
427 return fmt.Errorf("failed to begin transaction: %w", err)
428 }
429 defer func() {
430 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
431 log.Printf("Failed to rollback transaction: %v", rollbackErr)
432 }
433 }()
434
435 // 1. Soft-delete the comment (idempotent)
436 deleteQuery := `
437 UPDATE comments
438 SET deleted_at = $2
439 WHERE uri = $1 AND deleted_at IS NULL
440 `
441
442 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now())
443 if err != nil {
444 return fmt.Errorf("failed to delete comment: %w", err)
445 }
446
447 rowsAffected, err := result.RowsAffected()
448 if err != nil {
449 return fmt.Errorf("failed to check delete result: %w", err)
450 }
451
452 // Idempotent: If no rows affected, comment already deleted
453 if rowsAffected == 0 {
454 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
455 if commitErr := tx.Commit(); commitErr != nil {
456 return fmt.Errorf("failed to commit transaction: %w", commitErr)
457 }
458 return nil
459 }
460
461 // 2. Decrement parent counts atomically
462 // Parent could be a post or comment - parse collection to determine target table
463 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
464
465 var updateQuery string
466 switch collection {
467 case "social.coves.community.post":
468 // Comment on post - decrement posts.comment_count
469 updateQuery = `
470 UPDATE posts
471 SET comment_count = GREATEST(0, comment_count - 1)
472 WHERE uri = $1 AND deleted_at IS NULL
473 `
474
475 case "social.coves.community.comment":
476 // Reply to comment - decrement comments.reply_count
477 updateQuery = `
478 UPDATE comments
479 SET reply_count = GREATEST(0, reply_count - 1)
480 WHERE uri = $1 AND deleted_at IS NULL
481 `
482
483 default:
484 // Unknown or unsupported parent collection
485 // Comment is still deleted, we just don't update parent counts
486 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection)
487 if commitErr := tx.Commit(); commitErr != nil {
488 return fmt.Errorf("failed to commit transaction: %w", commitErr)
489 }
490 return nil
491 }
492
493 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI)
494 if err != nil {
495 return fmt.Errorf("failed to update parent count: %w", err)
496 }
497
498 rowsAffected, err = result.RowsAffected()
499 if err != nil {
500 return fmt.Errorf("failed to check update result: %w", err)
501 }
502
503 // If parent not found, that's OK (parent might be deleted)
504 if rowsAffected == 0 {
505 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
506 }
507
508 // Commit transaction
509 if err := tx.Commit(); err != nil {
510 return fmt.Errorf("failed to commit transaction: %w", err)
511 }
512
513 return nil
514}
515
516// validateCommentEvent performs security validation on comment events
517func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
518 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
519 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
520 //
521 // We do NOT check if the user exists in AppView because:
522 // 1. Comment events may arrive before user events in Jetstream (race condition)
523 // 2. The comment came from the user's PDS repository (authenticated by PDS)
524 // 3. The database FK constraint was removed to allow out-of-order indexing
525 // 4. Orphaned comments (from never-indexed users) are harmless
526 //
527 // Security is maintained because:
528 // - Comment must come from user's own PDS repository (verified by atProto)
529 // - Fake DIDs will fail PDS authentication
530
531 // Validate DID format (basic sanity check)
532 if !strings.HasPrefix(repoDID, "did:") {
533 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
534 }
535
536 // Validate content is not empty (required per lexicon)
537 if comment.Content == "" {
538 return fmt.Errorf("comment content is required")
539 }
540
541 // Validate content length (defensive check - PDS should enforce this)
542 // Per lexicon: max 3000 graphemes, ~30000 bytes
543 // We check bytes as a simple defensive measure
544 if len(comment.Content) > MaxCommentContentBytes {
545 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
546 }
547
548 // Validate reply references exist
549 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
550 return fmt.Errorf("invalid root reference: must have both URI and CID")
551 }
552
553 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
554 return fmt.Errorf("invalid parent reference: must have both URI and CID")
555 }
556
557 // Validate AT-URI structure for root and parent
558 if err := validateATURI(comment.Reply.Root.URI); err != nil {
559 return fmt.Errorf("invalid root URI: %w", err)
560 }
561
562 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
563 return fmt.Errorf("invalid parent URI: %w", err)
564 }
565
566 return nil
567}
568
569// validateATURI performs basic structure validation on AT-URIs
570// Format: at://did:method:id/collection/rkey
571// This is defensive validation - we trust PDS but catch obviously malformed URIs
572func validateATURI(uri string) error {
573 if !strings.HasPrefix(uri, ATProtoScheme) {
574 return fmt.Errorf("must start with %s", ATProtoScheme)
575 }
576
577 // Remove at:// prefix and split by /
578 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
579 parts := strings.Split(withoutScheme, "/")
580
581 // Must have at least 3 parts: did, collection, rkey
582 if len(parts) < 3 {
583 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
584 }
585
586 // First part should be a DID
587 if !strings.HasPrefix(parts[0], "did:") {
588 return fmt.Errorf("repository identifier must be a DID")
589 }
590
591 // Collection and rkey should not be empty
592 if parts[1] == "" || parts[2] == "" {
593 return fmt.Errorf("collection and rkey cannot be empty")
594 }
595
596 return nil
597}
598
599// CommentRecordFromJetstream represents a comment record as received from Jetstream
600// Matches social.coves.community.comment lexicon
601type CommentRecordFromJetstream struct {
602 Labels interface{} `json:"labels,omitempty"`
603 Embed map[string]interface{} `json:"embed,omitempty"`
604 Reply ReplyRefFromJetstream `json:"reply"`
605 Type string `json:"$type"`
606 Content string `json:"content"`
607 CreatedAt string `json:"createdAt"`
608 Facets []interface{} `json:"facets,omitempty"`
609 Langs []string `json:"langs,omitempty"`
610}
611
612// ReplyRefFromJetstream represents the threading structure
613type ReplyRefFromJetstream struct {
614 Root StrongRefFromJetstream `json:"root"`
615 Parent StrongRefFromJetstream `json:"parent"`
616}
617
618// parseCommentRecord parses a comment record from Jetstream event data
619func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
620 // Marshal to JSON and back for proper type conversion
621 recordJSON, err := json.Marshal(record)
622 if err != nil {
623 return nil, fmt.Errorf("failed to marshal record: %w", err)
624 }
625
626 var comment CommentRecordFromJetstream
627 if err := json.Unmarshal(recordJSON, &comment); err != nil {
628 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
629 }
630
631 // Validate required fields
632 if comment.Content == "" {
633 return nil, fmt.Errorf("comment record missing content field")
634 }
635
636 if comment.CreatedAt == "" {
637 return nil, fmt.Errorf("comment record missing createdAt field")
638 }
639
640 return &comment, nil
641}
642
643// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
644// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
645func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
646 // Serialize facets if present
647 if len(commentRecord.Facets) > 0 {
648 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
649 facetsStr := string(facetsBytes)
650 facetsJSON = &facetsStr
651 }
652 }
653
654 // Serialize embed if present
655 if len(commentRecord.Embed) > 0 {
656 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
657 embedStr := string(embedBytes)
658 embedJSON = &embedStr
659 }
660 }
661
662 // Serialize labels if present
663 if commentRecord.Labels != nil {
664 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
665 labelsStr := string(labelsBytes)
666 labelsJSON = &labelsStr
667 }
668 }
669
670 return facetsJSON, embedJSON, labelsJSON
671}