A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"Coves/internal/atproto/utils"
	"Coves/internal/core/comments"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/lib/pq"
)
16
// Shared constants used when validating and indexing comment events.
const (
	// CommentCollection is the lexicon collection identifier that comment
	// records are filed under.
	CommentCollection = "social.coves.community.comment"

	// ATProtoScheme is the scheme prefix that every AT-URI starts with.
	ATProtoScheme = "at://"

	// MaxCommentContentBytes caps the byte length of a comment's content.
	// The lexicon limit is 3000 graphemes, budgeted here as roughly 30000 bytes.
	MaxCommentContentBytes = 30_000
)
29
// CommentEventConsumer consumes comment-related events from Jetstream.
// It handles create, update, and delete commits for the
// social.coves.community.comment collection.
type CommentEventConsumer struct {
	// commentRepo is used to look up existing comments by URI and to apply
	// content updates.
	commentRepo comments.Repository
	// db is used directly so that a comment write and the matching parent
	// counter update can run inside a single transaction.
	db *sql.DB
}
36
37// NewCommentEventConsumer creates a new Jetstream consumer for comment events
38func NewCommentEventConsumer(
39 commentRepo comments.Repository,
40 db *sql.DB,
41) *CommentEventConsumer {
42 return &CommentEventConsumer{
43 commentRepo: commentRepo,
44 db: db,
45 }
46}
47
48// HandleEvent processes a Jetstream event for comment records
49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
50 // We only care about commit events for comment records
51 if event.Kind != "commit" || event.Commit == nil {
52 return nil
53 }
54
55 commit := event.Commit
56
57 // Handle comment record operations
58 if commit.Collection == CommentCollection {
59 switch commit.Operation {
60 case "create":
61 return c.createComment(ctx, event.Did, commit)
62 case "update":
63 return c.updateComment(ctx, event.Did, commit)
64 case "delete":
65 return c.deleteComment(ctx, event.Did, commit)
66 }
67 }
68
69 // Silently ignore other operations and collections
70 return nil
71}
72
73// createComment indexes a new comment from the firehose and updates parent counts
74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
75 if commit.Record == nil {
76 return fmt.Errorf("comment create event missing record data")
77 }
78
79 // Parse the comment record
80 commentRecord, err := parseCommentRecord(commit.Record)
81 if err != nil {
82 return fmt.Errorf("failed to parse comment record: %w", err)
83 }
84
85 // SECURITY: Validate this is a legitimate comment event
86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
88 return err
89 }
90
91 // Build AT-URI for this comment
92 // Format: at://commenter_did/social.coves.community.comment/rkey
93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
94
95 // Parse timestamp from record
96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
97 if err != nil {
98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
99 createdAt = time.Now()
100 }
101
102 // Serialize optional JSON fields
103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
104
105 // Build comment entity
106 comment := &comments.Comment{
107 URI: uri,
108 CID: commit.CID,
109 RKey: commit.RKey,
110 CommenterDID: repoDID, // Comment comes from user's repository
111 RootURI: commentRecord.Reply.Root.URI,
112 RootCID: commentRecord.Reply.Root.CID,
113 ParentURI: commentRecord.Reply.Parent.URI,
114 ParentCID: commentRecord.Reply.Parent.CID,
115 Content: commentRecord.Content,
116 ContentFacets: facetsJSON,
117 Embed: embedJSON,
118 ContentLabels: labelsJSON,
119 Langs: commentRecord.Langs,
120 CreatedAt: createdAt,
121 IndexedAt: time.Now(),
122 }
123
124 // Atomically: Index comment + Update parent counts
125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
126 return fmt.Errorf("failed to index comment and update counts: %w", err)
127 }
128
129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
130 return nil
131}
132
133// updateComment updates an existing comment's content fields
134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
135 if commit.Record == nil {
136 return fmt.Errorf("comment update event missing record data")
137 }
138
139 // Parse the updated comment record
140 commentRecord, err := parseCommentRecord(commit.Record)
141 if err != nil {
142 return fmt.Errorf("failed to parse comment record: %w", err)
143 }
144
145 // SECURITY: Validate this is a legitimate update
146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
148 return err
149 }
150
151 // Build AT-URI for the comment being updated
152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
153
154 // Fetch existing comment to validate threading references are immutable
155 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
156 if err != nil {
157 if err == comments.ErrCommentNotFound {
158 // Comment doesn't exist yet - might arrive out of order
159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
160 return nil
161 }
162 return fmt.Errorf("failed to get existing comment for validation: %w", err)
163 }
164
165 // SECURITY: Threading references are IMMUTABLE after creation
166 // Reject updates that attempt to change root/parent (prevents thread hijacking)
167 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
168 existingComment.RootCID != commentRecord.Reply.Root.CID ||
169 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
170 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
172 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
176 return fmt.Errorf("comment threading references cannot be changed after creation")
177 }
178
179 // Serialize optional JSON fields
180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
181
182 // Build comment update entity (preserves vote counts and created_at)
183 comment := &comments.Comment{
184 URI: uri,
185 CID: commit.CID,
186 Content: commentRecord.Content,
187 ContentFacets: facetsJSON,
188 Embed: embedJSON,
189 ContentLabels: labelsJSON,
190 Langs: commentRecord.Langs,
191 }
192
193 // Update the comment in repository
194 if err := c.commentRepo.Update(ctx, comment); err != nil {
195 return fmt.Errorf("failed to update comment: %w", err)
196 }
197
198 log.Printf("✓ Updated comment: %s", uri)
199 return nil
200}
201
202// deleteComment soft-deletes a comment and updates parent counts
203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
204 // Build AT-URI for the comment being deleted
205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
206
207 // Get existing comment to know its parent (for decrementing the right counter)
208 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
209 if err != nil {
210 if err == comments.ErrCommentNotFound {
211 // Idempotent: Comment already deleted or never existed
212 log.Printf("Comment already deleted or not found: %s", uri)
213 return nil
214 }
215 return fmt.Errorf("failed to get existing comment: %w", err)
216 }
217
218 // Atomically: Soft-delete comment + Update parent counts
219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
220 return fmt.Errorf("failed to delete comment and update counts: %w", err)
221 }
222
223 log.Printf("✓ Deleted comment: %s", uri)
224 return nil
225}
226
227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
229 tx, err := c.db.BeginTx(ctx, nil)
230 if err != nil {
231 return fmt.Errorf("failed to begin transaction: %w", err)
232 }
233 defer func() {
234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
235 log.Printf("Failed to rollback transaction: %v", rollbackErr)
236 }
237 }()
238
239 // 1. Check if comment exists and handle resurrection case
240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
242 var existingID int64
243 var existingDeletedAt *time.Time
244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
246
247 var commentID int64
248
249 if checkErr == nil {
250 // Comment exists
251 if existingDeletedAt == nil {
252 // Not deleted - this is an idempotent replay, skip gracefully
253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
254 if commitErr := tx.Commit(); commitErr != nil {
255 return fmt.Errorf("failed to commit transaction: %w", commitErr)
256 }
257 return nil
258 }
259
260 // Comment was soft-deleted, now being recreated (resurrection)
261 // This is a NEW record with same rkey - update ALL fields including threading refs
262 // User may have deleted old comment and created a new one on a different parent/root
263 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
264 commentID = existingID
265
266 resurrectQuery := `
267 UPDATE comments
268 SET
269 cid = $1,
270 commenter_did = $2,
271 root_uri = $3,
272 root_cid = $4,
273 parent_uri = $5,
274 parent_cid = $6,
275 content = $7,
276 content_facets = $8,
277 embed = $9,
278 content_labels = $10,
279 langs = $11,
280 created_at = $12,
281 indexed_at = $13,
282 deleted_at = NULL,
283 reply_count = 0
284 WHERE id = $14
285 `
286
287 _, err = tx.ExecContext(
288 ctx, resurrectQuery,
289 comment.CID,
290 comment.CommenterDID,
291 comment.RootURI,
292 comment.RootCID,
293 comment.ParentURI,
294 comment.ParentCID,
295 comment.Content,
296 comment.ContentFacets,
297 comment.Embed,
298 comment.ContentLabels,
299 pq.Array(comment.Langs),
300 comment.CreatedAt,
301 time.Now(),
302 commentID,
303 )
304 if err != nil {
305 return fmt.Errorf("failed to resurrect comment: %w", err)
306 }
307
308 } else if checkErr == sql.ErrNoRows {
309 // Comment doesn't exist - insert new comment
310 insertQuery := `
311 INSERT INTO comments (
312 uri, cid, rkey, commenter_did,
313 root_uri, root_cid, parent_uri, parent_cid,
314 content, content_facets, embed, content_labels, langs,
315 created_at, indexed_at
316 ) VALUES (
317 $1, $2, $3, $4,
318 $5, $6, $7, $8,
319 $9, $10, $11, $12, $13,
320 $14, $15
321 )
322 RETURNING id
323 `
324
325 err = tx.QueryRowContext(
326 ctx, insertQuery,
327 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
328 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
329 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
330 comment.CreatedAt, time.Now(),
331 ).Scan(&commentID)
332 if err != nil {
333 return fmt.Errorf("failed to insert comment: %w", err)
334 }
335
336 } else {
337 // Unexpected error checking for existing comment
338 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
339 }
340
341 // 1.5. Reconcile reply_count for this newly inserted comment
342 // In case any replies arrived out-of-order before this parent was indexed
343 reconcileQuery := `
344 UPDATE comments
345 SET reply_count = (
346 SELECT COUNT(*)
347 FROM comments c
348 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
349 )
350 WHERE id = $2
351 `
352 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
353 if reconcileErr != nil {
354 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
355 // Continue anyway - this is a best-effort reconciliation
356 }
357
358 // 2. Update parent counts atomically
359 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
360 // Parse collection from parent URI to determine target table
361 //
362 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226
363 // When a comment arrives before its parent post, the post update below returns 0 rows
364 // and we log a warning. Later, when the post is indexed, the post consumer reconciles
365 // comment_count by counting all pre-existing comments. This ensures accurate counts
366 // despite out-of-order Jetstream event delivery.
367 //
368 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go
369 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
370
371 var updateQuery string
372 switch collection {
373 case "social.coves.community.post":
374 // Comment on post - update posts.comment_count
375 updateQuery = `
376 UPDATE posts
377 SET comment_count = comment_count + 1
378 WHERE uri = $1 AND deleted_at IS NULL
379 `
380
381 case "social.coves.community.comment":
382 // Reply to comment - update comments.reply_count
383 updateQuery = `
384 UPDATE comments
385 SET reply_count = reply_count + 1
386 WHERE uri = $1 AND deleted_at IS NULL
387 `
388
389 default:
390 // Unknown or unsupported parent collection
391 // Comment is still indexed, we just don't update parent counts
392 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection)
393 if commitErr := tx.Commit(); commitErr != nil {
394 return fmt.Errorf("failed to commit transaction: %w", commitErr)
395 }
396 return nil
397 }
398
399 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI)
400 if err != nil {
401 return fmt.Errorf("failed to update parent count: %w", err)
402 }
403
404 rowsAffected, err := result.RowsAffected()
405 if err != nil {
406 return fmt.Errorf("failed to check update result: %w", err)
407 }
408
409 // If parent not found, that's OK (parent might not be indexed yet)
410 if rowsAffected == 0 {
411 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
412 }
413
414 // Commit transaction
415 if err := tx.Commit(); err != nil {
416 return fmt.Errorf("failed to commit transaction: %w", err)
417 }
418
419 return nil
420}
421
422// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
423func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
424 tx, err := c.db.BeginTx(ctx, nil)
425 if err != nil {
426 return fmt.Errorf("failed to begin transaction: %w", err)
427 }
428 defer func() {
429 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
430 log.Printf("Failed to rollback transaction: %v", rollbackErr)
431 }
432 }()
433
434 // 1. Soft-delete the comment (idempotent)
435 deleteQuery := `
436 UPDATE comments
437 SET deleted_at = $2
438 WHERE uri = $1 AND deleted_at IS NULL
439 `
440
441 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now())
442 if err != nil {
443 return fmt.Errorf("failed to delete comment: %w", err)
444 }
445
446 rowsAffected, err := result.RowsAffected()
447 if err != nil {
448 return fmt.Errorf("failed to check delete result: %w", err)
449 }
450
451 // Idempotent: If no rows affected, comment already deleted
452 if rowsAffected == 0 {
453 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
454 if commitErr := tx.Commit(); commitErr != nil {
455 return fmt.Errorf("failed to commit transaction: %w", commitErr)
456 }
457 return nil
458 }
459
460 // 2. Decrement parent counts atomically
461 // Parent could be a post or comment - parse collection to determine target table
462 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
463
464 var updateQuery string
465 switch collection {
466 case "social.coves.community.post":
467 // Comment on post - decrement posts.comment_count
468 updateQuery = `
469 UPDATE posts
470 SET comment_count = GREATEST(0, comment_count - 1)
471 WHERE uri = $1 AND deleted_at IS NULL
472 `
473
474 case "social.coves.community.comment":
475 // Reply to comment - decrement comments.reply_count
476 updateQuery = `
477 UPDATE comments
478 SET reply_count = GREATEST(0, reply_count - 1)
479 WHERE uri = $1 AND deleted_at IS NULL
480 `
481
482 default:
483 // Unknown or unsupported parent collection
484 // Comment is still deleted, we just don't update parent counts
485 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection)
486 if commitErr := tx.Commit(); commitErr != nil {
487 return fmt.Errorf("failed to commit transaction: %w", commitErr)
488 }
489 return nil
490 }
491
492 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI)
493 if err != nil {
494 return fmt.Errorf("failed to update parent count: %w", err)
495 }
496
497 rowsAffected, err = result.RowsAffected()
498 if err != nil {
499 return fmt.Errorf("failed to check update result: %w", err)
500 }
501
502 // If parent not found, that's OK (parent might be deleted)
503 if rowsAffected == 0 {
504 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
505 }
506
507 // Commit transaction
508 if err := tx.Commit(); err != nil {
509 return fmt.Errorf("failed to commit transaction: %w", err)
510 }
511
512 return nil
513}
514
515// validateCommentEvent performs security validation on comment events
516func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
517 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
518 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
519 //
520 // We do NOT check if the user exists in AppView because:
521 // 1. Comment events may arrive before user events in Jetstream (race condition)
522 // 2. The comment came from the user's PDS repository (authenticated by PDS)
523 // 3. The database FK constraint was removed to allow out-of-order indexing
524 // 4. Orphaned comments (from never-indexed users) are harmless
525 //
526 // Security is maintained because:
527 // - Comment must come from user's own PDS repository (verified by atProto)
528 // - Fake DIDs will fail PDS authentication
529
530 // Validate DID format (basic sanity check)
531 if !strings.HasPrefix(repoDID, "did:") {
532 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
533 }
534
535 // Validate content is not empty (required per lexicon)
536 if comment.Content == "" {
537 return fmt.Errorf("comment content is required")
538 }
539
540 // Validate content length (defensive check - PDS should enforce this)
541 // Per lexicon: max 3000 graphemes, ~30000 bytes
542 // We check bytes as a simple defensive measure
543 if len(comment.Content) > MaxCommentContentBytes {
544 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
545 }
546
547 // Validate reply references exist
548 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
549 return fmt.Errorf("invalid root reference: must have both URI and CID")
550 }
551
552 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
553 return fmt.Errorf("invalid parent reference: must have both URI and CID")
554 }
555
556 // Validate AT-URI structure for root and parent
557 if err := validateATURI(comment.Reply.Root.URI); err != nil {
558 return fmt.Errorf("invalid root URI: %w", err)
559 }
560
561 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
562 return fmt.Errorf("invalid parent URI: %w", err)
563 }
564
565 return nil
566}
567
568// validateATURI performs basic structure validation on AT-URIs
569// Format: at://did:method:id/collection/rkey
570// This is defensive validation - we trust PDS but catch obviously malformed URIs
571func validateATURI(uri string) error {
572 if !strings.HasPrefix(uri, ATProtoScheme) {
573 return fmt.Errorf("must start with %s", ATProtoScheme)
574 }
575
576 // Remove at:// prefix and split by /
577 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
578 parts := strings.Split(withoutScheme, "/")
579
580 // Must have at least 3 parts: did, collection, rkey
581 if len(parts) < 3 {
582 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
583 }
584
585 // First part should be a DID
586 if !strings.HasPrefix(parts[0], "did:") {
587 return fmt.Errorf("repository identifier must be a DID")
588 }
589
590 // Collection and rkey should not be empty
591 if parts[1] == "" || parts[2] == "" {
592 return fmt.Errorf("collection and rkey cannot be empty")
593 }
594
595 return nil
596}
597
// CommentRecordFromJetstream represents a comment record as received from
// Jetstream, decoded from the commit's record map via its JSON tags.
// It mirrors the social.coves.community.comment lexicon.
type CommentRecordFromJetstream struct {
	// Labels is optional and kept in whatever JSON shape it arrived in.
	Labels interface{} `json:"labels,omitempty"`
	// Embed is optional and kept as a generic JSON object.
	Embed map[string]interface{} `json:"embed,omitempty"`
	// Reply holds the root and parent references used for threading.
	Reply ReplyRefFromJetstream `json:"reply"`
	// Type is the record's "$type" discriminator.
	Type string `json:"$type"`
	// Content is the comment text; parseCommentRecord rejects an empty value.
	Content string `json:"content"`
	// CreatedAt is the client-supplied timestamp string; parseCommentRecord
	// rejects an empty value and createComment parses it as RFC 3339.
	CreatedAt string `json:"createdAt"`
	// Facets is optional and kept as generic JSON values.
	Facets []interface{} `json:"facets,omitempty"`
	// Langs is the optional list of language tags.
	Langs []string `json:"langs,omitempty"`
}
610
// ReplyRefFromJetstream represents the threading structure of a comment:
// the root of the thread and the record being replied to directly.
type ReplyRefFromJetstream struct {
	// Root references the top of the thread (URI and CID).
	Root StrongRefFromJetstream `json:"root"`
	// Parent references the direct parent (URI and CID); the collection in its
	// URI decides which counter is adjusted when the comment is indexed or deleted.
	Parent StrongRefFromJetstream `json:"parent"`
}
616
617// parseCommentRecord parses a comment record from Jetstream event data
618func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
619 // Marshal to JSON and back for proper type conversion
620 recordJSON, err := json.Marshal(record)
621 if err != nil {
622 return nil, fmt.Errorf("failed to marshal record: %w", err)
623 }
624
625 var comment CommentRecordFromJetstream
626 if err := json.Unmarshal(recordJSON, &comment); err != nil {
627 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
628 }
629
630 // Validate required fields
631 if comment.Content == "" {
632 return nil, fmt.Errorf("comment record missing content field")
633 }
634
635 if comment.CreatedAt == "" {
636 return nil, fmt.Errorf("comment record missing createdAt field")
637 }
638
639 return &comment, nil
640}
641
642// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
643// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
644func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
645 // Serialize facets if present
646 if len(commentRecord.Facets) > 0 {
647 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
648 facetsStr := string(facetsBytes)
649 facetsJSON = &facetsStr
650 }
651 }
652
653 // Serialize embed if present
654 if len(commentRecord.Embed) > 0 {
655 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
656 embedStr := string(embedBytes)
657 embedJSON = &embedStr
658 }
659 }
660
661 // Serialize labels if present
662 if commentRecord.Labels != nil {
663 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
664 labelsStr := string(labelsBytes)
665 labelsJSON = &labelsStr
666 }
667 }
668
669 return facetsJSON, embedJSON, labelsJSON
670}